Preface

This post walks through the details of ZooKeeper leader election by debugging the source code.

Related reading

If you want to follow along and debug the ZooKeeper source code yourself, I recommend reading my setup tutorial first. You can have a debug environment running in about half an hour.

  1. ZooKeeper source code environment

Preview:

  1. Briefly introduces the concept of leader election in ZooKeeper.
  2. Starting from the main method, analyzes step by step the basic communication module of leader election, QuorumCnxManager, and its related details.
  3. Discusses the code implementation of FastLeaderElection, which is built on the point-to-point communication foundation provided by QuorumCnxManager.

Definitions

ZooKeeper functions as coordination middleware for distributed systems. In essence, it is storage middleware that adds subscription features through a custom protocol between clients and servers.

Since it is storage middleware operating in a distributed environment, the first design decision comes from the CAP theorem: ZooKeeper is a CP system. You can arrive at this conclusion by elimination. When the network partitions or the primary node fails, the cluster rejects all read and write requests until it recovers, which means ZooKeeper gives up availability (the A in CAP).

In fact, when a ZooKeeper cluster processes client reads and writes, every write request must ultimately land on the primary node. This means that no matter how large a ZooKeeper cluster grows, the primary node remains a write bottleneck, which is why ZooKeeper is not suitable for large-scale storage. The advantage of this design is that consistency is relatively simple to implement.

The primary node of ZooKeeper is called the Leader. Its high availability comes from the fact that when the Leader goes down, the cluster automatically elects a new leader and restores availability. In this respect ZooKeeper is very reliable. The rest of this post covers the technical principles behind leader election.

Principles

Node roles

In a ZooKeeper cluster, a node plays one of three roles:

  1. The Leader handles write requests and initiates proposals;
  2. Followers respond to read queries, forward write requests to the Leader, and take part in leader elections and write votes;
  3. Observers respond to read queries and forward write requests.

Leader election involves only Leader and Follower nodes; this article will not cover Observers.

Transactions and the ZXID

A ZooKeeper cluster spreads read requests across all nodes to improve read throughput, while write requests are sent to the Leader, where they go through proposals and other steps. We treat such a completed, Leader-initiated cluster write as a transaction.

Each transaction has its own identifier, usually abbreviated ZXID. The ZXID is a 64-bit long: the high 32 bits store the epoch (the Leader's term number) and the low 32 bits store a transaction counter.

To summarize, the Leader publishes every write request with a ZXID, which has two advantages:

  1. It guarantees that data updates on each node are applied in order, avoiding many concurrent-write problems (compare serializable, MySQL's highest isolation level).
  2. It indirectly makes data synchronization idempotent, which simplifies the sync logic, makes retries easy, and ensures data consistency across the cluster.
// ZxidUtils.java source code
/*
 * ZXID: 64-bit
 * |---- 32-bit epoch ----|---- 32-bit transaction counter ----|
 */

    /**
     * Get the epoch from a zxid
     * @param zxid
     * @return
     */
    public static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    /**
     * Get the counter from a zxid
     * @param zxid
     * @return
     */
    public static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    /**
     * Combine an epoch and a counter into a long zxid
     * @param epoch
     * @param counter
     * @return
     */
    public static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

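As a quick sanity check, here is a hypothetical usage of the methods above (the class name ZxidDemo and the values are made up for illustration, and it assumes ZxidUtils is on the classpath):

// Hypothetical usage of ZxidUtils: pack an epoch and a counter, then unpack them.
public class ZxidDemo {
    public static void main(String[] args) {
        long zxid = ZxidUtils.makeZxid(5, 42);                   // epoch 5, 42nd transaction in that epoch
        System.out.println(Long.toHexString(zxid));              // 50000002a
        System.out.println(ZxidUtils.getEpochFromZxid(zxid));    // 5
        System.out.println(ZxidUtils.getCounterFromZxid(zxid));  // 42
    }
}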

Put simply, each time the Leader completes a write operation, every node eventually synchronizes the transaction and updates its own ZXID, and that ZXID later participates in leader election.

What the election is decided by

  • Epoch (term): the same value as above, published by the Leader. Where does the Leader get its epoch? It settles on it the moment it takes office.
  • ZXID: as above, not repeated here;
  • SID: the server id, from the zoo.cfg file plus the myid file.
// Vote class
public class Vote {
    private final int version;
    private final long id; // The sid of the selected ZK node
    private final long zxid; // The selected ZXID of the ZK node
    private final long electionEpoch; // The election round; repeated voting within one election bumps this by 1
    private final long peerEpoch; // The epoch of the proposed Leader
    private final ServerState state; // The current server state, LOOKING by default
}
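The rule that decides which ballot "wins" compares epoch first, then zxid, then sid. A simplified sketch of that comparison follows (the real method, FastLeaderElection.totalOrderPredicate, additionally consults the quorum verifier's weights):

// Simplified vote comparison: true if the new (epoch, zxid, sid) triple should
// replace the current proposal. A sketch, not ZooKeeper's exact method.
final class VoteOrder {
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)                 // higher epoch always wins
            || (newEpoch == curEpoch
                && (newZxid > curZxid                // same epoch: more transactions wins
                    || (newZxid == curZxid && newId > curId))); // full tie: larger sid wins
    }
}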

Source code analysis

QuorumPeer.java

At the cluster-election level, every ZK node holds a QuorumPeer object, which is the node's abstraction for election purposes. A simple way to think of it: it is the delegate the ZK node appoints specifically for elections. This design follows the single-responsibility principle.

// QuorumPeer.java
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
    
    // Handles network communication between servers during Leader election under the FastLeaderElection algorithm
    private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();

    // Data abstraction for storing ZK data
    private ZKDatabase zkDb;
}


QuorumPeer extends Thread (via ZooKeeperThread), which makes sense: from the election's point of view, a ZK node keeps cycling through LOOKING -> LEADING/FOLLOWING -> LOOKING, and a dedicated thread is needed to continuously observe and react to those state changes.
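The shape of that thread's main loop can be paraphrased as follows. This is a condensed, self-contained sketch with stub methods, not QuorumPeer's actual run(), which also handles ZAB state, reconfiguration, and error recovery:

// Condensed paraphrase of the QuorumPeer state loop; all methods below are stubs.
enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

class PeerLoopSketch extends Thread {

    volatile ServerState state = ServerState.LOOKING;
    volatile boolean running = true;

    @Override public void run() {
        while (running) {
            switch (state) {
            case LOOKING:
                // Block in the election until the ensemble agrees on a leader,
                // then move to FOLLOWING or LEADING.
                state = electAndDecide(); // hypothetical stand-in for lookForLeader()
                break;
            case FOLLOWING:
                followLeader();           // returns when the connection to the Leader breaks
                state = ServerState.LOOKING;
                break;
            case LEADING:
                lead();                   // returns when quorum support is lost
                state = ServerState.LOOKING;
                break;
            case OBSERVING:
                observeLeader();
                state = ServerState.LOOKING;
                break;
            }
        }
    }

    // Stubs so the sketch compiles; each stands in for real QuorumPeer collaborators.
    ServerState electAndDecide() { return ServerState.FOLLOWING; }
    void followLeader() { running = false; }
    void lead() {}
    void observeLeader() {}
}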

Instantiation and initialization of the QuorumPeer

The QuorumPeer object is instantiated after QuorumPeerMain.main() starts.

// QuorumPeerMain.java v3.7.0 line:175
            quorumPeer = getQuorumPeer();
            quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
            quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
            quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
            //quorumPeer.setQuorumPeers(config.getAllMembers());
            quorumPeer.setElectionType(config.getElectionAlg());
            quorumPeer.setMyid(config.getServerId());
            quorumPeer.setTickTime(config.getTickTime());
            quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            quorumPeer.setInitLimit(config.getInitLimit());
            quorumPeer.setSyncLimit(config.getSyncLimit());
            quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
            quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
            quorumPeer.setConfigFileName(config.getConfigFilename());
            quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
            if (config.getLastSeenQuorumVerifier() != null) {
                quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
            }
            quorumPeer.initConfigInZKDatabase();
            quorumPeer.setCnxnFactory(cnxnFactory);
            quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
            quorumPeer.setSslQuorum(config.isSslQuorum());
            quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
            quorumPeer.setLearnerType(config.getPeerType());
            quorumPeer.setSyncEnabled(config.getSyncEnabled());
            quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
            if (config.sslQuorumReloadCertFiles) {
                quorumPeer.getX509Util().enableCertFileReloading();
            }
            quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
            quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
            quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());

            // sets quorum sasl authentication configurations
            quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
            if (quorumPeer.isQuorumSaslAuthEnabled()) {
                quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
                quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
                quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
                quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
                quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
            }
            quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
            quorumPeer.initialize();

            if (config.jvmPauseMonitorToRun) {
                quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
            }

            quorumPeer.start();     // Start thread from here
            ZKAuditProvider.addZKStartStopAuditLog();
            quorumPeer.join();

The code above builds the QuorumPeer object: essentially it copies the contents of the config file into the object and starts the thread.

    @Override
    public synchronized void start() {
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        // Load zkDb: restore data from the persistent disk. More on the data model another time
        loadDataBase();
        startServerCnxnFactory();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
        // Leader election begins
        startLeaderElection();
        startJvmPauseMonitor();
        super.start();
    }

    // QuorumPeer.java: the leader election entry point. Called at startup, and also
    // whenever a Leader disconnection occurs.
    public synchronized void startLeaderElection() {
        try {
            if (getPeerState() == ServerState.LOOKING) {
                // If we are LOOKING, construct our own ballot first: vote for ourselves as Leader
                currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
            }
        } catch (IOException e) {
            RuntimeException re = new RuntimeException(e.getMessage());
            re.setStackTrace(e.getStackTrace());
            throw re;
        }

        // Create the election algorithm. Currently only type = 3 is supported. We cut in here
        this.electionAlg = createElectionAlgorithm(electionType);
    }

    // QuorumPeer.java
    protected Election createElectionAlgorithm(int electionAlgorithm) {
        // ... other code omitted
        // Build the cluster communication connection manager
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        // FIXME: not yet clear on this part
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            // FastLeaderElection is currently ZK's default election implementation; more on it later.
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        // ... other code omitted
        return le;
    }

Summary of the above code snippet:

  1. Starting from QuorumPeerMain's main method, we can debug into the code that builds the QuorumPeer object;
  2. The QuorumPeer object is the election unit in the cluster and extends the Thread class;
  3. In QuorumPeer.start(), the Vote, QuorumCnxManager, and FastLeaderElection pieces are set up in turn;
  4. At this point, the main thread's work is done.

QuorumCnxManager.java: implementation of communication between nodes

Suppose our configuration file lists three ZK server nodes. Until the cluster reaches a quorum, every node keeps trying to contact the others. From the following errors, you can see that QuorumCnxManager is the component doing this work:

server.1=127.0.0.1:2222:2223   (assume this is the local node)
server.2=127.0.0.1:3333:3334
server.3=127.0.0.1:4444:4445

// Nodes will do their best to connect to the other nodes in the cluster.
// QuorumCnxManager
2020-05-10 12:35:12,201 [myid:1] - WARN  [QuorumConnectionThread-[myid=1]-2:QuorumCnxManager@400] - Cannot open channel to 2 at election address /127.0.0.1:3334
java.net.ConnectException: Connection refused (Connection refused)
    ...
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.initiateConnection(QuorumCnxManager.java:383)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager$QuorumConnectionReqThread.run(QuorumCnxManager.java:457)
    ...
2020-05-10 12:35:12,201 [myid:1] - WARN  [QuorumConnectionThread-[myid=1]-3:QuorumCnxManager@400] - Cannot open channel to 3 at election address /127.0.0.1:4445
java.net.ConnectException: Connection refused (Connection refused)
    ...
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.initiateConnection(QuorumCnxManager.java:383)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager$QuorumConnectionReqThread.run(QuorumCnxManager.java:457)
    ...

QuorumCnxManager serves the FastLeaderElection algorithm: it handles the TCP connections and message send/receive between servers underneath the algorithm, connects to the other servers in the cluster, and guarantees that there is exactly one TCP connection between any two servers.


How data is sent and received between nodes

To discuss how QuorumCnxManager manages network traffic with other nodes during an election, keep an eye on a few key fields:

QuorumCnxManager {
    // zk server sid -> message sender (SendWorker) mapping
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    // zk server sid -> message send queue mapping; per-sid queues keep messages from interleaving
    final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
    // zk server sid -> the last message sent to that sid. After reconnecting to a server, the message
    // recorded here is sent again in case it was lost when the previous connection dropped;
    // the receiver deduplicates.
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
    // Receive queue holding data arriving from other nodes
    public final BlockingQueue<Message> recvQueue;
}

QuorumCnxManager also has two inner classes, SendWorker and RecvWorker. Both are threads whose main work happens in run(). Given the fields above, their division of labor is easy to describe (a minimal sketch follows this list):

  1. Once a Socket connection to a node is established, one SendWorker and one RecvWorker are created for it, one for sending and one for receiving, and a sid -> sendQueue entry is registered in queueSendMap.
  2. Outgoing data is first posted to the queue in queueSendMap and consumed by SendWorker.run(). This is a classic producer-consumer model that sends data asynchronously.
  3. RecvWorker.run() continuously listens for data on the TCP connection and delivers it straight into recvQueue for other threads to use.
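A minimal, self-contained sketch of that producer-consumer send path; the field name queueSendMap mirrors QuorumCnxManager's, but everything else is a simplified stand-in, not ZooKeeper's actual implementation:

import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Simplified producer-consumer send path, modeled on QuorumCnxManager's design.
public class SendPathSketch {

    final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();

    // Producer side: election code just drops the message into the per-sid queue and returns.
    void toSend(long sid, ByteBuffer msg) {
        queueSendMap
            .computeIfAbsent(sid, k -> new ArrayBlockingQueue<>(1024))
            .offer(msg);
    }

    // Consumer side: one SendWorker per connection drains its queue and writes to the socket.
    class SendWorker extends Thread {
        final long sid;

        SendWorker(long sid) { this.sid = sid; }

        @Override public void run() {
            BlockingQueue<ByteBuffer> q =
                queueSendMap.computeIfAbsent(sid, k -> new ArrayBlockingQueue<>(1024));
            try {
                while (!isInterrupted()) {
                    ByteBuffer msg = q.take(); // blocks until there is something to send
                    // socketChannel.write(msg); // hypothetical: write to the TCP link for this sid
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        }
    }
}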

Now that we know how nodes communicate over an established connection, we need to know how those connections get established in the first place.

How connections between nodes are established

Unlike the client-server programs we usually write, a cluster has no client/server relationship; it is a set of peer nodes. Consider the problem: how do you wire up TCP sockets among a set of peer nodes? Two sub-problems have to be solved:

  1. Each node must be able to both initiate TCP connection requests and accept them from others.
  2. The two directions must be reconciled: if nodes A and B dial each other at the same time, only one TCP connection may survive.

To solve problem 1, QuorumCnxManager defines two threads:

  • QuorumConnectionReqThread, which takes a destination address and initiates the connection request;
  • QuorumConnectionReceiverThread, which accepts connection requests from the outside.

Problem 2 is solved by comparing server ids: when a connection arrives from a peer with a smaller sid than our own, we drop it and dial that peer back ourselves, so exactly one connection per pair survives. Once a connection is in place, its sid is registered in the maps from the previous section, and QuorumCnxManager's inter-node communication is complete.
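A sketch of that tie-break under simplified types; handleIncoming, connectOne, and startWorkers below are hypothetical stand-ins for the logic in QuorumCnxManager.handleConnection:

import java.io.IOException;
import java.net.Socket;

// Sketch of the sid tie-break: only the connection initiated by the peer with the
// larger sid survives, so simultaneous dials between two nodes collapse to one link.
class ConnectionArbiter {
    final long mySid;

    ConnectionArbiter(long mySid) { this.mySid = mySid; }

    void handleIncoming(Socket sock, long remoteSid) throws IOException {
        if (remoteSid < mySid) {
            // The smaller-sid peer dialed us: reject its connection and dial back ourselves.
            sock.close();
            connectOne(remoteSid);          // hypothetical: initiate our own connection
        } else {
            startWorkers(sock, remoteSid);  // hypothetical: spin up SendWorker/RecvWorker
        }
    }

    void connectOne(long sid) { /* dial the peer's election address */ }
    void startWorkers(Socket sock, long sid) { /* register in senderWorkerMap etc. */ }
}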

FastLeaderElection.java: the leader election implementation

With the communication foundation above in place, the election algorithm can do its work on top of it.

Let's start with the interface, then look at some of FastLeaderElection's key fields, inner classes, and methods.

public interface Election {
    Vote lookForLeader() throws InterruptedException;
    void shutdown();
}

public class FastLeaderElection implements Election {
    // Data send queue
    LinkedBlockingQueue<ToSend> sendqueue;
    // Data receive queue
    LinkedBlockingQueue<Notification> recvqueue;

    // The two queues above are just message channels. FastLeaderElection also defines
    // two threads to serve them:

    // Takes messages off the receive queue and processes them; a bit more complex
    class WorkerReceiver extends ZooKeeperThread { ... }

    // Only responsible for handing queued messages over to QuorumCnxManager
    class WorkerSender extends ZooKeeperThread { ... }

    // Broadcasts the current node's "ballot" to all nodes
    void sendNotifications();
}

The flow of ballots through these components can be abstracted as follows:

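A minimal, self-contained sketch of the WorkerSender side of that bridge; ToSend and CnxLayer below are simplified stand-ins, not ZooKeeper's real types:

import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of how WorkerSender bridges FastLeaderElection's sendqueue to the
// QuorumCnxManager TCP layer (the real class lives inside FastLeaderElection).
class WorkerSenderSketch extends Thread {
    record ToSend(long sid, ByteBuffer payload) {}               // simplified vote wrapper
    interface CnxLayer { void toSend(long sid, ByteBuffer b); }  // stands in for QuorumCnxManager

    final LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<>();
    final CnxLayer manager;

    WorkerSenderSketch(CnxLayer manager) { this.manager = manager; }

    @Override public void run() {
        try {
            while (!isInterrupted()) {
                ToSend m = sendqueue.take();          // votes queued by sendNotifications()
                manager.toSend(m.sid(), m.payload()); // hand off to the per-sid TCP queue
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}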

Now we have covered all the communication machinery around FastLeaderElection. It is time to get to the election algorithm itself: Vote lookForLeader().

    /**
     * Starts a new round of leader election. Whenever our QuorumPeer
     * changes its state to LOOKING, this method is invoked, and it
     * sends notifications to all other peers.
     */
    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }

        self.start_fle = Time.currentElapsedTime();
        try {
            /*
             * The votes from the current leader election are stored in recvset. In other words, a vote v is in
             * recvset if v.electionEpoch == logicalclock. The current participant uses recvset to deduce whether
             * a majority of the participants has voted for it.
             */
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();

            /*
             * The votes from previous leader elections, as well as the votes from the current leader election are
             * stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
             * Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
             * outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
             * the electionEpoch of the received notifications) in a leader election.
             */
            Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = minNotificationInterval;

            synchronized (this) {
                // Bump the election round (logical clock) by 1
                logicalclock.incrementAndGet();
                // Initialize the proposal: vote for ourselves as Leader
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info(
                "New election. My id = {}, proposed zxid=0x{}",
                self.getId(),
                Long.toHexString(proposedZxid));
            // Tell all nodes that we propose ourselves as Leader
            sendNotifications();

            SyncedLearnerTracker voteSet;

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            // Wait in a loop: no leader chosen yet, so keep looping until the node is stopped
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time.
                 *
                 * Get votes from recvqueue.
                 */
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if (n == null) {
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                    LOG.info("Notification time out: {}", notTimeout);
                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    // Check that the voter's sid, and the leader it votes for, are both valid.
                    // If the current node considers vote n invalid, it is discarded.

                    /*
                     * Only proceed if the vote comes from a replica in the current or next
                     * voting view.
                     */
                    switch (n.state) {
                    case LOOKING:
                        // The ballot comes from a node that is itself LOOKING
                        if (getInitLastLoggedZxid() == -1) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }
                        if (n.zxid == -1) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                            break;
                        }
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {
                            // Our election round is behind, so discard our own tally.
                            // An election often takes several rounds; because of network timing,
                            // nodes' electionEpoch values are not guaranteed to agree

                            // Modify your own electionEpoch and empty the recvSet
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();

                            // The actual comparison ("PK") logic: compare epoch, then zxid, then sid
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                // If the incoming ballot names a more "suitable" Leader, switch our choice
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                            // Our previous ballot was invalidated, so re-broadcast our vote.
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                                LOG.debug(
                                    "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                    Long.toHexString(n.electionEpoch),
                                    Long.toHexString(logicalclock.get()));
                                // An invalid vote was received and ignored
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            // Same round: if the incoming ballot names a more "suitable" Leader, switch our choice
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            // Broadcast to all other nodes
                            sendNotifications();
                        }

                        LOG.debug(
                            "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                            n.sid,
                            n.leader,
                            Long.toHexString(n.zxid),
                            Long.toHexString(n.electionEpoch));

                        // don't care about the version if it's in LOOKING state
                        // Archive ballot data
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        // Tally the votes
                        voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                        if (voteSet.hasAllQuorums()) {
                            // Check if there is more than half

                            // Verify if there is any change in the proposed leader
                            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                // Keep draining the queue to make sure the tally still stands.
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                // Nothing better arrived: commit to the result and change this node's state
                                setPeerState(proposedLeader, voteSet);
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                leaveInstance(endVote);
                                return endVote; // The leader has been determined
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        // The notification comes from a node already FOLLOWING or LEADING.
                        /*
                         * Consider all notifications from the same epoch together. This can happen when a node
                         * was suspended for a while and then resumed, e.g. after an extremely long stop-the-world pause.
                         */
                        if (n.electionEpoch == logicalclock.get()) {
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
                                setPeerState(n.leader, voteSet);
                                Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify that
                         * a majority are following the same leader.
                         *
                         * Note that the outofelection map also stores votes from the current leader election.
                         * See ZOOKEEPER-1732 for more information.
                         */
                        outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            synchronized (this) {
                                logicalclock.set(n.electionEpoch);
                                setPeerState(n.leader, voteSet);
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
        }
    }

The code above is fairly complex and is best read together with the inline comments. Some details are omitted, but that should not affect the reading.

Questions:

  1. Where do a voter's Epoch and ZXID come from?

A: When QuorumPeer is initialized, the ZKDatabase is initialized along with it, and the ZXID can be obtained from that storage model.

  2. While the cluster is running, are the epoch and ZXID identical on most nodes? Does that mean the Leader election is effectively decided by sid? If so, and sid = 1 happens to sit on an unstable host, will that host keep getting elected and destabilize the whole cluster?
  3. Communication between nodes is pairwise: with n nodes, the number of TCP connections is on the order of n x n, and data synchronization and election communication each use their own connection, giving roughly n x n x 2 cluster connections; counting client connections as well, the total number of TCP connections is very large. How does a ZooKeeper cluster handle this, or is it simply an inherent property of ZooKeeper clusters?

Afterword

ZooKeeper's code is quite solid, though not very readable. But ZooKeeper itself is a powerful Swiss Army knife: small but mighty. Dubbo uses it as a registry, and Kafka depends on ZooKeeper. This debugging session also gave me a new understanding of communication in peer-node clusters, and the leader election algorithm is elegant. Still, I have a long way to go before I am truly comfortable with ZooKeeper.

There are surely misunderstandings and overlooked details about ZooKeeper in this article. I hope we can discuss them together, and if you find any mistakes, please point them out.

The notes above come from reading and debugging the ZooKeeper source code.

References

  1. ZooKeeper: Distributed Process Coordination, by Flavio Junqueira and Benjamin Reed
  2. Summary of CAP theory
  3. ZooKeeper Learning Notes (4): An In-Depth Look at Leader Election