After the implementation analysis of zooKeeper election in the last article, we know that after the election of ZooKeeper cluster, the leader node will enter the LEADING state and the follower node will enter the FOLLOWING state. In this case, nodes in the cluster synchronize data to ensure data consistency. The ZooKeeper cluster can provide external services only after data synchronization is complete.

LEADING

The node will enter the LEADING state after its role is confirmed as leader after the election. The source code is as follows:

public void run(a) {
    try {
        /* * Main loop */
        while (running) {
            switch (getPeerState()) {
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if(leader ! =null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break; }}}finally{}}Copy the code

QuorumPeer creates the Leader instance and triggers the Lead process after the node status changes to LEADING.

void lead(a) throws IOException, InterruptedException {
    try {
		/ / to omit

        /** * Starts the thread to receive connection requests from followers */
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();
        
        readyToStart = true;

        /** * blocks to compute the new epoch value and set zxID */
        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());          
        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
        
        
        /** * Blocks the ACKEPOCH message from the follower node that has received half of it; This indicates that the epoch value */ has been determined after the current election
        waitForEpochAck(self.getId(), leaderStateSummary);
        self.setCurrentEpoch(epoch);

        try {
            /** * the follower sends a NEWLEADER ACK message while waiting for more than half of the nodes to block. More than half of the followers have completed data synchronization */
            waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
        } catch (InterruptedException e) {
            / / to omit
        }

        /** * Start the ZK server and the cluster can officially provide services */
        startZkServer();

        / / to omit
}
Copy the code

It can be seen from the implementation of the Lead method that the leader and follower perform the following process during data synchronization:

  • Receiving follower connection
  • Compute the new EPOCH value
  • Notify the unified EPOCH value
  • Data synchronization
  • Start the ZK Server to provide external services

FOLLOWING

The FOLLOWING operations are performed after the follower node enters the FOLLOWING state:

public void run(a) {
    try {
        /* * Main loop */
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                / / to omit
            case OBSERVING:
                / / to omit
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break; }}}finally{}}Copy the code

The QuorumPeer creates a follower instance and triggers the followLeader process after the node status changes to FOLLOWING.

void followLeader(a) throws InterruptedException {
    / / to omit
    try {
        QuorumServer leaderServer = findLeader();            
        try {
            /** * the follower establishes a connection with the leader */
            connectToLeader(leaderServer.addr, leaderServer.hostname);

            /** * the follower submits node information to the leader for computing the new epoch value */
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

            
            /** * Follower synchronizes data with the leader */
            syncWithLeader(newEpochZxid);                
            
             / / to omit

        } catch (Exception e) {
             / / to omit}}finally {
        / / to omit}}Copy the code

It can be seen from the implementation of the followLeader method that the follower and leader perform the following processes during data synchronization:

  • Request connection leader
  • Submit node information to compute the new EPOCH value
  • Data synchronization

Let’s take a look at the implementation details in each link.

Leader Follower establishes communication

Follower request connection
protected QuorumServer findLeader(a) {
    QuorumServer leaderServer = null;
    // Find the leader by id
    Vote current = self.getCurrentVote();
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            // Ensure we have the leader's correct IP address before
            // attempting to connect.
            s.recreateSocketAddresses();
            leaderServer = s;
            break; }}if (leaderServer == null) {
        LOG.warn("Couldn't find the leader with id = "
                + current.getId());
    }
    return leaderServer;
}           
Copy the code
protected void connectToLeader(InetSocketAddress addr, String hostname)
            throws IOException, ConnectException, InterruptedException {
    sock = new Socket();        
    sock.setSoTimeout(self.tickTime * self.initLimit);
    for (int tries = 0; tries < 5; tries++) {
        try {
            sock.connect(addr, self.tickTime * self.syncLimit);
            sock.setTcpNoDelay(nodelay);
            break;
        } catch (IOException e) {
            if (tries == 4) {
                LOG.error("Unexpected exception",e);
                throw e;
            } else {
                LOG.warn("Unexpected exception, tries="+tries+
                        ", connecting to " + addr,e);
                sock = new Socket();
                sock.setSoTimeout(self.tickTime * self.initLimit);
            }
        }
        Thread.sleep(1000);
    }

    self.authLearner.authenticate(sock, hostname);

    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
            sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}
