Preface

Hello everyone, my name is Jack Xu. This article is the last in the series of intensive RocketMQ lectures. It covers some advanced RocketMQ topics that often come up in interviews.

Lecture 1: RocketMQ High Availability Architecture and Two-Master Two-Slave Asynchronous Cluster Deployment

Lecture 2: RocketMQ Primer and Java API Introduction

Source code used in this article: github.com/xuhaoj/rock…

Translation of the official documentation: www.itmuch.com/books/rocke…

To give you a clear, structured grasp of this material, we will go through it from three angles: producer, Broker, and consumer.

Producer

Message sending rules

In RocketMQ, a Kafka-like partitioning effect is achieved with multiple Message Queues. If a topic sends and receives very large volumes of data and needs more machines processing in parallel, the topic can be configured with one or more Message Queues as needed. Once a topic has multiple Message Queues, messages can be sent to them in parallel, and consumers can read and consume from them in parallel.

So which Message Queue does a message go to? That is decided by the routing strategy. Several of the overloaded send methods take a MessageQueueSelector parameter, and RocketMQ already ships three implementations:

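  • SelectMessageQueueByHash: hashes the arg passed to send and takes it modulo the number of queues, so equal args always map to the same queue. (When no selector is passed at all, the producer simply rotates through the queues with an incrementing counter.)
  • SelectMessageQueueByRandom: picks a queue at random.
  • SelectMessageQueueByMachineRoom: returns null; it has no real implementation.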

If none of these meets our needs, we can implement our own MessageQueueSelector and pass it in:

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // arg is the orderId passed as the last argument to send()
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

Source: example/ordermessage/Producer.java
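To see why a selector like this keeps every message of one order on one queue, here is a minimal stand-alone sketch that uses no RocketMQ classes. The class name QueueRoutingSketch and the method selectIndex are made up for illustration, and plain strings stand in for MessageQueue objects. It uses Math.floorMod rather than %, on the assumption that keys may be negative, in which case % would produce a negative index.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone sketch: plain strings stand in for RocketMQ's MessageQueue objects.
final class QueueRoutingSketch {

    // Maps a business key (for example an order id) to a queue index.
    // Math.floorMod keeps the result non-negative even for negative hash codes.
    static int selectIndex(Object shardingKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        List<String> queues = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            queues.add("queue-" + i);
        }
        // The same order id appears several times and always lands on the same queue.
        int[] orderIds = {1001, 1002, 1001, -7, 1001};
        for (int orderId : orderIds) {
            int index = selectIndex(orderId, queues.size());
            System.out.println("order " + orderId + " -> " + queues.get(index));
        }
    }
}
```

Because the index depends only on the key and the queue count, the mapping stays stable as long as the number of queues does not change.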

Ordered messages

A classic interview question: how do you guarantee message order? First, the messages that must stay in order have to be sent to the same Message Queue. Second, a Message Queue can only be consumed by one consumer, which is guaranteed by the message queue allocation mechanism. Finally, a consumer’s processing of a single queue must itself be ordered. In other words, we want a one-to-one relationship between producer, Message Queue, and consumer.

The specific operation process is as follows:

  1. When a producer sends messages, they must arrive at the Broker in order. So the producer cannot send asynchronously from multiple threads; it has to send sequentially.
  2. When written to the Broker, they must be written in order. That is, messages that belong together must all go to the same Message Queue instead of being spread across queues. To do this, we simply pass in the same hashKey when we send, and the same queue will be selected.
  3. Only one thread may consume a queue at a time; otherwise, because threads process at different speeds, records may reach the database out of order. In Spring Boot, set consumeMode to ORDERLY; in the Java API, pass in an implementation of MessageListenerOrderly.

consumer.registerMessageListener(new MessageListenerOrderly() {

Of course sequential consumption brings some problems:

  1. If a message fails, it cannot be skipped and the current queue consumption is suspended
  2. Reduces message processing performance

Transactional messages

There are many solutions for distributed transactions, one of which is to use RocketMQ’s transactional messages to achieve eventual consistency. Let’s look at how RocketMQ implements them. Here is the flow chart from RocketMQ’s website: rocketmq.apache.org/rocketmq/th…

  1. The producer sends a half message to the RocketMQ server. A half message is one that has been successfully stored on the MQ server but is temporarily not deliverable to consumers.
  2. The MQ server sends an ACK to the producer, confirming that the half message was received.
  3. The producer starts executing the local database transaction.
  4. If the local transaction succeeds, the producer sends commit to the MQ server; the server then marks the half message as deliverable and the consumer eventually receives it. If the local transaction fails, the producer sends rollback; the server deletes the half message and the subscriber never receives it.
  5. If the confirmation from step 4 never arrives (because of a network outage, a producer restart, and so on), the server checks back. The producer exposes a check-back interface for the transaction status, and when a half message has seen no action for a period of time, the Broker calls this interface to ask the producer whether the transaction succeeded.
  6. On receiving the check-back, the producer looks up the final result of the local transaction for that message.
  7. Based on that result, the producer submits a second confirmation, sending commit or rollback.
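The seven steps above amount to a small state machine on the server side. The sketch below models it in memory only, as a way to reason about the flow; it is not RocketMQ code. The names HalfMessageSketch, TxResult, MessageState, and StatusChecker are hypothetical, and what a real Broker does once the checks are exhausted is deliberately left out.

```java
// Simplified, in-memory model of the half-message lifecycle; not RocketMQ code.
final class HalfMessageSketch {

    enum TxResult { COMMIT, ROLLBACK, UNKNOWN }

    enum MessageState { HALF, DELIVERABLE, DISCARDED }

    // Hypothetical callback standing in for the producer-side check-back interface.
    interface StatusChecker {
        TxResult check(int attempt);
    }

    // Resolves a half message: use the first answer if it is final,
    // otherwise ask the producer again, up to maxChecks times.
    static MessageState resolve(TxResult firstAnswer, StatusChecker checker, int maxChecks) {
        TxResult result = firstAnswer;
        int attempt = 0;
        while (result == TxResult.UNKNOWN && attempt < maxChecks) {
            attempt++;
            result = checker.check(attempt);
        }
        switch (result) {
            case COMMIT:
                return MessageState.DELIVERABLE;
            case ROLLBACK:
                return MessageState.DISCARDED;
            default:
                // Still unresolved: this sketch just keeps the message invisible to consumers.
                return MessageState.HALF;
        }
    }

    public static void main(String[] args) {
        // The local transaction outcome only becomes known on the third check-back.
        StatusChecker checker = attempt -> attempt < 3 ? TxResult.UNKNOWN : TxResult.COMMIT;
        System.out.println(resolve(TxResult.UNKNOWN, checker, 15));
    }
}
```

The point of the model is that consumers only ever see a message in the DELIVERABLE state, so a failed or undecided local transaction never leaks downstream.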

That is the whole flow of a transactional message. Now let’s look at the code. RocketMQ provides a TransactionListener interface, which we need to implement; the local transaction logic goes in the executeLocalTransaction method.

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // run the local transaction here; return ROLLBACK_MESSAGE, COMMIT_MESSAGE or UNKNOW
        log.info("executeLocalTransaction:"+JSON.toJSONString(msg));
        return LocalTransactionState.UNKNOW;
    }

executeLocalTransaction must return one of three states: rollback, commit, or unknown. If it returns unknown, the Broker cannot tell whether the transaction succeeded, so it later calls checkLocalTransaction to query the result.

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
       log.info("checkLocalTransaction:"+JSON.toJSONString(msg));
       return LocalTransactionState.COMMIT_MESSAGE;
    }

By default, the Broker checks back at most 15 times. The first check happens after 6s, and subsequent checks are 60s apart. Finally, specify the transaction listener on the producer before sending.

Source: example/transaction/TransactionProducer.java

Delayed messages

We often run into business scenarios where a task has to be completed after a certain period of time. For example, after a Didi ride is finished, if the user leaves no review, the ride is automatically rated 5 stars after 48 hours; a takeout order that is not paid within 30 minutes is automatically cancelled; and so on. There are several solutions to this problem, one of which is RocketMQ’s delayed messages. The open source version is limited, however, and only supports a fixed set of delay levels, while the commercial version can specify any time.

   msg.setDelayTimeLevel(2); // level 2 = 5 seconds

For example, level 2 means 5 seconds. There are 18 levels in total, configured in MessageStoreConfig:

  private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

In Spring Boot it is used like this:

  rocketMQTemplate.syncSend(topic, message, 1000, 2); // timeout 1000 ms, delay level 2 = 5 seconds

Source: example/delay/DelayProducer.java
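To make the level numbers less opaque, here is a small sketch that turns the messageDelayLevel string quoted above into a level-to-milliseconds table. The class DelayLevelSketch and its parse method are made up for this article, and the 'd' (day) unit is an assumption added for completeness; the default string only uses s, m, and h.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-alone sketch: converts a delay level string into level -> milliseconds.
final class DelayLevelSketch {

    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Levels are numbered from 1 in the order they appear in the string.
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                case 'd': unitMillis = 86_400_000L; break; // assumed unit, not in the default string
                default: throw new IllegalArgumentException("Unknown unit in " + part);
            }
            table.put(i + 1, amount * unitMillis);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse(DEFAULT_LEVELS);
        System.out.println("number of levels: " + table.size());
        System.out.println("level 2 = " + table.get(2) + " ms");
    }
}
```

With the default string, level 2 comes out as 5000 ms, which matches the setDelayTimeLevel(2) example.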

Broker

Physical storage

Let’s go to the RocketMQ store folder, which we specified during installation.

Here are the functions of these folders:

  1. checkpoint: the checkpoint file, which stores the timestamps of the last flush of the commit log, ConsumeQueue, and IndexFile.
  2. commitlog: the message storage directory, holding a set of files of 1 GB each by default. Each file is named after its starting offset, left-padded with zeros to 20 digits. For example, when the first file is full, the second file starts at offset 1073741824, so its name is 00000000001073741824, and so on.
  3. config: runtime configuration, including topic message filtering information, consumption progress in cluster consumption mode, pull progress of the delayed message queue, consumer group configuration, and topic configuration attributes.
  4. consumequeue: the consume queue directory, organized by topic and then by Message Queue. Each Message Queue folder holds, for every message, its offset in the commit log along with its size and tag attribute.
  5. index: the directory where message index files are stored. When we sent messages with the Java API, we passed in a keys parameter, which is used to look up messages. If keys are present, the server creates an index file and generates an index entry for each space-separated keyword. A single IndexFile can hold 20 million index entries and has a fixed size of about 400 MB. The index is a hash index, so try to make the key unique.
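The commit log naming rule is easy to check with a few lines of arithmetic. The sketch below assumes the default 1 GB file size mentioned above; CommitLogFileSketch and its methods are illustrative names, not RocketMQ classes.

```java
// Stand-alone sketch of how a physical offset maps to a commit log file name.
final class CommitLogFileSketch {

    // 1 GB per file, the default cited in the text.
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    // Start offset of the file that contains the given physical offset.
    static long fileStartOffset(long physicalOffset) {
        return physicalOffset - (physicalOffset % FILE_SIZE);
    }

    // The file name is the start offset, left-padded with zeros to 20 digits.
    static String fileName(long physicalOffset) {
        return String.format("%020d", fileStartOffset(physicalOffset));
    }

    // Position of the message inside its file.
    static long positionInFile(long physicalOffset) {
        return physicalOffset % FILE_SIZE;
    }

    public static void main(String[] args) {
        long offset = FILE_SIZE + 100; // 100 bytes into the second file
        System.out.println("file     = " + fileName(offset));
        System.out.println("position = " + positionInFile(offset));
    }
}
```

Naming files by start offset means that locating the file for any physical offset needs only a division, with no extra lookup table.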

Storage design

Let’s look at the explanation on RocketMQ’s official website, rocketmq.apache.org/rocketmq/ho…, of why Kafka cannot support more partitions and how RocketMQ supports more of them.

  1. Each partition stores the entire message data. Although each partition is written to disk in an orderly fashion, as the number of concurrent writes increases, the writes become random from the operating system’s perspective.
  2. The Linux IO Group Commit mechanism is difficult to use because data files are scattered.

So RocketMQ took a different approach and designed a new storage layout in which all messages from all topics are written to the same file, so that writes are strictly sequential. Consuming, of course, becomes more complicated: we cannot scan every message in a huge commit log, as that would be far too slow.

So what to do? This is where the consume queue mentioned above comes in: it is organized per topic and per queue and records where each message lives. When consuming, we first read from the consume queue the message’s starting physical offset in the commit log, its size, and the hash code of its tag, and then read the actual message body from the commit log.

The consume queue can be understood as an index of messages; it holds no message bodies. Of course, this design is not perfect either. Although the commit log is written sequentially, it is read randomly. And reading a message means reading the consume queue first and then the commit log, which adds overhead.
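A consume queue entry carries exactly the three fields just described. The sketch below packs them into a fixed-size record with java.nio.ByteBuffer, using the widths commonly documented for RocketMQ (8-byte offset, 4-byte size, 8-byte tag hash, 20 bytes in total). Treat those widths as an assumption of this sketch; ConsumeQueueEntrySketch is an illustrative name.

```java
import java.nio.ByteBuffer;

// Stand-alone sketch of a fixed-size consume queue entry.
final class ConsumeQueueEntrySketch {

    // commit log offset (8 bytes) + message size (4 bytes) + tag hash code (8 bytes)
    static final int ENTRY_SIZE = 8 + 4 + 8;

    static ByteBuffer encode(long commitLogOffset, int messageSize, long tagHash) {
        ByteBuffer buffer = ByteBuffer.allocate(ENTRY_SIZE);
        buffer.putLong(commitLogOffset);
        buffer.putInt(messageSize);
        buffer.putLong(tagHash);
        buffer.flip(); // switch from writing to reading
        return buffer;
    }

    // Because every entry has the same size, the n-th entry starts at n * ENTRY_SIZE.
    static long entryPosition(long logicalIndex) {
        return logicalIndex * ENTRY_SIZE;
    }

    public static void main(String[] args) {
        ByteBuffer entry = encode(1073741924L, 256, "TagA".hashCode());
        System.out.println("commit log offset = " + entry.getLong());
        System.out.println("message size      = " + entry.getInt());
        System.out.println("tag hash          = " + entry.getLong());
        System.out.println("entry #3 starts at byte " + entryPosition(3));
    }
}
```

Fixed-size entries are what make the consume queue a cheap index: a consumer's logical offset converts to a byte position by multiplication alone.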

File Cleaning Policy

Just like Kafka, RocketMQ does not delete the contents of the commit log once they are consumed. This has two advantages. One is that the same messages can be consumed by multiple consumer groups, each maintaining its own offset. The other is support for message backtracking: messages can be looked up and replayed at any time.

If files were never cleaned up, however, they would keep accumulating and eventually fill the disk. So RocketMQ deletes expired commit log and consume queue files; by default, files older than 72 hours are considered expired. Two cleanup services are run for this:

    private void cleanFilesPeriodically() {
        this.cleanCommitLogService.run();
        this.cleanConsumeQueueService.run();
    }

Once the expired files have been selected, when are they actually deleted? There are two cases. The first is a scheduled task that deletes them at 4 a.m. every day. The second is when disk usage exceeds 75%: at that point things are urgent, so there is no waiting for 4 a.m. and the files are cleaned up right away.

If things get worse and disk usage exceeds 85%, a batch of files is deleted whether expired or not, until there is enough space. If disk usage exceeds 90%, the Broker rejects new messages.
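The thresholds above can be condensed into one decision function. This is only a sketch of the policy as described in this article (4 a.m., 75%, 85%, 90%), not the Broker's actual code; CleanupPolicySketch, Action, and decide are hypothetical names.

```java
// Stand-alone sketch of the file cleaning policy described above.
final class CleanupPolicySketch {

    enum Action { NONE, DELETE_EXPIRED, FORCE_DELETE, REJECT_WRITES }

    // diskUsage is a ratio between 0.0 and 1.0; hourOfDay is 0 to 23.
    static Action decide(double diskUsage, int hourOfDay) {
        if (diskUsage > 0.90) {
            return Action.REJECT_WRITES;   // disk almost full: refuse new messages
        }
        if (diskUsage > 0.85) {
            return Action.FORCE_DELETE;    // delete files even if not yet expired
        }
        if (diskUsage > 0.75 || hourOfDay == 4) {
            return Action.DELETE_EXPIRED;  // delete files past the retention time
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        System.out.println(decide(0.50, 4));   // scheduled cleanup at 4 a.m.
        System.out.println(decide(0.80, 15));  // disk pressure, clean immediately
        System.out.println(decide(0.95, 15));  // reject writes
    }
}
```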

Zero copy

Everyone knows that RocketMQ stores messages on disk, yet it still achieves low latency and high throughput. One of the secrets is zero-copy technology.

The Page Cache is an operating-system-level cache. To read or operate on data from disk, the CPU must first load it into memory, and loading happens in fixed-size units called pages. A standard page on x86 Linux is 4 KB. To speed up disk access and reduce disk I/O, pages that have been accessed are cached in memory; this region of memory is called the Page Cache.

When the next I/O request arrives, the Page Cache is checked first. On a hit the operation is served directly from memory; on a miss the disk is read. The Page Cache also reads ahead: for a first read request, the system also loads the next few pages after the requested one. The Page Cache lives in kernel space and user space cannot access it directly, so data has to be copied from kernel space into a user-space buffer, and this copy slows down data access.

Zero-copy technology was developed to solve this problem. The Page Cache is mapped into user space so that user code can read and write the Page Cache through pointers, without system calls (such as read()) and without the extra memory copy. RocketMQ implements this with mmap (memory mapping), whereas Kafka uses sendfile.
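In Java, memory mapping is available through FileChannel.map, which returns a MappedByteBuffer. The sketch below writes a few bytes through one mapping and reads them back through another. It only demonstrates the API on a temporary file and says nothing about how RocketMQ organizes its mapped files; MmapSketch and roundTrip are illustrative names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Stand-alone sketch of reading and writing a file through a memory mapping.
final class MmapSketch {

    // Writes the payload through a mapped buffer, then reads it back through a second mapping.
    static String roundTrip(String payload) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        try {
            Path file = Files.createTempFile("mmap-sketch", ".dat");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Writes go to the mapped pages, with no write() call per message.
                MappedByteBuffer writeView =
                        channel.map(FileChannel.MapMode.READ_WRITE, 0, bytes.length);
                writeView.put(bytes);
                writeView.force(); // ask the OS to flush the mapped pages to disk

                // Reads come from the mapped pages as well.
                MappedByteBuffer readView =
                        channel.map(FileChannel.MapMode.READ_ONLY, 0, bytes.length);
                byte[] out = new byte[bytes.length];
                readView.get(out);
                return new String(out, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello commitlog"));
    }
}
```

The force() call corresponds to a flush: until it runs (or the OS writes the pages back on its own), the data may exist only in the Page Cache.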

Consumer

Load balancing and rebalance on the consumer side

As with Kafka, Message Queues are load-balanced on the consumer side so that each consumer gets a fair share of the queues to consume. Consumers can go offline and new consumers can join; that is when a rebalance takes place.

The rebalance logic is in the RebalanceImpl class, around line 277:

    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

    List<MessageQueue> allocateResult = null;
    try {
        allocateResult = strategy.allocate(this.consumerGroup,
            this.mQClientFactory.getClientId(),
            mqAll,
            cidAll);
    } catch (Throwable e) {
        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e);
        return;
    }

AllocateMessageQueueStrategy has 6 implementations, and you can also write your own. The strategy is specified on the consumer side:

consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
  • AllocateMessageQueueAveragely: average allocation (the default)
  • AllocateMessageQueueAveragelyByCircle: round-robin allocation of message queues
  • AllocateMessageQueueByConfig: allocation according to a user-specified configuration
  • AllocateMessageQueueByMachineRoom: allocation according to the specified machine room
  • AllocateMachineRoomNearby: allocation to the nearest machine room
  • AllocateMessageQueueConsistentHash: consistent hashing, based on the consumer’s cid

The number of queues should be at least the number of consumers; any surplus consumers in a group are assigned no queue and sit idle.
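The idea behind average allocation, and the reason surplus consumers end up idle, can be sketched in a few lines. This is a simplified re-implementation of the idea, not the source of AllocateMessageQueueAveragely: queues and consumers are represented by their positions in sorted lists, and AverageAllocationSketch is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone sketch of average queue allocation across the consumers of one group.
final class AverageAllocationSketch {

    // Returns the queue indexes assigned to the consumer at position consumerIndex.
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        List<Integer> assigned = new ArrayList<>();
        int base = queueCount / consumerCount;
        int remainder = queueCount % consumerCount;
        // The first `remainder` consumers each take one extra queue.
        int size = base + (consumerIndex < remainder ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, remainder);
        for (int i = 0; i < size; i++) {
            assigned.add(start + i);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // 8 queues shared by 3 consumers.
        for (int c = 0; c < 3; c++) {
            System.out.println("consumer " + c + " -> " + allocate(8, 3, c));
        }
        // 2 queues shared by 3 consumers: the last consumer gets nothing.
        System.out.println("idle consumer -> " + allocate(2, 3, 2));
    }
}
```

Every consumer runs the same computation over the same sorted lists, so they all agree on the assignment without talking to each other.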

Retry and dead-letter queue

If an exception occurs on the consumer side (the database is unavailable, the network fails, the power goes out, and so on), the consumer returns RECONSUME_LATER to the Broker. The message is then sent back to the Broker and put into RocketMQ’s retry queue. The server creates a retry queue for each consumer group, with a name starting with %RETRY%.

After a delay, the message in the retry queue is redelivered to the consumer group; if consumption fails again, it goes back into the retry queue. The retry interval grows from 10 seconds to 2 hours: 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h. The maximum number of retries is 16.

If consumption keeps failing after the maximum number of retries (16 by default), the message is sent to the dead-letter queue (DLQ). The Broker creates a dead-letter queue named %DLQ% + ConsumerGroupName, and the application can monitor it and intervene manually. In real production we usually do not need all 16 retries, which waste both time and performance. Instead, once consumption has failed the number of times we consider reasonable, we record the message and stop retrying.
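The retry schedule and the dead-letter handoff can be written down as a lookup table. The sketch below uses the 16 intervals listed above; RetryScheduleSketch is an illustrative name, the -1 return value is a convention of this sketch, and the %RETRY% + group naming is stated from memory of the RocketMQ convention, so treat it as an assumption.

```java
// Stand-alone sketch of the retry schedule and dead-letter handoff.
final class RetryScheduleSketch {

    // Retry intervals in seconds: 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h.
    static final int[] RETRY_DELAYS_SECONDS = {
        10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    static final int MAX_RETRIES = RETRY_DELAYS_SECONDS.length; // 16

    // reconsumeTimes is the number of retries already performed (0 after the first failure).
    // Returns the delay before the next redelivery, or -1 when the message
    // should go to the dead-letter queue instead.
    static int nextDelaySeconds(int reconsumeTimes) {
        return reconsumeTimes < MAX_RETRIES ? RETRY_DELAYS_SECONDS[reconsumeTimes] : -1;
    }

    static String retryTopic(String consumerGroup) {
        return "%RETRY%" + consumerGroup;
    }

    static String deadLetterTopic(String consumerGroup) {
        return "%DLQ%" + consumerGroup;
    }

    public static void main(String[] args) {
        System.out.println("first retry after " + nextDelaySeconds(0) + "s");
        System.out.println("last retry after " + nextDelaySeconds(15) + "s");
        System.out.println("then -> " + deadLetterTopic("order_consumer_group"));
    }
}
```

Summing the table shows why 16 retries are rarely worth it: the full schedule takes well over four hours before the message reaches the dead-letter queue.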

Source: jackxu/SimpleConsumer.java

Choosing an MQ

Here is a comparison of the three common MQ products on the market, for reference when choosing one for your projects:

Well, that’s the end of the RocketMQ series. Thanks for reading!