Writing time: 07:50 on July 30, 2020

The RocketMQ code discussed in this article is based on the source release rocketmq-all-4.7.1, the latest at the time of writing.

I use RocketMQ a lot in my work. I knew how to use it but not how it works, and when something went wrong I sometimes did not know where to start. So I recently read through the RocketMQ source code, learned about its system design, and became much more comfortable using it.

After reading this article, you will understand RocketMQ's architecture and be able to answer the following questions about RocketMQ.

  • Why do RocketMQ consumers try again after failing to consume?
  • How do RocketMQ delay queues work?
  • How does RocketMQ deliver one message to both external systems A and B?
  • In an intranet environment, how do you prevent colleagues' machines from stealing your MQ messages?
  • How is RocketMQ message consumption progress saved?

This is my diagram of RocketMQ's system design. I was going to draw it in Excel, but that was too much trouble, so I drew it with an Apple Pencil instead.

Overall message sending process

If you just want to understand RocketMQ's message sending and consumption process, read this section. The following text describes the flow of the diagram above.

RocketMQ has four conceptual roles. When the RocketMQ middleware is started, NameServer is started first. NameServer maintains Broker information and receives Broker heartbeats; each Broker sends a heartbeat to NameServer every 30 seconds. NameServer scans for live Brokers every 10 seconds. If it has not received a heartbeat from a Broker for more than 120s, it considers the Broker down and removes it from the routing information.

The Broker is the message server that stores and forwards messages. After it starts, the Broker sends a heartbeat to the NameServer every 30 seconds and receives requests from producers and consumers. Messages are stored in the Broker, which writes them to the commitlog directory and to the specified queue. Queues are organized in the Broker as directories named after topics under the consumequeue directory, and each queue directory is named with a number such as 0, 1, 2, or 3. After a message is written to the commitlog, its offset is forwarded to the ConsumeQueue so that the consumer can pull the message from the queue.

After it starts, the producer refreshes Topic routing information from the NameServer every 30 seconds. When a producer sends a message, it first looks in its local cache for the Brokers that host the target Topic, and asks NameServer if the route is not cached. Once it has the Topic route and the queues under the Topic, the producer selects one of those queues and sends the message to it. There is also a fault avoidance algorithm, which is not covered here.

RocketMQ consumers get messages in push or pull mode. Push mode is a client-side wrapper around pull mode, so RocketMQ essentially has only pull mode. The consumer selects the queues it needs to consume according to its load balancing algorithm, then asks the Broker for messages from those queues, 32 messages per pull by default. After the messages are pulled, they are submitted to a thread pool for consumption, with 10 threads by default. If consumption fails, the message is sent back to the Broker to go through the failure-retry mechanism, and the progress is still advanced in memory. If consumption succeeds, the progress is advanced directly. The consumer sends its progress to the Broker every 10 seconds.

If consumption fails, the Broker puts the message into a delay queue, so RocketMQ's failure-retry mechanism is built on delay queues. RocketMQ only supports delay queues with specific delay levels and does not support custom delay times. Each delay queue has a timer that checks every 100ms whether any message is due. If so, the message is delivered to its target queue.

RocketMQ model concept

As shown above, RocketMQ involves four main roles:

  • NameServer: A naming server that updates and discovers Broker services
  • Broker: A role that stores and forwards messages
  • Producer: The role that sends messages
  • Consumer: The role that consumes messages

RocketMQ routing center NameServer

NameServer provides service discovery and registration functions. NameServer is a stateless node that can be deployed in a cluster.

The NameServer startup class is NamesrvStartup. On startup it schedules a scan of live Brokers every 10 seconds (Brokers that are no longer alive are removed) and starts Netty to handle network requests.

NameServer route registration and failure removal

NameServer uses four maps to maintain Topic routing information.

Their runtime structure is as follows:

  • TopicQueueTable: Topic queue table that records how many queues exist under each Topic and which Broker they are on. Message consumption is queue-based.
  • BrokerAddrTable: Broker address table that records which cluster the Broker is in and the primary and secondary addresses of the Broker.
  • ClusterAddrTable: Cluster information table that records all Brokers in a cluster.
  • BrokerLiveTable: Broker liveness table that records the time of each Broker's last heartbeat. If the last heartbeat is more than 120s old, the Broker is considered down and is removed.

Broker heartbeat

The Broker sends a heartbeat to the NameServer every 30 seconds, and the NameServer scans for expired Brokers every 10 seconds.
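To make the heartbeat bookkeeping concrete, here is a minimal Python sketch of a liveness table with the 120s timeout mentioned above. It is only a toy model: the class and method names are my own and are not RocketMQ's real Java classes, and timestamps are passed in explicitly so the behavior is easy to follow.

```python
import time

BROKER_EXPIRY_SECONDS = 120  # timeout stated in the text


class BrokerLiveTable:
    """Toy model of a heartbeat table (hypothetical, not RocketMQ's class)."""

    def __init__(self):
        # broker address -> timestamp (seconds) of the last heartbeat
        self.last_heartbeat = {}

    def on_heartbeat(self, broker_addr, now=None):
        """Record a heartbeat from a broker."""
        self.last_heartbeat[broker_addr] = time.time() if now is None else now

    def scan_not_active(self, now=None):
        """Remove and return brokers whose last heartbeat is too old."""
        now = time.time() if now is None else now
        expired = [addr for addr, ts in self.last_heartbeat.items()
                   if now - ts > BROKER_EXPIRY_SECONDS]
        for addr in expired:
            del self.last_heartbeat[addr]
        return expired
```

In a real deployment the scan would run on a timer (every 10 seconds according to the text) and removing a Broker would also clean up the other routing tables.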

Routing discovery

RocketMQ route discovery is not real-time. When a Topic route changes, NameServer does not actively push the change to clients (producers and consumers). Instead, each client periodically pulls the latest route of the Topic.

RocketMQ message sending

RocketMQ supports three message sending modes:

  • synchronous
  • asynchronous
  • one-way

The two main issues are:

  • How are message queues load balanced?
  • How can message sending be highly available?

Producer startup

When Producer starts, it obtains an MQClientInstance client that it uses to interact with the server.

There is only one MQClientInstance in the same JVM, regardless of how many producers or consumers there are. Routing information is maintained by MQClientInstance, so producers and consumers use the same route.

Finding the Topic routing information

If no Topic route exists in the local cache, it is pulled from NameServer on the spot. If automatic Topic creation is enabled, a missing Topic is created automatically.
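The cache-then-NameServer lookup can be sketched in a few lines of Python. This is only an illustration of the idea: `TopicRouteCache` and the `fetch_from_nameserver` callable are hypothetical names of mine, and the periodic refresh that the real client performs is left out.

```python
class TopicRouteCache:
    """Toy local route cache (hypothetical names, not the RocketMQ client API)."""

    def __init__(self, fetch_from_nameserver):
        # Assumed callable: topic name -> list of queues, or None if unknown
        self._fetch = fetch_from_nameserver
        self._routes = {}

    def find_route(self, topic):
        """Return the cached route, asking the NameServer only on a cache miss."""
        route = self._routes.get(topic)
        if route is None:
            route = self._fetch(topic)
            if route is not None:
                self._routes[topic] = route
        return route
```

Only successful lookups are cached here, so an unknown Topic is looked up again on the next send.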

The Topic exists on the Broker as a directory, and the queue MessageQueue is the stored entity. Messages are sent and received based on queues.

Select message queue

The producer selects a queue and sends the message to it. The default load balancing mode is round-robin: the queue index is incremented by 1 on each send.

If failover is enabled and sending to a Broker fails, that Broker is avoided for a certain amount of time. So high availability for sending is achieved by retry plus avoidance.
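Below is a rough Python sketch of round-robin selection combined with avoiding a Broker that recently failed. It is not RocketMQ's actual fault-tolerance strategy: the class name, the fixed 30-second avoidance window, and the fallback rule are all assumptions I made to keep the example short.

```python
import itertools


class QueueSelector:
    """Round-robin queue selection that skips recently failed brokers (toy model)."""

    def __init__(self, queues, avoid_seconds=30.0):
        self.queues = list(queues)          # each queue is (broker_name, queue_id)
        self.avoid_seconds = avoid_seconds  # assumed avoidance window
        self._counter = itertools.count()   # "queue number + 1 each time"
        self._avoid_until = {}              # broker_name -> time until which it is avoided

    def mark_failed(self, broker_name, now):
        """Remember that sending to this broker failed at time `now`."""
        self._avoid_until[broker_name] = now + self.avoid_seconds

    def select(self, now):
        """Pick the next queue whose broker is not currently being avoided."""
        for _ in range(len(self.queues)):
            queue = self.queues[next(self._counter) % len(self.queues)]
            if self._avoid_until.get(queue[0], 0.0) <= now:
                return queue
        # Every broker is being avoided: fall back to plain round-robin
        return self.queues[next(self._counter) % len(self.queues)]
```

The sender would call `mark_failed` when a send fails and then retry with a fresh `select`, which is the "retry and avoid" combination described above.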

RocketMQ message store

The RocketMQ message store consists of three main files:

  • Commitlog: All messages that are sent are written to this file.
  • ConsumeQueue: This directory contains one directory per topic, each of which has multiple queue directories, each of which has multiple files. After a message is written to the commitlog, it is asynchronously forwarded to this directory. Message consumption mainly reads from this directory.
  • Index: Provides index files for quickly querying messages by key.

The ConsumeQueue and index files store only the offset of a message. Once the offset is found, the full message is read from the commitlog. This is similar to how MySQL first finds the primary key in a secondary B+ tree index and then uses the primary key to fetch the full row. The storage files are designed this way so that the commitlog is written sequentially, which improves efficiency.
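The two-step lookup (queue index first, commitlog second) can be illustrated with a small in-memory model in Python. This is a sketch under my own assumptions: `MiniStore` is a made-up name, everything lives in memory, and the index is updated synchronously, whereas the real Broker writes files and forwards to the queue asynchronously.

```python
class MiniStore:
    """In-memory toy: one shared sequential log plus per-queue offset indexes."""

    def __init__(self):
        self.commitlog = bytearray()  # all topics append to the same log
        self.consume_queues = {}      # (topic, queue_id) -> list of (offset, size)

    def put(self, topic, queue_id, body: bytes):
        """Append the message to the log, then record its position in the queue index."""
        offset = len(self.commitlog)
        self.commitlog += body        # sequential write
        self.consume_queues.setdefault((topic, queue_id), []).append((offset, len(body)))
        return offset

    def get(self, topic, queue_id, index):
        """Look up offset and size in the queue index, then read the body from the log."""
        offset, size = self.consume_queues[(topic, queue_id)][index]
        return bytes(self.commitlog[offset:offset + size])
```

Because the queue index holds only small fixed-size records, many topics can share one sequentially written log without scanning it to find their messages.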

Commitlog files are 1 GB each and are accessed through memory-mapped files, using the mmap() system call.


The Broker-generated message ID (msgId) is built from the Broker address and the message's offset in the commitlog, so you can use the msgId to quickly locate a message in the commitlog.
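As an illustration of why a msgId is enough to locate a message, here is a Python sketch that encodes and decodes an ID. It assumes the layout I understand the Broker-side ID to have, namely 4 bytes of IPv4 address, 4 bytes of port, and 8 bytes of commitlog offset, written as uppercase hex. Treat that layout and the function names as assumptions rather than a specification.

```python
import socket
import struct


def create_msg_id(ip: str, port: int, commitlog_offset: int) -> str:
    """Encode broker IPv4 address, port and commitlog offset as 32 hex characters."""
    raw = socket.inet_aton(ip) + struct.pack(">iq", port, commitlog_offset)
    return raw.hex().upper()


def decode_msg_id(msg_id: str):
    """Recover (ip, port, commitlog_offset) from an ID built by create_msg_id."""
    raw = bytes.fromhex(msg_id)
    ip = socket.inet_ntoa(raw[:4])
    port, offset = struct.unpack(">iq", raw[4:])
    return ip, port, offset
```

With the offset recovered from the ID, a query can go straight to the right position in the commitlog on the right Broker without consulting any index.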

RocketMQ related files

The consumerOffset.json file stores message consumption progress for cluster mode, recorded per consumer group. If you delete this file, each consumer group starts consuming again from the beginning or the end of the commitlog, depending on its configuration.

Each entry stored in a ConsumeQueue file holds the message's commitlog offset, the message size, and the tag hash code (tagcode). The final tagcode field is used as a filter when the consumer pulls messages.
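A fixed-size entry like this is easy to model with Python's struct module. The sketch below assumes a 20-byte big-endian layout (8-byte commitlog offset, 4-byte size, 8-byte tag hash code), which is my understanding of the format; the function names are made up for the example.

```python
import struct

# Assumed layout: commitlog offset (8 bytes), message size (4 bytes), tag hash code (8 bytes)
ENTRY = struct.Struct(">qiq")


def write_entry(commitlog_offset: int, size: int, tags_code: int) -> bytes:
    """Serialize one queue entry into 20 bytes."""
    return ENTRY.pack(commitlog_offset, size, tags_code)


def read_entry(queue_file: bytes, index: int):
    """Read the index-th entry; fixed-size entries make this a simple multiplication."""
    return ENTRY.unpack_from(queue_file, index * ENTRY.size)
```

Because every entry has the same size, the logical queue offset that a consumer tracks maps directly to a byte position in the file.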

When a message is stored in the ConsumeQueue and long polling is enabled, the message is returned to the waiting consumer right away.

Expired files are deleted periodically

By default, files are retained for 3 days.

RocketMQ message consumption

Message consumption is organized by consumer group. A consumer group can contain multiple consumers, and each consumer group can subscribe to multiple topics. There are two consumption modes for a consumer group: cluster mode and broadcast mode. In cluster mode, a given message under a topic is consumed by only one of the consumers in the group. In broadcast mode, a given message under a topic is consumed by every consumer in the group. There are also two ways of delivering messages between the message server and the consumer: push mode and pull mode. In pull mode, the consumer initiates the pull request. In push mode, a message is pushed to the consumer once it reaches the message server. RocketMQ implements push mode on top of pull mode: it wraps a layer around pulling and starts the next pull task as soon as the previous one completes.

Consumer startup

On startup, a message consumer subscribes to the specified topic and to the retry topic.

RebalanceImpl performs the load balancing: the queues under the topic are assigned to the different consumers, which then pull from them.

All queues take part in the allocation. There are many load balancing algorithms. Rebalancing runs periodically, so when a new consumer joins the group, the queues are reallocated to include it. If the number of consumers is greater than the number of queues, the extra consumers do not participate in message consumption.
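As one example of such an algorithm, here is a Python sketch of an "average" allocation, where queues are split into contiguous blocks as evenly as possible. It is written in the spirit of RocketMQ's average strategy but is my own simplified version, and the function name is hypothetical.

```python
def allocate_averagely(queues, consumer_ids, current_id):
    """Return the queues assigned to current_id when queues are split evenly.

    Every consumer sorts the same inputs, so all of them compute the same
    allocation independently without talking to each other.
    """
    consumers = sorted(consumer_ids)
    queues = sorted(queues)
    index = consumers.index(current_id)
    base, extra = divmod(len(queues), len(consumers))
    # The first `extra` consumers each take one more queue than the rest
    size = base + 1 if index < extra else base
    start = index * base + min(index, extra)
    return queues[start:start + size]
```

With 2 queues and 3 consumers, the third consumer gets an empty list, which matches the statement above that extra consumers do not take part in consumption.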

Pull the message

After load balancing, each consumer is assigned its queues. It pulls messages from them and submits the messages to the thread pool for consumption. By default, 32 messages are pulled each time. To improve efficiency, message pulling uses a long polling mechanism.

Consumption patterns include concurrent consumption and sequential consumption.

The thread pool then consumes the messages and executes the business logic. The default is 10 threads and the maximum is 20 threads.

If consumption fails, the message is sent back to the Broker for a failure retry, and the consumption progress is still advanced.

Set delayLevel on consumption failure

Scheduled message mechanism

Each scheduled message queue has a timer. The timer traverses the scheduled message queue and writes every message that is due back to the commitlog under its original topic. After an interval of 100ms, the timer rescans the queue.
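Here is a small Python sketch of one delay queue and its timer check. The delay-level string is, to my knowledge, RocketMQ's default configuration, but treat it as an assumption; the class and function names are my own, and the clock is passed in explicitly instead of using a real 100ms timer.

```python
# Assumed default delay levels; level 1 is the first entry
DELAY_LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_delay_levels(spec=DELAY_LEVELS):
    """Map delay level (1-based) to its delay in seconds."""
    return {level: int(item[:-1]) * UNIT_SECONDS[item[-1]]
            for level, item in enumerate(spec.split(), start=1)}


class DelayQueue:
    """One queue per delay level (toy model)."""

    def __init__(self, delay_seconds):
        self.delay_seconds = delay_seconds
        # (deliver_at, real_topic, body); the same delay for every message
        # means arrival order is also delivery order
        self.pending = []

    def put(self, real_topic, body, now):
        self.pending.append((now + self.delay_seconds, real_topic, body))

    def tick(self, now):
        """Called by the timer: return messages that are due, oldest first."""
        due = []
        while self.pending and self.pending[0][0] <= now:
            _, topic, body = self.pending.pop(0)
            due.append((topic, body))
        return due
```

Because every message in a given queue has the same delay, the timer only needs to look at the head of the queue, which is one reason fixed delay levels are simpler to implement than arbitrary delay times.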

Timing message graph:

Message retry mechanism

A delayLevel is also set when message consumption fails, so the message retry mechanism is built on scheduled messages. The delayLevel is incremented by one on each retry. If a message has failed more than 16 times, it is no longer retried and is sent to the dead-letter queue.
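The retry decision can be sketched as a tiny Python function. The starting level of 3 (so that the first retry uses the third delay level) is my understanding of the Broker's behavior and should be treated as an assumption, as should the function name and the returned labels.

```python
MAX_RECONSUME_TIMES = 16  # retry limit mentioned in the text


def next_step(reconsume_times: int):
    """Decide what happens to a message that has just failed consumption.

    reconsume_times is how many times the message has already been retried.
    Returns ("retry", delay_level) or ("dead_letter", None).
    """
    if reconsume_times >= MAX_RECONSUME_TIMES:
        return ("dead_letter", None)
    # Assumption: the level starts at 3 and grows by one with each retry
    return ("retry", 3 + reconsume_times)
```

Since each retry moves to a higher delay level, the waiting time between attempts grows as a message keeps failing.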

The server handles the message pull

When handling a pull request, the server first filters messages by tagcode, the hash code of the message tag.
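To show what filtering by tagcode means, here is a Python sketch that reproduces Java's String.hashCode for simple strings and filters queue entries by it. The second, exact-match check on the consumer side is my understanding of how hash collisions are handled, so take it as an assumption; all function names are hypothetical.

```python
def java_string_hash(s: str) -> int:
    """Java's String.hashCode for strings of basic (BMP) characters."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    # Convert to a signed 32-bit value, as Java would
    return h - (1 << 32) if h >= (1 << 31) else h


def server_filter(entries, subscribed_tags):
    """Server side: keep entries whose stored tag hash matches a subscribed tag.

    entries is a list of (tag, body); only the hash of the tag is compared,
    as if the server had nothing but the tagcode to look at.
    """
    wanted = {java_string_hash(tag) for tag in subscribed_tags}
    return [e for e in entries if java_string_hash(e[0]) in wanted]


def client_filter(entries, subscribed_tags):
    """Assumed client side: compare the real tag string to drop hash collisions."""
    return [e for e in entries if e[0] in subscribed_tags]
```

The strings "Aa" and "BB" have the same Java hash code, so a hash-only filter lets both through for a subscription to "Aa", and only the exact comparison tells them apart.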