Preface

In the previous article I covered the principle and shortcomings of Redis distributed locks, but I was not satisfied with the result: I only briefly introduced the Redisson framework and never explained how it actually works. Now that the busy period on my project is mostly over and I have some free time, I might as well study Redisson's source code too.

Reading Redisson's source code took quite a bit of effort, and I got through it partly by luck. It makes heavy use of Java's concurrency classes and uses Netty as the communication layer for remote calls to Redis. This article focuses on how Redisson's distributed lock is implemented, so I will not go through the networking and concurrency code in much detail. Please forgive any shortcomings!

Redis publish/subscribe

As mentioned before, a distributed lock has three core functions: locking, unlocking, and setting the lock timeout. These three functions are also the threads we will follow through Redisson's distributed lock implementation.

Before we get started, it is important to know something about Redis publish/subscribe.

Redis publish/subscribe (PUB/SUB) is a messaging pattern: the sender (PUB) sends a message and the subscriber (SUB) receives it. A publisher sends a message to a specified channel, and any client subscribed to that channel receives it, which lets multiple clients communicate with each other.

SUBSCRIBE channel [channel …] subscribes to one or more channels. When a new message is sent to a channel via the PUBLISH command, its subscribers receive the message.

For example, when two clients subscribe to channel1 and another client sends a message through PUBLISH, both subscribers receive it. This is how different clients can communicate with each other.
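To make the pattern concrete, here is a minimal in-memory sketch in plain Java. It does not talk to Redis at all: the class MiniPubSub and its subscribe and publish methods are hypothetical names that only imitate the behaviour of SUBSCRIBE and PUBLISH described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for Redis channels; not a Redis client.
final class MiniPubSub {
    private final Map<String, List<Consumer<String>>> channels = new ConcurrentHashMap<>();

    // Like SUBSCRIBE channel: register a callback for messages on that channel.
    void subscribe(String channel, Consumer<String> subscriber) {
        channels.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(subscriber);
    }

    // Like PUBLISH channel message: deliver to every subscriber of the channel
    // and return how many subscribers received the message.
    int publish(String channel, String message) {
        List<Consumer<String>> subscribers = channels.getOrDefault(channel, List.of());
        subscribers.forEach(s -> s.accept(message));
        return subscribers.size();
    }

    public static void main(String[] args) {
        MiniPubSub bus = new MiniPubSub();
        List<String> clientA = new ArrayList<>();
        List<String> clientB = new ArrayList<>();
        bus.subscribe("channel1", clientA::add);
        bus.subscribe("channel1", clientB::add);
        int receivers = bus.publish("channel1", "hello");
        System.out.println(receivers + " subscribers received: " + clientA + " " + clientB);
    }
}
```

Both subscribers of channel1 get the same message, which is exactly the property the lock relies on later: every waiting client can be told that the lock was released.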

Of course, we will not expand on the clever uses of this communication pattern here; you can look them up online. Our protagonist is still Redisson, and it is time for the main course.

Redisson source

Before locking with Redisson, you need to obtain an RLock instance. With this object, you can call the lock and tryLock methods to acquire the lock:

Config config = new Config();
config.useSingleServer().setPassword("").setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
// RLock object
RLock lock = redisson.getLock("myLock");

With the host configured, you can create an RLock object. RLock is an interface that is implemented by a concrete synchronizer. When we call redisson.getLock(), the program initializes a default synchronizer, RedissonLock.

A few fields are initialized here:

commandExecutor: an asynchronous executor; all commands in Redisson are executed through it;

id: a unique ID, created from a UUID at initialization;

internalLockLeaseTime: the lock's lease (expiration) time, not a wait time. The value read here is the default defined in the configuration class (the lock watchdog timeout), which is 30 seconds.

I also marked one method, getEntryName, which returns the string "ID:lock name" and serves as the identifier of the lock held by the current client. Keep these fields in mind, because they appear often in the source analysis that follows.
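As a small illustration of how these identifiers are put together, here is a sketch in plain Java. The holder class LockNames is hypothetical; only the two string formats ("ID:lock name" from getEntryName and "ID:thread ID" from getLockName, which appears in the source later) are taken from the article.

```java
import java.util.UUID;

// Hypothetical holder class; only the two string formats come from the article.
final class LockNames {
    final String id;   // unique per client instance, created from a UUID
    final String name; // the lock key, e.g. "myLock"

    LockNames(String id, String name) {
        this.id = id;
        this.name = name;
    }

    // "ID:lock name": identifies this client's entry for the lock
    String getEntryName() {
        return id + ":" + name;
    }

    // "ID:thread ID": identifies one thread of this client as the lock holder
    String getLockName(long threadId) {
        return id + ":" + threadId;
    }

    public static void main(String[] args) {
        LockNames names = new LockNames(UUID.randomUUID().toString(), "myLock");
        System.out.println(names.getEntryName());
        System.out.println(names.getLockName(Thread.currentThread().getId()));
    }
}
```

Because the UUID differs for every client instance, two threads with the same thread ID on different machines still produce different holder names.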

With initialization covered, we can start studying the source code for locking and unlocking.

lock

Redisson has two locking methods, tryLock and lock. With tryLock you can set the lock's expiration time (leaseTime) and the time to wait for the lock (waitTime).

tryLock

The code is a bit long and does not fit well in a picture, so I will just paste it:

/**
 * @param waitTime  how long to wait for the lock
 * @param leaseTime how long to hold the lock
 * @param unit      time unit
 * @return
 * @throws InterruptedException
 */
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // Remaining time to wait for the lock
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    final long threadId = Thread.currentThread().getId();
    // Try to acquire the lock
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // Lock acquired
    if (ttl == null) {
        return true;
    }

    // Not acquired: give up if the wait time is already used up
    time -= (System.currentTimeMillis() - current);
    if (time <= 0) {
        acquireFailed(threadId);
        return false;
    }

    current = System.currentTimeMillis();
    // Subscribe to the lock-release notification
    final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    // Wait for the subscription to complete; await() returns false if the wait timed out
    if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                @Override
                public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                    if (subscribeFuture.isSuccess()) {
                        // Unsubscribe
                        unsubscribe(subscribeFuture, threadId);
                    }
                }
            });
        }
        acquireFailed(threadId);
        return false;
    }

    try {
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        // Keep retrying until the lock is acquired or the wait time runs out
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }

            time -= (System.currentTimeMillis() - currentTime);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }

            // waiting for message
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            time -= (System.currentTimeMillis() - currentTime);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        }
    } finally {
        unsubscribe(subscribeFuture, threadId);
    }
    // return get(tryLockAsync(waitTime, leaseTime, unit));
}

The code is fairly long, but the flow has only two outcomes: either the thread gets the lock and returns success, or it fails to get the lock while the wait time has not yet expired, in which case it listens for the lock to be released and tries again.

The method that acquires the lock is tryAcquire, which takes the lock holding time, the time unit, and the ID of the current thread. Following the call stack, it calls a method named tryAcquireAsync:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // A lease time was given: call tryLockInnerAsync directly to get the lock
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // No lease time was given: use the watchdog timeout and add a listener.
    // This is the path taken by lock.lock(), discussed later.
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

Let’s go ahead and look at the source of the tryLockInnerAsync method:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "return redis.call('pttl', KEYS[1]);",
            Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

String getLockName(long threadId) {
    return id + ":" + threadId;
}

This is the bottom of the call stack. The Redis commands are combined into a Lua script, and a Netty-based utility class sends it to Redis, which is how the lock is acquired.

This script is rather interesting:

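  • First, the exists key command checks whether the lock is held. If it is not, hset writes it: the key is the lock name, the field is "client unique ID:thread ID", and the value is 1. pexpire then sets the expiration time and the script returns nil.
  • If the lock is held by the current thread (hexists returns 1), hincrby increases the value by 1, pexpire refreshes the expiration time, and the script returns nil.
  • If the lock is held by another thread, the script returns the lock's remaining time to live (pttl).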

The logic of the script is not complicated, but I have to say the author's design is careful. The data is stored in a Redis hash: if the current thread already holds the lock, hincrby increases the value by 1, and that value determines how many times unlock must be called before the lock is really released. This is how the lock becomes reentrant.
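The three branches of the script can be modelled in plain Java to see the reentrant counting at work. This is only an in-memory sketch under my own assumptions: the class ReentrantLockModel and its methods are hypothetical, the clock is passed in as a parameter, and the code is single-threaded, whereas the real script runs atomically inside Redis.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the locking script; not Redisson code.
final class ReentrantLockModel {
    private final Map<String, Map<String, Integer>> hashes = new HashMap<>();
    private final Map<String, Long> expireAt = new HashMap<>();

    // Returns null when the lock is acquired, otherwise the remaining TTL in milliseconds.
    Long tryLock(String key, String holder, long leaseMillis, long nowMillis) {
        evictIfExpired(key, nowMillis);
        Map<String, Integer> hash = hashes.get(key);
        if (hash == null) {                              // exists key == 0
            hash = new HashMap<>();
            hash.put(holder, 1);                         // hset key holder 1
            hashes.put(key, hash);
            expireAt.put(key, nowMillis + leaseMillis);  // pexpire key lease
            return null;
        }
        if (hash.containsKey(holder)) {                  // hexists key holder == 1
            hash.merge(holder, 1, Integer::sum);         // hincrby key holder 1
            expireAt.put(key, nowMillis + leaseMillis);  // pexpire key lease
            return null;
        }
        return expireAt.get(key) - nowMillis;            // pttl key
    }

    // Reentrancy count currently stored for this holder (0 if it does not hold the lock).
    int holdCount(String key, String holder) {
        Map<String, Integer> hash = hashes.get(key);
        return hash == null ? 0 : hash.getOrDefault(holder, 0);
    }

    // Imitates Redis key expiry: drop the key once its deadline has passed.
    private void evictIfExpired(String key, long nowMillis) {
        Long deadline = expireAt.get(key);
        if (deadline != null && deadline <= nowMillis) {
            hashes.remove(key);
            expireAt.remove(key);
        }
    }

    public static void main(String[] args) {
        ReentrantLockModel redis = new ReentrantLockModel();
        System.out.println(redis.tryLock("myLock", "uuid:1", 30_000, 0));     // first acquire
        System.out.println(redis.tryLock("myLock", "uuid:1", 30_000, 1_000)); // reentrant acquire
        System.out.println(redis.tryLock("myLock", "uuid:2", 30_000, 2_000)); // another thread
    }
}
```

In this run the first two calls print null (acquired, with the count going from 1 to 2), and the third prints 29000, the remaining time to live that a competing thread would get back.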

The logic of each step of the script is annotated in the figure below:

If the thread gets the lock, the tryLock method returns true directly.

If not, the script returns the lock's remaining time to live. What is this time used for? Let's go back to the infinite loop in the tryLock method:

It compares the remaining wait time (derived from waitTime) with the key's remaining time to live, takes the smaller of the two, and blocks the thread for that long with the tryAcquire method of Java's Semaphore.
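That choice between the two durations can be isolated in a few lines of plain Java. This is a sketch, not Redisson code: the class WaitSlice and the method chooseWaitMillis are hypothetical names, and the Semaphore here has no permits, so tryAcquire simply blocks until the timeout.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that mirrors the branch inside the tryLock loop.
final class WaitSlice {
    // Wait for the lock's TTL when it is non-negative and shorter than the
    // remaining wait budget; otherwise wait for the remaining budget.
    static long chooseWaitMillis(long ttl, long remaining) {
        return (ttl >= 0 && ttl < remaining) ? ttl : remaining;
    }

    public static void main(String[] args) throws InterruptedException {
        // No permits available: tryAcquire blocks until a release() or the timeout.
        Semaphore latch = new Semaphore(0);
        long wait = chooseWaitMillis(50, 200);
        boolean signalled = latch.tryAcquire(wait, TimeUnit.MILLISECONDS);
        System.out.println("waited up to " + wait + " ms, signalled=" + signalled);
    }
}
```

Waiting no longer than the TTL makes sense because the lock will expire by then anyway, so the thread wakes up in time to retry even if no release message arrives.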

So who controls the Semaphore, and when is it released? Looking back at the tryLock code posted above, you will notice that it also contains this snippet:

current = System.currentTimeMillis();
// Subscribe to the lock-release notification
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);

The subscription logic lives in the subscribe method. Following its call chain leads into PublishSubscribe.java:

This code adds the current thread's threadId to an AsyncSemaphore and sets up a Redis listener through Redis's publish/subscribe feature.

Once the listener receives a message from Redis, it picks out the message that concerns the current thread. If the message says the lock has been released, it immediately unblocks the waiting thread by operating on the Semaphore (that is, by calling release).

After being unblocked, the thread continues and checks again whether it has timed out. If not, it enters the next iteration of the loop and tries to acquire the lock again: on success it returns true, otherwise it repeats the process.

The loop is needed because several clients may be competing for the lock at the same time. A thread may fail to grab the lock at the moment it is released and still have wait time left, so it has to go around the loop and try again.
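The wait-and-retry loop described above can be sketched with nothing but the JDK. Everything here is a simplified stand-in of my own: the class WaitLoopSketch is hypothetical, an AtomicReference plays the role of the lock key in Redis, and latch.release() plays the role of the release message arriving through publish/subscribe.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-process model of the tryLock wait loop; not Redisson code.
final class WaitLoopSketch {
    private final AtomicReference<String> owner = new AtomicReference<>(); // null means free
    private final Semaphore latch = new Semaphore(0); // released when an "unlock message" arrives

    void unlock(String who) {
        if (owner.compareAndSet(who, null)) {
            latch.release(); // stands in for the pub/sub release notification
        }
    }

    // Tries to take the lock, waiting at most waitMillis for a release.
    boolean tryLock(String who, long waitMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis);
        while (true) {
            if (owner.compareAndSet(null, who)) {
                return true;                       // lock acquired
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;                      // wait time used up
            }
            try {
                // Block until a release is signalled or the budget runs out;
                // the result is ignored because the loop retries either way.
                latch.tryAcquire(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WaitLoopSketch lock = new WaitLoopSketch();
        lock.tryLock("A", 0);
        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            lock.unlock("A");
        });
        releaser.start();
        boolean acquired = lock.tryLock("B", 2_000);
        releaser.join();
        System.out.println("B acquired the lock: " + acquired);
    }
}
```

Being woken up is not the same as owning the lock, so the waiter always goes back to the top of the loop and competes again instead of assuming success.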

This is the process of tryLock acquiring the lock. If you draw a flow diagram, it looks something like this:

lock

Besides tryLock, there is lock, whose acquisition process is basically the same as that of tryLock. The difference is that lock does not take a manually set lock expiration parameter. Its call chain also goes through tryAcquire to get the lock, but it ends up in this part of the logic:

This code does two things:

1. Set an expiration time of 30 seconds and try to acquire the lock.

2. Register a listener; if the lock was acquired, start a periodic task that keeps refreshing the lock's expiration time.

The method that refreshes the expiration time is scheduleExpirationRenewal. Here is the source:

private void scheduleExpirationRenewal(final long threadId) {
    // expirationRenewalMap is a ConcurrentMap that stores the renewal task
    // under the key returned by getEntryName()
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // Lua script: check whether the lock is still held; if so, refresh
            // its expiration time with pexpire
            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }

                    if (future.getNow()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}

The method starts a timed task that checks every 10 seconds (internalLockLeaseTime / 3) whether the lock is still held by the current thread. If so, it resets the expiration time to 30 seconds.

The timed tasks are kept in a ConcurrentHashMap, expirationRenewalMap. Once the entry for the current lock holder has been removed from that map and its task cancelled, the timed task will no longer run, which is an important step in unlocking later.

The code above is Redisson's so-called "watchdog": an asynchronous thread periodically checks and renews the lock so that it does not expire before it is unlocked manually.
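The renewal pattern itself (run once after a third of the lease, then reschedule only while the lock is still held) can be sketched with a ScheduledExecutorService. This is my own simplified model rather than Redisson's implementation: the class WatchdogSketch is hypothetical, a boolean flag stands in for the hexists check, and a counter stands in for the pexpire call.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the "watchdog" renewal; not Redisson code.
final class WatchdogSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final long leaseMillis;
    private boolean held = true;                        // stands in for "hexists key holder == 1"
    final AtomicInteger renewals = new AtomicInteger(); // counts simulated pexpire calls

    WatchdogSketch(long leaseMillis) {
        this.leaseMillis = leaseMillis;
    }

    // Renew after one third of the lease, as in internalLockLeaseTime / 3.
    long renewalDelayMillis() {
        return leaseMillis / 3;
    }

    // Schedule a single renewal attempt; does nothing once the lock is released.
    synchronized void scheduleRenewal() {
        if (!held) {
            return;
        }
        timer.schedule(this::renewOnce, renewalDelayMillis(), TimeUnit.MILLISECONDS);
    }

    // One renewal: refresh the expiry and reschedule itself while the lock is held.
    synchronized void renewOnce() {
        if (!held) {
            return;
        }
        renewals.incrementAndGet(); // stands in for pexpire key lease
        scheduleRenewal();
    }

    // Called on unlock: stop renewing and cancel any pending task.
    synchronized void release() {
        held = false;
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        WatchdogSketch dog = new WatchdogSketch(300); // renew roughly every 100 ms
        dog.scheduleRenewal();
        Thread.sleep(350);
        dog.release();
        System.out.println("renewals while held: " + dog.renewals.get());
    }
}
```

With the default 30 second lease the delay works out to 10 seconds, so a healthy holder refreshes the key well before it can expire, while a crashed holder simply stops renewing and lets the key time out.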

The rest of the logic is basically the same as tryLock()

unlock

Where there is a way to take a lock, there is naturally a way to release it. The top-level method for unlocking a Redisson distributed lock is unlock(), which takes no arguments:

public void unlock() {
    // Run the unlock script for the current thread
    Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
    if (opStatus == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                + id + " thread-id: " + Thread.currentThread().getId());
    }
    if (opStatus) {
        // The lock is fully released: cancel the watchdog renewal task
        cancelExpirationRenewal();
    }
}

The commands involved in unlocking are defined in the unlockInnerAsync method:

This is a long Lua script, slightly more complex than the locking script, but no matter. Let's briefly walk through its logic:

1. Check whether the lock exists. If it does not, publish the lock-release message directly.

2. If the lock exists but is not held by the current thread, return nil.

3. If the current thread holds the lock, use hincrby to decrease the lock's reentrancy count by 1, then check whether the count is still greater than 0. If it is, refresh the lock's expiration time and return 0; otherwise delete the lock, publish the lock-release message, and return 1.
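The three steps can be modelled in plain Java in the same spirit as the locking script. This is a sketch under my own assumptions: the class UnlockModel is hypothetical, a single map stands in for the hash of one lock key, and I assume step 1 reports success the same way the full-release branch does, since the step list above does not state its return value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the unlock script for a single lock key; not Redisson code.
final class UnlockModel {
    // field (holder) -> reentrancy count; an empty map means the key does not exist
    private final Map<String, Integer> hash = new HashMap<>();
    int published = 0; // number of lock-release messages sent to waiters

    // Simplified acquire used only to set up state; assumes the caller may take the lock.
    void lock(String holder) {
        hash.merge(holder, 1, Integer::sum);
    }

    // Returns null if the caller does not hold the lock, false if it still holds it
    // after this call, and true once the lock is fully released.
    Boolean unlock(String holder) {
        if (hash.isEmpty()) {                  // step 1: the lock does not exist
            published++;                       // publish the release message
            return true;                       // assumption: treated like a full release
        }
        if (!hash.containsKey(holder)) {       // step 2: held by someone else
            return null;
        }
        int counter = hash.merge(holder, -1, Integer::sum); // step 3: hincrby -1
        if (counter > 0) {
            return false;                      // still held; the script also refreshes the expiry
        }
        hash.clear();                          // del key
        published++;                           // publish the release message
        return true;
    }

    public static void main(String[] args) {
        UnlockModel redis = new UnlockModel();
        redis.lock("uuid:1");
        redis.lock("uuid:1");                       // reentrant: count is now 2
        System.out.println(redis.unlock("uuid:2")); // not the holder
        System.out.println(redis.unlock("uuid:1")); // count drops to 1
        System.out.println(redis.unlock("uuid:1")); // count drops to 0, lock deleted
    }
}
```

The three results map directly onto the unlock() code above: null leads to IllegalMonitorStateException, false means the thread still holds the lock after a reentrant release, and true is the signal to cancel the watchdog.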

When the thread has fully released the lock, cancelExpirationRenewal() is called to cancel the "watchdog" renewal task:

void cancelExpirationRenewal() {
    // Remove the current holder's entry from expirationRenewalMap;
    // the corresponding "watchdog" task will no longer run
    Timeout task = expirationRenewalMap.remove(getEntryName());
    if (task != null) {
        task.cancel();
    }
}

That is the process of releasing the lock. It is still relatively simple, and much more comfortable to read than the locking code. Simple as it is, I still took the trouble to draw a flowchart to help you sort out the whole distributed lock process (for that alone you should give me a like, ha ha):

RedLock

That covers how Redisson's distributed lock works. In general, it simply uses Lua scripts to combine basic Redis commands into a lock, which is also the design principle behind many Redis distributed lock tools. In addition, Redisson supports the "RedLock algorithm" for locking, through the utility class RedissonRedLock.

The usage is also very simple: create several Redisson nodes, and together they form a complete distributed lock:

RLock lock1 = Redisson.create(config1).getLock(lockKey);
RLock lock2 = Redisson.create(config2).getLock(lockKey);
RLock lock3 = Redisson.create(config3).getLock(lockKey);

RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
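// Acquire outside the try block so that unlock() only runs if lock() succeeded
redLock.lock();
try {
   // business logic goes here
} finally {
   redLock.unlock();
}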

I will not go into the principle of the RedLock algorithm here. If you are interested, see my previous article or search online. In short, it can mitigate the single point of failure of a Redis instance to a certain extent, but it is not completely reliable. Whatever the design, Redis alone cannot guarantee strong consistency for a lock.

Still, Redis's strong performance and ease of use are enough for everyday distributed locking needs. If a business scenario cannot tolerate the lock's safety risks, the most reliable safeguard is to make the operation idempotent at the business layer.
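As a rough illustration of what idempotent processing at the business layer can look like, here is a minimal sketch in plain Java. The class IdempotentHandler and the idea of a requestId are my own assumptions for the example. A real system would keep the processed IDs somewhere durable, such as a database table with a unique constraint, rather than in memory.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler that applies each request at most once; not part of Redisson.
final class IdempotentHandler {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();
    final AtomicInteger sideEffects = new AtomicInteger(); // counts how often the work really ran

    // Returns true if this call did the work, false if the request was handled before.
    boolean handle(String requestId) {
        if (!processed.add(requestId)) {
            return false; // duplicate: skip the side effect
        }
        sideEffects.incrementAndGet(); // stands in for the real business operation
        return true;
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        System.out.println(handler.handle("order-1"));
        System.out.println(handler.handle("order-1")); // same request delivered twice
        System.out.println("side effects: " + handler.sideEffects.get());
    }
}
```

Even if two clients both believe they hold the lock and submit the same request, the operation takes effect only once.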

Conclusion

After this walk through the source, I believe you now have a good understanding of how Redisson's distributed lock is designed. Although this was a source code reading, the focus was on the principle of the distributed lock, so I did not take you through the unrelated code word by word. If you are interested, go and read it yourself: the source makes heavy use of basic concurrency tools and network communication, and studying it is very rewarding.

Finally, I have to complain about how few comments there are in Redisson's code…

If you found this article useful, a like is welcome. It is the best encouragement for my writing!

Author: MY Humble Xue, an Internet person who is not confined to technology and likes to break down back-end technical topics in plain language. To see more articles, follow my WeChat public account by searching for [HUMBLE Xue].