An overview of the

In JDK, ScheduledThreadPoolExecutor offer delay message function, but because of internal queue is used DelayedWorkQueue queue, and DelayedWorkQueue is using a heap, The insertion time complexity of the heap is all O(logn), which is very inefficient, and most MQS would not tolerate such efficiency.

As a result, MQ reimplements latency.

RocketMQ is even more crafty in the implementation of delayed messages. Instead of using the time round algorithm that most MQ uses, RocketMQ simply uses Timer.

Delay message dump

When a producer sends a delayed message, the broker must surely wait until the message expires before the consumer can consume it.

That is, messages must do extra processing on the broker side to avoid them.

The topic and queueId are dumped

The broker resets the message topic to SCHEDULE_TOPIC_XXXX and queueId to delayLevel-1 before the message is written to the commitlog. Then write the original topic and queueId to the properties of the message so that when the message expires, the original topic and queueId can be restored.

CommitLog#putMessage(final MessageExtBrokerInner MSG)


public class CommitLog {

    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

    if (msg.getDelayTimeLevel() > 0) {

        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {

        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());

        }

        //TODO will send delayed messages to another SCHEDULE_TOPIC_XXXX topic
        topic = ScheduleMessageService.SCHEDULE_TOPIC;

        // TODO uses latency level -1 as our queueId
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // Backup real topic, queueId
        // TODO backs up our original topic, as well as our original queueId
        // TODO REAL_TOPIC, REAL_QIDMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); }}}Copy the code

After this processing, the Consumer cannot immediately consume the delayed message that was just sent.

ConsumeQueue subentry tagsCode is reset

The Broker sets the tagsCode to the expiry time of the message when the CommitLog is dumped to the ConsumeQueue if it finds a delayed message.

Code location: CommitLog# checkMessageAndReturnSize


public class CommitLog {

    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,

    final boolean readBody) {

    // If TODO is a delayed message, it will store the expiration time of the message as tagsCode

        if (delayLevel > 0) {

            tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, storeTimestamp); }}}Copy the code

Message expiration processing

How the Broker senses that a message is due for consumption by the Consumer is the focus. Next, take a look at the trickery of RocketMQ.

As we know, the Broker can change the default latency level of RocketMQ by configuring messageDelayLevel.

Here is the default configuration for RocketMQ


messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Copy the code

That said, RocketMQ doesn’t really have the same flexibility as other MQS to provide delayed messages and must be configured through a configuration file.

Do not provide such a function, is more simple and funny to implement the delayed message function.

The implementation of delayed messages are all in this class ScheduleMessageService

Parse the messageDelayLevel configuration

When the Broker starts, the messageDelayLevel configuration is parsed

ScheduleMessageService#parseDelayLevel()

The main logic is as follows:

Parse messageDelayLevel and place the values in the delayLevelTable map.


public class ScheduleMessageService extends ConfigManager {

    public boolean parseDelayLevel(a) {

        HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();

        timeUnitTable.put("s".1000L);

        timeUnitTable.put("m".1000L * 60);

        timeUnitTable.put("h".1000L * 60 * 60);

        timeUnitTable.put("d".1000L * 60 * 60 * 24);

        String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();

        try {

            String[] levelArray = levlString.split("");

            for (int i = 0; i < levelArray.length; i++) {

                String value = levelArray[i];

                String ch = value.substring(value.length() - 1);

                Long tu = timeUnitTable.get(ch);

                int level = i + 1;

                if (level > this.maxDelayLevel) {

                    this.maxDelayLevel = level;

                }

                long num = Long.parseLong(value.substring(0, value.length() - 1));

                long delayTimeMillis = tu * num;

                this.delayLevelTable.put(level, delayTimeMillis); }}catch (Exception e) {

            log.error("parseDelayLevel exception", e);

            log.info("levelString String = {}", levelString);

            return false;
        }
        return true; }}Copy the code

The Timer starts

ScheduleMessageService#start()

public void start(a) {
    if (started.compareAndSet(false.true)) {
        this.timer = new Timer("ScheduleMessageTimerThread".true);
        // Get the configured latency level
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            // The consumption offset for each delay level
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            // Initialize the deferred scheduling task

            if(timeDelay ! =null) {
                this.timer.schedule(newDeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); }}}}Copy the code

OffsetTable is the Broker startup, from ${storePath} / store/config/delayOffset json parsing. The content stored in this file is a JSON file that contains the consumption offset for each latency level. Roughly as follows


{
"offsetTable": {1:10}}Copy the code

DeliverDelayedMessageTimerTask

The task is DeliverDelayedMessageTimerTask Timer, see the run of the next () method implementation logic.

The logic of run() is as follows

  1. According to the consumption offset of the delay queue, the message is obtained from the corresponding queue

  2. Gets the timestamp when the message was stored according to the tagsCode in the ConsumeQueue subentry

  3. The tagsCode is compared with the current time. If the tagsCode is less than or equal to the current time, the delayed message is restored to the original message for consumption by the Consumer

  4. Continue scheduling the next delayed message

Consumption offset persistence

The Broker persists delayed message consumption offsets every 10 seconds.

ScheduleMessageService#start()


public class ScheduleMessageService extends ConfigManager {

    public void start(a) {
        if (started.compareAndSet(false.true)) {
            // By default, persistence is performed every 10 seconds
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run(a) {
                    try {
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e); }}},10000.this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }}}Copy the code