ZooKeeper is a distributed coordination service maintained by Apache.

ZooKeeper can be considered a highly available file system.

ZooKeeper can be used for publish/subscribe, load balancing, command service, distributed coordination/notification, cluster management, Master election, distributed lock, and distributed queue.

1. ZooKeeper Overview

1.1 What is ZooKeeper

ZooKeeper is Apache’s top-level project. ZooKeeper provides efficient and reliable distributed coordination services for distributed applications, including basic distributed services such as unified naming service, configuration management, and distributed lock. ZooKeeper does not directly use Paxos algorithm to solve distributed data consistency, but uses a consistency protocol named ZAB.

ZooKeeper is used to solve the problem of application system consistency in distributed cluster. ZooKeeper provides data storage based on directory node tree similar to file system. However, ZooKeeper is not used to store data. It is mainly used to maintain and monitor the status changes of stored data. By monitoring these changes in data state, data-based cluster management can be achieved.

Many well-known frameworks are based on ZooKeeper to implement distributed high availability, such as Dubbo and Kafka.

1.2 ZooKeeper Features

ZooKeeper provides the following features:

支那

  • ** Sequential consistency: ** All clients see the same server-side data model; Transaction requests initiated from a client are eventually applied to ZooKeeper in the exact order in which they were initiated. The concrete implementation can be seen below: atomic broadcast.
  • ** Atomicity: ** The processing result of all transaction requests is applied consistently across all machines in the cluster, that is, the whole cluster either successfully applies a transaction or does not apply at all. The implementation is described below: transactions.
  • ** Single view: ** The client sees the same server-side data model no matter which Zookeeper server it connects to.
  • ** High performance: **ZooKeeper stores all data in memory, so its performance is very high. Note that: Because all updates and deletes of ZooKeeper are transaction-based, ZooKeeper performs better in application scenarios with more reads and fewer writes. If the write operations are frequent, ZooKeeper performance deteriorates.
  • ** High availability: ** The high availability of ZooKeeper is based on the replica mechanism. In addition, ZooKeeper supports failover.

支那

1.3 Design Objectives of ZooKeeper

  • Simple data model
  • You can build clusters
  • Sequential access
  • A high performance

Core concepts of ZooKeeper

2.1 Data Model

ZooKeeper’s data model is a tree-structured file system.

The nodes in the tree are called zNodes, where the root node is /, and each node stores its own data and node information. Znode can be used to store data and has an ACL associated with it (see ACL for details). ZooKeeper is designed to coordinate services and is not really stored as a file, so ZNode is limited to storing data within 1MB.

**ZooKeeper’s data access is atomic. ** All read and write operations succeed or fail.

Znode is referenced by path. The path of the Znode must be an absolute path.

Znodes come in two types:

支那

  • When the client session ends, ZooKeeper deletes the temporary ZNode.
  • ** ZooKeeper does not delete PERSISTENT ZNodes unless the client actively deletes them.

支那

2.2 Node Information

Znode has a SEQUENTIAL flag. If a ZNode is set to SEQUENTIAL when it is created, ZooKeeper uses a counter to add a monotonically increasing number, zxID, to the ZNode. ZooKeeper uses ZXID to implement strict sequential access control.

As each ZNode stores data, it maintains a data structure called Stat, which stores all state information about the node. As follows:

2.3 Cluster Roles

A Zookeeper cluster is a high availability cluster based on master/slave replication. Each server plays one of the following roles.

支那

  • **Leader: ** This is responsible for initiating and maintaining heartbeats to follwers and observers. All write operations must be performed by the Leader who broadcasts the write operations to other servers. A Zookeeper cluster has only one working Leader at a time.
  • **Follower: ** It responds to the Leader’s heartbeat. Followers directly process and return read requests to the client, forward the write requests to the Leader, and vote on them when the Leader processes the write requests. A Zookeeper cluster may have multiple followers.
  • **Observer: ** plays a similar role to followers, but does not have voting rights.

支那

2.4 the ACL

ZooKeeper uses Access Control Lists (ACLs) to Control permissions.

Each ZNode is created with an ACL list that determines who can perform what operations on it.

Acls depend on the client authentication mechanism of ZooKeeper. ZooKeeper provides the following authentication modes:

支那

  • Digest: identifies the client using the user name and password
  • ** SASL: ** Identifies clients via Kerberos
  • ** IP: ** Identifies clients by IP

