1. Overview

The Go standard library ships its own RPC framework, net/rpc, which implements RPC in a relatively simple way. The package is now frozen: the Go team has announced that it will accept no new features and recommends gRPC instead. Even so, as the standard library's RPC framework it has a lot worth studying, so this article walks through Go's native RPC framework from the point of view of the source code.

2. The server side

The server side works in two steps. First comes method registration: the receiver's methods are extracted through reflection and stored in a map. Then comes the network call: the server listens on a port, reads the packet, decodes the request, invokes the method through reflection, encodes the return value, and sends it back to the client.

2.1 Method Registration

2.1.1 Register
// Register publishes the receiver's methods in the DefaultServer.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }

// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func RegisterName(name string, rcvr interface{}) error {
	return DefaultServer.RegisterName(name, rcvr)
}

As shown above, method registration has two entry functions, Register and RegisterName. The rcvr interface{} argument is usually an object that has methods. By default the service is named after the receiver's concrete type; to register it under a custom name, use RegisterName.
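The two entry points can be tried out with a small sketch. Arith, Args and registerTwice are hypothetical names chosen for illustration, and the sketch uses a private server from rpc.NewServer rather than the DefaultServer so that it has no global side effects.

```go
package main

import (
	"fmt"
	"net/rpc"
)

// Args and Arith are hypothetical types used only for this illustration.
type Args struct{ A, B int }

type Arith struct{}

// Multiply has the shape net/rpc expects: an exported method with two
// arguments (the second a pointer) and a single error return value.
func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// registerTwice registers the same receiver type under its own name and under
// a custom name, then tries the type name again to show the duplicate check.
func registerTwice() (first, custom, dup error) {
	server := rpc.NewServer()                        // private server, DefaultServer untouched
	first = server.Register(new(Arith))              // service name "Arith"
	custom = server.RegisterName("Calc", new(Arith)) // service name "Calc"
	dup = server.Register(new(Arith))                // "Arith" is already taken
	return first, custom, dup
}

func main() {
	first, custom, dup := registerTwice()
	fmt.Println(first, custom, dup)
}
```

The third registration fails because the service map already holds an entry for the name "Arith".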

2.1.2 Reflection processing
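type methodType struct {
	sync.Mutex // protects counters
	method     reflect.Method // reflection info of the method
	ArgType    reflect.Type   // reflect type of the argument
	ReplyType  reflect.Type   // reflect type of the reply
	numCalls   uint           // number of calls
}

type service struct {
	name   string                 // name of the service
	rcvr   reflect.Value          // receiver of the service's methods
	typ    reflect.Type           // type of the receiver
	method map[string]*methodType // reflection results for all methods of the object
}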

For example, take an Arith object with a method Multiply(xx, xx) error. The server's service map then has the form map["Arith"]*service, and the method field of that service has the form map["Multiply"]*methodType.

The key pieces of code are as follows:

Generating a Service object

func (server *Server) register(rcvr interface{}, name string, useName bool) error {
	// Generate the service.
	s := new(service)
	s.typ = reflect.TypeOf(rcvr)
	s.rcvr = reflect.ValueOf(rcvr)
	sname := reflect.Indirect(s.rcvr).Type().Name()
	....
	s.name = sname
	// Convert the object's methods into a map[string]*methodType.
	s.method = suitableMethods(s.typ, true)
	...
	// The service is stored as a key-value pair.
	if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
		return errors.New("rpc: service already defined: " + sname)
	}
	return nil
}

Generating the map[string]*methodType

func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
	methods := make(map[string]*methodType)
	// Use reflection to iterate over all methods.
	for m := 0; m < typ.NumMethod(); m++ {
		method := typ.Method(m)
		mtype := method.Type
		mname := method.Name
		// Method must be exported.
		if method.PkgPath != "" {
			continue
		}
		// Method needs three ins: receiver, *args, *reply.
		if mtype.NumIn() != 3 {
			if reportErr {
				log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
			}
			continue
		}
		argType := mtype.In(1)
		...
		replyType := mtype.In(2)
		if replyType.Kind() != reflect.Ptr {
			if reportErr {
				log.Println("method", mname, "reply type not a pointer:", replyType)
			}
			continue
		}
		...
		// The return type of the method must be error.
		if returnType := mtype.Out(0); returnType != typeOfError {
			if reportErr {
				log.Println("method", mname, "returns", returnType.String(), "not error")
			}
			continue
		}
		methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
	}
	return methods
}

2.2 Network Invocation

// Request is the header of the request for each RPC call.
type Request struct {
	ServiceMethod string   // format: "Service.Method"
	Seq           uint64   // sequence number generated by the client
	next          *Request // linked list maintained by the server
}

// Response is the header of the response for each RPC call.
type Response struct {
	ServiceMethod string    // same as the ServiceMethod of the request
	Seq           uint64    // same as the Seq of the request
	Error         string    // error, if any
	next          *Response // linked list maintained by the server
}

As shown above, a network call relies mainly on these two structures, the request header and the response header, and the codec (gob/JSON) converts between the binary stream and the structs. The main steps are reading the request header, looking up the service and method, invoking the method through reflection, and writing the encoded response back to the client.

The key code is as follows. First, read the request header and look up the service and method to call:

func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
	// Grab the request header.
	req = server.getRequest()
	err = codec.ReadRequestHeader(req)
	if err != nil {
		// error handling...
		return
	}

	keepReading = true

	dot := strings.LastIndex(req.ServiceMethod, ".")
	if dot < 0 {
		err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
		return
	}
	serviceName := req.ServiceMethod[:dot]
	methodName := req.ServiceMethod[dot+1:]

	// Look up the service, then the method.
	svci, ok := server.serviceMap.Load(serviceName)
	if !ok {
		err = errors.New("rpc: can't find service " + req.ServiceMethod)
		return
	}
	svc = svci.(*service)
	mtype = svc.method[methodName]
	if mtype == nil {
		err = errors.New("rpc: can't find method " + req.ServiceMethod)
	}
	return
}

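The name-parsing step of readRequestHeader is easy to isolate. In this sketch splitServiceMethod is a hypothetical helper, not part of net/rpc; it applies the same strings.LastIndex rule, so everything before the last dot is treated as the service name.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitServiceMethod is a hypothetical helper that isolates the parsing step:
// everything before the last dot is the service, the rest is the method.
func splitServiceMethod(serviceMethod string) (service, method string, err error) {
	dot := strings.LastIndex(serviceMethod, ".")
	if dot < 0 {
		return "", "", errors.New("rpc: service/method request ill-formed: " + serviceMethod)
	}
	return serviceMethod[:dot], serviceMethod[dot+1:], nil
}

func main() {
	fmt.Println(splitServiceMethod("Arith.Multiply"))
	fmt.Println(splitServiceMethod("pkg.Arith.Multiply"))
	fmt.Println(splitServiceMethod("Multiply"))
}
```

Because the split happens at the last dot, a name registered with RegisterName may itself contain dots.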

Next comes the main loop: read the byte stream from the connection, decode the request, call the method, encode the response, and write it back to the client.

func (server *Server) ServeCodec(codec ServerCodec) {
	sending := new(sync.Mutex)
	for {
		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
		if err != nil {
			...
		}
		go service.call(server, sending, mtype, req, argv, replyv, codec)
	}
	codec.Close()
}
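Since ServeCodec takes any ServerCodec, the same loop can run over JSON instead of gob. The sketch below assumes the standard net/rpc/jsonrpc package and an in-memory net.Pipe connection; Echo and upperOverJSON are hypothetical names used for illustration.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
	"strings"
)

// Echo is a hypothetical service used only for this illustration.
type Echo struct{}

func (e *Echo) Upper(in *string, out *string) error {
	*out = strings.ToUpper(*in)
	return nil
}

// upperOverJSON performs one RPC through the JSON codec on both ends.
func upperOverJSON(s string) (string, error) {
	server := rpc.NewServer()
	if err := server.Register(new(Echo)); err != nil {
		return "", err
	}
	clientConn, serverConn := net.Pipe()
	// ServeCodec runs the read/decode/call/encode loop with the JSON codec.
	go server.ServeCodec(jsonrpc.NewServerCodec(serverConn))

	client := jsonrpc.NewClient(clientConn)
	defer client.Close()

	var reply string
	err := client.Call("Echo.Upper", &s, &reply)
	return reply, err
}

func main() {
	fmt.Println(upperOverJSON("hello"))
}
```

Only the codec changes; registration, lookup and the reflective call are exactly the same as with gob.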

Finally, the method is invoked with the decoded arguments:

func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
	mtype.Lock()
	mtype.numCalls++
	mtype.Unlock()
	function := mtype.method.Func
	// Call the function through reflection.
	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
	// If the returned error is not nil, convert it to a string.
	errInter := returnValues[0].Interface()
	errmsg := ""
	if errInter != nil {
		errmsg = errInter.(error).Error()
	}
	server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
	server.freeRequest(req)
}
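The reflective call can be reproduced in a few lines with the reflect package alone. This is a minimal sketch of the technique, not the server's code: Divider and invoke are hypothetical, and the method is looked up by name on each call instead of being cached in a map.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// Divider is a hypothetical receiver used only for this illustration.
type Divider struct{}

func (d *Divider) Div(args *[2]int, reply *int) error {
	if args[1] == 0 {
		return errors.New("divide by zero")
	}
	*reply = args[0] / args[1]
	return nil
}

// invoke looks a method up by name and calls it through reflection, turning
// the returned error into a string in the same way the call method does.
func invoke(rcvr interface{}, name string, args, reply interface{}) string {
	method, ok := reflect.TypeOf(rcvr).MethodByName(name)
	if !ok {
		return "can't find method " + name
	}
	returnValues := method.Func.Call([]reflect.Value{
		reflect.ValueOf(rcvr), reflect.ValueOf(args), reflect.ValueOf(reply),
	})
	if errInter := returnValues[0].Interface(); errInter != nil {
		return errInter.(error).Error()
	}
	return ""
}

func main() {
	var reply int
	errmsg := invoke(new(Divider), "Div", &[2]int{6, 3}, &reply)
	fmt.Printf("%q %d\n", errmsg, reply)

	errmsg = invoke(new(Divider), "Div", &[2]int{1, 0}, &reply)
	fmt.Printf("%q\n", errmsg)
}
```

The receiver is passed as the first element of the argument slice because method.Func is the method expressed as a plain function.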

3. The client side

func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
}

func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
}

// Call represents an active RPC.
type Call struct {
	ServiceMethod string      // name of the service and method to call
	Args          interface{} // argument of the function (*struct)
	Reply         interface{} // response parameter of the function (*struct)
	Error         error       // error status of the method after completion
	Done          chan *Call  // channel signalled when the call completes
}

The client side is much simpler. It mainly provides the Call and Go methods, for synchronous and asynchronous calls respectively. The synchronous Call is in fact implemented on top of the asynchronous Go, and both rely on the Call struct described above.
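A short end-to-end sketch shows both styles side by side. It assumes an in-memory net.Pipe connection instead of a real port; Arith, Args, newPipeClient and multiplyBothWays are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// Args and Arith are hypothetical types used only for this illustration.
type Args struct{ A, B int }

type Arith struct{}

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// newPipeClient wires a client to a private server over an in-memory
// connection, so the sketch does not need a real port.
func newPipeClient() (*rpc.Client, error) {
	server := rpc.NewServer()
	if err := server.Register(new(Arith)); err != nil {
		return nil, err
	}
	clientConn, serverConn := net.Pipe()
	go server.ServeConn(serverConn)
	return rpc.NewClient(clientConn), nil
}

// multiplyBothWays performs the same call synchronously and asynchronously.
func multiplyBothWays(a, b int) (syncReply, asyncReply int, err error) {
	client, err := newPipeClient()
	if err != nil {
		return 0, 0, err
	}
	defer client.Close()

	// Synchronous: Call blocks until the reply arrives.
	if err = client.Call("Arith.Multiply", &Args{a, b}, &syncReply); err != nil {
		return 0, 0, err
	}
	// Asynchronous: Go returns at once and the Done channel delivers the Call.
	call := client.Go("Arith.Multiply", &Args{a, b}, &asyncReply, nil)
	<-call.Done
	return syncReply, asyncReply, call.Error
}

func main() {
	syncReply, asyncReply, err := multiplyBothWays(6, 7)
	fmt.Println(syncReply, asyncReply, err)
}
```

Passing nil as the done argument lets Go allocate a buffered channel itself; a caller that supplies its own channel must make sure it is buffered.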

3.1 Main Process

3.2 Key code

The code below sends a request. Every request has a Call object, which is stored in the pending map with seq as the key. When the server replies, the client retrieves the Call from the map by seq and completes it.

func (client *Client) send(call *Call) {
	// Request-level lock.
	client.reqMutex.Lock()
	defer client.reqMutex.Unlock()

	// Register this call.
	client.mutex.Lock()
	if client.shutdown || client.closing {
		call.Error = ErrShutdown
		client.mutex.Unlock()
		call.done()
		return
	}
	seq := client.seq
	client.seq++
	client.pending[seq] = call
	client.mutex.Unlock()

	// Encode and send the request.
	client.request.Seq = seq
	client.request.ServiceMethod = call.ServiceMethod
	err := client.codec.WriteRequest(&client.request, call.Args)
	if err != nil {
		client.mutex.Lock()
		call = client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()
		if call != nil {
			call.Error = err
			call.done()
		}
	}
}
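The bookkeeping behind send can be modelled without any networking. The sketch below is a simplified stand-in rather than the client's code: pendingCall and tracker are hypothetical types that keep only the sequence counter, the mutex and the pending map.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingCall and tracker are hypothetical, simplified stand-ins for
// rpc.Call and the bookkeeping fields of rpc.Client.
type pendingCall struct {
	Method string
	Done   chan *pendingCall // must be buffered so complete never blocks
}

type tracker struct {
	mu      sync.Mutex
	seq     uint64
	pending map[uint64]*pendingCall
}

func newTracker() *tracker {
	return &tracker{pending: make(map[uint64]*pendingCall)}
}

// register stores the call under the next sequence number and returns it.
func (t *tracker) register(c *pendingCall) uint64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	seq := t.seq
	t.seq++
	t.pending[seq] = c
	return seq
}

// complete removes the call stored under seq, if any, and signals it.
func (t *tracker) complete(seq uint64) bool {
	t.mu.Lock()
	c := t.pending[seq]
	delete(t.pending, seq)
	t.mu.Unlock()
	if c == nil {
		return false
	}
	c.Done <- c
	return true
}

// outstanding reports how many calls are still waiting for a response.
func (t *tracker) outstanding() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.pending)
}

func main() {
	t := newTracker()
	first := t.register(&pendingCall{Method: "Arith.Multiply", Done: make(chan *pendingCall, 1)})
	t.register(&pendingCall{Method: "Arith.Divide", Done: make(chan *pendingCall, 1)})
	t.complete(first)
	fmt.Println("still pending:", t.outstanding())
}
```

An entry leaves the map only when a response with its sequence number arrives, so a call that is never answered stays registered.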

The code below receives responses. It is a for loop that reads the stream from the TCP connection and decodes it into a Response header and the reply object of the method.

func (client *Client) input() {
	var err error
	var response Response
	for err == nil {
		response = Response{}
		err = client.codec.ReadResponseHeader(&response)
		if err != nil {
			break
		}
		seq := response.Seq
		client.mutex.Lock()
		call := client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()

		switch {
		case call == nil:
			err = client.codec.ReadResponseBody(nil)
			if err != nil {
				err = errors.New("reading error body: " + err.Error())
			}
		case response.Error != "":
			// The server returned an error; pass it straight to the call.
			call.Error = ServerError(response.Error)
			err = client.codec.ReadResponseBody(nil)
			if err != nil {
				err = errors.New("reading error body: " + err.Error())
			}
			call.done()
		default:
			// Decode the body of the Response into the reply via the codec.
			err = client.codec.ReadResponseBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()
		}
	}
	// Client exit handling.
	client.reqMutex.Lock()
	client.mutex.Lock()
	client.shutdown = true
	closing := client.closing
	if err == io.EOF {
		if closing {
			err = ErrShutdown
		} else {
			err = io.ErrUnexpectedEOF
		}
	}
	for _, call := range client.pending {
		call.Error = err
		call.done()
	}
	client.mutex.Unlock()
	client.reqMutex.Unlock()
	if debugLog && err != io.EOF && !closing {
		log.Println("rpc: client protocol error:", err)
	}
}


4. Some disadvantages

  • Synchronous calls cannot time out. Native RPC provides only two methods, the synchronous Call and the asynchronous Go. If the server never replies to a synchronous Call, the caller blocks forever; if many calls go unanswered, the blocked goroutines can never be released.

  • Memory leaks after an asynchronous call times out. A timeout built on the asynchronous Go and its channel also leaks, because the client stores every request in a map and Go never cleans that map up. If the server never replies, the request stays in the map forever, which leaks memory.

  • There is no keepalive mechanism. Without keepalive, the upper layer cannot detect that the underlying connection has become unusable, so it keeps sending requests and never receives a response.

5. Summary

Overall, Go's native RPC is a basic RPC implementation. The code is concise and highly extensible, but it implements only the most basic network communication. Features such as timeouts and circuit breaking, connection management (keepalive and reconnection), and service registration and discovery are all missing, so it is not production-ready out of the box, and gRPC is considerably more mature by comparison. I recently put together a microservice communication framework based on gRPC, and most of its components are open source; see the GRPC-Wrapper project.

6. Reference

rpc