1: The project background is that the user places an order and then deducts the inventory

@postmapping ("/order") public String order() {// The number of remaining items in the inventory Integer count = (Integer)redisTemplatel.opsForValue().get(PRODUCT_KEY); if(count > 0) { count = count - 1; redisTemplatel.opsForValue().set(PRODUCT_KEY,count); return "success"; }else {return "Out of stock "; }}Copy the code

1.1: The code does seem to be fine, check the cache normally from the database, subtract 1 if the item is available, otherwise return the low stock message directly to the client

2.2: But in high concurrency scenarios, this code is problematic

  • 1: Get inventory: We first need to get the remaining inventory quantity from the database
  • 2: Update inventory: Modify the remaining inventory in the database

2.3: The above two operations are not atomic operations, so the following situation may occur

-1: Thread-1 first queried the inventory and obtained the inventory quantity of 100. -2: Thread-1 did not update the inventory, so Thread-2 queried the inventory of 100. If one inventory of Thread-2 is subtracted, the updated inventory is still 99, then there is a problem. Two products are sold, but only one inventory is reduced, so the phenomenon of overselling occursCopy the code

2: Lock to ensure not oversold

  • Synchronized: If the project is deployed on a single machine, the synchronized keyword can be used to ensure that the project is deployed in a cluster. However, synchronized is a JVMS based lock, so it cannot guarantee oversold in a distributed environment
  • Redis Distributed locks: Redis provides commands that allow you to implement distributed locks

3: Add redis distributed lock

@PostMapping("/order") public String order() { try{ Object result = redisTemplate.opsForValue().get(LOCK_KEY); if(result ! = null) {return "please try again later "; } redisTemplate.opsForValue().set(LOCK_KEY,LOCK_VALUE); / / simulation database access inventory surplus quantity Integer count = (Integer) redisTemplate. OpsForValue () get (PRODUCT_KEY); if(count > 0) { count = count - 1; redisTemplate.opsForValue().set(PRODUCT_KEY,count); return "success"; }else {return "Out of stock "; } }finally { redisTemplate.delete(LOCK_KEY); }}Copy the code
  • 1: a thread checks whether there is a key in the redis. If there is a key in the redis, it does not allow the execution to continue. If there is no key in the redis, it obtains the lock and performs the destocking operation
  • 2: The logic seems to be ok, but we can query the key from Redis and set the key to not atomic operation, so we can’t guarantee the thread safety

4: Redis lock to ensure atomicity

  • Redis provides the setnx command to ensure the atomicity of the query key
@PostMapping("/order") public String order() { try{ Boolean result = redisTemplate.opsForValue().setIfPresent(LOCK_KEY, LOCK_VALUE); if(! Result) {return "Service busy, please try again later "; } / / simulation database access inventory surplus quantity Integer count = (Integer) redisTemplate. OpsForValue () get (PRODUCT_KEY); if(count > 0) { count = count - 1; redisTemplate.opsForValue().set(PRODUCT_KEY,count); return "success"; }else {return "Out of stock "; } }finally { redisTemplate.delete(LOCK_KEY); }}Copy the code
  • Now the atomicity problem is solved, but the code still has problems
  • 1: If the system hangs in the middle of a thread’s execution, the key will remain in place forever. As long as the key remains in place, subsequent threads will never be able to acquire the lock and will not be able to perform subsequent operations

5: Adds the expiration time to the KEY

@PostMapping("/order") public String order() { try{ Boolean result = redisTemplate.opsForValue().setIfPresent(LOCK_KEY, LOCK_VALUE,10, TimeUnit.SECONDS); if(! Result) {return "Service busy, please try again later "; } / / simulation database access inventory surplus quantity Integer count = (Integer) redisTemplate. OpsForValue () get (PRODUCT_KEY); if(count > 0) { count = count - 1; redisTemplate.opsForValue().set(PRODUCT_KEY,count); return "success"; }else {return "Out of stock "; } }finally { redisTemplate.delete(LOCK_KEY); }}Copy the code
  • Now we have added the effect time, so that even if the thread hangs in the middle of execution and does not release the lock, the lock will be automatically released after 10 seconds, and other threads can still acquire the lock, then no problem?
  • 1: The timeout is now 10s. If our business logic for destocking is complex and the thread business process takes 15s, then the problem will arise
1: The timeout duration of thread-1 is 10s, but the service processing takes 15s. This means that the lock is released before the Thread is finished processing the service. If a second Thread, Thread-2, enters at 11s, the lock of Thread-1 will be released at 10s, so thread-2 will definitely be able to acquire the lock. After acquiring the lock, Thread-2 will continue to perform destocking operation 3: At the 15s, after the inventory reduction business of Thread-1 is processed, the lock will be released. However, the lock released by Thread-1 is no longer the one obtained by itself, but the lock 4 of Thread-2 is released: That's when the lock will fail, and if you have tens of thousands of threads coming in, you'll have a problemCopy the code

6: Add a unique value to the thread

@PostMapping("/order") public String order() { String uuid = UUID.randomUUID().toString(); try{ Boolean result = redisTemplate.opsForValue().setIfPresent(LOCK_KEY, uuid,10, TimeUnit.SECONDS); if(! Result) {return "Service busy, please try again later "; } / / simulation database access inventory surplus quantity Integer count = (Integer) redisTemplate. OpsForValue () get (PRODUCT_KEY); if(count > 0) { count = count - 1; redisTemplate.opsForValue().set(PRODUCT_KEY,count); return "success"; }else {return "Out of stock "; } }finally { if(uuid.equals(redisTemplate.opsForValue().get(LOCK_KEY))) { redisTemplate.delete(LOCK_KEY); }}}Copy the code
  • 1: Even if the lock is acquired by Thread-2, Thread-1 will determine the unique value when releasing the lock and will not delete other locks if the value is inconsistent
  • 2: But there is still a problem, is this timeout limit, if the time is too short, then the lock will be released before the end of the service processing, what is the problem?
  • 1: Assuming that there is only one item in stock, Thread-1 is dealing with business but has not updated the inventory. At this time, because the timeout period of the lock has expired, Thread-2 will acquire the lock. When Thread-2 searches the inventory, it finds that there is still another item, so Thread-2 will perform the operation of reducing the inventory. That’s when oversold occurs
  • 2: If Thread 1 hangs for a long time, the lock will be released automatically until the timeout expires. Other threads will not be able to acquire the lock during the idle time, which will cause a waste of resources.

7: Give lock renewal time

  • 1: After Thread-1 obtains the lock, a timer is set for Thread-1. The timer extends the timeout period of Thread-1
  • 2: If the Thread is running and the lock has not been released, reset the lock timeout to 30s. If the Thread is terminated, the lock timeout is set to 30 seconds. The lock is automatically released when the timeout period expires. If the lock has been released during the lock renewal, thread-1 will not renew the lock after processing services

8: Implementation principle of Redisson

RLock redissonLock = redisson.getLock(LOCK_KEY); Redissonlock. lock(); // Release distributed lock redissonlock. unlock();Copy the code
  • 1: Using Redisson to implement distributed lock is very simple, is a simple two lines of code, now to analyze its underlying implementation principle
  • 2: Go to the lock() method, in the RedissonLock class
@Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); }}Copy the code

