Preface

In day-to-day development, we often run into business scenarios in which an event must be triggered after a certain amount of time has passed, for example:

  • On an e-commerce platform, an order is cancelled automatically if it is not paid within 30 minutes of being placed
  • A red envelope that is not claimed within 24 hours is refunded automatically

Common solutions

1. Periodic scanning

Record each event's trigger time in advance, and let a scheduled task repeatedly query the database for events whose trigger time has passed.

This is the simplest approach, but it is not real-time. The more often the scheduled task runs, the more promptly events are triggered, but frequent scans also put more pressure on the database.
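Here is a minimal sketch of periodic scanning. It assumes a hypothetical in-memory list of `PendingOrder` rows standing in for a real database table. `scanOnce` plays the role of the query a scheduled task would run on each tick.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PeriodicScanSketch {
    // Hypothetical row: an order id and the time at which it should be cancelled if still unpaid
    static final class PendingOrder {
        final String id;
        final long triggerAtMillis;

        PendingOrder(String id, long triggerAtMillis) {
            this.id = id;
            this.triggerAtMillis = triggerAtMillis;
        }
    }

    // Hypothetical stand-in for the database table (thread-safe for a background scanner)
    static final List<PendingOrder> TABLE = new CopyOnWriteArrayList<>();

    // One scan: collect and remove every order whose trigger time has passed
    static List<String> scanOnce(long nowMillis) {
        List<String> cancelled = new ArrayList<>();
        for (PendingOrder order : TABLE) { // iterates over a snapshot, so removing below is safe
            if (order.triggerAtMillis <= nowMillis) {
                cancelled.add(order.id);
                TABLE.remove(order);
            }
        }
        return cancelled;
    }

    // Runs scanOnce every periodMillis on a background thread; the caller must shut the executor down
    static ScheduledExecutorService startScanner(long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("cancelled: " + scanOnce(System.currentTimeMillis())),
                0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) {
        TABLE.add(new PendingOrder("A", 1_000));
        TABLE.add(new PendingOrder("B", 5_000));
        System.out.println(scanOnce(2_000)); // [A]
        System.out.println(scanOnce(6_000)); // [B]
    }
}
```

For example, `startScanner(1000)` would scan once per second. Shrinking the period makes triggering more timely, but it costs one more query per tick, which is the trade-off described above.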

2. JDK built-in tools

The JDK provides Timer and the delay queue DelayQueue.

This approach suits a single-machine environment with low reliability requirements. Tasks and queues live in JVM memory, so it does not work in a distributed environment, and pending tasks cannot be recovered if the process crashes.
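The following sketch shows the DelayQueue approach. Each task implements `java.util.concurrent.Delayed`, and `take()` blocks until the task at the head has expired. The task names and delays are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueSketch {
    // A task that DelayQueue only hands out once its delay has elapsed
    static final class DelayedTask implements Delayed {
        final String name;
        private final long triggerAtNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.triggerAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time; zero or negative means the task has expired
            return unit.convert(triggerAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Shorter remaining delay sorts first, so the queue head is always the next task to expire
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Takes `count` tasks, blocking until each expires, and returns their names in expiry order
    static List<String> drainInExpiryOrder(DelayQueue<DelayedTask> queue, int count) {
        List<String> names = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                names.add(queue.take().name); // take() blocks until the head's delay has elapsed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and return what was collected
        }
        return names;
    }

    public static void main(String[] args) {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("refund-red-envelope", 200));
        queue.put(new DelayedTask("cancel-order", 100));
        System.out.println(drainInExpiryOrder(queue, 2)); // [cancel-order, refund-red-envelope]
    }
}
```

Everything here lives on the JVM heap. This is exactly the limitation noted above: if the process stops, the pending tasks are lost.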

3. Delayed messages in message middleware

The producer sends a delayed message, and consumers can only consume it once the specified time has passed. Business code therefore only has to handle messages that have just expired.

There are plenty of mature message middleware products. As a Java developer, I prefer RocketMQ because it is written in Java, whereas the other middleware is a black box to me. With RocketMQ, when you hit a problem or a puzzle, you can even step through the source code in a debugger.

Introduction to RocketMQ delayed messages

Sending a delayed message with the official producer API

As you can see, the official API offers setDelayTimeLevel() rather than an arbitrary custom delay. This article grew out of my confusion about that design choice. The delay level 5 in the figure is an arbitrary choice (it corresponds to one minute); delay levels are explained in detail later.

Immediately after sending, open the console to check:

The offsets of the topic's four queues have not changed, so consumers subscribed to this topic cannot consume the message yet.

After waiting for the delay to elapse:

The message has appeared in the topic, exactly one minute after it was sent.

Hypothesis

Judging from this external behavior, delayed messages are not delivered directly to their target topic. Instead, they pass through some kind of "relay station" between the producer and the topic. The producer delivers delayed messages to this relay station, and separate tasks take expired messages out of it and forward them to the target topic.

Debugging the source code

  • How to run RocketMQ in IDEA

Tip: since the producer calls setDelayTimeLevel() on Message, the broker must call getDelayTimeLevel() somewhere. Set breakpoints wherever getDelayTimeLevel() is called to find the delay-processing logic.

Producer delivery: CommitLog.java

A message with a delay level set is written directly into the "relay station", a special topic named SCHEDULE_TOPIC_XXXX:

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // ...
    if (msg.getDelayTimeLevel() > 0) {
        // Delayed-message handling
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            // If the delay level exceeds the maximum, clamp it to the maximum
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }
        // A special topic whose name constant is "SCHEDULE_TOPIC_XXXX"
        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        // The queue id is delayLevel - 1
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // Back up the real topic and queueId of the message as message properties
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        // Reset the message's topic to "SCHEDULE_TOPIC_XXXX" and its queue to delayLevel - 1
        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
    // ...
}

The relay topic SCHEDULE_TOPIC_XXXX is not visible in the console, but it does exist in the persistent store directory. As shown in the figure, the queue number is 4, which is the delay level minus 1 (5 - 1 = 4). This matches the source code analysis.
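To make the rewrite concrete, here is a simplified, self-contained model of what putMessage() does to a delayed message, plus the reverse step applied once the message expires. `SimpleMessage`, the `REAL_TOPIC`/`REAL_QID` keys, and `restoreFromRelay` are hypothetical stand-ins for RocketMQ's `MessageExtBrokerInner`, `MessageConst.PROPERTY_REAL_TOPIC`/`PROPERTY_REAL_QUEUE_ID`, and the broker's restore logic. Only the topic name, the clamping, and the level-minus-one mapping come from the excerpt above.

```java
import java.util.HashMap;
import java.util.Map;

class RelayRewriteSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    // Hypothetical property keys standing in for PROPERTY_REAL_TOPIC / PROPERTY_REAL_QUEUE_ID
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QID = "REAL_QID";
    // Maximum level with the default 18-entry level string
    static final int MAX_DELAY_LEVEL = 18;

    // Hypothetical simplified message
    static final class SimpleMessage {
        String topic;
        int queueId;
        int delayTimeLevel;
        final Map<String, String> properties = new HashMap<>();

        SimpleMessage(String topic, int queueId, int delayTimeLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayTimeLevel = delayTimeLevel;
        }
    }

    // Level N is stored in queue N - 1
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    // Write side: park a delayed message in the relay topic, remembering where it really belongs
    static void redirectToRelay(SimpleMessage msg) {
        if (msg.delayTimeLevel <= 0) {
            return; // ordinary message: stored as-is
        }
        if (msg.delayTimeLevel > MAX_DELAY_LEVEL) {
            msg.delayTimeLevel = MAX_DELAY_LEVEL; // clamp to the maximum level
        }
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevel2QueueId(msg.delayTimeLevel);
    }

    // Expiry side (hypothetical helper): restore the original topic and queue and clear the level
    static void restoreFromRelay(SimpleMessage msg) {
        msg.topic = msg.properties.remove(REAL_TOPIC);
        msg.queueId = Integer.parseInt(msg.properties.remove(REAL_QID));
        msg.delayTimeLevel = 0;
    }

    public static void main(String[] args) {
        SimpleMessage msg = new SimpleMessage("OrderTopic", 2, 5);
        redirectToRelay(msg);
        System.out.println(msg.topic + " / queue " + msg.queueId); // SCHEDULE_TOPIC_XXXX / queue 4
        restoreFromRelay(msg);
        System.out.println(msg.topic + " / queue " + msg.queueId); // OrderTopic / queue 2
    }
}
```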

Delay processing logic: ScheduleMessageService.java

1. Map each delay level to its delay duration: parseDelayLevel()

public boolean parseDelayLevel() {
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);
    // A space-separated string of delay times, one per level; the default is
    // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            // The last character is the time unit (s/m/h/d)
            String ch = value.substring(value.length() - 1);
            Long tu = timeUnitTable.get(ch);

            // Levels are 1-based: the first entry is level 1
            int level = i + 1;
            if (level > this.maxDelayLevel) {
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            long delayTimeMillis = tu * num;
            // Store the mapping from delay level to delay time in milliseconds
            this.delayLevelTable.put(level, delayTimeMillis);
        }
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }

    return true;
}
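The same parsing logic can be pulled out into a standalone method so it runs without a broker. This version takes the level string directly instead of reading it from `MessageStoreConfig`, and it returns the level-to-milliseconds table. The class and method names are hypothetical. It splits on any run of whitespace, a small hardening over `split(" ")`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class DelayLevelParser {
    // The default level string quoted in the source comment above
    static final String DEFAULT_LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Returns level (1-based) -> delay in milliseconds
    static Map<Integer, Long> parse(String levelString) {
        Map<String, Long> timeUnitTable = new HashMap<>();
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);

        Map<Integer, Long> delayLevelTable = new LinkedHashMap<>();
        String[] levelArray = levelString.trim().split("\\s+");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            // The last character is the unit; everything before it is the amount
            Long unit = timeUnitTable.get(value.substring(value.length() - 1));
            if (unit == null) {
                throw new IllegalArgumentException("unknown time unit in: " + value);
            }
            long amount = Long.parseLong(value.substring(0, value.length() - 1));
            delayLevelTable.put(i + 1, unit * amount);
        }
        return delayLevelTable;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse(DEFAULT_LEVELS);
        System.out.println(table.size()); // 18, i.e. the maximum delay level
        System.out.println(table.get(5)); // 60000, i.e. one minute
    }
}
```

This confirms the numbers used earlier: the default string defines 18 levels, and level 5 maps to 60,000 ms.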

2. Start a separate polling task for each delay level: start()

public void start() {
    // compareAndSet ensures the service starts only once, even if start() is called concurrently
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        // Iterate over the delay levels
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            // The recorded offset of this level's queue, or 0 if none has been recorded yet
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                // Create a delivery task for each delay level; the first run starts after FIRST_DELAY_TIME (1 s)
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
        // ... (the periodic offset-persistence task is omitted)
    }
}

Delivering delayed messages: DeliverDelayedMessageTimerTask (an inner class of ScheduleMessageService)

Once a delayed message in the relay station expires, it is converted back into an ordinary message and delivered to its target topic:

public void executeOnTimeup() {
    // ... Read the consume-queue entries of SCHEDULE_TOPIC_XXXX for this delay level and,
    //     for each entry (offsetPy, sizePy, tagsCode), run the following loop body:

    // The current time
    long now = System.currentTimeMillis();
    // The time at which the message is actually due for delivery
    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    // Time still left to wait
    long countdown = deliverTimestamp - now;
    if (countdown <= 0) {
        // Already due: no need to wait
        MessageExt msgExt =
            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                offsetPy, sizePy);

        if (msgExt != null) {
            try {
                // Convert the delayed message back into a normal message
                // (the reverse of what CommitLog.java did when it turned a normal message into a delayed one)
                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                // Deliver the message to its destination topic
                PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore
                                                        .putMessage(msgInner);
                if (putMessageResult != null
                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                    // Delivered: move on to the next entry
                    continue;
                } else {
                    // XXX: warn and notify me
                    // Delivery failed: retry from this offset after DELAY_FOR_A_PERIOD
                    log.error(
                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                        msgExt.getTopic(), msgExt.getMsgId());
                    ScheduleMessageService.this.timer.schedule(
                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                            nextOffset), DELAY_FOR_A_PERIOD);
                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                        nextOffset);
                    return;
                }
            } catch (Exception e) {
                // Conversion failed: the message is logged and dropped
                log.error(
                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                        + offsetPy + ",sizePy=" + sizePy, e);
            }
        }
    } else {
        // countdown > 0: the message has not expired yet, so run this task again
        // after exactly countdown milliseconds, starting from the same offset
        ScheduleMessageService.this.timer.schedule(
            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
            countdown);
        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
        return;
    }
    // ... (end of the loop body)

    // If no delayed message is found, the timer runs this task again after 0.1 s (DELAY_FOR_A_WHILE)
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
}
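The core of executeOnTimeup() can be modelled without any broker classes. The model walks one level's queue from the current offset, delivers every entry whose time is up, and stops at the first entry that is not yet due, returning how long to wait before the next pass. The `Entry` and `Outcome` types and the in-memory list are hypothetical. The excerpt above derives each entry's due time from its tagsCode, and it "delivers" by re-putting the restored message into the store. Failure handling is omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DeliverLoopSketch {
    // Re-poll interval once a level's queue has been drained (0.1 s, as in the excerpt)
    static final long DELAY_FOR_A_WHILE = 100L;

    // Hypothetical queue entry: message id plus the time it becomes due
    static final class Entry {
        final String msgId;
        final long deliverTimestamp;

        Entry(String msgId, long deliverTimestamp) {
            this.msgId = msgId;
            this.deliverTimestamp = deliverTimestamp;
        }
    }

    // Result of one pass: delivered ids, offset to resume from, and delay before the next pass
    static final class Outcome {
        final List<String> delivered;
        final int nextOffset;
        final long nextDelayMillis;

        Outcome(List<String> delivered, int nextOffset, long nextDelayMillis) {
            this.delivered = delivered;
            this.nextOffset = nextOffset;
            this.nextDelayMillis = nextDelayMillis;
        }
    }

    static Outcome executeOnTimeup(List<Entry> queue, int offset, long now) {
        List<String> delivered = new ArrayList<>();
        for (int i = offset; i < queue.size(); i++) {
            long countdown = queue.get(i).deliverTimestamp - now;
            if (countdown > 0) {
                // Not due yet. Later entries at this level are due even later,
                // so stop here and come back exactly when this one expires.
                return new Outcome(delivered, i, countdown);
            }
            // Due: stand-in for restoring the real topic and re-putting the message
            delivered.add(queue.get(i).msgId);
        }
        // Queue drained: check again shortly for newly appended entries
        return new Outcome(delivered, queue.size(), DELAY_FOR_A_WHILE);
    }

    public static void main(String[] args) {
        List<Entry> level5 = Arrays.asList(
                new Entry("m1", 60_000), new Entry("m2", 61_000), new Entry("m3", 75_000));
        Outcome o = executeOnTimeup(level5, 0, 62_000);
        System.out.println(o.delivered + " next=" + o.nextOffset + " wait=" + o.nextDelayMillis);
        // [m1, m2] next=2 wait=13000
    }
}
```

The early exit is only correct because, within a single level, entries are appended in the same order in which they expire.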

Summary

Conclusions

  • A relay layer, a topic named SCHEDULE_TOPIC_XXXX, is inserted between the producer and the destination topic
  • This topic has one queue per delay level (18 levels by default)
  • Each queue in this topic has its own task that checks whether the messages at the head of the queue are due and, if so, delivers them to their final destination topic

Features

  • 1. All delayed messages are partitioned by delay level, which improves file lookup performance

Benefit: splitting messages across more directories makes files easier to locate, so reads and writes can be positioned faster.

  • 2. Within each level's partition, messages form a queue ordered by expiry time, earliest first

Benefit: within one level every message has the same delay, so a newly arrived message always expires last and is appended to the tail of the queue. Only the head of the queue needs to be checked, because it always expires first, which keeps triggering timely. Reads happen at the head and new messages are appended at the tail, and this sequential read/write pattern improves performance.

Speculation

  • What if arbitrary delays were supported?

Whatever criterion were used to partition messages, reads and writes within a partition could no longer be guaranteed to be sequential (in this file model, without introducing other middleware). A message with a shorter delay that arrives later could expire before messages already in the queue.
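A small experiment illustrates this point. Messages are appended to one append-only queue in arrival order, and the check asks whether that order is also expiry order. With a single fixed delay (one RocketMQ level) it always is. With arbitrary per-message delays it generally is not, so a head-only scan would stall behind an unexpired head. The class and helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class ArbitraryDelaySketch {
    // Expiry times of messages appended in arrival order: arrival time + that message's delay
    static List<Long> expiryTimes(long[] arrivalTimes, long[] delays) {
        List<Long> result = new ArrayList<>();
        for (int i = 0; i < arrivalTimes.length; i++) {
            result.add(arrivalTimes[i] + delays[i]);
        }
        return result;
    }

    // True when the append-only queue is already sorted by expiry, i.e. the head always expires first
    static boolean headExpiresFirst(List<Long> expiries) {
        for (int i = 1; i < expiries.size(); i++) {
            if (expiries.get(i) < expiries.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 1_000, 2_000};
        // Same level: every message waits 60 s, so expiry order equals arrival order
        System.out.println(headExpiresFirst(expiryTimes(arrivals, new long[]{60_000, 60_000, 60_000}))); // true
        // Arbitrary delays: the later, shorter-delay messages expire before the first one
        System.out.println(headExpiresFirst(expiryTimes(arrivals, new long[]{60_000, 30_000, 10_000}))); // false
    }
}
```

Keeping arbitrary delays in expiry order would require a structure that reorders entries on insert. The JDK's DelayQueue, for example, is backed by a priority queue, which gives up pure sequential appends.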

To sum up: RocketMQ's delayed-message design weighs both the read/write performance of persisted messages with different delays and the timeliness of delivery. The delay-level scheme is a trade-off between performance and real-time triggering.

Discussion

I am no expert. I can only infer the benefits of this design from the code logic, and I don't know whether other designs would be feasible. If you have other ideas, feel free to discuss them in the comments.
