preface

Having covered most of Redisson’s content, let’s take a look at what components Redisson has on its website:

With Semaphore and CountDownLatch remaining, let’s see how Redisson does it while the iron is hot.

Semaphore and CountDownLatch are two brothers we know about in the JDK, but we won’t go into details here.

Semaphore example

Here’s the Semaphore schematic:

Then let’s look at the example used in Redisson:

RSemaphore semaphore = redisson.getSemaphore("semaphore");
A maximum of 3 threads are allowed to acquire the lock at the same time
semaphore.trySetPermits(3);

for(int i = 0; i < 10; i++) {
  new Thread(new Runnable() {

    @Override
    public void run(a) {
      try {
        System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Attempt to acquire Semaphore lock"); 
        semaphore.acquire();
        System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Semaphore lock was successfully acquired and started working."); 
        Thread.sleep(3000);  
        semaphore.release();
        System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Release Semaphore lock"); 
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }).start();
}
Copy the code

Semaphore source code analysis

Let’s see how the source code is implemented based on the example above:

Semaphore. TrySetPermits (3);

public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
    @Override
    public boolean trySetPermits(int permits) {
        return get(trySetPermitsAsync(permits));
    }

    @Override
    public RFuture<Boolean> trySetPermitsAsync(int permits) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "local value = redis.call('get', KEYS[1]); " +
                "if (value == false or value == 0) then "
                    + "redis.call('set', KEYS[1], ARGV[1]); "
                    + "redis.call('publish', KEYS[2], ARGV[1]); "
                    + "return 1;"
                + "end;"
                + "return 0;", Arrays.<Object>asList(getName(), getChannelName()), permits); }}Copy the code

The execution process is as follows:

  1. Get semaphore: gets a current value
  2. The first time the data is 0, then use Set Semaphore 3 to set the number of clients that this semaphore can allow to acquire locks at the same time to 3
  3. Then publish some messages that return 1

Next look at semaphore.acquire(); And semaphore release (); Logic:

public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
    @Override
    public RFuture<Boolean> tryAcquireAsync(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException("Permits amount can't be negative");
        }
        if (permits == 0) {
            return RedissonPromise.newSucceededFuture(true);
        }

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                  "local value = redis.call('get', KEYS[1]); " +
                  "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
                      "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
                      "return 1; " +
                  "end; " +
                  "return 0;",
                  Collections.<Object>singletonList(getName()), permits);
    }

    @Override
    public RFuture<Void> releaseAsync(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException("Permits amount can't be negative");
        }
        if (permits == 0) {
            return RedissonPromise.newSucceededFuture(null);
        }

        return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
            "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
            "redis.call('publish', KEYS[2], value); ", Arrays.<Object>asList(getName(), getChannelName()), permits); }}Copy the code

TryAcquireAsync () :

  1. Get semaphore, get a current value, say 3,3 > 1
  2. Decrby semaphore 1, decrement the number of clients allowed to acquire locks by 1 to 2
  3. decrby semaphore 1
  4. decrby semaphore 1
  5. After three locks, the semaphore value is 0

At this point, if the lock is added again, it will directly return 0, and then enter an infinite loop to obtain the lock, as shown in the following figure:

Then look at the unlock logic releaseAsync() :

  1. Incrby semaphore 1: each time a client releases the lock, the semaphore value is incremented by 1, so the semaphore value is not zero

This shows how easy it is for Redisson to implement Semaphore

CountDownLatch example use

Use cases:

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(3);
System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Sets that there must be 3 threads executing countDown to enter the wait..."); 

for(int i = 0; i < 3; i++) {
  new Thread(new Runnable() {

    @Override
    public void run(a) {
      try {
        System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Some operations are being performed, please be patient..."); 
        Thread.sleep(3000); 
        RCountDownLatch localLatch = redisson.getCountDownLatch("anyCountDownLatch");
        localLatch.countDown();
        System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Execute countDown"); 
      } catch (Exception e) {
        e.printStackTrace(); 
      }
    }
    
  }).start();
}

latch.await();
System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Received notification that three threads have all performed countDown, can proceed"); 
Copy the code

CountDownLatch source code parsing

The source code is as follows:

 public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {

    @Override
    public RFuture<Boolean> trySetCountAsync(long count) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if redis.call('exists', KEYS[1]) == 0 then "
                    + "redis.call('set', KEYS[1], ARGV[2]); "
                    + "redis.call('publish', KEYS[2], ARGV[1]); "
                    + "return 1 "
                + "else "
                    + "return 0 "
                + "end",
                Arrays.<Object>asList(getName(), getChannelName()), newCountMessage, count);
    }

    @Override
    public RFuture<Void> countDownAsync(a) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "local v = redis.call('decr', KEYS[1]);" +
                        "if v <= 0 then redis.call('del', KEYS[1]) end;" +
                        "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;", Arrays.<Object>asList(getName(), getChannelName()), zeroCountMessage); }}Copy the code

First analysistrySetCount()Method logic:

  1. Exists anyCountDownLatch, the first time definitely does not exist
  2. set redisson_countdownlatch__channel__anyCountDownLatch 3
  3. Returns 1

Then analysislatch.await();Method, as shown below:

If the latch is still greater than 0, the latch will continue the loop. Otherwise, the latch will exit the loop

In the final analysislocalLatch.countDown();Methods:

  1. Decr anyCountDownLatch, which countDown the cocuntDownLatch one client at a time, decreases the value of the cocuntDownLatch by one
  2. await()The aspect has been parsed to determine in an infinite loop whether anyCountDownLatch stores a value of 0, and if 0 then executes its own logic

conclusion

See how easy these two components are here?

This is the end of the Redisson section, and we will also learn how ZK implements distributed locking.

statement

This article first from my public number: a flower is not romantic, if reprinted please indicate the source!

Interested partners can pay attention to personal public account: a flower is not romantic