This is original content; please credit the source when reprinting.

Preface

A delayed message (also called a timed message) is used in distributed asynchronous messaging when the producer sends a message but wants the consumer to receive it after a specified delay or at a specified point in time, rather than immediately.

Delayed messaging applies to a wide range of business scenarios. In distributed systems, delayed messaging is usually pushed down into the middleware layer (typically the MQ itself) or consolidated into a shared infrastructure service.

This article discusses common implementation schemes for delayed messages and the strengths and weaknesses of each design.

Implementation scheme

1. Solutions based on external storage

By external storage, I mean storage systems that are introduced in addition to MQ’s own storage.

External-storage solutions all follow essentially the same pattern: the delayed-message module is separated from the MQ and runs as an independent service/process. Delayed messages are kept in another storage medium and delivered to the MQ once they expire. There are of course further design details, such as delivering a message directly if it has already expired by the time it reaches the delayed-message module, which are not discussed here.

The following solutions use different storage systems.

Based on databases (such as MySQL)

This approach implements delayed messages with a delay-message table in a relational database such as MySQL.

CREATE TABLE `delay_msg` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `delivery_time` DATETIME NOT NULL COMMENT 'Delivery time',
  `payloads` blob COMMENT 'Message content',
  PRIMARY KEY (`id`),
  KEY `time_index` (`delivery_time`)
);

A scheduled thread periodically scans the table for expired messages and delivers them. The scan interval of this thread is, in effect, the smallest delay granularity the scheme can offer.
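
A minimal sketch of one scan pass follows. It assumes Python's built-in sqlite3 as a stand-in for MySQL (so `delivery_time` is stored as epoch seconds rather than DATETIME), and `deliver` is a hypothetical callback in place of the real MQ producer:

```python
import sqlite3
import time

def create_table(conn):
    # Mirrors the delay_msg table above; REAL epoch seconds replace DATETIME.
    conn.execute(
        "CREATE TABLE delay_msg ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " delivery_time REAL NOT NULL,"
        " payloads BLOB)"
    )
    conn.execute("CREATE INDEX time_index ON delay_msg (delivery_time)")

def enqueue(conn, payload: bytes, delay_s: float, now: float):
    conn.execute(
        "INSERT INTO delay_msg (delivery_time, payloads) VALUES (?, ?)",
        (now + delay_s, payload),
    )

def scan_once(conn, deliver, now: float, batch: int = 100) -> int:
    """One pass of the scheduled scan: deliver, then delete, every expired row."""
    rows = conn.execute(
        "SELECT id, payloads FROM delay_msg WHERE delivery_time <= ? "
        "ORDER BY delivery_time LIMIT ?",
        (now, batch),
    ).fetchall()
    for msg_id, payload in rows:
        deliver(payload)  # hypothetical: produce to the real MQ topic
        conn.execute("DELETE FROM delay_msg WHERE id = ?", (msg_id,))
    conn.commit()
    return len(rows)

def run_scanner(conn, deliver, interval_s: float = 1.0):
    # The sleep interval bounds how precisely a delay can be honored.
    while True:
        scan_once(conn, deliver, time.time())
        time.sleep(interval_s)
```

Deleting a row only after `deliver` returns gives at-least-once delivery: a crash between the two steps re-delivers the row on the next pass. With several scanner instances, rows would also need to be locked or claimed, which this sketch does not do.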

Advantages:

  • Simple implementation;

Disadvantages:

  • B+Tree indexes are poorly suited to the write-heavy workload of messaging;

Based on RocksDB

The RocksDB scheme keeps the design above but chooses a more suitable storage medium.

As discussed in my previous article on RocksDB, LSM trees are better suited to write-heavy workloads. Chronos, the delayed-message module of DDMQ (open-sourced by Didi), adopts this scheme.

DDMQ adds a unified proxy layer on top of RocketMQ, where additional features can be built. Delayed messages are handled by this proxy layer, which forwards them to a Chronos-specific topic in RocketMQ. Chronos consumes the messages from that topic and dumps them into RocksDB, then, as in the previous scheme, periodically scans for expired messages and delivers them to RocketMQ.
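
To see why a sorted key-value store fits the "scan for expired messages" step, here is a sketch of one possible key layout. The layout is an assumption for illustration, not Chronos's actual encoding, and a bisect-based Python class stands in for RocksDB. Prefixing each key with a big-endian delivery timestamp makes byte order match time order, so the due messages always form a contiguous key range starting at the first key:

```python
import bisect
import struct

def make_key(deliver_at_ms: int, msg_id: str) -> bytes:
    # Hypothetical layout: 8-byte big-endian timestamp + message ID.
    # Big-endian means bytewise key order == delivery-time order.
    return struct.pack(">Q", deliver_at_ms) + msg_id.encode()

class SortedKV:
    """Tiny in-memory stand-in for a sorted KV store such as RocksDB."""

    def __init__(self):
        self._keys = []   # kept sorted, like an LSM tree's iteration order
        self._vals = {}

    def put(self, key: bytes, value: bytes):
        if key not in self._vals:
            bisect.insort(self._keys, key)
        self._vals[key] = value

    def delete(self, key: bytes):
        i = bisect.bisect_left(self._keys, key)
        if i < len(self._keys) and self._keys[i] == key:
            self._keys.pop(i)
            del self._vals[key]

    def scan_before(self, upper: bytes):
        """Yield (key, value) for every key < upper, in key order."""
        end = bisect.bisect_left(self._keys, upper)
        for k in list(self._keys[:end]):
            yield k, self._vals[k]

def pop_due(db: SortedKV, now_ms: int):
    # Every key below the prefix of (now_ms + 1) has deliver_at_ms <= now_ms.
    due = list(db.scan_before(struct.pack(">Q", now_ms + 1)))
    for k, _ in due:
        db.delete(k)
    return [v for _, v in due]
```

In RocksDB itself, the same scan would be an iterator starting from the first key and stopping at the upper bound (for example via `ReadOptions::iterate_upper_bound`).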

To be honest, this is a fairly heavyweight scheme. RocksDB is an embedded, single-node store, so for data availability you still have to implement multi-replica data synchronization yourself.

Advantages:

  • RocksDB's LSM tree handles the write-heavy workload of messaging well;

Disadvantages:

  • The scheme is heavyweight: if you adopt it, you have to implement data replication and disaster recovery (DR) for RocksDB yourself.

Based on Redis

Now for the Redis scheme. The design below is a fairly complete one.

This solution comes from: www.cnblogs.com/lylife/p/78…

  • Message Pool: stores all delayed messages in a KV structure, where the key is the message ID and the value is the message itself. (A Redis Hash is chosen mainly because it can hold a large amount of data and expands by progressive rehashing as the data grows, and both HSET and HGET run in O(1) time.)
  • Delayed Queue: 16 sorted queues (the number can be scaled horizontally), each a ZSET whose members are message IDs from the message pool and whose scores are the expiration times. Messages are split across multiple queues to speed up scanning.
  • Worker: a processing thread that, driven by a scheduled task, scans the Delayed Queues for due messages.
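
The layout above can be simulated in memory to show how the pieces interact. In this sketch a Python dict plays the message-pool Hash and one heap per queue plays each ZSET; the comments name the Redis commands each step would roughly correspond to. The CRC32-based queue routing and all names are assumptions, not part of the original design:

```python
import heapq
import zlib

NUM_QUEUES = 16  # the 16 Delayed Queues of the described design

class RedisDelaySim:
    """In-memory stand-in for the Redis layout (hypothetical names).

    pool   -> the Hash       (HSET / HGET / HDEL)
    queues -> one ZSET each  (ZADD / ZRANGEBYSCORE / ZREM), score = due time
    """

    def __init__(self, num_queues: int = NUM_QUEUES):
        self.pool = {}
        self.queues = [[] for _ in range(num_queues)]

    def queue_index(self, msg_id: str) -> int:
        # Stable hash, so every producer routes a given ID to the same queue.
        return zlib.crc32(msg_id.encode()) % len(self.queues)

    def add(self, msg_id: str, payload: bytes, deliver_at: float):
        self.pool[msg_id] = payload  # ~ HSET pool msg_id payload
        queue = self.queues[self.queue_index(msg_id)]
        heapq.heappush(queue, (deliver_at, msg_id))  # ~ ZADD queue deliver_at msg_id

    def poll_queue(self, i: int, now: float):
        """One Worker pass over queue i (~ ZRANGEBYSCORE 0 now, ZREM, HGET, HDEL)."""
        queue, due = self.queues[i], []
        while queue and queue[0][0] <= now:
            _, msg_id = heapq.heappop(queue)
            due.append(self.pool.pop(msg_id))
        return due
```

Against a real Redis, a Worker's "read due IDs, then ZREM" is two separate steps, so two Workers scanning the same queue can both pick up a message unless the steps are made atomic (for example with a Lua script). That is the duplicate-processing concern raised below.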

In my view, the scheme chooses Redis for the following reasons:

  • Redis ZSET keeps members sorted by score, which makes it a natural fit for delayed queues
  • Performance: although ZSET insertion is an O(log n) operation, Redis is memory-based and heavily optimized internally.

There is one issue to consider, though. The scheme creates multiple Delayed Queues to meet concurrency and performance requirements, but this raises the question of how to distribute the queues evenly across multiple nodes, and the same queue may well be processed by several nodes concurrently, causing duplicate delivery. Should we then introduce concurrency control such as distributed locks?

For small-scale deployments, the architecture can instead be made master-slave: only the master node processes tasks, and the slave serves as a disaster-recovery (DR) backup. This is easier to implement and more controllable.

Drawbacks of timed-thread scanning and an improvement

All of the schemes above obtain expired messages by having a thread scan periodically.

Timed scanning wastes resources when message volume is low, and when volume is very high, a poorly chosen scan interval makes delay times inaccurate. Borrowing the idea behind the JDK Timer class, you can save CPU resources with wait-notify.

The thread waits for (execution time minus current time) of the earliest message. If a new message arrives whose execution time is earlier than that of the message being waited on, the thread is notified, picks up the earlier message, and waits again.
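
A minimal sketch of this wait-notify idea follows. It uses Python's `threading.Condition` in place of Java's `wait`/`notify` and assumes a single consumer thread; the class and method names are hypothetical:

```python
import heapq
import itertools
import threading
import time

class DelayQueue:
    """Consumer sleeps until the earliest message is due instead of polling."""

    def __init__(self):
        self._heap = []                # (deadline, seq, payload), earliest first
        self._seq = itertools.count()  # tie-breaker so payloads are never compared
        self._cond = threading.Condition()

    def put(self, payload, delay_s: float):
        deadline = time.monotonic() + delay_s
        with self._cond:
            seq = next(self._seq)
            heapq.heappush(self._heap, (deadline, seq, payload))
            # Notify only if the new message is now the earliest one.
            if self._heap[0][1] == seq:
                self._cond.notify()

    def take(self):
        """Block until the earliest message is due, then return its payload."""
        with self._cond:
            while True:
                if not self._heap:
                    self._cond.wait()  # nothing queued: sleep until a put()
                    continue
                deadline, _, payload = self._heap[0]
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    heapq.heappop(self._heap)
                    return payload
                # Wait (execution time - current time); an earlier put() wakes us.
                self._cond.wait(timeout=remaining)
```

Because `take` re-reads the head of the heap after every wake-up, a spurious wake-up or an earlier message arriving simply causes it to recompute how long to wait.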

2. Implementations built into open-source MQs

Next, let's look at how current open-source MQs implement their built-in delayed messaging.

RocketMQ

The open-source version of RocketMQ supports delayed messages, but only at 18 fixed levels rather than arbitrary times. The levels are customizable on the broker (the messageDelayLevel setting), which is good enough for ordinary business needs. The default is 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h, i.e., 18 levels.

When the broker receives a delayed message, it replaces the message's topic with SCHEDULE_TOPIC_XXXX (keeping the real topic in the message properties) and stores it in the queue with queueId = delayTimeLevel - 1. As a result, each queue holds only messages with the same delay, which ensures that messages with the same delay are consumed in order. The broker then consumes SCHEDULE_TOPIC_XXXX on a schedule and writes each expired message to its real topic.
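
The level-to-queue mapping is simple enough to sketch. The helpers below are hypothetical Python illustrations, not RocketMQ code. They assume the default level string and the s/m/h/d unit suffixes, and compute which SCHEDULE_TOPIC_XXXX queue a message lands in and when it becomes due:

```python
# Default value of the broker's messageDelayLevel setting (18 levels).
DEFAULT_LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
# Unit suffixes assumed to mirror those the broker accepts.
UNIT_MS = {"s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}
SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"

def parse_levels(spec: str = DEFAULT_LEVELS) -> dict:
    """Return {level: delay_ms}; levels are 1-based."""
    return {
        i + 1: int(tok[:-1]) * UNIT_MS[tok[-1]]
        for i, tok in enumerate(spec.split())
    }

def route_delayed(real_topic: str, level: int, store_ts_ms: int, levels: dict) -> dict:
    """Illustrative only: where a delayed message is parked and when it is due."""
    return {
        "topic": SCHEDULE_TOPIC,
        "queue_id": level - 1,       # queueId = delayTimeLevel - 1
        "real_topic": real_topic,    # restored when the message expires
        "deliver_at_ms": store_ts_ms + levels[level],
    }
```

Because every message in a given queue shares the same delay, arrival order within that queue is also due-time order, so each level's timer only ever has to look at the head of its queue.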

The following schematic diagram shows the whole implementation. Red represents the delivery of delayed messages, and purple represents the scheduled delivery of delayed messages once they expire:

Advantages:

  • The number of levels is fixed and each level has its own timer, so the overhead is small
  • Messages of the same level go into the same queue, which preserves their order; different levels go into different queues, which keeps delivery times accurate.
  • Because only fixed levels are supported, ordering messages with different delays reduces to appending each message to its level's queue, with no sorting needed

Disadvantages:

  • Changing the level configuration is costly, and fixed levels are inflexible
  • Delayed messages enlarge the CommitLog, since each one is written once on arrival and again when it is delivered to its real topic

Pulsar

Pulsar supports delayed messages with arbitrary delays, but implements them quite differently from RocketMQ.

Put simply, Pulsar writes delayed messages directly to the topic specified by the client, and the broker maintains a time-ordered priority queue in off-heap memory that holds index information for the delayed messages: the message with the shortest delay sits at the head, and the longer the delay, the further back it is. On the consumption path, the broker checks whether any delayed messages have expired; if so, it takes them out of the queue and uses their index to look up the corresponding messages for consumption.

If the node crashes, Topics on that broker node will be moved to other available brokers and the priority queue mentioned above will be rebuilt.
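
The following is a sketch of the per-subscription index and its rebuild, with a Python heap standing in for Pulsar's off-heap priority queue. Storing only (delivery time, ledger id, entry id) triples reflects the "index only" idea described above. The class, its methods, and the shape of the `log` input are hypothetical:

```python
import heapq

class SubscriptionDelayIndex:
    """One instance per subscription; message bodies stay in the topic's log."""

    def __init__(self):
        self._heap = []  # (deliver_at, ledger_id, entry_id), earliest first

    def add(self, deliver_at: int, ledger_id: int, entry_id: int):
        heapq.heappush(self._heap, (deliver_at, ledger_id, entry_id))

    def pop_expired(self, now: int):
        """Positions of all messages due at `now`, earliest first."""
        out = []
        while self._heap and self._heap[0][0] <= now:
            _, ledger_id, entry_id = heapq.heappop(self._heap)
            out.append((ledger_id, entry_id))  # used to re-read the entry
        return out

    @classmethod
    def rebuild(cls, log, acked):
        """After failover: rescan unacknowledged entries to recreate the index.

        `log` yields (ledger_id, entry_id, deliver_at or None); this full
        rescan is what makes rebuilding slow for large, long-span backlogs.
        """
        idx = cls()
        for ledger_id, entry_id, deliver_at in log:
            if deliver_at is not None and (ledger_id, entry_id) not in acked:
                idx.add(deliver_at, ledger_id, entry_id)
        return idx
```

Because every subscription keeps its own index and `rebuild` must rescan every unacknowledged entry, both memory use and failover time grow with the number of subscriptions and with the delay span.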

The following schematic of Pulsar's delayed messages comes from the Pulsar WeChat official account.

At first glance, this scheme looks very simple and supports arbitrary delays. But it has several significant problems:

  • Memory overhead: the queue that maintains the delayed-message index lives in off-heap memory, and there is one queue per subscription (the equivalent of a consumer group in Kafka). For example, if a topic with N subscriptions uses delayed messages, N queues are created. As the number of delayed messages and their time span grow, so does the memory used by each queue. (Yes, in this scenario, supporting arbitrary delays can actually make the problem worse.)
  • Rebuild cost of the delayed-message index queue after failover: for a large volume of delayed messages with a long time span, rebuilding can take hours (according to the Pulsar official account).
  • Storage overhead: the time span of delayed messages affects space reclamation of already-consumed message data in Pulsar. For example, if your topic must support one-month delays and you send a message delayed by one month, the topic's underlying storage retains a full month of message data, even though 99% of the normal messages were consumed during that month.

The community has also designed a solution to the first two problems: partition the queue by time, so that the broker loads only the queue for the current, most recent time slice into memory and persists the remaining time-slice partitions to disk, as shown in the following diagram:

At present, however, this scheme has not been implemented. In practice, the impact of the first two problems can be reduced by allowing only delayed messages with a short time span. In addition, because memory holds only the index rather than the full message data, it may take a backlog of millions of delayed messages before memory is significantly affected. From this perspective, it is understandable that the maintainers have not yet addressed the first two problems.

The third problem is probably harder to solve: it would require separating delayed messages from normal messages at the storage layer and storing delayed messages on their own.

QMQ

QMQ supports delayed/timed messages at arbitrary times: you can schedule delivery at any point within the next two years (configurable).

I put QMQ last because I consider it the most reasonable delayed-message design among open-source MQs. In short, the core of the design is a multi-level time wheel + lazy loading + separate on-disk storage for delayed messages.

If you are not familiar with time wheels, see this article on Kafka's time wheel for the design of the algorithm.

QMQ implements delayed/timed messages with a two-level hashed wheel. The first level lives on disk: each tick is one hour by default (configurable to suit actual needs), and each tick gets its own schedule log file. Because QMQ supports delays of up to two years by default (also configurable), at most 2 * 366 * 24 = 17,568 files are generated (fewer if the maximum supported delay is shorter). The second level lives in memory: when a message's delivery time approaches, the message indexes for that hour (each holding the message's offset and size in the schedule log) are loaded from the disk file into the in-memory hash wheel, whose ticks are 500 ms.
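
The two levels can be sketched as follows. This is a hypothetical Python illustration, not QMQ's code: a dict keyed by hour stands in for the schedule log files on disk, and `InMemoryWheel` is the 500 ms wheel that one hour's index is loaded into. Each insert lands in a slot computed by integer division, so nothing is ever sorted:

```python
from collections import defaultdict

HOUR_MS = 3_600_000  # first level: one schedule log per hour (default tick)
TICK_MS = 500        # second level: in-memory wheel tick (default)

def schedule_log_key(deliver_at_ms: int) -> int:
    """First level (disk): start of the hour whose schedule log gets the message."""
    return deliver_at_ms // HOUR_MS * HOUR_MS

def append_to_schedule_log(disk, deliver_at_ms: int, offset: int, size: int):
    # `disk` maps hour -> index entries; it stands in for the per-hour files.
    disk[schedule_log_key(deliver_at_ms)].append((deliver_at_ms, offset, size))

class InMemoryWheel:
    """Second level: a hashed wheel of 500 ms slots covering one loaded hour."""

    def __init__(self, hour_start_ms: int):
        self.start = hour_start_ms
        self.slots = [[] for _ in range(HOUR_MS // TICK_MS)]  # 7,200 slots

    def load(self, index_entries):
        """Lazy loading: called only when this hour's delivery time approaches."""
        for deliver_at_ms, offset, size in index_entries:
            slot = (deliver_at_ms - self.start) // TICK_MS
            self.slots[slot].append((offset, size))  # O(1) insert, no sorting

    def advance(self, tick: int):
        """Drain one slot: (offset, size) of each message due in this tick."""
        due, self.slots[tick] = self.slots[tick], []
        return due
```

Only one hour of index entries is ever held in memory, while messages due further out stay in their hourly files on disk.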

To summarize the design highlights:

  • The time wheel algorithm suits delayed/timed messages: it avoids sorting delayed messages, and both insertion and deletion are O(1).
  • The multi-level time wheel supports delays with a large time span.
  • With lazy loading, only messages that are due soon are held in memory, while messages with longer delays stay on disk, which is memory friendly.
  • Delayed messages are stored separately (in the schedule log), so they do not affect space reclamation of normal messages.

Conclusion

This article has summarized the delayed-messaging schemes common in the industry and discussed the pros and cons of each. I hope it gives readers some inspiration.

References

  • blog.itpub.net/31555607/vi…
  • www.cnblogs.com/hzmark/p/mq…
  • mp.weixin.qq.com/s/_wnwBgZgQ…
  • github.com/qunarcorp/q…
  • github.com/apache/rock…