Preface

In the last article, we implemented a basic RPC client and server; this time we will start building the higher-level functionality. Space is limited, so for the full implementation see: code address

Foundation support

Upgraded versions of Client and Server

The client implementation

Server implementation

First, let's redefine Client and Server as SGClient and SGServer. SGClient wraps the RPCClient operations defined in the previous article and adds service-governance features; SGServer is upgraded from the RPCServer defined in the previous article to support service governance as well. (SG stands for Service Governance.) Here are the definitions:

type SGClient interface {
	Go(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}, done chan *Call) (*Call, error)
	Call(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error
}
type sgClient struct {
	shutdown  bool
	option    SGOption
	clients   sync.Map //map[string]RPCClient
	serversMu sync.RWMutex
	servers   []registry.Provider
}
type RPCServer interface {
	Register(rcvr interface{}, metaData map[string]string) error
	Serve(network string, addr string) error
	Services() []ServiceInfo
	Close() error
}
type SGServer struct { // The original RPCServer
	codec      codec.Codec
	serviceMap sync.Map
	tr         transport.ServerTransport
	mutex      sync.Mutex
	shutdown   bool
	Option     Option
}

Interceptors

As mentioned in the previous article, we want to provide filter-like semantics so that the framework stays open for extension but closed for modification. Here we use higher-order functions to define aspects and the interceptors around them. First we define several aspect types:

// Client-side aspects
type CallFunc func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error
type GoFunc func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}, done chan *Call) *Call

// Server-side aspects
type ServeFunc func(network string, addr string) error
type ServeTransportFunc func(tr transport.Transport)
type HandleRequestFunc func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport)

These are the functions that an RPC call passes through on the client and server sides. We define them as aspects and then define the corresponding interceptors:

// Client interceptor
package client
type Wrapper interface {
	WrapCall(option *SGOption, callFunc CallFunc) CallFunc
	WrapGo(option *SGOption, goFunc GoFunc) GoFunc
}
// Server interceptor
package server
type Wrapper interface {
	WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc
	WrapServeTransport(s *SGServer, transportFunc ServeTransportFunc) ServeTransportFunc
	WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc
}

In this way, users can implement the Wrapper interface to enhance the behavior of the client or server, for example logging request parameters and results, or dynamically modifying parameters or responses. Parts of the framework's own functionality are also implemented through wrappers: the client currently provides MetaDataWrapper for assembling metadata and LogWrapper for logging requests and responses, while the server's DefaultWrapper implements service registration, listening for exit signals, and request counting.

Because Go has no notion of abstract classes, an implementation may not want to intercept every aspect (for example, wrapping Call but not Go); in that case it simply returns the function object it received as the argument.
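As a concrete illustration, here is a minimal sketch of what such a client-side wrapper could look like. It is not the framework's actual LogWrapper, just an example written against the Wrapper interface above:

// logWrapper logs every synchronous call and leaves asynchronous calls alone.
type logWrapper struct{}

func (w *logWrapper) WrapCall(option *SGOption, callFunc CallFunc) CallFunc {
	return func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error {
		log.Printf("before call: method=%s arg=%v", ServiceMethod, arg)
		err := callFunc(ctx, ServiceMethod, arg, reply)
		log.Printf("after call: method=%s reply=%v err=%v", ServiceMethod, reply, err)
		return err
	}
}

// Go is not intercepted: just return the function object passed in.
func (w *logWrapper) WrapGo(option *SGOption, goFunc GoFunc) GoFunc {
	return goFunc
}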

Client interceptor implementation

Server-side interceptor implementation

Service Governance

Service registration and discovery

Until now, our RPC calls were made by specifying the server's IP and port on the client side. This approach is very simple but of limited use: it really only works in tests or demos. So we need service registration and discovery, so that client configuration is no longer bound to actual IPs; instead, the client obtains the list of servers from a separate registry and is updated in real time when server nodes change.

First, define the relevant interfaces (code address):

// Registry contains two parts: service registration (for the server) and service discovery (for the client)
type Registry interface {
	Register(option RegisterOption, provider ...Provider)   // Register providers
	Unregister(option RegisterOption, provider ...Provider) // Unregister providers
	GetServiceList() []Provider // Get the list of services
	Watch() Watcher             // Listen for changes in the service list
	Unwatch(watcher Watcher)    // Cancel the listener
}
type RegisterOption struct {
	AppKey string // AppKey uniquely identifies an application
}
type Watcher interface {
	Next() (*Event, error) // Get the next update to the service list
	Close()
}
type EventAction byte
const (
	Create EventAction = iota
	Update
	Delete
)
type Event struct { //Event indicates an update
	Action    EventAction
	AppKey    string
	Providers []Provider // The providers that changed (incremental, not the full list)
}
type Provider struct { // A specific service provider
	ProviderKey string // Network+"@"+Addr
	Network     string
	Addr        string
	Meta        map[string]string
}

AppKey

We use the concept of an AppKey to identify a service, for example com.meituan.demo.rpc.server. The server registers its own information (including AppKey, IP, port, method list, and so on) with the registry at startup; when the client wants to make a call, it simply looks up the servers in the registry by the server's AppKey.

At present, only direct connection (peer-to-peer) and in-memory service registration are implemented; integration with independent components such as etcd or ZooKeeper will come later.

InMemory implementation: code address
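To make the interface concrete, here is an illustrative sketch of how a server and client might use the registry. The constructor name memory.NewInMemoryRegistry and the addresses are assumptions for the example, not necessarily the actual API:

func demoRegistry() {
	r := memory.NewInMemoryRegistry() // hypothetical constructor; check the code for the real name
	option := registry.RegisterOption{AppKey: "com.meituan.demo.rpc.server"}

	// Server side: register this instance at startup
	r.Register(option, registry.Provider{
		ProviderKey: "tcp@127.0.0.1:8888",
		Network:     "tcp",
		Addr:        "127.0.0.1:8888",
	})

	// Client side: fetch the current list and watch for incremental updates
	log.Printf("initial providers: %v", r.GetServiceList())
	watcher := r.Watch()
	go func() {
		for {
			event, err := watcher.Next()
			if err != nil {
				return
			}
			log.Printf("registry event: action=%v providers=%v", event.Action, event.Providers)
		}
	}()
}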

Load balancing

With service registration and discovery in place, a client may be facing more than one server, so before each call it has to pick one of them to actually talk to. There are many possible selection strategies: random, round-robin, weight-based, based on server load, custom rules, and so on.

Here is the interface definition:

// Filter is used to filter out nodes according to user-defined rules
type Filter func(provider registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}) bool
type SelectOption struct {
	Filters []Filter
}
type Selector interface {
	Next(providers []registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}, opt SelectOption) (registry.Provider, error)
}

At present, only random load balancing is implemented; other policies such as round-robin or consistent hashing will follow. Users can also plug in their own load-balancing policies.
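For reference, a random strategy can be sketched in a few lines against the Selector interface above; RandomSelector is an illustrative name, not necessarily the one used in the code:

type RandomSelector struct{}

func (RandomSelector) Next(providers []registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}, opt SelectOption) (registry.Provider, error) {
	// Keep only the providers that pass all user-supplied filters
	eligible := make([]registry.Provider, 0, len(providers))
	for _, p := range providers {
		keep := true
		for _, f := range opt.Filters {
			if !f(p, ctx, ServiceMethod, arg) {
				keep = false
				break
			}
		}
		if keep {
			eligible = append(eligible, p)
		}
	}
	if len(eligible) == 0 {
		return registry.Provider{}, errors.New("no available provider")
	}
	// Pick one of the eligible providers uniformly at random
	return eligible[rand.Intn(len(eligible))], nil
}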

Fault tolerance

Persistent connections and reconnection

To reduce the overhead of repeatedly creating and tearing down network connections, we maintain persistent connections from the client to each server and cache the established connections (RPCClient objects) in a map keyed by the server's identifier. Before a call, the client looks up the cached RPCClient for the provider chosen by load balancing and then issues the call. When no cached client exists, or the cached client turns out to be invalid, the connection is re-established (a new RPCClient object is created).

func (c *sgClient) selectClient(ctx context.Context, ServiceMethod string, arg interface{}) (provider registry.Provider, client RPCClient, err error) {
	// Determine which server to call based on load balancing
	provider, err = c.option.Selector.Next(c.providers(), ctx, ServiceMethod, arg, c.option.SelectOption)
	if err != nil {
		return
	}
	client, err = c.getClient(provider)
	return
}

func (c *sgClient) getClient(provider registry.Provider) (client RPCClient, err error) {
	key := provider.ProviderKey
	rc, ok := c.clients.Load(key)
	if ok {
		cached := rc.(RPCClient)
		if cached.IsShutDown() {
			// The cached client is no longer usable: remove it from the cache
			c.clients.Delete(key)
		}
	}
	// Load again after the possible deletion
	rc, ok = c.clients.Load(key)
	if ok {
		// Cache hit: return the cached RPCClient
		client = rc.(RPCClient)
	} else {
		// Cache miss: create a new client and store it in the cache
		client, err = NewRPCClient(provider.Network, provider.Addr, c.option.Option)
		if err != nil {
			return
		}
		c.clients.Store(key, client)
	}
	return
}

In the current implementation, each service provider has exactly one corresponding RPCClient. In the future, something like a connection pool could be considered, where each provider has several RPCClients and one is taken from the pool before each invocation.

Cluster fault tolerance

Failures are inevitable in distributed systems. When an invocation fails, we can choose how to handle it. Here are some common fail modes:

type FailMode byte
const (
	FailFast FailMode = iota // Fail quickly
	FailOver // Retry other servers
	FailRetry // Retry the same server
	FailSafe // Ignore the failure and return directly
)

The implementation is relatively simple: based on the configured fail mode and retry count, decide whether and where to retry. Other modes such as FailBack (resend after a delay), Fork, and Broadcast are not implemented yet.
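The retry logic can be sketched roughly as follows. The Retries field on SGOption and the callWithFailMode name are assumptions for illustration, and RPCClient is assumed to expose the Call method from the previous article:

func (c *sgClient) callWithFailMode(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error {
	_, client, err := c.selectClient(ctx, ServiceMethod, arg)
	if err == nil {
		err = client.Call(ctx, ServiceMethod, arg, reply)
		if err == nil {
			return nil
		}
	}
	switch c.option.FailMode {
	case FailRetry:
		// Retry the same server; if we never got a client there is nothing to retry against
		for i := 0; i < c.option.Retries && err != nil && client != nil; i++ {
			err = client.Call(ctx, ServiceMethod, arg, reply)
		}
		return err
	case FailOver:
		// Reselect a (possibly different) server for each retry
		for i := 0; i < c.option.Retries && err != nil; i++ {
			_, client, err = c.selectClient(ctx, ServiceMethod, arg)
			if err == nil {
				err = client.Call(ctx, ServiceMethod, arg, reply)
			}
		}
		return err
	case FailSafe:
		// Swallow the error and report success to the caller
		return nil
	default:
		// FailFast: return the first error immediately
		return err
	}
}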

Graceful exit

When it receives the program's exit signal, the server first tries to finish processing the requests currently in flight and exits once they are done; if the specified timeout (12s by default) is exceeded, it exits directly.

func (s *SGServer) Close() error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.shutdown = true
	// Wait until in-flight requests finish or the shutdown timeout elapses
	ticker := time.NewTicker(s.Option.ShutDownWait)
	defer ticker.Stop()
WaitLoop:
	for {
		if s.requestInProcess <= 0 { // requestInProcess is the number of requests currently being processed, counted in the wrapper
			break
		}
		select {
		case <-ticker.C:
			// Timeout reached: stop waiting and exit directly
			break WaitLoop
		}
	}
	return s.tr.Close()
}

Conclusion

That's all for this installment. In general, we built on top of the previous encapsulation, reserved extension points for later, and implemented simple service-governance features. To summarize, this time we made the following changes relative to the previous article:

  1. Redefined the interface between Client and Server
  2. Provides interceptors (Wrapper interfaces)
  3. Provides service registration and discovery and load balancing interface and simple implementation
  4. Simple fault tolerant processing is implemented
  5. Implements a simple graceful exit
  6. Added gob serialization support (straightforward, not covered in the article)

Previous articles

Implementing an RPC Framework from Scratch (Part 0)

Implementing an RPC Framework from Scratch (Part 1)