Preface

At the end of the previous article, we left a gap around distributed locks. A distributed lock ensures that a task is scheduled only once across multiple nodes and helps solve data-consistency problems in a distributed environment.

For example, suppose we have a cluster that contains a cache service, and every program in the cluster uses that cache. When a cache entry expires under heavy concurrency, many services will hit the cache at the same moment, find the entry expired, all go to the database, and all write the result back to the cache. In reality, only one of those requests needs to reach the database and refresh the cache. So what do we do in this scenario?

Borrowing from the way locks are used in multi-threaded programs, we need to implement a lock that works in this distributed, concurrent scenario as well.
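To make the idea concrete, here is a minimal sketch, assuming hypothetical Cache and Database interfaces and any Lock implementation that works across processes (we build one with ZooKeeper below): only the request that wins the lock touches the database; everyone else re-reads the refreshed cache.

```java
import java.util.concurrent.locks.Lock;

public class CacheRebuildDemo {

    // cache, database and distributedLock are hypothetical placeholders
    public String getValue(String key, Cache cache, Database database, Lock distributedLock) {
        String value = cache.get(key);
        if (value != null) {
            return value; // cache hit, no lock needed
        }
        distributedLock.lock();
        try {
            // Re-check: another node may have refreshed the cache while we waited
            value = cache.get(key);
            if (value == null) {
                value = database.query(key); // only one node reaches the database
                cache.put(key, value);
            }
            return value;
        } finally {
            distributedLock.unlock();
        }
    }

    // Minimal placeholder interfaces so the sketch is self-contained
    interface Cache { String get(String k); void put(String k, String v); }
    interface Database { String query(String k); }
}
```

The second cache check inside the lock is the point: by the time a blocked node wakes up, the winner has usually already refreshed the cache, so the database is hit only once.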


Using ZooKeeper for development

1. Lock features with native ZooKeeper

① What are the characteristics of common locks?

- Database: a unique primary-key constraint, or select ... for update
- Cache: the Redis SETNX command
- ZooKeeper: znodes, which behave like a file system
- Blocking: threads that fail to grab the lock block until it is released
- Reentrancy: a thread that already holds the lock can acquire it again

② Why can ZooKeeper be used to implement locks?

Two child nodes with the same name cannot exist under the same parent node, and that gives ZooKeeper its exclusivity. Blocking can be built with JDK synchronizers such as CountDownLatch (the code below does exactly this), and reentrancy can be handled with a counter.
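As a quick illustration of that exclusivity, here is a sketch with the native client (the path and timeout are arbitrary, and a real program would wait for the connection handshake to finish before calling create): whichever client creates the node first wins, and the second attempt fails.

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ExclusivityDemo {
    public static void main(String[] args) throws Exception {
        ZooKeeper zkA = new ZooKeeper("localhost:2181", 30000, event -> {});
        ZooKeeper zkB = new ZooKeeper("localhost:2181", 30000, event -> {});

        // The first client creates the node and thereby "holds the lock"
        zkA.create("/demo-lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        try {
            // Same name under the same parent: the second create must fail
            zkB.create("/demo-lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (KeeperException.NodeExistsException e) {
            System.out.println("zkB failed to create /demo-lock: it already exists");
        }
    }
}
```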

③ What are the problems with native ZooKeeper?

1. The API is hard to use.
2. When the connection to ZooKeeper times out, it is not re-established automatically.
3. A watch fires only once and must be re-registered (we come back to this in the ZkClient demo below).
4. Recursive node creation is not supported (by analogy: in IDEA you can create a whole package path in one step, but in Windows Explorer you have to create each folder one by one).
5. Serialization has to be configured manually.

④ The client's core class: ZooKeeper

Packages: org.apache.zookeeper, org.apache.zookeeper.data

- connect -- connect to the ZooKeeper ensemble
- create -- create a znode
- exists -- check whether a znode exists and read its metadata
- getData -- get the data of a specific znode
- setData -- set the data of a specific znode
- getChildren -- list all children of a specific znode
- delete -- delete a specific znode (it must have no children)
- close -- close the connection
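Here is a minimal sketch of those core methods in one run (the path, values, and session timeout are arbitrary; again, production code would wait for the connected event rather than calling straight away):

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class NativeZkDemo {
    public static void main(String[] args) throws Exception {
        // connect -- connect to the ZooKeeper ensemble
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});

        // create -- create a znode
        zk.create("/demo", "123".getBytes("UTF-8"),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // exists -- check whether the znode exists and read its metadata
        Stat stat = zk.exists("/demo", false);

        // getData -- read the data of the znode
        byte[] data = zk.getData("/demo", false, stat);
        System.out.println(new String(data, "UTF-8"));

        // setData -- overwrite the data (version -1 means "any version")
        zk.setData("/demo", "456".getBytes("UTF-8"), -1);

        // getChildren -- list the children of the znode
        System.out.println(zk.getChildren("/demo", false));

        // delete -- remove the znode (it must have no children)
        zk.delete("/demo", -1);

        // close -- close the connection
        zk.close();
    }
}
```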

2. Use the third-party zkClient to simplify operations

① Implement the ZkSerializer interface

MyZkSerializer.java

```java
import java.io.UnsupportedEncodingException;

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

public class MyZkSerializer implements ZkSerializer {

    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        String d = (String) data;
        try {
            return d.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        try {
            return new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }
}
```

② Simple use of ZkClient

ZkClientDemo.java

```java
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;

public class ZkClientDemo {

    public static void main(String[] args) {
        // Create a ZK client
        ZkClient client = new ZkClient("localhost:2181");
        client.setZkSerializer(new MyZkSerializer()); // plug in our serializer

        // Create a child node app6 under /zk and assign it the value 123.
        // CreateMode.PERSISTENT -- the znode is not deleted automatically
        // when the client disconnects
        client.create("/zk/app6", "123", CreateMode.PERSISTENT);

        client.subscribeChildChanges("/zk/app6", new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println(parentPath + " child nodes changed: " + currentChilds);
            }
        });

        // With the native client, a watch fires only once after it is set.
        // subscribeDataChanges -- "subscribe" means the watch stays in place,
        // so we get continuous monitoring far more conveniently than with
        // the native ZooKeeper API.
        client.subscribeDataChanges("/zk/app6", new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println(dataPath + " node deleted");
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println(dataPath + " changed: " + data);
            }
        });

        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
```

The results

Run ls /zk -- you can see that app6 has been created.

Run get /zk/app6 -- it returns the value 123.

This shows the program works: it executes successfully.

Now let's test the listener events:

create /zk/app6/tellYourDream -- the console prints: /zk/app6 child nodes changed: [tellYourDream]

delete /zk/app6/tellYourDream -- the console prints: /zk/app6 child nodes changed: []; no child node exists at that point, so the list is empty

set /zk/app6 123456 -- the console prints: /zk/app6 changed: 123456

delete /zk/app6 -- two listener events fire at once: /zk/app6 child nodes changed: null, and the /zk/app6 node was deleted

③ A supplement on CreateMode

1. Persistent nodes: the node exists until you delete it explicitly, and child nodes can be created under it.

```java
/**
 * The znode will not be automatically deleted upon client's disconnect.
 * (persistent, unordered)
 */
PERSISTENT(0, false, false),

/**
 * The znode will not be automatically deleted upon client's disconnect,
 * and its name will be appended with a monotonically increasing number.
 * (persistent, ordered)
 */
PERSISTENT_SEQUENTIAL(2, false, true),
```

2. Ephemeral (temporary) nodes: they exist only while the client session that created them is alive and are deleted automatically when the client disconnects or crashes. Child nodes cannot be created under them.

```java
/**
 * The znode will be deleted upon the client's disconnect.
 * (ephemeral, unordered)
 */
EPHEMERAL(1, true, false),

/**
 * The znode will be deleted upon the client's disconnect, and its name
 * will be appended with a monotonically increasing number.
 * (ephemeral, ordered)
 */
EPHEMERAL_SEQUENTIAL(3, true, true);
```
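To see the sequential variants in action, here is a small sketch (the path is an example); ZkClient returns the full path that was actually created, with a monotonically increasing 10-digit counter appended to the name:

```java
import org.I0Itec.zkclient.ZkClient;

public class SequentialDemo {
    public static void main(String[] args) {
        ZkClient client = new ZkClient("localhost:2181");
        client.setZkSerializer(new MyZkSerializer());
        client.createPersistent("/seq-demo", true); // parent node for the demo

        // Each call returns something like /seq-demo/0000000000, /seq-demo/0000000001, ...
        String first = client.createEphemeralSequential("/seq-demo/", "a");
        String second = client.createEphemeralSequential("/seq-demo/", "b");
        System.out.println(first);
        System.out.println(second);
    }
}
```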

There are a few more subscription methods that you can try yourself.

3. ZooKeeper implements distributed locks

① ZooKeeper distributed lock, version 1

As mentioned earlier, two nodes under the same parent node in ZooKeeper cannot share a name. We can use this mutual exclusion to build a distributed locking tool.

An ephemeral node exists only while the session that created it is alive and is deleted automatically when that session ends. So when a client disconnects, suffers network instability, or crashes, the lock node it created is removed automatically, which neatly avoids deadlocks. We take advantage of this feature to meet our requirements.

In essence, the principle is: duplicate node names are forbidden + the watch mechanism.

For example, if our application has multiple service instances, each instance tries to create the same lock node. The one that succeeds acquires the lock; the instances that fail to create it watch the lock node instead. When the holder releases the lock by deleting the node, the watchers are notified and compete for the lock again.

ZkDistributeLock.java (note: the Lock methods we do not need to override have been omitted)
```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

public class ZkDistributeLock implements Lock {

    // The znode path used as the lock
    private String lockPath;

    // The ZK client we need
    private ZkClient client;

    // The only inputs we need are the client address and the lock path,
    // so the lock path is supplied through the constructor
    public ZkDistributeLock(String lockPath) {
        if (lockPath == null || lockPath.trim().equals("")) {
            throw new IllegalArgumentException("lockPath cannot be an empty string");
        }
        this.lockPath = lockPath;
        client = new ZkClient("localhost:2181");
        client.setZkSerializer(new MyZkSerializer());
    }
```

Next we implement the methods required by the Lock interface: tryLock() (tries to create the ephemeral node), unlock(), lock(), and a private helper waitForLock() that does the blocking and waking.

```java
    @Override
    public boolean tryLock() {
        try {
            client.createEphemeral(lockPath);
        } catch (ZkNodeExistsException e) {
            return false;
        }
        return true;
    }

    @Override
    public void unlock() {
        client.delete(lockPath);
    }

    @Override
    public void lock() {
        if (!tryLock()) {
            // Failed to acquire the lock: block the current thread
            waitForLock();
            // Woken up from waiting: try to acquire the lock again
            lock();
        }
    }

    private void waitForLock() {
        final CountDownLatch cdl = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("----- the lock node was deleted -----");
                // Wake up the blocked thread
                cdl.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
        client.subscribeDataChanges(lockPath, listener);

        // Block only if the lock node still exists
        if (this.client.exists(lockPath)) {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // Cancel the registration
        client.unsubscribeDataChanges(lockPath, listener);
    }
}
```

Now let's summarize the ZkDistributeLock flow:

1. Node created successfully -> lock acquired -> do the business work -> release the lock (delete the node)
2. Node creation failed -> register a watch on the lock node -> block and wait -> wake up and cancel the watch -> go back to step 1 and try to create the node again
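Here is a usage sketch of ZkDistributeLock (the thread names and lock path are arbitrary); in real life each competitor would be a separate service instance rather than a thread:

```java
public class ZkDistributeLockDemo {
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                ZkDistributeLock lock = new ZkDistributeLock("/distribute-lock");
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " is doing business work");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // Always release in finally, or the other waiters block forever
                    lock.unlock();
                }
            }, "worker-" + i).start();
        }
    }
}
```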

This design has a downside. Suppose we have a very large number of instances: every time the lock node is deleted, every waiting instance is notified at once and rushes to create the node, yet only one of them can win. All the other wake-ups are wasted network traffic. This is known as the herd effect (thundering herd).

So we need to optimize.

② ZooKeeper distributed lock, version 2

The lock itself is a znode, and child nodes can be created under it. This time, each instance creates an ephemeral sequential node under the lock node (we covered these in "High concurrency from scratch (i)" on basic concepts). Ephemeral sequential nodes are numbered in increasing order automatically, so every instance receives a sequence number, and the instance with the smallest number acquires the lock. This is very similar to a fair lock: it follows the FIFO principle.

Principle: take a number + the smallest number holds the lock + watch the predecessor.
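At some instant the children under the lock node might look like this (the numbers are only examples):

```
/Lock
 ├── 0000000007   <- currently holds the lock
 ├── 0000000008   <- watches 0000000007
 └── 0000000009   <- watches 0000000008
```

Each waiter watches only its immediate predecessor, so when the holder releases the lock, exactly one client is woken up. This is what eliminates the herd effect of version 1.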

This version is also built on the Lock interface.

ZkDistributeImproveLock.java (note: the Lock methods we do not need to override have been omitted)
```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

public class ZkDistributeImproveLock implements Lock {

    /*
     * Implements a distributed lock with ephemeral sequential nodes.
     * Lock: take a queue number (create your own ephemeral sequential node),
     * then check whether it is the smallest. If so, the lock is acquired;
     * if not, register a watcher on the previous node and block until it is deleted.
     * Unlock: delete the ephemeral sequential node you created.
     */

    // The lock directory, same as before
    private String lockPath;

    // The ZK client
    private ZkClient client;

    private ThreadLocal<String> currentPath = new ThreadLocal<String>();

    private ThreadLocal<String> beforePath = new ThreadLocal<String>();

    // Reentrancy counter
    private ThreadLocal<Integer> reenterCount = ThreadLocal.withInitial(() -> 0);

    public ZkDistributeImproveLock(String lockPath) {
        if (lockPath == null || lockPath.trim().equals("")) {
            throw new IllegalArgumentException("lockPath cannot be an empty string");
        }
        this.lockPath = lockPath;
        client = new ZkClient("localhost:2181");
        client.setZkSerializer(new MyZkSerializer());
        if (!this.client.exists(lockPath)) {
            try {
                this.client.createPersistent(lockPath, true);
            } catch (ZkNodeExistsException e) {
                // Another instance created it first; ignore
            }
        }
    }

    @Override
    public boolean tryLock() {
        System.out.println(Thread.currentThread().getName() + " ----- trying to acquire the distributed lock");
        if (this.currentPath.get() == null || !client.exists(this.currentPath.get())) {
            // Take a queue number (create an ephemeral sequential node) and wait for our turn
            String node = this.client.createEphemeralSequential(lockPath + "/", "locked");
            currentPath.set(node);
            reenterCount.set(0);
        }
        // Fetch all children and sort them
        List<String> children = this.client.getChildren(lockPath);
        Collections.sort(children);
        // Check whether the current node is the smallest
        if (currentPath.get().equals(lockPath + "/" + children.get(0))) {
            reenterCount.set(reenterCount.get() + 1);
            System.out.println(Thread.currentThread().getName() + " ----- acquired the distributed lock");
            return true;
        } else {
            // Find our own position and remember the node just before us
            int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
            String node = lockPath + "/" + children.get(curIndex - 1);
            beforePath.set(node);
        }
        return false;
    }

    @Override
    public void lock() {
        if (!tryLock()) {
            // Block and wait for the previous node to be deleted
            waitForLock();
            // Try to acquire the lock again
            lock();
        }
    }

    private void waitForLock() {
        final CountDownLatch cdl = new CountDownLatch(1);
        // Register a watcher on the previous node
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println(Thread.currentThread().getName()
                        + " ----- the watched node was deleted, the distributed lock was released");
                cdl.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
        client.subscribeDataChanges(this.beforePath.get(), listener);

        // Block only if the previous node still exists
        if (this.client.exists(this.beforePath.get())) {
            try {
                System.out.println(Thread.currentThread().getName()
                        + " ----- failed to grab the distributed lock, entering the blocked state");
                cdl.await();
                System.out.println(Thread.currentThread().getName()
                        + " ----- the distributed lock was released, waking up");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Awake now: cancel the watcher
        client.unsubscribeDataChanges(this.beforePath.get(), listener);
    }

    @Override
    public void unlock() {
        System.out.println(Thread.currentThread().getName() + " ----- releasing the distributed lock");
        // Reentrant: only really release when the count drops back to zero
        if (reenterCount.get() > 1) {
            reenterCount.set(reenterCount.get() - 1);
            return;
        }
        // Delete our own node
        if (this.currentPath.get() != null) {
            this.client.delete(this.currentPath.get());
            this.currentPath.set(null);
            this.reenterCount.set(0);
        }
    }
}
```
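A usage sketch (names are arbitrary) that also exercises the reentrancy counter: the same thread locks twice, so it must unlock twice before its znode is actually deleted and the next waiter wakes up:

```java
public class ZkDistributeImproveLockDemo {
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                ZkDistributeImproveLock lock = new ZkDistributeImproveLock("/distribute-improve-lock");
                lock.lock();
                lock.lock(); // reentrant: the same thread may lock again
                try {
                    System.out.println(Thread.currentThread().getName() + " is doing business work");
                } finally {
                    lock.unlock(); // only decrements the counter
                    lock.unlock(); // actually deletes the znode, waking the next waiter
                }
            }, "worker-" + i).start();
        }
    }
}
```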

PS: don't worry about running out of memory; the JVM will garbage-collect what is no longer referenced.

4. An even simpler third-party client -- Curator

We won't expand on Curator here; if you're interested, go and play with it.

Address: curator.apache.org/curator-exa…

Curator already implements leader election, distributed locks, and node CRUD operations for you.
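For comparison, a distributed lock in Curator shrinks to a few lines. A minimal sketch using Curator's InterProcessMutex recipe (the connection string and path are examples):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockDemo {
    public static void main(String[] args) throws Exception {
        // Retry policy: start at 1s between retries, at most 3 retries
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        InterProcessMutex lock = new InterProcessMutex(client, "/curator-lock");
        lock.acquire();
        try {
            System.out.println("doing business work under the lock");
        } finally {
            lock.release();
        }
        client.close();
    }
}
```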

Finally

It has been a while since the last article; I was busy last week. I will keep doing my best from here on.

Next: High concurrency from Scratch (iii) – Leader election for a Zookeeper cluster