支那

ZooKeeper defines the following five permissions:

支那

  • **CREATE: ** allows the creation of child nodes;
  • **READ: ** Allows fetching data from nodes and listing their children;
  • WRITE: Data can be set for a node.
  • **DELETE: ** allows child nodes to be deleted;
  • ADMIN: Allows you to set permissions for a node.

支那

Working principle of ZooKeeper

3.1 read operation

The Leader, Follower, or Observer can directly process read requests by reading data from the local memory and sending it back to the client.

Since servers do not need to interact with each other to process read requests, the more followers/Observers there are, the greater read request throughput and better read performance of the overall system.

3.2 write operation

All write requests are actually handed over to the Leader. The Leader sends the write request to all followers in the form of a transaction and waits for an ACK. Once the Leader receives more than half of the acks from followers, the write operation is considered successful.

3.2.1 write Leader

As can be seen from the figure above, write operations performed by Leader are mainly divided into five steps:

  1. The client sends a write request to the Leader.
  2. The Leader sends the write request to all followers in the form of a transaction Proposal and waits for an ACK.
  3. The Follower returns an ACK after receiving the transaction Proposal from the Leader.
  4. The Leader sends Commmit to all followers and observers after receiving a majority ACK (the Leader has an ACK by default for himself).
  5. The Leader returns the processing results to the client.

Pay attention to

  • The Leader does not need to receive an ACK from the Observer, that is, the Observer has no vote.
  • The Leader does not need to receive acks from all the followers, but only half of them. In addition, the Leader has an ACK for himself. Above four followers, only two of them returns an ACK can, because of the (2 + 1)/(4 + 1) > 1/2 (2 + 1)/(4 + 1) > 1/2 (2 + 1)/(4 + 1) > 1/2.
  • While the Observer does not vote, it must synchronize the Leader’s data so that it can return as new data as possible when processing read requests.

3.2.2 write Follower/Observer

The Follower/Observer accepts write requests but forwards them to the Leader instead of processing them directly.

Except for one more step of request forwarding, the other process is no different from writing directly to the Leader.

3.3 transactions

ZooKeeper has strict sequential access control for each update request from the client.

ZooKeeper uses an increasing transaction ID number (ZXID) to identify transactions to ensure consistency of order.

** The Leader service assigns a separate queue to each Follower server, queues transaction proposals in turn, and sends messages based on a FIFO(first-in, first-out) policy. After receiving the Proposal, the **Follower service writes it to the local disk as a transaction log and sends an Ack response to the Leader after the Proposal is successfully written. ** When the Leader receives more than half of the Ack responses from the followers, he broadcasts a Commit message to all the followers to inform them to Commit the transaction, after which the Leader himself will also complete the transaction Commit. Each Follower completes the transaction after receiving the Commit message.

All proposals had zxIDS added to them when they were put forward. Zxid is a 64-bit number whose high 32 bits are epoch used to identify whether the Leader relationship has changed. Each time a Leader is elected, it will have a new epoch, identifying the current reign period of the Leader. The lower 32 bits are used for incremental counting.

The detailed process is as follows:

  • The Leader waits for the Server to connect.
  • The followers connect to the Leader and send the maximum ZXID to the Leader.
  • The Leader determines the synchronization point based on the zxID of the Follower.
  • After the synchronization is complete, the follower becomes the uptodate state.
  • After receiving the uptoDate message, the Follower can accept the request from the client again.

3.4 observe

The client registers to listen to the ZNode it cares about, and when the ZNode status changes (data changes, child node changes), the ZooKeeper service will notify the client.

There are two ways for a client and a server to maintain a connection:

  • The client constantly polls the server
  • The server pushes status to the client

Zookeeper uses the active status push mechanism (Watch) of the server.

The observation mechanism of ZooKeeper allows users to register listeners for interested events on specified nodes. When an event occurs, the listener is triggered and the event information is pushed to the client.

When the client uses getData and other interfaces to get the zNode status, it passes in a callback to handle the node change. Then the server will actively push the node change to the client:

The Watcher object passed in from this method implements the corresponding Process method, and every time there is a state change on the corresponding node, WatchManager calls the method passed in by Watcher in the following way:

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    Set<Watcher> watchers;
    synchronized (this) {
        watchers = watchTable.remove(path);
    }
    for (Watcher w : watchers) {
        w.process(e);
    }
    return
