Introduction to Redisson

Redisson is a Java in-memory data grid built on Redis. It provides not only a set of distributed versions of common Java objects but also a number of distributed services. Its goal is to promote separation of concerns so that users can focus on business logic. The underlying network layer is implemented with Netty.

The paragraph above is taken directly from the official introduction. For more detail on Redisson's structure and usage, see the official Redisson website or the Git repository.

Git repository address: github.com/redisson/re…

Principle analysis

Redisson implements its delayed queue with a TimerTask. The process involves many write operations against Redis, and Redisson uses Lua scripts to guarantee that each group of writes executes atomically. The process is outlined below.

Redisson first creates three names in memory: queueName, channelName and timeoutSetName. They are prefixed with redisson_delay_queue, redisson_queue_channel and redisson_delay_queue_timeout, respectively.

When a client submits a task, the following happens:

  1. The submitted task data is serialized to binary data.
  2. The Lua script packs the binary data with a numeric prefix (unpacked as randomId in pushTaskAsync) and adds the packed value to the sorted set timeoutSetName with a score of (current time + delay time).
  3. The packed value is appended to the queueName list.
  4. If the new value is now the first element of the sorted set, its timeout time is published to channelName.

The Lua script is as follows:

local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);
redis.call('zadd', KEYS[2], ARGV[1], value);
redis.call('rpush', KEYS[3], value);
-- if the new object was added to the queue head, publish its startTime
-- to all scheduler workers
local v = redis.call('zrange', KEYS[2], 0, 0);
if v[1] == value then
   redis.call('publish', KEYS[4], ARGV[1]);
end;
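The effect of the submit script can be approximated in plain Java without Redis. The sketch below is an in-memory stand-in, not Redisson code: the class SubmitSketch, its fields and the submit method are hypothetical names, a TreeSet plays the role of the sorted set and a List plays the role of the queueName list. It assumes every submitted value is unique, which the packed prefix would normally ensure.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for the submit script (hypothetical, not Redisson code). */
class SubmitSketch {
    /** One sorted-set entry: the packed value and its score (due time in ms). */
    static final class Entry {
        final String value;
        final long score;

        Entry(String value, long score) {
            this.value = value;
            this.score = score;
        }
    }

    // Plays the role of timeoutSetName: ordered by score, then by value.
    final TreeSet<Entry> timeoutSet = new TreeSet<>(
            Comparator.comparingLong((Entry e) -> e.score).thenComparing(e -> e.value));

    // Plays the role of queueName: keeps tasks in submission order.
    final List<String> queue = new ArrayList<>();

    /**
     * Mirrors zadd + rpush + conditional publish.
     * Returns the due time to "publish" when the new task became the head,
     * or an empty Optional when an earlier task is still ahead of it.
     */
    Optional<Long> submit(String value, long delay, TimeUnit unit, long nowMillis) {
        long dueAt = nowMillis + unit.toMillis(delay);
        timeoutSet.add(new Entry(value, dueAt));                   // zadd
        queue.add(value);                                          // rpush
        boolean isHead = timeoutSet.first().value.equals(value);   // zrange 0 0
        return isHead ? Optional.of(dueAt) : Optional.empty();
    }
}
```

Only a task that becomes the earliest one triggers a publish, so listeners are woken up only when the next due time actually changes.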

Then the listener on the channelName channel executes the scheduleTask method. The flow of this method is as follows:

  1. Subtract the current time from the timeout time to obtain delayTime.
  2. If delayTime is 10 milliseconds or less, pushTask is executed immediately, which eventually calls pushTaskAsync. The pushTaskAsync method works as follows:

     1>. Fetch up to 100 elements whose score lies between 0 and the current timestamp from the sorted set timeoutSetName.

     2>. If any elements are returned, iterate over them, push each one onto blockingQueue and delete the corresponding element from the queueName list. When the iteration is complete, remove those elements from timeoutSetName.

  3. If delayTime is greater than 10 milliseconds, a TimerTask is created that executes pushTask after delayTime has elapsed.
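The "run now or arm a timer" decision can be tried out with JDK classes alone. The following is a simplified sketch under stated assumptions, not the Redisson implementation: ScheduleSketch and its members are hypothetical names, a ScheduledExecutorService replaces the Netty timer, and the method is assumed to be called from one thread at a time, so the compare-and-set handling of the real code is left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** JDK-only sketch of the scheduling decision (hypothetical, not Redisson code). */
class ScheduleSketch {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "delay-timer");
                t.setDaemon(true); // do not keep the JVM alive for pending timers
                return t;
            });
    private final AtomicReference<ScheduledFuture<?>> lastTimeout = new AtomicReference<>();
    private final Runnable pushTask;

    ScheduleSketch(Runnable pushTask) {
        this.pushTask = pushTask;
    }

    /** Returns true if pushTask ran immediately, false if a timer was armed. */
    boolean scheduleTask(long startTimeMillis) {
        // A newer start time supersedes whatever timer was pending.
        ScheduledFuture<?> old = lastTimeout.getAndSet(null);
        if (old != null) {
            old.cancel(false);
        }
        long delay = startTimeMillis - System.currentTimeMillis();
        if (delay > 10) {
            lastTimeout.set(timer.schedule(pushTask, delay, TimeUnit.MILLISECONDS));
            return false;
        }
        pushTask.run(); // due, or due within 10 ms: skip the timer
        return true;
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

The 10 millisecond threshold avoids creating a timer for work that is already due or nearly due.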

The specific code is as follows:

scheduleTask code

private void scheduleTask(final Long startTime) {
    TimeoutTask oldTimeout = lastTimeout.get();
    if (startTime == null) {
        return;
    }
    
    if (oldTimeout != null) {
        oldTimeout.getTask().cancel();
    }
    
    long delay = startTime - System.currentTimeMillis();
    if (delay > 10) {
        Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    
            @Override
            public void run(Timeout timeout) throws Exception {
                pushTask();
                
                TimeoutTask currentTimeout = lastTimeout.get();
                if (currentTimeout.getTask() == timeout) {
                    lastTimeout.compareAndSet(currentTimeout, null);
                }
            }
        }, delay, TimeUnit.MILLISECONDS);
        if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
            timeout.cancel();
        }
    } else {
        pushTask();
    }
}

The pushTaskAsync logic is executed as a Lua script:

protected RFuture<Long> pushTaskAsync() {
    return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
            "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
          + "if #expiredValues > 0 then "
              + "for i, v in ipairs(expiredValues) do "
                  + "local randomId, value = struct.unpack('dLc0', v);"
                  + "redis.call('rpush', KEYS[1], value);"
                  + "redis.call('lrem', KEYS[3], 1, v);"
              + "end; "
              + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
          + "end; "
            // get startTime from scheduler queue head task
          + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
          + "if v[1] ~= nil then "
             + "return v[2]; "
          + "end "
          + "return nil;",
          Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),
          System.currentTimeMillis(), 100);
}
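To make the transfer step easier to follow, here is an in-memory approximation in plain Java. It is a sketch, not Redisson code: TransferSketch, add and pushTask are hypothetical names, a TreeMap keyed by due time stands in for the sorted set, and plain lists stand in for the two Redis lists.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory stand-in for the transfer script (hypothetical, not Redisson code). */
class TransferSketch {
    // timeoutSetName stand-in: due time (ms) -> values due at that time.
    final TreeMap<Long, List<String>> timeoutSet = new TreeMap<>();
    // queueName stand-in: every pending value in submission order.
    final List<String> queue = new ArrayList<>();
    // Destination queue that consumers take from.
    final List<String> blockingQueue = new ArrayList<>();

    void add(String value, long dueAtMillis) {
        timeoutSet.computeIfAbsent(dueAtMillis, k -> new ArrayList<>()).add(value);
        queue.add(value);
    }

    /**
     * Moves up to limit values whose due time is at or before nowMillis.
     * Returns the due time of the next pending value, or null if none is left.
     */
    Long pushTask(long nowMillis, int limit) {
        int moved = 0;
        Iterator<Map.Entry<Long, List<String>>> it =
                timeoutSet.headMap(nowMillis, true).entrySet().iterator();
        while (it.hasNext() && moved < limit) {
            List<String> due = it.next().getValue();
            while (!due.isEmpty() && moved < limit) {
                String value = due.remove(0);
                blockingQueue.add(value); // rpush KEYS[1]
                queue.remove(value);      // lrem KEYS[3] 1 v
                moved++;
            }
            if (due.isEmpty()) {
                it.remove();              // zrem KEYS[2]
            }
        }
        return timeoutSet.isEmpty() ? null : timeoutSet.firstKey();
    }
}
```

The returned due time corresponds to the score that the script reads from the head of the sorted set, which the caller can use to schedule the next run.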

For the full details, read the source code; this section only outlines the main flow.

Integrating with Spring Boot

Add the following Maven dependency to the POM file:

<dependency>
     <groupId>org.redisson</groupId>
     <artifactId>redisson-spring-boot-starter</artifactId>
     <version>3.15.5</version>
</dependency>

Add redis configuration to the project configuration center

spring.redis.host = test.database7401.scsite.net
spring.redis.port = 7401
spring.redis.password = Vbylce3g5N0lbPbd3f4y854C

The Spring Boot integration with Redisson is now complete.

Code implementation

We want the business side to focus on business logic: it should only have to write what the delayed task does and submit the task at the point where it is needed.

Therefore, to write a delayed task the business side only needs to implement an interface that we provide. To submit one, it calls a single method and passes the parameters needed to execute the task, the class to execute, the delay time and the time unit. Based on this idea, our implementation is as follows:

Use of delayed tasks

Delayed task interface

/**
 * @author lindj
 * @date 2021/5/26 3:21 PM
 * @describe Delayed task interface
 */
public interface DelayJob<T> {
    /**
     * Delay task interface
     *
     * @param deplayJobDTO
     */
    void execute(DeplayJobDTO<T> deplayJobDTO);
}

In scenarios that require delayed tasks, the business side only needs to implement this interface.

Delay task execution parameters

/**
 * @author lindj
 * @date 2021/5/26 3:15 PM
 * @describe Delay task parameter
 */
@Data
@NoArgsConstructor
public class DeplayJobDTO<T> implements Serializable {
    /** Parameters required for delayed task execution */
    private T param;
    /** Delay task execution class */
    private Class clazz;
}

Here param is the parameter required by the specific business logic, and clazz is the concrete implementation class of DelayJob.
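One possible refinement, shown here only as a sketch and not as part of the original design, is to bound the type of clazz so that the compiler rejects a class that does not implement DelayJob for the same parameter type. The wrapper class TypedDtoSketch, the constructor and EchoJob are hypothetical; Lombok is replaced by hand-written accessors so that the example compiles with the JDK alone.

```java
import java.io.Serializable;

/** Type-safe variant of the DTO (hypothetical sketch, JDK only). */
class TypedDtoSketch {
    interface DelayJob<T> {
        void execute(DeplayJobDTO<T> deplayJobDTO);
    }

    static class DeplayJobDTO<T> implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Parameters required for delayed task execution. */
        private final T param;
        /** Must be a DelayJob implementation for the same parameter type T. */
        private final Class<? extends DelayJob<T>> clazz;

        DeplayJobDTO(T param, Class<? extends DelayJob<T>> clazz) {
            this.param = param;
            this.clazz = clazz;
        }

        T getParam() {
            return param;
        }

        Class<? extends DelayJob<T>> getClazz() {
            return clazz;
        }
    }

    /** Example job that records the last parameter it received. */
    static class EchoJob implements DelayJob<String> {
        static String last;

        @Override
        public void execute(DeplayJobDTO<String> deplayJobDTO) {
            last = deplayJobDTO.getParam();
        }
    }
}
```

With this bound, constructing a DeplayJobDTO<String> with a class such as Integer.class no longer compiles, so a mismatch is caught before the task is ever submitted.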

Delayed task submission interface. Services do not need to pay attention to the specific implementation, but only need to invoke the interface on the service node.

@Autowired
private DelayJobProducer delayJobProducer;
this.delayJobProducer.submitDelayJob(deplayJobDTO, 30L,
        TimeUnit.SECONDS);

So far, that’s all the business side needs to pay attention to about the delayed queue. Let’s take a look at how the delayed task submission class and the submitted delayed task are handled in the background. I personally feel that this part can be encapsulated in components or integrated into frameworks for multiple teams to use.

Implementation of delayed task submission class

Delayed queue definition, where we hand the queue over to Spring for management

@Bean
public RDelayedQueue getDelayedQueue() {
    RBlockingQueue blockingQueue = redissonClient.getBlockingQueue(deleyQueueName);
    return redissonClient.getDelayedQueue(blockingQueue);
}

Interface for submitting delayed tasks

/**
 * @author lindj
 * @date 2021/5/26 3:48 PM
 * @describe Delayed task submission interface
 */
public interface DelayJobProducer {
    /**
     * Submit a delayed task
     *
     * @param deplayJobDTO
     * @param delayTime
     * @param timeUnit
     */
    void submitDelayJob(DeplayJobDTO deplayJobDTO, Long delayTime, TimeUnit timeUnit);
}

Implementation of delayed task submission interface

/**
 * @author lindj
 * @date 2021/5/26 3:49 PM
 * @describe Delayed task submission class
 */
@Component
public class DelayJobProducerImpl implements DelayJobProducer {
    @Autowired
    private RDelayedQueue delayedQueue;

    /**
     * @param deplayJobDTO
     * @param delayTime
     * @param timeUnit
     */
    @Override
    public void submitDelayJob(DeplayJobDTO deplayJobDTO, Long delayTime, TimeUnit timeUnit) {
        delayedQueue.offer(deplayJobDTO, delayTime, timeUnit);
    }
}

Delayed task execution

The general idea is as follows: after a node submits a delayed task, a background thread continuously takes tasks from the queue. For each task, it looks up the bean of the class specified in the task parameters in the Spring container and submits it to a thread pool, which calls its execute method. The specific code is as follows:

/**
 * @author lindj
 * @date 2021/5/26 3:33 PM
 * @describe Delay task execution class
 */
@Component
@Slf4j
public class DelayJobTimmer {
    @Value("${redisson.delayJob.queueName}")
    private String deleyQueueName;
    @Autowired
    private RedissonClient client;
    @Autowired
    private ApplicationContext context;
    ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    @PostConstruct
    public void startJobTimer() {
        RBlockingQueue<DeplayJobDTO> blockingQueue =
                client.getBlockingQueue(deleyQueueName);
        new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        // Get the task from the queue
                        DeplayJobDTO jobDTO = blockingQueue.take();
                        executorService.execute(new DelayJobTask(context, jobDTO));
                    } catch (Exception e) {
                        log.error("Delayed task execution failed", e);
                        // TODO: failed tasks could be saved to a database
                    }
                }
            }
        }.start();
    }
    
    class DelayJobTask implements Runnable {
        private ApplicationContext context;
        private DeplayJobDTO deplayJobDTO;
        public DelayJobTask(ApplicationContext context, DeplayJobDTO deplayJobDTO) {
            this.context = context;
            this.deplayJobDTO = deplayJobDTO;
        }
        @Override
        public void run() {
            DelayJob delayJob = (DelayJob) context.getBean(deplayJobDTO.getClazz());
            delayJob.execute(deplayJobDTO);
        }
    }
}
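The same take-and-dispatch pattern can be exercised without Redis or Spring. In the sketch below, which is an illustration under stated assumptions rather than a replacement for the class above, a LinkedBlockingQueue stands in for RBlockingQueue and a map from class to handler stands in for ApplicationContext.getBean. The names ConsumerSketch, Job, handlers, dispatch, start and stop are hypothetical. Unlike the loop above, it stops polling when the thread is interrupted.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** JDK-only sketch of the consumer loop (hypothetical, no Redis or Spring). */
class ConsumerSketch {
    /** Stand-in for DeplayJobDTO: the handler class plus its parameter. */
    static final class Job {
        final Class<?> handlerType;
        final Object param;

        Job(Class<?> handlerType, Object param) {
            this.handlerType = handlerType;
            this.param = param;
        }
    }

    // Stand-in for the Redisson blocking queue that receives due tasks.
    final BlockingQueue<Job> queue = new LinkedBlockingQueue<>();
    // Stand-in for the Spring container lookup by class.
    final Map<Class<?>, Consumer<Object>> handlers = new ConcurrentHashMap<>();

    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final Thread poller = new Thread(this::pollLoop, "delay-job-poller");

    void start() {
        poller.setDaemon(true);
        poller.start();
    }

    private void pollLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Job job = queue.take();              // blocks until a task is due
                workers.execute(() -> dispatch(job));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // restore the flag and leave the loop
            }
        }
        workers.shutdown(); // no new tasks once polling has stopped
    }

    /** Looks up the handler by class and runs it; returns false if none is registered. */
    boolean dispatch(Job job) {
        Consumer<Object> handler = handlers.get(job.handlerType);
        if (handler == null) {
            return false;
        }
        handler.accept(job.param);
        return true;
    }

    void stop() {
        poller.interrupt();
    }
}
```

Keeping the blocking take on a dedicated thread and the job execution on a pool means that a slow job does not delay the pickup of the next due task.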

Test

First, create a class that implements the DelayJob interface; it holds the business logic of the delayed task. Here it simply prints the parameters.

/**
 * @author lindj
 * @date 2021/5/26 3:42 PM
 * @describe
 */
@Component
@Slf4j
public class TestDelayJob implements DelayJob<ParamDTO> {
    @Override
    public void execute(DeplayJobDTO<ParamDTO> deplayJobDTO) {
        ParamDTO paramDTO = deplayJobDTO.getParam();
        log.info("TestDelayJobService starting paramDTO={}", paramDTO);
    }
}

Next, define an HTTP endpoint that sets the delayed task's parameters and implementation class and schedules the task to run after 30 seconds.

@GetMapping(value = "/api/delayJob/producer")
public String setValue() {
    DeplayJobDTO<ParamDTO> deplayJobDTO = new DeplayJobDTO<>();
    ParamDTO paramDTO = new ParamDTO();
    paramDTO.setName("lindj");
    paramDTO.setAge(66);
    
    deplayJobDTO.setParam(paramDTO);
    
    deplayJobDTO.setClazz(TestDelayJob.class);
    this.delayJobProducer.submitDelayJob(deplayJobDTO, 30L,
            TimeUnit.SECONDS);
    log.info("Delayed task submission completed");
    return "success";
}

Call the /api/delayJob/producer endpoint. After 30 seconds, the delayed task is executed.

Reference: www.kailing.pub/article/ind…