Lilishop is a Java development, based on SpringBoot research and development of B2B2C multi-user mall, front-end use Vue, UNIAPP development system full end all open-source code

This system is used to teach you how to use every detail in the system, such as: payment, third-party login, log collection, distributed transactions, seconds kill scenarios and other scenarios learning scheme


This paper studies distributed delay tasks

Introduction to Delayed Tasks

That is to specify a time, the implementation of the agreed tasks in advance, such as: scheduled to cancel orders, scheduled to unload goods, scheduled to open activities.

The difference between a delayed task and a scheduled task

The delayed task applies to personalized service scenarios, for example, an order is automatically cancelled, an activity is automatically started, and a product is automatically removed from the shelf. And then there are the more precise, real-time things.

The timed tasks are applicable to the business of the whole platform, such as the calculation of product score and unified settlement, the batch settlement of withdrawal amount in distribution, the generation of platform statistics/store statistics, etc. So basically it’s periodic scanning, every day, every hour, every minute, every month, whatever. For example, timed takedowns are fine with timed tasks, but to achieve precise task scheduling, creating a task per second is not very sensible.

The two scenarios need to be complementary, and you can consider what scenario to apply.

Thinking is introduced

  1. The project starts with a thread that is used to query redis for pending tasks at regular intervals. The task ID is the json formatted string of the object and the value is the time to execute.
  2. When a task is queried, it is deleted from the redis information. (The delayed task is executed only after the task is successfully deleted. Otherwise, the delayed task is not executed. In this way, the delayed task in the distributed system is not executed multiple times.)
  3. After the record in Redis is deleted, the child thread is enabled to execute the task. Flip the execution ID, or JSON string, back to the task to be executed, so that you can get what executor to use to execute the task and what the parameters are.
  4. Executing a delayed task

The actual use

In actual scenarios, delayed task modification and deletion will also be designed. In these scenarios, it is recommended that the redis mark the task to be executed when the task is created. If the task is deleted or modified, the identification in REDis can be modified, and of course, supplementary conditions can be determined in the business logic.

In addition, it is recommended to use MQ to implement the specific task, which means that during the task, the thread just releases an MQ and gives the consumer to consume the specific thing.

The process scan in the code is 5 seconds, which means that a delayed task can be executed at most 5 seconds. It can be adjusted to 1 second or lower in real life scenarios, but it is not recommended. In addition to redis performance bar, do not worry too much about redis connection number caused by performance problems.

Using the step

  1. Redis can be started locally or with docker-compose in ELK.

  2. Start the SpringBoot application.

  3. Request the SpringBoot application

  4. View the console output

Introduction to Key Classes