Copy the code

All data in Zookeeper is actually managed by a data structure named DataTree. All read/write data requests will eventually change the content of the tree. Watcher may be passed to register a callback function when the read request is issued, while the write request may trigger the corresponding callback. WatchManager notifies clients of data changes.

The implementation of the notification mechanism is actually relatively simple. Watcher is set to listen for events through read requests, and the write request can send notifications to the specified client when the event is triggered.

3.5 session

The ZooKeeper client connects to the ZooKeeper service cluster over the TCP long-haul connection. A Session is established from the first connection, and then the heartbeat detection mechanism is used to maintain the valid Session state. Through this connection, clients can send requests and receive responses, as well as receive notifications of Watch events.

The ZooKeeper server cluster list is configured in each ZooKeeper client configuration. At startup, the client iterates through the list to try to establish a connection. If it fails, it tries to connect to the next server, and so on.

Once a client establishes a connection with a server, the server creates a new session for the client. ** Each session has a timeout period, and if the server does not receive any requests within the timeout period, the corresponding session is considered expired. ** Once a session expires, it cannot be reopened and any temporary ZNodes associated with that session are deleted.

In general, sessions should be long lived, and this needs to be guaranteed by the client. The client can ping the session to keep it alive.

ZooKeeper sessions have four properties:

  • **sessionID: **sessionID, which uniquely identifies a session. Each time a client creates a new session, Zookeeper assigns a globally unique sessionID to it.

  • * * the TimeOut: ** sessionTimeout period. When constructing a Zookeeper instance, the client configures the sessionTimeout parameter to specify the sessionTimeout period. After the Zookeeper client sends the sessionTimeout period to the server, the server determines the sessionTimeout period based on its own timeout period.

  • * * TickTime: ** Next session TimeOut point. To facilitate The Bucket policy management for Zookeeper sessions and efficiently and efficiently implement session TimeOut check and clearing, Zookeeper marks a next session TimeOut point for each session, whose value is approximately equal to the current time plus TimeOut.

  • **isClosing: ** Indicates whether a session has been closed. When the server detects that the session has expired, it marks the SESSION’s isClosing as “Closed”. This ensures that no new requests from the session are processed.

Zookeeper manages sessions through SessionTracker, which uses the bucket division policy (similar sessions are managed in the same block) to isolate sessions from different blocks and process sessions in the same block in a unified manner.

Iv. ZAB Agreement

Instead of using the Paxos algorithm directly, ZooKeeper uses a consistency protocol called ZAB. The ZAB protocol is not the Paxos algorithm, but rather similar, and the two are different in operation.

ZAB protocol is an atomic broadcast protocol specially designed by Zookeeper to support crash recovery.

ZAB is a data consistency and high availability solution for ZooKeeper.

The ZAB protocol defines two flows that can loop indefinitely:

  • ** Elects the Leader: ** is used for failover to ensure high availability.

  • ** Atomic broadcast: ** is used for master/slave synchronization to ensure data consistency.

4.1 Leader election

This section describes how to restore ZooKeeper

A ZooKeeper cluster uses the single-master (Leader) multi-slave (Follower) mode. The primary and secondary nodes use the copy mechanism to ensure data consistency.

  • If the Follower node fails – Each node in the ZooKeeper cluster maintains its own state in memory and communicates with each other. As long as half of the cluster’s machines are working properly, the cluster can provide services properly.

  • If the Leader node fails – If the Leader node fails, the system will not work properly. In this case, you need to use the ZAB protocol to elect a Leader for fault recovery.

In simple terms, the Leader election mechanism of ZAB protocol is: a new Leader is generated based on the half election mechanism, and then other machines will synchronize state from the new Leader. When half machines complete state synchronization, they quit the Leader election mode and enter the atomic broadcast mode.

4.4.1 term

** myID: ** Each Zookeeper server needs to create a file named myID under the data folder. This file contains the unique ID (integer) of the entire Zookeeper cluster.

** zxID: ** Proposal ID used to identify an update operation, similar to the transaction ID in an RDBMS. To ensure orderality, the ZKID must be monotonically incremented. Therefore, Zookeeper uses a 64-bit number to represent the Leader epoch. The higher 32 bits are the epoch of the Leader. Starting from 1, each time a new Leader is selected, the epoch increases by one. The lower 32 bits are the ordinal of the epoch. Each time the epoch changes, the lower 32 bits will be reset. This ensures the global incrementation of ZKID.

