The demo code for this article has been uploaded to GitHub: https://github.com/chengxy-nds/delayqueue

Over the May Day holiday I had planned to write two articles and read a technical book. Instead, my self-discipline failed me over those five days: I gave in to every temptation, never even opened my computer, and the plan was a complete failure. It showed me the gap between myself and the experts, who stay up late writing articles, are better than me, and still work harder than I do. I really do feel ashamed.

Knowing shame is close to courage, so I am forcing myself back to studying. I prefer practical material where the knowledge can be put to use, ideally with a demo to show for it. I was not sure what topic to share, but a recent project urgently needed to hire, and I got the chance to sit on the interviewer’s side for once. So today’s topic is an interview question: “How do you implement a delay queue?”

The following sections introduce several ways to implement a delay queue; the GitHub address for the implementations is given at the end of the article. None of them is absolutely better or worse than the others. It depends on the business scenario: with technology there is no best choice, only the most appropriate one.

1. Applications of delay queues

What is a delay queue? As the name suggests, it is first of all a queue, with the added ability to delay the consumption of its messages: you can specify the point in time at which a message in the queue will be consumed.

Delay queues are used in many projects, especially on e-commerce platforms:

1. If an order is not paid within 30 minutes of being placed, it is automatically cancelled.

2. A food delivery platform sends an order notification and pushes an SMS to the user 60 seconds after the order is placed.

3. If an order stays in an unfinished state for too long, it is closed in time and the inventory is returned.

4. If a new Taobao merchant does not upload product information within one month, the shop is frozen.

Each of the above scenarios can be handled with a delay queue.

2. Implementations of delay queues

I have always held that if a feature can be implemented with the JDK’s own API, you should not rush to reinvent the wheel or introduce third-party middleware. On the one hand, home-grown wrappers are error-prone (unless you are an expert), and debugging and verifying them creates a lot of unnecessary work. On the other hand, once third-party middleware is integrated, system complexity rises sharply and so does the maintenance cost.

1. JDK DelayQueue

The JDK provides an API for implementing delay queues: DelayQueue, in the java.util.concurrent package.

DelayQueue is a BlockingQueue that internally wraps a PriorityQueue. The PriorityQueue orders its elements with a binary heap (a complete binary tree). Every element added to a DelayQueue carries a delay, and the element whose delay expires first sits at the head of the queue. An element cannot be taken from the queue until its delay has expired. A plain PriorityQueue can hold basic wrapper types or custom entity classes: wrapper types are sorted in ascending order by default, while custom entity classes must define their order from the value of a class attribute. The elements of a DelayQueue, by contrast, must implement the Delayed interface.
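The ordering rule described above can be tried on a plain PriorityQueue. The following is a minimal sketch rather than code from the demo repository; the class PriorityQueueOrderingSketch and the Job entity are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class PriorityQueueOrderingSketch {

    /** Hypothetical entity, ordered by its dueAtMillis attribute. */
    static final class Job {
        final String name;
        final long dueAtMillis;

        Job(String name, long dueAtMillis) {
            this.name = name;
            this.dueAtMillis = dueAtMillis;
        }
    }

    /** Integers use their natural order, so they come out ascending. */
    static List<Integer> drainNumbers(List<Integer> input) {
        PriorityQueue<Integer> heap = new PriorityQueue<>(input);
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            out.add(heap.poll());
        }
        return out;
    }

    /** Custom entities need an explicit rule, here a Comparator on one attribute. */
    static List<String> drainJobs(List<Job> input) {
        PriorityQueue<Job> heap = new PriorityQueue<>(Comparator.comparingLong((Job j) -> j.dueAtMillis));
        heap.addAll(input);
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            out.add(heap.poll().name);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drainNumbers(Arrays.asList(15, 5, 10)));          // [5, 10, 15]
        System.out.println(drainJobs(Arrays.asList(
                new Job("late", 300L), new Job("early", 100L))));            // [early, late]
    }
}
```

DelayQueue applies the same heap ordering, with the additional rule that the head may only be removed once its delay has expired.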

The example adds three orders to a DelayQueue and cancels them after 5, 10, and 15 seconds respectively.

To be stored in a DelayQueue, an element must implement the Delayed interface. The interface declares a single method, getDelay, which returns the remaining delay; because Delayed extends Comparable, the element must also implement compareTo, which in the Order class determines the order of the elements in the queue.

import com.fasterxml.jackson.annotation.JsonFormat;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Order implements Delayed {
    /** Absolute time, in epoch milliseconds, at which the order expires */
    @JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    private long time;
    String name;

    public Order(String name, long time, TimeUnit unit) {
        this.name = name;
        this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
    }

    /** Remaining delay, converted to the unit requested by the queue */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /** Orders that expire earlier sort first */
    @Override
    public int compareTo(Delayed o) {
        Order other = (Order) o;
        return Long.compare(this.time, other.time);
    }
}

DelayQueue’s put() method is thread-safe because it uses a ReentrantLock internally for synchronization. DelayQueue also provides two methods for taking elements: poll() and take(). poll() is non-blocking and returns null if the head element has not yet expired. take() blocks, and the calling thread waits until an element has expired.

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {

    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws InterruptedException {
        Order order1 = new Order("Order1", 5, TimeUnit.SECONDS);
        Order order2 = new Order("Order2", 10, TimeUnit.SECONDS);
        Order order3 = new Order("Order3", 15, TimeUnit.SECONDS);
        DelayQueue<Order> delayQueue = new DelayQueue<>();
        delayQueue.put(order1);
        delayQueue.put(order2);
        delayQueue.put(order3);

        System.out.println("Order delay queue start time :" + LocalDateTime.now().format(FORMATTER));
        while (!delayQueue.isEmpty()) {
            // poll() returns null while the head element has not expired
            Order task = delayQueue.poll();
            if (task != null) {
                System.out.format("Order :{%s} cancelled at {%s}\n", task.name, LocalDateTime.now().format(FORMATTER));
            }
            Thread.sleep(1000);
        }
    }
}

The above is a simple demonstration of enqueueing and dequeueing. In real development, a dedicated thread would be responsible for enqueueing and consuming messages.

Order1, Order2, and Order3 are cancelled after 5, 10, and 15 seconds respectively:

Order delay queue start time :2020-05-06 14:59:09
Order :{Order1} cancelled at {2020-05-06 14:59:14}
Order :{Order2} cancelled at {2020-05-06 14:59:19}
Order :{Order3} cancelled at {2020-05-06 14:59:24}

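The demo above polls with poll() and a one-second sleep. As a rough sketch of the dedicated consumer thread mentioned earlier, the blocking take() method can be used instead. DelayedTask and TakeConsumerDemo are hypothetical names for this sketch and are not part of the demo repository; the delays are in milliseconds only so that the example finishes quickly.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical element type for this sketch (not the article's Order class). */
final class DelayedTask implements Delayed {
    final String name;
    private final long fireAtMillis;

    DelayedTask(String name, long delay, TimeUnit unit) {
        this.name = name;
        this.fireAtMillis = System.currentTimeMillis() + unit.toMillis(delay);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Convert the remaining time into the unit the queue asks for
        return unit.convert(fireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other instanceof DelayedTask) {
            return Long.compare(fireAtMillis, ((DelayedTask) other).fireAtMillis);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

final class TakeConsumerDemo {

    /** Runs a consumer thread until it has taken `expected` tasks; returns their names in order. */
    static List<String> consume(DelayQueue<DelayedTask> queue, int expected) {
        List<String> handled = new CopyOnWriteArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    // take() blocks until the head element has expired
                    handled.add(queue.take().name);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "delay-queue-consumer");
        consumer.start();
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("order3", 300, TimeUnit.MILLISECONDS));
        queue.put(new DelayedTask("order1", 100, TimeUnit.MILLISECONDS));
        queue.put(new DelayedTask("order2", 200, TimeUnit.MILLISECONDS));
        System.out.println(consume(queue, 3)); // [order1, order2, order3]
    }
}
```

With take() the consumer handles each element as soon as it expires, without a fixed polling interval. This relies on getDelay honouring the requested TimeUnit, because the queue uses the returned value to decide how long to wait.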
2. Quartz scheduled tasks

Quartz is a classic task scheduling framework. Before Redis and RabbitMQ were widely used, order cancellation was typically implemented with scheduled tasks. A scheduled task runs periodically, so an order may already have timed out while the next trigger time has not yet arrived, which means orders are not always processed promptly.

Add the Quartz dependency:

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

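Add the @EnableScheduling annotation to the startup class to enable scheduled tasks. Note that @EnableScheduling and the @Scheduled annotation used below belong to Spring’s built-in task scheduling rather than to Quartz itself.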

@EnableScheduling
@SpringBootApplication
public class DelayqueueApplication {
    public static void main(String[] args) {
        SpringApplication.run(DelayqueueApplication.class, args);
    }
}

Write a scheduled task to execute every 5 seconds.

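import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class QuartzDemo {

    // Runs every five seconds
    @Scheduled(cron = "0/5 * * * * ?")
    public void process() {
        System.out.println("I'm on a scheduled mission!");
    }
}
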
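The scheduled method above only prints a message. As a hedged illustration of what a periodic scan for timed-out orders might do, and of why its processing can lag, the sketch below uses an in-memory map in place of an order table. TimeoutScanSketch, findTimedOut, and the 30-second timeout are assumptions made for this example only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TimeoutScanSketch {

    /** Returns the ids of orders whose age at nowMillis is at least timeoutMillis. */
    static List<String> findTimedOut(Map<String, Long> createdAtMillis, long nowMillis, long timeoutMillis) {
        List<String> timedOut = new ArrayList<>();
        for (Map.Entry<String, Long> entry : createdAtMillis.entrySet()) {
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                timedOut.add(entry.getKey());
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        Map<String, Long> unpaid = new LinkedHashMap<>();
        unpaid.put("A0001", 0L);      // created at t = 0 s
        unpaid.put("A0002", 4_000L);  // created at t = 4 s
        long timeout = 30_000L;       // 30-second timeout, for illustration only

        // Scans run every 5 s. A0002 times out at t = 34 s, but the scan at
        // t = 30 s does not see it yet, so it is only picked up at t = 35 s.
        System.out.println(findTimedOut(unpaid, 30_000L, timeout)); // [A0001]
        System.out.println(findTimedOut(unpaid, 35_000L, timeout)); // [A0001, A0002]
    }
}
```

In this sketch an order can wait up to one full scan interval after its timeout before it is handled, which is the delay described for scheduled tasks. A real scan would also mark or remove the orders it has already processed.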
3. Redis sorted set

The Redis sorted set (zset) data structure can also be used as a delay queue. It relies mainly on the score attribute: Redis sorts the members of the set by score in ascending order.

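Use the zadd command to add elements to the queue delayqueue and set each score to the time at which the element expires. Here three elements, order1, order2, and order3, are added to delayqueue, expiring after 10 seconds, 20 seconds, and 30 seconds respectively.

 zadd delayqueue 3 order3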

The consumer polls delayqueue and compares the smallest score in the sorted set with the current time. If the score is less than or equal to the current time, the element has expired and is removed.

    /** Consume messages: check the head of the sorted set about once per second */
    public void pollOrderQueue() throws InterruptedException {

        while (true) {
            // Fetch the member with the smallest score (the earliest expiration)
            Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);
            if (set.isEmpty()) {
                System.out.println(sdf.format(new Date()) + " zset empty ");
                return;
            }

            Tuple head = set.iterator().next();
            String value = head.getElement();
            long score = (long) head.getScore();

            long nowSecond = System.currentTimeMillis() / 1000;
            // zrem returns the number of members removed, so with several
            // consumers only the one that actually removed the member handles it
            if (nowSecond >= score && jedis.zrem(DELAY_QUEUE, value) > 0) {
                System.out.println(sdf.format(new Date()) + " removed key:" + value);
                continue; // check the next member right away
            }
            Thread.sleep(1000);
        }
    }

The execution results are as expected:

2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:order1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed key:order3
2020-05-07 13:24:39 zset empty 
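To experiment with the score-as-expiration-time idea without a Redis server, the sorted set can be modelled in memory. This is only a stand-in under stated assumptions: InMemoryDelayZset is a hypothetical class, it models just the operations the delay queue needs, and unlike a real ZADD it does not update the score of a member that already exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** In-memory stand-in for a Redis sorted set used as a delay queue. */
final class InMemoryDelayZset {

    private static final class Entry {
        final long score;
        final String member;

        Entry(long score, String member) {
            this.score = score;
            this.member = member;
        }
    }

    // Ordered by score first, then by member name to keep entries distinct
    private final TreeSet<Entry> entries = new TreeSet<>((a, b) ->
            a.score != b.score ? Long.compare(a.score, b.score) : a.member.compareTo(b.member));

    /** Producer side: the score is the absolute expiration time in seconds. */
    void add(String member, long nowSeconds, long delaySeconds) {
        entries.add(new Entry(nowSeconds + delaySeconds, member));
    }

    /** Consumer side: removes and returns every member whose score is <= nowSeconds. */
    List<String> pollExpired(long nowSeconds) {
        List<String> expired = new ArrayList<>();
        while (!entries.isEmpty() && entries.first().score <= nowSeconds) {
            expired.add(entries.pollFirst().member);
        }
        return expired;
    }

    public static void main(String[] args) {
        InMemoryDelayZset zset = new InMemoryDelayZset();
        long now = 1_000L; // arbitrary starting time in seconds
        zset.add("order1", now, 10);
        zset.add("order2", now, 20);
        zset.add("order3", now, 30);
        System.out.println(zset.pollExpired(now + 9));  // []
        System.out.println(zset.pollExpired(now + 10)); // [order1]
        System.out.println(zset.pollExpired(now + 30)); // [order2, order3]
    }
}
```

The producer computes the score as the current time plus the delay, and the consumer only ever needs to look at the smallest score, which is what makes a sorted set a natural fit.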
4. Redis expiration callback

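Redis key expiration callbacks can also be used to implement a delay queue: when a key expires, a callback event is triggered. Keep in mind that Redis generates the event when it actually deletes the key, which can be somewhat later than the nominal expiration time, and that the notification is delivered over Pub/Sub, so a listener that is offline at that moment does not receive it.

Edit the redis.conf file to enable notify-keyspace-events Ex: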

notify-keyspace-events Ex

Configure Redis listening by registering a RedisMessageListenerContainer bean:

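import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Configuration
public class RedisListenerConfig {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}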

Write the listener for Redis expiration callbacks. It must extend KeyExpirationEventMessageListener, and works somewhat like an MQ message listener.

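import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // The message body is the name of the expired key
        String expiredKey = message.toString();
        System.out.println("The expired key is " + expiredKey);
    }
}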

To test it, set a key in the redis-cli client with a 3-second expiration time.

 set xiaofu 123 ex 3

The expired key was successfully listened on the console.

The expired key is xiaofu

5. RabbitMQ delay queue

RabbitMQ is commonly used to implement delay queues. RabbitMQ does not support delay queues directly; they are implemented indirectly through the TTL and DLX features of its message queues.

Let’s first look at TTL and DLX.

Time To Live (TTL):

As the name suggests, TTL is the lifetime of a message. RabbitMQ can set a TTL on a queue with the x-message-ttl argument, or on an individual message through its expiration property. The value is a non-negative integer in milliseconds.

RabbitMQ can set the message expiration in two dimensions: the queue and the message itself.

  • Set the expiration time on the queue, so that all messages in the queue have the same expiration time.
  • Set the expiration time on a single message, so that the TTL of each message in the queue can be different.

If a TTL is set on both the queue and a message in it, the smaller of the two values applies. Once its TTL expires, a message in the queue becomes a dead letter.

Dead Letter Exchanges (DLX)

A DLX is a dead letter exchange, and a queue bound to a dead letter exchange is called a dead letter queue. A RabbitMQ queue can be configured with two arguments, x-dead-letter-exchange and x-dead-letter-routing-key (optional). When a dead letter appears in the queue, these two arguments route the message to another exchange so that it can be consumed again.

x-dead-letter-exchange: after a dead letter appears in the queue, it is re-routed to the specified exchange.

x-dead-letter-routing-key: the routing key used when re-routing, generally that of the queue the message should be forwarded to.

A message in the queue becomes a dead letter when:

  • The TTL of the message or queue has expired
  • The queue has reached its maximum length
  • The message is rejected by the consumer (basic.reject or basic.nack)

For example, we send the order message A0001 to the delay queue order.delay.queue with a message lifetime (x-message-ttl) of 30 minutes. After 30 minutes the order message A0001 becomes a dead letter. When the delay queue detects the dead letter, the x-dead-letter-exchange configuration forwards it to the order-closing queue, which is consumed normally.
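The three arguments in this flow can be collected into a plain map. The sketch below only builds that map and does not talk to a broker; DelayQueueArguments and build are hypothetical names, and the exchange and routing key values are the ones used as examples in this article. Such a map is what the arguments parameter of Channel.queueDeclare in the RabbitMQ Java client expects.

```java
import java.util.HashMap;
import java.util.Map;

final class DelayQueueArguments {

    /** Builds the optional queue arguments for a TTL plus dead-letter setup. */
    static Map<String, Object> build(long ttlMillis, String deadLetterExchange, String deadLetterRoutingKey) {
        Map<String, Object> arguments = new HashMap<>();
        // Lifetime of every message in the queue, in milliseconds
        arguments.put("x-message-ttl", ttlMillis);
        // Exchange that receives messages once they become dead letters
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        // Routing key used when the dead letter is re-published (optional)
        arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return arguments;
    }

    public static void main(String[] args) {
        long thirtyMinutes = 30L * 60 * 1000;
        Map<String, Object> arguments = build(thirtyMinutes, "order.close.exchange", "order.close.queue");
        System.out.println(arguments.get("x-message-ttl")); // 1800000
    }
}
```

Setting the TTL on the queue gives every message the same delay; a per-message expiration, as in the send method that follows, allows different delays in one queue.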

Specify the delay when sending the message:

public void send(String delayTimes) {
    amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue", "Hi, this is delayed data.", message -> {
        // Set the per-message TTL (the delay) in milliseconds
        message.getMessageProperties().setExpiration(delayTimes);
        return message;
    });
}

Set the forwarding rules for dead letters on the delay queue:

    /** Delay queue */
    @Bean(name = "order.delay.queue")
    public Queue getMessageQueue() {
        return QueueBuilder
                .durable(RabbitConstant.DEAD_LETTER_QUEUE)
                // Exchange that expired messages are forwarded to
                .withArgument("x-dead-letter-exchange", "order.close.exchange")
                // Routing key used when forwarding expired messages
                .withArgument("x-dead-letter-routing-key", "order.close.queue")
                .build();
    }

6. Time wheel

The implementations above are relatively simple and easy to understand; the time wheel algorithm is a little more abstract. Both Kafka and Netty implement delay queues based on the time wheel algorithm. The following uses Netty’s delay queue to explain how a time wheel works.

Let’s look at a schematic of the time wheel and explain some of its basic concepts.

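wheel: the time wheel itself, which can be pictured as the dial of a clock. For example, if one round lasts 24 seconds and the wheel has 8 slots, each slot represents 3 seconds, so the time precision is 3 seconds.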

Suppose a delayed task A is added that should run after 25 seconds, while one round of the time wheel is only 24 seconds long. A round number (round) and a pointer position (index) are then computed from the length of the wheel and the size of a slot: task A points to slot 0 after one full circle. The time wheel records the round and index of the task, here round=1 and index=0. While the pointer is at slot 0 and the round of task A is still 1, the task is not executed; it runs only when the pointer reaches slot 0 with round=0.

Each slot therefore stands for several points in time; for example, 1 second and 25 seconds both map to slot 0. The tasks in a slot are kept in a linked list, similar to the buckets of a HashMap.
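The round and index bookkeeping can be sketched with a small single-threaded wheel that the caller advances by hand. This is an illustrative model under stated assumptions, not Netty’s implementation: SimpleTimeWheel, add, and tick are hypothetical names, and each call to tick() stands for one 3-second step.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/** Hypothetical, single-threaded time wheel; the caller drives it by calling tick(). */
final class SimpleTimeWheel {

    private static final class Task {
        final String name;
        int remainingRounds;

        Task(String name, int remainingRounds) {
            this.name = name;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<LinkedList<Task>> slots = new ArrayList<>();
    private final long tickSeconds;
    private long currentTick = 0; // number of slots processed so far

    SimpleTimeWheel(int slotCount, long tickSeconds) {
        this.tickSeconds = tickSeconds;
        for (int i = 0; i < slotCount; i++) {
            slots.add(new LinkedList<>());
        }
    }

    /** Registers a task and returns {round, index}. */
    int[] add(String name, long delaySeconds) {
        long ticks = delaySeconds / tickSeconds;                   // whole slots to wait
        int round = (int) (ticks / slots.size());                  // full circles to skip
        int index = (int) ((currentTick + ticks) % slots.size());  // slot that holds the task
        slots.get(index).add(new Task(name, round));
        return new int[] {round, index};
    }

    /** Processes the slot under the pointer, then advances the pointer by one slot. */
    List<String> tick() {
        List<String> fired = new ArrayList<>();
        Iterator<Task> it = slots.get((int) (currentTick % slots.size())).iterator();
        while (it.hasNext()) {
            Task task = it.next();
            if (task.remainingRounds == 0) {
                fired.add(task.name);
                it.remove();
            } else {
                task.remainingRounds--; // not due in this circle yet
            }
        }
        currentTick++;
        return fired;
    }

    public static void main(String[] args) {
        SimpleTimeWheel wheel = new SimpleTimeWheel(8, 3); // 8 slots of 3 s, 24 s per round
        int[] a = wheel.add("A", 25);
        System.out.println("round=" + a[0] + ", index=" + a[1]); // round=1, index=0
        for (int i = 1; i <= 9; i++) {
            System.out.println("tick " + i + " -> " + wheel.tick());
        }
    }
}
```

In this run, task A is skipped the first time the pointer processes slot 0, because its round is still 1, and it fires on the ninth call to tick(), when the pointer is back at slot 0 with round 0.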

Netty builds its delay queue mainly with HashedWheelTimer. HashedWheelTimer implements the time wheel algorithm: its underlying structure is an array of slots, each holding a linked list of tasks, and a worker thread advances the pointer one slot per tick.

HashedWheelTimer has several constructors. The parameters mean the following:

  • threadFactory: the factory used to create the worker thread;
  • tickDuration and unit: the time interval of each slot, 100 ms by default;
  • ticksPerWheel: the number of slots in one round, 512 by default. If the value passed in is not a power of two, it is rounded up to the next power of two, which makes the hash (slot index) calculation cheaper.

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}

  • TimerTask: the interface a scheduled task implements; its run method wraps the logic of the task.
  • Timeout: the handle returned after a task is submitted to the Timer. The handle can be used to cancel the task and to query its basic state.
  • Timer: the parent interface of HashedWheelTimer. It only defines how to submit a scheduled task and how to stop the whole timer.

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NettyDelayQueue {

    public static void main(String[] args) {
        final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);

        // Recurring task: registers itself again every 5 seconds
        TimerTask task1 = new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                System.out.println("order1 executes after 5s");
                timer.newTimeout(this, 5, TimeUnit.SECONDS); // register again at the end
            }
        };
        timer.newTimeout(task1, 5, TimeUnit.SECONDS);

        // Recurring task: registers itself again every 10 seconds
        TimerTask task2 = new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                System.out.println("order2 executes after 10s");
                timer.newTimeout(this, 10, TimeUnit.SECONDS); // register again at the end
            }
        };
        timer.newTimeout(task2, 10, TimeUnit.SECONDS);

        // Delayed task: runs only once
        timer.newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                System.out.println("order3 executes once after 15s");
            }
        }, 15, TimeUnit.SECONDS);
    }
}

order3 is a delayed task and is executed only once, while order1 and order2 are recurring tasks that re-register themselves and are executed repeatedly at their own intervals.

order1 executes after 5s
order2 executes after 10s
order3 executes once after 15s
order1 executes after 5s
order2 executes after 10s
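The remark about ticksPerWheel being rounded up to a power of two can be illustrated with a short sketch. It shows why a power-of-two wheel size is convenient: the slot index becomes a bit mask instead of a modulo. WheelSizeSketch, normalize, and slotIndex are hypothetical names written for this example and are not copied from Netty.

```java
final class WheelSizeSketch {

    /** Rounds ticksPerWheel up to the nearest power of two (valid for 1 to 2^30). */
    static int normalize(int ticksPerWheel) {
        if (ticksPerWheel < 1 || ticksPerWheel > (1 << 30)) {
            throw new IllegalArgumentException("ticksPerWheel out of range: " + ticksPerWheel);
        }
        int size = 1;
        while (size < ticksPerWheel) {
            size <<= 1;
        }
        return size;
    }

    /** Maps a tick counter to a slot index; only correct when wheelSize is a power of two. */
    static int slotIndex(long tick, int wheelSize) {
        return (int) (tick & (wheelSize - 1));
    }

    public static void main(String[] args) {
        System.out.println(normalize(500));       // 512
        System.out.println(normalize(512));       // 512
        System.out.println(slotIndex(513L, 512)); // 1, the same as 513 % 512
    }
}
```

For non-negative tick values and a power-of-two size, the mask gives the same result as the modulo operation while avoiding a division on every tick.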

Conclusion

To keep things easy to understand, the code above is deliberately simple and crude. Demos of the implementations have been submitted to GitHub: https://github.com/chengxy-nds/delayqueue. Interested readers can download and run them.

This article took quite a long time to write, and writing is no easier than going to work. I repeatedly checked references, verified that the demos work, and set up RabbitMQ and Redis environments. All I can say is: it was hard!

There may be imperfections in what I have written. If anything is wrong or unclear, corrections are welcome!
