About Apache Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation, cloud-native distributed messaging and streaming platform that integrates messaging, storage, and lightweight functional computing. Its architecture separates computing from storage, and it supports multi-tenancy, persistent storage, and cross-region data replication across multiple data centers. It provides streaming data storage with strong consistency, high throughput, low latency, and high scalability.

GitHub: http://github.com/apache/pulsar/

Apache Pulsar is a high-performance messaging solution for inter-service communication that supports multi-tenancy, low latency, read/write separation, cross-region replication, rapid scale-out, and flexible fault tolerance. The MQ team of the Tencent Data Platform department has studied Pulsar in depth and made many performance and stability optimizations, which have been launched in Tencent Cloud's message queue service, TDMQ.

This article introduces how Pulsar implements delayed message delivery, and we hope it serves as a starting point for discussion.

What is delayed message delivery

Delayed message delivery is very common in MQ applications. It means that a message is not delivered immediately after being sent to the MQ server; instead, it is delivered to the consumer later, at a time determined by attributes of the message. It is generally divided into two kinds: timed (scheduled) messages and delayed messages.

  • Timed messages: The Producer sends a message to the MQ server but does not want it delivered immediately. Instead, the message is delivered to the Consumer at a specified point in time later than the current time.
  • Delayed messages: The Producer sends a message to the MQ server but does not want it delivered immediately. Instead, the message is delivered to the Consumer after a specified delay has elapsed.

Among industry products, Tencent Cloud’s CMQ and Alibaba Cloud’s RocketMQ also support delayed message delivery:

  • CMQ: The period during which a message is delayed is treated as its “in-flight” state. The delay is configured with DelaySeconds, whose value ranges from 0 to 3600 seconds.
  • RocketMQ: In the open-source version, delayed messages are temporarily stored in an internal topic, and only specific delay levels such as 5s, 10s, and 1m are supported. The commercial version supports arbitrary time precision.

The open-source NSQ, RabbitMQ, ActiveMQ, and Pulsar also have built-in delayed message processing. Although each MQ project uses and implements it differently, the core idea is the same: the Producer sends a delayed message to a Topic, the Broker puts the delayed message into temporary storage, and a Delayed Tracker Service checks whether each delayed message is due and delivers the ones that are.

Usage scenarios for delayed message delivery

Delayed message delivery postpones the processing of a message until delivery is triggered at some point in the future. It has many practical applications, such as retrying after an exception, cancelling orders on timeout, and appointment reminders:

  • If a service request fails, put the failed request into a separate queue and retry it 5 minutes later.
  • A user has bought goods but not yet paid: remind the user to pay at regular intervals, and close the order when it times out.
  • For an interview or meeting appointment, send a reminder notification half an hour before it starts.
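The retry and reminder scenarios above correspond to the two kinds of delivery defined earlier: a relative delay and an absolute delivery time. The Java sketch below only does the time arithmetic for each case and does not call Pulsar; the class `DelaySchedules` and its methods are hypothetical helpers, and the 5-minute and 30-minute values come from the scenarios.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: turns the scenarios above into delivery timestamps (epoch millis).
class DelaySchedules {
    // Retry scenario: a relative delay (the deliverAfter style).
    static long retryAt(Instant failedAt) {
        return failedAt.plus(Duration.ofMinutes(5)).toEpochMilli();
    }

    // Reminder scenario: an absolute time derived from the meeting start (the deliverAt style).
    static long reminderAt(Instant meetingStart) {
        return meetingStart.minus(Duration.ofMinutes(30)).toEpochMilli();
    }

    public static void main(String[] args) {
        Instant meeting = Instant.parse("2021-03-01T10:00:00Z");
        System.out.println(Instant.ofEpochMilli(reminderAt(meeting))); // 2021-03-01T09:30:00Z
    }
}
```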

Recently, a service product used Pulsar delayed messages in the following case: the service needed to associate the log messages of two systems. One of the systems could time out or fail when querying HBase, so the failed association tasks had to be rescheduled for when the cluster was idle.

How to use delayed message delivery in Pulsar

Pulsar introduced delayed message delivery in version 2.4.0. With it, the delivery time of a message can be specified precisely, in one of two ways: deliverAt and deliverAfter. deliverAt specifies an absolute timestamp; deliverAfter specifies how long after the current time the message should be delivered. Both work the same way underneath: the client calculates the delivery timestamp and sends it to the Broker.
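To illustrate the point that both methods reduce to one client-computed timestamp, here is a stand-alone Java model of the conversion. It does not use the Pulsar client; the class `DeliveryTime` and its methods are hypothetical names, and the timestamp is assumed to be in epoch milliseconds.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of the client-side conversion: both styles yield
// one absolute delivery timestamp in epoch milliseconds.
class DeliveryTime {
    // deliverAt style: the caller already supplies the absolute timestamp.
    static long fromDeliverAt(long timestampMillis) {
        return timestampMillis;
    }

    // deliverAfter style: add the relative delay to the client's current time.
    static long fromDeliverAfter(long nowMillis, long delay, TimeUnit unit) {
        return nowMillis + unit.toMillis(delay);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long a = fromDeliverAfter(now, 10, TimeUnit.SECONDS);
        long b = fromDeliverAt(now + 10_000);
        System.out.println(a == b); // true
    }
}
```

Because the timestamp is computed from the producer's clock in this model, clock skew between the producer and the broker would shift the actual delivery time by the same amount.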

1. Sending with deliverAfter

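// Requires: import java.util.concurrent.TimeUnit; (producer is a Producer<String>)
producer.newMessage()
 .value("hello")
 .deliverAfter(10, TimeUnit.MINUTES) // delivered about 10 minutes after sending
 .send();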

2. Sending with deliverAt

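// Absolute delivery time in epoch milliseconds, here one hour from now
producer.newMessage()
 .value("hello")
 .deliverAt(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1))
 .send();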

Pulsar supports delayed messages with long spans, such as one month or six months, and a single Topic can carry both delayed and non-delayed messages. The following figure shows how delayed messages are processed in Pulsar:

Messages m1, m3, m4, and m5 sent by the producer have different delay times, while m2 is a normal message that needs no delay. Consumers receive and ack each message according to its delay time.

How Pulsar implements delayed message delivery

As the usage above shows, Pulsar supports delayed message delivery with second-level precision, unlike open-source RocketMQ, which supports only a fixed set of delay levels.

Pulsar implements delayed message delivery in a relatively simple way. Every delayed message is recorded as an index in the Delayed Message Tracker. Each index has three parts, timestamp | LedgerID | EntryID: LedgerID and EntryID locate the message, while the timestamp records when the message should be delivered and also determines its order in the delayed index priority queue.
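A minimal Java sketch of that three-part index and its ordering. The record name `DelayedIndex` is hypothetical, and breaking timestamp ties by message position (ledger, then entry) is an assumption made here for a deterministic order, not something the article specifies.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of the three-part delayed index: timestamp | LedgerID | EntryID.
record DelayedIndex(long timestamp, long ledgerId, long entryId) {
    // Earliest delivery time first; ties broken by position (assumption).
    static final Comparator<DelayedIndex> BY_DELIVERY_TIME =
            Comparator.comparingLong(DelayedIndex::timestamp)
                    .thenComparingLong(DelayedIndex::ledgerId)
                    .thenComparingLong(DelayedIndex::entryId);
}

class DelayedIndexDemo {
    public static void main(String[] args) {
        List<DelayedIndex> indexes = new ArrayList<>(List.of(
                new DelayedIndex(3_000, 7, 2),
                new DelayedIndex(1_000, 7, 5),
                new DelayedIndex(1_000, 7, 1)));
        indexes.sort(DelayedIndex.BY_DELIVERY_TIME);
        System.out.println(indexes.get(0)); // DelayedIndex[timestamp=1000, ledgerId=7, entryId=1]
    }
}
```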

The Delayed Message Tracker maintains the delayed index priority queue in off-heap memory. The queue is a heap ordered by delivery time: the message that is due soonest sits at the head, and the longer the delay, the further back a message sits. When messages are dispatched for consumption, the Delayed Message Tracker is checked first for messages that are due. If any are due, their indexes are taken from the Tracker and used to look up the corresponding messages for consumption. If none are due, normal messages are consumed directly.
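The dispatch check above can be modeled with a `java.util.PriorityQueue` used as a min-heap on delivery time. This is an on-heap simplification: Pulsar keeps its queue off-heap inside the broker, and the class `DelayedTrackerSketch`, its methods, and the separate queue of normal messages are hypothetical stand-ins.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

// Simplified on-heap model of the dispatch check (hypothetical names).
class DelayedTrackerSketch {
    record Index(long timestamp, long ledgerId, long entryId) {}

    // Min-heap on delivery time: the index that is due soonest is at the head.
    private final PriorityQueue<Index> delayed =
            new PriorityQueue<>(Comparator.comparingLong(Index::timestamp));
    // Positions ("ledgerId:entryId") of normal messages, in arrival order.
    private final Deque<String> normal = new ArrayDeque<>();

    void addDelayed(long deliverAt, long ledgerId, long entryId) {
        delayed.add(new Index(deliverAt, ledgerId, entryId));
    }

    void addNormal(String position) {
        normal.add(position);
    }

    // Positions to dispatch at time `now`: every due delayed message first;
    // only if none is due, the next normal message.
    List<String> nextBatch(long now) {
        List<String> batch = new ArrayList<>();
        while (!delayed.isEmpty() && delayed.peek().timestamp() <= now) {
            Index due = delayed.poll();
            batch.add(due.ledgerId() + ":" + due.entryId()); // position used to read the message
        }
        if (batch.isEmpty() && !normal.isEmpty()) {
            batch.add(normal.poll());
        }
        return batch;
    }

    public static void main(String[] args) {
        DelayedTrackerSketch tracker = new DelayedTrackerSketch();
        tracker.addDelayed(5_000, 1, 1); // m1, due at t=5s
        tracker.addNormal("1:2");        // m2, no delay
        tracker.addDelayed(2_000, 1, 3); // m3, due at t=2s
        System.out.println(tracker.nextBatch(1_000)); // [1:2]
        System.out.println(tracker.nextBatch(6_000)); // [1:3, 1:1]
    }
}
```

A heap is a natural fit here because the tracker only ever needs the earliest deadline: peeking is O(1), and inserting or removing an index is O(log n).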

If a Broker goes down or topic ownership is transferred within the cluster, Pulsar rebuilds the delayed index queue so that delayed messages continue to work correctly.
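Because the index queue lives only in the owning broker's memory, the new owner has to reconstruct it. A minimal sketch, assuming the rebuild re-reads the subscription's unacknowledged messages and re-indexes those whose delivery time is still in the future; `TrackerRebuildSketch`, `StoredEntry`, and the convention that `deliverAt == 0` marks a normal message are all hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of rebuilding the delayed index on a new owner broker.
class TrackerRebuildSketch {
    // A stored entry; deliverAt == 0 marks a normal (non-delayed) message.
    record StoredEntry(long ledgerId, long entryId, long deliverAt) {}

    record Index(long timestamp, long ledgerId, long entryId) {}

    static PriorityQueue<Index> rebuild(List<StoredEntry> unacked, long now) {
        PriorityQueue<Index> queue =
                new PriorityQueue<>(Comparator.comparingLong(Index::timestamp));
        for (StoredEntry entry : unacked) {   // one pass over the whole backlog
            if (entry.deliverAt() > now) {    // still waiting: track it again
                queue.add(new Index(entry.deliverAt(), entry.ledgerId(), entry.entryId()));
            }
            // normal messages and delayed ones already due can be dispatched directly
        }
        return queue;
    }

    public static void main(String[] args) {
        List<StoredEntry> backlog = List.of(
                new StoredEntry(1, 1, 9_000),   // delayed, still pending at t=1s
                new StoredEntry(1, 2, 0),       // normal message
                new StoredEntry(1, 3, 4_000));  // delayed, still pending at t=1s
        System.out.println(rebuild(backlog, 1_000).size()); // 2
    }
}
```

The loop is linear in the number of stored entries it must scan, which is why rebuilding becomes expensive when delayed messages span a long period.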

Challenges of Pulsar delayed message delivery

The implementation described above is simple and efficient, barely intrudes on Pulsar's core, and supports arbitrarily long delays. However, we found that it cannot support large-scale use of delayed messages, for two reasons:

1. The memory available to the delayed index queue is limited

The index of a delayed message consists of three longs, which is inexpensive when there are few delayed messages. However, the index queue is maintained per subscription, so the same partition of a topic needs one index queue for every subscription. Moreover, the more delayed messages there are and the longer their delays, the more memory the index queues consume.
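A back-of-the-envelope estimate makes the scaling concrete. The sketch assumes exactly three 8-byte longs per index and ignores any queue overhead; the message and subscription counts in `main` are hypothetical inputs, not measurements.

```java
// Rough estimate of raw delayed-index memory (hypothetical inputs; queue overhead ignored).
class DelayedIndexMemory {
    static final int BYTES_PER_INDEX = 3 * Long.BYTES; // timestamp + ledgerId + entryId = 24 bytes

    // One index queue per subscription, each with an entry per pending delayed message.
    static long estimateBytes(long pendingDelayedMessages, int subscriptions) {
        return pendingDelayedMessages * BYTES_PER_INDEX * subscriptions;
    }

    public static void main(String[] args) {
        long bytes = estimateBytes(100_000_000L, 4);
        System.out.println(bytes / 1_000_000_000.0 + " GB"); // 9.6 GB
    }
}
```

The estimate is linear in both factors, so adding subscriptions multiplies the cost just as surely as adding delayed messages does.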

2. The cost of rebuilding the delayed index queue

As mentioned above, Pulsar rebuilds the delayed index queue when a Broker goes down or topic ownership is transferred within the cluster. For a large volume of delayed messages spanning a long time, the rebuild can take hours. Assigning more partitions to the topic increases rebuild concurrency and shortens the rebuild time, but it does not fully solve the cost problem.

Future work on Pulsar delayed message delivery

Pulsar’s current solution for delayed message delivery is simple and efficient, but it still carries risks when handling large numbers of delayed messages. The Pulsar community and the Tencent Data Platform MQ team will next focus on supporting delayed messages at large scale. The approach currently under discussion is to partition the delayed index queue by time: the Broker loads only the most recent delayed index partition into memory and persists the remaining partitions to disk, as shown in the following figure:



In the figure above, the delayed index queue is partitioned at 5-minute intervals. m1 and m5 fall into time partition 1; because their delivery times are the nearest, their indexes are kept in memory. m3 and m4 fall into time partition 2; their delivery times are later, so their indexes are stored on disk. This approach reduces both the cost of rebuilding the delayed index queue and its dependence on memory.
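The proposal is still under discussion, so the following is only a hypothetical Java sketch of the idea: indexes are bucketed by `deliverAt / 5 minutes`, the nearest bucket lives in a heap in memory, and later buckets sit in a `TreeMap` that stands in for disk storage. The class `TimePartitionedIndex` and its methods are illustrative names, and real persistence is not modeled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical sketch: only the nearest 5-minute partition is kept in memory;
// later partitions are "on disk", simulated by a TreeMap keyed by partition number.
class TimePartitionedIndex {
    static final long PARTITION_MILLIS = 5 * 60 * 1000L;

    record Index(long timestamp, long ledgerId, long entryId) {}

    private final PriorityQueue<Index> inMemory =
            new PriorityQueue<>(Comparator.comparingLong(Index::timestamp));
    private final TreeMap<Long, List<Index>> onDisk = new TreeMap<>();
    private long memoryPartition = Long.MIN_VALUE; // partition currently held in memory

    void add(long deliverAt, long ledgerId, long entryId) {
        Index index = new Index(deliverAt, ledgerId, entryId);
        long partition = deliverAt / PARTITION_MILLIS;
        // With nothing in memory, the new index opens the in-memory partition
        // unless an earlier partition is still waiting on disk.
        if (inMemory.isEmpty() && (onDisk.isEmpty() || partition < onDisk.firstKey())) {
            memoryPartition = partition;
        }
        if (partition <= memoryPartition) {
            inMemory.add(index); // nearest (or earlier) partition: keep in memory
        } else {
            onDisk.computeIfAbsent(partition, p -> new ArrayList<>()).add(index); // "persist"
        }
    }

    // Return every index due at `now`, loading the next partition from "disk"
    // whenever the in-memory one has been drained.
    List<Index> pollDue(long now) {
        List<Index> due = new ArrayList<>();
        while (true) {
            if (inMemory.isEmpty()) {
                Map.Entry<Long, List<Index>> next = onDisk.pollFirstEntry();
                if (next == null) {
                    break; // nothing left anywhere
                }
                memoryPartition = next.getKey();
                inMemory.addAll(next.getValue());
            }
            if (inMemory.peek().timestamp() > now) {
                break; // earliest remaining index is not due yet
            }
            due.add(inMemory.poll());
        }
        return due;
    }

    int inMemorySize() { return inMemory.size(); }

    int onDiskSize() { return onDisk.values().stream().mapToInt(List::size).sum(); }

    public static void main(String[] args) {
        TimePartitionedIndex index = new TimePartitionedIndex();
        index.add(360_000, 1, 1); // m1 -> partition 1 (minutes 5 to 10)
        index.add(660_000, 1, 3); // m3 -> partition 2 (minutes 10 to 15)
        index.add(720_000, 1, 4); // m4 -> partition 2
        index.add(420_000, 1, 5); // m5 -> partition 1
        // prints "2 in memory, 2 on disk"
        System.out.println(index.inMemorySize() + " in memory, " + index.onDiskSize() + " on disk");
    }
}
```

With this layout, a rebuild would only need to reload the nearest partition before dispatching resumes, and memory holds one partition's indexes instead of all of them.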

Conclusion

This article introduced the concept and usage scenarios of delayed message delivery and explained in detail how Apache Pulsar implements it. Pulsar’s current solution is simple and efficient and supports delayed delivery with second-level precision, but it has limitations when handling large numbers of delayed messages. The Pulsar community and the Tencent Data Platform MQ team will next focus on supporting delayed messaging at large scale.

This article was written by Zhang Chao and originally published on the WeChat public account “Tencent Cloud Middleware”. It is reprinted with authorization, with slight modifications to the original.
