To make the whole life comfortable and happy, it is impossible, because human beings must have an attitude to cope with adversity — Rousseau

Author Mahua, the capital an ordinary programmer, original from the public number “source interest circle”

preface

Usually, we consider the key points and performance aspects when implementing distributed lock, which may not be very comprehensive. Redisson is the best one in the industry about distributed lock

However, whether to introduce the need to combine with their own projects, if only to introduce the function of distributed lock, I think it is not necessary, I can implement it by myself; If you rely heavily on the distributed functionality in Redisson, this can be referenced

Before reading Redisson source code, you need to understand the origin of distributed locks and the advantages and disadvantages of customizing each implementation. Click the link to view

Already into the lock

Before we talk about Redisson, let’s talk about JDK ReentrantLock

ReentrantLock ensures that only a single thread can operate on a JVM shared resource at any one time

Implementation approach

Already internal fair lock with the fair lock inherited AQS [AbstractQueuedSynchronizer]

1. AQS uses the int variable state modified by VOLATIl to control thread safety and lock reentrant in concurrent situations

2. Put the threads that are not locked into the queue of AQS to wake up through LockSupport#park and unPark suspension

See the link address for AQS and RLock principles

Redisson

If you’re not familiar with Github Redisson’s WIKI, check out the Redisson WIKI directory to see how Redisson has armed Redis to the teeth

Here’s a look at some of the content related to the article

From the project introduction, we can see that the person who wrote this project introduction is very skilled. From the first paragraph, we know two questions

What is Redisson

Redisson is a Java resident memory data grid framework based on Redis, which makes full use of a series of advantages provided by Redis key database and provides users with a series of common tool classes with distributed characteristics based on common interfaces in Java utility toolkit

The advantage of Redisson

As a result, the toolkit which was originally used to coordinate single multithreaded concurrent program has obtained the ability of coordinating distributed multi-machine multithreaded concurrent system, which greatly reduces the difficulty of designing and developing large-scale distributed system

At the same time, it further simplifies the cooperation between programs in the distributed environment by combining various distributed services

That’s about it, I won’t expand down, but if you want to know more about what it’s used for, check out the table of contents above

Redisson reentrant lock

Because Redisson isso complex, most of the API calls are designed using Netty, so the analysis here is only about how to lock, how to implement reentrant lock, and how to renew the lock

Create a lock

I have downloaded Redisson’s source code locally

This simple program uses Redisson to create an unfair reentrant lock

The lock() method has a default expiration time of 30 seconds and supports the “watchdog” renewal function

public static void main(String[] args) {
    Config config = new Config();
    config.useSingleServer()
            .setPassword("123456")
            .setAddress("Redis: / / 127.0.0.1:6379");
    RedissonClient redisson = Redisson.create(config);

    RLock lock = redisson.getLock("myLock");

    try {
        lock.lock();
        // Business logic
    } finally{ lock.unlock(); }}Copy the code

Let’s first look at the declaration of the RLock interface

public interface RLock extends Lock.RLockAsync {}
Copy the code

RLock inherits the Lock interface from JDK source JUC and RLockAsync

RLockAsync is literally a lock that supports asynchrony, proving that locks can be acquired asynchronously

Read Redisson’s source code will know that comments are more expensive than gold 🙃️

Because get lock API more, we here to lock() do source code to explain, look at the interface definition is quite simple

/** * lock does not specify the lock expiration time, default 30 seconds * if the lock is acquired, the lock will be renewed */
void lock(a);
Copy the code

Get lock instance

Based on the little Demo above, how do you get the lock in step 1

RLock lock = redisson.getLock("myLock");

