Version: this walkthrough starts from ZooKeeper 3.4.14 and compares it with 3.5.5

Client Management

The ZK server interacts with clients over TCP long connections. The ZK server can start its TCP server in either of the following ways:

  1. Netty-based implementation: NettyServerCnxnFactory
  2. Java native NIO implementation: NIOServerCnxnFactory

Note that ServerCnxnFactory is only used for client connection management and has nothing to do with communication between servers. The default mode, NIOServerCnxnFactory, is what is used in production, so the analysis below focuses on NIOServerCnxnFactory.

Before we talk about connection management, let’s look at thread management in NIOServerCnxnFactory: which threads process connection requests, which threads process read and write requests, how many Selector threads are opened, and so on. Since the exact number of threads depends on the number of CPU cores by default, we assume 8 CPU cores:

  1. Connection requests: handled by 1 thread, AcceptThread, which holds its own Selector. When a connection request arrives, the corresponding SocketChannel is pushed onto a SelectorThread's blocking queue, and the SelectorThread handles the rest of the logic
  2. SelectorThread: the number of threads is sqrt(numCores/2), so there are 2 SelectorThreads, each with its own Selector. This thread has two functions. One is to process the connection requests pushed in by AcceptThread and encapsulate each one as a NIOServerCnxn object maintained in a Map cache. The other is to receive read and write requests, encapsulate each one as an IOWorkRequest object, and submit it to the ioWorkers thread pool. The SelectorThread only receives the I/O requests and hands them off quickly; the actual business logic is processed by the ioWorkers thread pool, so this step is also very fast
  3. ioWorkers: the number of threads is numCores*2, so there are 16 threads. This thread pool processes the I/O tasks submitted by the SelectorThreads, that is, the actual business logic. The default thread pool type is Executors.newFixedThreadPool. The NIOServerCnxn is passed along via SelectionKey#attachment
  4. ConnectionExpirerThread: 1 thread, corresponding to ClearThread in the figure. It cleans up expired connections held in an ExpiryQueue; for an introduction to that data structure, see the Session management chapter
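
The sizing rules above can be checked with a few lines of Java. This is only a sketch of the arithmetic described in the list, not ZooKeeper's own code; the class and method names are made up for illustration, and the floor of 1 selector thread is an assumption for machines with very few cores.

```java
// Sketch only: mirrors the sizing rules described in the text, not ZooKeeper's actual code.
final class NioThreadSizing {
    // sqrt(numCores / 2) selector threads; the minimum of 1 is an assumption.
    static int selectorThreads(int numCores) {
        return Math.max((int) Math.sqrt(numCores / 2.0), 1);
    }

    // The ioWorkers pool is sized at twice the core count.
    static int workerThreads(int numCores) {
        return 2 * numCores;
    }

    public static void main(String[] args) {
        int cores = 8; // the 8-core example used in the text
        System.out.println("selector threads = " + selectorThreads(cores));
        System.out.println("worker threads   = " + workerThreads(cores));
    }
}
```

With 8 cores this gives 2 selector threads and 16 worker threads, matching the numbers in the list.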

Next, let’s look at what NIOServerCnxnFactory maintains for each client connection. As mentioned briefly above, each connection is encapsulated as a NIOServerCnxn object

  1. NIOServerCnxn: can be thought of as an abstraction of the client connection. Some of the client request processing that will be covered later is related to this object
  2. ipMap: a hash table whose key is the client IP address and whose value is the set of NIOServerCnxn objects for that IP. A ZK client can establish multiple long connections with the server. The server limits the number of connections per ZK client; the default limit is 60
  3. cnxns: the Set of NIOServerCnxn objects, used primarily by the four-letter-word commands
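
To make the role of ipMap concrete, here is a minimal sketch of a per-IP connection limit. It is a hypothetical stand-in, not the NIOServerCnxnFactory code: ConnectionRegistry, register, remove and count are invented names, and plain long ids stand in for NIOServerCnxn objects.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an ipMap-style registry; ids stand in for NIOServerCnxn objects.
final class ConnectionRegistry {
    private final int maxPerIp;
    private final Map<String, Set<Long>> ipMap = new ConcurrentHashMap<>();

    ConnectionRegistry(int maxPerIp) {
        this.maxPerIp = maxPerIp;
    }

    // Returns false if the IP is already at its limit or the id is already registered.
    synchronized boolean register(String ip, long cnxnId) {
        Set<Long> cnxns = ipMap.computeIfAbsent(ip, k -> ConcurrentHashMap.newKeySet());
        if (cnxns.size() >= maxPerIp) {
            return false;
        }
        return cnxns.add(cnxnId);
    }

    // Drops a closed connection and removes the IP entry once it is empty.
    synchronized void remove(String ip, long cnxnId) {
        Set<Long> cnxns = ipMap.get(ip);
        if (cnxns != null) {
            cnxns.remove(cnxnId);
            if (cnxns.isEmpty()) {
                ipMap.remove(ip);
            }
        }
    }

    int count(String ip) {
        Set<Long> cnxns = ipMap.get(ip);
        return cnxns == null ? 0 : cnxns.size();
    }
}
```

A registry built with `new ConnectionRegistry(60)` would reject the 61st connection from the same IP, which is the behavior the default limit describes.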

Session management

  1. When the client makes its first I/O request, the NIOServerCnxn object is initialized, that is, NIOServerCnxn#readConnectRequest is executed
  2. Because this is the first I/O request, the client and server have only established a TCP long connection and no Session exists yet, so this method creates a SessionImpl object for the client by calling ZooKeeperServer#createSession
  3. Execution continues into SessionTracker#createSession, which stores the new Session object in the SessionTrackerImpl cache and returns a sessionId. From then on the client sends this sessionId with every request. In cluster mode the Session also has to be synchronized to the other server nodes, which is more complicated; we’ll look at that later
  4. The new sessionId is stored in the NIOServerCnxn object
  5. Once the NIOServerCnxn object is initialized, later I/O requests go through NIOServerCnxn#readRequest, which calls ZooKeeperServer#processPacket => ZooKeeperServer#submitRequest. This method first calls ZooKeeperServer#touch, which updates the session's active time in SessionTrackerImpl; an error is reported if the session has expired. The request is then handed to the ZooKeeperServer business logic chain for processing

How are Sessions maintained in SessionTrackerImpl? How does it know which Session expires and which Session doesn’t expire?

  1. sessionsById: a Map whose key is the sessionId and whose value is the SessionImpl
  2. sessionExpiryQueue: an ExpiryQueue<SessionImpl>. The ExpiryQueue is shown in the figure below. SessionTrackerImpl corresponds to ClearThread in the figure below; its run method continuously cleans up the expired elements in sessionExpiryQueue

  1. The underlying storage structure of ExpiryQueue is a map whose key is a timestamp and whose value is a Set. Note that each timestamp is divisible by tickTime, so adjacent keys are at least tickTime apart
  2. The width of each grid cell is tickTime, which is 2000 (milliseconds) by default in ZK
  3. Each time a client request (including a heartbeat) comes in, the server needs to update the active time of the Session corresponding to that client, that is, update the Session's grid cell in the ExpiryQueue. This triggers ExpiryQueue#update
  4. If the client is configured with a sessionTimeOut of 10 seconds (10000 ms) and the current time is 1618647260000, then ExpiryQueue#update first computes the session's grid cell as ((now + sessionTimeOut)/tickTime + 1) * tickTime, that is, ((1618647260000 + 10000)/2000 + 1) * 2000 = 1618647272000. The formula guarantees that the result is divisible by tickTime
  5. According to the formula in step 4, a newly created Session is placed in the Set of grid cell 1618647272000. A Session that already exists in the ExpiryQueue is moved to grid cell 1618647272000 and then deleted from its old Set
  6. So far I have only covered Session updates and left out a few other things. If a client disconnects, its Session would remain in the Map and never be cleared, resulting in a memory leak and possibly an OOM on the server. In addition, sessionTimeOut is a very important concept in ZK: if the client reconnects only after sessionTimeOut has elapsed, that Session is no longer usable. So there also needs to be a mechanism for clearing expired Sessions
  7. The cleanup logic is actually very simple. An independent thread keeps computing the grid cell for the current time and clears the Set in that cell. As shown above, a Session that is still alive keeps moving to later cells, so whatever is left in the cell for the current time is exactly the set of Sessions that can be cleared
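
The steps above can be sketched as a small bucketed queue. This is a simplified, single-threaded illustration of the idea rather than ZooKeeper's ExpiryQueue: the class and method names are invented, session ids stand in for SessionImpl objects, and expire sweeps every cell at or before the given time instead of advancing one cell per tick.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Simplified sketch of the grid-cell (bucket) idea; not ZooKeeper's ExpiryQueue.
final class BucketExpiryQueue {
    private final long tickTime;
    private final Map<Long, Set<Long>> buckets = new HashMap<>(); // cell time -> session ids
    private final Map<Long, Long> bucketOf = new HashMap<>();     // session id -> cell time

    BucketExpiryQueue(long tickTime) {
        this.tickTime = tickTime;
    }

    // Rounds a time up to the next cell boundary: (time / tickTime + 1) * tickTime.
    long roundToNextBucket(long time) {
        return (time / tickTime + 1) * tickTime;
    }

    // Places a new session, or moves an existing one, into the cell for now + timeout.
    long update(long sessionId, long now, long timeoutMs) {
        long bucket = roundToNextBucket(now + timeoutMs);
        Long old = bucketOf.put(sessionId, bucket);
        if (old != null && old != bucket) {
            Set<Long> oldSet = buckets.get(old);
            if (oldSet != null) {
                oldSet.remove(sessionId);
                if (oldSet.isEmpty()) {
                    buckets.remove(old);
                }
            }
        }
        buckets.computeIfAbsent(bucket, k -> new HashSet<>()).add(sessionId);
        return bucket;
    }

    // Removes and returns every session whose cell time is at or before now.
    Set<Long> expire(long now) {
        Set<Long> expired = new HashSet<>();
        Iterator<Map.Entry<Long, Set<Long>>> it = buckets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<Long>> entry = it.next();
            if (entry.getKey() <= now) {
                expired.addAll(entry.getValue());
                it.remove();
            }
        }
        bucketOf.keySet().removeAll(expired);
        return expired;
    }
}
```

With tickTime 2000, `update(1L, 1618647260000L, 10000L)` returns 1618647272000. A later request from the same session moves it to a later cell, so a sweep at the old cell time finds nothing to clear.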

Business logic chain

So far we have only covered client connection management. We haven’t yet seen how a business request submitted by the client, such as creating a node, is processed, but we can guess that the entry point is in ioWorkers. The processing logic on the server side is organized as a chain of responsibility built from RequestProcessors, covering steps such as writing the transaction log and updating the DataTree cache. ZK has several different RequestProcessors, and the chain of responsibility varies according to the zkServer role.

For example, in single-machine mode, in the ZooKeeperServer#startup method, the RequestProcessor link initialization logic is executed as follows:

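// Build the chain back to front: each processor is handed its successor.
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
// Requests enter at PrepRequestProcessor, then flow to the sync and final processors.
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();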
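
To see why the chain is built from the last processor to the first, here is a minimal, synchronous sketch of the same wiring. The Processor interface and the stage names are hypothetical stand-ins; the real processors run on their own threads and do far more work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, synchronous stand-ins for the processors in a chain of responsibility.
final class ProcessorChainSketch {
    interface Processor {
        void process(String request);
    }

    // Each stage records its name, then forwards to its successor if it has one.
    static Processor stage(String name, Processor next, List<String> trace) {
        return request -> {
            trace.add(name + ":" + request);
            if (next != null) {
                next.process(request);
            }
        };
    }

    // Built back to front: final, then sync, then prep. Requests enter at prep.
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        Processor finalStage = stage("final", null, trace);
        Processor syncStage = stage("sync", finalStage, trace);
        Processor firstStage = stage("prep", syncStage, trace);
        firstStage.process(request);
        return trace;
    }
}
```

Calling `ProcessorChainSketch.run("create")` visits prep, sync and final in that order, which is the order a request takes through the standalone chain.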

The following figure shows the RequestProcessors that correspond to zkServer in its different roles

Data storage

  1. All data storage in ZK goes through ZKDatabase, which can be regarded as the facade class for operating on ZK data. It holds the following kinds of ZK data: DataTree, FileTxnSnapLog, minCommittedLog, maxCommittedLog, committedLog, etc.
  2. DataTree: the in-memory representation of ZK nodes, including the map from node path to node, dataWatches, childWatches, ephemerals, etc.
  3. Snapshot file: holds the full ZK data as of a certain point in time. The file name is snapshot.{transaction ID}, where the transaction ID in the file name is the largest transaction ID in the file. A snapshot file is created after every 50000 to 100000 transactions, and old ones can be removed according to ZK’s file cleanup policy
  4. Transaction log file: ZK's write-ahead log. The file name is log.{transaction ID}, where the transaction ID in the file name is the smallest transaction ID in the file
  5. After each transaction is committed, the data is applied to the DataTree cache
  6. On every ZKServer startup, snapshot files and transaction logs are loaded into the DataTree. Snapshot files are loaded as follows: sort all snapshot files by transaction ID in descending order (newest first), take the first 100, and try them in turn. If the first file loads successfully, loading stops there; otherwise the second file is tried, and so on. Once a snapshot file is loaded, the latest transaction ID at that point is known. That transaction ID is then used to find the transaction log files that need to be loaded, and the transactions in them are replayed one by one. For the details of finding a transaction log file by transaction ID, see the chapter on file cleanup
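
The snapshot fallback in step 6 can be sketched as follows. This is an illustration under stated assumptions, not ZooKeeper's loader: snapshots are represented only by their transaction IDs, the validity check is passed in as a predicate, candidates are tried newest first, and SnapshotPicker and pickSnapshot are invented names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.LongPredicate;

// Sketch of "try the newest snapshot first, fall back to older ones"; names are hypothetical.
final class SnapshotPicker {
    // Returns the transaction ID of the newest snapshot that loads, or -1 if none does.
    static long pickSnapshot(List<Long> snapshotZxids, LongPredicate loads, int maxCandidates) {
        List<Long> sorted = new ArrayList<>(snapshotZxids);
        sorted.sort(Comparator.reverseOrder());        // largest transaction ID first
        int limit = Math.min(maxCandidates, sorted.size());
        for (int i = 0; i < limit; i++) {
            long zxid = sorted.get(i);
            if (loads.test(zxid)) {
                return zxid;                           // first loadable snapshot wins
            }
        }
        return -1L;
    }
}
```

If the newest snapshot is corrupt, the predicate rejects it and the next one is used; transaction log replay then starts from that snapshot's transaction ID.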

Watch mechanism

  1. A watch has two parts: watch registration and watch triggering. Registration is done through a fixed set of APIs provided by ZK; a registered watch is saved on the client side and marked on the server side. Triggering means that when a node changes, the server sends the node change event to the client
  2. Native ZK provides the following APIs for registering a Watch: the ZooKeeper constructor, ZooKeeper#exists, ZooKeeper#getData, ZooKeeper#getChildren, and ZooKeeper#removeWatches. The subscribeXxx methods of the various zkClient libraries we usually use are all wrappers around these
  3. Node change events are: node created, node deleted, node data changed, and child node list changed. A watcher is not triggered by changes to the parent node or to grandchildren, only by changes to the watched node itself and its direct children

Watch registration

Here we take ZooKeeper#getData as an example, with /zk/test as the target node, to explain the whole process of watch registration

  1. Call ZooKeeper#getData; let’s say we pass in a Watcher object that we wrote ourselves
  2. The Watcher object is wrapped in a DataWatchRegistration object, the request body is wrapped in a GetDataRequest object, and the message is pushed onto the blocking queue of the asynchronous request-sending thread
  3. The client thread blocks until the request returns
  4. After receiving the result, the client executes DataWatchRegistration#register, which mainly adds the Watcher we passed in to the ZooKeeper.ZKWatchManager.dataWatches cache

So what does the server do when it receives a Watch registration request?

  1. The server receives the client request, which eventually reaches FinalRequestProcessor#processRequest and then executes ZKDatabase#getData => DataTree#getData
  2. The node data is read from the DataTree, and the NIOServerCnxn that represents the current client connection is added to the DataTree.dataWatches cache as a Watcher; NIOServerCnxn implements the Watcher interface
  3. Looking ahead at NIOServerCnxn#process: its logic is to create a WatcherEvent object and send it to the client

Watch triggering

Next, we take ZooKeeper#setData as an example, with /zk/test as the target node, to explain the whole process of watch triggering

  1. As before, the request reaches FinalRequestProcessor#processRequest, which then executes ZooKeeperServer#processTxn => ZKDatabase#processTxn => DataTree#processTxn
  2. DataTree#setData is executed: it performs some pre-validation and then assigns the new value to the node
  3. dataWatches.triggerWatch is called to trigger the watch; this corresponds to WatchManager#triggerWatch
  4. In WatchManager#triggerWatch, the watchers are looked up by path, deleted from the cache, and their Watcher#process method is executed. As described above, what runs here is NIOServerCnxn#process, which sends a WatcherEvent to the client
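
The remove-then-notify behavior in step 4 is what makes a watch one-shot. The sketch below illustrates just that behavior; it is not ZooKeeper's WatchManager, and OneShotWatchTable and its nested Watcher interface are simplified, hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified sketch of a path -> watchers table with one-shot triggering.
final class OneShotWatchTable {
    interface Watcher {
        void process(String eventType, String path);
    }

    private final Map<String, Set<Watcher>> watches = new HashMap<>();

    // Registers a watcher on a path (what the server does during getData with a watch).
    void addWatch(String path, Watcher watcher) {
        watches.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
    }

    // Removes the watchers for the path, then notifies each; returns how many were notified.
    int triggerWatch(String path, String eventType) {
        Set<Watcher> watchers = watches.remove(path);
        if (watchers == null) {
            return 0;
        }
        for (Watcher watcher : watchers) {
            watcher.process(eventType, path);
        }
        return watchers.size();
    }
}
```

Triggering the same path twice notifies the watcher only once, which is why a client that wants continuous notifications has to register the watch again after each event.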

So what happens when a client receives a WatcherEvent from a server?

  1. On the client side, SendThread#readResponse parses the WatcherEvent returned by the server and pushes it onto the EventThread.waitingEvents blocking queue
  2. The EventThread thread keeps taking messages from the EventThread.waitingEvents blocking queue. Based on the WatcherEvent returned by the server, it finds the Watcher object previously registered on the client, removes that Watcher from the client, and executes EventThread#processEvent
  3. EventThread#processEvent calls the client's Watcher#process method. At this point the process ends

Standalone startup process

With the foundation of the previous chapters, let’s take a quick look at the startup process in ZK standalone mode

  1. Load configuration items from the configuration file
  2. Start the scheduled file cleanup task
  3. Start the AdminServer
  4. Start NIOServerCnxnFactory and its threads, which are then ready to receive client connections
  5. Create the ZKDatabase and load data into memory from snapshot files and transaction log files
  6. Start ZooKeeperServer; during this step the SessionTracker used for Session management is started
  7. Initialize the RequestProcessor chain of responsibility

Standalone transaction request

  1. The overall process is shown below, with many details still to be worked out
  2. About ZooKeeperServer.outstandingChanges: the important thing to remember is that entries are pushed to this queue only for write requests. As for its purpose, judging from where it is currently used, I believe it is related to ZK's OpCode.multi operation. When a single request involves multiple commands, such as change -> check -> change again, the transaction log has been written after the first change but the change has not been committed yet, so memory still shows the old data. With ZooKeeperServer.outstandingChanges, the modified value can first be written to the queue and its dedicated cache, so that a later check sees the data of the current request. My understanding here is limited and this needs further study
  3. In FinalRequestProcessor#processRequest, ZKDatabase#processTxn is executed first to update the in-memory DataTree cache. Then the ZKDatabase.committedLog queue is updated; this queue is mainly used for data synchronization, and the synchronization method is chosen based on it

Cluster Startup Process

  1. Load configuration items and perform pre-verification
  2. Load data: load data into memory from snapshot files and transaction log files
  3. Start NIOServerCnxnFactory and its threads, which are then ready to receive client connections
  4. Start the AdminServer
  5. Initialize the resources related to leader election, including QuorumCnxManager.Listener#start and FastLeaderElection#start. Five threads are involved here, and the ports used for election connections between servers are opened
  6. Leader election: execute QuorumPeer#run => FastLeaderElection#lookForLeader. This triggers the election connections between servers and makes the threads from step 5 active
  7. Once the leader election in step 6 is complete, the prospective leader is known. Each server then creates a ZooKeeperServer according to its own role. Because the responsibilities differ by role, a different ZooKeeperServer is created in each of the following states:
    1. If leader: create Leader and LeaderZooKeeperServer
    2. If follower: create Follower and FollowerZooKeeperServer
    3. If observer: create Observer and ObserverZooKeeperServer
  8. Data synchronization involves a lot of details. To summarize:
    1. The leader starts its TCP server, computes the current leader’s epoch, and then blocks waiting for epoch synchronization requests from the other servers
    2. Followers send an epoch synchronization request to the leader to get the latest epoch. On success, the follower must send an epoch ACK back to the leader
    3. Once the leader has received epoch ACKs from more than half of the servers, it takes each follower's latest zxid from its epoch ACK, compares that zxid with the local committedLog to decide the data synchronization method, and sends the synchronization data to the follower nodes
    4. Followers receive the synchronization data and then perform data synchronization
    5. The leader sends a NEWLEADER request to the followers and waits for more than half of them to return a NEWLEADER ACK
    6. After receiving NEWLEADER ACKs from more than half, the leader starts LeaderZooKeeperServer, then asynchronously sends UPTODATE to the followers and starts serving requests normally
    7. After receiving UPTODATE, a follower starts FollowerZooKeeperServer and starts serving requests normally
  9. At this point, the cluster has started successfully and is serving normally
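
Several of the steps above wait for "more than half" of the servers to acknowledge. The sketch below shows one way to count such ACKs. It is an assumption-laden illustration, not ZooKeeper's code: AckTracker is an invented name, the leader is assumed to count itself, duplicate ACKs from one server are counted once, and observers are assumed not to be part of clusterSize.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of majority counting for epoch ACK / NEWLEADER ACK style rounds.
final class AckTracker {
    private final int clusterSize;               // number of voting servers (assumed)
    private final Set<Long> acked = new HashSet<>();

    AckTracker(int clusterSize, long leaderSid) {
        this.clusterSize = clusterSize;
        acked.add(leaderSid);                    // assumption: the leader counts itself
    }

    // Records an ACK from the given server; returns true once a majority has ACKed.
    boolean ack(long sid) {
        acked.add(sid);
        return hasQuorum();
    }

    // "More than half": strictly greater than clusterSize / 2.
    boolean hasQuorum() {
        return acked.size() > clusterSize / 2;
    }
}
```

In a 5-node cluster the leader plus two followers form a majority, so the leader can move on without waiting for the remaining two.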

Leader election interaction

  1. After the QuorumPeer thread starts, it first runs leader election through FastLeaderElection#lookForLeader; the thread blocks until the election is complete
  2. The initial state is LOOKING. The server first sends out its own vote: it creates one ToSend message per SID and pushes these messages to the FastLeaderElection.sendqueue queue. If no connection to another server exists yet, the TCP long connection is set up before the message is sent
  3. Step 2 mentions establishing connections, but when is the election server started? In fact, the QuorumCnxManager.Listener thread logic, which starts that server, runs before the QuorumPeer thread is started
  4. The FastLeaderElection.WorkerSender thread keeps taking messages from the FastLeaderElection.sendqueue queue and pushing them to the QuorumCnxManager#sendQueue queue
  5. The QuorumCnxManager.SendWorker thread keeps taking messages from the QuorumCnxManager#sendQueue queue and sending them to other servers over the TCP connections
  6. The QuorumCnxManager.RecvWorker thread keeps reading votes from other servers off the socket and puts the vote information into the QuorumCnxManager#recvQueue queue
  7. The FastLeaderElection.WorkerReceiver thread keeps taking messages from the QuorumCnxManager#recvQueue queue and decides as follows. If the sender is not a valid server node (as determined by the SID), a message is pushed directly to the FastLeaderElection.sendqueue queue, so a reply goes straight back to that server. If the current node's state is LOOKING, the vote is pushed to the FastLeaderElection.recvqueue queue; in addition, if the received vote's epoch is smaller than this machine's epoch, this machine's vote is pushed to the FastLeaderElection.sendqueue queue. If the current node's state is not LOOKING, this machine's vote is pushed to the FastLeaderElection.sendqueue queue
  8. In the QuorumPeer thread, the main election logic keeps taking votes from the FastLeaderElection.recvqueue queue and comparing them against its own vote. The process above repeats until the election is complete
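
The text above does not spell out how two votes are compared in step 8. A common way to describe the ordering used by FastLeaderElection is: higher epoch wins, then higher zxid, then higher server id. The sketch below encodes that ordering as an assumption; VoteComparison and beats are invented names, and any weighting of servers is ignored.

```java
// Sketch of a vote ordering (epoch, then zxid, then sid); treat the rule as an assumption.
final class VoteComparison {
    // Returns true if the incoming vote should replace the current one.
    static boolean beats(long newEpoch, long newZxid, long newSid,
                         long curEpoch, long curZxid, long curSid) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;   // a newer election epoch always wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;     // then the server with more recent data
        }
        return newSid > curSid;           // finally the larger server id breaks ties
    }
}
```

Under this ordering every server that sees the same set of votes converges on the same candidate, which is what lets the election terminate.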

Data synchronization

Cluster transaction request

File cleanup

  1. Read autopurge.purgeInterval from the configuration file; it specifies how many hours elapse between file cleanups
  2. If the value of autopurge.purgeInterval is greater than 0, a scheduled task is created that runs once every autopurge.purgeInterval hours. The task logic is the cleanup logic below
  3. Read autopurge.snapRetainCount from the configuration file; it specifies the number of most recent snapshot files to retain. The minimum value is 3. Below, n denotes this value
  4. Sort the snap files in descending order and take the n most recent snap files
  5. From these n snap files, take the oldest one and derive leastZxidToBeRetain from its file name. Snapshot files and transaction log files with a transaction ID less than leastZxidToBeRetain can be deleted
  6. There is one caveat to step 5. Ideally, the transaction log file that follows snap.leastZxidToBeRetain is log.leastZxidToBeRetain+1. But there is a good chance that log.leastZxidToBeRetain+1 does not exist while log.leastZxidToBeRetain-a does (a>0), and recovery from snap.leastZxidToBeRetain then depends on log.leastZxidToBeRetain-a. In other words, as long as snap.leastZxidToBeRetain is retained, log.leastZxidToBeRetain-a must not be deleted; this is the case to watch for
  7. Based on the analysis in step 6: if log.leastZxidToBeRetain exists, that transaction log file and all later ones are returned; otherwise the closest earlier transaction log file and all later ones are returned. Given log.70 log.81 log.100: if leastZxidToBeRetain is 80, the result is log.70 log.81 log.100; if leastZxidToBeRetain is 81, the result is log.81 log.100. In theory, with leastZxidToBeRetain at 80 only log.81 log.100 should need to be returned, because transaction ID 80 can be deleted and the next transaction ID starts at 81, but the code logic returns log.70 log.81 log.100. The transaction log files found this way must not be deleted; call this file list Q
  8. Delete the transaction log files that are not in file list Q
  9. Delete the snapshot files whose transaction ID is < leastZxidToBeRetain
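
The selection rule in step 7 can be sketched with the log.70 log.81 log.100 example. This is a simplified illustration rather than ZooKeeper's code: log files are represented only by the transaction ID in their name, and RetainedLogs and retained are invented names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of "keep the newest log starting at or before the snapshot, plus all later logs".
final class RetainedLogs {
    static List<Long> retained(List<Long> logZxids, long leastZxidToBeRetain) {
        // Find the largest log start ID that is <= leastZxidToBeRetain, if any.
        Long floor = null;
        for (long zxid : logZxids) {
            if (zxid <= leastZxidToBeRetain && (floor == null || zxid > floor)) {
                floor = zxid;
            }
        }
        // Keep that log and every later one; if no such log exists, keep them all.
        List<Long> keep = new ArrayList<>();
        for (long zxid : logZxids) {
            if (floor == null || zxid >= floor) {
                keep.add(zxid);
            }
        }
        Collections.sort(keep);
        return keep;
    }
}
```

For the logs 70, 81 and 100, a leastZxidToBeRetain of 80 keeps all three files, while 81 keeps only 81 and 100, matching the example in step 7.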