This article does not cover what gRPC is or what features it offers; plenty of articles already do, and they are easy to find. Instead, we go straight to reading the source code behind the official hello world example.

From server/main.go in the hello-world example, we can see the following code snippet to generate a server:

lis, err := net.Listen("tcp", port)
if err != nil {
	log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
if err := s.Serve(lis); err != nil {
	log.Fatalf("failed to serve: %v", err)
}

From this code logic, we can see that creating a server can be roughly divided into the following steps:

  • Create a new server (grpc.NewServer())
  • Register the service with the server (pb.RegisterGreeterServer)
  • Call Serve to listen on the port and accept connections

Create a server

Stepping into the NewServer method, we see the following code:

func NewServer(opt ... ServerOption) *Server {
	opts := defaultServerOptions // 1.
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{ // 2.
		lis:    make(map[net.Listener]bool),
		opts:   opts,
		conns:  make(map[transport.ServerTransport]bool),
		m:      make(map[string]*service),
		quit:   grpcsync.NewEvent(),
		done:   grpcsync.NewEvent(),
		czData: new(channelzData),
	}
	chainUnaryServerInterceptors(s) // 3.
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if channelz.IsOn() {
		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	}
	return s
}

Let’s analyze it step by step. First, the method takes a variadic list of optional server parameters. ServerOption itself is an interface with a single unexported apply method:

type ServerOption interface {
    apply(*serverOptions)
}

According to the official comments, this interface is used to set optional server parameters, such as the codec or keepalive parameters. The serverOptions structure holds these parameters.

Going back to comment 1, the program first loads the default server options, which are defined as follows:

var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}

These include the default maximum sizes of received and sent messages, the connection timeout, and the read and write buffer sizes.

The for loop that follows applies every option passed by the caller to this serverOptions structure, overriding the defaults.
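The defaults-plus-overrides flow described above is often called the functional options pattern. Below is a minimal, standard-library-only sketch of it. The trimmed-down serverOptions struct, the funcServerOption adapter, the buildOptions helper, the two option constructors and the 4 MiB default are stand-ins written for this illustration; they are not copied from the gRPC source.

```go
package main

import (
	"fmt"
	"time"
)

// serverOptions is a trimmed-down, hypothetical stand-in for gRPC's struct.
type serverOptions struct {
	maxReceiveMessageSize int
	connectionTimeout     time.Duration
}

// ServerOption has the same shape as the interface shown above.
type ServerOption interface {
	apply(*serverOptions)
}

// funcServerOption adapts a plain function to the ServerOption interface.
type funcServerOption struct {
	f func(*serverOptions)
}

func (fo *funcServerOption) apply(o *serverOptions) { fo.f(o) }

// MaxRecvMsgSize is an illustrative option constructor for this sketch.
func MaxRecvMsgSize(n int) ServerOption {
	return &funcServerOption{f: func(o *serverOptions) { o.maxReceiveMessageSize = n }}
}

// ConnectionTimeout is an illustrative option constructor for this sketch.
func ConnectionTimeout(d time.Duration) ServerOption {
	return &funcServerOption{f: func(o *serverOptions) { o.connectionTimeout = d }}
}

// defaultServerOptions holds assumed default values for the sketch.
var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: 4 * 1024 * 1024,
	connectionTimeout:     120 * time.Second,
}

// buildOptions copies the defaults and lets each option override them in order.
func buildOptions(opt ...ServerOption) serverOptions {
	opts := defaultServerOptions // struct copy, the package-level defaults stay untouched
	for _, o := range opt {
		o.apply(&opts)
	}
	return opts
}

func main() {
	opts := buildOptions(MaxRecvMsgSize(1024), ConnectionTimeout(30*time.Second))
	fmt.Println(opts.maxReceiveMessageSize, opts.connectionTimeout)
}
```

Because opts is a copy of the defaults, options passed to one server never leak into another server created later.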

Further down, at comment 2, the Server structure is initialized. It is defined like this:

type Server struct {
    opts serverOptions

    mu     sync.Mutex // the mutex
    lis    map[net.Listener]bool // listener map
    conns  map[transport.ServerTransport]bool // connections map
    serve  bool // set to true once Serve has been called
    drain  bool
    cv     *sync.Cond          // signaled when connections close for GracefulStop
    m      map[string]*service // service name -> service info
    events trace.EventLog

    quit               *grpcsync.Event
    done               *grpcsync.Event
    channelzRemoveOnce sync.Once
    serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

    channelzID int64 // channelz unique identification number
    czData     *channelzData
}

There are three important maps, which store the listeners, the connections, and the services respectively. The other fields hold server state or provide concurrency control.

A net.Listener is an interface with three methods, Accept(), Close(), and Addr(), which respectively accept the next incoming connection, close the listener, and return the listener's network address.

For the map that stores services, the service structure is as follows:

type service struct {
    server interface{} // the server for service methods
    md     map[string]*MethodDesc
    sd     map[string]*StreamDesc
    mdata  interface{}
}

The server field holds the user's implementation of the service methods. The two maps that follow hold the descriptors of the unary methods and of the streaming methods, respectively:

type MethodDesc struct {
    MethodName string
    Handler    methodHandler
}

type StreamDesc struct {
    StreamName string
    Handler    StreamHandler

    // At least one of these is true.
    ServerStreams bool
    ClientStreams bool
}

Each of these structs has a handler inside to handle the called method.

Going back to comment 3 in the main flow, two interceptor chains are built here. The first combines the unary interceptors configured on the server into a single interceptor (see the comments for details):

func chainUnaryServerInterceptors(s *Server) {
	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	// If a single unary interceptor (opts.unaryInt) was configured, it is placed at the
	// head of the chain, ahead of the interceptors in opts.chainUnaryInts.
	interceptors := s.opts.chainUnaryInts
	if s.opts.unaryInt != nil {
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
	}

	var chainedInt UnaryServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
			// With more than one interceptor, the chain is built recursively.
			return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
		}
	}

	s.opts.unaryInt = chainedInt
}

To understand how the chain is built, we need to look at the getChainUnaryHandler method:

func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}

	return func(ctx context.Context, req interface{}) (interface{}, error) {
		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
	}
}

In this logic, we first check whether curr already points at the last interceptor in the chain. If so, we return the final handler; otherwise we return a handler that invokes the next interceptor, building the rest of the chain recursively.
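To see the order in which a chain built this way runs, here is a small self-contained sketch of the same recursion. The Handler and Interceptor types are simplified stand-ins for UnaryHandler and UnaryServerInterceptor (no context, no info argument), and chain, getChainHandler and tag are names made up for this illustration.

```go
package main

import "fmt"

// Handler and Interceptor are simplified stand-ins for gRPC's
// UnaryHandler and UnaryServerInterceptor types.
type Handler func(req string) string
type Interceptor func(req string, next Handler) string

// getChainHandler returns the handler that interceptor number curr should
// call next: either the final handler or a wrapper around interceptor curr+1.
func getChainHandler(interceptors []Interceptor, curr int, final Handler) Handler {
	if curr == len(interceptors)-1 {
		return final
	}
	return func(req string) string {
		return interceptors[curr+1](req, getChainHandler(interceptors, curr+1, final))
	}
}

// chain collapses a slice of interceptors into a single interceptor.
func chain(interceptors []Interceptor) Interceptor {
	switch len(interceptors) {
	case 0:
		return nil
	case 1:
		return interceptors[0]
	}
	return func(req string, next Handler) string {
		return interceptors[0](req, getChainHandler(interceptors, 0, next))
	}
}

// tag builds an interceptor that wraps the downstream result in name(...).
func tag(name string) Interceptor {
	return func(req string, next Handler) string {
		return name + "(" + next(req) + ")"
	}
}

func main() {
	chained := chain([]Interceptor{tag("a"), tag("b"), tag("c")})
	out := chained("ping", func(req string) string { return "handled:" + req })
	fmt.Println(out) // prints a(b(c(handled:ping)))
}
```

The first interceptor in the slice is the outermost one: it runs first on the way in and last on the way out, which is why a single configured interceptor is placed at the head of the slice.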

The chainStreamServerInterceptors method works the same way as chainUnaryServerInterceptors, so we will not go through it here.

Register the service

Back on the main line: once the server has been set up, the service is registered. Following the pb.RegisterGreeterServer(s, &server{}) call leads to the RegisterService method:

func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	ht := reflect.TypeOf(sd.HandlerType).Elem()
	st := reflect.TypeOf(ss)
	if !st.Implements(ht) {
		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
	}
	s.register(sd, ss)
}
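The reflection check in RegisterService can be tried in isolation. The sketch below assumes that HandlerType is a nil pointer to the service interface, written as (*GreeterServer)(nil); the GreeterServer interface here, the two implementation types and the implementsHandler helper are hypothetical names for this illustration.

```go
package main

import (
	"fmt"
	"reflect"
)

// GreeterServer is a hypothetical service interface for this sketch.
type GreeterServer interface {
	SayHello(name string) string
}

// server implements GreeterServer.
type server struct{}

func (*server) SayHello(name string) string { return "Hello " + name }

// notAGreeter deliberately does not implement GreeterServer.
type notAGreeter struct{}

// implementsHandler reports whether impl satisfies the interface that
// handlerType points to. handlerType must be a pointer to an interface,
// for example (*GreeterServer)(nil).
func implementsHandler(handlerType, impl interface{}) bool {
	ht := reflect.TypeOf(handlerType).Elem() // the interface type itself
	st := reflect.TypeOf(impl)               // the concrete type passed in
	return st.Implements(ht)
}

func main() {
	fmt.Println(implementsHandler((*GreeterServer)(nil), &server{}))      // prints true
	fmt.Println(implementsHandler((*GreeterServer)(nil), &notAGreeter{})) // prints false
}
```

The nil-pointer trick is needed because reflect.TypeOf on a nil interface value returns nil; taking the type of a pointer to the interface and calling Elem() recovers the interface type.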

The method first uses reflection to obtain the interface type that the service descriptor expects a handler to implement (sd.HandlerType), then obtains the type of the service implementation we passed in, and checks that our type implements that interface. If it does, the register method is called:

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.m[sd.ServiceName]; ok {
		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	srv := &service{
		server: ss,
		md:     make(map[string]*MethodDesc),
		sd:     make(map[string]*StreamDesc),
		mdata:  sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		srv.md[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		srv.sd[d.StreamName] = d
	}
	s.m[sd.ServiceName] = srv
}

If the server has not started serving yet and the service name has not been registered before, the service's methods and streams are stored by name in a service struct, which is then added to the server's service map under the service name. We can already predict that the server will use the service name and method name of each incoming RPC to look up the matching handler in this map.
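The two guards in register (no registration after Serve, no duplicate service names) can be sketched with a plain map and a mutex. This is only an illustration: the registry type, markServing and the two error values are hypothetical, and the sketch returns errors where the real code calls grpclog.Fatalf.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical error values; the code shown above aborts with Fatalf instead.
var (
	errServing   = errors.New("registration after serving has started")
	errDuplicate = errors.New("duplicate service registration")
)

// registry is a simplified stand-in for the Server's mu, serve and m fields.
type registry struct {
	mu       sync.Mutex
	serving  bool
	services map[string][]string // service name -> method names
}

// register stores a service under its name unless a guard rejects it.
func (r *registry) register(name string, methods []string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.serving {
		return errServing
	}
	if _, ok := r.services[name]; ok {
		return errDuplicate
	}
	r.services[name] = methods
	return nil
}

// markServing plays the role of "s.serve = true" at the start of Serve.
func (r *registry) markServing() {
	r.mu.Lock()
	r.serving = true
	r.mu.Unlock()
}

func main() {
	r := &registry{services: make(map[string][]string)}
	fmt.Println(r.register("helloworld.Greeter", []string{"SayHello"}))
	fmt.Println(r.register("helloworld.Greeter", nil))
	r.markServing()
	fmt.Println(r.register("other.Service", nil))
}
```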

Start serving

In a typical client/server architecture, the server keeps listening on a port for connection requests. When a client connects, the connection is established through a handshake; the client then invokes a server-side service by sending the method name and its parameters, and the request is handled by the corresponding handler on the server. So on the server side, the key is to understand how it listens, how it dispatches requests to handlers, and how it writes back the response. Let's look at the Serve method:

func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept() // 4
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1) // 5.
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}
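The retry delay used in the Accept loop for temporary errors is a small exponential backoff: 5 ms at first, doubled on each consecutive failure, capped at 1 s, and reset to zero after a successful Accept. A minimal sketch of just that calculation follows; nextDelay is a helper name made up for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// nextDelay returns how long to sleep after a temporary Accept error,
// given the previous delay (0 means this is the first failure).
func nextDelay(prev time.Duration) time.Duration {
	if prev == 0 {
		return 5 * time.Millisecond
	}
	next := prev * 2
	if limit := 1 * time.Second; next > limit {
		next = limit
	}
	return next
}

func main() {
	var d time.Duration
	// Simulate ten consecutive temporary failures.
	for i := 0; i < 10; i++ {
		d = nextDelay(d)
		fmt.Println(d)
	}
}
```

In Serve the sleep is done with a timer inside a select, so a Stop or GracefulStop (signaled through s.quit) can interrupt the wait instead of blocking for the full delay.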

Starting at comment 4, the program enters a loop that accepts connections from the listener. At comment 5, it starts a goroutine that calls the handleRawConn method. Tracing further:

func (s *Server) handleRawConn(rawConn net.Conn) {
    // ... 
    conn, authInfo, err := s.useTransportAuthenticator(rawConn)
    // ...
    // Finish handshaking (HTTP2)
    st := s.newHTTP2Transport(conn, authInfo)
    if st == nil {
        return
    }
    // ...
    go func() {
        s.serveStreams(st)
        s.removeConn(st)
    }()
}

Some less important parts are omitted, but we can see that this method completes the HTTP/2 handshake to establish the connection, and then starts a goroutine that calls the serveStreams method:

func (s *Server) serveStreams(st transport.ServerTransport) {
    defer st.Close()
    var wg sync.WaitGroup
    st.HandleStreams(func(stream *transport.Stream) {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.handleStream(st, stream, s.traceInfo(st, stream))
        }()
    }, func(ctx context.Context, method string) context.Context {
        if !EnableTracing {
            return ctx
        }
        tr := trace.New("grpc.Recv."+methodFamily(method), method)
        return trace.NewContext(ctx, tr)
    })
    wg.Wait()
}
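The concurrency shape of serveStreams, one goroutine per stream with a WaitGroup that holds the function open until every stream has been handled, can be sketched with the standard library alone. The serveAll function, its string-based "streams" and the result map are simplifications invented for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// serveAll runs handle for every stream in its own goroutine and returns
// only after all of them have finished, collecting the results by stream.
func serveAll(streams []string, handle func(string) string) map[string]string {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex // guards results, which is written from many goroutines
		results = make(map[string]string)
	)
	for _, s := range streams {
		wg.Add(1) // count the goroutine before it starts
		go func(s string) {
			defer wg.Done()
			out := handle(s)
			mu.Lock()
			results[s] = out
			mu.Unlock()
		}(s)
	}
	wg.Wait() // like serveStreams, do not return while handlers are running
	return results
}

func main() {
	got := serveAll([]string{"s1", "s2", "s3"}, func(s string) string {
		return "done:" + s
	})
	fmt.Println(len(got), got["s2"])
}
```

Calling wg.Add before the go statement matters: if Add ran inside the goroutine, Wait could observe a zero counter and return before the handler had even started.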

For each incoming stream, the handleStream method is then called:

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
    sm := stream.Method()
    // ...
    service := sm[:pos]
    method := sm[pos+1:]
    srv, knownService := s.m[service]
    if knownService {
        if md, ok := srv.md[method]; ok {
            s.processUnaryRPC(t, stream, srv, md, trInfo)
            return
        }
        if sd, ok := srv.sd[method]; ok {
            s.processStreamingRPC(t, stream, srv, sd, trInfo)
            return
        }
    }
    // ...
}

Sure enough, the program uses the service name to look up the service in the server's service map (m), and then the method name to find its handler. The hello world demo does not use streaming, so the method descriptor is found in md and passed to the processUnaryRPC method for processing.
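The lookup above can be sketched end to end with plain maps. The snippet elides how pos is computed, so this sketch assumes the full method name has the form /service/method and splits it at the last slash. The splitMethod and dispatch helpers, the string-based handler type and the error messages are all hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// handler is a simplified stand-in for a unary method handler.
type handler func(req string) string

// service is a cut-down version of the struct shown earlier.
type service struct {
	md map[string]handler // method name -> handler
}

// splitMethod splits "/pkg.Service/Method" into its service and method parts.
// The "/service/method" layout is an assumption of this sketch.
func splitMethod(full string) (svc, method string, ok bool) {
	full = strings.TrimPrefix(full, "/")
	pos := strings.LastIndex(full, "/")
	if pos == -1 {
		return "", "", false
	}
	return full[:pos], full[pos+1:], true
}

// dispatch finds the handler for a full method name and invokes it.
func dispatch(registry map[string]*service, full, req string) (string, error) {
	svcName, method, ok := splitMethod(full)
	if !ok {
		return "", fmt.Errorf("malformed method name %q", full)
	}
	srv, known := registry[svcName]
	if !known {
		return "", fmt.Errorf("unknown service %q", svcName)
	}
	h, known := srv.md[method]
	if !known {
		return "", fmt.Errorf("unknown method %q for service %q", method, svcName)
	}
	return h(req), nil
}

func main() {
	registry := map[string]*service{
		"helloworld.Greeter": {md: map[string]handler{
			"SayHello": func(req string) string { return "Hello " + req },
		}},
	}
	out, err := dispatch(registry, "/helloworld.Greeter/SayHello", "world")
	fmt.Println(out, err)
}
```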

So let’s follow up with the processUnaryRPC method:

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
    // ...
    sh := s.opts.statsHandler
    if sh != nil {
        beginTime := time.Now()
        begin := &stats.Begin{
            BeginTime: beginTime,
        }
        sh.HandleRPC(stream.Context(), begin)
        defer func() {
            end := &stats.End{
                BeginTime: beginTime,
                EndTime:   time.Now(),
            }
            if err != nil && err != io.EOF {
                end.Error = toRPCErr(err)
            }
            sh.HandleRPC(stream.Context(), end)
        }()
    }
    // ...
    if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
        if err == io.EOF {
            // The entire stream is done (for unary RPC only).
            return err
        }
        if s, ok := status.FromError(err); ok {
            if e := t.WriteStatus(stream, s); e != nil {
                grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
            }
        } else {
            switch st := err.(type) {
            case transport.ConnectionError:
                // Nothing to do here.
            default:
                panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
            }
        }
        if binlog != nil {
            h, _ := stream.Header()
            binlog.Log(&binarylog.ServerHeader{
                Header: h,
            })
            binlog.Log(&binarylog.ServerTrailer{
                Trailer: stream.Trailer(),
                Err:     appErr,
            })
        }
        return err
    }
    // ...
}

This method contains the call to the handler (in the omitted part) and, as shown, writes the response back with sendResponse.

That concludes this reading of the gRPC server source code. The streaming path is left for a later article.