// name is the lock name
public RLock getLock(String name) {
  	// Create a synchronous actuator by default (there are asynchronous actuators, because lock acquisition and release are strong consistency requirements, default synchronization)
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
Copy the code

All Redis commands in Redisson are executed by… The Executor implementation

After obtaining the default synchronizer, initialize the RedissonLock

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
  	/ / the only ID
    this.id = commandExecutor.getConnectionManager().getId();
  	// Wait to obtain the lock time
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
  	// ID + lock name
    this.entryName = id + ":" + name;
  	// Publish subscriptions, which will be used later in the add/unlock process
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
Copy the code

Attempt to acquire a lock

How does RLock#lock() get the lock

@Override
public void lock(a) {
    try {
        lock(-1.null.false);
    } catch (InterruptedException e) {
        throw newIllegalStateException(); }}Copy the code

LeaseTime: lock expiration time. -1 Use the default value 30 seconds

Unit: Time unit: milliseconds, seconds, minutes, hours…

Interruptibly: Whether an interrupt can be identified

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    // Get the current thread ID
    long threadId = Thread.currentThread().getId();
    // 🚩 tries to obtain the lock
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // The lock was successfully acquired with an empty expiration time
    if (ttl == null) {
        return;
    }

    // Subscribe to a distributed lock to be notified when it is unlocked
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }

    try {
        while (true) {
            // Try again to obtain the lock
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
    				// Succeeded in obtaining the lock, the expiration time is empty, success returned
            if (ttl == null) {
                break;
            }

            // If the lock expiration time is greater than zero, block fetch with expiration time is performed
            if (ttl >= 0) {
                try {
                    // If the lock is not acquired, it will block here, Semaphore, and release Semaphore notification when unlocking
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
                // If the lock expiration time is less than zero, the lock is dead
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else{ future.getNow().getLatch().acquireUninterruptibly(); }}}}finally {
        // Unsubscribeunsubscribe(future, threadId); }}Copy the code

This piece of code is used to perform the locking, so let’s move on to the method implementation

Long ttl = tryAcquire(-1, leaseTime, unit, threadId);

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
Copy the code

The lock () and tryLock (…). Method eventually calls this method, divided into two process branches

1, tryLock (…). API asynchronous locking returns

2. Lock() & tryLock() API asynchronously locks and continues the lock

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    / / execution tryLock (...). Will enter the
    if(leaseTime ! = -1) {
        // Get the lock asynchronously
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // Try to obtain the lock asynchronously. If the lock is obtained successfully, null is returned. Otherwise, the remaining lock expiration time is returned
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // ttlRemainingFuture This operation is triggered after execution is complete
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if(e ! =null) {
            return;
        }
      	// ttlRemaining == null Indicates that the lock is obtained
        // Perform the continuation operation after obtaining the lock
        if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); }});return ttlRemainingFuture;
}
Copy the code

Keep looking at tryLockInnerAsync(…) Detailed locking process, internal use of Lua script form, to ensure atomic operation

At this point it should be clear that Lua scripts are Redisoon wrapped and then transmitted via Netty

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
Copy the code

evalWriteAsync(…) The Eval command and the use of Netty are not followed up

Lock the Lua

Redis lock Lua script, a screenshot to let you see the parameters and the specific meaning

KEYS[1]: myLock

ARGV[1]: 36000… This is the expiration time, self-tested in milliseconds

ARGV[2]: UUID + thread ID

# KEYS[1MyLock # judge KEYS[1] If yes, return1, there is no return0
if (redis.call('exists', KEYS[1= =])0) then# when KEYS [1] = =0# use hincrby to find KEYS[1] does not exist and creates a hash # ARGV[2] as the first key of the hash, val is1# hincrby myLock91089b45... 1
	redis.call('hincrby', KEYS[1], ARGV[2].1); # set KEYS [1] Expiration time in milliseconds redis.call('pexpire', KEYS[1], ARGV[1]);
	return nil;
end; # to find KEYS [1] key ARGV [2] If yes, a return is returned1
if (redis.call('hexists', KEYS[1], ARGV[2= =])1) then# ARGV[2] is val + for key1
	redis.call('hincrby', KEYS[1], ARGV[2].1); The above # redis. Call ('pexpire', KEYS[1], ARGV[1]);
return nil;
end; # return KEYS [1] Expiration time, in millisecondsreturn redis.call('pttl', KEYS[1]);
Copy the code

The whole Lua script lock process is illustrated as follows:

Now let’s go back to how the lock is deferred once acquired

Locked to continue

I have talked about this topic with my friends before, and the idea is basically the same as that reflected in Redisson

Let’s talk about Redisson’s idea, which is translated into Chinese as “watchdog.”

1. Execute the “watchdog” process after obtaining the lock

2. Use Netty Timeout to implement timing delay

3. For example, if the lock expires for 30 seconds, it will check whether the lock exists every 1/3 of the time, that is, 10 seconds, and update the timeout period of the lock if it exists

What if the check return exists and the lock is set to expire just as the lock is released?

With such a question, the representative really thought through all possible scenarios, but there was no need to worry

The Lua script used in Redisson does checks and sets expiration times that are atomic and do not occur

If you do not want to reference Netty packages, you can also use package tools such as delayed queues to complete the “watchdog”.

Here is also a ha of relevant code, can let small partners more intuitive understanding of how to lock the continuation of the

RedissonLock#tryAcquireAsync(…)

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // ...
    // Try to obtain the lock asynchronously. If the lock is obtained successfully, null is returned. Otherwise, the remaining lock expiration time is returned
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // ttlRemainingFuture This operation is triggered after execution is complete
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if(e ! =null) {
            return;
        }
        // Perform the continuation operation after obtaining the lock
        if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); }});return ttlRemainingFuture;
}
Copy the code

You can see that the continuation method continues threadId as an identifier

private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if(oldEntry ! =null) {
        oldEntry.addThreadId(threadId);
    } else{ entry.addThreadId(threadId); renewExpiration(); }}Copy the code

It’s good to know the core idea, no need to study every line of code

