Takeaway

  • Please, stop asking me how Zookeeper implements distributed locking!
  • As the saying goes: where there are people (the lock), there are rivers and lakes (the transaction). Today is not about rivers and lakes, though; it is about locks.
  • You are presumably already clear on what a distributed lock is and why you would use one. Redis can implement distributed locks, and so can Zookeeper.
  • Today Chen will detail how ZK implements distributed locks from the following aspects:

    1. “The four node types of ZK”
    2. “Implementation of an exclusive lock”
    3. “Implementation of a read/write lock”
    4. “Distributed locking with Curator”

The four node types of ZK

  • Persistent node: the node continues to exist after it is created, until it is explicitly deleted.
  • Ephemeral (temporary) node: its life cycle is bound to the current session; once the session is closed, the node is deleted. You can also delete it explicitly.
  • Persistent sequential node: the node persists after creation, and ZK automatically appends a monotonically increasing suffix to the node name.
  • Ephemeral sequential node: keeps the characteristics of an ephemeral node, and ZK automatically appends a monotonically increasing suffix to the node name.

The implementation of exclusive lock

  • The implementation of an exclusive lock is relatively simple: it relies on “the fact that ZK will not create two nodes with the same name”. The diagram below illustrates it:
  • According to the above analysis, it can be roughly divided into the following steps:

    1. Attempt to obtain the lock: create an ephemeral node; ZK guarantees that only one client creates it successfully.
    2. If the ephemeral node is created successfully, the lock is acquired; execute the business logic, then delete the node when done.
    3. If creating the ephemeral node fails, block and wait.
    4. Listen for the deletion event; once the ephemeral node is removed, the lock has been released and you can try to acquire it again.
    5. Recursion: acquiring the lock is a recursive process: acquire lock -> listen -> acquire lock.
  • “How to avoid deadlocks”: the lock node is ephemeral, so if the client that holds it goes down or its session closes, ZK deletes the node automatically and the lock is released.
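The mutual exclusion in step 1 behaves like an atomic put-if-absent. As a ZK-free analogy (the map stands in for the lock path; this is an illustration, not the ZK client API — the class and method names are made up):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory analogy for "only one client creates the node successfully".
class CreateIfAbsent {
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    /** Mimics "create ephemeral node": only the first caller for a path succeeds. */
    boolean tryCreate(String path, String owner) {
        return nodes.putIfAbsent(path, owner) == null;
    }

    /** Mimics deleting the node, i.e. releasing the lock. */
    void delete(String path) {
        nodes.remove(path);
    }
}
```

Just as with ZK, a second "client" fails to create the same path until the first one deletes it.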

Code implementation

  • The author follows the design of JDK locks and encapsulates the lock with the template method pattern. The interface is as follows:
/**
 * @Description ZK distributed lock interface
 * @Author Chen
 * @Date 2020/4/7 22:52
 */
public interface ZKLock {
    /** Acquire the lock */
    void lock() throws Exception;

    /** Release the lock */
    void unlock() throws Exception;
}
  • The template abstract class is as follows:
/**
 * @Description Exclusive lock, template class
 * @Author Chen
 * @Date 2020/4/7 22:55
 */
public abstract class AbstractZKLockMutex implements ZKLock {

    /** * Node path */
    protected String lockPath;

    /** * zk client */
    protected CuratorFramework zkClient;

    private AbstractZKLockMutex(){}

    public AbstractZKLockMutex(String lockPath,CuratorFramework client){
        this.lockPath=lockPath;
        this.zkClient=client;
    }

    /**
     * Template method: builds the skeleton for acquiring the lock; the concrete logic is left to subclasses
     * @throws Exception
     */
    @Override
    public final void lock() throws Exception {
        // Succeeded in obtaining the lock
        if (tryLock()){
            System.out.println(Thread.currentThread().getName()+"Lock obtained successfully");
        }else{  // Failed to obtain the lock
            // block is waiting
            waitLock();
            // Get the lock againlock(); }}/** * try to get the lock, subclass */
    protected abstract boolean tryLock(a) ;


    /**
     * Wait to acquire the lock; implemented by subclasses
     */
    protected abstract void waitLock() throws Exception;


    /**
     * Unlock: delete the node or close the session
     */
    @Override
    public abstract void unlock() throws Exception;
}
  • The concrete implementation class for the exclusive lock is as follows:
/**
 * @Description Implementation class for the exclusive lock; extends the template class AbstractZKLockMutex
 * @Author Chen
 * @Date 2020/4/7 23:23
 */
@Data
public class ZKLockMutex extends AbstractZKLockMutex {

    /** Used to block the thread while waiting for the lock */
    private CountDownLatch countDownLatch;

    public ZKLockMutex(String lockPath,CuratorFramework zkClient){
        super(lockPath,zkClient);
    }

    /**
     * Try to acquire the lock: create the ephemeral node directly; if creation throws, another client holds the lock
     * @throws Exception
     */
    @Override
    protected boolean tryLock() {
        try {
            zkClient.create()
                    // Temporary node
                    .withMode(CreateMode.EPHEMERAL)
                    // ACL: world:anyone:cdrwa
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(lockPath,"lock".getBytes());
            return true;
        }catch (Exception ex){
            return false;
        }
    }

    /**
     * Wait for the lock: block and listen for the lock node's deletion
     */
    @Override
    protected void waitLock() throws Exception {
        // Listen for node additions, updates, and deletions
        final NodeCache nodeCache = new NodeCache(zkClient, lockPath);
        // Start the listener
        nodeCache.start();
        ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable();

        // Listener
        NodeCacheListener listener = () -> {
            // The lock can be contended for once the node is deleted
            if (nodeCache.getCurrentData() == null) {
                // countDownLatch != null means the node existed and has now been deleted
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }
        };
        // Add the listener
        listenable.addListener(listener);

        // Check whether the node exists
        Stat stat = zkClient.checkExists().forPath(lockPath);
        // The node exists
        if (stat != null) {
            countDownLatch=new CountDownLatch(1);
            // block the main thread and listen
            countDownLatch.await();
        }
        // Remove the listener
        listenable.removeListener(listener);
    }

    /**
     * Unlock: delete the node
     * @throws Exception
     */
    @Override
    public void unlock() throws Exception {
        zkClient.delete().forPath(lockPath);
    }
}

How is a reentrant exclusive lock designed?

  • The reentrant logic is simple: keep a local ConcurrentMap whose key is the current thread and whose value is the lock bookkeeping data, structured as follows:
 private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
  • The pseudocode for reentrant acquisition is as follows:
public boolean tryLock(){
    // Check whether the current thread already has an entry in threadData
    // If it does, this is a reentrant acquisition: return true
    // Otherwise, run the normal lock-acquisition logic
    // On success, record the current thread in threadData
}
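The bookkeeping above can be sketched without ZK at all. In the sketch below, `LockData`, `tryLock`, and the `acquireFromZk` supplier are hypothetical stand-ins for illustration; a real implementation would run the ZK node-creation logic where the supplier is invoked:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Minimal sketch of reentrant bookkeeping; names are illustrative, not Curator's API.
class ReentrantBookkeeping {
    static class LockData {
        final AtomicInteger lockCount = new AtomicInteger(1);
    }

    private final ConcurrentMap<Thread, LockData> threadData = new ConcurrentHashMap<>();

    /** acquireFromZk is a stand-in for the real ZK node-creation attempt. */
    boolean tryLock(Supplier<Boolean> acquireFromZk) {
        Thread current = Thread.currentThread();
        LockData data = threadData.get(current);
        if (data != null) {            // Reentrant: this thread already holds the lock
            data.lockCount.incrementAndGet();
            return true;
        }
        if (acquireFromZk.get()) {     // First acquisition by this thread
            threadData.put(current, new LockData());
            return true;
        }
        return false;
    }

    /** Decrement the count; returns true only when the lock is fully released. */
    boolean unlock() {
        LockData data = threadData.get(Thread.currentThread());
        if (data == null) return false;
        if (data.lockCount.decrementAndGet() <= 0) {
            threadData.remove(Thread.currentThread());
            return true;               // Caller should now delete the ZK node
        }
        return false;
    }
}
```

Note that the ZK node is deleted only on the final `unlock()`, mirroring how a reentrant lock must be released as many times as it was acquired.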

Implementation of read/write locks

  • Read/write locks are divided into read locks and write locks. The differences are as follows:

    • A read lock allows multiple threads to read data simultaneously, but no thread may modify the data while reads are in progress.
    • A write lock allows neither concurrent writes nor concurrent reads by other threads.
  • How do we implement a read/write lock? ZK has a node type called the ephemeral sequential node, introduced above. Let's use ephemeral sequential nodes to implement the read/write lock.
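These semantics can be observed directly with the JDK's own `java.util.concurrent.locks.ReentrantReadWriteLock` before we build them on ZK (the `probe` method below is an illustrative helper, not part of the article's lock):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Demonstrates read/write lock semantics: concurrent reads allowed, writes excluded.
class RWSemantics {
    /** Returns {concurrentReadAllowed, writeDuringReadAllowed}. */
    static boolean[] probe() throws InterruptedException {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();                      // this thread holds a read lock
        boolean[] results = new boolean[2];
        Thread t = new Thread(() -> {
            results[0] = rw.readLock().tryLock();  // a second reader is allowed
            if (results[0]) rw.readLock().unlock();
            results[1] = rw.writeLock().tryLock(); // a writer is rejected while reads are held
            if (results[1]) rw.writeLock().unlock();
        });
        t.start();
        t.join();
        rw.readLock().unlock();
        return results;
    }
}
```

The ZK implementation below reproduces exactly this behavior, but across processes rather than threads.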

Read lock design

  • A read lock allows multiple threads to read at the same time, but no thread may write during that time. The implementation principle is as follows:
  • According to the figure above, acquiring a read lock consists of the following steps:

    1. Create an ephemeral sequential node (call it the current thread's read lock, or read node).
    2. Get all the child nodes under the path and sort them from smallest to largest.
    3. Find the nearest write node (write lock) that precedes the current node.
    4. If no such preceding write node exists, the read lock is acquired successfully.
    5. If a preceding write node exists, listen for its deletion event.
    6. Once the deletion event fires, “repeat steps 2, 3, 4, 5 (recursion)”.
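Steps 3 and 4 reduce to a pure list operation: among the sorted children, look for a write node in front of the newly created read node. A standalone sketch (the node names are hypothetical; the `_W_LOCK` marker matches the naming used in the code later, and ZK's zero-padded 10-digit sequence suffix means a plain lexicographic sort preserves creation order):

```java
import java.util.List;

// Pure-list version of the read-lock admission check (steps 3 and 4).
class ReadLockCheck {
    /** Nearest write node strictly before index, or null if none (read lock is grantable). */
    static String nearestWriteNodeBefore(List<String> sortedChildren, int index) {
        for (int i = index - 1; i >= 0; i--) {
            if (sortedChildren.get(i).contains("_W_LOCK")) {
                return sortedChildren.get(i);
            }
        }
        return null;
    }
}
```

A null result means the reader proceeds immediately; a non-null result names the write node to watch.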

Write lock design

  • Once a thread has acquired the write lock, no other thread may read or write. The implementation principle is as follows:
  • As the figure above shows, the only difference from the read lock is which node is watched: a write lock watches the immediately preceding node (read node or write node), whereas a read lock only needs to watch preceding write nodes. The steps are as follows:

    1. Create an ephemeral sequential node (call it the current thread's write lock, or write node).
    2. Get all the child nodes under the path and sort them from smallest to largest.
    3. Find the node immediately preceding the current node (read node or write node).
    4. If no preceding node exists, the write lock is acquired successfully.
    5. If a preceding node exists, listen for its deletion event.
    6. Once the deletion event fires, “repeat steps 2, 3, 4, 5 (recursion)”.
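The write-lock check is even simpler as a list operation: the writer may proceed only when its node is first in the sorted list; otherwise it watches the node immediately before it, of any kind. A sketch (node names hypothetical, same conventions as the code later):

```java
import java.util.List;

// Pure-list version of the write-lock admission check (steps 3 and 4).
class WriteLockCheck {
    /** The node a writer must watch, or null if it is first and may proceed. */
    static String nodeToWatch(List<String> sortedChildren, String myNode) {
        int index = sortedChildren.indexOf(myNode);
        return index <= 0 ? null : sortedChildren.get(index - 1);
    }
}
```

A null result grants the write lock; otherwise the returned node is the one whose deletion event the writer listens for.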

How to listen

  • Both the write lock and the read lock watch a node in front of them. The difference: a read lock only watches the nearest preceding write node, while a write lock watches its immediately preceding node of any kind. Abstractly, this is a chain of watches, as shown in the figure below:
  • Each node watches the neighboring node in front of it; once that node is deleted, the children are re-sorted and the watch moves to the new predecessor, and so on recursively.

Code implementation

  • The author wrote a simple read/write lock implementation, which is not recommended for production use. The code is as follows:
public class ZKLockRW  {

    /** * Node path */
    protected String lockPath;

    /** * zk client */
    protected CuratorFramework zkClient;

    /** * used to block threads */
    private CountDownLatch countDownLatch=new CountDownLatch(1);


    private final static String WRITE_NAME="_W_LOCK";

    private final static String READ_NAME="_R_LOCK";


    public ZKLockRW(String lockPath, CuratorFramework client) {
        this.lockPath=lockPath;
        this.zkClient=client;
    }

    /**
     * Acquire the lock; block if acquisition fails
     * @throws Exception
     */
    public void lock() throws Exception {
        // Create a node
        String node = createNode();
        // Block waiting to acquire the lock
        tryLock(node);
        countDownLatch.await();
    }

    /**
     * Create an ephemeral sequential node
     * @return the created node path
     * @throws Exception
     */
    private String createNode() throws Exception {
        // Create temporary ordered nodes
       return zkClient.create()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath(lockPath);
    }

    /**
     * Obtain a write lock
     */
    public ZKLockRW writeLock(){
        return new ZKLockRW(lockPath+WRITE_NAME,zkClient);
    }

    /**
     * Obtain a read lock
     */
    public ZKLockRW readLock(){
        return new ZKLockRW(lockPath+READ_NAME,zkClient);
    }

    private void tryLock(String nodePath) throws Exception {
        // Get all the child nodes
        List<String> childPaths = zkClient.getChildren()
                .forPath("/")
                .stream().sorted().map(o->"/"+o).collect(Collectors.toList());


        // The first node is the current lock. The condition that the recursion ends
        if (nodePath.equals(childPaths.get(0))){
            countDownLatch.countDown();
            return;
        }

        //1. Read lock: listen on the first write lock
        if (nodePath.contains(READ_NAME)){
            // Find adjacent write locks
            String preNode = getNearWriteNode(childPaths, childPaths.indexOf(nodePath));
            if (preNode==null){
                countDownLatch.countDown();
                return;
            }
            NodeCache nodeCache=new NodeCache(zkClient,preNode);
            nodeCache.start();
            ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable();
            listenable.addListener(() -> {
                // Node deletion event
                if (nodeCache.getCurrentData() == null) {
                    // Continue listening on the previous write node
                    String nearWriteNode = getNearWriteNode(childPaths, childPaths.indexOf(preNode));
                    if (nearWriteNode == null) {
                        countDownLatch.countDown();
                        return;
                    }
                    tryLock(nearWriteNode);
                }
            });
        }

        // 2. Write lock: watch the immediately preceding node, whatever its type
        if (nodePath.contains(WRITE_NAME)){
            String preNode = childPaths.get(childPaths.indexOf(nodePath) - 1);
            NodeCache nodeCache=new NodeCache(zkClient,preNode);
            nodeCache.start();
            ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable();
            listenable.addListener(() -> {
                // Node deletion event
                if (nodeCache.getCurrentData() == null) {
                    // Continue listening on the previous node
                    int preIndex = childPaths.indexOf(preNode) - 1;
                    tryLock(childPaths.get(preIndex < 0 ? 0 : preIndex));
                }
            });
        }
    }

    /**
     * Find the first write node before the given index
     * @param childPath all child nodes
     * @param index right boundary (exclusive)
     * @return the write node path, or null if none exists
     */
    private String getNearWriteNode(List<String> childPath, Integer index){
        for (int i = 0; i < index; i++) {
            String node = childPath.get(i);
            if (node.contains(WRITE_NAME))
                return node;

        }
        return null;
    }
}

Distributed locking with Curator

  • Curator is an open-source Zookeeper client from Netflix. Compared with the native client that Zookeeper provides, Curator offers a higher level of abstraction and simplifies Zookeeper client development.
  • Curator already encapsulates distributed locks for us, implemented along roughly the same lines as above. Small and medium-sized companies are advised to use the framework directly; after all, it is stable. Some large companies hand-write their own, which is impressive.
  • Creating an exclusive lock is simple, as follows:
// arg1: CuratorFramework connection object, arg2: node path
InterProcessMutex lock = new InterProcessMutex(client, path);
// Acquire the lock
lock.acquire();
// Release the lock
lock.release();
  • See the official documentation for more APIs; they are not the focus of this article.

  • “That is all for ZK distributed locks. If you want the source code, the old rule applies: follow the WeChat official account [Code Ape Technology Column] and reply with the keyword 'distributed lock' to get it.”

A little benefit

  • For those not yet familiar with Zookeeper, Chen spent two days summarizing the common knowledge points of ZK, including common ZK shell commands, ZK access control, and the basic Curator API. The table of contents is as follows:
  • “For those of you who need the PDF, as usual: follow the WeChat official account [Code Ape Technology Column] and reply with the keyword 'ZK summary'.”