The greeter_client example is based on v1.37.0.

As with the gRPC-Go server source analysis, let’s first look at the sample code,

const (
	address     = "localhost:50051"
	defaultName = "world"
)

func main() {
	// Set up a connection to the server.
	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// Contact the server and print out its response.
	name := defaultName
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetMessage())
}

The sample calls grpc.Dial to create a *grpc.ClientConn object; the actual initialization happens in the grpc.DialContext method.

DialContext first initializes an empty ClientConn and then walks the opts ...DialOption arguments; each option is a function that is executed to set specific fields on the connection.

// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}
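
Each DialOption is built with the classic functional-options pattern. Below is a minimal, runnable sketch of the idea; the field names are simplified for illustration and the real dialOptions struct has many more fields,

package main

import "fmt"

// dialOptions mimics, in miniature, the struct that DialContext fills in.
type dialOptions struct {
	block    bool
	insecure bool
}

// DialOption mutates a dialOptions value when applied.
type DialOption interface {
	apply(*dialOptions)
}

// funcDialOption adapts a plain function to the DialOption interface,
// mirroring the newFuncDialOption helper in the gRPC source.
type funcDialOption struct {
	f func(*dialOptions)
}

func (fdo funcDialOption) apply(do *dialOptions) { fdo.f(do) }

// WithBlock and WithInsecure each return an option that sets one field.
func WithBlock() DialOption {
	return funcDialOption{f: func(o *dialOptions) { o.block = true }}
}

func WithInsecure() DialOption {
	return funcDialOption{f: func(o *dialOptions) { o.insecure = true }}
}

func main() {
	opts := dialOptions{}
	// This loop is the "executes the function passed in" step of DialContext.
	for _, opt := range []DialOption{WithBlock(), WithInsecure()} {
		opt.apply(&opts)
	}
	fmt.Printf("%+v\n", opts) // {block:true insecure:true}
}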

The key point of the code is the creation of the ClientConn object; the fields of the structure are shown below.

// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs. 
// 
// ClientConn is a virtual connection for RPC communication
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// ClientConn is free to choose how many endpoints it connects to and which load-balancing logic to use
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
// ClientConn encapsulates a range of functionality, including name resolution, TCP connection establishment (with retry and backoff policies), TLS handshakes, and handling errors on established connections by re-resolving the name and reconnecting.

type ClientConn struct {
	ctx    context.Context
	cancel context.CancelFunc

	target       string
	parsedTarget resolver.Target // the parsed target; used when selecting the resolver and load balancing
	authority    string
	dopts        dialOptions // options settable at dial time and merged with per-call options; see the combine method in call.go
	csMgr        *connectivityStateManager // Maintain connection status

	balancerBuildOpts balancer.BuildOptions // ignored here
	blockingpicker    *pickerWrapper // load-balancing picker settings

	safeConfigSelector iresolver.SafeConfigSelector // ignored here

	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper // implements resolver.ClientConn (see resolver/resolver.go); the resolver-facing wrapper around ClientConn
	sc              *ServiceConfig
	conns           map[*addrConn]struct{} // Where the connection is stored
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper // Wrappers on load balancers
	retryThrottler  atomic.Value

	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	lceMu               sync.Mutex // protects lastConnectionError
	lastConnectionError error
}

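
The csMgr field maintains the connectivity state that callers can observe through the exported API. A small sketch, assuming conn is the *grpc.ClientConn from the sample above; GetState and WaitForStateChange are exported but marked experimental in this version,

import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

// waitReady blocks until the connection reaches Ready or ctx expires.
func waitReady(ctx context.Context, conn *grpc.ClientConn) error {
	for {
		state := conn.GetState() // Idle, Connecting, Ready, TransientFailure or Shutdown
		if state == connectivity.Ready {
			return nil
		}
		// WaitForStateChange returns false once ctx is done.
		if !conn.WaitForStateChange(ctx, state) {
			return ctx.Err()
		}
	}
}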

pb.NewGreeterClient(conn) returns the generated client object.

func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
	return &greeterClient{cc}
}

// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.

// ClientConnInterface defines functions that need to be implemented by objects that execute RPC methods, including unary and streaming
// ClientConn implements this interface and is only intended to be referenced by generated code.

type ClientConnInterface interface {
	// Invoke performs a unary RPC and returns after the response is received
	// into reply.
	// Invoke executes a request of type Unary and returns data
	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
	// NewStream begins a streaming RPC.
	// NewStream enables streaming RPC.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}

Although ClientConn implements ClientConnInterface, the implementation is not all in one place: ClientConn and ClientConnInterface are both defined in clientconn.go, the Invoke method is implemented in call.go, and the NewStream method is implemented in stream.go.
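
Because the methods live in different files, a compile-time assertion is a handy way to confirm that *ClientConn really satisfies the interface. This line is a standard Go idiom, not from the gRPC source; it assumes the grpc package is imported,

var _ grpc.ClientConnInterface = (*grpc.ClientConn)(nil) // fails to compile if the interface is not satisfied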

c.SayHello(ctx, &pb.HelloRequest{Name: name}) uses the GreeterClient to call SayHello.

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
	out := new(HelloReply)
	// c is the greeterClient initialized above, cc is the previously initialized ClientConn. Invoke means the unary path; next, jump to the call.go file.
	err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}
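
Since the generated wrapper is this thin, the same unary RPC can be made by calling Invoke on the connection directly. A hedged sketch reusing conn and the pb types from the sample above,

// callSayHelloRaw performs the SayHello RPC without the generated client.
func callSayHelloRaw(ctx context.Context, conn *grpc.ClientConn, name string) (*pb.HelloReply, error) {
	out := new(pb.HelloReply)
	// The method path must match the server's registered service exactly.
	if err := conn.Invoke(ctx, "/helloworld.Greeter/SayHello", &pb.HelloRequest{Name: name}, out); err != nil {
		return nil, err
	}
	return out, nil
}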

Next, look at the Invoke method implementation,

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
        // Omit some code
	return invoke(ctx, method, args, reply, cc, opts...)
}

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
        // Create the ClientStream. The unary path also goes through newClientStream, passing unaryStreamDesc as the second parameter.
	cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	if err != nil {
		return err
	}
	// Send the request. cs is the grpc.ClientStream object; this calls clientStream's SendMsg method
	if err := cs.SendMsg(req); err != nil {
		return err
	}
	// Receive the response. cs is the grpc.ClientStream object; this calls clientStream's RecvMsg method
	return cs.RecvMsg(reply)
}

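
For reference, unaryStreamDesc marks both directions as non-streaming, which is what makes this "stream" carry exactly one request and one response. Sketched from memory of the v1.37.0 call.go, so treat the exact form as approximate,

var unaryStreamDesc = &StreamDesc{ServerStreams: false, ClientStreams: false}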

For a call to newClientStream,

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
		return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
	}
	return newStream(ctx, func() {})
}

// Initialize the stream. The parameters are:
// desc *StreamDesc, whether the call is unary or streaming
// cc *ClientConn, the gRPC connection object
// opts ...CallOption, the options passed in at call time
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
	// Omit some code
	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		beginTime:    beginTime,
		firstAttempt: true,
		onCommit:     onCommit,
	}
	op := func(a *csAttempt) error { return a.newStream() }
	// Initialize the stream, retrying in a for loop via withRetry
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		cs.finish(err)
		return nil, err
	}
	return cs, nil
}

Look at the withRetry implementation,

func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			return op(cs.attempt)
		}
		a := cs.attempt // a is a pointer to the current attempt
		cs.mu.Unlock()
		err := op(a) // pass the attempt pointer to op; op is 'op := func(a *csAttempt) error { return a.newStream() }'
		cs.mu.Lock()
		if a != cs.attempt { // We started another attempt already.
			continue
		}
		if err == io.EOF {
			<-a.s.Done()
		}
		if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		if err := cs.retryLocked(err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}

func (a *csAttempt) newStream() error {
	cs := a.cs
	cs.callHdr.PreviousAttempts = cs.numRetries
	s, err := a.t.NewStream(cs.ctx, cs.callHdr)
	if err != nil {
		if _, ok := err.(transport.PerformedIOError); ok {
			// Return without converting to an RPC error so retry code can
			// inspect.
			return err
		}
		return toRPCErr(err)
	}
	cs.attempt.s = s // The stream is finally set here, after the long detour
	cs.attempt.p = &parser{r: s}
	return nil
}


Now that we know the stream initialization logic in the Invoke method from the code analysis above, let’s look at the SendMsg and RecvMsg methods.

First, both the SendMsg and RecvMsg methods belong to the ClientStream interface, and there are other methods in this interface that are mainly used for HTTP2 communication.

// ClientStream defines the client-side behavior of a streaming RPC.
// ClientStream defines the client behavior for RPC communication.
// 
// All errors returned from ClientStream methods are compatible with the
// status package.
// All errors returned by the ClientStream method apply to those defined in the status package.

type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.
	// Header returns the header metadata from the server, blocking until the metadata is ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	// Trailer returns Trailer metadata, which is only called when stream.CloseAndRecv returns or when stream.Recv returns an error.
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the stream
	// when non-nil error is met. It is also not safe to call CloseSend
	// concurrently with SendMsg.
	// CloseSend closes the send direction of the stream; it also closes the stream when a non-nil error is met. Calling CloseSend concurrently with SendMsg is not safe.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	// Context returns the stream's Context. It should not be called until Header or RecvMsg has returned.
	// Once called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	// SendMsg is usually called by automatically generated code. SendMsg aborts the stream if an error is encountered.
	// If the error was generated by the client, the status is returned directly; otherwise io.EOF is returned and the stream status can be discovered with RecvMsg.
	//
	// SendMsg blocks until:
	// - There is sufficient flow control to schedule m with the transport, or
	// - The stream is done, or
	// - The stream breaks.
	// SendMsg blocks until one of three things happens: there is sufficient flow control to send, the stream completes, or the stream breaks
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	// SendMsg does not wait until the server has received the message; closing the stream too early can result in lost messages.
	// To ensure delivery, the user should use RecvMsg to confirm the RPC completed successfully.
	
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	// It is safe for one goroutine to call SendMsg while another calls RecvMsg on the same stream, but calling SendMsg on the same stream from different goroutines is not concurrency safe.
	// Calling CloseSend concurrently with SendMsg is also not safe.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	// RecvMsg blocks until a message is received or the stream is done. If the stream completes successfully, io.EOF is returned.
	// On any other error, the stream is aborted and the error carries the RPC status.
	// 
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	// It is safe to call SendMsg and RecvMsg from different goroutines, but calling RecvMsg on the same stream from different goroutines is not concurrency safe.
	RecvMsg(m interface{}) error
}

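
To make the concurrency contract above concrete, here is a hedged sketch of the one-sender, one-receiver split the interface permits. The stream is assumed to be any grpc.ClientStream obtained from generated streaming code, and newReply is a hypothetical factory for reply messages,

import (
	"io"

	"google.golang.org/grpc"
)

// pump sends all requests from a second goroutine while the calling
// goroutine receives replies. This split is explicitly allowed; what is
// forbidden is two goroutines sharing SendMsg (or RecvMsg) on one stream.
func pump(stream grpc.ClientStream, reqs []interface{}, newReply func() interface{}) error {
	go func() {
		for _, m := range reqs {
			if err := stream.SendMsg(m); err != nil {
				return // the stream is aborted; RecvMsg below reports the status
			}
		}
		stream.CloseSend() // safe: no SendMsg can run concurrently at this point
	}()
	for {
		if err := stream.RecvMsg(newReply()); err != nil {
			if err == io.EOF {
				return nil // clean completion
			}
			return err
		}
	}
}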

Analyzing the SendMsg method called by the Invoke method,

func (cs *clientStream) SendMsg(m interface{}) (err error) {
	// Omit some code
	
	// process the message
	hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
	if err != nil {
		return err
	}

	msgBytes := data // Store the pointer before setting to nil. For binary logging.
	op := func(a *csAttempt) error {
	    // Send a message
		err := a.sendMsg(m, hdr, payload, data)
		// nil out the message and uncomp when replaying; they are only needed for
		// stats which is disabled for subsequent attempts.
		m, data = nil, nil
		return err
	}
	// Retry to ensure that the message is sent
	err = cs.withRetry(op, func(a) { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
    // Omit some code
}

csAttempt.sendMsg calls Write, a method of the ClientTransport interface defined in the internal/transport/transport.go file,

func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
	cs := a.cs
	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
        // ...
	}
	// ...
	return nil
}
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
// The Write method packs the data into HTTP2 data frames and sends them out.
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	if opts.Last {
		// If it's the last message, update stream state.
		if !s.compareAndSwapState(streamActive, streamWriteDone) {
			return errStreamDone
		}
	} else if s.getState() != streamActive {
		return errStreamDone
	}
	df := &dataFrame{
		streamID:  s.id,
		endStream: opts.Last,
		h:         hdr,
		d:         data,
	}
	if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
		if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
			return err
		}
	}
	return t.controlBuf.put(df)
}

// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
// controlBuffer is a way of passing information to loopy, in the form of special structs called control frames. A control frame not only represents data, messages, and headers to be sent out, it can also instruct loopy to update its internal state.
// Note: do not confuse control frames with HTTP2 frames, although some of them, like dataFrame and headerFrame, do go out on the wire as HTTP2 frames.
type controlBuffer struct {
	ch              chan struct{}
	done            <-chan struct{}
	mu              sync.Mutex
	consumerWaiting bool
	list            *itemList
	err             error

	// transportResponseFrames counts the number of queued items that represent
	// the response of an action initiated by the peer. trfChan is created
	// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
	// closed and nilled when transportResponseFrames drops below the
	// threshold. Both fields are protected by mu.
	transportResponseFrames int
	trfChan                 atomic.Value // *chan struct{}
}
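
The mechanics are easier to see in miniature. Below is a minimal sketch of the controlBuffer idea, not the real implementation: producers append control frames under a lock, and a signal on ch wakes the consumer (loopy) if it was parked waiting for work,

import "sync"

// miniControlBuf sketches controlBuffer's producer/consumer handshake.
type miniControlBuf struct {
	mu              sync.Mutex
	items           []interface{}
	consumerWaiting bool
	ch              chan struct{} // buffered, capacity 1
}

func newMiniControlBuf() *miniControlBuf {
	return &miniControlBuf{ch: make(chan struct{}, 1)}
}

func (b *miniControlBuf) put(it interface{}) {
	b.mu.Lock()
	b.items = append(b.items, it)
	wakeUp := b.consumerWaiting
	b.consumerWaiting = false
	b.mu.Unlock()
	if wakeUp {
		b.ch <- struct{}{} // at most one signal is outstanding, so this never blocks
	}
}

func (b *miniControlBuf) get() interface{} {
	for {
		b.mu.Lock()
		if len(b.items) > 0 {
			it := b.items[0]
			b.items = b.items[1:]
			b.mu.Unlock()
			return it
		}
		b.consumerWaiting = true
		b.mu.Unlock()
		<-b.ch // park until a producer signals
	}
}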

Going back to err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) }) from SendMsg, let's trace the retry path,

func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	for {
	    // retryLocked is the key step here. What does it do?
		if err := cs.retryLocked(err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}

func (cs *clientStream) retryLocked(lastErr error) error {
	for {
	    // see newAttemptLocked
		if err := cs.newAttemptLocked(nil, nil); err != nil {
			return err
		}
	}
}

func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
	newAttempt := &csAttempt{
		cs:           cs,
		dc:           cs.cc.dopts.dc,
		statsHandler: sh,
		trInfo:       trInfo,
	}
	// Each attempt calls getTransport to get the connection to use; cc is the ClientConn object, and load balancing happens here.
	t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
	if err != nil {
		return err
	}
	if trInfo != nil {
		trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
	}
	newAttempt.t = t
	newAttempt.done = done
	cs.attempt = newAttempt
	return nil
}

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
    // Focus on the pick method
	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
		Ctx:            ctx,
		FullMethodName: method,
	})
}

func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
    // Select a transport that meets the condition
		if t, ok := acw.getAddrConn().getReadyTransport(); ok {
			if channelz.IsOn() {
				return t, doneChannelzWrapper(acw, pickResult.Done), nil
			}
			return t, pickResult.Done, nil
		}
}

func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
    // No ready transport yet: create a connection
	ac.connect()
	return nil, false
}

func (ac *addrConn) connect() error {
	// Asynchronous connection
	go ac.resetTransport()
	return nil
}

func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
	    // Create a connection, return if one is created successfully
		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
	}
}

func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
    // The transport is created by NewClientTransport
	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
}

func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
	return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
}

// A lot of detail is omitted here; the key point is how incoming messages are received
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
    // Start the reader goroutine for incoming message. Each transport has
	// a dedicated goroutine which reads HTTP2 frame from network. Then it
	// dispatches the frame to the corresponding stream entity.
	go t.reader()
}

// Handle http2 data and server counterpart.
func (t *http2Client) reader() {
	defer close(t.readerDone)
	// Check the validity of server preface.
	frame, err := t.framer.fr.ReadFrame()
	if err != nil {
		t.Close() // this kicks off resetTransport, so must be last before return
		return
	}
	t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
	if t.keepaliveEnabled {
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	}
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		t.Close() // this kicks off resetTransport, so must be last before return
		return
	}
	t.onPrefaceReceipt()
	t.handleSettings(sf, true)

	// loop to keep reading incoming messages on this transport.
	for {
		t.controlBuf.throttle()
		frame, err := t.framer.fr.ReadFrame()
		if t.keepaliveEnabled {
			atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		}
		if err != nil {
			// Abort an active stream if the http2.Framer returns a
			// http2.StreamError. This can happen only if the server's response
			// is malformed http2.
			if se, ok := err.(http2.StreamError); ok {
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					// use error detail to provide better err message
					code := http2ErrConvTab[se.Code]
					errorDetail := t.framer.fr.ErrorDetail()
					var msg string
					if errorDetail != nil {
						msg = errorDetail.Error()
					} else {
						msg = "received invalid frame"
					}
					t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
				}
				continue
			} else {
				// Transport error.
				t.Close()
				return
			}
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			t.operateHeaders(frame)
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame, false)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.GoAwayFrame:
			t.handleGoAway(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		default:
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
			}
		}
	}
}

The client HTTP2 flow chart is as follows,

Let’s look at the RecvMsg call logic.

// The withRetry method is used for retries
func (cs *clientStream) RecvMsg(m interface{}) error {
    // Omit some code and focus on the a.recvMsg(m, recvInfo) call
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
	return err
}

RecvMsg is implemented below,

func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
    // Omit some code
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
}

func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
    // receive and decompress
	d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
}

func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
    // receive
	pf, d, err := p.recvMsg(maxReceiveMessageSize)
}


// Read a full gRPC message from the stream, returning the payload format and the message bytes. The caller owns the returned message memory.
// On error, the possible errors are:
// io.EOF, when there are no more messages
// io.ErrUnexpectedEOF
// of type transport.ConnectionError
// Or an error defined in the status package.
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
    // Read the 5-byte request header
	if _, err := p.r.Read(p.header[:]); err != nil {
		return 0, nil, err
	}
	// Decode the header: 1 byte compression flag, 4 bytes big-endian body length
	pf = payloadFormat(p.header[0])
	length := binary.BigEndian.Uint32(p.header[1:])
	// Read the message body
	msg = make([]byte, int(length))
	if _, err := p.r.Read(msg); err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return 0, nil, err
	}
	return pf, msg, nil
}
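
To make the framing concrete: every gRPC message on the wire is prefixed with 5 bytes, a 1-byte compression flag plus a 4-byte big-endian payload length, which is exactly what recvMsg decodes above. A small self-contained demonstration,

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	payload := []byte("abc")

	// Build the 5-byte prefix the way the send path does.
	hdr := make([]byte, 5)
	hdr[0] = 0 // compression flag: 0 = uncompressed, 1 = compressed
	binary.BigEndian.PutUint32(hdr[1:], uint32(len(payload)))

	// recvMsg does the inverse: read 5 bytes, decode, then read length more bytes.
	compressed := hdr[0] == 1
	length := binary.BigEndian.Uint32(hdr[1:])
	fmt.Println(compressed, length) // false 3
}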

Clearly, the gRPC-Go client code is more complex than the server logic. This article covers only the simplest call path; topics such as load balancing, authentication, and error status handling have been omitted and will be covered in subsequent articles.