In the first part of the big data series, while building the HDFS high-availability cluster, we touched on a technology that is very popular in Java and big data work, especially in interviews. Some of you may have guessed it already: the distributed coordination service, ZooKeeper.

ZooKeeper is an important Apache Foundation project and a typical solution for distributed data consistency. Distributed applications can build features such as data publish/subscribe, load balancing, naming services, distributed coordination/notification, cluster management, Master election, distributed locks, and distributed queues on top of it.

If that description sounds abstract, don't worry; every piece of technology has a counterpart in everyday life, and a class monitor election explains ZooKeeper quite well. At the start of each new term the class elects a monitor, and afterwards class affairs are announced and handled by that monitor. The election follows a simple rule: the minority yields to the majority. ZooKeeper plays a similar coordinating role. That is a slight digression from the title; in a later article of the big data series I will walk through ZooKeeper's internal workings in detail. Today the focus is on one of the capabilities listed above: the distributed lock.

As you all know, if multiple threads on one machine compete for the same resource and repeated runs can produce inconsistent results, we call that code non-thread-safe. We usually solve this with a lock; in Java, synchronized will do (see the minimal local sketch below). If the competitors are different JVM instances on the same machine, the operating system's file read/write locks can solve it. What about different machines? That is where distributed locks come in.
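A minimal local sketch of that single-machine case, with made-up names, just to pin down what "protected by a lock" means here:

// Without synchronized, two threads interleaving on count++ give exactly the
// non-thread-safe behaviour described above; with it, only one thread in this
// JVM can enter the method at a time.
public class LocalCounter {
    private int count = 0;

    public synchronized void increment() {
        count++;
    }

    public synchronized int get() {
        return count;
    }
}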

The characteristics of distributed locks are as follows:

  • Mutual exclusion: Like our local locks, mutual exclusion is basic, but distributed locks need to be mutually exclusive for different threads on different nodes.

  • Reentrancy: The same thread on the same node can acquire the lock again if it has acquired the lock.

  • Lock timeout: Like local locks, it should support a timeout on the lock, so that a holder that dies without releasing cannot cause a deadlock.

  • Efficiency and high availability: locking and unlocking must be fast, and the lock service itself must be highly available; a degradation path can be added in case the distributed lock fails.

  • Blocking and non-blocking: Like ReentrantLock, it should support lock(), tryLock(), and tryLock(long timeout) (a hypothetical interface sketch follows this list).

  • Support for fair and unfair locks (optional): a fair lock grants the lock in the order requests arrive, while an unfair lock makes no ordering guarantee; this is rarely implemented for distributed locks.

I'm sure you have all met a business scenario like this: a scheduled task has to run at a fixed time, but the task is not idempotent, so at any given moment only one thread on one machine may execute it.
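To make the feature list above concrete, here is a hypothetical Java interface such a lock could expose, mirroring ReentrantLock's lock/tryLock methods; the name and method set are illustrative only, not any particular library's API:

import java.util.concurrent.TimeUnit;

public interface DistributedLock {
    void lock();                                  // blocking acquire
    boolean tryLock();                            // non-blocking attempt, returns immediately
    boolean tryLock(long timeout, TimeUnit unit)  // wait at most the given time for the lock
            throws InterruptedException;
    void unlock();                                // release the lock
}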

Distributed locks can be implemented in many ways, including Redis, ZooKeeper, and Google's Chubby.

Today I will share the implementation principles of the two most common ones: Redis and ZooKeeper.

Let's start with Redis. You have probably already thought of a naive scheme: every time the task is about to run, first query Redis for a lock key; if it does not exist, write it, and then start the task.

The problem is that the check and the write are two separate steps. If process A and process B both query Redis, both find no value, and both then write, both writes succeed and both processes believe they hold the lock. Fortunately, Redis provides an atomic check-and-write operation: setnx (SET if Not eXists).
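A minimal sketch of this idea using the Jedis client; the host, key name, and owner value here are placeholder assumptions:

import redis.clients.jedis.Jedis;

public class SetnxLockDemo {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            // setnx writes the key only if it does not exist yet, in one atomic step,
            // so two processes can no longer both "see nothing, then write"
            long acquired = jedis.setnx("task:lock", "worker-1");
            if (acquired == 1) {
                try {
                    System.out.println("lock acquired, running the task");
                    // ... run the non-idempotent task here ...
                } finally {
                    jedis.del("task:lock");   // release the lock when done
                }
            } else {
                System.out.println("another worker holds the lock, skipping this run");
            }
        }
    }
}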

It would be naive to think that this is all a distributed lock needs. Consider the unlucky case where a thread acquires the lock and the machine then freezes: the lock is never released and the task is never executed again. A better approach is to estimate how long the task should take and set an expiry time on the lock, after which someone else can take it over. But this creates another problem: sometimes the load is high, the task runs slowly, and another instance starts executing once the timeout elapses.
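One hedge against the frozen-holder problem is to set the value and the expiry in a single atomic command. With a recent Jedis version that could look roughly like this; the key, value, and the 30-second lease are assumptions for the example:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

public class SetNxExLockDemo {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            // SET key value NX EX 30: write only if absent, and attach a 30-second expiry,
            // all in one atomic command, so a crashed holder cannot keep the lock forever
            String reply = jedis.set("task:lock", "worker-1", SetParams.setParams().nx().ex(30));
            if ("OK".equals(reply)) {
                System.out.println("lock acquired with a 30s lease");
                // ... run the task; before deleting the key, check the value is still ours ...
            } else {
                System.out.println("lock is held by someone else");
            }
        }
    }
}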

That is the charm of architectural design: solving one problem always surfaces the next, to be solved step by step. A common remedy here is to start a daemon thread after the lock is acquired, which periodically asks Redis whether we still hold the lock and how long until it expires, and renews the lease before it runs out.
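A rough sketch of such a watchdog, again with Jedis and made-up key/owner names; note that a production version would do the check-and-renew as one atomic step (for example with a Lua script):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import redis.clients.jedis.Jedis;

public class LockWatchdogSketch {
    private static final String LOCK_KEY = "task:lock";   // assumed key name
    private static final String OWNER_ID = "worker-1";    // assumed value identifying this holder

    public static ScheduledExecutorService startWatchdog() {
        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "lock-watchdog");
            t.setDaemon(true);   // daemon thread: it dies together with the worker process
            return t;
        });
        // Every 10 seconds, ask Redis whether we still hold the lock and push the expiry out again
        watchdog.scheduleAtFixedRate(() -> {
            try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
                if (OWNER_ID.equals(jedis.get(LOCK_KEY))) {
                    jedis.expire(LOCK_KEY, 30);   // renew the 30-second lease
                }
            }
        }, 10, 10, TimeUnit.SECONDS);
        return watchdog;
    }
}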

OK, that covers how to implement a distributed lock service with Redis.

Zookeeper implements distributed locks as follows:

In the figure above, the ZooKeeper cluster is shown on the left: lock is the parent data node, and node_1 to node_n are a series of ephemeral sequential child nodes. client_1 to client_n on the right are the clients trying to acquire the lock, and Service is the resource that must be accessed under mutual exclusion.

The source code below implements a distributed lock with Curator, ZooKeeper's open-source client. Doing this against the native ZK API is more involved, so here we reuse the Curator wheel and rely on its acquire and release methods.

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorDistributeLock {
    public static void main(String[] args) {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("111.231.83.101:2181", retryPolicy);
        client.start();
        CuratorFramework client2 = CuratorFrameworkFactory.newClient("111.231.83.101:2181", retryPolicy);
        client2.start();
        // Create a distributed lock with the root path /curator/lock
        InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
        final InterProcessMutex mutex2 = new InterProcessMutex(client2, "/curator/lock");
        try {
            mutex.acquire();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Lock acquired, proceed with the business logic
        System.out.println("client Enter mutex");
        Thread client2Th = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    mutex2.acquire();
                    System.out.println("client2 Enter mutex");
                    mutex2.release();
                    System.out.println("client2 release lock");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        client2Th.start();
        // Business logic done, release the lock
        try {
            Thread.sleep(5000);
            mutex.release();
            System.out.println("client release lock");
            client2Th.join();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Close the clients
        client.close();
        client2.close();
    }
}

Running the code above shows that the first client acquires the lock and executes its business logic, and only then does client2 get its turn to acquire the lock and do the same.

Tracing the acquire() method leads to the core attemptLock function:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    ...
    while ( !isDone )
    {
        isDone = true;
        try
        {
            // Create an ephemeral sequential node
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // Check whether ours is the smallest node; if not, watch the previous node for deletion
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
    }
    // Return the node path if the lock was acquired
    if ( hasTheLock )
    {
        return ourPath;
    }
    ...
}

    

The internalLockLoop function:

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
    ...
    while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
    {
        // Get the list of child nodes sorted from smallest to largest
        List<String> children = getSortedChildren();
        String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

        // Determine whether our node is the smallest one
        PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
        if ( predicateResults.getsTheLock() )
        {
            // The lock was successfully obtained
            haveTheLock = true;
        }
        else
        {
            // Get the previous node
            String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

            // The lock was not obtained: wait for the previous node to be removed;
            // the watcher callback wakes the current thread via notifyAll
            synchronized(this)
            {
                try
                {
                    // Set the watcher; getData checks whether the previous node still exists,
                    // and throws an exception (so no watcher is set) if it does not
                    client.getData().usingWatcher(watcher).forPath(previousSequencePath);

                    // If millisToWait is set, wait only for the remaining time; on timeout,
                    // mark our own node for deletion and leave the loop
                    if ( millisToWait != null )
                    {
                        millisToWait -= (System.currentTimeMillis() - startMillis);
                        startMillis = System.currentTimeMillis();
                        if ( millisToWait <= 0 )
                        {
                            doDelete = true;    // timed out - delete our node
                            break;
                        }
                        // Wait for the remaining time
                        wait(millisToWait);
                    }
                    else
                    {
                        // Wait indefinitely
                        wait();
                    }
                }
                catch ( KeeperException.NoNodeException e )
                {
                    // getData throws this when the previous child node has already been deleted
                }
            }
        }
    }
    ...
}

Using ZK for distributed locks is not that common in practice: it requires a ZooKeeper cluster, and the frequent watch traffic puts pressure on that cluster, so I would not particularly recommend it. Still, being able to explain in an interview how to build a distributed lock on ZK should earn you some extra points.

For those of you who want to practice with ZooKeeper, here are the installation steps.

Build a ZooKeeper cluster (for the environment variable, prefer a _HOME-style name rather than _PREFIX, as with Hadoop).

Upload the ZooKeeper package.

Decompress the package: tar -xf zookeeper-3.4.6.tar.gz

Move the ZooKeeper directory to /opt/sxt: mv zookeeper-3.4.6 /opt/sxt

Run vi /etc/profile to configure the ZooKeeper environment variables.
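The profile lines themselves are not shown in the original post; assuming the /opt/sxt install path from the previous step, they would typically look like this:

export ZOOKEEPER_HOME=/opt/sxt/zookeeper-3.4.6
export PATH=$PATH:$ZOOKEEPER_HOME/bin

After saving, run source /etc/profile to make the change take effect.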

 

Configure the ZooKeeper configuration file

Go to the conf directory under the ZooKeeper home directory; you will find a zoo_sample.cfg file there.

Copy it to zoo.cfg: cp zoo_sample.cfg zoo.cfg

Edit it: vi zoo.cfg
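The configuration values are not shown in the original post; a typical zoo.cfg for this three-node setup, matching the data directory and hostnames used below, might look roughly like this:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/sxt/zk
clientPort=2181
server.1=node02:2888:3888
server.2=node03:2888:3888
server.3=node04:2888:3888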

  

On each node, go to the /var/sxt/zk data directory and write the server id into a myid file: run echo 1 > myid on node02, echo 2 > myid on node03, and echo 3 > myid on node04.


That is, create a myid file on each server containing 1, 2, or 3 respectively; this is the server's ZooKeeper id and must match the server.N entries in the configuration file above.

Distribute it from node02 to node03 and node04: scp -r zookeeper-3.4.6/ node04:`pwd` (the backquoted pwd resolves to the current directory, so the destination directory is the same as the source directory; you can also write an absolute path ending in zookeeper-3.4.6/, otherwise the directory name itself will not be carried over).

Start the ZooKeeper cluster:

Run zkServer.sh start on node02, node03, and node04.

Check the startup status of each ZooKeeper cluster node.

zkServer.sh  status

zkServer.sh stop

 

Well, that wraps up distributed locks and the ZooKeeper cluster setup. More on ZooKeeper, Java, and big data will keep coming on the public account, so feel free to follow it: Java Architect Union, and don't get lost.