Copy the code

Followers will confirm the leader node address based on the voting information after the election and initiate a connection (there are 5 attempts to connect in total. If the connection fails, the election process will be restarted).

Leader receiving connection
class LearnerCnxAcceptor extends ZooKeeperThread{
    private volatile boolean stop = false;

    public LearnerCnxAcceptor(a) {
        super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress());
    }

    @Override
    public void run(a) {
        try {
            while(! stop) {try{
                	/** * receives the follower connection and starts the LearnerHandler thread to handle the communication between the two */
                    Socket s = ss.accept();
                    s.setSoTimeout(self.tickTime * self.initLimit);
                    s.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(
                            s.getInputStream());
                    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    / / to omit
                } catch (SaslException e){
                    LOG.error("Exception while connecting to quorum learner", e); }}}catch (Exception e) {
            LOG.warn("Exception while accepting follower", e); }}}Copy the code

As you can see from the LearnerCnxAcceptor implementation, the Leader node allocates a LearnerHandler thread to handle communication between each follower node after the connection is established.

Compute the new EPOCH value

Followers send FOLLOWERINFO after establishing a connection with the leader


long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

Copy the code
protected long registerWithLeader(int pktType) throws IOException{
    /** * Send follower info, including last zxID and sid */
	long lastLoggedZxid = self.getLastLoggedZxid();
    QuorumPacket qp = new QuorumPacket();                
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
    
    /* * Add sid to payload */
    LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());
    
    /** * followers send FOLLOWERINFO to the leader, including zxID, SID, protocol version */
    writePacket(qp, true);
    
    / / to omit
} 
Copy the code

Let’s look at what the leader does after receiving the FOLLOWERINFO message (see LearnerHandler)