4.1.2 Server Status

  • **LOOKING: ** Not sure about the Leader status. The server in this state considers that no Leader exists in the cluster and initiates the Leader election.

  • **FOLLOWING: ** follow status. Indicates that the current server role is Follower and that it knows who the Leader is.

  • **LEADING: ** Leadership status. Indicates that the current server role is Leader and maintains the heartbeat between the server and followers.

  • OBSERVING: indicates the state of the observer. Indicates that the current server role is an Observer, the only difference from Folower is that it does not participate in elections or vote on cluster writes.

4.1.3 Ballot data structure

When each server elects a leader, it sends the following key information:

  • **logicClock: ** Each server maintains an incrementing integer called logicClock, which indicates the number of votes that the server initiated.

  • **state: ** Current server status.

  • **self_id: ** MyID of the current server.

  • ** self_zxID: ** The maximum zxID of the data stored on the current server.

  • **vote_id: ** MyID of the selected server.

  • ** vote_zxID: ** The maximum zxID of the data stored on the selected server.

4.1.4 Voting process

(1) Self-increasing electoral rounds

Zookeeper states that all valid votes must be in the same round. When each server starts a new round of voting, it increments the logicClock it maintains first.

(2) Initialize the ballot

Each server empties its own ballot box before broadcasting its own vote. The ballot box records the votes received. For example, if server 2 votes for server 3 and server 3 votes for server 1, the ballot box of server 1 is (2, 3), (3, 1), (1, 1). Only the last vote of each voter will be recorded in the ballot box. If the voter updates his vote, other servers will update the vote of the server in their ballot box after receiving the new vote.

(3) Send the initial ballot

Each server initially broadcasts a vote for itself.

(4) Accept external voting

The server will try to get the vote from another server and put it in its own poll box. If you cannot get any external votes, you verify that you have a valid connection to the other servers in the cluster. If so, send your vote again; If no, establish a connection immediately.

(5) Judge the election cycle

After receiving an external vote, it will first process it differently according to the logicClock contained in the vote information:

  • The external vote’s logicClock is greater than its own logicClock. Note The election round of the server is behind that of other servers. Immediately empty its ballot box and update its logicClock to the received logicClock. Then compare its previous vote with the received vote to determine whether to change its vote, and finally broadcast its vote again.

  • The external vote’s logicClock is smaller than its own logicClock. The current server simply ignores the vote and continues to process the next vote.

  • The logickClock of the external vote is equal to its own. Votes were cast at that time.

(6) Votes PK

The vote PK is based on the comparison between (self_id, self_zxID) and (vote_id, vote_zxid) :

  • If the logicClock of the external vote is greater than its own logicClock, change its own logicClock and its vote’s logicClock to the received logicClock.

  • If the logicClock is the same, then compare the vote_zxID of the two. If the vote_zxID of the external vote is large, Then the vote_zxID and vote_myID in their own tickets will be updated to the vote_zxID and vote_myID in the received tickets and broadcast out, in addition, the received tickets and their updated tickets into their own ticket box. If the same ballot (self_MYID, self_ZXID) already exists in the ballot box, it is overwritten directly.

  • If the vote_zxID of the two vote_zxID is the same, the vote_myID of the two vote_zxID is compared. If the vote_myID of the external vote is large, the vote_myID of the own vote is updated to the vote_myID of the received vote and broadcast. In addition, the received tickets and their updated tickets into their own ticket box.

(7) Counting the votes

If it is determined that more than half of the servers have approved their vote (possibly an updated vote), the vote is terminated. Otherwise continue to receive votes from other servers.

(8) Update the server status

After the vote stops, the server updates its status. If more than half of the votes are for me, update my server status to LEADING; otherwise, update my server status to FOLLOWING.

Based on the above process analysis, it is not difficult to see that the number of nodes in the ZooKeeper cluster must be odd in order for the Leader to obtain the support of most servers. The number of surviving nodes must not be less than N + 1.

This process is repeated after each Server is started. In recovery mode, if a server has just recovered from a crash or has just been started, data and session information will be recovered from disk snapshots. Zk records transaction logs and takes snapshots periodically to facilitate state recovery during recovery.