private void renewExpiration(a) {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if(e ! =null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }

                if (res) {
                    // Call itselfrenewExpiration(); }}); } }, internalLockLeaseTime /3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}
Copy the code

Unlock operation

The operation of unlocking is relatively simple

@Override
public void unlock(a) {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throwe; }}}Copy the code

After the unlock is successful, the previous “watchdog” Timeout will be cancelled, and return to success

@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
      	// Cancel the automatic continuation function
        cancelExpirationRenewal(threadId);

        if(e ! =null) {
          	/ / fail
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }
				// The account is unlocked successfully
        result.trySuccess(null);
    });

    return result;
}
Copy the code

Another quintessential point, unlocking the Lua script definition

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
Copy the code

Lua script will analyze it in detail

Unlock the Lua

Same old rule — pictures and specs

KEYS[1]: myLock

KEYS[2]: redisson_lock_channel:{myLock}

ARGV[1]: 0

ARGV[2]: 360000… (Expiration time)

ARGV[3]: 7f0c54e2… (Lock Key in Hash)

Judging # KEYS [1Is ARGV[present in]3]
if (redis.call('hexists', KEYS[1], ARGV[3= =])0) then
return nil;
end; # KEYS [1ARGV [in]3] Val - 1
local counter = redis.call('hincrby', KEYS[1], ARGV[3].- 1); # if return greater than0Turns out to be a reentry lockif (counter > 0) then# rework expiration time redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else# delete KEYS [1]
	redis.call('del', KEYS[1]); Redis. Call ('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
Copy the code

Redlock algorithm

Admittedly, Redisson’s distributed lock design is fantastic, but it still doesn’t solve the problem of lock loss caused by asynchronous synchronization of data between master and slave nodes

Therefore, Antirez, the author of Redis, introduced the red-lock algorithm. The essence of this algorithm is: there is no slave node. If multiple Redis are deployed, each instance is independent from each other, and there is no master-slave replication or other cluster coordination mechanism

How to use

Create multiple Redisson nodes that form a complete distributed lock

public static void main(String[] args) {
    String lockKey = "myLock";
    Config config = new Config();
    config.useSingleServer().setPassword("123456").setAddress("Redis: / / 127.0.0.1:6379");
    Config config2 = new Config();
    config.useSingleServer().setPassword("123456").setAddress("Redis: / / 127.0.0.1:6380");
    Config config3 = new Config();
    config.useSingleServer().setPassword("123456").setAddress("Redis: / / 127.0.0.1:6381");

    RLock lock = Redisson.create(config).getLock(lockKey);
    RLock lock2 = Redisson.create(config2).getLock(lockKey);
    RLock lock3 = Redisson.create(config3).getLock(lockKey);

    RedissonRedLock redLock = new RedissonRedLock(lock, lock2, lock3);

    try {
        redLock.lock();
    } finally{ redLock.unlock(); }}Copy the code

Of course, the Redlock algorithm is not without its doubts, you can go to the Redis website to check out Martin Kleppmann and Redis author Antirez debate

Trade-offs between CAP principles

The CAP principle, also known as the CAP theorem, refers to the importance of Consistency, Availability and Partition tolerance in a distributed system

Consistency (C) : Whether all data backups in a distributed system have the same value at the same time (equivalent to all nodes accessing the same latest copy of data)

Availability (A): Whether the cluster as A whole can respond to read/write requests from clients after some nodes fail (high availability for data updates)

Partition tolerance (P): In practical terms, partitioning is equivalent to time-bound requirements for communication. If the system cannot achieve data consistency within the time limit, it means that A partitioning situation has occurred and that it must choose between C and A for the current operation

Distributed lock selection

To achieve strong consistency between distributed locks, Zookeeper’s distributed lock can be used because its underlying ZAB protocol (atomic broadcast protocol) naturally meets CP

But this also means performance degradation, so looking at Redis and Zookeeper without looking at specific data represents a trade-off between performance and consistency

If the project does not rely heavily on ZK, Redis is good, because Redis is so versatile these days that most projects reference Redis

There is no need to introduce a new component for this, if the business scenario becomes intolerable for Redis asynchronously synchronizing data with lock loss, it can be handled in the business layer

Write the last words

Recently are writing multi-threaded source code related, the following will output JUC source code analysis

1, CountDownLatch

2, ThreadLocal

3

Including two recent distributed lock articles, the length is relatively long, I hope you can be patient to watch

I hope you can give feedback to correct the wrong and incorrect places in the article 🙏, the love of small partners is the biggest support for me, and finally I hope you can like, comment, see three even!

Recommended reading:

  • Essay | how to solve the JDK thread pool shall not exceed the maximum number of threads under rapid consumption task
  • Swastika graphic | a chat already and AQS that something (read not you find me)
  • How to handle thread execution exceptions in the JDK thread pool?
  • ParallelStream, a new Java8 feature, is used with caution