1. Background

In past e-commerce work, one requirement came up again and again: after a user places an order, the order is cancelled if it is not paid within half an hour. The same thing happens today when we shop on Taobao or JD.com or order food through Meituan: if we do not pay within the specified time, the order is cancelled. One stable way to implement this timeout cancellation is with delayed messages on a message queue.

RocketMQ provides delayed messages of this kind. The producer sends a message with a delay level, and the available levels range from seconds through minutes to hours. Once sent, the message is stored on the broker, and consumers cannot consume it until the configured delay has elapsed.

If we send a message with a 30-minute delay when the order is placed, the downstream system consumes it 30 minutes later and checks whether the order has been paid. If it has not, the order is cancelled. That is the whole timeout cancellation flow, built on a message queue.
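The check performed when the delayed message arrives can be sketched as follows. This is a minimal illustration, not RocketMQ code: the class OrderTimeoutHandler, its in-memory order map, and the method names are all hypothetical, and a real system would read and update the order in a database.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory order store, used only to illustrate the timeout check.
class OrderTimeoutHandler {
    enum Status { UNPAID, PAID, CANCELLED }

    private final Map<String, Status> orders = new ConcurrentHashMap<>();

    void placeOrder(String orderId) {
        orders.put(orderId, Status.UNPAID);
    }

    void pay(String orderId) {
        // Only an order that is still UNPAID can become PAID
        orders.replace(orderId, Status.UNPAID, Status.PAID);
    }

    // Called when the delayed message for this order is finally consumed.
    // Returns true if this call cancelled the order.
    boolean onTimeoutMessage(String orderId) {
        // Atomic compare-and-set: a paid or already cancelled order is left alone,
        // so a redelivered message does no harm.
        return orders.replace(orderId, Status.UNPAID, Status.CANCELLED);
    }

    Status statusOf(String orderId) {
        return orders.get(orderId);
    }
}
```

Making the cancel step conditional on the current status keeps the handler idempotent, which matters because a message queue may deliver the same message more than once.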

2. How it works

Setting the delay

Let's look at how to set the delay on a message. The Message object sets its delay level through setDelayTimeLevel:

public void produce() throws Exception {
    // `producer` is assumed to be a DefaultMQProducer field that has already been started
    Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    // Delay levels start at 1; 0 means no delay
    msg.setDelayTimeLevel(1);
    SendResult sendResult = producer.send(msg);
}

public void consume() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

setDelayTimeLevel stores the level in the message's properties, a HashMap that holds the message's metadata.

public void setDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}

void putProperty(final String name, final String value) {
    if (null == this.properties) {
        this.properties = new HashMap<String, String>();
    }

    this.properties.put(name, value);
}

When the broker receives a message, it handles it according to the delay level set on the message. By default there are 18 delay levels (1 to 18), ranging from 1s to 2h:

public class MessageStoreConfig {
    
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

}
   
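To make the level table concrete, here is a small sketch that turns the messageDelayLevel string into a map from level to delay in milliseconds. DelayLevelParser is a hypothetical helper written for this article; it imitates the idea rather than reproducing the broker's own parsing code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: parses "1s 5s ... 2h" into level -> delay in milliseconds.
class DelayLevelParser {
    static Map<Integer, Long> parse(String levelString) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levelString.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            // The last character is the unit, everything before it is the amount
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("Unknown unit in: " + part);
            }
            // Levels are 1-based: the first entry is level 1
            table.put(i + 1, amount * unitMillis);
        }
        return table;
    }
}
```

With the default string shown above, 30m is the 16th entry, so the 30-minute order timeout from the background section would correspond to setDelayTimeLevel(16).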

So how does the system keep consumers from seeing a message until its delay has elapsed?

RocketMQ's design here is quite clever. Let's take a look.

Message storage

Unlike normal messages, delayed messages received by the broker are not written to the topic specified by the sender. They go to a special delay topic instead. The delay topic has 18 queues (queueId 0-17), and queueId and delayLevel are related by queueId + 1 = delayLevel. So once the delivery time of the delayed message (deliverTimestamp) has been calculated, the message is stored in the queue for its delay level.

// If the message is delayed
if (msg.getDelayTimeLevel() > 0) {
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }
    
    // Reset the topic and queueId of the delayed message to RMQ_SYS_SCHEDULE_TOPIC
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    // Store the real topic and queueId in the properties as a backup
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));        
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    msg.setTopic(topic);
    msg.setQueueId(queueId);
}
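The redirect step can be tried out in isolation with a simplified model. In the sketch below, the Msg class, the constant SCHEDULE_TOPIC, and the property keys "REAL_TOPIC" and "REAL_QID" are illustrative stand-ins chosen for this example, not the broker's own types.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the redirect step; all names here are illustrative.
class DelayedStoreSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    static class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    // queueId + 1 = delayLevel
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    static void redirectIfDelayed(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // a normal message keeps its topic and queue
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel; // clamp to the highest configured level
        }
        // Back up the real destination so it can be restored at delivery time
        msg.properties.put("REAL_TOPIC", msg.topic);
        msg.properties.put("REAL_QID", String.valueOf(msg.queueId));
        // Point the message at the delay topic and the queue for its level
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevel2QueueId(msg.delayLevel);
    }
}
```

Backing up the real topic and queueId before overwriting them is what lets the broker put the message back where the sender wanted it once the delay has elapsed.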

ScheduleMessageService takes over from there. It is started when the broker starts and processes the messages in the delay queues. Its logic is shown below.


public class ScheduleMessageService extends ConfigManager {

    // key: delayLevel | value: delayTimeMillis
    private final ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);

    public void start() {
        // Create a Timer to run the scheduled tasks
        this.timer = new Timer("ScheduleMessageTimerThread", true);

        // Create one DeliverDelayedMessageTimerTask per delayLevel queue
        // to process the messages in that queue
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
    }
}

When ScheduleMessageService starts, it creates one DeliverDelayedMessageTimerTask per delay queue, and each task then runs in a loop. The task class extends TimerTask, a JDK utility class for running scheduled tasks. For how it works, see the article "How to implement a scheduled task: Java Timer/TimerTask source code analysis".
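The "loop" here is not a periodic schedule: each task runs once and then schedules a fresh task for the next pass. A minimal sketch of that self-rescheduling pattern with the JDK's Timer follows. The class name, the 10 ms value of DELAY_FOR_A_WHILE, and the round counter are assumptions made for this demo only.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Demo of a TimerTask that schedules its own successor after each pass.
class SelfReschedulingTimerSketch {
    static final long DELAY_FOR_A_WHILE = 10L; // milliseconds, demo value only

    // Runs the given number of passes and returns how many were executed.
    static int runRounds(int rounds) {
        Timer timer = new Timer("SketchTimerThread", true);
        AtomicInteger executed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(rounds);

        class PollTask extends TimerTask {
            @Override
            public void run() {
                executed.incrementAndGet(); // stands in for one scan of a delay queue
                done.countDown();
                if (done.getCount() > 0) {
                    // A TimerTask instance cannot be scheduled twice, so create a new one
                    timer.schedule(new PollTask(), DELAY_FOR_A_WHILE);
                }
            }
        }

        timer.schedule(new PollTask(), 0L);
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timer did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            timer.cancel();
        }
        return executed.get();
    }
}
```

Scheduling a new one-shot task each time lets every pass choose its own delay for the next pass, for example a longer wait after an error.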

Message forwarding

DeliverDelayedMessageTimerTask implements the run method, and its main logic lives in executeOnTimeup: messages whose time has come are taken from the corresponding delay queue and forwarded to the queue of their original topic. As long as that queue has no backlog, the message is consumed right away. (This part of the implementation is fairly complex; if you are interested, read the source code.)

class DeliverDelayedMessageTimerTask extends TimerTask {
    private final int delayLevel;
    private final long offset;

    public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
        this.delayLevel = delayLevel;
        this.offset = offset;
    }

    @Override
    public void run() {
        try {
            if (isStarted()) {
                this.executeOnTimeup();
            }
        } catch (Exception e) {
            // XXX: warn and notify me
            log.error("ScheduleMessageService, executeOnTimeup exception", e);
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
        }
    }

    public void executeOnTimeup() {
        ConsumeQueue cq =
            ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                delayLevel2QueueId(delayLevel));

        long failScheduleOffset = offset;

        if (cq != null) {
            // Find the messages in the queue whose deliverTimestamp - now <= 0.
            // Move them from the delay queue to the queue of their original topic,
            // where consumers consume them immediately.
        }

        // Schedule the next pass over this queue
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
    }
}

The overall flow is as follows:

When the broker receives a delayed message, it stores the message in a queue of the delay topic. ScheduleMessageService then runs the scheduled task for each queue, checking which messages in the queue are due. Those messages are forwarded to their original topic, where the respective consumers consume them.
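One pass over a single delay queue can be modeled as below. The sketch assumes messages are stored in arrival order; since every message in a queue has the same delay, their delivery times are then non-decreasing and the scan can stop at the first message that is not yet due. DelayQueueScanSketch and its Entry class are hypothetical, and the current time is passed in as a parameter so the behaviour is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Models one delay-level queue and one pass of its scheduled task.
class DelayQueueScanSketch {
    static class Entry {
        final String realTopic;
        final String body;
        final long deliverTimestamp;

        Entry(String realTopic, String body, long deliverTimestamp) {
            this.realTopic = realTopic;
            this.body = body;
            this.deliverTimestamp = deliverTimestamp;
        }
    }

    private final long delayMillis;
    private final Deque<Entry> queue = new ArrayDeque<>();

    DelayQueueScanSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Store a message; its delivery time is store time plus this queue's delay
    void put(String realTopic, String body, long storeTimestamp) {
        queue.addLast(new Entry(realTopic, body, storeTimestamp + delayMillis));
    }

    // One pass: hand back every message whose deliverTimestamp - now <= 0
    List<Entry> executeOnTimeup(long now) {
        List<Entry> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.peekFirst().deliverTimestamp - now <= 0) {
            due.add(queue.pollFirst());
        }
        return due;
    }

    int pending() {
        return queue.size();
    }
}
```

This ordering property is one reason fixed delay levels are convenient: no sorting is needed inside a queue, which would not hold if each message could carry an arbitrary delay.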

3. Extension: consumption retry

When using RocketMQ, you typically rely on consumption retry. Consumer-side retry is implemented in much the same way as delayed messages.

public void consume() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            // If RECONSUME_LATER is returned instead, consumption will be retried
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

RocketMQ treats the following three cases as a consumption failure and triggers a retry:

  • The business consumer returns ConsumeConcurrentlyStatus.RECONSUME_LATER
  • The business consumer returns null
  • The business consumer throws an exception, whether deliberately or not
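The three cases above can be folded into a single decision, as in this rough sketch. The Status enum and the invoke method are hypothetical stand-ins written for illustration and do not reproduce the client's actual code.

```java
import java.util.function.Supplier;

// Illustrates how null and exceptions can be treated the same as RECONSUME_LATER.
class ConsumeResultSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    static Status invoke(Supplier<Status> listener) {
        Status status = null;
        try {
            status = listener.get();
        } catch (RuntimeException e) {
            // An exception from business code leaves status as null
        }
        // Both null and an exception end up as a retry request
        return status == null ? Status.RECONSUME_LATER : status;
    }
}
```

For example, invoke(() -> null) yields RECONSUME_LATER, the same result as a listener that returns RECONSUME_LATER explicitly.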

Business code can use this retry mechanism to retry downstream logic. RocketMQ does not retry at a fixed interval. It uses a backoff strategy in which the interval between retries grows longer and longer, following the same steps as the delays configured for delayLevel.

The consumer client handles the consumption result of each message in the processConsumeResult method. If a retry is needed, it sends the message back to the broker through the sendMessageBack method, and the retry message carries the number of retries so far.

When the broker receives the message, SendMessageProcessor handles the retry message and sets its topic to RETRY_TOPIC. The logic is as follows:

public class SendMessageProcessor
    extends AbstractSendMessageProcessor
    implements NettyRequestProcessor {

    private RemotingCommand asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {

        // Set a new topic for the retry message
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

        // Determine the delayLevel based on the number of retries that have occurred
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);

        // Number of retries +1
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        // Store messages
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

        // ...
    }
}

Internally, CommitLog.putMessage checks whether delayLevel is set. If it is, the topic is reset to SCHEDULE_TOPIC and the message is stored in a delay queue. From there the logic is the same as in ScheduleMessageService above.
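Combining the rule delayLevel = 3 + reconsumeTimes with the default level table gives the retry schedule. The sketch below is a hypothetical helper, not broker code, and it assumes the default messageDelayLevel string and clamping at the highest level, as in the storage code shown earlier.

```java
// Maps a retry count to the delay it would get under the default level table.
class RetryBackoffSketch {
    static final String[] LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    // reconsumeTimes is the number of retries that have already happened
    static int delayLevelForRetry(int reconsumeTimes) {
        int level = 3 + reconsumeTimes;
        // Levels above the maximum are clamped to the last level
        return Math.min(level, LEVELS.length);
    }

    static String delayForRetry(int reconsumeTimes) {
        // Levels are 1-based, array indexes are 0-based
        return LEVELS[delayLevelForRetry(reconsumeTimes) - 1];
    }
}
```

Under these assumptions the first retry waits 10s, the second 30s, the third 1m, and so on up to 2h, so retries skip the two shortest levels (1s and 5s).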

The overall message retry flow is as follows:

  1. The consumer subscribes to the specified topic, NORMAL_TOPIC, and to the retry topic for its ConsumerGroup, RETRY_GROUP1_TOPIC, and consumes messages from both topics.
  2. When consumption fails, the client calls the sendMessageBack method to send the failed message back to the broker.
  3. On the broker, SendMessageProcessor determines the delay level from the current retry count and stores the message in the delay queue, SCHEDULE_TOPIC.
  4. ScheduleMessageService re-delivers messages that are due to the retry topic, RETRY_GROUP1_TOPIC, where the consumer consumes them again, completing the retry flow.

Compared with the delayed message flow described earlier, retry messages are in fact handled as delayed messages, except that they end up in a dedicated retry queue.

4. Summary

Delayed messages are an important feature in everyday business use. RocketMQ implements them with tiered delay levels + multiple queues + scheduled tasks, which is a clever design. Consumption retry uses a backoff strategy whose growing retry intervals line up with the delay levels of delayed messages, so retries can be built directly on the delay queues. The strategy makes sense (once a message has failed repeatedly, another attempt a short time later is unlikely to succeed), and it reuses the underlying code. Both points are worth learning from.

Reference: How to implement a scheduled task: Java Timer/TimerTask source code analysis

Finally, comments are welcome, as are suggestions and corrections of any kind 👏

Writing original content takes effort. Feel free to share this article; your support is what keeps me going!