In this lecture, we will focus on delayed messages.

This time, let's start from a business scenario.

Business background

In e-commerce, some users pay for an order and then request a refund. Others place an order and never pay. Those unpaid orders still hold on to product inventory.

Our current e-commerce app creates a row in the order table whenever an order is placed. Some of these orders stay unpaid, yet they keep the inventory reserved. To release that inventory, our current approach is as follows:

  • Start a scheduled task that scans the order table every 30 minutes
  • If an order is paid, skip it
  • If an order is unpaid but not yet more than 30 minutes old, leave it alone
  • If an order has been unpaid for more than 30 minutes, cancel it

(Note: cancelling an order is essentially the reverse of placing one.)
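The scan described above can be sketched in a few lines of plain Java. This is only an illustration: `ScannedOrder`, `OrderScanTask` and the in-memory list are hypothetical stand-ins for the real order table and scheduled job, which the article does not show.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical order model for illustration; not taken from the real app.
class ScannedOrder {
    final long id;
    final boolean paid;
    final Instant createdAt;
    boolean cancelled;

    ScannedOrder(long id, boolean paid, Instant createdAt) {
        this.id = id;
        this.paid = paid;
        this.createdAt = createdAt;
    }
}

class OrderScanTask {
    static final Duration PAYMENT_TIMEOUT = Duration.ofMinutes(30);

    // One pass of the scheduled task: visits every order and cancels the
    // unpaid ones that are more than 30 minutes old. Returns the cancelled ids.
    static List<Long> scan(List<ScannedOrder> allOrders, Instant now) {
        List<Long> cancelledIds = new ArrayList<>();
        for (ScannedOrder order : allOrders) {
            if (order.paid || order.cancelled) {
                continue; // paid (or already cancelled): skip
            }
            Duration age = Duration.between(order.createdAt, now);
            if (age.compareTo(PAYMENT_TIMEOUT) <= 0) {
                continue; // unpaid, but not yet past 30 minutes: leave it
            }
            order.cancelled = true; // unpaid for more than 30 minutes: cancel
            cancelledIds.add(order.id);
        }
        return cancelledIds;
    }
}
```

Note that the loop touches every order on every pass, even though only a few of them end up cancelled.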

Disadvantages of this solution

What are the disadvantages of this scheme?

  • First, every run scans all orders, but only a small fraction of them are unpaid and more than 30 minutes old. Most of the work is wasted.
  • Second, if the order table is very large, each scan takes a long time and wastes CPU resources.
  • Third, these frequent queries put a lot of unnecessary pressure on the database.

The solution

Is there a good way around these disadvantages? A better solution should:

  • First, avoid scanning the full table
  • Second, cancel only the orders that were not paid, with no extra work
  • Third, cancel orders in near real time (near real time: within about 1 s)

Having said all that, I'll lay my cards on the table: this was all a lead-in to RocketMQ's delayed messages.

The idea: when an order is created, send a message with a 30-minute delay. Thirty minutes later the consumer receives the message and checks whether the order has been paid. If it has, skip it; if not, cancel the order.

With this scheme there is no redundant database scanning, and only the unpaid orders are cancelled. How nice! Time to use it in production.
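The consumer-side check can be sketched as follows. This is a minimal sketch under stated assumptions: `OrderTimeoutHandler` and its in-memory map are hypothetical stand-ins for the order service and the order table, and the delayed message is assumed to carry the order id.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

enum OrderStatus { UNPAID, PAID, CANCELLED }

// In-memory stand-in for the order table; hypothetical, for illustration only.
class OrderTimeoutHandler {
    private final Map<String, OrderStatus> orders = new ConcurrentHashMap<>();

    void createOrder(String orderId) {
        orders.put(orderId, OrderStatus.UNPAID);
        // In the real flow, the 30-minute delayed message carrying orderId
        // would be sent right here.
    }

    void pay(String orderId) {
        // Only an unpaid order can become paid.
        orders.replace(orderId, OrderStatus.UNPAID, OrderStatus.PAID);
    }

    // Called by the consumer when the delayed message arrives.
    // Returns true only if this call actually cancelled the order.
    boolean onTimeoutMessage(String orderId) {
        // Atomic "cancel only if still unpaid": paid orders are skipped, and a
        // redelivered message for an already cancelled order changes nothing.
        return orders.replace(orderId, OrderStatus.UNPAID, OrderStatus.CANCELLED);
    }

    OrderStatus statusOf(String orderId) {
        return orders.get(orderId);
    }
}
```

Making the cancel step conditional on the current status keeps the handler safe if the same message is delivered more than once.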

Producer

So much for the theory. Now for the hands-on part.

Here is a brief demo of the producer:

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class Producer {
        public static void main(String[] args) throws Exception {
            // Producer group
            DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");
            // Set the NameServer address
            producer.setNamesrvAddr("localhost:9876");
            // Start the producer
            producer.start();
            // Build the message
            Message message = new Message("delayTopic", "TagA",
                    "delayMessage".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // The key point: set the delay level.
            // Level 3 is 10 s in the default level table; 30 minutes would be level 16.
            message.setDelayTimeLevel(3);
            // Send the message
            SendResult sendResult = producer.send(message);
            System.out.println("sendResult: " + sendResult);
            // Shut down the producer
            producer.shutdown();
        }
    }

Again: it is not the delivery that is delayed, it is the consumption. The message is sent to the broker immediately, but consumers only see it once the delay has elapsed (30 minutes in the order scenario).

Supplementary knowledge

The delay level starts at 1, not 0. The default levels are: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h. As you can see, the longest delay is 2 hours. If you want a 3-hour delay, sorry, RocketMQ doesn't support it. Goodbye!!
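To make the level numbering concrete, here is a small sketch that turns a level string into a level-to-milliseconds table and looks up the level for a given delay. The class `DelayLevels` and its methods are hypothetical helpers written for this article, not RocketMQ APIs; the level string is the default listed above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DelayLevels {
    // The default delay levels listed above.
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Parses the level string into level -> delay in milliseconds.
    // Levels are numbered from 1, in the order they appear in the string.
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long number = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            table.put(i + 1, number * unitMillis(unit));
        }
        return table;
    }

    private static long unitMillis(char unit) {
        switch (unit) {
            case 's': return 1_000L;
            case 'm': return 60_000L;
            case 'h': return 3_600_000L;
            case 'd': return 86_400_000L;
            default: throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    // Returns the level whose delay equals delayMillis exactly, or -1 if none does.
    static int levelFor(String levels, long delayMillis) {
        for (Map.Entry<Integer, Long> entry : parse(levels).entrySet()) {
            if (entry.getValue() == delayMillis) {
                return entry.getKey();
            }
        }
        return -1;
    }
}
```

For example, `DelayLevels.levelFor(DelayLevels.DEFAULT_LEVELS, 30 * 60_000L)` gives 16, the level the order-cancellation scenario would need, while a 3-hour delay has no matching level and yields -1.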

Consumer

    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;

    public class Consumer {
        public static void main(String[] args) throws Exception {
            // Consumer group
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer_group");
            // Set the NameServer address
            consumer.setNamesrvAddr("localhost:9876");
            // Subscribe to the topic
            consumer.subscribe("delayTopic", "TagA");
            // Where to start consuming from
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Register the listener
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                        ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (int i = 0; i < list.size(); i++) {
                        MessageExt messageExt = list.get(i);
                        String msg = new String(messageExt.getBody());
                        // Print the observed delay: current time minus the time the message was produced
                        System.out.println(msg + " time=" + (System.currentTimeMillis() - messageExt.getBornTimestamp()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }
    }

Conclusion: a consumer of delayed messages is no different from an ordinary consumer. The one key point for delayed messages is that the producer sets a delay level.

Know the how, and know the why

By now you know how to use delayed messages.

But what if an interviewer asks you: how does RocketMQ implement delayed messages under the hood?

Then read on.

Let's walk through the diagram.

  • First, because it carries a delay level, a message sent by the producer is routed to a topic called SCHEDULE_TOPIC_XXXX instead of its real topic. That topic has 18 queues, one per delay level, with queueId = delayLevel - 1.

  • Second, a timer scans the delayed messages of every delay level, every 100 milliseconds. If a message's delivery time has arrived (it is no later than the current time), the timer writes the message to its real topic (delayTopic). The message goes to a specific queue according to the load-balancing policy.

  • Third, once the message is in the real topic, the consumer consumes it and does its follow-up processing.

That is the overall flow.
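The flow above can be imitated with a toy in-memory model. This is only a sketch to make the idea tangible: `ToyScheduleStore` and everything inside it are hypothetical names, the queues are plain `ArrayDeque`s rather than RocketMQ's on-disk structures, and time is passed in explicitly instead of being read from a clock.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the flow; names are hypothetical, not RocketMQ's.
class ToyScheduleStore {
    static final class Delayed {
        final String realTopic;
        final String body;
        final long deliverAt; // absolute time (ms) at which the message becomes due

        Delayed(String realTopic, String body, long deliverAt) {
            this.realTopic = realTopic;
            this.body = body;
            this.deliverAt = deliverAt;
        }
    }

    // One FIFO queue per delay level; queueId = delayLevel - 1.
    private final List<Deque<Delayed>> queues = new ArrayList<>();
    private final long[] delayMillisByQueue;

    ToyScheduleStore(long[] delayMillisByQueue) {
        this.delayMillisByQueue = delayMillisByQueue.clone();
        for (int i = 0; i < delayMillisByQueue.length; i++) {
            queues.add(new ArrayDeque<>());
        }
    }

    static int queueIdOf(int delayLevel) {
        return delayLevel - 1;
    }

    // Step 1: park the message in the queue that belongs to its delay level.
    void put(String realTopic, String body, int delayLevel, long now) {
        int queueId = queueIdOf(delayLevel);
        long deliverAt = now + delayMillisByQueue[queueId];
        queues.get(queueId).addLast(new Delayed(realTopic, body, deliverAt));
    }

    // Step 2: one timer tick. All messages in a queue share the same delay, so
    // (assuming put() is called with non-decreasing times) each queue is ordered
    // by deliverAt and the scan can stop at the first message that is not due.
    List<Delayed> scan(long now) {
        List<Delayed> due = new ArrayList<>();
        for (Deque<Delayed> queue : queues) {
            while (!queue.isEmpty() && queue.peekFirst().deliverAt <= now) {
                due.add(queue.pollFirst());
            }
        }
        return due;
    }
}
```

Having one queue per level is what makes the scan cheap: because every message in a queue waits the same amount of time, the timer never has to sort anything or look past the head of the queue.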

Next, let's look at the code in more detail to deepen our understanding.

Step 1: The producer's message is stored under the SCHEDULE_TOPIC_XXXX topic

org.apache.rocketmq.store.CommitLog#putMessage
    // Real topic
    String topic = msg.getTopic();
    // Real queue id
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay level greater than zero: this is a delayed message
        if (msg.getDelayTimeLevel() > 0) {
            // If the delay level exceeds the maximum, clamp it to the maximum
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            // Delay topic: SCHEDULE_TOPIC_XXXX
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            // Derive the queue id from the delay level
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            // Replace the topic and queue id with the delay topic and its queue
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    // ... (code that wraps the message is omitted)

    // Append the message to the mappedFile; think of it as a file on disk
    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
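The important trick in this step is the backup: the real topic and queue id are saved in the message properties before they are overwritten, so they can be restored when the delay expires. A stripped-down sketch of that idea follows; `ToyMessage`, its fields and the property keys are hypothetical and only mimic the shape of the excerpt above.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified message for illustration; not RocketMQ's message class.
class ToyMessage {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    // Illustrative property keys (assumed names).
    static final String KEY_REAL_TOPIC = "REAL_TOPIC";
    static final String KEY_REAL_QUEUE_ID = "REAL_QUEUE_ID";

    String topic;
    int queueId;
    int delayLevel;
    final Map<String, String> properties = new HashMap<>();

    ToyMessage(String topic, int queueId, int delayLevel) {
        this.topic = topic;
        this.queueId = queueId;
        this.delayLevel = delayLevel;
    }

    // On store: park the message in the schedule topic and remember where it belongs.
    void redirectToScheduleTopic(int maxDelayLevel) {
        if (delayLevel <= 0) {
            return; // not a delayed message, nothing to do
        }
        if (delayLevel > maxDelayLevel) {
            delayLevel = maxDelayLevel; // clamp to the highest supported level
        }
        properties.put(KEY_REAL_TOPIC, topic);
        properties.put(KEY_REAL_QUEUE_ID, String.valueOf(queueId));
        topic = SCHEDULE_TOPIC;
        queueId = delayLevel - 1;
    }

    // When the delay expires: restore the original destination and clear the
    // delay level so the message is not parked a second time.
    void restoreRealTopic() {
        topic = properties.remove(KEY_REAL_TOPIC);
        queueId = Integer.parseInt(properties.remove(KEY_REAL_QUEUE_ID));
        delayLevel = 0;
    }
}
```

Without the backup there would be no way to know, 30 minutes later, which topic the message was originally meant for.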

Step 2: The timer scans the delayed messages

  • 1. org.apache.rocketmq.store.schedule.ScheduleMessageService#start

    public void start() {
        // The AtomicBoolean ensures that start() only takes effect once
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                // The key is the delay level
                Integer level = entry.getKey();
                // The value is the delay time of that level
                Long timeDelay = entry.getValue();
                // Look up the consume offset for this delay level
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }

                // Create one scheduled task per delay level; it first runs 1 s later
                if (timeDelay != null) {
                    // The core logic lives in DeliverDelayedMessageTimerTask -> executeOnTimeup()
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }

            // Starts after 10 s, then runs every flushDelayOffsetInterval (10 s)
            this.timer.scheduleAtFixedRate(new TimerTask() {

                @Override
                public void run() {
                    try {
                        // Persist the consume offset of each queue
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    }
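The scheduling pattern used here is a one-shot `java.util.Timer` task that schedules a fresh copy of itself when it finishes, rather than a fixed-rate task. The sketch below shows that pattern in isolation. `RescheduleDemo` is a hypothetical class, and the delays are shortened stand-ins for the 1 s first delay and the 100 ms rescan delay.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RescheduleDemo {
    static final long FIRST_DELAY_MS = 50; // stands in for the 1 s first delay
    static final long RETRY_DELAY_MS = 10; // stands in for the 100 ms rescan delay

    // Runs a self-rescheduling task `rounds` times and returns how often it ran
    // (or -1 if it did not finish within 5 seconds).
    static int runRounds(int rounds) {
        final Timer timer = new Timer("demo-timer", true); // daemon thread
        final AtomicInteger runs = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(rounds);

        class ScanTask extends TimerTask {
            @Override
            public void run() {
                runs.incrementAndGet();
                done.countDown();
                if (done.getCount() > 0) {
                    // A TimerTask instance can be scheduled only once, so the
                    // next round needs a new instance.
                    timer.schedule(new ScanTask(), RETRY_DELAY_MS);
                }
            }
        }

        timer.schedule(new ScanTask(), FIRST_DELAY_MS);
        try {
            boolean finished = done.await(5, TimeUnit.SECONDS);
            return finished ? runs.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            timer.cancel();
        }
    }
}
```

One consequence of this design is that the next delay can be chosen each time a task finishes, which a fixed-rate schedule would not allow.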

  • 2. org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

    public void executeOnTimeup() {
        // Find the consume queue for this delay level under
        // RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"
        ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
            TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));

        // Consume offset
        long failScheduleOffset = offset;

        if (cq != null) {
            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
            if (bufferCQ != null) {
                try {
                    long nextOffset = offset;
                    int i = 0;
                    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                    // Iterate over all entries
                    for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        // Physical offset of the message
                        long offsetPy = bufferCQ.getByteBuffer().getLong();
                        // Length of the message
                        int sizePy = bufferCQ.getByteBuffer().getInt();
                        long tagsCode = bufferCQ.getByteBuffer().getLong();

                        // Current time
                        long now = System.currentTimeMillis();
                        // Time at which the message should be delivered
                        long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                        // Next offset
                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        // Time left until delivery
                        long countdown = deliverTimestamp - now;

                        if (countdown <= 0) {
                            // Fetch the message by physical offset and length
                            MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore
                                .lookMessageByOffset(offsetPy, sizePy);

                            if (msgExt != null) {
                                try {
                                    // Rebuild the real message
                                    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                    // Write the message back to its real topic and queue
                                    PutMessageResult putMessageResult =
                                        ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
                                    // ... (the rest of the loop and its closing braces are omitted)

        // Schedule the next scan of this level 100 ms later
        ScheduleMessageService.this.timer.schedule(
            new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
    }
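The loop in the excerpt reads fixed-size entries of 20 bytes each: an 8-byte physical offset, a 4-byte size, and an 8-byte tags code, which for delayed messages is used as the delivery timestamp. The sketch below replays that read pattern on a plain `ByteBuffer`. `ConsumeQueueScanSketch` is a hypothetical class, and stopping at the first entry that is not yet due is an assumption of this sketch, not something the excerpt shows.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class ConsumeQueueScanSketch {
    // One entry: 8-byte physical offset + 4-byte size + 8-byte tags code.
    static final int UNIT_SIZE = 20;

    // Builds a buffer from rows of {physicalOffset, size, deliverTimestamp}.
    static ByteBuffer entries(long[][] rows) {
        ByteBuffer buffer = ByteBuffer.allocate(rows.length * UNIT_SIZE);
        for (long[] row : rows) {
            buffer.putLong(row[0]).putInt((int) row[1]).putLong(row[2]);
        }
        buffer.flip(); // switch from writing to reading
        return buffer;
    }

    // Returns the physical offsets of the entries that are due at `now`.
    // Assumption of this sketch: entries are in delivery order, so the scan
    // stops at the first entry whose countdown is still positive.
    static List<Long> dueOffsets(ByteBuffer buffer, long now) {
        List<Long> due = new ArrayList<>();
        while (buffer.remaining() >= UNIT_SIZE) {
            long offsetPy = buffer.getLong();
            int sizePy = buffer.getInt(); // length of the message; unused here
            long deliverTimestamp = buffer.getLong();

            long countdown = deliverTimestamp - now;
            if (countdown > 0) {
                break; // not due yet
            }
            due.add(offsetPy);
        }
        return due;
    }
}
```

Because each entry has a fixed size, the position of the n-th entry is simply n * 20 bytes, which is why the excerpt can turn a byte index into an offset with `i / CQ_STORE_UNIT_SIZE`.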

Step 3: Consumer follow-up (omitted)

I’ll conclude with a picture

All right, that’s it. See you next time. Bye.

If you have any questions, please leave a comment.

Question of the day

RocketMQ does not support custom delay times. Does Kafka support delayed messages? If so, does it support custom delay times? If you had to implement custom delay times yourself, how would you do it? Tell me what you think.

Feel free to leave a comment.

Articles in this series

  • RocketMQ - Getting started (updated)
  • RocketMQ - Architecture and roles (updated)
  • RocketMQ - Message sending (updated)
  • RocketMQ - Message consumption
  • RocketMQ - Broadcast mode and cluster mode for consumers (updated)
  • RocketMQ - Sequential messages (updated)
  • RocketMQ - Delayed messages (updated)
  • RocketMQ - Batch messages
  • RocketMQ - Message filtering
  • RocketMQ - Transaction messages
  • RocketMQ - Message store
  • RocketMQ - High availability
  • RocketMQ - High performance
  • RocketMQ - Primary/secondary replication
  • RocketMQ - Flush-to-disk mechanism
  • RocketMQ - Idempotence
  • RocketMQ - Message retry
  • RocketMQ - Dead letter queue


Feel free to follow along; the upcoming articles will be just as hands-on.