public void run(a) {
	try {
	    / / to omit
	    /** * The leader receives the FOLLOWERINFO information sent by the followers, including the follower node's ZXID, SID, and protocol version *@see Learner.registerWithleader()
	     */
	    QuorumPacket qp = new QuorumPacket();
	    ia.readRecord(qp, "packet");

	    byte learnerInfoData[] = qp.getData();
	    if(learnerInfoData ! =null) {
	    	if (learnerInfoData.length == 8) {
	    		/ / to omit
	    	} else {
	            /** * Higher versions of learnerInfoData include siDs of type long, and protocol versions of type int take 12 bytes */
	    		LearnerInfo li = new LearnerInfo();
	    		ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
	    		this.sid = li.getServerid();
	    		this.version = li.getProtocolVersion(); }}/** * Parse the foloower epoch */ from the zxID sent by the followers
	    long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
	    
	    long peerLastZxid;
	    StateSummary ss = null;
	    long zxid = qp.getZxid();

	    /** * blocks waiting to compute the new epoch value */
	    long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
	  
	    / / to omit
	}
Copy the code

From the above code, the leader will parse the acceptedEpoch value of the follower node and participate in the calculation of the new epoch value after receiving the FOLLOWERINFO message sent by the follower. (Specific calculation logic reference method Getepochtoveto)

public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    synchronized(connectingFollowers) {
        if(! waitingForNewEpoch) {return epoch;
        }
        // epoch is used to record the calculated value of the election cycle
        // The acceptedEpoch value of the follower or leader is compared with the epoch; If the former is large, add one
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch+1;
        }
        // connectingFollowers Is used to record followers that are connected to the leader
        connectingFollowers.add(sid);
        QuorumVerifier verifier = self.getQuorumVerifier();
        // If the new EPOCH value has been calculated, the condition is that the leader has participated in the calculation of the EPOCH value and more than half of the nodes have participated in the calculation
        if (connectingFollowers.contains(self.getId()) && 
                                        verifier.containsQuorum(connectingFollowers)) {
            // Setting waitingForNewEpoch to false means there is no need to wait for the new epoch value to be calculated
            waitingForNewEpoch = false;
            // Set acceptedEpoch for the leader
            self.setAcceptedEpoch(epoch);
            // Wake up connectingFollowers Wait thread
            connectingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit()*self.getTickTime();
            while(waitingForNewEpoch && cur < end) {
                // Block and wait if the new EPOCH value is not evaluated
                connectingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum"); }}returnepoch; }}Copy the code

According to the method getepochtochoose, the leader will collect more than half of the follower acceptedEpoch information in the cluster, select a maximum value and add 1 to obtain the newEpoch value. During this process, the leader will enter a blocking state until more than half of the followers participate in the calculation and then enter the next stage.

Notify the new EPOCH value

After the leader calculates the new newEpoch value, it moves to the next stage and sends the LEADERINFO message (also refer to LearnerHandler)

public void run(a) {
	try {
	    / / to omit

	    /** * blocks waiting to compute the new epoch value */
	    long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            
        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            leader.waitForEpochAck(this.getSid(), ss);
        } else {
            byte ver[] = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            /** * After the new epoch value is calculated, the leader sends the LEADERINFO message to the followers. Includes the new newEpoch */
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
            oa.writeRecord(newEpochPacket, "packet");
            bufferedOutput.flush();

           	/ / to omit}}/ / to omit
}
Copy the code
protected long registerWithLeader(int pktType) throws IOException{
	/ / to omit

    /** * followers send FOLLOWERINFO to the leader, including zxID, SID, protocol version */
    writePacket(qp, true);

    /** * follower receives the LEADERINFO message sent by the leader */
    readPacket(qp);

    /** * Parse the new epoch value */ sent by the leader        
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
	if (qp.getType() == Leader.LEADERINFO) {
    	// we are connected to a 1.0 server so accept the new epoch and read the next packet
    	leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
    	byte epochBytes[] = new byte[4];
    	final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);

    	/** * new epoch > current accepted epoch
    	if (newEpoch > self.getAcceptedEpoch()) {
    		wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
    		self.setAcceptedEpoch(newEpoch);
    	} else if (newEpoch == self.getAcceptedEpoch()) {   		
            wrappedEpochBytes.putInt(-1);
    	} else {
    		throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
    	}

    	/** * follower sends ACKEPOCH information to the leader, including last zxID */
    	QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
    	writePacket(ackNewEpoch, true);
        return ZxidUtils.makeZxid(newEpoch, 0); }}Copy the code

From the above code, we can see the interaction process between the leader and followers after the calculation of newEpoch value:

  • The leader sends the LEADERINFO message to the followers informing them of the new epoch value
  • The follower receives the LEADERINFO information. If the new EPOCH value is greater than the current Accepted EPOCH value, the acceptedEpoch is updated
  • The follower sends an ACKEPOCH message to the leader, informing him that he has received the new epoch value along with the last ZXID of the follower node

Data synchronization

The Leader in LearnerHandler will enter the data synchronization phase after receiving more than half of the ACKEPOCH messages

public void run(a) {
        try {
            / / to omit
            // peerLastZxid is the last zxID of the follower
            peerLastZxid = ss.getLastZxid();
            
            /* the default to send to the follower */
            int packetToSend = Leader.SNAP;
            long zxidToSend = 0;
            long leaderLastZxid = 0;
            /** the packets that the follower needs to get updates from **/
            long updates = peerLastZxid;
           
            ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
            ReadLock rl = lock.readLock();
            try {
                rl.lock();        
                final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
                final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();

                LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();

                if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                    /** * If the zxids of the followers and the leader are the same, the two data are consistent. The synchronization mode is differential synchronization DIFF, and the zxID of the synchronization is peerLastZxid, that is, no synchronization */ is required
                    packetToSend = Leader.DIFF;
                    zxidToSend = peerLastZxid;
                } else if(proposals.size() ! =0) {
                    // peerLastZxid is between minCommittedLog and maxCommittedLog
                    if ((maxCommittedLog >= peerLastZxid)
                            && (minCommittedLog <= peerLastZxid)) {
                        /** * the proposals that are used to take notes of the last proposal */
                        long prevProposalZxid = minCommittedLog;

                        boolean firstPacket=true;
                        packetToSend = Leader.DIFF;
                        zxidToSend = maxCommittedLog;

                        for (Proposal propose: proposals) {
                            // Skip the follower proposal that already exists
                            if (propose.packet.getZxid() <= peerLastZxid) {
                                prevProposalZxid = propose.packet.getZxid();
                                continue;
                            } else {
                                if (firstPacket) {
                                    firstPacket = false;
                                    if (prevProposalZxid < peerLastZxid) {
                                        /** * If some proposals do not exist on the leader node, the followers should be told to discard these proposals. That is, they should be told to rollback TRUNC first. The proposals need to be rolled back to prevProposalZxid, that is, the followers discard the data in the range prevProposalZxid to peerLastZxid * and the remaining proposals are synchronized through DIFF */packetToSend = Leader.TRUNC; zxidToSend = prevProposalZxid; updates = zxidToSend; }}/** * Queues the remaining proposals to be DIFF synchronized, waiting to be sent */
                                queuePacket(propose.packet);
                                /** * Each proposal corresponds to a COMMIT message */
                                QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                        null.null); queuePacket(qcommit); }}}else if (peerLastZxid > maxCommittedLog) {                    
                        /** * If the followers' ZXID is larger than that of the leader, the followers are told to TRUNC back */
                        packetToSend = Leader.TRUNC;
                        zxidToSend = maxCommittedLog;
                        updates = zxidToSend;
                    } else{}}}finally {
                rl.unlock();
            }

             QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                    ZxidUtils.makeZxid(newEpoch, 0), null.null);
             if (getVersion() < 0x10000) {
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                 // The NEWLEADER message is sent after the data synchronization is complete
                queuedPackets.add(newLeaderQP);
            }
            bufferedOutput.flush();
            //Need to set the zxidToSend to the latest zxid
            if (packetToSend == Leader.SNAP) {
                zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
            }
            /** * Send a data synchronization mode message to tell followers how to synchronize data */
            oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null.null), "packet");
            bufferedOutput.flush();
            
            /* if we are not truncating or sending a diff just send a snapshot */
            if (packetToSend == Leader.SNAP) {
                /** * Serializes the leader local data into the follower output stream */ if full synchronization is performed
                leader.zk.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere"."signature");
            }
            bufferedOutput.flush();
            
            /** ** start a thread to send packet */
            sendPackets();
            
            /** * Receives the follower ACK response */
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");

            /** * block waiting for half of the follower ack */
            leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());

            /** * The leader sends a UPTODATE to followers to inform them that they can provide services externally */
            queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1.null.null));

            / / to omit}}Copy the code

