
In the last two articles, we covered how to implement distributed locks with a SQL database and with ZooKeeper:

Introduction to Distributed Lock (1)

Basic knowledge of distributed lock (2): ZooKeeper distributed lock principle analysis and actual case

Today let's talk about how to implement a distributed lock with Redis, and how this implementation differs from the previous two.

Common Redis commands

Before introducing distributed locks, let's look at the Redis commands we will need:

1. SET key value [EX seconds] [PX milliseconds] [NX | XX]: associates the string value with key. If key already holds a value, SET overwrites it, regardless of its type. Starting with Redis 2.6.12, the behavior of SET can be modified with the following options:

  • EX seconds: sets the expiration time of the key to the given number of seconds. SET key value EX seconds is equivalent to SETEX key seconds value.
  • PX milliseconds: sets the expiration time of the key to the given number of milliseconds. SET key value PX milliseconds is equivalent to PSETEX key milliseconds value.
  • NX: sets the key only if it does not already exist. SET key value NX is equivalent to SETNX key value.
  • XX: sets the key only if it already exists.
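To make the option semantics concrete, here is a minimal in-memory sketch in Java. It is not a Redis client: the class SetOptionsSketch, its method names, and the manual clock are assumptions made purely for illustration, and only the PX form of expiration is modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of SET key value [PX ms] [NX | XX]; an illustration, not a Redis client. */
final class SetOptionsSketch {
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> expiresAt = new HashMap<>(); // key -> expiry time in ms
    private long nowMillis = 0; // manual clock keeps the example deterministic

    void advance(long millis) {
        nowMillis += millis;
    }

    /** pxMillis <= 0 means "no expiration". Returns true if the value was written. */
    boolean set(String key, String value, long pxMillis, boolean nx, boolean xx) {
        boolean exists = get(key) != null;
        if (nx && exists) return false;  // NX: write only when the key is absent
        if (xx && !exists) return false; // XX: write only when the key is present
        values.put(key, value);
        if (pxMillis > 0) {
            expiresAt.put(key, nowMillis + pxMillis);
        } else {
            expiresAt.remove(key); // a plain SET discards any previous expiration
        }
        return true;
    }

    /** Returns the live value, or null if the key is missing or has expired. */
    String get(String key) {
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry <= nowMillis) {
            values.remove(key);
            expiresAt.remove(key);
        }
        return values.get(key);
    }
}
```

With this model, a second set(..., nx = true, ...) on a live key is rejected, while the same call succeeds again once the clock has been advanced past the expiration time.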

2. EXPIRE key seconds: sets the time to live of the given key. When the key expires (its time to live reaches 0), it is deleted automatically.

3. SETEX key seconds value: associates value with key and sets the key's time to live to seconds.

This command is similar to running the following two commands:

SET key value
EXPIRE key seconds

The main difference is that SETEX is atomic: the value and the expiration time are set in a single operation. SET followed by EXPIRE is two separate commands, so atomicity is not guaranteed.

4. DEL key [key …]: deletes one or more of the given keys.

5. SETNX key value: sets key to value if and only if key does not exist. If the given key already exists, SETNX does nothing.

These are the key commands for distributed locking. For any command that is unclear or not covered here, see the Redis command reference.

Lua scripts

Next, I need to introduce a less commonly used feature of Redis: Lua scripts.

Normally, when we operate on Redis, we open a Redis client and type one command after another to complete the operation.

This is easy to do and gives immediate feedback, and it works well when there are few commands and the operation is simple.

However, when there are many commands to execute and they may depend on each other, entering them one by one is not very convenient.

For this purpose, Redis supports Lua scripts: users send a script to the server to perform custom operations and receive the script's response data.

Another feature is that the Redis server executes a Lua script atomically on a single thread, so the script is not interrupted by any other request while it runs. This is one of the advantages of Lua scripts over sending individual commands one after another.
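The effect of atomic script execution can be imitated locally. The sketch below is only an analogy and says nothing about how Redis is implemented: AtomicScriptSketch, eval, and runDemo are hypothetical names, and a Java monitor stands in for the server's single-threaded execution. Each "script" is a read-modify-write that would lose updates if other callers could interleave between the read and the write.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** In-memory analogy for atomic script execution; not how Redis is implemented. */
final class AtomicScriptSketch {
    private final Map<String, Long> data = new HashMap<>();

    /** Runs the whole "script" while holding one monitor, so no other caller can interleave. */
    synchronized <T> T eval(Function<Map<String, Long>, T> script) {
        return script.apply(data);
    }

    /** Starts several threads that each run a read-modify-write script; returns the final counter. */
    static long runDemo(int threads, int incrementsPerThread) {
        AtomicScriptSketch server = new AtomicScriptSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    server.eval(db -> {
                        long current = db.getOrDefault("counter", 0L); // like GET
                        db.put("counter", current + 1);                // like SET
                        return current + 1;
                    });
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return server.eval(db -> db.getOrDefault("counter", 0L));
    }
}
```

Because every script runs as one indivisible unit, the final counter equals threads × incrementsPerThread; issuing the read and the write as two separate calls would not give that guarantee.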

How the distributed lock works

Redis can implement a distributed lock in two ways: 1. based on Redis commands; 2. based on Lua scripts.

Based on Redis commands

The logic is mainly as follows:

  1. When a thread enters the critical section, it uses SETNX to set the key. If the set succeeds, the lock has been acquired.
  2. When the thread leaves the critical section, it uses DEL to delete the key, which releases the lock.

SETNX key value # lock
# business logic
DEL key # unlock

However, this has an obvious problem. If a thread crashes while holding the lock and never unlocks, the key is never deleted and a deadlock occurs, which can seriously affect the whole system.

To prevent such deadlocks, EXPIRE is used as well, giving the key an expiration time.

SETNX key value # lock
EXPIRE key seconds # set the expiration time
# business logic
DEL key # unlock

Is that the end of the problem? Apparently not!

As mentioned earlier, locking and setting the expiration time are two separate commands, and Redis does not execute two commands atomically. The client may go down right after SETNX and before EXPIRE, leaving a key with no expiration time, so a deadlock is still possible.

For this reason Redis extended the SET command, and we can replace the code above with the following:

SET key value EX seconds NX # lock and set the expiration time atomically
# business logic
DEL key # unlock
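A small in-memory sketch of why an atomically set expiration time avoids the deadlock. It only models the behavior and does not talk to Redis; ExpiringLockSketch, its methods, and the manual clock are hypothetical names chosen for the illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of a lock taken with SET key value PX ms NX; a sketch, not a Redis client. */
final class ExpiringLockSketch {
    private final Map<String, Long> expiresAt = new HashMap<>(); // key -> expiry time in ms
    private long nowMillis = 0; // manual clock keeps the example deterministic

    void advance(long millis) {
        nowMillis += millis;
    }

    /** Models SET key value PX leaseMillis NX: succeeds only if no live key exists. */
    boolean tryLock(String key, long leaseMillis) {
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry > nowMillis) {
            return false; // still held and not yet expired
        }
        expiresAt.put(key, nowMillis + leaseMillis); // value and expiration are set in one step
        return true;
    }

    /** Models DEL key. */
    void unlock(String key) {
        expiresAt.remove(key);
    }
}
```

If a client calls tryLock and then crashes without ever calling unlock, other clients are rejected only until the lease runs out; after advance(leaseMillis) the next tryLock succeeds, so the lock cannot be stuck forever.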

However, reentrancy is still a challenge: because of the NX option, a second SET by the thread that already holds the lock fails just like anyone else's, so the lock cannot be re-entered.

Based on Lua scripts

Lua scripts, on the other hand, solve these problems better thanks to the atomicity of their execution.

The general flow of the Lua locking script:

[Figure: Lua script locking flow]

The general flow of the Lua unlocking script:

[Figure: Lua script unlocking flow]

We will go through the code in more detail in the Redisson source analysis below.

Note, however, that choosing the lock's expiration time is tricky. If it is too long and the lock is not released (for example, because the holder crashed), other threads are blocked for a long time and performance suffers. If it is too short, the lock may expire before the business code finishes, and then it no longer keeps other threads out. Fortunately, existing frameworks already provide a background task (the watchdog) that automatically extends the expiration time, which lowers the barrier to use.
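The watchdog idea can be sketched with plain JDK scheduling. This is an assumption-laden illustration, not Redisson's implementation: WatchdogSketch and its members are hypothetical, the "lock" is just a local expiry timestamp, and the renewal interval of one third of the lease is chosen to mirror the value that appears in the Redisson source later in this article.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch of a lease-renewing watchdog; names and structure are illustrative only. */
final class WatchdogSketch {
    private final long leaseMillis;
    private final AtomicLong expiresAt = new AtomicLong();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "lock-watchdog");
        t.setDaemon(true); // the watchdog must not keep the JVM alive
        return t;
    });

    WatchdogSketch(long leaseMillis) {
        this.leaseMillis = leaseMillis;
        renew();
        long period = leaseMillis / 3; // renew well before the lease runs out
        timer.scheduleAtFixedRate(this::renew, period, period, TimeUnit.MILLISECONDS);
    }

    private void renew() {
        expiresAt.set(System.currentTimeMillis() + leaseMillis);
    }

    boolean isExpired() {
        return System.currentTimeMillis() >= expiresAt.get();
    }

    /** Called on unlock: stop renewing so the lease can run out. */
    void stop() {
        timer.shutdownNow();
    }

    /** Returns true if the lease stayed alive while renewed and expired after the watchdog stopped. */
    static boolean demo() {
        WatchdogSketch lock = new WatchdogSketch(600);
        try {
            Thread.sleep(1300); // longer than two full leases
            boolean aliveWhileRenewed = !lock.isExpired();
            lock.stop();
            Thread.sleep(700);  // longer than one lease, with no renewals any more
            return aliveWhileRenewed && lock.isExpired();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.stop();
        }
    }
}
```

The design choice is that the lease itself stays short, so a crashed holder frees the lock quickly, while a healthy holder keeps it for as long as the business code runs.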

Code in practice

In this hands-on example we use Redisson to implement the distributed lock. Redisson's encapsulation of distributed locks is quite complete, so only a little code is needed to lock and unlock.

First, we write a configuration class that registers the RedissonClient bean with Spring. Note that the @Bean annotation uses the method name as the bean name by default, so make sure the method name matches the name of the bean to be injected. You can also specify the bean name explicitly with @Bean(value = "redissonClient").

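import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedisConfig {

    // Set your Redis address in application.yml, e.g. redis://127.0.0.1:6379
    @Value("${redis.address}")
    private String redisAddress;

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress(redisAddress);
        return Redisson.create(config);
    }
}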

After registering the bean with Spring, we call Redisson's key method getLock to get the lock object, and then call tryLock on it to acquire the lock. tryLock is overloaded; the main differences are as follows:

// Try to acquire the lock once; return false immediately if it is not available.
boolean tryLock();
// Try to acquire the lock within the given time (unless the thread is interrupted).
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
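RLock extends java.util.concurrent.locks.Lock, so the difference between the two overloads can be tried locally with a plain ReentrantLock standing in for the distributed lock. The sketch below is only that stand-in: TryLockOverloadsSketch and demo are hypothetical names, and no Redis is involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Compares tryLock() and tryLock(time, unit) using a local lock as a stand-in for RLock. */
final class TryLockOverloadsSketch {

    /** Returns {untimed attempt while held, timed attempt while held, timed attempt after release}. */
    static boolean[] demo() {
        Lock lock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();
                release.await(); // keep the lock until told to let go
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        try {
            held.await();
            boolean immediate = lock.tryLock();                          // returns at once
            boolean timedOut = lock.tryLock(100, TimeUnit.MILLISECONDS); // waits, then gives up
            release.countDown();                                         // the holder releases
            boolean acquired = lock.tryLock(5, TimeUnit.SECONDS);        // waits until the lock is free
            if (acquired) {
                lock.unlock();
            }
            holder.join();
            return new boolean[] {immediate, timedOut, acquired};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

While another thread holds the lock, both attempts fail, but only the timed one waits first; once the holder releases, the timed attempt succeeds.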

Here we take the simple route and call tryLock directly to protect the existing code. The modified code is as follows:

@Resource
RedissonClient redissonClient;

public Boolean deductProduct(ProductPO productPO) throws InterruptedException {
  // Get the distributed lock first
  RLock lock = redissonClient.getLock("deductProduct");
  LOGGER.info("Acquiring distributed lock");
  // Try to acquire the Redis distributed lock, waiting up to 30 seconds
  boolean success = lock.tryLock(30, TimeUnit.SECONDS);
  if (!success) {
    // Lock failed, return directly (there is nothing to unlock)
    return false;
  }
  try {
    LOGGER.info("The data for finding the goods is: " + JSON.toJSONString(productPO));
    Example example = new Example(ProductPO.class);
    Example.Criteria criteria = example.createCriteria();
    criteria.andEqualTo("skuId", productPO.getSkuId());
    List<ProductPO> productPOS = productMapper.selectByExample(example);
    if (CollectionUtils.isEmpty(productPOS)) {
      throw new RuntimeException("Current item does not exist.");
    }
    for (ProductPO selectProductPO : productPOS) {
      // Deduct stock from the matching SKU
      Integer number = selectProductPO.getNumber();
      LOGGER.info("The current quantity of goods is: " + number);
      if (number <= 0) {
        // If the quantity is less than or equal to 0, no deduction is made
        continue;
      }
      selectProductPO.setNumber(number - productPO.getNumber());
      productMapper.updateByPrimaryKey(selectProductPO);
    }
  } finally {
    // Finally, remember to release the lock; only reached when the lock was acquired
    LOGGER.info("Releasing distributed lock");
    lock.unlock();
  }
  return true;
}

Then run the code to get the corresponding result:

[Figure: output of the run]

Source code analysis

Locking source code

Let's analyze the locking code, starting from tryLock():

boolean success = lock.tryLock();

Diving into the source, the key code is as follows:

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  RFuture<Boolean> acquiredFuture;
  if (leaseTime != -1) {
    /* Key code */
    acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
  } else {
    /* Key code */
    acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                       TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
  }
  CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
    // If the lock is acquired successfully
    if (acquired) {
      if (leaseTime != -1) {
        // If a lease time is specified, update the lease time of the class
        internalLockLeaseTime = unit.toMillis(leaseTime);
      } else {
        // Otherwise, save the current thread ID to the corresponding ConcurrentMap and
        // start the watchdog task, which periodically refreshes the expiration time of
        // the lock held by this thread ID so the lock does not expire while in use
        scheduleExpirationRenewal(threadId);
      }
    }
    return acquired;
  });
  return new CompletableFutureWrapper<>(f);
}

As you can see, the key code for acquiring the lock is tryLockInnerAsync.

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  // The logic of this script is fairly clear:
  // exists == 0: the lock key does not exist, so create the hash with this thread's field (count 1), set the expiration time, and return nil
  // hexists == 1: the lock is already held by this thread, so increment the count (reentrancy), refresh the expiration time, and return nil
  // otherwise: the lock is held by another thread, so return the key's remaining time to live
  return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                        "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                        Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

As we can see, Redisson's underlying implementation executes a Lua script. The key parameters and commands are as follows:

  • KEYS[1] is Collections.singletonList(getRawName()), i.e. the key of the distributed lock;

  • ARGV[1] is unit.toMillis(leaseTime), i.e. the lock's lease time in milliseconds; when no lease time is specified it is internalLockLeaseTime, 30s by default;

  • ARGV[2] is getLockName(threadId), the unique value set when acquiring the lock, i.e. UUID + threadId;

  • pexpire sets the key's time to live in milliseconds; calling it again on an existing key refreshes the expiration time;

  • hincrby atomically increments (or, with a negative argument, decrements) a field of a hash;

  • hexists checks whether a field exists in a hash;

  • pttl returns the remaining time to live of the key, in milliseconds;

  • exists checks whether the key exists.

Also worth noting in the source is scheduleExpirationRenewal:

protected void scheduleExpirationRenewal(long threadId) {
  ExpirationEntry entry = new ExpirationEntry();
  ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
  if (oldEntry != null) {
    // A renewal entry already exists for this lock: register the thread ID in it
    oldEntry.addThreadId(threadId);
  } else {
    // First holder: register the thread ID in the new entry
    entry.addThreadId(threadId);
    try {
      renewExpiration(); // Key code: schedule the renewal (watchdog) task
    } finally {
      if (Thread.currentThread().isInterrupted()) {
        cancelExpirationRenewal(threadId);
      }
    }
  }
}

private void renewExpiration() {
  ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  if (ee == null) {
    return;
  }
  // Start a scheduled task
  Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) throws Exception {
      ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
      if (ent == null) {
        return;
      }
      Long threadId = ent.getFirstThreadId(); // Get the first thread ID registered for the key
      if (threadId == null) {
        return;
      }
      RFuture<Boolean> future = renewExpirationAsync(threadId); // Refresh the expiration time with a script
      future.whenComplete((res, e) -> {
        if (e != null) {
          log.error("Can't update lock " + getRawName() + " expiration", e);
          EXPIRATION_RENEWAL_MAP.remove(getEntryName());
          return;
        }
        if (res) {
          // If the renewal succeeded, schedule the next renewal
          renewExpiration();
        } else {
          // Otherwise cancel the renewal
          cancelExpirationRenewal(null);
        }
      });
    }
  }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
  ee.setTimeout(task);
}

After this analysis, the whole locking process is fairly clear. It goes as follows:

1. First check whether the lock key exists (Lua script)

  • exists returns 0: the hash key does not exist, so the hash key and this thread's field are created with an expiration time, and the lock is acquired;
  • hexists returns 1: the hash key and this thread's field already exist, so the reentrancy count is incremented, the expiration time is refreshed, and the lock is acquired again;
  • otherwise the lock is held by another thread, and the key's remaining time to live is returned.

2. If locking succeeds, the strategy depends on the lease time.

  • If a lease time is specified, the watchdog is not started and the lock is released automatically when the lease expires.
  • If no lease time is specified, a watchdog task is started that keeps refreshing the expiration time of the lock held by that thread ID.
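The branches of the locking script can be mirrored in a few lines of in-memory Java. This is a model for reasoning about the script, not Redisson code: ReentrantLockScriptSketch, its fields, and the manual clock are hypothetical, and the model is single-threaded where Redis relies on script atomicity.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the locking script's branches; a sketch, not Redisson code. */
final class ReentrantLockScriptSketch {
    private final Map<String, Map<String, Integer>> hashes = new HashMap<>(); // key -> (holder -> count)
    private final Map<String, Long> expiresAt = new HashMap<>();              // key -> expiry time in ms
    private long nowMillis = 0; // manual clock keeps the example deterministic

    void advance(long millis) {
        nowMillis += millis;
    }

    /** Returns null when the lock is acquired, otherwise the key's remaining time to live in ms. */
    Long tryLockInner(String key, long leaseMillis, String holder) {
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry <= nowMillis) { // the key has expired: drop it
            hashes.remove(key);
            expiresAt.remove(key);
        }
        Map<String, Integer> hash = hashes.get(key);
        if (hash == null || hash.containsKey(holder)) {
            // exists == 0 (new lock) or hexists == 1 (reentry): hincrby, pexpire, return nil
            hashes.computeIfAbsent(key, k -> new HashMap<>()).merge(holder, 1, Integer::sum);
            expiresAt.put(key, nowMillis + leaseMillis);
            return null;
        }
        return expiresAt.get(key) - nowMillis; // pttl: the lock is held by someone else
    }

    /** Returns how many times the holder currently holds the lock. */
    int holdCount(String key, String holder) {
        Map<String, Integer> hash = hashes.get(key);
        return hash == null ? 0 : hash.getOrDefault(holder, 0);
    }
}
```

Calling tryLockInner twice with the same holder yields a hold count of 2, while a different holder gets back the remaining lease time instead of the lock.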

Unlocking source code

The key code for unlocking is as follows:

@Override
public RFuture<Void> unlockAsync(long threadId) {
  /* Unlock key code */
  RFuture<Boolean> future = unlockInnerAsync(threadId);
  CompletionStage<Void> f = future.handle((opStatus, e) -> {
    // Cancel the watchdog, i.e. stop the automatic renewal
    cancelExpirationRenewal(threadId);
    if (e != null) {
      throw new CompletionException(e);
    }
    if (opStatus == null) {
      IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                                                                            + id + " thread-id: " + threadId);
      throw new CompletionException(cause);
    }
    return null;
  });
  return new CompletableFutureWrapper<>(f);
}

The key method here is unlockInnerAsync:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  // If no distributed lock exists, return
  // Otherwise count the corresponding key/field object -1 (for reentrant locks)
  // If the count is >0, the lock expiration time is refreshed again
  // Otherwise, delete the lock and notify the corresponding channel.
  return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                        Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

The main parameters are as follows:

  • KEYS[1] is getRawName(), the key of the distributed lock;
  • KEYS[2] is getChannelName(), the lock key combined with a fixed prefix; the unlock message is sent to this channel;
  • ARGV[1] is LockPubSub.UNLOCK_MESSAGE, the type of message sent, here [unlock];
  • ARGV[2] is internalLockLeaseTime, the lock's lease time, 30s by default;
  • ARGV[3] is getLockName(threadId), the unique value set when acquiring the lock, i.e. UUID + threadId;
  • publish sends a message to the specified channel.

At this point a question arises: we never saw any message listener in the locking code, so why does unlocking broadcast a message? The reason is that tryLock() does not listen for channel messages, but tryLock(long waitTime, long leaseTime, TimeUnit unit) does.

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  ...
  // Listen for the corresponding message
  CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
  ...
}

After subscribing, the thread enters a retry loop. When the listener receives the unlock message, it releases a semaphore, which wakes up the threads blocked here waiting for the lock. (You can't help admiring the author's design.)
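The wake-up mechanism can be sketched with a JDK Semaphore. Everything here is illustrative: UnlockNotificationSketch and its method names are hypothetical, and a direct method call stands in for delivery of the published unlock message.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch of "subscribe, then sleep until the unlock message arrives"; names are illustrative. */
final class UnlockNotificationSketch {
    private final Semaphore latch = new Semaphore(0);

    /** Listener callback: invoked when an unlock message arrives on the lock's channel. */
    void onUnlockMessage() {
        latch.release();
    }

    /** Waiter: sleeps for at most the lock's remaining TTL; returns true if woken by a message. */
    boolean awaitUnlock(long ttlMillis) throws InterruptedException {
        return latch.tryAcquire(ttlMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns true if the waiting thread was woken by the unlock message before its timeout. */
    static boolean demo() {
        UnlockNotificationSketch entry = new UnlockNotificationSketch();
        AtomicBoolean woken = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                woken.set(entry.awaitUnlock(5_000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        entry.onUnlockMessage(); // the holder unlocks and the message is delivered
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return woken.get();
    }
}
```

Bounding the wait by the remaining TTL means a waiter still retries when the lock simply expires and no message is ever published.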

After going through this, the whole unlocking process is fairly clear. It goes as follows:

1. First check whether the lock is held by the current thread (Lua script)

  • If not, return directly.
  • If so, decrement the thread's lock count by 1.

2. After the count is decremented, the strategy depends on the remaining count.

  • If the count is still greater than 0, the lock is held reentrantly: the expiration time is refreshed and the script exits.
  • If the count is less than or equal to 0, the lock is deleted and a message is broadcast to notify waiting threads that they can compete for the lock.

3. Finally, once unlocking is done, the [watchdog mechanism] is cancelled to stop the automatic renewal task.
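The unlocking steps can be mirrored in a compact in-memory model. It is a sketch under assumptions, not Redisson code: UnlockScriptSketch and its fields are hypothetical, a single map plays the role of the lock's hash, and the string "UNLOCK" is a placeholder for the real unlock message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of the unlocking script's branches; a sketch, not Redisson code. */
final class UnlockScriptSketch {
    final Map<String, Integer> holds = new HashMap<>(); // holder -> reentrancy count (the lock hash)
    final List<String> published = new ArrayList<>();   // messages sent on the unlock channel
    boolean leaseRefreshed = false;

    /** Returns null if the holder does not own the lock, false if it is still held, true if released. */
    Boolean unlockInner(String holder) {
        if (!holds.containsKey(holder)) {
            return null;                                     // hexists == 0
        }
        int counter = holds.merge(holder, -1, Integer::sum); // hincrby -1
        if (counter > 0) {
            leaseRefreshed = true;                           // pexpire: still held reentrantly
            return false;
        }
        holds.clear();                                       // del
        published.add("UNLOCK");                             // publish (placeholder message)
        return true;
    }
}
```

With a hold count of 2, the first unlockInner returns false and publishes nothing; the second returns true and publishes the unlock message; a call by a thread that does not own the lock returns null, which is the case the caller turns into an IllegalMonitorStateException.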

Advantages and disadvantages

Advantages

1. Cache-based, so performance is good.

2. Implemented with Lua scripts, so it is easy to extend and can support reentrant locks, publish/subscribe, and other features.

3. Existing frameworks already implement it, so it works out of the box.

4. Supports automatic lease renewal through the watchdog.

Disadvantages

1. Strong consistency is not guaranteed: in a distributed deployment, the data held by each Redis instance is not guaranteed to be identical at every moment. Lock reads and writes therefore need to go to the same Redis instance.

2. In a master-slave setup, a lock written to the master may not yet have been synchronized to the slave when the master fails. The slave can then accept the same lock again, so multiple servers hold the lock at the same time. For further reading, see the Redlock algorithm.
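The master-slave problem can be reproduced with a toy model. FailoverSketch and its methods are hypothetical and deliberately simplistic: two sets stand in for the master and the slave, and replication and failover are triggered by hand.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of asynchronous master-to-slave replication; not a real Redis deployment. */
final class FailoverSketch {
    private Set<String> master = new HashSet<>();
    private final Set<String> slave = new HashSet<>();

    /** Models SET key value NX against the current master. */
    boolean tryLock(String key) {
        return master.add(key);
    }

    /** Replication happens some time after the write has already been acknowledged. */
    void replicate() {
        slave.addAll(master);
    }

    /** The master dies; the slave is promoted with whatever it has received so far. */
    void failover() {
        master = new HashSet<>(slave);
    }
}
```

If failover() happens before replicate(), a second client's tryLock on the same key succeeds even though the first client still believes it holds the lock; if replication completes first, the second attempt is rejected as expected.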

References

A series of problems and solutions with Redis distributed locks

Redisson implements Redis distributed lock in N positions

Redis application details (1): distributed lock

Redis command reference

Redis distributed lock facing problems and solutions