Preface

This article introduces how to implement a distributed lock with ZooKeeper.

How the lock works

ZooKeeper implements a distributed lock by having multiple clients create ephemeral (temporary) sequential nodes under the same designated parent node. The client whose node receives the smallest sequence number acquires the lock. Every other client watches the node whose sequence number is immediately below its own; when that node is deleted, the watching client receives an event, checks whether its own node now has the smallest sequence number, and if so it acquires the lock.
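To make this concrete, here is a rough sketch of those steps against the plain ZooKeeper API. It is a conceptual illustration only (the class name, the /lock-path parent node, and the simplified waiting logic are assumptions); the rest of the article uses Curator's ready-made lock recipe, which implements this logic for you.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZkLockSketch {
    private final ZooKeeper zk;
    private final String lockRoot = "/lock-path"; // parent node, assumed to already exist
    private String ourNode;

    public ZkLockSketch(ZooKeeper zk) { this.zk = zk; }

    public void acquire() throws Exception {
        // 1. Create an ephemeral sequential node under the lock root.
        ourNode = zk.create(lockRoot + "/lock-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        while (true) {
            List<String> children = zk.getChildren(lockRoot, false);
            Collections.sort(children);
            String ourName = ourNode.substring(lockRoot.length() + 1);
            int index = children.indexOf(ourName);
            if (index == 0) {
                return; // 2. Our node has the smallest sequence number: we hold the lock.
            }
            // 3. Otherwise watch the node just below ours and wait until it disappears.
            String previous = lockRoot + "/" + children.get(index - 1);
            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(previous, event -> latch.countDown());
            if (stat != null) {
                latch.await(); // woken when the previous node changes or is deleted, then re-check
            }
        }
    }

    public void release() throws Exception {
        zk.delete(ourNode, -1); // deleting our node notifies the next waiter
    }
}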

Installing ZooKeeper with Docker

Pull the image

docker pull zookeeper

Start the container


docker run --name zk -p  2181:2181 -p 2888:2888 -p 3888:3888 --restart always -d zookeeper

  • -p port mapping
  • --name container instance name
  • -d run in the background
  • 2181 ZooKeeper client port
  • 2888 ZooKeeper cluster port
  • 3888 ZooKeeper election port

Check the container

docker ps |grep zookeeper

A few basic ZooKeeper commands

Enter the Docker container

docker exec -it 942142604a46  bash

Viewing the server status

./bin/zkServer.sh status

Start the Client

./bin/zkCli.sh

Creating temporary Nodes

create -e /node1 node1.1

-e creates a temporary (ephemeral) node, which is deleted automatically when the client session closes; without -e, a persistent node is created.

Get node value

 get /node

List child nodes

ls /node

Deleting a node

delete /node

Viewing Node Information

stat /test

Next, let's introduce Curator, a client framework for ZooKeeper

Introduction

Curator is an open-source ZooKeeper client framework from Netflix. It takes care of many low-level details of ZooKeeper client development, such as connection and reconnection management, repeated Watcher registration, and NodeExistsException handling.

Maven dependency for Curator
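The lock recipe used below lives in the curator-recipes artifact; a typical dependency looks like this (the version number is only an example, use whatever release matches your project):

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>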


The basic Curator API

  • Create a session using the static factory method
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 5000, retryPolicy);
RetryPolicy is the retry policy; ExponentialBackoffRetry takes two parameters:

The first parameter, baseSleepTimeMs, is the initial sleep time, used to calculate the sleep time of each subsequent retry.

The second parameter, maxRetries, is the maximum number of retries.

  • Create a session using the fluent-style API
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("127.0.0.1:2181")
        .sessionTimeoutMs(5000)        // session timeout
        .connectionTimeoutMs(5000)     // connection timeout
        .retryPolicy(retryPolicy)
        .namespace("base")             // isolation namespace
        .build();
client.start();
  • Creating a Data Node
client.create()
        .creatingParentContainersIfNeeded()    // recursively create the required parent nodes
        .withMode(CreateMode.PERSISTENT)       // create a PERSISTENT node
        .forPath("/nodeA", "init".getBytes()); // node path and initial content
  • Deleting a Data Node
client.delete()
        .guaranteed()                  // keep retrying in the background until the delete succeeds
        .deletingChildrenIfNeeded()    // recursively delete child nodes
        .withVersion(10086)            // delete the specified version
        .forPath("/nodeA");
  • Read data node
byte[] bytes = client.getData().forPath("/nodeA"); 
        System.out.println(new String(bytes));
  • Read the stat
Stat stat = new Stat();
        client.getData()
                .storingStatIn(stat)
                .forPath("/nodeA");
  • Modifying a Data Node
client.setData()
        .withVersion(10086)            // modify the specified version
        .forPath("/nodeA", "data".getBytes());
  • The transaction
client.inTransaction().check().forPath("/nodeA")
                .and()
                .create().withMode(CreateMode.EPHEMERAL).forPath("/nodeB", "init".getBytes())
                .and()
                .create().withMode(CreateMode.EPHEMERAL).forPath("/nodeC", "init".getBytes())
                .and()
                .commit();
  • other
client.checkExists().forPath("/nodeA");    // check whether the node exists
client.getChildren().forPath("/nodeA");    // get the paths of the child nodes
  • An asynchronous callback
Executor executor = Executors.newFixedThreadPool(2);
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .inBackground((curatorFramework, curatorEvent) -> {
                    System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
                },executor)
                .forPath("path");

ZooKeeper distributed lock implementation: code analysis

First, the test method describes the complete process of acquiring a ZK lock


How to do the same thing through an HTTP interface

The directory structure


Initialize the ZK client connection


ZK client: lock acquisition and release implementation


  • The InitializingBean and DisposableBean interfaces are implemented

The ZooKeeper client establishes a session with the ZooKeeper server when the application starts (when client.start() is executed), and the session is closed when the application shuts down.
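A minimal sketch of such a lifecycle bean, assuming Spring and a ZooKeeper server at 127.0.0.1:2181 (class and field names are illustrative):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

@Component
public class ZkClient implements InitializingBean, DisposableBean {

    private CuratorFramework client;

    public CuratorFramework getClient() {
        return client;
    }

    @Override
    public void afterPropertiesSet() {
        // Called on application startup: establish the session with the ZooKeeper server.
        client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", 5000, 5000, new ExponentialBackoffRetry(1000, 3));
        client.start();
    }

    @Override
    public void destroy() {
        // Called on application shutdown: close the session.
        client.close();
    }
}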

Define an abstract business processing interface
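As an illustration, such an interface might simply wrap the business logic to be executed while the lock is held (the name and shape below are hypothetical):

// Hypothetical interface: the business work to execute once the lock is held.
@FunctionalInterface
public interface LockedTask<T> {
    T run() throws Exception;
}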


A single thread acquires the ZK lock
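A minimal sketch of a single thread acquiring and releasing the lock with Curator's InterProcessMutex, assuming a started CuratorFramework client and the /lock-path node used later in this article:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.util.concurrent.TimeUnit;

public class SingleThreadLockExample {

    public static void doWithLock(CuratorFramework client) throws Exception {
        InterProcessMutex lock = new InterProcessMutex(client, "/lock-path");
        // Wait up to 200 ms for the lock (the same timeout the article traces later).
        if (lock.acquire(200, TimeUnit.MILLISECONDS)) {
            try {
                System.out.println("lock acquired, doing business work");
            } finally {
                lock.release(); // the reentrancy count drops to 0 and the ephemeral node is deleted
            }
        } else {
            System.out.println("failed to acquire the lock within the timeout");
        }
    }
}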


Multiple threads acquire the ZK lock


Creating a thread pool

ExecutorService executorService = Executors.newFixedThreadPool(20);


20 threads simultaneously initiate the same ZK lock request
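A sketch of what this might look like, again assuming a started CuratorFramework client; all 20 threads compete for the same /lock-path lock:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BatchAcquireLockExample {

    public static void batchAcquire(CuratorFramework client) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 20; i++) {
            executorService.submit(() -> {
                InterProcessMutex lock = new InterProcessMutex(client, "/lock-path");
                try {
                    if (lock.acquire(200, TimeUnit.MILLISECONDS)) {
                        try {
                            System.out.println(Thread.currentThread().getName() + " got the lock");
                        } finally {
                            lock.release();
                        }
                    } else {
                        System.out.println(Thread.currentThread().getName() + " did not get the lock");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}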

Source code analysis for Curator

Establishing and closing a session

After client.start() is called, a session is established with the ZooKeeper server; the session is closed when the application shuts down.
  • client.start() source code analysis

  • Startup log


  • Close the log

  • Zk logs during system startup

  • Zk logs when the system is shut down

  • Call the multi-threaded ZK lock interface
curl http://127.0.0.1:8080/batch-acquire-lock

Check the ZK lock status



See the log

Twenty temporary sequential nodes, numbered 0 to 19, are created under /lock-path. Only the thread that created the node with sequence number 0 acquires the lock. The temporary nodes of the threads that failed to acquire the lock are deleted, while the temporary node corresponding to the held lock still exists.

There is also a drawback

The temporary sequential nodes are removed, but their parent node /lock-path is not. As a result, when the ZooKeeper distributed lock is used in high-concurrency scenarios, many empty parent nodes are left behind.

Node creation

Tracing lock.acquire(200, TimeUnit.MILLISECONDS) leads into org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver#createsTheLock.

The node created there is EPHEMERAL_SEQUENTIAL, which means it is deleted when the client connection is closed; it is also deleted when we call org.apache.curator.framework.recipes.locks.InterProcessMutex#release.

Reentrancy

Tracing the lock-acquisition code leads into org.apache.curator.framework.recipes.locks.InterProcessMutex#internalLock.

As you can see, the ZooKeeper lock is reentrant: the same thread can acquire the lock multiple times, but only the first acquisition actually creates the temporary sequential node; each subsequent acquisition just increments the lock count by 1. Correspondingly, each release decrements the count by 1, and only the final release actually deletes the node.
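A small sketch of the reentrant behavior, assuming a started CuratorFramework client:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class ReentrantLockExample {

    public static void reentrantDemo(CuratorFramework client) throws Exception {
        InterProcessMutex lock = new InterProcessMutex(client, "/lock-path");

        lock.acquire();   // first acquisition: the temporary sequential node is created
        lock.acquire();   // same thread, reentrant: only the lock count is incremented

        lock.release();   // count decremented; the node still exists
        lock.release();   // count reaches zero: the node is actually deleted
    }
}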

Client fault detection:

Under normal conditions, the client sends PING heartbeat requests to the server within the session's validity period. When the server receives such a request, it reactivates the corresponding session, extending its lifetime. If a session is not reactivated in time, the client is assumed to have failed: the server's session-timeout check task finds the inactive session and cleans it up, deleting its temporary nodes (including temporary sequential nodes) in the process. This gives the ZooKeeper distributed lock its fault tolerance: a lock is never left unreleased because a client exited unexpectedly, so other clients are not blocked from acquiring it.

Data consistency:

A ZooKeeper cluster generally consists of one leader node and several follower nodes, with write requests handled by the leader. When the leader receives a write request it issues a proposal; once a majority of followers have returned an ACK, the leader issues a commit, and only after the proposal has been committed by a majority does the leader return success to the client. If the leader later fails, the leader-election algorithm of the ZAB protocol ensures that a follower holding the most up-to-date data is elected as the new leader, so every transaction committed on the original leader is present on the new one. This guarantees the consistency of the data seen by clients.

CAP:

No distributed system can satisfy C (consistency), A (availability), and P (partition tolerance) at the same time. Since ZooKeeper must guarantee consistency, the cluster has to choose between A and P; it chooses P, which means availability suffers (for example, the cluster cannot serve requests while a new leader is being elected).

To sum up

The ZooKeeper distributed lock guarantees fault tolerance and consistency, but it leaves idle nodes (/lock-path) behind and is occasionally unavailable.

The source code

https://gitee.com/pingfanrenbiji/distributed-lock/tree/master/zookeeper

References

https://my.oschina.net/yangjianzhou/blog/1930493
https://www.jianshu.com/p/db65b64f38aa