It can be seen from the above code that the leader and follower will compare peerLastZxid with maxCommittedLog and minCommittedLog to determine the data synchronization method.

DIFF(Differential synchronization)

  • The peerLastZxid of the follower equals the peerLastZxid of the leader

In this case, the follower data is the same as the leader data, and the synchronization is performed in DIFF mode, that is, no synchronization is required

  • The peerLastZxid of the follower is between maxCommittedLog and minCommittedLog

In this case, there is a difference between the follower data and the leader data, which needs to be synchronized. First, the leader sends a DIFF message to the followers informing them of the synchronization mode. Then, the leader sends different proposals and proposal submission messages

The interaction process is as follows:

    Leader                 Follower

      |          DIFF         |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
         
Copy the code

Example: Suppose the zxids corresponding to the proposal cache queue of the leader node are:

 0x500000001, 0x500000002, 0x500000003, 0x500000004, 0x500000005
Copy the code

If the peerLastZxid of the follower node is 0x500000003, the proposals 0x500000004 and 0x500000005 need to be synchronized. Then the packet sending process is as follows:

Message type ZXID
DIFF 0x500000005
PROPOSAL 0x500000004
COMMIT 0x500000004
PROPOSAL 0x500000005
COMMIT 0x500000005

TRUNC+DIFF(rollback before differential synchronization)

In the above DIFF differential synchronization, there is a special scenario that although the peerLastZxid of follower is between maxCommittedLog and minCommittedLog, However, the peerLastZxid of the follower does not exist in the leader node. In this case, the leader needs to tell the followers to roll back to the zxID before peerLastZxid, and then perform differential synchronization.

The interaction process is as follows:

    Leader                 Follower

      |         TRUNC         |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
         
