Delay queues

Application scenarios

After an order is placed, it is automatically cancelled if payment is not made within 30 minutes. After payment, an order that receives no review within 24 hours is automatically given a positive rating.

Implementation based on Java's DelayQueue

DelayQueue is a BlockingQueue that essentially wraps a PriorityQueue. Each element added to a DelayQueue carries a delay, which is used as the sort key: the element with the smallest remaining delay sits at the head of the queue. An element cannot be taken from the queue until its delay has elapsed.

DelayQueue's put() method is thread-safe because it uses a ReentrantLock internally for thread synchronization. DelayQueue also provides two methods for removing elements, poll() and take(). poll() is non-blocking and returns null if no element has expired. take() blocks: the calling thread waits until an element has expired.
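As a minimal sketch of the behavior described above, the following uses a hypothetical element type, OrderTimeout, to show put(), poll(), and take(). The class name, the order IDs, and the delays are illustrative assumptions and do not come from any particular code base.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element: an order that should be cancelled once its delay elapses.
class OrderTimeout implements Delayed {
    final String orderId;
    private final long expireAtNanos; // absolute deadline on the System.nanoTime() clock

    OrderTimeout(String orderId, long delay, TimeUnit unit) {
        this.orderId = orderId;
        this.expireAtNanos = System.nanoTime() + unit.toNanos(delay);
    }

    // Remaining delay; the element may be taken once this is zero or negative.
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Orders elements so that the one expiring soonest is at the head.
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<OrderTimeout> queue = new DelayQueue<>();
        queue.put(new OrderTimeout("order-2", 200, TimeUnit.MILLISECONDS));
        queue.put(new OrderTimeout("order-1", 100, TimeUnit.MILLISECONDS));

        // Non-blocking: returns null while no element has expired.
        System.out.println(queue.poll());

        // Blocking: each call waits until the head element has expired.
        System.out.println(queue.take().orderId);
        System.out.println(queue.take().orderId);
    }
}
```

In a real service the consumer would typically run take() in a loop on a dedicated thread and cancel the order it receives.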

Based on scheduled tasks

(1) The @Scheduled annotation can be used to have a Java service run scheduled tasks repeatedly while it is running; the scheduled tasks stop when the Java service is not running.

(2) Use an independently deployed task center to schedule the tasks.
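The idea behind approach (1) can be sketched with the JDK's ScheduledExecutorService standing in for an annotation-driven scheduler: a periodic job scans for overdue orders and cancels them. The class OrderExpiryScanner, its in-memory map, and the method names are hypothetical; a real service would scan a database table instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class OrderExpiryScanner {
    // orderId -> payment deadline in epoch milliseconds (hypothetical in-memory store)
    private final Map<String, Long> unpaidOrders = new ConcurrentHashMap<>();

    void track(String orderId, long deadlineMillis) {
        unpaidOrders.put(orderId, deadlineMillis);
    }

    // One scan pass: removes and returns every order whose deadline has passed.
    List<String> cancelExpired(long nowMillis) {
        List<String> cancelled = new ArrayList<>();
        for (Map.Entry<String, Long> entry : unpaidOrders.entrySet()) {
            if (entry.getValue() <= nowMillis
                    && unpaidOrders.remove(entry.getKey(), entry.getValue())) {
                cancelled.add(entry.getKey());
            }
        }
        return cancelled;
    }

    // Runs the scan repeatedly with a fixed pause between passes.
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(
                () -> cancelExpired(System.currentTimeMillis()),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

The trade-off of this approach is precision: an order is cancelled up to one scan period after its deadline, and every pass touches all tracked orders.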

Implementation based on Redis Zset

Redis can also implement delayed operations thanks to its Zset (sorted set) data structure. A Zset essentially adds ordering to the Set structure. Besides its value, each element has another attribute, the score, which is specified when the element is added. Whenever a score is set, the Zset automatically reorders its elements according to the new value.

(1) Use the score to represent the timestamp at which the task should execute. Elements inserted into the Zset are then sorted by timestamp, that is, in order of execution time.

ZADD delay_queue 100 Xiaoming adds the member Xiaoming to the zset delay_queue with a score of 100.

(2) Continuously fetch the first element. If the current timestamp is greater than or equal to the element's score, take it out for consumption and delete it, which achieves delayed execution. Note that there is no need to traverse the entire Zset, which avoids wasting performance.

ZRANGE delay_queue 0 0 WITHSCORES returns the first element of the zset delay_queue, in ascending score order, together with its score.

ZREM delay_queue Xiaoming removes the member Xiaoming, along with its score, by name.

Note that the check and the removal must be performed together atomically, which can be guaranteed with a Lua script.

A thread is required to poll the zset continuously.
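The polling logic above can be modeled in memory to make the steps concrete. The sketch below is not a Redis client: InMemoryDelayZset is a hypothetical stand-in whose zadd() mirrors ZADD and whose pollDue() combines the ZRANGE 0 0 WITHSCORES check with ZREM. The synchronized keyword plays the role that the Lua script plays in Redis, making the check and the removal one atomic step.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// In-memory stand-in for a Redis sorted set used as a delay queue (illustration only).
class InMemoryDelayZset {
    private final Map<String, Long> scores = new HashMap<>();

    // Members ordered by score, then by name, like a Redis zset.
    private final TreeSet<String> byScore = new TreeSet<>((a, b) -> {
        int c = Long.compare(scores.get(a), scores.get(b));
        return c != 0 ? c : a.compareTo(b);
    });

    // Like ZADD: inserts the member or updates its score.
    synchronized void zadd(String member, long score) {
        if (scores.containsKey(member)) {
            byScore.remove(member); // remove under the old score before re-sorting
        }
        scores.put(member, score);
        byScore.add(member);
    }

    // Looks only at the first element; removes and returns it if it is due,
    // otherwise returns null. Check and removal happen atomically.
    synchronized String pollDue(long nowMillis) {
        if (byScore.isEmpty()) {
            return null;
        }
        String head = byScore.first();
        if (scores.get(head) > nowMillis) {
            return null; // the earliest task is not due, so nothing else is either
        }
        byScore.pollFirst();
        scores.remove(head);
        return head;
    }
}
```

A consumer thread would call pollDue(System.currentTimeMillis()) in a loop, sleeping briefly whenever it returns null.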

Redis key expiration callback

When a key expires, a callback event is triggered.

Two steps are required:

(1) Create RedisListenerConfig as a configuration bean and add it to the container.

(2) Create the event listener class RedisKeyExpirationListener. When a Redis key expires, the callback method of RedisKeyExpirationListener is triggered.

RedisKeyExpirationListener is implemented by inheritance, and the container configured in RedisListenerConfig must be passed in to it.

Implementing a delay queue with RabbitMQ

RabbitMQ supports setting expiration times for messages. When messages expire, they can be detected and routed to another message queue.

Therefore, only two queues are needed. The first queue acts as the delay queue, where messages wait until they expire. Once expired messages are detected, they are automatically forwarded to the second queue, which is consumed normally.

TTL: the time to live of a message. RabbitMQ can set a TTL for all messages in a queue with the x-message-ttl queue argument, or for an individual message with its expiration property. In both cases the value is a non-negative number of milliseconds.

DLX: the dead letter exchange, to which a dead letter queue is bound. A RabbitMQ queue can be configured with two arguments, x-dead-letter-exchange and x-dead-letter-routing-key (optional). Once a message in the queue becomes a dead letter, these two arguments are used to reroute it to another exchange, so that it can be consumed again.
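The queue arguments described above can be assembled as a plain map. The sketch below only builds that map; in the RabbitMQ Java client it would be passed as the arguments parameter of Channel.queueDeclare, which is not called here so that the example has no external dependency. The class name DelayQueueArguments and the exchange and routing key names in the usage line are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class DelayQueueArguments {
    // Builds the arguments for the first ("delay") queue: messages expire after
    // ttlMillis and are then dead-lettered to the given exchange.
    static Map<String, Object> build(int ttlMillis, String deadLetterExchange,
                                     String deadLetterRoutingKey) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("TTL must be non-negative");
        }
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", ttlMillis); // milliseconds
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        if (deadLetterRoutingKey != null) { // optional argument
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        }
        return arguments;
    }
}
```

For the 30-minute order timeout, a call such as DelayQueueArguments.build(30 * 60 * 1000, "order.dlx", "order.expired") would describe the delay queue, while consumers listen only on the queue bound to the dead letter exchange.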

Implementing a delay queue with a time wheel

Kafka’s time wheel

Reference links: blog.csdn.net/u013256816/… zhuanlan.zhihu.com/p/121483218

Kafka supports delayed production, delayed fetching, and delayed deletion of messages. Kafka does not use the JDK's built-in Timer or DelayQueue for this; instead, it implements its own timer (SystemTimer) for delayed operations, based on the time wheel concept. The time wheel is not unique to Kafka: it is used in many components, including Netty, Akka, Quartz, and ZooKeeper.

The time wheel is multi-layered. Only the first (lowest) layer is held directly; each higher layer is created and held by the layer below it.

Each interval of the first layer is 1 ms, and the number of intervals per wheel is fixed. Assuming it is 20, the first layer covers 0-20 ms. Each interval of the second layer spans the whole first layer, namely 20 ms, and the second layer also has 20 intervals, so it covers 0-400 ms. Each interval of the third layer is then 400 ms.

Position 0 of the first layer corresponds to currentTime, and the time range covered by the whole structure is currentTime plus the range of the top layer.
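The layer arithmetic can be checked with a few lines of code. This is only a sketch of the calculation; the class WheelLevels and its method names are hypothetical, and the parameters are a 1 ms base interval and 20 intervals per wheel, as in the example above.

```java
class WheelLevels {
    // Width of one interval at the given layer (layer 1 is the lowest wheel).
    static long tickMs(long baseTickMs, int wheelSize, int level) {
        long tick = baseTickMs;
        for (int i = 1; i < level; i++) {
            tick *= wheelSize; // each layer's interval spans the whole layer below
        }
        return tick;
    }

    // Total time range covered by one full wheel at the given layer.
    static long spanMs(long baseTickMs, int wheelSize, int level) {
        return tickMs(baseTickMs, wheelSize, level) * wheelSize;
    }

    public static void main(String[] args) {
        for (int level = 1; level <= 3; level++) {
            System.out.println("layer " + level
                    + ": interval=" + tickMs(1, 20, level) + " ms"
                    + ", span=" + spanMs(1, 20, level) + " ms");
        }
    }
}
```

The span grows geometrically with the number of layers, which is why a handful of layers is enough to cover very long delays.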

Each timed task is inserted into the corresponding interval of the time wheel. Each interval holds a doubly linked list, and each node of the list is a timed task. When a higher-layer wheel advances to an interval, the tasks in that interval's list are examined. A task whose time has not yet arrived is demoted, that is, reinserted into a lower-layer wheel. As a result, every timed task is ultimately checked and executed in the first-layer wheel.

Reference link: www.jianshu.com/p/837ec4ea9…

Many intervals on the time wheel may hold no scheduled tasks. If the wheel advanced interval by interval, much of that advancing would be empty. To solve this problem, Kafka maintains an additional JDK DelayQueue that stores the intervals of the time wheel. Each interval has an expiration time, and the DelayQueue orders the intervals by it.

When creating a scheduled task, the system adds the scheduled task to the corresponding interval and adds the interval to the delayQueue.

A thread continuously takes the head interval from the DelayQueue. When that interval expires, its tasks are either demoted to a lower-layer wheel or, if they have themselves expired (as in an interval of the first layer), executed directly.

The pointer of the time wheel is then advanced, that is, the current time of the time wheel is moved forward.
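The following is a simplified sketch of this mechanism, not Kafka's actual code. To keep it deterministic it replaces the JDK DelayQueue and the driver thread with a PriorityQueue of intervals and an explicit advance(nowMs) call, and it uses plain lists instead of doubly linked lists. All class and method names (SketchTimer, Wheel, Bucket, Task) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class SketchTimer {
    static final class Task {
        final long expirationMs;
        final Runnable action;
        Task(long expirationMs, Runnable action) {
            this.expirationMs = expirationMs;
            this.action = action;
        }
    }

    // One interval of a wheel: its tasks plus the start time of the slot it represents.
    static final class Bucket {
        final List<Task> tasks = new ArrayList<>();
        long expirationMs = -1; // -1 means "not queued"
    }

    static final class Wheel {
        final long tickMs, intervalMs;
        final int size;
        final Bucket[] buckets;
        final PriorityQueue<Bucket> queue; // shared by all layers
        long currentTimeMs;
        Wheel overflow; // next layer up, created on demand by this layer

        Wheel(long tickMs, int size, long startMs, PriorityQueue<Bucket> queue) {
            this.tickMs = tickMs;
            this.size = size;
            this.intervalMs = tickMs * size;
            this.queue = queue;
            this.currentTimeMs = startMs - (startMs % tickMs);
            this.buckets = new Bucket[size];
            for (int i = 0; i < size; i++) buckets[i] = new Bucket();
        }

        // Returns false if the task is already due and should run now.
        boolean add(Task task) {
            if (task.expirationMs < currentTimeMs + tickMs) return false;
            if (task.expirationMs < currentTimeMs + intervalMs) {
                long virtualId = task.expirationMs / tickMs;
                Bucket bucket = buckets[(int) (virtualId % size)];
                bucket.tasks.add(task);
                long slotStart = virtualId * tickMs;
                if (bucket.expirationMs != slotStart) { // bucket is not queued yet
                    bucket.expirationMs = slotStart;
                    queue.add(bucket);
                }
                return true;
            }
            // Too far away for this layer: hand the task to the next layer up.
            if (overflow == null) overflow = new Wheel(intervalMs, size, currentTimeMs, queue);
            return overflow.add(task);
        }

        void advanceClock(long timeMs) {
            if (timeMs >= currentTimeMs + tickMs) {
                currentTimeMs = timeMs - (timeMs % tickMs);
                if (overflow != null) overflow.advanceClock(currentTimeMs);
            }
        }
    }

    // Stands in for the DelayQueue: only non-empty intervals are ever queued.
    private final PriorityQueue<Bucket> queue =
            new PriorityQueue<>(Comparator.comparingLong((Bucket b) -> b.expirationMs));
    private final Wheel wheel;

    SketchTimer(long tickMs, int wheelSize, long startMs) {
        this.wheel = new Wheel(tickMs, wheelSize, startMs, queue);
    }

    void add(long expirationMs, Runnable action) {
        if (!wheel.add(new Task(expirationMs, action))) action.run();
    }

    // Processes every queued interval that has expired by nowMs, jumping straight
    // from one non-empty interval to the next instead of ticking through empty ones.
    void advance(long nowMs) {
        while (!queue.isEmpty() && queue.peek().expirationMs <= nowMs) {
            Bucket bucket = queue.poll();
            wheel.advanceClock(bucket.expirationMs);
            List<Task> due = new ArrayList<>(bucket.tasks);
            bucket.tasks.clear();
            bucket.expirationMs = -1;
            for (Task task : due) {
                // Re-adding either demotes the task to a lower layer or reports it as due.
                if (!wheel.add(task)) task.action.run();
            }
        }
    }
}
```

With a 1 ms interval and 20 intervals per wheel, a task expiring at 450 ms first lands in the third layer, is demoted to the second layer when the clock reaches 400 ms and to the first layer at 440 ms, and runs when advance(450) is called.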

Advantages of time wheel

(1) Compared with the commonly used DelayQueue, whose insertion has O(log N) time complexity, the TimingWheel data structure needs only O(1) to insert a task, and the cost of retrieving due tasks is also far lower than O(log N).

(2) The multi-layer design also maps long-delay scheduled tasks well. Only 5 layers are needed to represent a time span of more than 24 years (216,000 hours); this figure matches, for example, wheels of 60 intervals with a 1-second base interval, since 60^5 s = 777,600,000 s = 216,000 hours.