Lilishop technology stack

Open source is not easy, if help please click Star

introduce

Liverpoolfc.tv: pickmall. Cn

Lilishop is a Java development, based on SpringBoot research and development of B2B2C multi-user mall, front-end use Vue, UNIAPP development system full end all open-source code

This system is used to teach you how to use every detail in the system, such as: payment, third-party login, log collection, distributed transactions, seconds kill scenarios and other scenarios learning scheme

Git addressGitee.com/beijing_hon…

This paper studies distributed delay tasks

Introduction to Delayed Tasks

That is to specify a time, the implementation of the agreed tasks in advance, such as: scheduled to cancel orders, scheduled to unload goods, scheduled to open activities.

The difference between a delayed task and a scheduled task

The delayed task applies to personalized service scenarios, for example, an order is automatically cancelled, an activity is automatically started, and a product is automatically removed from the shelf. And then there are the more precise, real-time things.

The timed tasks are applicable to the business of the whole platform, such as the calculation of product score and unified settlement, the batch settlement of withdrawal amount in distribution, the generation of platform statistics/store statistics, etc. So basically it’s periodic scanning, every day, every hour, every minute, every month, whatever. For example, timed takedowns are fine with timed tasks, but to achieve precise task scheduling, creating a task per second is not very sensible.

The two scenarios need to be complementary, and you can consider what scenario to apply.

Thinking is introduced

  1. The project starts with a thread that is used to query redis for pending tasks at regular intervals. The task ID is the json formatted string of the object and the value is the time to execute.
  2. When a task is queried, it is deleted from the redis information. (The delayed task is executed only after the task is successfully deleted. Otherwise, the delayed task is not executed. In this way, the delayed task in the distributed system is not executed multiple times.)
  3. After the record in Redis is deleted, the child thread is enabled to execute the task. Flip the execution ID, or JSON string, back to the task to be executed, so that you can get what executor to use to execute the task and what the parameters are.
  4. Executing a delayed task

The actual use

In actual scenarios, delayed task modification and deletion will also be designed. In these scenarios, it is recommended that the redis mark the task to be executed when the task is created. If the task is deleted or modified, the identification in REDis can be modified, and of course, supplementary conditions can be determined in the business logic.

In addition, it is recommended to use MQ to implement the specific task, which means that during the task, the thread just releases an MQ and gives the consumer to consume the specific thing.

The process scan in the code is 5 seconds, which means that a delayed task can be executed at most 5 seconds. It can be adjusted to 1 second or lower in real life scenarios, but it is not recommended. In addition to redis performance bar, do not worry too much about redis connection number caused by performance problems.

Using the step

  1. Redis can be started locally or with docker-compose in ELK.

  2. Start the SpringBoot application.

  3. Request the SpringBoot application http://127.0.0.1:8080

  4. View the console output

    The 2021-06-09 12:41:33. 40730-168 the INFO [nio – 8888 – exec – 1] L.T.P.D.A bstractDelayQueueMachineFactory: Test_delay (test_delay) Waiting time for 10 2021-06-09 12:41:33. 168 INFO – 40730 [nio – 8888 – exec – 1] C.L.T.P.I.I MPL. RedisTimerTrigger: Timing execution at [2021-06-09 12:41:43], Spending the 2021-06-09 test params 】 【 12:41:44. 40730-399 the INFO/Thread – 5 L.T.P.D.A bstractDelayQueueMachineFactory: Delayed tasks start tasks :[{“score”:1.623213703E9,”value”:”{“triggerTime”:1623213703,”triggerExecutor”:”testTimeTriggerExecutor”,”param “:” test params “}} “] 12:41:44 2021-06-09. 40730-403 the INFO [2 – thread pool – – 2] C.L.T.P.I.E.T estTimeTriggerExecutor: The executor performs the task Test Params

Introduction to Key Classes

The cache operation class is used for the karyotype logic of delayed tasks. Interval query is required to perform delayed tasks. It is the Sorted Set attribute of Redis that is used to try sorting and execute tasks.
/** * add member ** to Zset@param* the key key values@paramScore, usually used to rank *@paramThe value value *@returnAdd state */
@Override
public boolean zAdd(String key, long score, String value) {
    Boolean result = redisTemplate.opsForZSet().add(key, value, score);
    return result;

}