The cache operation class is used for the karyotype logic of delayed tasks. Interval query is required to perform delayed tasks. It is the Sorted Set attribute of Redis that is used to try sorting and execute tasks.
/** * add member ** to Zset@param* the key key values@paramScore, usually used to rank *@paramThe value value *@returnAdd state */
public boolean zAdd(String key, long score, String value) {
    Boolean result = redisTemplate.opsForZSet().add(key, value, score);
    return result;


/** * Get the queue ** under a key@paramKey Cache key *@paramFrom Start time *@paramTo End time *@returnData * /
public Set<ZSetOperations.TypedTuple<Object>> zRangeByScore(String key, int from, long to) {
    Set<ZSetOperations.TypedTuple<Object>> set = redisTemplate.opsForZSet().rangeByScoreWithScores(key, from, to);
    return set;

/** * Removes the Zset queue value **@param* the key key values@paramValue The deleted collection *@returnNumber of deletions */
public Long zRemove(String key, String... value) {
    return redisTemplate.opsForZSet().remove(key, value);
Delay queue abstract class, the specific delay queue to inherit
package cn.lili.trigger.plugin.delay;

import cn.hutool.json.JSONUtil;
import cn.lili.trigger.plugin.cache.Cache;
import cn.lili.trigger.plugin.util.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import java.util.Calendar;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/** * delay queue factory **@author paulG
 * @since2020/11/7 * * /
public abstract class AbstractDelayQueueMachineFactory {

    private Cache cache;

    /** * Insert task ID **@paramJobId Task ID (unique in queue) *@paramTime Delay time (unit: second) *@returnWhether the file is successfully inserted */
    public boolean addJob(String jobId, Integer time) {
        Calendar instance = Calendar.getInstance();
        instance.add(Calendar.SECOND, time);
        long delaySeconds = instance.getTimeInMillis() / 1000;
        boolean result = cache.zAdd(setDelayQueueName(), delaySeconds, jobId);"Add delay task, cache key {}, wait time {}", setDelayQueueName(), time);
        return result;


    /** * delay queue machine started to run */
    private void startDelayQueueMachine(a) {"Delay queue machine {} operational", setDelayQueueName());

        // Listen to the Redis queue
        while (true) {
            try {
                // Get the timestamp of the current time
                long now = System.currentTimeMillis() / 1000;
                // Get the task list before the current time
                Set<DefaultTypedTuple> tuples = cache.zRangeByScore(setDelayQueueName(), 0, now);

                // If the task is not empty
                if(! CollectionUtils.isEmpty(tuples)) {"Delayed task starts task execution :{}", JSONUtil.toJsonStr(tuples));

                    for (DefaultTypedTuple tuple : tuples) {
                        String jobId = (String) tuple.getValue();
                        // Remove the cache. If the cache is removed successfully, it indicates that the current thread has processed the delayed task, and the delayed task is executed
                        Long num = cache.zRemove(setDelayQueueName(), jobId);
                        // If the command is successfully removed, the command is executed
                        if (num > 0) { ThreadPoolUtil.execute(() -> invoke(jobId)); }}}}catch (Exception e) {
                log.error("An exception occurs during the processing of delayed tasks. The cause is {}", e.getMessage(), e);
            } finally {
                // Every 5 seconds
                try {
                } catch(InterruptedException e) { e.printStackTrace(); }}}}/** * The final task method executed **@paramJobId Indicates the task ID */
    public abstract void invoke(String jobId);

    /** * The name of the queue to implement the delay */
    public abstract String setDelayQueueName(a);

    public void init(a) {
Example implementation of delay queue
package cn.lili.trigger.plugin.delay;

import cn.hutool.json.JSONUtil;
import cn.lili.trigger.plugin.interfaces.TimeTrigger;
import cn.lili.trigger.plugin.interfaces.TimeTriggerExecutor;
import cn.lili.trigger.plugin.model.TimeTriggerMsg;
import cn.lili.trigger.plugin.util.SpringContextUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/** * Test delay queue **@author paulG
 * @versionV4.1 *@date 2020/11/17 7:19 下午
 * @description
 * @since1 * /
public class TestDelayQueue extends AbstractDelayQueueMachineFactory {

    private TimeTrigger timeTrigger;

    public void invoke(String jobId) {
        TimeTriggerMsg timeTriggerMsg = JSONUtil.toBean(jobId, TimeTriggerMsg.class);

        TimeTriggerExecutor executor = (TimeTriggerExecutor) SpringContextUtil.getBean(timeTriggerMsg.getTriggerExecutor());


    public String setDelayQueueName(a) {
Delayed task interface
package cn.lili.trigger.plugin.interfaces;

import cn.lili.trigger.plugin.model.TimeTriggerMsg;

/** * delay execution interface **@author Chopper
public interface TimeTrigger {

    /** * Add delay task **@paramTimeTriggerMsg Delay task information */
    void add(TimeTriggerMsg timeTriggerMsg);

Redis delay task implementation class
package cn.lili.trigger.plugin.interfaces.impl;

import cn.hutool.json.JSONUtil;
import cn.lili.trigger.plugin.delay.TestDelayQueue;
import cn.lili.trigger.plugin.interfaces.TimeTrigger;
import cn.lili.trigger.plugin.model.TimeTriggerMsg;
import cn.lili.trigger.plugin.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/** * Redis delay task **@author Chopper
 * @versionV1.0 * 2021-06-09 11:00 */
public class RedisTimerTrigger implements TimeTrigger {

    private TestDelayQueue testDelayQueue;

    public void add(TimeTriggerMsg timeTriggerMsg) {
        // Calculate delay time Execution time - current time
        Integer delaySecond = Math.toIntExact(timeTriggerMsg.getTriggerTime() - DateUtil.getDateline());
        // Set the delay task
        if (Boolean.TRUE.equals(testDelayQueue.addJob(JSONUtil.toJsonStr(timeTriggerMsg), delaySecond))) {
  "Timed execution at [" + DateUtil.toString(timeTriggerMsg.getTriggerTime(), "yyyy-MM-dd HH:mm:ss") + "], consumption [" + timeTriggerMsg.getParam().toString() + "】");
        } else {
Delayed task executor interface
package cn.lili.trigger.plugin.interfaces;

/** * Delay task executor interface **@author Chopper
public interface TimeTriggerExecutor {

    /** * Execute the task **@paramObject Task parameter */
    void execute(Object object);

Delayed task implementation
package cn.lili.trigger.plugin.interfaces.execute;

import cn.lili.trigger.plugin.interfaces.TimeTriggerExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/** * test executor **@author Chopper
 * @versionV1.0 * 2021-06-09 10:49 */
public class TestTimeTriggerExecutor implements TimeTriggerExecutor {

5 – phase elimination model for delayed tasks
package cn.lili.trigger.plugin.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;


/** * Delayed task message **@author Chopper
 * @versionV1.0 *@since2019-02-12 5:46 PM */
public class TimeTriggerMsg implements Serializable {

    private static final long serialVersionUID = 8897917127201859535L;

    /** * Executor execution time */
    private Long triggerTime;
    /** * beanId */
    private String triggerExecutor;

    /** * The actuator argument */
    private Object param;

The controller
package cn.lili.trigger.controller;

import cn.lili.trigger.plugin.interfaces.TimeTrigger;
import cn.lili.trigger.plugin.model.TimeTriggerMsg;
import cn.lili.trigger.plugin.util.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

public class TestController {

    private TimeTrigger timeTrigger;

    public void test(Integer seconds) {
        Long executeTime = DateUtil.getDateline() + 5;
        if(seconds ! =null) {
            executeTime = DateUtil.getDateline() + seconds;
