Preface

In a single-machine environment, Java's synchronized keyword and Lock interface are enough to implement locks within a process. With distributed applications and clustered deployments, however, contention for system resources shifts from multiple threads in one process to multiple processes, and a distributed lock is needed to guarantee mutual exclusion. There are three main ways to implement a distributed lock: 1. database-based, using indexes and row locks; 2. Redis-based; 3. ZooKeeper-based, using ephemeral sequential nodes. This article implements the lock with Redis and analyzes several existing implementations, then walks through the lock source code of Redisson to see how an open-source project implements a distributed lock.

Design and implementation

Lock

First, using setnx and getSet

This is the implementation found in most versions circulating on the Internet, and it is also how the author implemented a distributed lock in a previous project

    public boolean lock(Jedis jedis, String lockName, Integer expire) {
        // Returns whether the lock was acquired.
        // setnx stores the expiration timestamp (in milliseconds) as the value
        long now = System.currentTimeMillis();
        boolean result = jedis.setnx(lockName, String.valueOf(now + expire * 1000L)) == 1;

        if (!result) {
            // Deadlock prevention: check whether the existing lock has expired
            String timestamp = jedis.get(lockName);
            if (timestamp != null && Long.parseLong(timestamp) < now) {
                // The lock has expired. Do not remove it with del; swap the value with getSet instead.
                // The new value uses the same millisecond timestamp format as the setnx call above
                String oldValue = jedis.getSet(lockName, String.valueOf(now + expire * 1000L));
                if (oldValue != null && oldValue.equals(timestamp)) {
                    result = true;
                    jedis.expire(lockName, expire);
                }
            }
        }
        if (result) {
            jedis.expire(lockName, expire);
        }
        return result;
    }

Code analysis:

  1. The setnx command acquires the lock atomically and stores the expiration timestamp as the value

  2. The expire method sets the key's time to live. If the call to expire fails, the timestamp stored in the value is compared with the current time to prevent deadlock

  3. If the lock is found to have expired without being released, getSet is used instead of del, so that a lock another thread has acquired in the meantime is not deleted

Problems

  1. Deadlock prevention relies on each client's current system time. Production servers generally keep their clocks in sync, so this is not a serious problem
  2. When the lock expires, several threads may execute getSet at the same time. Only one of them acquires the lock, but a losing thread can still overwrite the timestamp stored in the value, which is theoretically incorrect
  3. The lock carries no client identity, so during unlocking the key may be deleted by a different client
  4. Despite these minor problems, this implementation generally meets the need: it provides mutual exclusion and avoids deadlock
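
Problem 2 can be made concrete with a small simulation. The following is a minimal sketch that replaces Redis with an in-memory map; the class GetSetRaceSketch and its methods are hypothetical stand-ins, not Jedis calls. It replays the interleaving in which two clients both read the expired timestamp before either one calls getSet.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for Redis; only GET and GETSET are modeled.
final class GetSetRaceSketch {
    private final Map<String, String> store = new HashMap<>();

    String get(String key) {
        return store.get(key);
    }

    // GETSET: store the new value and return the previous one.
    String getSet(String key, String value) {
        return store.put(key, value);
    }

    // Replays one fixed interleaving and reports "aAcquired,bAcquired,storedTimestamp".
    static String simulate() {
        GetSetRaceSketch redis = new GetSetRaceSketch();
        redis.getSet("lock", "1000"); // expired lock left behind by a crashed holder

        // Both clients read the expired timestamp before either one calls getSet.
        String seenByA = redis.get("lock");
        String seenByB = redis.get("lock");

        // A swaps first and gets back the value it read, so A owns the lock.
        boolean aAcquired = seenByA.equals(redis.getSet("lock", "5000"));
        // B gets back A's value, so B loses, yet B's timestamp is now stored.
        boolean bAcquired = seenByB.equals(redis.getSet("lock", "5003"));

        return aAcquired + "," + bAcquired + "," + redis.get("lock");
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints true,false,5003
    }
}
```

Mutual exclusion still holds, since only client A acquires the lock, but the stored expiration is the one written by the losing client B.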

Second, using the atomic command in newer Redis versions

Jedis's set command accepts additional parameters, which makes it possible to acquire a distributed lock with a single atomic command

    jedis.set(lockName, "", "NX", "PX", expireTime);

Code analysis

  1. The Redis set command accepts additional parameters. The first is the lock's key. The second is the value, which can store the ID of the client that holds the lock so that a client can check whether the lock is its own. The third is NX or XX, the fourth is EX or PX, and the fifth is the expiration time

  2. NX: set the key only if it does not exist. XX: set the key only if it already exists

  3. EX: the time is in seconds. PX: the time is in milliseconds

  4. This command merges the earlier setnx and expire commands into one atomic operation, so there is no need to handle the case where set succeeds but expire fails
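
The semantics of the command can be illustrated without a Redis server. The sketch below is an in-memory model only: SetNxPxSketch, its Entry type, and the explicit nowMillis parameter (used to keep the example deterministic) are all assumptions made for illustration. It stores a client ID as the value, as suggested in point 1, and writes value and expiry in a single atomic step.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of SET key value NX PX ttl; not a Redis client.
final class SetNxPxSketch {
    private static final class Entry {
        final String owner;
        final long expiresAtMillis;

        Entry(String owner, long expiresAtMillis) {
            this.owner = owner;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();

    // Sets the key only if it is absent or expired; value and expiry are written atomically.
    boolean tryLock(String key, String owner, long ttlMillis, long nowMillis) {
        boolean[] acquired = {false};
        store.compute(key, (k, current) -> {
            if (current == null || current.expiresAtMillis <= nowMillis) {
                acquired[0] = true;
                return new Entry(owner, nowMillis + ttlMillis);
            }
            return current; // still held: leave the existing entry untouched
        });
        return acquired[0];
    }

    // Returns the current holder, or null if the key is absent or expired.
    String owner(String key, long nowMillis) {
        Entry e = store.get(key);
        return (e == null || e.expiresAtMillis <= nowMillis) ? null : e.owner;
    }

    public static void main(String[] args) {
        SetNxPxSketch redis = new SetNxPxSketch();
        System.out.println(redis.tryLock("lock", "client-A", 100, 0));   // true
        System.out.println(redis.tryLock("lock", "client-B", 100, 50));  // false
        System.out.println(redis.tryLock("lock", "client-B", 100, 100)); // true
    }
}
```

Because the expiry is written together with the value, there is no window in which the key exists without a time to live.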


Unlock

Redis del command

    public boolean unlock(Jedis jedis, String lockName) {
        jedis.del(lockName);
        return true;
    }

Code analysis

The Redis del command deletes the lock directly, but it may mistakenly delete a lock that is now held by another thread

Redis del with an owner check

    public static void unlock2(Jedis jedis, String lockKey, String requestId) {
        // Check that the client unlocking is the client that locked
        if (requestId.equals(jedis.get(lockKey))) {
            // If the lock no longer belongs to this client by this point,
            // del removes another client's lock by mistake
            jedis.del(lockKey);
        }
    }

Code analysis

A check on the requestId client ID has been added, but because get and del are two separate commands rather than one atomic operation, correctness cannot be guaranteed under concurrent contention between multiple processes
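
To see why the separate get and del are a problem, the sketch below replays one bad interleaving against a plain in-memory map. CheckThenDeleteRaceSketch and the client names are hypothetical, and the expiry of client A's lock is modeled simply by removing the key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical replay of the check-then-delete race; a map stands in for Redis.
final class CheckThenDeleteRaceSketch {
    // Returns the lock holder left after the interleaving, as text.
    static String simulate() {
        Map<String, String> redis = new HashMap<>();
        redis.put("lock", "client-A");

        // Step 1 (client A): the ownership check passes.
        boolean checkPassed = "client-A".equals(redis.get("lock"));

        // Step 2: A's lock expires and client B acquires it before A continues.
        redis.remove("lock");
        redis.put("lock", "client-B");

        // Step 3 (client A): del runs on the stale check result and removes B's lock.
        if (checkPassed) {
            redis.remove("lock");
        }
        return String.valueOf(redis.get("lock"));
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints null: client B's lock is gone
    }
}
```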

Redis Lua script

    // requires: import java.util.Collections;
    public static boolean unlock3(Jedis jedis, String lockKey, String requestId) {
        // Delete the key only if its value still equals this client's requestId
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        // Pass requestId as ARGV[1] so the script compares the stored value with the caller's ID
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

        // The script returns 1 when the key was deleted and 0 otherwise
        return Long.valueOf(1L).equals(result);
    }

Code analysis

A Lua script guarantees the atomicity of the operation: the earlier check and del are merged into a single atomic script. The logic is to compare the stored value with get, delete the key if the values are equal, and otherwise return immediately
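
The same compare-and-delete idea exists in the JDK, which makes it easy to try out locally. This is only an analogy under stated assumptions: CompareAndDeleteSketch is a hypothetical class, a ConcurrentHashMap stands in for Redis, and expiry is left out. ConcurrentHashMap.remove(key, value) removes the entry only if it is still mapped to the given value, in one atomic step.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory analogue of the Lua unlock script; no expiry is modeled.
final class CompareAndDeleteSketch {
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    // Acquire: succeeds only if nobody holds the key.
    boolean lock(String key, String requestId) {
        return store.putIfAbsent(key, requestId) == null;
    }

    // Release: the comparison and the removal happen as one atomic operation.
    boolean unlock(String key, String requestId) {
        return store.remove(key, requestId);
    }

    public static void main(String[] args) {
        CompareAndDeleteSketch locks = new CompareAndDeleteSketch();
        System.out.println(locks.lock("lock", "client-A"));   // true
        System.out.println(locks.unlock("lock", "client-B")); // false: B is not the owner
        System.out.println(locks.unlock("lock", "client-A")); // true
    }
}
```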

Redisson distributed lock

Redisson is a Redis client recommended on the official Redis website. In addition to the basic CRUD commands of Redis, Redisson provides a convenient distributed lock API

First, basic usage

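    RedissonClient redissonClient = RedissonTool.getInstance();

    RLock distribute_lock = redissonClient.getLock("distribute_lock");

    try {
        // Wait up to 3 seconds for the lock; once acquired it expires after 10 seconds
        boolean result = distribute_lock.tryLock(3, 10, TimeUnit.SECONDS);
        if (result) {
            // business logic goes here
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // Unlock only if this thread holds the lock; isLocked() is also true
        // when another thread or client holds it, and unlock() would then throw
        if (distribute_lock.isHeldByCurrentThread()) {
            distribute_lock.unlock();
        }
    }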

Code process

  1. Get an RLock instance from redissonClient
  2. tryLock attempts to acquire the lock. The first argument is the maximum wait time, the second is the lock's lease time (after which it expires), and the third is the time unit
  3. After the business logic has run, the lock is released in the finally block

Second, concrete implementation

We use tryLock to analyze Redisson's distributed lock implementation. The lock method is similar to tryLock but has no maximum wait time: it keeps looping, waiting for the lock to be released, until it acquires the lock

        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            // The wait time has run out: report the failure and return
            acquireFailed(threadId);
            return false;
        }

        current = System.currentTimeMillis();
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }

Code analysis

  1. tryAcquire attempts to acquire the lock. A null TTL means the lock was acquired

  2. Check whether the wait time has run out. If it has, acquiring the lock fails

  3. Subscribe to the lock's Redis channel. subscribe uses a semaphore internally, and the await method then blocks, internally on a CountDownLatch, until the asynchronous subscribe completes, which ensures that the subscription succeeded. Then check again whether the wait time has run out

  4. RedissonLockEntry also maintains a Semaphore, on which the loop blocks while waiting for the lock to be released

  5. Whatever the outcome, the subscription is cancelled in the finally block

  6. Redisson's internal getEntryName is the client instance ID plus the lock name, so that reentrant locks work correctly across multiple instances
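
The waiting loop in steps 1, 2 and 4 can be sketched with the JDK alone. This is a simplified illustration, not Redisson's code: WaitBudgetSketch is hypothetical, the attempt supplier stands in for tryAcquire (null means acquired, otherwise the remaining TTL in milliseconds), the Semaphore stands in for the latch in RedissonLockEntry, and a fixed deadline replaces the repeated subtraction from time.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of a bounded wait loop; not the Redisson implementation.
final class WaitBudgetSketch {
    static boolean tryLock(long waitMillis, Supplier<Long> attempt, Semaphore latch) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis);
        try {
            while (true) {
                Long ttl = attempt.get();
                if (ttl == null) {
                    return true; // lock acquired
                }
                long remaining = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remaining <= 0) {
                    return false; // wait budget used up
                }
                // Sleep until the holder's lease ends or the budget runs out, whichever
                // comes first; a release of the latch (unlock message) wakes us early.
                long pause = (ttl >= 0 && ttl < remaining) ? ttl : remaining;
                latch.tryAcquire(pause, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        Semaphore latch = new Semaphore(0);
        int[] calls = {0};
        // The first two attempts report a 5 ms TTL; the third one succeeds.
        boolean ok = tryLock(1000, () -> ++calls[0] < 3 ? 5L : null, latch);
        System.out.println(ok + " after " + calls[0] + " attempts"); // true after 3 attempts
    }
}
```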


tryAcquire: acquiring the lock

This is Redisson's core code for acquiring the lock. Internally it is an asynchronous call, which the get method turns into a blocking one

    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }

    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

  1. The tryLockInnerAsync method acquires the lock with a Lua script

    • If KEYS[1] (the lock name) does not exist, hset stores the owner field and pexpire sets the expiration time; nil is returned to indicate that the lock was acquired
    • If the key exists, hexists checks whether the lock is held by the current thread. If so, hincrby increments the reentry count, pexpire resets the expiration time, and nil is returned
    • Otherwise the lock is held by another thread, and pttl returns the key's remaining time to live
     <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                      "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "return redis.call('pttl', KEYS[1]);",
                        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
        }
  2. How Redisson avoids deadlock:

    To avoid deadlock a lock needs an expiration time, but then it may expire before the business logic has finished. Redisson handles this case specially. If no lease time is specified, the default expiration time is 30s. When Redisson acquires the lock with this default expiration time, it starts a scheduled task on the current client that resets the key's expiration time every internalLockLeaseTime/3, so the lock is not released early. If the client crashes, the scheduled renewal task dies with it, and the lock is released automatically within 30 seconds at most.

        // lock acquired: schedule the task that periodically renews the expiration time
        if (ttlRemaining) {
            scheduleExpirationRenewal(threadId);
        }

        // expirationRenewalMap tracks the scheduled renewal tasks; the expiration time is reset after internalLockLeaseTime / 3
        private void scheduleExpirationRenewal(final long threadId) {
            if (expirationRenewalMap.containsKey(getEntryName())) {
                return;
            }
    
            Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    
                    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                "return 1; " +
                            "end; " +
                            "return 0;",
                              Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                    
                    future.addListener(new FutureListener<Boolean>() {
                        @Override
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            expirationRenewalMap.remove(getEntryName());
                            if (!future.isSuccess()) {
                                log.error("Can't update lock " + getName() + " expiration", future.cause());
                                return;
                            }

                            if (future.getNow()) {
                                // reschedule itself
                                scheduleExpirationRenewal(threadId);
                            }
                        }
                    });
                }
            }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

            if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
                task.cancel();
            }
        }
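
The renewal idea can be reduced to a few lines with a JDK scheduler. The sketch below is a simplified, hypothetical model (RenewalSketch is not part of Redisson): a map of expiration timestamps stands in for Redis, renew extends the lease only while the key still exists (the real script checks the owner field with hexists), and the task re-arms itself every leaseMillis / 3 as long as the renewal succeeds.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of lease renewal; a map of expiry timestamps stands in for Redis.
final class RenewalSketch {
    private final ConcurrentHashMap<String, Long> expiresAt = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "lock-renewal");
                t.setDaemon(true); // the renewal thread never keeps the client process alive
                return t;
            });
    private final long leaseMillis;

    RenewalSketch(long leaseMillis) {
        this.leaseMillis = leaseMillis;
    }

    void acquire(String key, long nowMillis) {
        expiresAt.put(key, nowMillis + leaseMillis);
    }

    void release(String key) {
        expiresAt.remove(key);
    }

    Long expiry(String key) {
        return expiresAt.get(key);
    }

    // Extends the lease only if the lock is still present; returns whether it was renewed.
    boolean renew(String key, long nowMillis) {
        return expiresAt.computeIfPresent(key, (k, old) -> nowMillis + leaseMillis) != null;
    }

    // Renews after a third of the lease and re-arms itself while renewal succeeds.
    void scheduleRenewal(String key) {
        timer.schedule(() -> {
            if (renew(key, System.currentTimeMillis())) {
                scheduleRenewal(key);
            }
        }, leaseMillis / 3, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        RenewalSketch locks = new RenewalSketch(30_000);
        locks.acquire("lock", 0);
        System.out.println(locks.renew("lock", 10_000)); // true
        System.out.println(locks.expiry("lock"));        // 40000
        locks.release("lock");
        System.out.println(locks.renew("lock", 20_000)); // false: nothing left to renew
    }
}
```

Once the lock is released, the next renewal finds no key and the task stops rescheduling itself, which mirrors the behavior of the listener in scheduleExpirationRenewal.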

    Unlocking: unlock

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end;" +
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                    "end; " +
                    "return nil;",
                    Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    
        }

    Redisson's unlock is also based on a Lua script. The logic is as follows. First check whether the lock exists; if it does not, it has already been released, so publish the unlock message and return. If the lock exists, check whether the current thread is its owner; if not, the thread has no right to release it, so return. Otherwise decrement the reentry count. If the count is still positive, refresh the expiration time; if the reentries are used up, delete the key and publish the unlock message
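
The reentrancy logic of the lock and unlock scripts can be modeled in plain Java. This is a sketch under stated assumptions, not Redisson code: ReentrantHashLockSketch is hypothetical, a HashMap plays the role of the Redis hash (field = owner ID, value = reentry count), time is passed in explicitly to keep the example deterministic, and the publish step is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the reentrant lock and unlock scripts.
final class ReentrantHashLockSketch {
    private final Map<String, Integer> holders = new HashMap<>(); // owner id -> reentry count
    private long expiresAt; // models the key's expiration time

    // Returns null when the lock is acquired (like nil), otherwise the remaining TTL.
    synchronized Long tryAcquire(String owner, long leaseMillis, long now) {
        if (now >= expiresAt) {
            holders.clear(); // the key has expired
        }
        if (holders.isEmpty() || holders.containsKey(owner)) {
            holders.merge(owner, 1, Integer::sum); // first acquisition or reentry
            expiresAt = now + leaseMillis;
            return null;
        }
        return expiresAt - now;
    }

    // Returns null if owner does not hold the lock, false if it is still held
    // after the decrement, and true if the lock is (or already was) released.
    synchronized Boolean release(String owner, long leaseMillis, long now) {
        if (now >= expiresAt) {
            holders.clear();
        }
        if (holders.isEmpty()) {
            return true; // already released
        }
        Integer count = holders.get(owner);
        if (count == null) {
            return null; // not the owner: no right to release
        }
        if (count > 1) {
            holders.put(owner, count - 1);
            expiresAt = now + leaseMillis; // refresh the expiration time
            return false;
        }
        holders.remove(owner); // last reentry: delete the key
        return true;
    }

    public static void main(String[] args) {
        ReentrantHashLockSketch lock = new ReentrantHashLockSketch();
        System.out.println(lock.tryAcquire("A", 100, 0));  // null: acquired
        System.out.println(lock.tryAcquire("A", 100, 10)); // null: reentered
        System.out.println(lock.tryAcquire("B", 100, 20)); // 90: remaining TTL
        System.out.println(lock.release("A", 100, 30));    // false: one reentry left
        System.out.println(lock.release("A", 100, 40));    // true: fully released
    }
}
```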

    Closing remarks

    This article designed and implemented a distributed lock based on Redis, starting from the common design approaches and extending to the implementation in Redisson, which is excellent in both design and code robustness and well worth studying. The next article will cover the ZooKeeper-based distributed lock implementation and an analysis of the related open-source code.