/** * Get the queue ** under a key@paramKey Cache key *@paramFrom Start time *@paramTo End time *@returnData * /
@Override
public Set<ZSetOperations.TypedTuple<Object>> zRangeByScore(String key, int from, long to) {
    Set<ZSetOperations.TypedTuple<Object>> set = redisTemplate.opsForZSet().rangeByScoreWithScores(key, from, to);
    return set;
}

/** * Removes the Zset queue value **@param* the key key values@paramValue The deleted collection *@returnNumber of deletions */
@Override
public Long zRemove(String key, String... value) {
    return redisTemplate.opsForZSet().remove(key, value);
}
Copy the code
Delay queue abstract class, the specific delay queue to inherit
package cn.lili.trigger.plugin.delay;

import cn.hutool.json.JSONUtil;
import cn.lili.trigger.plugin.cache.Cache;
import cn.lili.trigger.plugin.util.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import java.util.Calendar;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/** * delay queue factory **@author paulG
 * @since2020/11/7 * * /
@Slf4j
public abstract class AbstractDelayQueueMachineFactory {

    @Autowired
    private Cache cache;

    /** * Insert task ID **@paramJobId Task ID (unique in queue) *@paramTime Delay time (unit: second) *@returnWhether the file is successfully inserted */
    public boolean addJob(String jobId, Integer time) {
        Calendar instance = Calendar.getInstance();
        instance.add(Calendar.SECOND, time);
        long delaySeconds = instance.getTimeInMillis() / 1000;
        boolean result = cache.zAdd(setDelayQueueName(), delaySeconds, jobId);
        log.info("Add delay task, cache key {}, wait time {}", setDelayQueueName(), time);
        return result;

    }

    /** * delay queue machine started to run */
    private void startDelayQueueMachine(a) {
        log.info("Delay queue machine {} operational", setDelayQueueName());

        // Listen to the Redis queue
        while (true) {
            try {
                // Get the timestamp of the current time
                long now = System.currentTimeMillis() / 1000;
                // Get the task list before the current time
                Set<DefaultTypedTuple> tuples = cache.zRangeByScore(setDelayQueueName(), 0, now);

                // If the task is not empty
                if(! CollectionUtils.isEmpty(tuples)) { log.info("Delayed task starts task execution :{}", JSONUtil.toJsonStr(tuples));

                    for (DefaultTypedTuple tuple : tuples) {
                        String jobId = (String) tuple.getValue();
                        // Remove the cache. If the cache is removed successfully, it indicates that the current thread has processed the delayed task, and the delayed task is executed
                        Long num = cache.zRemove(setDelayQueueName(), jobId);
                        // If the command is successfully removed, the command is executed
                        if (num > 0) { ThreadPoolUtil.execute(() -> invoke(jobId)); }}}}catch (Exception e) {
                log.error("An exception occurs during the processing of delayed tasks. The cause is {}", e.getMessage(), e);
            } finally {
                // Every 5 seconds
                try {
                    TimeUnit.SECONDS.sleep(5L);
                } catch(InterruptedException e) { e.printStackTrace(); }}}}/** * The final task method executed **@paramJobId Indicates the task ID */
    public abstract void invoke(String jobId);


    /** * The name of the queue to implement the delay */
    public abstract String setDelayQueueName(a);


    @PostConstruct
    public void init(a) {
        new Thread(this::startDelayQueueMachine).start(); }}Copy the code
Example implementation of delay queue
package cn.lili.trigger.plugin.delay;