4.2 Atomic Broadcast

ZooKeeper implements high availability through the replica mechanism.

So how does ZooKeeper implement the replica mechanism? The answer: atomic broadcasting of ZAB protocol.

Atomic broadcast requirements of the ZAB protocol:

** All write requests are forwarded to the Leader, who notifies the Follow in an atomic broadcast. When more than half of the follows have been persisted, the Leader will commit the update and the client will receive a successful update response. ** This is somewhat similar to the two-phase commit protocol in databases.

In the whole message broadcast process, the Leader server will request each transaction to generate a corresponding Proposal, and assign it a globally unique increasing transaction ID(ZXID), and then broadcast it.

ZooKeeper application

ZooKeeper can be used for publish/subscribe, load balancing, command service, distributed coordination/notification, cluster management, Master election, distributed lock, and distributed queue.

5.1 Naming Service

In distributed systems, a globally unique name is usually required, for example, to generate a globally unique order number. ZooKeeper can generate a globally unique ID using the sequential node feature to provide naming services for distributed systems.

5.2 Configuration Management

Using ZooKeeper’s observation mechanism, it can be used as a highly available configuration store, allowing participants in distributed applications to retrieve and update configuration files.

5.3 Distributed Lock

Distributed locking can be implemented through ZooKeeper’s temporary nodes and Watcher mechanism.

For example, A distributed system has three nodes A, B, and C trying to obtain A distributed lock through ZooKeeper.

(1) Visit /lock (the directory path is determined by the program itself) and create the EPHEMERAL node with the serial number.

(2) When each node attempts to acquire a lock, it obtains all the children of the/LOCKS node (ID_0000, ID_0001, ID_0002) and determines whether the node it has created is the smallest.

  • If so, get the lock.

    Release lock: After this operation, delete the created node.

  • If it is not, it listens for changes in nodes 1 smaller than it is.

(3) Release the lock, that is, delete the node created by yourself.

In the figure, NodeA deletes the node ID_0000 created by NodeB, and NodeB detects the change and finds that its node has become the smallest node, so it can obtain the lock.

5.4 Cluster Management

ZooKeeper also solves problems in most distributed systems:

  • For example, you can create temporary nodes to establish a heartbeat detection mechanism. If a service node of the distributed system goes down, the session it holds times out, the temporary node is deleted, and the corresponding listening event is fired.

  • Each service node of the distributed system can also write its own node state to the temporary node to complete the status report or node work progress report.

  • ZooKeeper can also decouple modules and schedule tasks for distributed systems by subscribing and publishing data.

  • Through the monitoring mechanism, the service nodes of the distributed system can be dynamically connected up and down, so as to realize the dynamic expansion of services.

5.5 Electing a Leader Node

An important mode of distributed system is Master/Salves mode, and ZooKeeper can be used for Matser election in this mode. All service nodes can compete to create the same ZNode. ZooKeeper cannot have zNodes with the same path. Therefore, only one service node can be successfully created, and the service node can become the Master node.

5.6 Queue Management

ZooKeeper can handle two types of queues:

  • A queue is available only when its members are all together, otherwise it waits for all the members to arrive, which is a synchronous queue.

  • Queues are enqueued and enqueued in FIFO mode, such as implementing the producer and consumer model.

The implementation of synchronous queue with ZooKeeper is as follows:

Creating a parent directory /synchronizing/start, each member monitors the existence of the Set Watch bit directory /synchronizing/start, and each member joins this queue. This is done by creating a temporary directory node of the /synchronizing/ member_I directory, and then each member obtains all the directory nodes of the /synchronizing/ member_I directory. Synchronizing /start Determines whether the value of I is already the number of members. If it is smaller than the number of members, wait for /synchronizing/start to occur, and if it is equal, create /synchronizing/start.

The resources

The official

  • The ZooKeeper’s official website

  • Official document of ZooKeeper

  • ZooKeeper Github

books

  • The Definitive Guide to Hadoop (4th Edition)

  • Principles and Practices of Distributed Consistency from Paxos to Zookeeper

The article

  • Distributed services framework ZooKeeper – Manages data in a distributed environment

  • Functions and working principles of ZooKeeper

  • ZooKeeper overview and core concepts

  • Explain the distributed coordination service ZooKeeper

  • Zookeeper architecture and FastLeaderElection mechanism

Author: ZhangPeng