Copy the code

For example, assume that node A, NODE B, and node C in A cluster has A Leader election period of 5. The zxids include: (0x500000004, 0x500000005, and 0x500000006). Assume that the server of leader A breaks down at A certain moment when the request of 0x500000007 is broadcast after processing the transaction, causing 0x500000007 the transaction is not synchronized out. After the next election of the cluster, node B becomes the new leader and the election period is 6. It provides services externally and processes new transaction requests, including 0x600000001 and 0x600000002.

Cluster nodes ZXID list
A 0x500000004, 0x500000005, 0x500000006, 0x500000007
B 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002
C 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002

At this point, after node A reboots and joins the cluster, when synchronizing data with leader B, it finds that the transaction 0x500000007 does not exist on the Leader node. At this point, the leader tells node A to roll back the transaction to 0x500000006 first. At the difference synchronization transaction 0x600000001, 0x600000002; Then the packet sending process is as follows:

Message type ZXID
TRUNC 0x500000006
PROPOSAL 0x600000001
COMMIT 0x600000001
PROPOSAL 0x600000002
COMMIT 0x600000002

TRUNC(Rollback synchronization)

If the peerLastZxid of the follower is greater than the maxCommittedLog of the leader, the follower is told to roll back to the maxCommittedLog. This scenario can be considered a simplified mode for TRUNC+DIFF

The interaction process is as follows:

    Leader                 Follower

      |         TRUNC         |  
      | --------------------> |
         
Copy the code

SNAP(Full synchronization)

If the peerLastZxid of the follower is smaller than the minCommittedLog of the leader or there is no proposal cache queue on the leader node, full SNAP synchronization is adopted. In this mode, the leader first sends SNAP messages to the followers, and then serializes the full data from the in-memory database and transmits it to the followers. After receiving the full data, the followers deserialize and load it into the in-memory database.

The interaction process is as follows:

    Leader                 Follower

      |         SNAP          |  
      | --------------------> |
      |         DATA          |  
      | --------------------> |
         
Copy the code

After completing data synchronization, the leader sends a NEWLEADER message to the followers. After receiving an ACK from more than half of the followers, it indicates that more than half of the nodes have completed data synchronization. The leader then sends a UPTODATE message to the followers to inform them that the follower node is ready to provide services externally. At this point, the leader starts the ZK Server to provide services externally.

FOLLOWER Data Synchronization

How does FOLLOWER process in the data synchronization phase, see Learner. SyncWithLeader

protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0.null.null);
        QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

        /** * Receives the data synchronization packet sent by the leader */
        readPacket(qp);
        
        synchronized (zk) {
            if (qp.getType() == Leader.DIFF) {
                
            }
            else if (qp.getType() == Leader.SNAP) {
                // Perform full data loading
            } else if (qp.getType() == Leader.TRUNC) {
                // Perform a rollback
            }
            else {
            
            }
            
            outerLoop:
            while (self.isRunning()) {
                readPacket(qp);
                switch(qp.getType()) {
                case Leader.PROPOSAL:
                    // Process the proposal
                    break;
                case Leader.COMMIT:
                    // commit proposal
                    break;
                case Leader.INFORM:
                    / / ignore
                    break;
                case Leader.UPTODATE:
                    // Set the ZK server
                    self.cnxnFactory.setZooKeeperServer(zk);
                    // Exit the loop
                    break outerLoop;
                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
                    /** * follower responds to NEWLEADER ACK */
                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null.null), true);
                    break;
                }
            }
        }
        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
        writePacket(ack, true);
        // Start zK server
        zk.startup();
        
    }
Copy the code

From the above code, it can be seen that the processing flow of follower in the data synchronization phase is as follows:

  • The follower receives and processes data synchronization packets (DIFF, TRUNC, or SANP) sent by the leader

  • After receiving the NEWLEADER message from the leader, the follower responds to the leader with an ACK (the leader sends a UPTODATE after receiving more than half of the ACK messages).

  • After the follower receives a UPTODATE message from the leader, the follower can provide external services and start the ZK Server

summary

Finally, a figure is used to summarize zK’s post-election data synchronization process, as shown below: