Overview

Go is a relatively young language whose ecosystem, including its microservice frameworks, is still developing. This article builds a microservice communication framework on top of grpc-go.

1. Mechanism of service registration and publication

1.1 Problems solved

Service registration and publication mainly solve the problem of managing service dependencies. If service A calls service B, the most direct approach is to configure B's IP address and port in A. As the number of dependencies grows, however, this configuration becomes complex, and whenever a service is migrated, the configuration of every service that depends on it must be updated, which is hard to maintain and error-prone. Service registration and publication were introduced to solve this problem.

1.2 Mechanism

  • Service information publishing: the published information consists mainly of the service name, the IP address, and some attached metadata. The service registers it with the registry through the registry's interface.
  • Liveness detection: when a service stops unexpectedly, clients need to detect this and remove the service's IP address from the list of available addresses. This can be implemented with a periodic heartbeat.
  • Client load balancing: with service registration and publication, a service can be deployed as multiple instances, and clients can balance load across those instances directly, which allows the service to scale horizontally.

In summary, a service reports its information to the registry, and clients pull that information and call the service by name. When an instance goes down, clients remove it from their call list; when an instance comes online, clients add it automatically.
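The register, heartbeat, and expire cycle described above can be illustrated without any external registry. The following is only a minimal in-memory sketch: the `Registry` type, its methods, the addresses, and the 10-second TTL are all hypothetical names and values chosen for illustration, and a real deployment would rely on a store such as etcd instead.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// Registry is a hypothetical in-memory stand-in for a real registry such as etcd.
type Registry struct {
	mu      sync.Mutex
	ttl     time.Duration
	expires map[string]map[string]time.Time // service name -> address -> expiry time
}

func NewRegistry(ttl time.Duration) *Registry {
	return &Registry{ttl: ttl, expires: make(map[string]map[string]time.Time)}
}

// Heartbeat registers addr under name, or renews its TTL if it is already registered.
func (r *Registry) Heartbeat(name, addr string, now time.Time) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.expires[name] == nil {
		r.expires[name] = make(map[string]time.Time)
	}
	r.expires[name][addr] = now.Add(r.ttl)
}

// Alive returns the sorted addresses of name whose TTL has not expired at time now.
// Instances that stopped sending heartbeats are removed.
func (r *Registry) Alive(name string, now time.Time) []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	var addrs []string
	for addr, exp := range r.expires[name] {
		if now.Before(exp) {
			addrs = append(addrs, addr)
		} else {
			delete(r.expires[name], addr)
		}
	}
	sort.Strings(addrs)
	return addrs
}

// example registers two instances, lets only the first one keep sending
// heartbeats, and returns the live lists before and after the second expires.
func example() (before, after []string) {
	t0 := time.Unix(0, 0)
	reg := NewRegistry(10 * time.Second)
	reg.Heartbeat("user-service", "10.0.0.1:8080", t0)
	reg.Heartbeat("user-service", "10.0.0.2:8080", t0)
	before = reg.Alive("user-service", t0.Add(5*time.Second))

	reg.Heartbeat("user-service", "10.0.0.1:8080", t0.Add(8*time.Second))
	after = reg.Alive("user-service", t0.Add(12*time.Second))
	return before, after
}

func main() {
	before, after := example()
	fmt.Println("before expiry:", before)
	fmt.Println("after expiry: ", after)
}
```

Passing the current time in explicitly keeps the sketch deterministic; a real client would use the wall clock and receive changes by watching the registry rather than by polling.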

2. Implementation

grpc-go makes heavy use of Go interfaces, so service registration and discovery can be added by implementing the relevant interfaces. The service registry has to balance availability and consistency and is generally built on etcd or ZooKeeper; the version implemented here uses etcd. See grpc-wrapper for the complete code and a usage example.
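To make the interface-based extension point concrete, here is a small sketch. The `Resolver`, `Watcher`, `Update`, and `Operation` types are local stand-ins modelled on the naming package types that the code in this article uses; they are redeclared here as an assumption so the example runs without grpc-go. `staticResolver` and `staticWatcher` are hypothetical names for a resolver that serves a fixed address list.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins modelled on the naming package types (an assumption, not the real package).
type Operation uint8

const (
	Add Operation = iota
	Delete
)

type Update struct {
	Op   Operation
	Addr string
}

type Watcher interface {
	Next() ([]*Update, error) // blocks until there are updates or an error
	Close()
}

type Resolver interface {
	Resolve(target string) (Watcher, error)
}

// staticResolver is a hypothetical resolver that always serves the same addresses.
type staticResolver []string

func (r staticResolver) Resolve(target string) (Watcher, error) {
	return &staticWatcher{addrs: r, done: make(chan struct{})}, nil
}

type staticWatcher struct {
	addrs []string
	sent  bool
	done  chan struct{}
}

// Next reports every address as an Add on the first call. Later calls block
// until Close, because a static list never changes.
func (w *staticWatcher) Next() ([]*Update, error) {
	if !w.sent {
		w.sent = true
		updates := make([]*Update, 0, len(w.addrs))
		for _, a := range w.addrs {
			updates = append(updates, &Update{Op: Add, Addr: a})
		}
		return updates, nil
	}
	<-w.done
	return nil, errors.New("watcher closed")
}

// Close unblocks Next. In this sketch it must be called at most once.
func (w *staticWatcher) Close() { close(w.done) }

func main() {
	var r Resolver = staticResolver{"10.0.0.1:8080", "10.0.0.2:8080"}
	w, err := r.Resolve("user-service")
	if err != nil {
		fmt.Println("resolve:", err)
		return
	}
	defer w.Close()

	updates, err := w.Next()
	if err != nil {
		fmt.Println("next:", err)
		return
	}
	for _, u := range updates {
		fmt.Println("add", u.Addr)
	}
}
```

An etcd-backed implementation has the same shape: only the bodies of Resolve and Next change, which is why the interfaces make the registry pluggable.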

2.1 Client

On the client side, the simplest approach is to implement two interface methods, Resolve() and Next(), and use round-robin load balancing. Both are built mainly on etcd's Get and Watch interfaces.
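Round-robin selection itself is simple enough to sketch in a few lines. The `RoundRobin` type below is a hypothetical illustration of the idea, not the balancer that grpc-go ships: it hands out addresses in turn and accepts a fresh address list whenever the registry reports a change.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrNoInstances is returned when no address is currently available.
var ErrNoInstances = errors.New("no available instances")

// RoundRobin is a hypothetical picker that cycles through the known addresses.
type RoundRobin struct {
	mu    sync.Mutex
	addrs []string
	next  int
}

// Update replaces the address list, for example after the registry reports a change.
func (rr *RoundRobin) Update(addrs []string) {
	rr.mu.Lock()
	defer rr.mu.Unlock()
	rr.addrs = append([]string(nil), addrs...) // copy so the caller cannot mutate our list
}

// Pick returns the next address in rotation.
func (rr *RoundRobin) Pick() (string, error) {
	rr.mu.Lock()
	defer rr.mu.Unlock()
	if len(rr.addrs) == 0 {
		return "", ErrNoInstances
	}
	addr := rr.addrs[rr.next%len(rr.addrs)]
	rr.next = (rr.next + 1) % len(rr.addrs)
	return addr, nil
}

func main() {
	rr := &RoundRobin{}
	rr.Update([]string{"10.0.0.1:8080", "10.0.0.2:8080"})
	for i := 0; i < 3; i++ {
		addr, err := rr.Pick()
		if err != nil {
			fmt.Println("pick:", err)
			return
		}
		fmt.Println(addr)
	}
}
```

The mutex matters because, in a real client, picks happen on request goroutines while updates arrive from the goroutine that reads the watcher.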

  • The Resolve() interface
// Resolve creates a Watcher that listens for changes to the service information in the registry.
func (er *etcdRegistry) Resolve(target string) (naming.Watcher, error) {
	ctx, cancel := context.WithTimeout(context.TODO(), resolverTimeOut)
	w := &etcdWatcher{
		cli:    er.cli,
		target: target + "/",
		ctx:    ctx,
		cancel: cancel,
	}
	return w, nil
}

  • The Next() interface
// Next returns the registered service information. On the first call it reads the
// current state from etcd; on later calls it blocks on the watch channel and returns
// each change to the gRPC framework so that it can update its view of the service.
func (ew *etcdWatcher) Next() ([]*naming.Update, error) {
	var updates []*naming.Update
	// First call: fetch the current service information, then create the watch channel.
	if ew.watchChan == nil {
		// create new chan
		resp, err := ew.cli.Get(ew.ctx, ew.target, etcd.WithPrefix(), etcd.WithSerializable())
		if err != nil {
			return nil, err
		}
		for _, kv := range resp.Kvs {
			var upt naming.Update
			if err := json.Unmarshal(kv.Value, &upt); err != nil {
				continue
			}
			updates = append(updates, &upt)
		}
		// Create the etcd watcher to listen for changes under the target (service name) prefix.
		opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
		ew.watchChan = ew.cli.Watch(context.TODO(), ew.target, opts...)
		return updates, nil
	}

	// Block on the watch channel and return the changes to the upper layer when the service changes.
	wrsp, ok := <-ew.watchChan
	if !ok {
		err := status.Error(codes.Unavailable, "etcd watch closed")
		return nil, err
	}
	if wrsp.Err() != nil {
		return nil, wrsp.Err()
	}
	for _, e := range wrsp.Events {
		var upt naming.Update
		var err error
		switch e.Type {
		case etcd.EventTypePut:
			err = json.Unmarshal(e.Kv.Value, &upt)
			upt.Op = naming.Add
		case etcd.EventTypeDelete:
			err = json.Unmarshal(e.PrevKv.Value, &upt)
			upt.Op = naming.Delete
		}

		if err != nil {
			continue
		}
		updates = append(updates, &upt)
	}
	return updates, nil
}
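What the caller does with the updates returned by Next() can be sketched as follows. The `Update` and `Operation` types are local stand-ins modelled on the naming package (an assumption made so the example runs without grpc-go), and `applyUpdates` is a hypothetical helper, not part of any library. It folds a batch of Add and Delete updates into the current address list.

```go
package main

import "fmt"

// Local stand-ins modelled on the naming package types (an assumption).
type Operation uint8

const (
	Add Operation = iota
	Delete
)

type Update struct {
	Op   Operation
	Addr string
}

// applyUpdates returns a new address list with the updates applied in order.
// Adding an address that is already present is a no-op.
func applyUpdates(addrs []string, updates []*Update) []string {
	out := append([]string(nil), addrs...) // work on a copy
	for _, u := range updates {
		switch u.Op {
		case Add:
			if !contains(out, u.Addr) {
				out = append(out, u.Addr)
			}
		case Delete:
			out = remove(out, u.Addr)
		}
	}
	return out
}

func contains(addrs []string, addr string) bool {
	for _, a := range addrs {
		if a == addr {
			return true
		}
	}
	return false
}

// remove filters addr out of addrs in place and returns the shortened slice.
func remove(addrs []string, addr string) []string {
	kept := addrs[:0]
	for _, a := range addrs {
		if a != addr {
			kept = append(kept, a)
		}
	}
	return kept
}

func main() {
	addrs := []string{"10.0.0.1:8080"}
	addrs = applyUpdates(addrs, []*Update{
		{Op: Add, Addr: "10.0.0.2:8080"},    // a new instance came online
		{Op: Delete, Addr: "10.0.0.1:8080"}, // an old instance's key was deleted or its lease expired
	})
	fmt.Println(addrs)
}
```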


2.2 Server

The server only needs to report its service information and send periodic heartbeats, using etcd's Put and KeepAlive interfaces. The details are as follows:

func (er *etcdRegistry) Register(ctx context.Context, target string, update naming.Update, opts ...wrapper.RegistryOptions) (err error) {
	// Serialize the service information to JSON.
	var upBytes []byte
	if upBytes, err = json.Marshal(update); err != nil {
		return status.Error(codes.InvalidArgument, err.Error())
	}

	ctx, cancel := context.WithTimeout(context.TODO(), resolverTimeOut)
	er.cancal = cancel
	rgOpt := wrapper.RegistryOption{TTL: wrapper.DefaultRegInfTTL}
	for _, opt := range opts {
		opt(&rgOpt)
	}

	switch update.Op {
	case naming.Add:
		lsRsp, err := er.lsCli.Grant(ctx, int64(rgOpt.TTL/time.Second))
		if err != nil {
			return err
		}

		// Put the service information into etcd with the lease attached, so that the key
		// carries the lease's TTL. KeepAlive renews the TTL; if no renewal arrives within
		// the TTL, the service is presumed down and its information is removed.
		etcdOpts := []etcd.OpOption{etcd.WithLease(lsRsp.ID)}
		key := target + "/" + update.Addr
		_, err = er.cli.KV.Put(ctx, key, string(upBytes), etcdOpts...)
		if err != nil {
			return err
		}

		// Keep the lease alive with periodic heartbeats.
		lsRspChan, err := er.lsCli.KeepAlive(context.TODO(), lsRsp.ID)
		if err != nil {
			return err
		}
		go func() {
			for {
				_, ok := <-lsRspChan
				if !ok {
					grpclog.Fatalf("%v keepalive channel is closing", key)
					break
				}
			}
		}()
	case naming.Delete:
		_, err = er.cli.Delete(ctx, target+"/"+update.Addr)
	default:
		return status.Error(codes.InvalidArgument, "unsupported op")
	}
	// err is non-nil here only if the Delete above failed.
	return err
}
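The key layout used above, target + "/" + address, pairs with the trailing "/" that Resolve() appends to the watch target. A small sketch shows why the separator matters. `instanceKey` and `instancesOf` are hypothetical helpers, and the prefix match is simulated with strings.HasPrefix rather than a real etcd prefix query.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// instanceKey builds the registry key for one instance: "<target>/<addr>".
func instanceKey(target, addr string) string {
	return target + "/" + addr
}

// instancesOf returns the addresses stored under target, matching on the same
// "<target>/" prefix that a prefix Get or Watch would use.
func instancesOf(target string, keys []string) []string {
	prefix := target + "/"
	var addrs []string
	for _, k := range keys {
		if strings.HasPrefix(k, prefix) {
			addrs = append(addrs, strings.TrimPrefix(k, prefix))
		}
	}
	sort.Strings(addrs)
	return addrs
}

func main() {
	keys := []string{
		instanceKey("user", "10.0.0.1:8080"),
		instanceKey("user", "10.0.0.2:8080"),
		instanceKey("user-v2", "10.0.0.9:8080"),
	}
	// Without the trailing "/", the prefix "user" would also match "user-v2/...".
	fmt.Println(instancesOf("user", keys))
	fmt.Println(instancesOf("user-v2", keys))
}
```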

3. References

grpc

etcd

grpc-wrapper