The Way of Java Programming: make a little progress every day, accumulate technology, and share knowledge.

This article describes the distributed lock mechanism of ZooKeeper.

We first cover the principles behind ZooKeeper distributed locks, then walk through the Apache Curator implementation, and finally briefly discuss the concept of split brain and how ZooKeeper deals with it.

Distributed locks

In order to prevent multiple processes from interfering with each other in a distributed system, we need a distributed coordination technique to schedule these processes. At the core of this coordination technology is the distributed lock.

I. Implementation principle

There are two ways to implement a distributed lock in ZooKeeper: one uses temporary (ephemeral) nodes, the other uses temporary ordered (ephemeral sequential) nodes. What is the difference between these two approaches, and which one should we use?

Temporary node

The principle of the temporary node scheme is as follows:

  • Multiple processes (or threads) compete to create the same temporary node; only one of them can succeed in creating it.
  • Suppose process 1 successfully creates the node and thereby acquires the distributed lock. The other processes register a listener on parent_node to watch for changes to its children and suspend their current thread.
  • When a child node under parent_node changes, all processes that registered listeners are notified. Each process then checks whether the event is a delete event for the lock node. If so, the suspended thread resumes and tries to acquire the lock again.

The reason for using a temporary node is to avoid deadlock: after executing its business logic, process 1 actively deletes the node to release the lock. But if process 1 goes down unexpectedly, the node is removed automatically because it was declared as a temporary node, so a deadlock is still avoided.
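As a reference, here is a minimal sketch of the temporary node scheme written against the raw ZooKeeper Java client. The lock path /parent_node/lock, the omitted connection setup, and the simplified error handling are illustrative assumptions, not the code of any particular library; for simplicity it watches the lock node itself rather than all children of parent_node.

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class EphemeralNodeLock {
    private static final String LOCK_PATH = "/parent_node/lock"; // illustrative path
    private final ZooKeeper zk;

    public EphemeralNodeLock(ZooKeeper zk) {
        this.zk = zk;
    }

    public void lock() throws Exception {
        while (true) {
            try {
                // All processes compete to create the SAME temporary (ephemeral) node; only one succeeds
                zk.create(LOCK_PATH, new byte[0],
                          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return; // node created -> lock acquired
            } catch (KeeperException.NodeExistsException e) {
                // Another process holds the lock: watch the node and block until it is deleted
                CountDownLatch released = new CountDownLatch(1);
                if (zk.exists(LOCK_PATH, event -> {
                        if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                            released.countDown();
                        }
                    }) == null) {
                    continue; // the lock node is already gone, retry immediately
                }
                released.await(); // every waiter is woken up here -> thundering herd
            }
        }
    }

    public void unlock() throws Exception {
        zk.delete(LOCK_PATH, -1); // -1 ignores the node version
    }
}

Note that every waiting process ends up being notified at the same time when the lock is released; this is exactly the notification storm described next.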

The implementation of the temporary node scheme is relatively simple, but its disadvantages are also obvious:

  • Disadvantage 1: processes 2, 3, and 4 are also notified whenever some other lock under parent_node changes or is deleted, even though they do not care about other locks being released. If parent_node holds a large number of locks (that is, a large number of processes are waiting to lock) and the application is under high concurrency, the ZooKeeper cluster has to notify clients frequently, which causes a large amount of network overhead.
  • Disadvantage 2: the lock created by the temporary node scheme is not fair. After process 1 releases the lock, the order in which processes 2, 3, and 4 retry depends on when they receive the notification, not on when they first attempted to acquire the lock (that is, how long they have been waiting).

This phenomenon also has a proper name: the **thundering herd**.

Thundering herd

The concept of a thundering herd is actually quite easy to understand. Suppose you throw a piece of food into the middle of a flock of pigeons: in the end only one pigeon gets the food, but all of them are disturbed and fight over it. The ones that do not get the food go back to waiting for the next piece. Every throw disturbs the whole flock; that is the thundering herd. For an operating system, multiple processes/threads waiting for the same resource behave similarly: whenever the resource becomes available, all of them wake up and compete for it.

The thundering herd can cause the following problems:

  1. Huge server performance drain
  2. An outage may occur
  3. Huge network impact

So when concurrency is low, we can use the temporary node scheme; it is relatively simple. But how do we avoid the thundering herd when concurrency is high? We use the temporary ordered node scheme.

Temporary ordered node

When temporary ordered nodes are adopted, the flow is as follows (a minimal sketch using the raw ZooKeeper client follows the list):

  • Each process (or thread) attempts to create a temporary ordered node under parent_node;
  • Each process then fetches all the temporary lock nodes under parent_node and checks whether its own node is the smallest one; if so, it has acquired the lock;
  • If not, the current thread is suspended and a listener (Watch) is registered on the node immediately before its own;
  • As shown in the figure above, when process 1 finishes its processing, the Watch event registered by process 2 is triggered. Process 2 then knows it has acquired the lock and can start its business processing.
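Here is the minimal sketch mentioned above, again using the raw ZooKeeper Java client. The parent path /parent_node and the lock- prefix are illustrative assumptions; production code would also need to handle connection loss and interrupted waits.

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class SequentialNodeLock {
    private static final String PARENT = "/parent_node"; // illustrative parent path
    private final ZooKeeper zk;
    private String ourPath; // full path of the node this process created

    public SequentialNodeLock(ZooKeeper zk) {
        this.zk = zk;
    }

    public void lock() throws Exception {
        // 1. Create our own temporary ordered node, e.g. /parent_node/lock-0000000003
        ourPath = zk.create(PARENT + "/lock-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        String ourName = ourPath.substring(PARENT.length() + 1);

        while (true) {
            // 2. List and sort all children; the node with the smallest sequence number holds the lock
            List<String> children = zk.getChildren(PARENT, false);
            Collections.sort(children);
            int ourIndex = children.indexOf(ourName);
            if (ourIndex == 0) {
                return; // our node is the smallest -> lock acquired
            }
            // 3. Otherwise watch ONLY the node immediately before ours (no thundering herd)
            String previous = PARENT + "/" + children.get(ourIndex - 1);
            CountDownLatch released = new CountDownLatch(1);
            if (zk.exists(previous, event -> {
                    if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                        released.countDown();
                    }
                }) != null) {
                released.await(); // wait until the previous node is deleted, then re-check
            }
        }
    }

    public void unlock() throws Exception {
        zk.delete(ourPath, -1);
    }
}

Because each process watches only the node immediately before its own, releasing a lock wakes exactly one waiter, which removes the thundering herd and makes the lock fair.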

From the introduction above, it can be seen that:

  • Each temporary ordered node only needs to care about the node immediately before it, not about any other nodes or events;
  • The resulting lock is fair: the earlier a process tries to acquire the lock, the smaller the sequence number of the node it creates, and therefore the sooner it acquires the lock.

Another advantage of the temporary ordered node scheme is its ability to implement shared locks, such as read locks in read/write locks.

Read-write lock

As shown in the figure below, temporary ordered nodes can be divided into read lock nodes and write lock nodes:

  • For the read lock node, it only needs to care about the release of the previous write lock node. If the previous write lock is released, threads corresponding to multiple read lock nodes can concurrently read data.
  • For a write lock node, it only needs to care about the release of the previous node, and does not need to care about whether the previous node is a write lock node or a read lock node. To ensure order, the write operation must wait for the previous read or write operation to complete.

II. Apache Curator

2.1 Basic Usage

Apache Curator is a Java client for ZooKeeper that implements distributed locks and distributed read/write locks based on the temporary ordered node scheme. First, import the dependencies:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
</dependency>

The basic usage is as follows:

// Retry up to 3 times, waiting 5000 ms between retries
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
CuratorFramework client = CuratorFrameworkFactory.builder()
    .connectString("192.168.0.105:2181")
    .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
    .namespace("mySpace").build();
client.start();

// 1. Create a distributed lock
InterProcessMutex lock = new InterProcessMutex(client, "/distributed/myLock");
// 2. Try to acquire the distributed lock
if (lock.acquire(10, TimeUnit.SECONDS)) {
    try {
        System.out.println("Simulated business time");
        Thread.sleep(3 * 1000);
    } finally {
        // 3. Release the lock
        lock.release();
    }
}
client.close();

After that, you can start multiple application processes for testing. The data structure on ZooKeeper is as follows:

In the path we specified, multiple temporary ordered nodes are created in sequence and removed when the business logic finishes. Here we use a standalone ZooKeeper; the behavior is the same in a cluster environment. Unlike Redis master-slave mode, where replication lag can lead to inconsistent data, the ZooKeeper cluster itself guarantees data consistency across its nodes.
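Curator also ships the read/write lock described earlier as InterProcessReadWriteLock (in org.apache.curator.framework.recipes.locks), built on the same temporary ordered nodes. A minimal usage sketch, reusing the client created above; the lock path is illustrative:

// Shared read lock: multiple readers may hold it concurrently,
// but it blocks while another process holds the write lock
InterProcessReadWriteLock rwLock =
        new InterProcessReadWriteLock(client, "/distributed/myRwLock");
InterProcessMutex readLock = rwLock.readLock();
if (readLock.acquire(10, TimeUnit.SECONDS)) {
    try {
        System.out.println("Reading shared data");
    } finally {
        readLock.release();
    }
}
// The exclusive write lock is acquired the same way via rwLock.writeLock()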

2.2 Source Code Parsing

Under the hood, Apache Curator uses the temporary ordered node scheme. Here is how it is implemented in the source code:

1. Acquiring the lock: source code analysis

The core method above is acquire(), which is defined as follows:

@Override
public boolean acquire(long time, TimeUnit unit) throws Exception{
    return internalLock(time, unit);
}

It calls internalLock internally.

// threadData is a thread-safe Map: the key is the Thread holding the lock,
// and LockData records the lock path and a reentrancy counter
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

private boolean internalLock(long time, TimeUnit unit) throws Exception {
    Thread currentThread = Thread.currentThread();
    // First check whether threadData already holds a lock for the current thread
    LockData lockData = threadData.get(currentThread);
    if (lockData != null) {
        // The lock already exists, so increment its counter by 1. This step implements reentrancy
        lockData.lockCount.incrementAndGet();
        return true;
    }
    // [Core method: try to acquire the lock]
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    // If the lock is acquired, record it in threadData
    if (lockPath != null) {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }
    return false;
}

Next, let's look at attemptLock():

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
    final long startMillis = System.currentTimeMillis();
    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int retryCount = 0;  // Number of retries
    String ourPath = null;
    boolean hasTheLock = false;
    boolean isDone = false;

    // This loop is used to retry when a NoNodeException occurs
    while (!isDone) {
        isDone = true;
        try {
            // Create a node under the lock path
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // [Core method: acquire the lock]
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        } catch (KeeperException.NoNodeException e) {
            // If an exception occurs and the retry policy configured for ZooKeeper still allows it
            // (maximum retry time / retry count not yet reached), continue the loop and try again
            if (client.getZookeeperClient().getRetryPolicy()
                    .allowRetry(retryCount++, System.currentTimeMillis() - startMillis,
                                RetryLoop.getDefaultRetrySleeper())) {
                isDone = false;
            } else {
                throw e;
            }
        }
    }
    // If the lock was obtained, return the lock path
    if (hasTheLock) {
        return ourPath;
    }
    return null;
}

The two core methods here are createsTheLock() and internalLockLoop().

@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
    String ourPath;
    // If lockNodeBytes is not null, create a temporary ordered node carrying that data
    if (lockNodeBytes != null) {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().
            withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
    } else {
        // Otherwise create an empty temporary ordered node
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().
            withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    // Return the path of the created node
    return ourPath;
}

The path of the temporary ordered node returned here is passed as an argument to internalLockLoop(). After creating its temporary ordered node, each thread must also determine whether the node it created is currently the smallest one:

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
    // Whether the lock is held
    boolean haveTheLock = false;
    boolean doDelete = false;
    try {
        if (revocable.get() != null) {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }
        // Keep looping while the ZooKeeper client is started (i.e. the process that wants the lock
        // is still running) and the lock has not been acquired yet
        while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            // Sort all current child nodes from smallest to largest
            List<String> children = getSortedChildren();
            // Trim the path of the temporary ordered node obtained from createsTheLock, keeping only the node name
            String sequenceNodeName = ourPath.substring(basePath.length() + 1);
            // Determine whether the current node is the smallest
            PredicateResults predicateResults = driver.
                getsTheLock(client, children, sequenceNodeName, maxLeases);
            // If the current node is the smallest (in the exclusive lock case), the lock is obtained
            if (predicateResults.getsTheLock()) {
                haveTheLock = true;
            } else {
                // If the current node is not the smallest, build the full path of the previous node
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                synchronized (this) {
                    try {
                        // Register a watcher on the previous node
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        // If a wait time was set
                        if (millisToWait != null) {
                            // Subtract the time already spent from the wait time
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            // If no time is left, the lock attempt has timed out and the node must be deleted
                            if (millisToWait <= 0) {
                                // Set the delete flag and exit the loop
                                doDelete = true;
                                break;
                            }
                            // If there is time left, wait for the lock for the remaining time
                            wait(millisToWait);
                        } else {
                            // If no wait time was set, wait indefinitely for the lock
                            wait();
                        }
                    } catch (KeeperException.NoNodeException e) {
                        // This exception means the previous node no longer exists (it was released)
                        // when we tried to watch it. Nothing extra needs to be done: the loop
                        // continues and we try to acquire the lock again
                    }
                }
            }
        }
    } catch (Exception e) {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    } finally {
        // If an exception was thrown or the wait timed out, the node created by this process is no
        // longer valid and must be deleted so that subsequent processes can keep trying to lock
        if (doDelete) {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}

The key call here is getsTheLock():

PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);

As mentioned above, the rule for deciding whether the current node holds the lock differs for different lock types (such as read/write locks versus mutexes), so getsTheLock() has different implementations. Take StandardLockInternalsDriver as an example: it uses the mutex rule, that is, the current node holds the lock only if it is the smallest node:

public PredicateResults getsTheLock(CuratorFramework client, List<String> children,
                                    String sequenceNodeName, int maxLeases) throws Exception {
    // Get the index of the current node in the sorted child list
    int ourIndex = children.indexOf(sequenceNodeName);
    // Throw a NoNodeException if ourIndex is less than 0
    validateOurIndex(sequenceNodeName, ourIndex);
    // If ourIndex is less than maxLeases (default 1), it must be 0, i.e. the first and smallest node in the sorted list
    boolean getsTheLock = ourIndex < maxLeases;
    // If the current node is the smallest, the lock has been acquired and there is no previous node to return.
    // Otherwise, return the name of the previous node so it can be watched later
    String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
    return new PredicateResults(pathToWatch, getsTheLock);
}

For a mutex, maxLeases defaults to 1. If maxLeases were greater than 1, say 5, then the five smallest temporary ordered nodes would all be considered lock holders: up to five threads could access the critical section concurrently, similar in function to the Semaphore mechanism in Java.
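Curator actually exposes this "at most N concurrent holders" behavior directly through its semaphore recipe, InterProcessSemaphoreV2 (also in org.apache.curator.framework.recipes.locks), so you do not need to tune maxLeases yourself. A minimal sketch, reusing the client from the usage example above; the path and lease count are illustrative:

// Allow at most 5 processes/threads to hold a lease at the same time
InterProcessSemaphoreV2 semaphore =
        new InterProcessSemaphoreV2(client, "/distributed/mySemaphore", 5);
Lease lease = semaphore.acquire();
try {
    System.out.println("Inside the critical section (up to 5 holders)");
} finally {
    // Return the lease so another waiter can enter
    semaphore.returnLease(lease);
}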

2. Releasing the lock: source code analysis

That covers the source code for acquiring the lock; releasing the lock is relatively simple. The source code of the release() method is as follows:

public void release() throws Exception {
    Thread currentThread = Thread.currentThread();
    // Look up the lock information for the current thread
    InterProcessMutex.LockData lockData = threadData.get(currentThread);
    // If there is none, the current thread is not the holder of the lock, so throw an exception
    if (lockData == null) {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }
    // Because the lock is reentrant, decrement its counter by 1
    int newLockCount = lockData.lockCount.decrementAndGet();
    if (newLockCount > 0) {
        return;
    }
    // If the counter is less than 0, release() has been called more times than acquire(), so throw an exception
    if (newLockCount < 0) {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    try {
        // Reaching this point means the counter is exactly 0, so the node can be deleted
        internals.releaseLock(lockData.lockPath);
    } finally {
        // Remove the lock information from threadData
        threadData.remove(currentThread);
    }
}

The actual deletion of the lock node happens in releaseLock(); its source code is as follows:

final void releaseLock(String lockPath) throws Exception{
      client.removeWatchers();
      revocable.set(null);
      deleteOurPath(lockPath); // Delete the ZooKeeper node
}

III. Split brain

As we all know, operations in a ZooKeeper cluster are coordinated by the Leader node. If a cluster ends up with multiple Leaders (the so-called split brain), message and data synchronization problems will inevitably arise, ultimately breaking the availability of the entire cluster.

How does ZooKeeper solve this problem?

Simply put: an odd number of nodes plus a more-than-half (majority) election mechanism.

Let's look at the source code.

During Leader election, a zkServer can become the Leader only if it obtains more than half of the votes. The source code of this majority mechanism is very simple:

public class QuorumMaj implements QuorumVerifier {
    private static final Logger LOG = LoggerFactory.getLogger(QuorumMaj.class);

    int half;

    // n is the number of voting zkServers in the cluster, excluding observer nodes
    public QuorumMaj(int n) {
        this.half = n / 2;
    }

    // Check whether the majority rule is satisfied
    public boolean containsQuorum(Set<Long> set) {
        // half is assigned in the constructor;
        // set.size() is the number of votes obtained by a zkServer
        return (set.size() > half);
    }
}

The core code is the following two lines:

this.half = n/2;
return (set.size() > half);

Why is the cluster required to have an odd number of nodes?

1. Prevent cluster unavailability caused by split brain

(1) Suppose the ZooKeeper cluster has 5 nodes and a split brain occurs, partitioning it into two sub-clusters A and B:

A: 1 node, B: 4 nodes (B can provide services; the Leader is in B)
A: 2 nodes, B: 3 nodes (B can provide services; the Leader is in B)

In both cases, one of the two sub-clusters always satisfies: number of available nodes > total number of nodes / 2. Therefore, the ZooKeeper cluster can still elect a Leader and provide external services, although some nodes become unavailable.

(2) Now suppose the ZooKeeper cluster has 4 nodes and a split brain also occurs, partitioning it into two sub-clusters A and B:

A: 1 node, B: 3 nodes (B can provide services; the Leader is in B)
A: 2 nodes, B: 2 nodes (no services can be provided; neither A nor B satisfies set.size() > half)

The first split satisfies the election condition, just like the examples in (1). The second split is different: A and B each have two nodes, so neither satisfies the condition that the number of available nodes > the total number of nodes / 2, and the ZooKeeper cluster cannot provide services at all.

The two examples show that when the number of nodes is odd, the ZooKeeper cluster can always keep providing external services after a split (even though some nodes are lost), whereas an even number of nodes leaves the possibility that the cluster cannot work at all (when it splits into two equal halves).

2. With the same fault tolerance, an odd number of nodes saves resources.

Recall the election condition: the number of available nodes > the total number of nodes / 2. Note that it is >, not ≥.

Two examples:

(1) If ZooKeeper cluster 1 has three nodes, 3/2=1, that is, if ZooKeeper wants to provide services normally (that is, the leader election succeeds), at least two nodes need to be normal. In other words, a 3-node ZooKeeper cluster allows one node to go down.

(2) If ZooKeeper cluster 2 has four nodes, 4/2=2, that is, if ZooKeeper wants to provide services normally (that is, the leader election succeeds), at least three nodes need to be normal. In other words, a four-node ZooKeeper cluster can also allow one node to go down.

The problem is that both clusters can tolerate only one node going down, yet cluster 2 has one more node than cluster 1. With the same fault tolerance, it is therefore more economical to keep an odd number of nodes in the ZooKeeper cluster.
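The arithmetic behind this comparison can be checked with a few lines of Java that mirror the half computation in QuorumMaj shown above:

public class QuorumMath {
    public static void main(String[] args) {
        for (int n = 3; n <= 6; n++) {
            int half = n / 2;            // same computation as in QuorumMaj
            int needed = half + 1;       // votes required, since set.size() > half
            int tolerated = n - needed;  // nodes that may fail while a quorum survives
            System.out.printf("%d nodes -> need %d alive, tolerate %d failure(s)%n",
                    n, needed, tolerated);
        }
    }
}

Running it shows that 3 and 4 nodes both tolerate only one failure, while 5 and 6 nodes both tolerate two, which is why the odd sizes are the economical choice.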

Besides ZooKeeper, there are other distributed lock solutions, such as MySQL and Redis. Which one do you usually use? Welcome to follow "The Way of Java Programming"! 🌹