-3: Continue into the lock() method

private void lock(long leaseTime, TimeUnit unit, Boolean interruptibly) throws InterruptedException {// obtain the id of the currentThread. Long threadId = thread.currentthread ().getid (); // Core method, this method is the lock, if the return TTL! Long TTL = tryAcquire(-1, leaseTime, unit, threadId) Long TTL = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } try {while (true) {TTL = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } / / waiting for the message if (TTL > = 0) {try {/ / while not kept cycle, it will get to have the thread of the lock time remaining, Then try to retrieve the lock future.getNow().getlatch ().tryacquire (TTL, timeunit.milliseconds); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); }Copy the code
  • 4: enter Long TTL = tryAcquire(-1, leaseTime, unit, threadId); methods
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, Long threadId) {// will not enter this if inside if (leaseTime! = -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); / / get the lock, here will be executed a callback function, the function of the effective time is to lock in the lives of ttlRemainingFuture. The onComplete ((ttlRemaining, e) - > {if e! = null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); }}); return ttlRemainingFuture; }Copy the code
  • 5: First look at the tryLockInnerAsync method
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);" , Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }Copy the code
  • 6:Redisson uses a lot of lua scripts. Let’s take a look at what this lua script means
    • Call (‘exists’, KEYS[1]) == 0) redisson.getLock(KEY);

    • 6.2: If == 0, then there is no lock, then the lock can be obtained, the VALUE of this KEY is the current thread ID

    • 6.3: Continues to set the timeout period for the lock

    • 6.4: If the KEY already exists, the value of the KEY is determined to be the same as that of the thread that is currently trying to acquire the lock. Redisson distributed locks support reentrant

    • 6.5: If none of these conditions are met, the lock duration is returned

  • 7: Redisson to lock the lives into scheduleExpirationRenewal (threadId); methods
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}
Copy the code
  • 7.1: The TimerTask will delay execution by taking 1/3 of the lock expiration time. For example, if the lock is valid for 30 seconds, the execution will start 10 seconds after the lock is acquired. If the key is still available and the thread is executing at 10 seconds, the TimerTask will delay execution by 1/3 of the lock expiration time. Reset the thread’s duration to 30s, and then execute the method again after 10s, which is the following method

    If (res) {// reschedule itself renewExpiration(); }Copy the code

8: What if the thread does not acquire the lock?

  • A thread that has not acquired the lock will be given the lock validity period. It will not keep trying to acquire the lock, but will try again after the lock validity period. For example: Thread-2 attempts to acquire the lock, but Thread-1 has already obtained the lock. Therefore, Thread-2 does not continue to try to acquire the lock, but will obtain the lock validity time of Thread-1. For example, the lock of Thread-1 will expire in 15 seconds. Thread-2 will wait 15 seconds before attempting to acquire the lock