Advanced RocketMQ message types

If life is compared to the artistic conception of creation, then reading is like sunshine — Chi Li

This is the 14th day of my participation in the First Challenge 2022.

Message types

  1. Ordinary message
  2. Ordered message
  3. Delayed message
  4. Transaction message
  5. Batch message

What is an ordinary message

An ordinary message is a message with no special requirements, such as a log message. The producer sends such messages round-robin to the different partitioned queues, and the consumer pulls messages from multiple queues.

What are sequential messages

Sequential (ordered) messages are consumed strictly in the order in which they were sent (FIFO). By default, producers send messages round-robin to different partitioned queues, and consumers pull messages from multiple queues. As a result, the consumption order is not guaranteed to match the sending order.

How to keep messages in order

Message order is strictly guaranteed when messages are sent to only one Queue and consumed by pulling from only that Queue.

Why do I need sequential messages

For example, suppose you have a Topic named ORDER_STATUS with four queues. The messages in this Topic describe the different states of an order.

Assume an order can have the following statuses: unpaid, paid, shipping, shipped successfully, and shipping failed.



Based on these statuses, the producer might generate the following messages in sequence:

Order T0000001: Unpaid –> Order T0000001: Paid –> Order T0000001: Shipping –> Order T0000001: shipping failed



If the Queue is chosen with a round-robin (polling) strategy after the messages are sent to MQ, they might be stored in MQ as follows:





We want the Consumer to consume these messages in the order they were sent. However, the way MQ delivers and consumes them here cannot guarantee that order. Even if the Consumer tolerates some out-of-order state transitions, it cannot handle every random combination of out-of-order messages.



To address this, a scheme can be designed as follows. A routing policy places all messages with the same order number in the same Queue. The consumer then follows a matching policy to preserve consumption order, for example by having one thread process a single Queue exclusively so that its messages are handled in order.
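The routing idea can be illustrated with a small plain-Java simulation. It is not the RocketMQ client API, and `OrderRoutingSim`, `StatusMsg`, `route`, and `consume` are hypothetical names. Each message goes to a queue chosen by hashing its order number. The queues are then drained in an interleaved way, as independent per-queue consumers might do, while always taking each queue's head first. Every order's status sequence therefore comes out in send order, even when two orders share a queue.

```java
import java.util.*;

// Plain-Java simulation, not the RocketMQ client API: messages are routed to a
// queue by order number, and each queue is drained strictly FIFO.
class OrderRoutingSim {
    static final class StatusMsg {
        final String orderId;
        final String status;
        StatusMsg(String orderId, String status) { this.orderId = orderId; this.status = status; }
    }

    // Same order number -> same queue, so one order's messages are never split.
    static List<Deque<StatusMsg>> route(List<StatusMsg> sent, int queueCount) {
        List<Deque<StatusMsg>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) queues.add(new ArrayDeque<>());
        for (StatusMsg m : sent) {
            queues.get(Math.floorMod(m.orderId.hashCode(), queueCount)).addLast(m);
        }
        return queues;
    }

    // Interleave the queues (one message from each per pass), as independent
    // per-queue consumer threads might, but always take each queue's head first.
    static Map<String, List<String>> consume(List<Deque<StatusMsg>> queues) {
        Map<String, List<String>> seen = new HashMap<>();
        boolean progress = true;
        while (progress) {
            progress = false;
            for (Deque<StatusMsg> q : queues) {
                StatusMsg m = q.pollFirst();
                if (m != null) {
                    seen.computeIfAbsent(m.orderId, k -> new ArrayList<>()).add(m.status);
                    progress = true;
                }
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        List<StatusMsg> sent = List.of(
            new StatusMsg("T0000001", "Unpaid"),
            new StatusMsg("T0000002", "Unpaid"),
            new StatusMsg("T0000001", "Paid"),
            new StatusMsg("T0000001", "Shipping"),
            new StatusMsg("T0000002", "Paid"),
            new StatusMsg("T0000001", "Shipping failed"));
        Map<String, List<String>> consumed = consume(route(sent, 4));
        System.out.println(consumed.get("T0000001")); // [Unpaid, Paid, Shipping, Shipping failed]
        System.out.println(consumed.get("T0000002")); // [Unpaid, Paid]
    }
}
```

The per-order guarantee rests entirely on two things: every message of an order lands in the same queue, and each queue is consumed FIFO. Ordering across different orders is not preserved.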

Types of ordering

Global ordering

When the Topic has only one Queue for both sending and consuming, the order of all messages in the entire Topic is guaranteed. This is called global ordering.



To get global ordering, set the number of queues to 1 when creating the Topic. There are three ways to specify the number of queues:

  1. When creating a Producer in code, specify the number of queues for the Topics that the Producer creates automatically
  2. Specify the number of queues when manually creating a Topic in the RocketMQ visual console
  3. Specify the number of queues when manually creating a Topic using the mqadmin command

Partitioned ordering

If more than one Queue is involved, only the order of messages within each partitioned Queue can be guaranteed. This is called partitioned ordering.

How to implement Queue selection?

When defining the Producer, we can specify a message queue selector that we implement ourselves through the MessageQueueSelector interface.

The selector's selection algorithm usually relies on a selection key. The selection key can be the message key or other data. Whatever is used, it should uniquely identify the business entity whose messages must stay in order, for example an order ID.



General selection algorithm

Take the selection key (or its hash value) modulo the number of queues in the Topic. The result is the QueueId of the selected Queue.
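Here is a minimal plain-Java sketch of this modulo rule. It is not the RocketMQ `MessageQueueSelector` interface, and `ModuloQueueSelection` and `selectQueueId` are hypothetical names. The sketch uses `Math.floorMod` rather than `%` so that a negative hash code still yields a QueueId in range. In Java, `-7 % 4` is `-3`.

```java
import java.util.*;

// Plain-Java sketch of the general selection algorithm: hash of the selection
// key modulo the queue count gives the QueueId. Hypothetical names, not RocketMQ API.
class ModuloQueueSelection {
    static int selectQueueId(Object selectionKey, int queueCount) {
        if (queueCount <= 0) throw new IllegalArgumentException("queueCount must be positive");
        // floorMod keeps the result in [0, queueCount) even for negative hash codes.
        return Math.floorMod(selectionKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queues = 4;
        for (String orderId : List.of("T0000001", "T0000002", "T0000005")) {
            System.out.println(orderId + " -> queue " + selectQueueId(orderId, queues));
        }
        // Integer keys hash to themselves, so 1 and 5 land in the same queue (1 % 4 == 5 % 4).
        System.out.println(selectQueueId(1, queues) == selectQueueId(5, queues)); // true
    }
}
```

The last line of `main` shows that two different keys can map to the same QueueId.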

There is a problem with the modulo algorithm

Different keys can produce the same modulo result. Messages with different keys may therefore land in the same Queue, and the same Consumer may end up consuming messages with different keys.

How can this problem be solved?

The usual approach is to read the selection key from each message and check it. If the current Consumer is responsible for that key, it consumes the message directly; otherwise, it does nothing. This requires the selection key to reach the Consumer along with the message, so using the message key as the selection key is a good choice.
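The filtering step can be sketched in plain Java. Everything here is hypothetical (`KeyFilteringConsumer`, `PulledMsg`, `onMessage`) and is not a RocketMQ API. The sketch assumes the selection key travels with the message as its key, and that each consumer is configured with the set of keys it owns.

```java
import java.util.*;

// Plain-Java sketch with hypothetical names (not a RocketMQ API): the consumer
// reads the selection key carried by each pulled message and only handles the
// keys assigned to it; other messages are skipped.
class KeyFilteringConsumer {
    static final class PulledMsg {
        final String key;   // selection key, assumed here to be the message key
        final String body;
        PulledMsg(String key, String body) { this.key = key; this.body = body; }
    }

    private final Set<String> ownedKeys;
    private final List<String> handled = new ArrayList<>();

    KeyFilteringConsumer(Set<String> ownedKeys) { this.ownedKeys = new HashSet<>(ownedKeys); }

    // Returns true if the message was consumed, false if it was ignored.
    boolean onMessage(PulledMsg msg) {
        if (!ownedKeys.contains(msg.key)) {
            return false; // belongs to another consumer: do nothing
        }
        handled.add(msg.body);
        return true;
    }

    List<String> handled() { return handled; }

    public static void main(String[] args) {
        // Orders 1 and 5 share a queue when there are 4 queues (1 % 4 == 5 % 4).
        List<PulledMsg> sameQueue = List.of(
            new PulledMsg("1", "order 1: Unpaid"),
            new PulledMsg("5", "order 5: Unpaid"),
            new PulledMsg("1", "order 1: Paid"));
        KeyFilteringConsumer c = new KeyFilteringConsumer(Set.of("1"));
        for (PulledMsg m : sameQueue) c.onMessage(m);
        System.out.println(c.handled()); // [order 1: Unpaid, order 1: Paid]
    }
}
```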

Does this approach introduce a new problem?

If a Consumer pulls a message that does not belong to it, can the Consumer that should consume that message still consume it? Within one Consumer group, messages in the same Queue cannot be consumed by different Consumers. Consumers that consume messages with different selection keys from the same Queue must therefore belong to different Consumer groups. Consumption in different Consumer groups is isolated, so the groups do not affect each other.

Partition ordered code implementation

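import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Integer orderId = i; // the order ID is used as the selection key
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("TopicA", "TagA", body);
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // arg is the orderId passed to send(); the same orderId always picks the same queue
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}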

What is a delayed message

A delayed message is a message that can be consumed only after a specified delay has passed since it was written to the Broker. Delayed messages let RocketMQ implement scheduled tasks without using timers.

Typical application scenarios

  1. Closing unpaid orders after a timeout in e-commerce transactions

On an e-commerce platform, a delayed message is sent when an order is created. The message is delivered to the Consumer 30 minutes later, and the Consumer checks whether the order has been paid. If not, it cancels the order and puts the item back in stock. If the payment has been completed, the message is ignored.

  2. Canceling unpaid ticket bookings after a timeout on the 12306 platform

On 12306, a delayed message is sent when a ticket is booked. The message is delivered to the Consumer 45 minutes later, and the Consumer checks whether the order has been paid. If not, the booking is canceled and the ticket is returned to the ticket pool. If the payment has been completed, the message is ignored.

Delay level

Delayed messages do not support arbitrary delay durations. The delay is specified by a delay level. The delay levels are defined by the following variable in the MessageStoreConfig class on the RocketMQ server:

messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

If the delay level is set to 3, the delay is 10s, because delay levels are counted from 1. To customize the delay levels, add the following configuration to the configuration file loaded by the Broker (for example, append 1d to add a 1-day level):

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d

The configuration files are in the conf directory under the RocketMQ installation directory
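The level-to-delay lookup can be sketched in plain Java. The sketch assumes the messageDelayLevel format shown above: space-separated entries, each a number followed by `s`, `m`, `h`, or `d`. `DelayLevels`, `parse`, and `delayMillis` are hypothetical helpers, not RocketMQ's own parser.

```java
import java.util.*;

// Plain-Java sketch (hypothetical helpers, not RocketMQ's parser): turn a
// messageDelayLevel-style string into milliseconds and look up a 1-based level.
class DelayLevels {
    static long[] parse(String config) {
        String[] parts = config.trim().split("\\s+");
        long[] millis = new long[parts.length];
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i];
            long n = Long.parseLong(p.substring(0, p.length() - 1));
            long unitMs;
            switch (p.charAt(p.length() - 1)) {
                case 's': unitMs = 1_000L; break;
                case 'm': unitMs = 60_000L; break;
                case 'h': unitMs = 3_600_000L; break;
                case 'd': unitMs = 86_400_000L; break;
                default: throw new IllegalArgumentException("unknown unit in " + p);
            }
            millis[i] = n * unitMs;
        }
        return millis;
    }

    // Levels are counted from 1, so level L is stored at index L - 1.
    static long delayMillis(long[] table, int level) {
        if (level < 1 || level > table.length) throw new IllegalArgumentException("level out of range: " + level);
        return table[level - 1];
    }

    public static void main(String[] args) {
        long[] table = parse("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d");
        System.out.println(delayMillis(table, 3));  // 10000 (10s)
        System.out.println(delayMillis(table, 19)); // 86400000 (1d)
    }
}
```

With the 1d entry appended, the new level is 19, because the default table has 18 levels.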

Implementation principle of delayed message





The concrete implementation scheme is as follows:



After the Producer sends a message to the Broker, the Broker first writes it to the Commitlog file and then distributes it to the corresponding ConsumeQueue. Before distributing it, however, the Broker checks whether the message has a delay level. If it does not, the message is distributed normally. If it does, the following more involved process runs:

  1. Change the message's Topic to SCHEDULE_TOPIC_XXXX
  2. Based on the delay level, create the corresponding queueId directory and consumequeue file under the SCHEDULE_TOPIC_XXXX topic in the consumequeue directory, if they do not exist yet.

The mapping between delayLevel and queueId is queueId = delayLevel - 1. The queueId directories for all delay levels are not created at once; a directory is created only when it is first needed.

  3. Modify the content of the message's index unit. The Message Tag HashCode field of the index unit normally stores the hash of the message's Tag; here it stores the message's delivery time instead. The delivery time is when the message will be rewritten to its original Topic and written to the Commitlog again. Delivery time = message store time + delay of the delay level, where the message store time is the timestamp at which the message was sent to the Broker
  4. Write the message index to the corresponding consumequeue under the SCHEDULE_TOPIC_XXXX topic
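Two of the rules in these steps are simple arithmetic: the queueId mapping and the delivery-time formula. The plain-Java sketch below shows both. `ScheduleIndex`, `queueIdFor`, `deliveryTimeMs`, and `DEFAULT_DELAYS_MS` are illustrative names, not RocketMQ source. The table simply reproduces the default messageDelayLevel values in milliseconds.

```java
// Plain-Java sketch with illustrative names (not RocketMQ source): the queue for a
// delay level is delayLevel - 1, and the index unit's tag-hashcode slot holds the
// delivery time = store time + delay of that level.
class ScheduleIndex {
    // Default levels 1s 5s 10s 30s 1m 2m ... 10m 20m 30m 1h 2h, in milliseconds (18 levels).
    static final long[] DEFAULT_DELAYS_MS = {
        1_000, 5_000, 10_000, 30_000,
        60_000, 120_000, 180_000, 240_000, 300_000, 360_000, 420_000, 480_000, 540_000, 600_000,
        1_200_000, 1_800_000, 3_600_000, 7_200_000
    };

    static int queueIdFor(int delayLevel) {
        return delayLevel - 1; // levels start at 1, queue IDs start at 0
    }

    static long deliveryTimeMs(long storeTimestampMs, int delayLevel) {
        return storeTimestampMs + DEFAULT_DELAYS_MS[delayLevel - 1];
    }

    public static void main(String[] args) {
        long storedAt = 1_000_000L; // hypothetical store timestamp in ms
        int level = 3;              // 10s
        System.out.println(queueIdFor(level));               // 2
        System.out.println(deliveryTimeMs(storedAt, level)); // 1010000
    }
}
```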

How are the messages in each delay-level Queue under SCHEDULE_TOPIC_XXXX sorted?

They are sorted by delivery time. Within a Broker, all delayed messages of the same level are written to the same Queue under SCHEDULE_TOPIC_XXXX in the consumequeue directory. Every message in a Queue therefore has the same delay level, so its delivery time depends only on its store time. In other words, the messages are sorted by the time they were sent to the Broker.

Delivering delayed messages

The Broker has a delayed-message service class, ScheduleMessageService. It consumes the messages in SCHEDULE_TOPIC_XXXX and delivers each delayed message to its target Topic according to the message's delivery time. Before delivery, the originally written message is read again from the Commitlog and its delay level is set to 0, which turns it into an ordinary message without delay. The message is then delivered again to the target Topic; this is really just an ordinary message delivery.

ScheduleMessageService creates and starts a Timer when the Broker starts, and the Timer runs the corresponding scheduled tasks. The system defines one TimerTask per delay level, and each TimerTask consumes and delivers the messages of its own level. Each TimerTask checks whether the first message in its Queue is due. If the first message is not due, none of the later messages are due either, because messages are sorted by delivery time. If the first message is due, it is delivered to the target Topic, that is, consumed.
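The head-of-queue check can be simulated in plain Java. `DelayLevelTask`, `Entry`, and `tick` are hypothetical names, not ScheduleMessageService's real code. Each call to `tick` stands for one timer run for a single delay level. It delivers due entries from the head of the queue and stops at the first entry that is not yet due.

```java
import java.util.*;

// Plain-Java simulation of one per-level timer task (illustrative, not RocketMQ
// source). The level's queue is sorted by delivery time, so only the head is
// checked: once the head is not due, nothing behind it is due either.
class DelayLevelTask {
    static final class Entry {
        final String msgId;
        final long deliverAtMs;
        Entry(String msgId, long deliverAtMs) { this.msgId = msgId; this.deliverAtMs = deliverAtMs; }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();

    // Entries of one level arrive in store-time order, hence in delivery-time order.
    void append(Entry e) { queue.addLast(e); }

    // One timer run: deliver due entries from the head; stop at the first one not yet due.
    int tick(long nowMs) {
        int count = 0;
        while (!queue.isEmpty() && queue.peekFirst().deliverAtMs <= nowMs) {
            Entry e = queue.pollFirst();
            // In the Broker, the original message would be re-read from the Commitlog here,
            // its delay level set to 0, and re-sent to the target Topic.
            delivered.add(e.msgId);
            count++;
        }
        return count;
    }

    List<String> delivered() { return delivered; }

    public static void main(String[] args) {
        DelayLevelTask level3 = new DelayLevelTask(); // e.g. the 10s level
        level3.append(new Entry("m1", 10_000));
        level3.append(new Entry("m2", 12_000));
        level3.append(new Entry("m3", 15_000));
        System.out.println(level3.tick(11_000)); // 1 (only m1 is due)
        System.out.println(level3.tick(11_500)); // 0 (head m2 not due yet)
        System.out.println(level3.tick(20_000)); // 2
        System.out.println(level3.delivered());  // [m1, m2, m3]
    }
}
```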

Delayed message implementation

import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("TopicB", "someTag", body);
            // Set the message delay level to 3, that is, 10 seconds
            msg.setDelayTimeLevel(3);
            SendResult sendResult = producer.send(msg);
            // Output the time when the message was sent
            System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
            System.out.println("," + sendResult);
        }
        producer.shutdown();
    }
}

Transaction message

Transaction messages are not meant to solve distributed transactions in general. Their purpose is to keep sending a message consistent with persisting the business data to the database. The implementation applies distributed-transaction ideas concretely: RocketMQ uses a two-phase commit (2PC) approach to commit transaction messages. It also adds compensation logic to handle messages whose second phase timed out or failed. Note:

  1. Transaction messages do not support delayed messages
  2. Consumers should perform idempotency checks on transaction messages, because the messages may be consumed more than once (due to rollback and recommit)
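A consumer-side idempotency check can be as simple as remembering which business keys have already been applied. The plain-Java sketch below uses hypothetical names (`IdempotentHandler`, `credit`, business keys like `transfer-0001`). Its in-memory `Set` is an assumption made for illustration only: a real consumer would need durable storage that survives restarts, such as a database table with a unique key.

```java
import java.util.*;

// Plain-Java sketch of a consumer-side idempotency check (hypothetical names):
// remember the business key of every message already applied and skip repeats.
// The in-memory Set is an illustration-only assumption; real code needs durable storage.
class IdempotentHandler {
    private final Set<String> appliedKeys = new HashSet<>();
    private long balance;

    // Returns true if the credit was applied, false if it was a duplicate.
    boolean credit(String bizKey, long amount) {
        if (!appliedKeys.add(bizKey)) {
            return false; // already processed: consuming again must not credit twice
        }
        balance += amount;
        return true;
    }

    long balance() { return balance; }

    public static void main(String[] args) {
        IdempotentHandler h = new IdempotentHandler();
        h.credit("transfer-0001", 10_000);
        h.credit("transfer-0001", 10_000); // redelivered copy of the same message
        System.out.println(h.balance());   // 10000
    }
}
```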

What is 2PC (two-phase commit)? [To be added]

Transaction message scenario

Here is a scenario: ICBC user A transfers 10,000 yuan to CCB user B. We could handle it with a synchronous message:

  1. The ICBC system sends a synchronous message M to the Broker, asking to add 10,000 yuan to B
  2. After the Broker successfully receives the message, it sends a success ACK to the ICBC system
  3. After receiving the success ACK, the ICBC system deducts 10,000 yuan from user A
  4. The CCB system obtains message M from the Broker
  5. The CCB system consumes message M, that is, adds 10,000 yuan to user B

Problem: suppose the deduction in step 3 fails, but the message has already been sent to the Broker successfully. In MQ, a message can be consumed as soon as it is written successfully. User B in the CCB system then gains 10,000 yuan even though nothing was deducted from A, and the data becomes inconsistent.

Solution

The idea is to make steps 1, 2, and 3 atomic: either they all succeed or they all fail. If the message is sent successfully, the deduction must also succeed; if the deduction fails, the successfully sent message is rolled back. This is achieved with transaction messages, which apply a distributed transaction approach.



Use transaction messages to handle this requirement scenario:

  1. The transaction manager (TM) instructs the transaction coordinator (TC) to start a global transaction
  2. The ICBC system sends a transaction message M to the TC, asking to add 10,000 yuan to B
  3. The TC sends a half-transaction message (prepareHalf) to the Broker to pre-commit message M. At this point, message M in the Broker is not visible to the CCB system
  4. The Broker reports the result of the pre-commit to the TC
  5. If the pre-commit fails, the TC reports the failure to the TM and the global transaction ends. If the pre-commit succeeds, the TC invokes the ICBC system's callback to withhold 10,000 yuan from ICBC user A
  6. The ICBC system sends the result of the withholding, that is, the execution status of its local transaction, to the TC
  7. After receiving the withholding result, the TC reports it to the TM. There are three possible results:
    1. The local transaction succeeded
    2. The local transaction failed
    3. Unknown: a check-back is required to determine the result of the local transaction
  8. The TM sends a different confirmation instruction to the TC depending on the reported result:
    1. If the withholding succeeded (local transaction status COMMIT_MESSAGE), the TM sends the Global Commit instruction to the TC
    2. If the withholding failed (local transaction status ROLLBACK_MESSAGE), the TM sends the Global Rollback instruction to the TC
    3. If the status is unknown, the ICBC system's local transaction status check-back is triggered. The check-back reports COMMIT_MESSAGE or ROLLBACK_MESSAGE to the TC, the TC reports the result to the TM, and the TM then sends the final confirmation instruction, Global Commit or Global Rollback, to the TC
  9. After receiving the instruction, the TC sends a confirmation instruction to the Broker and the ICBC system
    1. If the TC received Global Commit, it sends a Branch Commit instruction to the Broker and the ICBC system. Only then does message M in the Broker become visible to the CCB system, and the deduction from ICBC user A is truly confirmed
    2. If the TC received Global Rollback, it sends a Branch Rollback instruction to the Broker and the ICBC system. Message M in the Broker is discarded, and the deduction from ICBC user A is rolled back

The solution above puts message delivery and the debit operation in one transaction: either both succeed, or both are rolled back. It is not a typical XA pattern. In the XA pattern, branch transactions are asynchronous, whereas in the transaction message scheme the message pre-commit and the withholding operation are synchronous.
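The commit/rollback decision in this flow can be simulated in plain Java. `HalfMessageDecision`, `LocalState`, `GlobalInstruction`, and `decide` are hypothetical names, not the RocketMQ client API. Two details are assumptions of this sketch rather than statements from the text: the `maxChecks` limit on check-backs, and the rule that a status still unknown after that limit is treated as a rollback.

```java
import java.util.function.Supplier;

// Plain-Java simulation of how the final instruction follows from the local
// transaction status (hypothetical names, not the RocketMQ client API).
// ASSUMPTION: check-backs are capped at maxChecks; still-unknown means rollback.
class HalfMessageDecision {
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOWN }
    enum GlobalInstruction { GLOBAL_COMMIT, GLOBAL_ROLLBACK }

    static GlobalInstruction decide(LocalState reported, Supplier<LocalState> checkBack, int maxChecks) {
        LocalState state = reported;
        // Unknown status: trigger the ICBC system's check-back until it gives a definite answer.
        for (int i = 0; i < maxChecks && state == LocalState.UNKNOWN; i++) {
            state = checkBack.get();
        }
        // Only a confirmed COMMIT_MESSAGE makes message M visible to the CCB system.
        return state == LocalState.COMMIT_MESSAGE
            ? GlobalInstruction.GLOBAL_COMMIT
            : GlobalInstruction.GLOBAL_ROLLBACK;
    }

    public static void main(String[] args) {
        // First report is unknown; the check-back finds the deduction was committed.
        System.out.println(decide(LocalState.UNKNOWN, () -> LocalState.COMMIT_MESSAGE, 3));          // GLOBAL_COMMIT
        System.out.println(decide(LocalState.ROLLBACK_MESSAGE, () -> LocalState.COMMIT_MESSAGE, 3)); // GLOBAL_ROLLBACK
    }
}
```

The check-back supplier is consulted only while the status is unknown. A definite first report is never second-guessed.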