import cn.hutool.json.JSONUtil;
import cn.lili.trigger.plugin.interfaces.TimeTrigger;
import cn.lili.trigger.plugin.interfaces.TimeTriggerExecutor;
import cn.lili.trigger.plugin.model.TimeTriggerMsg;
import cn.lili.trigger.plugin.util.SpringContextUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/** * Test delay queue **@author paulG
 * @versionV4.1 *@date 2020/11/17 7:19 下午
 * @description
 * @since1 * /
@Component
public class TestDelayQueue extends AbstractDelayQueueMachineFactory {

    @Autowired
    private TimeTrigger timeTrigger;

    @Override
    public void invoke(String jobId) {
        TimeTriggerMsg timeTriggerMsg = JSONUtil.toBean(jobId, TimeTriggerMsg.class);

        TimeTriggerExecutor executor = (TimeTriggerExecutor) SpringContextUtil.getBean(timeTriggerMsg.getTriggerExecutor());
        executor.execute(timeTriggerMsg.getParam());

    }

    @Override
    public String setDelayQueueName(a) {
        return "test_delay"; }}Copy the code
Delayed task interface
package cn.lili.trigger.plugin.interfaces;


import cn.lili.trigger.plugin.model.TimeTriggerMsg;

/** * delay execution interface **@author Chopper
 */
public interface TimeTrigger {


    /** * Add delay task **@paramTimeTriggerMsg Delay task information */
    void add(TimeTriggerMsg timeTriggerMsg);

}
Copy the code
Redis delay task implementation class
package cn.lili.trigger.plugin.interfaces.impl;

import cn.hutool.json.JSONUtil;
import cn.lili.trigger.plugin.delay.TestDelayQueue;
import cn.lili.trigger.plugin.interfaces.TimeTrigger;
import cn.lili.trigger.plugin.model.TimeTriggerMsg;
import cn.lili.trigger.plugin.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/** * Redis delay task **@author Chopper
 * @versionV1.0 * 2021-06-09 11:00 */
@Component
@Slf4j
public class RedisTimerTrigger implements TimeTrigger {

    @Autowired
    private TestDelayQueue testDelayQueue;

    @Override
    public void add(TimeTriggerMsg timeTriggerMsg) {
        // Calculate delay time Execution time - current time
        Integer delaySecond = Math.toIntExact(timeTriggerMsg.getTriggerTime() - DateUtil.getDateline());
        // Set the delay task
        if (Boolean.TRUE.equals(testDelayQueue.addJob(JSONUtil.toJsonStr(timeTriggerMsg), delaySecond))) {
            log.info("Timed execution at [" + DateUtil.toString(timeTriggerMsg.getTriggerTime(), "yyyy-MM-dd HH:mm:ss") + "], consumption [" + timeTriggerMsg.getParam().toString() + "】");
        } else {
            log.error("Failed to add delayed task :{}", timeTriggerMsg); }}}Copy the code
Delayed task executor interface
package cn.lili.trigger.plugin.interfaces;

/** * Delay task executor interface **@author Chopper
 */
public interface TimeTriggerExecutor {


    /** * Execute the task **@paramObject Task parameter */
    void execute(Object object);

}
Copy the code
Delayed task implementation
package cn.lili.trigger.plugin.interfaces.execute;

import cn.lili.trigger.plugin.interfaces.TimeTriggerExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/** * test executor **@author Chopper
 * @versionV1.0 * 2021-06-09 10:49 */
@Component
@Slf4j
public class TestTimeTriggerExecutor implements TimeTriggerExecutor {

    @Override
    public void execute(Object object) {
        log.info("Actuators perform tasks {}", object); }}Copy the code
5 – phase elimination model for delayed tasks
package cn.lili.trigger.plugin.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/** * Delayed task message **@author Chopper
 * @versionV1.0 *@since2019-02-12 5:46 PM */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TimeTriggerMsg implements Serializable {


    private static final long serialVersionUID = 8897917127201859535L;

    /** * Executor execution time */
    private Long triggerTime;
    /** * beanId */
    private String triggerExecutor;


    /** * The actuator argument */
    private Object param;


}
Copy the code
The controller
package cn.lili.trigger.controller;

import cn.lili.trigger.plugin.interfaces.TimeTrigger;
import cn.lili.trigger.plugin.model.TimeTriggerMsg;
import cn.lili.trigger.plugin.util.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class TestController {

    @Autowired
    private TimeTrigger timeTrigger;

    @GetMapping
    public void test(Integer seconds) {
        Long executeTime = DateUtil.getDateline() + 5;
        if(seconds ! =null) {
            executeTime = DateUtil.getDateline() + seconds;
        }
        TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(executeTime, "testTimeTriggerExecutor"."test params"); timeTrigger.add(timeTriggerMsg); }}Copy the code