This article continues the analysis by looking at how the Watch process is shut down on the client side. First, let's review the key roles in Watch and what each one is responsible for:

Role             Description
watcher          Implementation of the external Watcher interface; the focus is the Watch() method
watchGrpcStream  The bridge: manages the internal gRPC connection and the internal virtual streams, and receives and dispatches messages
watchClient      The client side of the underlying gRPC connection
watcherStream    A virtual stream that handles communication between the server and the client

To analyze the shutdown process, let’s first look at what conditions might cause a shutdown:

1. xx.ctx.Done()

Every role runs an inner loop, and every role holds a ctx (context). That makes ctx the universal way to shut things down. The usual pattern for using ctx looks like this:

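package main

import (
	"context"
	"time"
)

type Resource struct{}

// Run loops until ctx is canceled or its deadline passes.
func (r *Resource) Run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			// ... do something
		}
	}
}

func main() {
	// In the etcd client the timeout is client.cfg.DialTimeout; one second stands in for it here.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel() // release the context when main returns
	resource := &Resource{}
	go resource.Run(ctx)
	<-ctx.Done() // block until the timeout fires
}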

This way, when the ctx ends, whether through cancel() or a timeout, the running child goroutine exits on its own. Internally, Context also communicates through channels, which I won't go into here. Let's see how Context is used in the Watch process.

The inner loop of each role in the Watch process follows essentially the same structure as the example above: a select checks whether ctx.Done() is ready (no longer blocking) and, if so, runs the logic that exits the loop.

2. <-w.errc

In watchGrpcStream there is a case err := <-w.errc branch. The err is forwarded by the serveWatchClient loop when it receives an error from the etcd server, but it does not necessarily shut down the Watch process. Depending on the error, the process either shuts down or restarts automatically.

3. No more virtual streams in watchGrpcStream

When watchGrpcStream finds that all virtual streams have been shut down, it closes itself, ending the Watch process.

4. Call Close() on the user side

When the user explicitly calls Close(), the entire Watch process is of course shut down as well. This also works by calling cancel() on the shared ctx, so the logic is the same as in point 1.

Detailed analysis of closing process

Now that we've found all the shutdown scenarios, let's take a closer look at how each role shuts down:

1. Closing watcher

func (w *watcher) Close() (err error) {
	w.mu.Lock()
	streams := w.streams
	w.streams = nil
	w.mu.Unlock()
	// Close all watchGrpcStreams
	for _, wgs := range streams {
		if werr := wgs.close(); werr != nil {
			err = werr
		}
	}
	// context.Canceled counts as a successful close
	if err == context.Canceled {
		err = nil
	}
	return err
}

Closing watcher is easy because it has no loop of its own. All it has to do is tell each watchGrpcStream to close; beyond that it does almost nothing.

2. Closing watchClient

func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
	for {
		resp, err := wc.Recv()
		if err != nil {
			select {
			case w.errc <- err:
			case <-w.donec:
			}
			return
		}
		select {
		case w.respc <- resp:
		case <-w.donec:
			// watchGrpcStream is done (closed)
			return
		}
	}
}

The watchClient shutdown is also easy to follow. If an error is received, the loop exits; it also exits when it finds that watchGrpcStream is already done. But what if the loop is blocked on the line resp, err := wc.Recv() at the moment watchGrpcStream becomes done? How does it leave the loop then?

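The answer is in how the watchClient is created: w.remote.Watch(w.ctx, w.callOpts...). Because watchGrpcStream's ctx is passed in, as soon as that ctx is canceled the blocked call resp, err := wc.Recv() returns with resp = nil and an error reporting that the context was canceled.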

3. Closing watcherStream

func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	...
	// Shutdown path
	defer func() {
		if !resuming {
			// Mark the ws as closing.
			// Only watchGrpcStream can truly close a watcherStream,
			// because it is the one that reclaims the ws resources.
			ws.closing = true
		}
		// The inner loop has exited
		close(ws.donec)
		if !resuming {
			// Hand itself to watchGrpcStream, which handles the shutdown
			w.closingc <- ws
		}
		// One fewer watcherStream goroutine
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		...
		select {
		case outc <- *curWr:
			...
		case wr, ok := <-ws.recvc:
			if !ok {
				// close(ws.recvc) was called elsewhere;
				// this also sends the watcherStream into the shutdown path
				return
			}
			...
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			// The inner loop exits here, but it is started again
			// once the restart has completed.
			resuming = true
			return
		}
	}
}

A watcherStream can leave its loop in two ways: close or restart. On close, it sends itself to watchGrpcStream so that its resources can be reclaimed. On restart, only the inner loop exits; once the restart completes, the loop starts again and the resources stay the same.

watcherStream state flow:

  1. A watchRequest creates a watcherStream, as described in the previous article. A newly created watcherStream starts out in the resuming state and becomes a substream only after the etcd server's response has been received.
  2. If watchGrpcStream restarts the underlying gRPC connection, every watcherStream in the substream state goes back to the resuming state.
  3. A watcherStream can be closed from either the resuming or the substream state, through ctx.Done (not covered further here) or through close(recvc). The latter happens in two cases:
     1. watchGrpcStream closes itself and uses close(recvc) to close every watcherStream inside it.
     2. The etcd server returns a response flagged Canceled, which also leads to close(recvc) and closes the watcherStream.
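
The state flow of a watcherStream can be written down as a small transition function. This is only a sketch to make the flow concrete: the state and event types, their names, and the next function are hypothetical and do not appear in etcd, where the state is implied by which collection (resuming or substreams) holds the stream.

```go
package main

import "fmt"

// state is a hypothetical label for where a watcherStream currently lives.
type state int

const (
	resuming  state = iota // waiting for the server to confirm the watch
	substream              // confirmed by the server
	closed                 // resources reclaimed
)

// event names are illustrative, not etcd identifiers.
type event int

const (
	created   event = iota // the server acknowledged the watch request
	reconnect              // the underlying gRPC stream was restarted
	closeRecv              // recvc was closed or a ctx is done
)

// next returns the state after ev; combinations that do not apply
// leave the state unchanged.
func next(s state, ev event) state {
	switch {
	case s == closed:
		return closed
	case ev == closeRecv:
		return closed
	case s == resuming && ev == created:
		return substream
	case s == substream && ev == reconnect:
		return resuming
	}
	return s
}

func main() {
	s := resuming
	for _, ev := range []event{created, reconnect, created, closeRecv} {
		s = next(s, ev)
		fmt.Println(s)
	}
}
```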

How watchGrpcStream reclaims the resources of a watcherStream:

	// Excerpt from the run() method of watchGrpcStream
	case ws := <-w.closingc:
		// Analyzed below
		w.closeSubstream(ws)
		// When the etcd server returns a Canceled response, ws is added to the closing set
		// and its recvc is closed; here ws is removed from closing again.
		delete(closing, ws)

		// watchGrpcStream can exit once both substreams and resuming are empty.
		// In that case there is no need to notify the etcd server (the underlying connection goes away).
		if len(w.substreams)+len(w.resuming) == 0 {
			return
		}
		// If watchGrpcStream itself keeps running, be polite and tell the etcd server,
		// which does not yet know that this watch has been closed.
		if ws.id != -1 {
			// Add to the set so the cancel message is not sent more than once.
			cancelSet[ws.id] = struct{}{}
			cr := &pb.WatchRequest_CancelRequest{
				CancelRequest: &pb.WatchCancelRequest{
					WatchId: ws.id,
				},
			}
			req := &pb.WatchRequest{RequestUnion: cr}
			if err := wc.Send(req); err != nil {
				...
			}
		}

Two data structures appear here, closing and cancelSet. Both exist to prevent a ws from being closed twice when it is already being closed elsewhere. I won't expand on these details; the key point is the watcherStream state flow.
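
The idea behind such a guard set can be shown in a few lines. In this sketch the stream type and the closeOnce helper are hypothetical; the point is only that closing a Go channel twice panics, so membership in the set is checked before close is called. A plain map is enough here because a single goroutine owns the set.

```go
package main

import "fmt"

// stream is a hypothetical stand-in for watcherStream.
type stream struct {
	id    int64
	recvc chan struct{}
}

// closeOnce closes s.recvc unless s is already in the closing set.
// Closing a channel twice panics, so the set acts as a guard.
func closeOnce(closing map[*stream]struct{}, s *stream) bool {
	if _, ok := closing[s]; ok {
		return false
	}
	closing[s] = struct{}{}
	close(s.recvc)
	return true
}

func main() {
	closing := make(map[*stream]struct{})
	s := &stream{id: 1, recvc: make(chan struct{})}
	fmt.Println(closeOnce(closing, s)) // true: first close
	fmt.Println(closeOnce(closing, s)) // false: skipped, no panic
}
```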

Reclaiming the resources of a watcherStream involves three steps:
func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
	...
	// 1. Send a close message to the user side, or close the user's channel
	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
		go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
	} else if ws.outc != nil {
		close(ws.outc)
	}
	// 2. Remove ws from substreams
	if ws.id != -1 {
		delete(w.substreams, ws.id)
		return
	}
	// 3. Remove ws from resuming
	for i := range w.resuming {
		if w.resuming[i] == ws {
			w.resuming[i] = nil
			return
		}
	}
}

4. Closing watchGrpcStream

Finally, watchGrpcStream has the most complicated shutdown process, because it also involves an automatic restart. Let's analyze the automatic restart first:

	case err := <-w.errc:
		// An automatic restart is only possible after an error from the etcd server
		if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
			// A serious error occurred; no restart.
			// A serious error is generally one that is
			// 1. not an Unavailable error, and
			// 2. not an Internal error
			closeErr = err
			return
		}
		// Connect a new watchClient; the old one has exited because of err
		if wc, closeErr = w.newWatchClient(); closeErr != nil {
			return
		}
		// Every ws is now in the resuming state, so take the ws at the head of the queue
		// and send its request to restart the ws state flow
		if ws := w.nextResume(); ws != nil {
			if err := wc.Send(ws.initReq.toPB()); err != nil {
				w.lg.Debug("error when sending request", zap.Error(err))
			}
		}
		// Reset cancelSet, because every ws is back in the resuming state
		cancelSet = make(map[int64]struct{})

Let’s look at the internal logic of newWatchClient:

func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// Remember the resumec case in the watcherStream inner loop?
	// Go back and look at it if needed.
	close(w.resumec)
	w.resumec = make(chan struct{})
	// Closing resumec makes every ws inner loop exit.
	// The ws are not closed; we only wait for their loops to exit.
	w.joinSubstreams()

	// Move every ws from substreams to resuming
	for _, ws := range w.substreams {
		ws.id = -1
		w.resuming = append(w.resuming, ws)
	}

	...

	// Some ws cannot go back to the resuming state
	// because their own ctx has expired (ctx.Done).
	// Those ws are removed here.
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// Start the inner loop again for the remaining ws;
	// at this point the restart is complete.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// Listen on the new underlying gRPC stream
	go w.serveWatchClient(wc)
	return wc, nil
}

Careful readers may wonder: newWatchClient() does so much work for the sake of a restart, so doesn't that get in the way the first time it runs? Look closely and you will see that on the first run there are no ws yet, so much of the code iterates over nothing and performs no real logic.

Before we look at the watchGrpcStream shutdown process, we can pause to think about what we would do if we implemented the shutdown process based on all the information in this article.

I think it should be divided into the following steps:

  1. If there are errors, collect them and send them to the user through the watcherStream's channel
  2. Close the ws in substreams
  3. Close the ws in resuming
  4. Cancel the context so that everything depending on it shuts down as well

Ok, let’s look at the actual core code for watchGrpcStream:

	defer func() {
		// Record the error; closeSubstream carries it to the user side.
		w.closeErr = closeErr
		// Close the ws in substreams
		for _, ws := range w.substreams {
			if _, ok := closing[ws]; !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		// Close the ws in resuming
		for _, ws := range w.resuming {
			if _, ok := closing[ws]; ws != nil && !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		// Wait for the ws inner loops to exit
		w.joinSubstreams()
		// Reclaim the resources of each ws
		for range closing {
			w.closeSubstream(<-w.closingc)
		}
		w.wg.Wait()
		// Close itself
		w.owner.closeStream(w)
	}()

The core of the shutdown is in this defer, and its last step happens inside closeStream:

func (w *watcher) closeStream(wgs *watchGrpcStream) {
	w.mu.Lock()
	// Close donec to signal that this watchGrpcStream is done
	close(wgs.donec)
	// Cancel the ctx
	wgs.cancel()
	// Remove itself from the watcher's streams
	if w.streams != nil {
		delete(w.streams, wgs.ctxKey)
	}
	w.mu.Unlock()
}

Comparing the actual shutdown process with the one I sketched, I missed one point: a watchGrpcStream is cached so that it can be shared, and it has to be removed from that cache when it shuts down.
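
The caching behavior can be sketched as a keyed map guarded by a mutex. Everything in this example (the owner and grpcStream types, the get method, the string key) is a hypothetical simplification; it only illustrates why a closed stream has to be deleted from the cache: otherwise a later caller with the same key would be handed a dead stream.

```go
package main

import (
	"fmt"
	"sync"
)

// grpcStream is a hypothetical stand-in for watchGrpcStream.
type grpcStream struct{ key string }

// owner caches streams by key so that callers with the same key share one stream.
type owner struct {
	mu      sync.Mutex
	streams map[string]*grpcStream
}

// get returns the cached stream for key, creating it on first use.
func (o *owner) get(key string) *grpcStream {
	o.mu.Lock()
	defer o.mu.Unlock()
	if s, ok := o.streams[key]; ok {
		return s
	}
	s := &grpcStream{key: key}
	o.streams[key] = s
	return s
}

// closeStream drops s from the cache; a later get with the same key
// builds a new stream instead of returning the closed one.
func (o *owner) closeStream(s *grpcStream) {
	o.mu.Lock()
	defer o.mu.Unlock()
	delete(o.streams, s.key)
}

func main() {
	o := &owner{streams: make(map[string]*grpcStream)}
	a, b := o.get("ctx-1"), o.get("ctx-1")
	fmt.Println(a == b) // true: the stream is shared
	o.closeStream(a)
	fmt.Println(o.get("ctx-1") == a) // false: a fresh stream was created
}
```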

Watch Process Summary

That concludes the analysis of the Watch process. Before finishing, let's review the whole thing and see which design ideas are worth borrowing:

  1. Anyone reading the Watch code for the first time is bound to be confused (as I was): why are there so many roles, and why is data forwarded back and forth internally? After this analysis the picture should be clear. A key reason is to break complex logic into parts with a clear division of labor. Our analysis followed the same idea, which is to take things apart: by role (watcher, watchGrpcStream, watcherStream, and so on) and by phase (start, listen, close, restart). While analyzing, focus only on what matters for the topic at hand and ignore unrelated code. This approach works for source code analysis in any scenario.
  2. ctx: the etcd code uses ctx to its full potential; neither shutdown nor communication works without it. Seen this way, using ctx only to pass context values wastes what it can do.
  3. donec/resumec/closingc: when we need to represent state, we often reach for a field such as status or isClosing int. That certainly works, but in a multithreaded environment it is prone to concurrency problems. The etcd code instead makes heavy use of channels to represent state. Such a channel carries no data; it is purely a signal. Calling close(donec) fires it and marks the state transition, and it is concurrency safe.
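
The channel-as-state idea can be tried out in isolation. The worker type below is hypothetical, and the use of sync.Once to make repeated stop calls safe is an addition of this sketch rather than something taken from etcd. It shows the two properties that matter: closing the channel wakes every waiter at once, and the state can be checked without blocking.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical component whose "closed" state is a channel.
type worker struct {
	donec chan struct{}
	once  sync.Once
}

func newWorker() *worker { return &worker{donec: make(chan struct{})} }

// stop flips the state by closing donec; sync.Once makes repeated calls safe.
func (w *worker) stop() { w.once.Do(func() { close(w.donec) }) }

// stopped reports the state without blocking.
func (w *worker) stopped() bool {
	select {
	case <-w.donec:
		return true
	default:
		return false
	}
}

func main() {
	w := newWorker()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-w.donec // every waiter wakes up when donec is closed
		}()
	}
	fmt.Println(w.stopped()) // false
	w.stop()
	w.stop() // a second call does not panic
	wg.Wait()
	fmt.Println(w.stopped()) // true
}
```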

If you have other takeaways, feel free to post them in the comments section for discussion.