Zab: ZooKeeper Atomic Broadcast Protocol.

As ZK's core protocol, Zab runs through ZK from beginning to end.

From startup to shutdown, ZK goes through several stages:

  • Leader election
  • Discovery
  • Transaction synchronization
  • Broadcast
  • Re-election after a crash

Zab underpins every one of these stages and makes sure each proceeds the way it should. Now let's see how ZK does it.

Election

As mentioned earlier, ZK uses FLE (FastLeaderElection) for elections; it is currently the only election algorithm ZK supports. I won't repeat it here.

The remaining stages are explained from the perspective of the two roles, Leader and Follower.

Leader

1. Discovery

QuorumPeer#run() wraps a switch...case in a while loop; each case does something different depending on the server's current state (serverState).

Earlier we looked at the LOOKING case, because it is the entry point to the election.

After the election, serverState switches to another state, and zabState switches along with it.

Let's look at the run method again (excerpt):

// Source: org.apache.zookeeper.server.quorum.QuorumPeer#run
case LEADING:
    try {
        setLeader(makeLeader(logFactory));
        // lead() contains a while loop; when it returns,
        // this server's leadership has ended and a new election will follow
        leader.lead();
        setLeader(null);
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        if (leader != null) {
            leader.shutdown("Forcing shutdown");
            setLeader(null);
        }
        // This resets serverState to LOOKING,
        // so the enclosing while loop starts a new election
        updateServerState();
    }
    break;
} // end of the switch

As you can see above, once a server is elected leader, everything interesting happens inside leader.lead(). Let's go and see what this method does behind our backs!

This method runs from line 576 to line 874 of the source, 874 - 576 = 298 lines in total. If we wrote a method like that at work, our team leads would ask how one method could possibly be so long! Ha ha ha.

Throughout this stage, read it side by side with:

org.apache.zookeeper.server.quorum.LearnerHandler#run

The leader creates one LearnerHandler per connected follower (or observer); it handles all communication with that server for the lifetime of the connection.

Also note the points where the Leader and the LearnerHandlers synchronize with each other:

  • getEpochToPropose: waits until epochs have been collected from a quorum (more than half) of servers, then derives the new leader epoch from them

  • waitForEpochAck: waits until a quorum of servers has acknowledged the new epoch

  • waitForNewLeaderAck: waits for a quorum of servers to finish synchronizing. The synchronization itself is carried out by each handler.

Although the leader thread and the handler threads run asynchronously, these methods act as meeting points: at each key step, everyone waits until a quorum has finished its own part!

self.setZabState(QuorumPeer.ZabState.DISCOVERY);

As you can see, this method starts by setting zabState to DISCOVERY; that is, ZK officially moves from the election phase into the Discovery phase:

self.setZabState(QuorumPeer.ZabState.DISCOVERY);
// Record the current epoch and last processed zxid in a StateSummary for later comparison
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// Start the thread that accepts follower connections
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
// Collect epochs from a quorum; the largest one + 1 becomes the new leader's epoch - [1]
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
// The new zxid carries the new epoch in its high 32 bits, with the low 32-bit counter set to zero
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
// ...
// Wait for epoch ACKs from a quorum, confirming that more than half of the servers have received the new epoch.
// The waiting works the same way as in getEpochToPropose above.
// Once enough epoch ACKs arrive, ZK considers the election finished.
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
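As a quick illustration of what ZxidUtils.makeZxid(epoch, 0) produces, here is a minimal sketch assuming the usual zxid layout: the epoch in the high 32 bits and a per-epoch transaction counter in the low 32 bits. ZxidSketch and its methods are hypothetical helpers written for this article, not ZooKeeper's own API.

```java
// Hypothetical helper mirroring the zxid layout:
// high 32 bits = epoch, low 32 bits = counter within that epoch.
final class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 0);                 // first zxid of epoch 5
        System.out.println(Long.toHexString(zxid)); // prints 500000000
        long next = zxid + 1;                       // next transaction in the same epoch
        System.out.println(epochOf(next) + " " + counterOf(next)); // prints 5 1
    }
}
```

Because the epoch occupies the high bits, every zxid of the new epoch compares greater than any zxid from an earlier epoch, which is why the counter can safely restart at zero.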

At this point, the Discovery phase is over. Let’s summarize what the leader does in the Discovery stage:

  • Determine the new epoch
    • Receive a Quorum Packet (QP) of type FOLLOWERINFO or OBSERVERINFO from each connecting server
    • Track the largest epoch reported and derive the new epoch from it
    • Reply to each server with a LEADERINFO QP carrying the new epoch
  • Wait for a quorum to acknowledge it (waitForEpochAck):
    • Each handler waits for an ACKEPOCH response from its follower/observer; once more than half have responded, the new epoch is considered established
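The bookkeeping in this summary can be modeled in a single-threaded form. This is a simplified sketch under stated assumptions: the class and method names are hypothetical, a quorum is taken to be a strict majority of ensembleSize, and the real requirement that the leader's own sid be part of the quorum is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded sketch of the leader's Discovery bookkeeping.
// Real ZooKeeper spreads this across LearnerHandler threads.
final class DiscoverySketch {
    private final int ensembleSize;                                 // number of voting servers
    private final Map<Long, Long> reportedEpochs = new HashMap<>(); // sid -> epoch from FOLLOWERINFO
    private final Set<Long> epochAcks = new HashSet<>();            // sids that sent ACKEPOCH
    private long proposedEpoch = -1;                                // -1 until a quorum has reported

    DiscoverySketch(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    private boolean isQuorum(int count) {
        return count > ensembleSize / 2; // strict majority
    }

    // Handle a FOLLOWERINFO: returns the epoch to put in LEADERINFO, or -1 while still waiting.
    long onFollowerInfo(long sid, long lastAcceptedEpoch) {
        if (proposedEpoch >= 0) {
            return proposedEpoch;        // already decided: late joiners get the same epoch
        }
        reportedEpochs.put(sid, lastAcceptedEpoch);
        if (isQuorum(reportedEpochs.size())) {
            long max = 0;
            for (long e : reportedEpochs.values()) {
                max = Math.max(max, e);
            }
            proposedEpoch = max + 1;     // largest epoch seen, plus one
        }
        return proposedEpoch;
    }

    // Handle an ACKEPOCH: returns true once a quorum has acknowledged the new epoch.
    boolean onAckEpoch(long sid) {
        epochAcks.add(sid);
        return isQuorum(epochAcks.size());
    }
}
```

For example, with three servers reporting epochs 4 and then 6, the second FOLLOWERINFO completes the quorum and the proposed epoch becomes 7; a third server that connects afterwards also receives 7, similar to the early return in getEpochToPropose.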

2. Synchronization

self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);	

Here, the ZAB protocol enters the synchronization phase. Let’s look at what the leader does in the synchronization phase:

// Wait for ACKs from a quorum, the same way as in [1]. This method is also called from LearnerHandler;
// before calling it, each handler synchronizes the server it is connected to
waitForNewLeaderAck(self.getId(), zk.getZxid());
// Start the leader's request-processor chain -- [2]
startZkServer();
// zookeeper.leaderServes (described in how-to-use): "yes" (the default) means the leader
// also accepts client connections, so zkServer is registered with the ServerCnxnFactory
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
    self.setZooKeeperServer(zk);
}
// Switch to BROADCAST already?
// Yes: the LearnerHandlers take care of the remaining per-server work
self.setZabState(QuorumPeer.ZabState.BROADCAST);

There are several types of synchronization (it is worth thinking about which situations lead to each):

  • DIFF: differential synchronization
  • TRUNC: rollback synchronization
  • SNAP: Full synchronization

Now let's look at the LearnerHandler code that runs between waitForEpochAck and waitForNewLeaderAck.

syncFollower

org.apache.zookeeper.server.quorum.LearnerHandler#syncFollower

  1. Leader lastProcessedZxid == peerLastZxid: the transactions are already consistent and no synchronization is needed. In this case, the leader just sends an empty DIFF QP to the follower.

  2. The peer's lastProcessedZxid lies in [minCommittedLog, maxCommittedLog]. This range is the committed-log queue ZK keeps in memory for fast synchronization with servers. ZK first sends a DIFF QP to the server, then sends every transaction with a zxid larger than the peer's as a Proposal. After receiving the transactions, the server responds with an ACK.

  3. The peer's lastProcessedZxid > maxCommittedLog: the peer is ahead and must be truncated (rolled back). Similarly, a TRUNC QP is sent first, and the rollback synchronization is then performed.

  4. The peer's lastProcessedZxid < minCommittedLog: ZK first tries to synchronize using the on-disk txnlog plus the committedLog. If that fails, it falls back to SNAP mode: it reads a snapshot.xxx file from disk, serializes it, and sends it to the server.
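The four cases can be condensed into a decision function. This is only a sketch of the comparisons described above: the class and enum are hypothetical, the on-disk txnlog attempt in case 4 is folded into SNAP, and the real LearnerHandler#syncFollower handles more edge cases (for example an empty committed log).

```java
// Hypothetical, simplified version of the syncFollower decision.
final class SyncDecisionSketch {
    enum SyncMode { EMPTY_DIFF, DIFF, TRUNC, SNAP }

    static SyncMode choose(long leaderLastZxid, long peerLastZxid,
                           long minCommittedLog, long maxCommittedLog) {
        if (peerLastZxid == leaderLastZxid) {
            return SyncMode.EMPTY_DIFF; // case 1: already in sync, send an empty DIFF
        }
        if (peerLastZxid > maxCommittedLog) {
            return SyncMode.TRUNC;      // case 3: peer is ahead, roll it back
        }
        if (peerLastZxid >= minCommittedLog) {
            return SyncMode.DIFF;       // case 2: replay proposals after peerLastZxid
        }
        return SyncMode.SNAP;           // case 4: too far behind the in-memory log
    }
}
```

Checking TRUNC before DIFF matters: a peer whose zxid is beyond maxCommittedLog must be rolled back, not sent more proposals.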

After synchronization is complete, the leader sends a NEWLEADER QP to the server to tell it that synchronization is over. Once the server's ACK arrives, the leader thread and the learnerHandlers meet at the same synchronization point (waitForNewLeaderAck); the leader then runs startZkServer, while each learnerHandler waits for the zkServer to start.

startZkServer

org.apache.zookeeper.server.quorum.Leader#startZkServer

// Inherited from the base class org.apache.zookeeper.server.ZooKeeperServer#startup
// Excerpt
public synchronized void startup() {
    // Set up the leader's request-processor chain mentioned above
    setupRequestProcessors();
    // Mark the server as RUNNING
    setState(State.RUNNING);
    // Wake all threads waiting on this object:
    // the learnerHandlers connected to each server
    notifyAll();
}

Once zkServer has started, each learnerHandler sends an UPTODATE message to its server, telling it that the cluster is ready to open its doors and accept client connections.

3. Broadcast

while (true) {
    synchronized (this) {
        // Check the status of quorumPeer and zkServer
        if (!this.isRunning()) {
            break;
        }
        // If fewer than a quorum of followers are synced, leave the while loop
        if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
            break;
        }
    }
    // Send ping packets periodically
    for (LearnerHandler f : getLearners()) {
        f.ping();
    }
}
// After leaving the while loop, shutdownMessage holds the reason (code simplified)
if (null != shutdownMessage) {
    shutdown(shutdownMessage);
}
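The excerpt omits the tickSkip bookkeeping. In the full Leader#lead loop, the leader appears to sleep half a tick per iteration and flip tickSkip each time, so the quorum check runs once per full tick. A minimal replay of that idea, with hypothetical names and a plain strict-majority count standing in for syncedAckSet.hasAllQuorums():

```java
import java.util.List;
import java.util.Set;

final class LeaderTickSketch {
    // Strict-majority check over voting members, in the spirit of a majority quorum verifier;
    // sids that are not voters (e.g. observers) do not count.
    static boolean containsQuorum(Set<Long> voters, Set<Long> synced) {
        int count = 0;
        for (long sid : synced) {
            if (voters.contains(sid)) {
                count++;
            }
        }
        return count > voters.size() / 2;
    }

    // Replays one "synced servers" snapshot per half tick and returns the index of the
    // half tick at which the leader would give up, or -1 if the quorum always held.
    static int halfTicksUntilShutdown(Set<Long> voters, List<Set<Long>> syncedPerHalfTick) {
        boolean tickSkip = true;
        for (int i = 0; i < syncedPerHalfTick.size(); i++) {
            // The quorum is only checked on every other half tick
            if (!tickSkip && !containsQuorum(voters, syncedPerHalfTick.get(i))) {
                return i;
            }
            tickSkip = !tickSkip;
        }
        return -1;
    }
}
```

Skipping every other check gives followers a full tick, rather than half a tick, to show up in the synced set before the leader gives up.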

That is what the leader.lead() method does. But as we already know, the learnerHandler handles most of the work between the leader and each server, so we also need to see what the learnerHandler is doing:

while (true) {
    qp = new QuorumPacket();
    ia.readRecord(qp, "packet");
    switch (qp.getType()) {
    case Leader.ACK:
        // ...
        break;
    case Leader.PING:
        // ...
        break;
    case Leader.REVALIDATE:
        // ...
        break;
    case Leader.REQUEST:
        // ...
        break;
    default:
        LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
        break;
    }
}

As you can see, the learnerHandler uses a switch...case to handle each type of packet from its server. When something goes wrong it throws an exception, and the finally block calls shutdown.

Follower

For followers, things likewise start from the switch...case block in QuorumPeer's run method. I won't paste the code here; I'll just outline the general flow and the communication with the leader at each stage.

1. Discovery

  • The follower (abbreviated F below) establishes a TCP connection with the new leader via org.apache.zookeeper.server.quorum.Learner#connectToLeader.

  • F sends a QP of type FOLLOWERINFO (OBSERVERINFO for an observer) to the leader, where it is received by the learnerHandler.

    As mentioned above, Leader#lead first creates LearnerCnxAcceptor –> LearnerCnxAcceptorHandler –> LearnerHandler. Once this chain is in place, it simply waits for F to connect and send the FOLLOWERINFO message. This must be the first message they exchange; if it is not, the leader drops the connection.

  • F waits for the leader's LEADERINFO QP, which carries the leader's new epoch; F adopts the new epoch and replies to the leader with a QP of type ACKEPOCH.

End of Discovery phase

2. Synchronization

Through the learnerHandler, the leader sends a synchronization flag (DIFF/TRUNC/SNAP), and F handles and replies to each one differently. The different handling can be seen in org.apache.zookeeper.server.quorum.Learner#syncWithLeader.

After synchronization, the leader sends a NEWLEADER QP to F, and F replies with an ACK. Once the leader has received ACKs from more than half of the servers, it sends an UPTODATE QP, and F breaks out of its while loop when it receives a packet of this type:

case Leader.UPTODATE:
    break outerLoop;

The following code replies with ACK and starts zkServer:

ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startup();
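The follower side of this exchange can be sketched as a labeled loop over incoming packet types, ending with break outerLoop on UPTODATE as in the excerpt above. The packet names come from the article; the Packet enum, the action strings and the class itself are hypothetical, and the real Learner#syncWithLeader does far more work per packet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the follower's sync loop: records what F would do for each packet.
final class FollowerSyncSketch {
    enum Packet { DIFF, TRUNC, SNAP, PROPOSAL, COMMIT, NEWLEADER, UPTODATE }

    static List<String> sync(List<Packet> fromLeader) {
        List<String> actions = new ArrayList<>();
        outerLoop:
        for (Packet p : fromLeader) {
            switch (p) {
                case DIFF:      actions.add("mode=DIFF"); break;
                case TRUNC:     actions.add("mode=TRUNC"); break;
                case SNAP:      actions.add("mode=SNAP"); break;
                case PROPOSAL:  actions.add("log proposal"); break;
                case COMMIT:    actions.add("commit"); break;
                case NEWLEADER: actions.add("ack NEWLEADER"); break;
                case UPTODATE:
                    // Synchronization is over: ACK once more and start zkServer
                    actions.add("ack and start zkServer");
                    break outerLoop;
            }
        }
        return actions; // packets after UPTODATE belong to the broadcast phase
    }
}
```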

F's zkServer also sets up a chain of responsibility for request processing:

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
    syncProcessor.start();
}

As can be seen from the code, the responsibility chain of F is as follows:

  • FollowerRequestProcessor –> CommitProcessor –> FinalRequestProcessor

  • SyncRequestProcessor –> SendAckRequestProcessor

These chains of responsibility give each step of transaction handling a clearly separated job.
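To show what a chain of responsibility means here, the follower's first chain can be modeled with a toy processor interface. This is an assumption-laden sketch: SketchProcessor and NamedProcessor are hypothetical and single-threaded, whereas ZooKeeper's real processors implement RequestProcessor and usually run on their own threads with queues between them.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a request processor (not ZooKeeper's RequestProcessor interface).
interface SketchProcessor {
    void process(String request);
}

// Each link does its own step (here: records its name), then hands off to the next link.
final class NamedProcessor implements SketchProcessor {
    private final String name;
    private final SketchProcessor next; // null for the last processor in the chain
    private final List<String> trace;

    NamedProcessor(String name, SketchProcessor next, List<String> trace) {
        this.name = name;
        this.next = next;
        this.trace = trace;
    }

    @Override
    public void process(String request) {
        trace.add(name + ":" + request);
        if (next != null) {
            next.process(request);
        }
    }
}

final class ChainSketch {
    // Models FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor.
    static List<String> runFollowerChain(String request) {
        List<String> trace = new ArrayList<>();
        SketchProcessor fin = new NamedProcessor("Final", null, trace);
        SketchProcessor commit = new NamedProcessor("Commit", fin, trace);
        SketchProcessor first = new NamedProcessor("Follower", commit, trace);
        first.process(request);
        return trace;
    }
}
```

Building the chain back to front (final processor first) follows the same order as the constructor calls in setupRequestProcessors.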

3. Broadcast

After the syncWithLeader execution is complete, that is, synchronization is complete, F enters the broadcast phase.

If F is configured with observerMasterPort, it also starts an ObserverMaster thread to synchronize transactions with observers:

if (self.getObserverMasterPort() > 0) {
    om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
    om.start();
} 
while (this.isRunning()) {
    readPacket(qp);
    processPacket(qp);
}


Here again, F stays in contact with the leader through a while loop.

A write transaction

As we already know, ServerCnxnFactory (since we did not configure Netty, this means NIOServerCnxnFactory) manages client connections throughout their lifecycle. Take a look at the NIOServerCnxnFactory class diagram:

It extends ServerCnxnFactory; its main internal components are:

  • IOWorkRequest: handles I/O reads and writes (m worker threads, configurable; default 2 × the number of cores)
  • ConnectionExpirerThread: cleans up abnormal, invalid, or expired client connections (1 thread)
  • AcceptThread: accepts client connections and assigns them to selectorThreads (1 thread)
  • SelectorThread: handles the connections to clients (n threads, configurable; default is the square root of half the number of cores)

When a client request arrives, the selectorThread hands it to IOWorkRequest#doWork (in practice, to the IOWorkRequest worker pool), and doWork calls NIOServerCnxn#doIO. NIOServerCnxn is the class actually responsible for communicating with the client: when it receives a message, the payload is processed by its readRequest (or readConnectRequest) method.

private void readRequest() throws IOException {
    zkServer.processPacket(this, incomingBuffer);
}

This method is very simple: it hands the request to zkServer, which eventually adds it to:

LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();


This queue is consumed by the run method of the RequestThrottler thread, which blocks on take() until a request arrives.

ZooKeeperServer then handles it:
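The hand-off between the connection side and RequestThrottler is a classic producer/consumer over a LinkedBlockingQueue. A minimal sketch, assuming a single consumer thread and a poison-pill value to stop the demo (both are illustration choices, not how ZooKeeper shuts the throttler down); the consumer thread stands in for RequestThrottler#run and the Consumer callback for submitRequestNow. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

final class SubmitQueueSketch {
    private static final String STOP = "__stop__"; // poison pill that ends the demo consumer

    static List<String> runDemo(List<String> requests) {
        BlockingQueue<String> submittedRequests = new LinkedBlockingQueue<>();
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        Consumer<String> firstProcessor = processed::add; // stands in for submitRequestNow

        // Consumer thread playing the role of RequestThrottler#run
        Thread throttler = new Thread(() -> {
            try {
                while (true) {
                    String request = submittedRequests.take(); // blocks until something arrives
                    if (STOP.equals(request)) {
                        return;
                    }
                    firstProcessor.accept(request);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        throttler.start();

        try {
            for (String r : requests) {
                submittedRequests.put(r); // producer side: roughly what the connection path does
            }
            submittedRequests.put(STOP);
            throttler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return new ArrayList<>(processed);
    }
}
```

With one producer and one consumer, the FIFO queue preserves the order in which requests were submitted.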

// org.apache.zookeeper.server.ZooKeeperServer#submitRequestNow
public void submitRequestNow(Request si) {
    try {
        // Here the request is handed to the first processor of the chain (the Follower chain on F)
        firstProcessor.processRequest(si);
    } catch (MissingSessionException e) {
        requestFinished(si);
    } catch (RequestProcessorException e) {
        requestFinished(si);
    }
}

At this point, we can roughly trace the flow from a client sending a message to the message being processed.

Notes

[1]

cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

Let’s talk about these three lines.

The first two lines start the thread that accepts follower connections. What is accepting connections for?

Just as a new emperor proclaims a new era name, a new leader in ZK establishes a new epoch.

Suppose follower C has been isolated from the cluster and never learned the current leader's epoch. Then the leader fails, and C happens to be elected as the new leader. It needs to generate its own epoch. What should it do? Just add one to its own epoch?

That won't work! It cannot be sure that the epoch hasn't already been used by another leader!

With the three lines above, ZK obtains the latest epoch in the ensemble, that is, the largest epoch reported by a quorum of servers, and adds one to produce the new epoch.

This kind of quorum-wide approach is very common in ZK.

If I had written it, I would have waited for the connections first, and once more than half had arrived, collected all their epochs, compared them with my own, taken the largest, and added one to get the new leader's epoch.

Well, a very ordinary idea.

Let’s see how these guys who write hundreds of lines in one method do it.

  1. LearnerCnxAcceptor is started to handle connections from the other servers, and each resulting LearnerHandler retrieves the latest epoch:

    long zxid = qp.getZxid();
    long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
    long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
  2. The Leader main thread also calls getEpochToPropose to retrieve the latest epoch.

  3. So the key here is getEpochToPropose.

    // org.apache.zookeeper.server.quorum.Leader#getEpochToPropose
    public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
        // connectingFollowers holds the sids (myids) of all connected servers
        synchronized (connectingFollowers) {
            // Flag, initially true; once the new epoch is decided, just return it
            if (!waitingForNewEpoch) {
                return epoch;
            }
            // If your epoch is at least as large as mine, yours (+1) wins
            if (lastAcceptedEpoch >= epoch) {
                epoch = lastAcceptedEpoch + 1;
            }
            // Verify that the server is a valid member of the voting group.
            // The ordering means a larger epoch is adopted even from a server that is not a voting member.
            // Why? My understanding is that under Zab, the highest epoch must have been produced by legitimate
            // voting members, so adopting it is always safe
            if (isParticipant(sid)) {
                connectingFollowers.add(sid);
            }
            QuorumVerifier verifier = self.getQuorumVerifier();
            // Once the leader itself and a quorum are connected, the epoch is decided:
            // set the flag to false and notify all waiting threads
            if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) {
                waitingForNewEpoch = false;
                self.setAcceptedEpoch(epoch);
                connectingFollowers.notifyAll();
            } else {
                long start = Time.currentElapsedTime();
                if (sid == self.getId()) {
                    timeStartWaitForEpoch = start;
                }
                long cur = start;
                long end = start + self.getInitLimit() * self.getTickTime();
                // while + wait:
                // every thread calling this method waits here until a quorum has connected (or it times out)
                while (waitingForNewEpoch && cur < end && !quitWaitForEpoch) {
                    connectingFollowers.wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (waitingForNewEpoch) {
                    throw new InterruptedException("Timeout while waiting for epoch from quorum");
                }
            }
            return epoch;
        }
    }

I've added some comments to the code above, but let's go through it more carefully.

In Java, wait and notifyAll are methods of Object. Having written business code for a long time, I hardly ever use them; only after reading this code did I realize they can be used this way.

java.lang.Object#wait(long, int)

This method causes the current thread (referred to here as T) to place itself in the wait set for this object and then to relinquish any and all synchronization claims on this object. Note that only the locks on this object are relinquished; any other objects on which the current thread may be synchronized remain locked while the thread waits.

This method puts the current thread into the wait set of the lock object. In the code above, every calling thread is placed in the wait set of connectingFollowers.

But the method body is inside a synchronized block, so once one thread is in, how can other threads get in?

The Javadoc of wait says that the thread relinquishes its synchronization claims on this object; only the lock on this object is released, not other locks the thread holds. At that point, other threads can compete for the lock again.

The thread stays in the wait set until another thread calls notify or notifyAll, it is interrupted, or the timeout elapses.

That cleared it up for me.

Every server's handler thread, including the leader's own thread, waits here until more than half of the servers have connected; then they all wake up and carry on with their own work.
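The same while + wait / notifyAll barrier can be reproduced in isolation. This is a simplified sketch of the pattern, not the real method: the class name is hypothetical, a plain Object replaces connectingFollowers as the monitor, a strict majority of ensembleSize replaces the QuorumVerifier, and the isParticipant and leader-must-be-present checks are dropped.

```java
import java.util.HashSet;
import java.util.Set;

final class EpochBarrierSketch {
    private final Object lock = new Object();        // monitor, like connectingFollowers
    private final Set<Long> connecting = new HashSet<>();
    private final int ensembleSize;
    private boolean waitingForNewEpoch = true;
    private long epoch = -1;

    EpochBarrierSketch(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    long getEpochToPropose(long sid, long lastAcceptedEpoch, long timeoutMs) throws InterruptedException {
        synchronized (lock) {
            if (!waitingForNewEpoch) {
                return epoch;                         // already decided
            }
            if (lastAcceptedEpoch >= epoch) {
                epoch = lastAcceptedEpoch + 1;        // largest epoch so far, plus one
            }
            connecting.add(sid);
            if (connecting.size() > ensembleSize / 2) {
                waitingForNewEpoch = false;
                lock.notifyAll();                     // wake every thread waiting below
                return epoch;
            }
            long now = System.currentTimeMillis();
            long end = now + timeoutMs;
            while (waitingForNewEpoch && now < end) {
                lock.wait(end - now);                 // releases the monitor while waiting
                now = System.currentTimeMillis();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");
            }
            return epoch;
        }
    }

    // Two of three servers report epochs 4 and 6 from different threads.
    static long[] demo() {
        EpochBarrierSketch barrier = new EpochBarrierSketch(3);
        long[] results = new long[2];
        Thread handler = new Thread(() -> {
            try {
                results[0] = barrier.getEpochToPropose(1, 4, 5_000);
            } catch (InterruptedException e) {
                results[0] = -1;
            }
        });
        handler.start();
        try {
            results[1] = barrier.getEpochToPropose(2, 6, 5_000);
            handler.join();
        } catch (InterruptedException e) {
            results[1] = -1;
        }
        return results;
    }
}
```

Whichever of the two calls arrives first blocks in wait (releasing the monitor); the second call completes the quorum and wakes it with notifyAll, so both threads return epoch 7 regardless of arrival order.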

[2]

If we step into this method, we'll see that it calls two methods:

setupRequestProcessors();
startRequestThrottler();


The first sets up the chain of responsibility that processes all requests while this server is the leader.

The second method acts as a valve to constrain the number of requests.
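The "valve" idea can be illustrated with a counting semaphore. This is only an analogy under my own assumptions: ZooKeeper's RequestThrottler has its own logic around a configurable limit on outstanding requests, and ValveSketch is a hypothetical class.

```java
import java.util.concurrent.Semaphore;

// Hypothetical valve: caps the number of requests in flight at once.
final class ValveSketch {
    private final Semaphore permits;

    ValveSketch(int maxInFlight) {
        permits = new Semaphore(maxInFlight);
    }

    // Returns true if the request may enter the processor chain now.
    boolean tryAdmit() {
        return permits.tryAcquire();
    }

    // Called when a request has finished, freeing a slot for the next one.
    void finished() {
        permits.release();
    }
}
```

A request that is not admitted would have to wait and retry; bounding in-flight work this way keeps a burst of clients from flooding the processor chain.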

Let’s focus on the first method and see what it does:

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
    // initialize() starts the SyncRequestProcessor (the disk-logging chain below)
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

    setupContainerManager();
}

Here you can clearly see the chain of the leader processing requests:

LeaderRequestProcessor –> PrepRequestProcessor –> ProposalRequestProcessor –> CommitProcessor –> ToBeAppliedRequestProcessor –> FinalRequestProcessor

The leader also has a chain of responsibilities for logging transactions to disk:

SyncRequestProcessor –> AckRequestProcessor

As for what each processor is responsible for, take a look at the code yourself and see what you can learn. I won't go into detail here.

