Zookeeper overview

Zookeeper is a small distributed file system that uses a highly fault-tolerant data consistency protocol (it is a CP system). It provides data storage in the form of directories, similar to a file system, and it guarantees the following properties:

  1. Global data consistency: every server stores the same copy of the data, so a client sees the same data no matter which server it connects to.
  2. Reliability: once a transaction is successfully committed, its effect is retained.
  3. Ordering: transaction requests initiated by a client are applied to Zookeeper in the order in which they were issued.
  4. Atomic updates: a data update either succeeds or fails; there is no intermediate state.
  5. Timeliness: a client is guaranteed to receive service updates or server failure notifications within a bounded time interval.

Taken together, these guarantees give every client a single, consistent system image.

Data model:

Each node is a ZNode (it acts as both a file and a directory), and operations on a ZNode are atomic. A ZNode stores a limited amount of data (1 MB by default) and is referenced by an absolute path. A ZNode consists of three parts:

  • Stat: Indicates the status information, including the version and permission of a ZNode.
  • Data: data associated with the Znode.
  • Children: indicates the children of the Znode.

ZNode node types

  1. Ephemeral (temporary) node: its lifetime is tied to the session that created it, and it is removed as soon as that session ends. Ephemeral nodes are not allowed to have children.
  2. Persistent (permanent) node: removed only when a client explicitly deletes it.
  3. Ephemeral sequential node: an ephemeral node whose name has a sequence number appended.
  4. Persistent sequential node: a persistent node whose name has a sequence number appended.

ZNode sequencing: the name of a sequential ZNode has a monotonically increasing sequence number appended to it. The sequence number is unique within its parent node and records the creation order of the children.
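The four node types map directly to CreateMode values in the standard Java client. Below is a minimal sketch; the connection string, paths, and data are placeholders, and a real client would wait for the connection to be established before issuing requests:

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZnodeTypesDemo {
    public static void main(String[] args) throws Exception {
        // Connect with a 30s session timeout; this watcher simply ignores events.
        // In production, wait for the SyncConnected event before making calls.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});

        // Persistent node: survives the session, must be deleted explicitly.
        zk.create("/app", "cfg".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Ephemeral node: removed automatically when this session ends; cannot have children.
        zk.create("/app/worker", "w1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // Sequential nodes: the server appends an increasing suffix, e.g. /app/job0000000003.
        String seq = zk.create("/app/job", "t".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL);
        String eseq = zk.create("/app/lock-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println(seq + " " + eseq);
        zk.close();
    }
}
```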

Znode node attributes

Zookeeper node properties include node data, status, and permissions.

  • cZxid: transaction ID of the node's creation. Every change in Zookeeper produces a globally unique ZXID, so it can be used to determine the order of update operations.
  • ctime: creation time.
  • mZxid: transaction ID of the node's last data modification.
  • mtime: last modified time.
  • pZxid: transaction ID of the last child-node change. Adding or deleting a child node changes the child list and updates this ID; modifying a child node's data content does not.
  • cversion: version number of the child list; it increases by 1 each time the child list is modified.
  • dataVersion: version number of the node's data. When multiple clients update the same ZNode, this version number is what enforces the ordering of the updates. For example, while client A is preparing an update, client B updates the ZNode first; client A's cached version number has then expired, so client A's setData call fails.
  • aclVersion: version number of the ACL; it increases by 1 each time the permissions are changed.
  • dataLength: length of the node's data.
  • numChildren: number of children of the node.
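These attributes are exposed through the Stat object returned by the Java client. A small sketch, assuming a node /app already exists (the connection string and path are placeholders):

```java
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class StatDemo {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});

        Stat stat = new Stat();
        byte[] data = zk.getData("/app", false, stat);   // fills stat as a side effect

        System.out.println("cZxid=" + stat.getCzxid());          // creation transaction id
        System.out.println("mZxid=" + stat.getMzxid());          // last-modification transaction id
        System.out.println("pZxid=" + stat.getPzxid());          // last child-list-change transaction id
        System.out.println("ctime=" + stat.getCtime() + " mtime=" + stat.getMtime());
        System.out.println("dataVersion=" + stat.getVersion());  // used by conditional setData
        System.out.println("cversion=" + stat.getCversion() + " aclVersion=" + stat.getAversion());
        System.out.println("dataLength=" + stat.getDataLength() + " numChildren=" + stat.getNumChildren());

        // Conditional update: fails with BadVersionException if another client changed the node first.
        zk.setData("/app", "new".getBytes(), stat.getVersion());
        zk.close();
    }
}
```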

Znode ACL permission control

An ACL entry is written as scheme:id:permissions. Example: setAcl /test2 ip:128.0.0.1:crwda

Scheme

  • world: there is a single built-in id, anyone, which stands for everyone.
  • ip: authentication by IP address; the id is an IP address or an IP address range.
  • auth: uses an already-authenticated user; a user is authenticated in the current session with addauth digest user:pwd.
  • digest: authentication with username:password, stored as a digest.

Permissions

  • CREATE (c): create child nodes.
  • DELETE (d): delete child nodes (only the immediate children).
  • READ (r): read the node's data and list its children.
  • WRITE (w): set the node's data.
  • ADMIN (a): set the node's ACL permissions.

Permission-related commands

  • getAcl: read a node's ACL.
  • setAcl: set a node's ACL.
  • addauth: add an authenticated user to the current session.
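As a sketch of how these schemes and permission bits look in the Java client (the path and credentials are placeholders): the auth scheme grants permissions to whichever user was added with addAuthInfo, mirroring the addauth command above.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;

public class AclDemo {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});

        // Equivalent of: addauth digest user:pwd
        zk.addAuthInfo("digest", "user:pwd".getBytes());

        // auth scheme: the authenticated user gets full rights (crwda);
        // everyone else gets read-only access (the world:anyone entry).
        List<ACL> acl = Arrays.asList(
                new ACL(ZooDefs.Perms.ALL, new Id("auth", "")),
                new ACL(ZooDefs.Perms.READ, new Id("world", "anyone")));

        zk.create("/test2", "secret".getBytes(), acl, CreateMode.PERSISTENT);

        // Equivalent of: setAcl /test2 ... and getAcl /test2
        zk.setACL("/test2", acl, -1);                    // -1 means "any aclVersion"
        System.out.println(zk.getACL("/test2", new Stat()));
        zk.close();
    }
}
```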

Zookeeper also provides a distributed publish/subscribe capability, which enables multiple subscribers to listen on the same topic object at the same time. This is implemented with the Watcher mechanism.

The Watcher mechanism

A Watch is a one-time trigger. When the watched data changes, the server sends a notification of the change to the clients that registered the Watch.

  • The Watcher event is triggered when a parent node is created, modified, or deleted.
  • Watcher events are triggered when a child node is created or deleted.

Watcher features

  • One-time: a Watcher fires only once; it is removed after it is triggered and must be re-registered to be used again. If every client had to be notified of every change, the network and the server would come under heavy pressure; the one-time semantics relieve that pressure.
  • Client sequential callback: Watcher callbacks on the client run as a serial, synchronized process.
  • Lightweight: a Watcher notification is very simple; it only tells the client that an event occurred, without carrying the changed content.

Principle of listener

  1. When the Zookeeper client is created in the main thread, two additional threads are created: one for network communication (connect) and one for listening.
  2. Registered watch events are sent to the Zookeeper server through the connect thread.
  3. The Zookeeper server adds the registered watch events to its list of registered listeners.
  4. When Zookeeper detects a data or path change, it sends a message to the listener thread.
  5. The listener thread invokes the process() method internally to trigger the Watcher.
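A minimal sketch of the one-shot semantics with the standard Java client (paths are placeholders): the watch is registered with getData, and because it fires only once, the process() callback re-registers it while reading the new data.

```java
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class DataWatchDemo implements Watcher {
    private ZooKeeper zk;

    public void start() throws Exception {
        // In a real client, wait for SyncConnected before calling getData.
        zk = new ZooKeeper("localhost:2181", 30000, this);
        // Register this object as the watcher for /config; the watch fires at most once.
        zk.getData("/config", this, null);
    }

    @Override
    public void process(WatchedEvent event) {
        // The notification is lightweight: it carries only the event type and path, not the new data.
        if (event.getType() == Event.EventType.NodeDataChanged) {
            try {
                // Re-register the watch and fetch the new data in one call.
                byte[] data = zk.getData(event.getPath(), this, null);
                System.out.println("New value: " + new String(data));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        new DataWatchDemo().start();
        Thread.sleep(Long.MAX_VALUE);   // keep the process alive to receive callbacks
    }
}
```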

Zookeeper session management

  1. The client sends ping messages to the ZkServer from time to time. Whenever the ZkServer receives a ping or any other message from the client, it records the client's session_id and session_timeout in a map.
  2. The Leader periodically sends ping messages to all followers. On receiving a ping, each follower returns its locally recorded session information to the Leader and clears its local map. The Leader uses this information to recalculate each client's timeout.
  3. If, by the time a session's timeout expires, the Leader has neither collected session information about the client from any follower nor received a request directly from the client, the client's session is closed.
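The session timeout is requested when the client connects (the server may clamp it to its configured bounds), and expiry is surfaced to the application as a KeeperState.Expired event. A brief sketch; the address and timeout are placeholders:

```java
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class SessionDemo {
    public static void main(String[] args) throws Exception {
        // Ask for a 30s session timeout; the negotiated value may differ.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                switch (event.getState()) {
                    case SyncConnected:
                        System.out.println("session established");
                        break;
                    case Disconnected:
                        // The client library keeps pinging and tries to reconnect within the timeout.
                        System.out.println("disconnected, waiting for reconnect");
                        break;
                    case Expired:
                        // The server closed the session; ephemeral nodes and watches are gone.
                        // The only recovery is to build a brand-new ZooKeeper handle.
                        System.out.println("session expired, must reconnect from scratch");
                        break;
                    default:
                        break;
                }
            }
        });

        System.out.println("negotiated timeout: " + zk.getSessionTimeout() + " ms");
        Thread.sleep(60000);
        zk.close();
    }
}
```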

Zookeeper data model

  1. ZK maintains two kinds of data: client session state and DataNode information.
  2. ZK builds a DataTree data structure in memory that maintains the mapping from path to DataNode and the hierarchical tree relationship between DataNodes. To improve read performance, every server node in the cluster keeps the full data set in memory, so ZK is best suited for lightweight, read-heavy, write-light workloads.
  3. Keeping data only in memory is not safe, so ZK uses transaction log files and snapshot files to persist data to disk, ensuring that data can be recovered quickly and without loss.

Zookeeper cluster

Cluster roles

  1. Leader: the core of the cluster; it is the only scheduler and handler of transaction requests and guarantees the ordering of transaction processing. All write requests are forwarded to the Leader, which assigns the order in which the operations are performed.
  2. Follower: handles non-transaction requests from clients, forwards transaction requests to the Leader, and participates in Leader elections.
  3. Observer: independently handles non-transaction requests and forwards transaction requests to the Leader for processing. Observers do not take part in voting.

Transactions

  1. Transactions: in ZooKeeper, operations that change the state of the ZooKeeper server are called transactions. They include creating and deleting data nodes, updating node data, and creating and expiring client sessions. For each transaction request, ZooKeeper assigns a globally unique transaction ID, called a ZXID, which is normally a 64-bit number. Each ZXID corresponds to one update operation, and from the ZXIDs you can infer the global order in which ZooKeeper processed the transaction requests.
  2. Transaction log: every transaction operation is recorded in a log file; the file directory can be configured with dataLogDir. The file name is suffixed with the ZXID of the first transaction written to it, which makes later lookup easier. ZK pre-allocates disk space to reduce disk seeks and improve the server's ability to respond to transaction requests. By default every transaction log write is flushed to disk immediately; it can be set to non-real-time (write to an in-memory file stream and batch-write to disk periodically), but that carries a risk of data loss on power failure. After a certain number of transactions have been logged, the in-memory database is serialized and persisted to disk; the serialized file is called a snapshot file. With transaction logs and snapshots together, any node can be restored to any point in time.

  3. Data snapshot: the data snapshot is another core mechanism of the ZK data store. A snapshot records the full in-memory data content at a point in time on the ZK server and writes it to a disk file; the directory is configured through dataDir. The parameter snapCount sets the number of transaction operations between two snapshots. After recording a transaction log entry, a ZK node checks whether to create a snapshot: when the number of transactions since the last snapshot reaches a random value in the range [snapCount/2, snapCount], snapshot generation is triggered. A random threshold is used so that all nodes do not generate snapshots at the same time, which would slow the whole cluster down.

The majority principle

  1. Majority: a majority means that at least (N/2 + 1) machines in the cluster agree; Observer nodes are not counted. After the Leader broadcasts a transaction proposal and receives ACKs from more than half of the nodes, it considers the message accepted by the cluster; it does not wait for ACKs from the remaining nodes but broadcasts the COMMIT message directly and commits the transaction. A Zookeeper cluster is therefore usually built from 2n+1 servers and can tolerate the failure of n of them.

  2. Two-phase commit in ZooKeeper: a client connects to a random node of the ZooKeeper cluster. A read request is served from that node directly; a write request is forwarded to the Leader, which then broadcasts the transaction, and the write is committed as long as more than half of the nodes write it successfully.

    1. The Leader converts the write request into a Proposal and distributes it to all the Follower nodes in the cluster.
    2. The Leader waits for feedback from the Followers. Once more than half of them give correct feedback, the Leader sends a Commit message to all the Followers, asking them to commit the previous Proposal.
    3. The leader node synchronizes the latest data to the Observer node.
    4. Returns the result of the client execution.
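The majority check itself is simple arithmetic. Below is a hedged sketch of the counting a Leader might do; the class and method names are illustrative, not ZooKeeper's internal code:

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative majority tracking for one proposal; not ZooKeeper's actual implementation. */
public class QuorumTracker {
    private final int votingMembers;       // Leader + Followers only; Observers are not counted
    private final Set<Long> ackedServerIds = new HashSet<>();

    public QuorumTracker(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    /** Record an ACK and report whether the proposal can now be committed. */
    public boolean ack(long serverId) {
        ackedServerIds.add(serverId);
        // Majority = strictly more than half, i.e. at least N/2 + 1 servers.
        return ackedServerIds.size() >= votingMembers / 2 + 1;
    }

    public static void main(String[] args) {
        QuorumTracker t = new QuorumTracker(5);   // a 5-node ensemble tolerates 2 failures
        System.out.println(t.ack(1));             // false: 1 of 5
        System.out.println(t.ack(2));             // false: 2 of 5
        System.out.println(t.ack(3));             // true:  3 of 5 -> COMMIT can be broadcast
    }
}
```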

The ZAB protocol

ZooKeeper ensures data consistency through ZAB message broadcast, crash recovery, and data synchronization.

Message broadcast

  1. After a transaction request comes in, the Leader wraps the write request as a Proposal transaction and adds a globally unique 64-bit incrementing transaction ID, Zxid.
  2. The Leader broadcasts the Proposal to the other nodes in the cluster. The Leader and the Follower nodes are decoupled: communication goes through FIFO message queues, with the Leader allocating a separate FIFO queue to each Follower and placing the Proposal in each queue.
  3. After receiving the Proposal, the Follower node persists it on the disk and sends an ACK to the Leader.
  4. When the Leader receives ACKs from more than half of the Follower nodes, it commits the transaction locally and starts broadcasting the COMMIT. After the Follower nodes receive the COMMIT, they complete their own transaction commits.

Message broadcast resembles the two-phase commit of a distributed transaction. On its own, however, it cannot handle the data inconsistency caused by the Leader going down after initiating a transaction request, so the ZAB protocol introduces a crash recovery mechanism.

Crash recovery

When the whole cluster is started or the Leader is disconnected, ZAB enters the recovery mode. The recovery mode flow is as follows:

  1. The cluster elects a new Leader through its voting mechanism, and the epoch number is incremented by 1 to start a new epoch.
  2. The other nodes synchronize their state from the new Leader.
  3. Once more than half of the nodes have completed state synchronization, the cluster exits recovery mode and enters message broadcast mode.

Leader Election Process

Server Operating Status

  • LOOKING: the server believes there is no Leader in the cluster and therefore enters the Leader election state.
  • FOLLOWING: the server's role is Follower; it synchronizes state from the Leader and takes part in election voting.
  • LEADING: the server's role is Leader.
  • OBSERVING: the server's role is Observer; it synchronizes state from the Leader but does not take part in voting.

Voting principle

  1. Voting must take place in the same round; votes from servers in a different election round are not accepted.
  2. The node with the newest data has priority to become the Leader. How new the data is is judged by the transaction ID (ZXID): the larger the ZXID, the closer the node's data is to the Leader's, so that node naturally becomes the Leader.
  3. If the participating nodes have the same transaction ID, the comparison falls back to server.id, the node's unique ID in the cluster as specified in the myid file.
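These three rules boil down to an ordered comparison of (election round, ZXID, server ID). Here is a hedged sketch of that comparison; the field and method names are illustrative and only mirror the rules above, not ZooKeeper's exact FastLeaderElection code:

```java
/** Illustrative vote comparison; mirrors the rules above rather than ZooKeeper's exact code. */
public class Vote {
    final long electionEpoch;   // election round (logical clock)
    final long zxid;            // last committed transaction id of the proposed leader
    final long serverId;        // value from the myid file of the proposed leader

    Vote(long electionEpoch, long zxid, long serverId) {
        this.electionEpoch = electionEpoch;
        this.zxid = zxid;
        this.serverId = serverId;
    }

    /** Returns true if the received vote should replace our current vote. */
    static boolean receivedWins(Vote received, Vote current) {
        if (received.electionEpoch != current.electionEpoch) {
            return received.electionEpoch > current.electionEpoch;  // newer round wins
        }
        if (received.zxid != current.zxid) {
            return received.zxid > current.zxid;                    // newest data wins
        }
        return received.serverId > current.serverId;                // tie-break on server id
    }

    public static void main(String[] args) {
        Vote mine = new Vote(3, 0x200000005L, 1);
        Vote other = new Vote(3, 0x200000007L, 2);
        System.out.println(receivedWins(other, mine));   // true: same round, larger zxid
    }
}
```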

The election stage

The messages exchanged between nodes in the cluster are called votes (Vote). A vote carries two dimensions of information: the server ID of the proposed Leader and the transaction ID (ZXID) of the proposed Leader. The ZXID is taken from the machine's in-memory DataTree, which guarantees that the transaction has already been committed on that machine.

Three main threads are at work during the election process:

  1. Election thread: a thread that actively calls the lookForLeader method and collaborates with the other two threads through the blocking queue sendQueue and recvQueue.
  2. WorkerReceiver thread: a ballot receiver that continuously retrieves election messages from other servers and, after filtering, saves them to the recvQueue. It starts running when the ZK server starts and never stops.
  3. WorkerSender thread: A ballot sender that continuously retrieves pending votes from the SendQueue queue and broadcasts them to the cluster.
  4. The WorkerReceiver thread works all the time, even if the current node is in the LEADING or FOLLOWING state. It acts as a filter. Only when the current node is LOOKING, will the external voting information be transferred to the election thread for processing.
  5. If the current node is not LOOKING and receives the voting data of the node in the LOOKING state (in the case of external node restart or network jitter), the data of the node that initiated the Vote is inconsistent with that of the cluster. In this case, the current node needs to broadcast the latest memory Vote(id, zxid) to the cluster. After receiving the Vote, the lagging node registers with the leader in a timely manner, completes data synchronization, keeps up with the cluster rhythm, and provides normal services.

New Cluster Election

  1. Each machine votes for itself.
  2. The votes are then compared by server ID; the larger the server ID, the greater the weight in the election.
  3. As soon as one server has collected more than half of the votes, the election is over.

Non-new cluster election

  1. Logical clock: votes from an older (smaller) logical clock are ignored.
  2. Data ID: the larger data ID (ZXID) wins.
  3. Server ID: if the data IDs are equal, the server with the larger server ID wins and is elected Leader.

The election process in detail

Leader election is the prerequisite for normal operation of the cluster. When the cluster starts up or the Leader is lost, the Leader election process begins.

  1. All nodes enter the LOOKING state
  2. Each node broadcasts a vote with its own ID and ZXID to elect itself as the Leader
  3. Each node receives the votes sent by the other nodes and compares them with its own current vote (the vote with the larger ZXID wins; if the ZXIDs are equal, the larger server ID wins).
  4. If the external vote wins, the node updates and stores that vote and broadcasts it (i.e., it now endorses that vote).
  5. Repeat steps 3-4 above
  6. When a ballot is approved by more than half of the nodes and the owner of the ballot is also in favor of the ballot, the election is successful and the owner of the ballot becomes the Leader
  7. The Leader switches to LEADING, the Followers switch to FOLLOWING, and the Observers switch to OBSERVING. The election ends and the data synchronization process starts.

Data Synchronization Process

The data synchronization process is to make the cluster data reach the consistent state based on the Leader data.

  1. The new Leader loads the local snapshot into memory and applies all subsequent transactions through logging to ensure that the Leader database is up to date.
  2. The followers and Observers compare their zxids with the Leader’s zxids to determine the synchronization policy for each node
  3. According to the synchronization strategy, the Leader synchronizes data to each node
  4. After the synchronization of each node is complete, the Leader sends the NEWLEADER instruction to the node
  5. The Follower node returns an ACK after the synchronization is complete
  6. When the Leader receives ACKs from more than half of the nodes, synchronization is considered complete.

  7. The Leader sends the UPTODATE command to the Follower nodes to notify the cluster that synchronization is complete, and the cluster starts serving external requests.
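Step 2 chooses among the synchronization modes ZooKeeper uses (DIFF, TRUNC, SNAP). Here is a hedged sketch of that decision, assuming the Leader keeps the range of ZXIDs still present in its committed-proposal cache; the variable names are illustrative, and the real logic lives in ZooKeeper's server code:

```java
/** Illustrative choice of sync mode; a sketch, not ZooKeeper's actual implementation. */
public class SyncPolicy {
    enum Mode { DIFF, TRUNC, SNAP }

    /**
     * @param followerZxid    last zxid the follower has
     * @param minCommittedLog oldest zxid in the Leader's in-memory committed-proposal cache
     * @param maxCommittedLog newest zxid in that cache
     */
    static Mode choose(long followerZxid, long minCommittedLog, long maxCommittedLog) {
        if (followerZxid > maxCommittedLog) {
            return Mode.TRUNC;   // follower is ahead of the Leader: roll back the extra proposals
        }
        if (followerZxid >= minCommittedLog) {
            return Mode.DIFF;    // replay only the proposals the follower is missing
        }
        return Mode.SNAP;        // too far behind: ship a full snapshot of the Leader's DataTree
    }

    public static void main(String[] args) {
        System.out.println(choose(0x105L, 0x100L, 0x110L));  // DIFF
        System.out.println(choose(0x120L, 0x100L, 0x110L));  // TRUNC
        System.out.println(choose(0x0f0L, 0x100L, 0x110L));  // SNAP
    }
}
```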

Examples of ZK applications

  1. Naming service: with a naming service, client applications can look up information such as the address and provider of a resource or service by a given name. This is done by creating a globally unique path that serves as the name.
  2. Distributed lock: for an exclusive lock, every application must create the same ephemeral, non-sequential node under a specified directory of the ZK cluster before touching the data. Whichever client creates the node successfully holds the lock, and the node is removed (the client deletes it or disconnects) when the operation is complete. Other applications that need to operate on the resource can watch for the node's existence; see the sketch after this list.
  3. Ordering control: control the order of operations by creating ephemeral sequential nodes.
  4. Heartbeat detection: let different processes create ephemeral child nodes under a designated ZK node. Each process can then tell whether another process is alive simply by checking for its ephemeral child node, which greatly reduces coupling between systems.
  5. Master election: multiple clients request to create the same ephemeral node, and only one of them can succeed. With this property it is easy to run a Master election in a distributed environment: the machine whose client successfully creates the node becomes the Master. Meanwhile, every client that fails to create the node registers a child-change Watcher on it to monitor whether the current Master is alive. If the current Master is found to have died, the remaining clients re-elect a Master.
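A sketch of the lock in item 2, using the ephemeral-sequential variant (which also avoids the herd effect described later): each client creates an ephemeral sequential node under a lock directory, takes the lock if its node has the smallest sequence number, and otherwise watches only its predecessor. The connection and the lock path are placeholders, and the lock directory is assumed to exist.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class SimpleZkLock {
    private final ZooKeeper zk;
    private final String lockDir = "/locks/resource";   // must be created beforehand
    private String myNode;                               // e.g. /locks/resource/lock-0000000012

    public SimpleZkLock(ZooKeeper zk) {
        this.zk = zk;
    }

    public void lock() throws Exception {
        // Ephemeral: the node (and therefore the lock) disappears if this client dies.
        myNode = zk.create(lockDir + "/lock-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        while (true) {
            List<String> children = zk.getChildren(lockDir, false);
            Collections.sort(children);
            String myName = myNode.substring(lockDir.length() + 1);

            if (myName.equals(children.get(0))) {
                return;   // smallest sequence number: we hold the lock
            }

            // Watch only the node immediately before ours to avoid the herd effect.
            String predecessor = children.get(children.indexOf(myName) - 1);
            CountDownLatch latch = new CountDownLatch(1);
            if (zk.exists(lockDir + "/" + predecessor, event -> latch.countDown()) != null) {
                latch.await();   // woken when the predecessor node is deleted
            }
            // Loop and re-check; the predecessor may have vanished just before exists().
        }
    }

    public void unlock() throws Exception {
        zk.delete(myNode, -1);   // -1: delete regardless of version
    }
}
```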

Zookeeper drawbacks:

  1. Not highly available: in extreme cases ZK discards some requests, for example when the link between equipment rooms fails.
  2. The ZooKeeper master can cover only one equipment room; service modules running in the other rooms can only stop because they have no master, so the ZooKeeper master is sensitive to network jitter.
  3. Elections are slow, and ZK cannot provide service to the outside while an election is in progress.
  4. ZK's performance is limited: the TPS of a typical ZooKeeper deployment is around 10,000, which cannot cover the billions of calls a large system makes every day. It is therefore not feasible to fetch the service system's master information from ZooKeeper on every request, and the ZooKeeper client must cache the service system's master address.
  5. ZK's own permission control is very weak.
  6. Herd effect: all clients try to lock the same temporary node, and while the lock is held the other clients watch that node. Once the lock is released, Zookeeper notifies every watching client, and a large number of clients then try to create the lock on the same node at once, though only one can obtain it. The flood of requests causes high network overhead, increases network load, and hurts Zookeeper's performance.

  • Solution: each client that wants the lock creates an ephemeral sequential node; the client with the smallest sequence number obtains the lock, and every other client watches only the sequential node immediately preceding its own. When the lock is released, only the next client in order is notified and takes the lock, so there is no herd effect from everyone trying to lock the same node.

  7. Reads may return stale data: data read from ZK may be old and expired rather than the latest. For example, if a ZK cluster had 10,000 nodes, a write would be considered successful once 6,000 nodes had written it; a client reading from one of the other 4,000 nodes would read old, stale data.

Zookeeper split brain:

False death (suspended animation): the Leader is presumed dead because of a heartbeat timeout (a network problem), but it is actually still alive.

Split brain

Because of the false death, a new Leader election is initiated and a new Leader is elected. The old Leader's network connection then recovers, and the cluster ends up with two Leaders: some clients are connected to the old Leader while others are connected to the new one.

Quorum mechanism solves split brain

Zookeeper Quorums have three functions:

  1. The minimum number of nodes in a cluster is used to elect a leader to ensure the availability of the cluster.
  2. The minimum number of nodes in the cluster that have saved the data before notifying the client that the data has been safely saved. Once these nodes have saved the data, the client is notified that it is safely saved and can continue with other tasks. The remaining nodes in the cluster will eventually save the data as well.
  3. Suppose a Leader is falsely presumed dead and the remaining followers elect a new Leader. If the old Leader comes back and still believes it is the Leader, its requests to the other followers are rejected, because every time a new Leader is elected an epoch number is generated (marking the current Leader's term of office) and the epoch is incremented. Followers that have acknowledged the new Leader know the new epoch and reject all requests whose epoch is smaller than the current Leader's epoch. Could some followers be unaware that a new Leader exists? Possibly, but never a majority; otherwise the new Leader could not have been elected in the first place. Zookeeper writes also follow the quorum mechanism, so writes without majority support are invalid, and the old Leader has no effect even though it still considers itself the Leader.
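A hedged sketch of the epoch check described above (class and variable names are illustrative, not ZooKeeper's internal code): a follower that has acknowledged a new Leader simply rejects anything stamped with an older epoch.

```java
/** Illustrative epoch fencing; the real check is part of ZooKeeper's ZAB implementation. */
public class EpochFence {
    private long acceptedEpoch = 5;   // epoch of the leader this follower currently acknowledges

    /** A request from a "leader" with a stale epoch is refused outright. */
    boolean accept(long requestEpoch) {
        if (requestEpoch < acceptedEpoch) {
            return false;             // old leader woke up after a new election: ignore it
        }
        acceptedEpoch = Math.max(acceptedEpoch, requestEpoch);
        return true;
    }

    public static void main(String[] args) {
        EpochFence follower = new EpochFence();
        System.out.println(follower.accept(6));   // true: request from the new leader's term
        System.out.println(follower.accept(5));   // false: request from the revived old leader
    }
}
```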