Chapter 3 How RocketMQ works

One, message production

1 Message production process

A Producer writes a message to a Queue on a Broker through the following steps:

  • Before sending a message, Producer sends a request to NameServer to obtain routing information for the message Topic

  • NameServer returns the routing table and Broker list for this Topic

  • Producer selects a Queue from the Queue list based on the Queue selection policy specified in the code, which is used to store messages later

  • Producer does some special processing on the message; for example, if the message body is larger than the compression threshold (4K by default), it compresses it

  • Producer sends an RPC request to the Broker that hosts the selected Queue, delivering the message to that Queue

    Routing table: a Map whose key is the Topic name and whose value is a list of QueueData instances. A QueueData does not correspond to a single Queue; it covers all Queues of the Topic on one Broker. In other words, every Broker that hosts Queues of the Topic contributes one QueueData, and each QueueData contains the brokerName. Put simply, the routing table maps a Topic name to the list of brokerNames involved in that Topic.

    Broker list: also a Map, whose key is the brokerName and whose value is a BrokerData instance. A BrokerData does not correspond to a single Broker; it corresponds to a small master-slave cluster whose members share the same brokerName. A BrokerData contains the brokerName and a map whose key is the brokerId and whose value is the address of that broker. A brokerId of 0 means the broker is the Master; a non-zero brokerId means it is a Slave.
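The two maps described above can be pictured with a minimal Python sketch. The class and field names, the queue counts, and the addresses are illustrative assumptions, not RocketMQ's actual Java types.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class QueueData:
    """All Queues of one Topic on one Broker group (illustrative fields)."""
    broker_name: str
    queue_count: int


@dataclass
class BrokerData:
    """One master-slave group whose members share a brokerName."""
    broker_name: str
    broker_addrs: Dict[int, str] = field(default_factory=dict)  # brokerId -> address

    def master_addr(self) -> Optional[str]:
        # brokerId 0 is the Master; any other id is a Slave.
        return self.broker_addrs.get(0)


# Routing table: Topic name -> list of QueueData (one per Broker group).
route_table: Dict[str, List[QueueData]] = {
    "TopicA": [QueueData("broker-a", 4), QueueData("broker-b", 4)],
}

# Broker list: brokerName -> BrokerData (hypothetical addresses).
broker_table: Dict[str, BrokerData] = {
    "broker-a": BrokerData("broker-a", {0: "192.168.0.1:10911", 1: "192.168.0.2:10911"}),
    "broker-b": BrokerData("broker-b", {0: "192.168.0.3:10911"}),
}


def master_addresses(topic: str) -> List[Optional[str]]:
    """Resolve a Topic to the Master address of every Broker group hosting it."""
    return [broker_table[q.broker_name].master_addr() for q in route_table.get(topic, [])]
```

Calling master_addresses("TopicA") walks the routing table first and the broker list second, which mirrors how a Producer turns a Topic name into concrete Broker addresses.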

2 Queue selection algorithm

For unordered messages, two Queue selection algorithms (also called message delivery algorithms) are common:

Polling (round-robin) algorithm

This is the default selection algorithm. It ensures that messages are distributed evenly across the Queues.

The drawback: if delivery to the Queues on some Broker is slow for any reason, messages back up in the Producer's cache queue, which hurts message delivery performance.

Minimum delivery delay algorithm

This algorithm records the latency of each message delivery and sends the next message to the Queue with the lowest latency. If the latencies are equal, it falls back to the polling algorithm. This can effectively improve message delivery performance.

This algorithm also has a drawback: messages are distributed unevenly across the Queues. A Queue with low delivery latency may receive a large number of messages, which increases the pressure on the Consumer of that Queue, reduces its ability to consume, and may cause messages to pile up in MQ.
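The two selection policies can be sketched as follows. This is a simplified illustration under the assumption that the Producer records one latency value per Queue; the class and method names are hypothetical and this is not RocketMQ's actual implementation.

```python
import itertools


class RoundRobinSelector:
    """Polling: hand out Queues in turn so each receives an equal share."""

    def __init__(self, queues):
        self._queues = list(queues)
        self._counter = itertools.count()

    def select(self):
        return self._queues[next(self._counter) % len(self._queues)]


class MinLatencySelector:
    """Pick the Queue with the lowest recorded latency; poll among ties."""

    def __init__(self, queues):
        self._queues = list(queues)
        self._latency_ms = {q: 0.0 for q in self._queues}
        self._counter = itertools.count()

    def record(self, queue, latency_ms):
        # Called after each send with the observed delivery latency.
        self._latency_ms[queue] = latency_ms

    def select(self):
        best = min(self._latency_ms.values())
        candidates = [q for q in self._queues if self._latency_ms[q] == best]
        # With several equally fast Queues, fall back to polling among them.
        return candidates[next(self._counter) % len(candidates)]
```

The second selector shows the trade-off described above: a single fast Queue keeps winning the comparison, so it receives most of the traffic.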

Two, message storage

Messages in RocketMQ are stored on the local file system. By default, the related files live in the store directory under the current user's home directory.

  • abort: This file is created automatically when the Broker starts and is deleted when the Broker shuts down normally. If the file exists while the Broker is not running, the previous shutdown was abnormal.
  • checkpoint: Stores the last flush time of the commitlog, consumequeue, and index files
  • commitlog: Stores the commitlog files, into which messages are written
  • config: Stores configuration data of the Broker at runtime
  • consumequeue: Stores the consumequeue files; the queues are kept in this directory
  • index: Stores the message index files (indexFile)
  • lock: Global resource lock used at runtime

1 commitlog file

Note: In many materials, the files in the commitlog directory are simply called commitlog files. In the source code, however, such a file is called a mappedFile.

Directories and Files

The commitlog directory holds many mappedFiles, and every message on the current Broker is written to them. Each mappedFile is at most 1 GB. Its file name is a 20-digit decimal number giving the commitlog offset of the first message in that file.

The first file name is always twenty zeros, because the first message of the first file has a commitlog offset of 0.

When the first file is full, a second file is created automatically and writing continues there. If the first file is 1073741824 bytes (1 GB), the second file is named 00000000001073741824.

In general, the name of the nth file is the sum of the sizes of the previous n-1 files.

The commitlog offsets across all mappedFiles in a Broker are contiguous.
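The naming rule can be checked with a short Python sketch. It assumes every mappedFile has exactly the same size (1 GB here); the function names are hypothetical.

```python
MAPPED_FILE_SIZE = 1024 * 1024 * 1024  # 1 GB per mappedFile (assumed fixed)


def mapped_file_name(start_offset: int) -> str:
    """File name = commitlog offset of the file's first message, padded to 20 digits."""
    return f"{start_offset:020d}"


def locate(commitlog_offset: int, file_size: int = MAPPED_FILE_SIZE):
    """Return (file name, position inside that file) for a commitlog offset."""
    start = (commitlog_offset // file_size) * file_size
    return mapped_file_name(start), commitlog_offset - start
```

Because offsets are contiguous across files, integer division by the file size is enough to find the file that holds any given commitlog offset.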

Note that a Broker contains only one commitlog directory, which holds all mappedFiles. No matter how many Topics the Broker stores messages for, the messages are all written sequentially to the mappedFiles; they are not stored separately by Topic.

mappedFiles are read and written sequentially, so access is efficient.

Whether on SSD or SATA disks, sequential access is usually more efficient than random access.

Message unit

A mappedFile consists of message units. Each message unit contains more than 20 message-related attributes, such as the total message length MsgLen, the physical location PhysicalOffset, the message content Body, the body length BodyLength, the message Topic, the topic length TopicLength, the producer BornHost, the send timestamp BornTimestamp, the Queue the message belongs to QueueId, and the message's offset in that Queue QueueOffset.

Note that the message unit contains Queue-related attributes, so the relationship between commitlog and Queue deserves attention later on.

The commitlog offset L(m+1) of the (m+1)-th message unit in a mappedFile is:

L(m+1) = L(m) + MsgLen(m) (m >= 0)
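As a small worked example of this recurrence, the following Python sketch derives the offset of every message unit from the unit lengths. The function name and the sample lengths are made up for illustration.

```python
def unit_offsets(first_offset, msg_lens):
    """Apply L(m+1) = L(m) + MsgLen(m), starting from L(0) = first_offset."""
    offsets = []
    current = first_offset
    for length in msg_lens:
        offsets.append(current)
        current += length  # the next unit starts right after this one
    return offsets
```

For three units of 120, 80, and 200 bytes in a file that starts at offset 0, the units begin at offsets 0, 120, and 200.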

2 consumequeue

Directories and Files

For efficiency, a directory named after the Topic is created under ~/store/consumequeue for each Topic, and inside it a subdirectory named after the queueId is created for each Queue of that Topic. Each queueId directory stores several consumequeue files, which are index files over the commitlog and are used to locate specific messages.

A consumequeue file name is also a 20-digit number, giving the starting offset of the first index entry in that file. Unlike mappedFile names, the names of subsequent files follow a fixed pattern, because every consumequeue file has the same fixed size.

Index entries

Each consumequeue file can hold up to 300,000 index entries. Each index entry records three important message attributes: the message's commitlog offset, the message length, and the hashcode of the message Tag. Together these occupy 20 bytes, so each file has a fixed size of 300,000 * 20 bytes.

All messages indexed by one consumequeue file belong to the same Topic, but their Tags may differ.
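A 20-byte index entry can be modelled with Python's struct module. The text only fixes the total of 20 bytes; the split into an 8-byte offset, a 4-byte length, and an 8-byte Tag hashcode, as well as the big-endian byte order, are assumptions of this sketch.

```python
import struct

# Assumed layout: 8-byte commitlog offset, 4-byte message length, 8-byte Tag hashcode.
ENTRY = struct.Struct(">qiq")
ENTRIES_PER_FILE = 300_000
FILE_SIZE = ENTRIES_PER_FILE * ENTRY.size  # fixed size of one consumequeue file


def pack_entry(commitlog_offset: int, msg_len: int, tag_hash: int) -> bytes:
    """Serialize one index entry into 20 bytes."""
    return ENTRY.pack(commitlog_offset, msg_len, tag_hash)


def unpack_entry(raw: bytes):
    """Return (commitlog_offset, msg_len, tag_hash) from a 20-byte entry."""
    return ENTRY.unpack(raw)
```

Because every entry has the same width, the k-th entry of a file always starts at byte k * 20, which is what makes lookups by position cheap.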

3 Read and write files

Message writing

A message arriving at the Broker goes through the following steps before it is persisted:

  • The Broker looks in the consumequeue directory to obtain the queueOffset at which the message's index entry will be written
  • It wraps queueId, queueOffset, and other data together with the message into a message unit
  • It writes the message unit to the commitlog
  • At the same time, it builds the message index entry
  • It dispatches the message index entry to the corresponding consumequeue

Message pull

When a Consumer pulls a message, it goes through the following steps:

  • The Consumer obtains its consumption offset for the Queue it wants to consume from, and computes the message offset of the message it wants to consume

    The consumption offset is the consumption progress: for a given Queue, it indicates which message in the Queue the Consumer has consumed up to

    Message offset = consumption offset + 1

  • The Consumer sends a pull request to the Broker, containing the Queue, the message offset, and the message Tag of the message it wants to pull

  • The Broker computes the queueOffset within the consumequeue

    queueOffset = message offset * 20 bytes

  • Starting from that queueOffset, the Broker scans forward to the first index entry with the specified Tag

  • It parses the first 8 bytes of that index entry to obtain the message's commitlog offset

  • It reads the message unit at that commitlog offset and sends it to the Consumer
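The write and pull steps above can be tied together in a toy in-memory model. Everything here is a simplification for illustration: the message unit format, the crc32 stand-in for the Tag hashcode, the 8/4/8-byte entry layout, and the assumption that message offsets count index entries from 0 are not taken from RocketMQ's source.

```python
import struct
import zlib

ENTRY = struct.Struct(">qiq")  # commitlog offset, unit length, Tag hash (20 bytes)


def tag_hash(tag: str) -> int:
    # Stand-in hash for illustration; the real Broker uses the Tag's hashcode.
    return zlib.crc32(tag.encode())


class ToyBroker:
    def __init__(self):
        self.commitlog = bytearray()  # one shared log for all Topics
        self.consume_queues = {}      # (topic, queue_id) -> bytearray of entries

    def put(self, topic, queue_id, tag, body: bytes) -> int:
        cq = self.consume_queues.setdefault((topic, queue_id), bytearray())
        queue_offset = len(cq) // ENTRY.size                    # next index position
        unit = f"{topic}|{queue_id}|{queue_offset}|".encode() + body  # message unit
        commitlog_offset = len(self.commitlog)
        self.commitlog += unit                                  # append to commitlog
        cq += ENTRY.pack(commitlog_offset, len(unit), tag_hash(tag))  # index entry
        return queue_offset

    def pull(self, topic, queue_id, message_offset, tag):
        cq = self.consume_queues.get((topic, queue_id), bytearray())
        pos = message_offset * ENTRY.size                       # queueOffset in bytes
        wanted = tag_hash(tag)
        while pos + ENTRY.size <= len(cq):                      # scan forward by Tag
            commitlog_offset, length, entry_tag = ENTRY.unpack_from(cq, pos)
            if entry_tag == wanted:
                return bytes(self.commitlog[commitlog_offset:commitlog_offset + length])
            pos += ENTRY.size
        return None                                             # nothing matched
```

In this model the consumequeue never stores message bodies; it only points into the shared commitlog, which is why one sequential log can serve many Topics and Queues.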

Performance improvement

In RocketMQ, both messages and message indexes are stored on disk. Does that hurt message consumption? No. RocketMQ's performance is among the highest of current MQ products, because the system improves performance through a series of related mechanisms.

First, RocketMQ reads and writes files through mmap zero-copy, which turns file operations into direct operations on memory addresses and greatly improves file read and write efficiency.
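To illustrate what "operating on a file as memory" means, here is a minimal Python sketch using the standard mmap module. It shows memory-mapped I/O in general rather than RocketMQ's own Java code; the function name and the 4096-byte mapping size are arbitrary choices.

```python
import mmap
import os
import tempfile


def write_and_read_mapped(payload: bytes, size: int = 4096) -> bytes:
    """Write payload through a memory mapping, then read it back from the file."""
    fd, path = tempfile.mkstemp()
    try:
        os.write(fd, b"\0" * size)            # the file must be as large as the mapping
        with mmap.mmap(fd, size) as mapped:
            mapped[0:len(payload)] = payload  # a plain slice assignment, no write() call
            mapped.flush()                    # push the dirty pages to the file
        with open(path, "rb") as f:
            return f.read(len(payload))
    finally:
        os.close(fd)
        os.remove(path)
```

The slice assignment changes the file contents without an explicit write call, which is the effect the paragraph above describes.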

Second, the data in consumequeue is stored sequentially and benefits from the PageCache read-ahead mechanism, so reading a consumequeue file is nearly as fast as reading memory, and performance is not affected even when messages accumulate.

PageCache is a file caching mechanism the OS uses to speed up file reads and writes. Generally speaking, a program's sequential file reads and writes run at close to memory speed, mainly because the OS sets aside part of memory as PageCache to optimize these operations.

  • Write operation: The OS first writes the data to the PageCache, and the pdflush (page dirty flush) kernel thread then asynchronously flushes the data from the cache to the physical disk
  • Read operation: The OS first tries to read the data from the PageCache. On a miss, it loads the data from the physical disk into the PageCache and also prefetches the adjacent data blocks sequentially.

One area in RocketMQ that may affect performance is reading the commitlog. Reading messages from commitlog files involves a lot of random access, which can seriously hurt performance. However, choosing a suitable system I/O scheduling algorithm, for example Deadline when using SSDs, also improves random read performance.

Comparison with Kafka

Many of RocketMQ's ideas come from Kafka; commitlog and consumequeue are among them.

The commitlog directory together with consumequeue in RocketMQ is similar to a partition directory in Kafka. A mappedFile is similar to a segment in Kafka.

Messages for a Topic in Kafka are split into one or more partitions. A partition is a physical concept: on disk it corresponds to one or more directories under the topic directory. The files contained in each partition, called segments, are the files that store messages.

In Kafka, messages are stored in a directory layout of topic directory, then partition directories, then segment files.

There is no concept of a Tag in Kafka.

Kafka does not need a consumequeue-style index file, since the producer writes messages directly into a partition and the consumer reads data directly from the partition.

Three, indexFile

In addition to the usual consumption of messages by Topic, RocketMQ can query messages by key. The query is served quickly by the indexFiles in the index subdirectory of the store directory. The index data in an indexFile is written when a message containing a key is sent to the Broker. If a message does not contain a key, nothing is written.

1 Index entry structure

Each Broker contains a set of indexFiles, and each indexFile is named after a timestamp (the time at which the indexFile was created). Each indexFile consists of three parts: indexHeader, slots, and indexes (the index units). Each indexFile contains 5,000,000 slots, and many index units may be attached to each slot.

IndexHeader is a fixed 40 bytes containing the following data:

  • beginTimestamp: The storage time of the first message in the indexFile
  • endTimestamp: The storage time of the last message in the indexFile
  • beginPhyoffset: The commitlog offset of the first message in the indexFile
  • endPhyoffset: The commitlog offset of the last message in the indexFile
  • hashSlotCount: The number of slots that have index units attached (not the total number of slots, since not every slot has index units attached)
  • indexCount: The number of index units in the indexFile (the total number of index units attached to all slots in the current indexFile)
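The 40-byte header can be modelled with Python's struct module. The field widths (four 8-byte values followed by two 4-byte values) and the big-endian byte order are assumptions of this sketch; the text only states the total of 40 bytes and the order of the fields.

```python
import struct

# Assumed layout: beginTimestamp, endTimestamp, beginPhyoffset, endPhyoffset
# (8 bytes each), then hashSlotCount and indexCount (4 bytes each).
HEADER = struct.Struct(">qqqqii")


def pack_header(begin_ts, end_ts, begin_phy, end_phy, hash_slot_count, index_count):
    """Serialize the six header fields into 40 bytes."""
    return HEADER.pack(begin_ts, end_ts, begin_phy, end_phy, hash_slot_count, index_count)


def read_index_count(header_bytes: bytes) -> int:
    """indexCount is the last field, so it sits in the final 4 bytes of the header."""
    return struct.unpack_from(">i", header_bytes, HEADER.size - 4)[0]
```

Reading only the last 4 bytes is enough to decide whether an indexFile is full, without parsing the rest of the header.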

The most complex aspect of indexFile is the relationship between Slots and Indexes. In actual storage, the Indexes come after Slots, but for ease of understanding, their relationship is displayed as follows:

The hash value of the key % 5,000,000 gives the slot number. The value stored in that slot is then set to the indexNo of the new index unit, and from this indexNo the position of the index unit in the indexFile can be calculated. However, the modulo result collides frequently. To handle this, each index unit carries a preIndexNo that points to the index unit that preceded it in the same slot. A slot always stores the indexNo of its latest index unit, so once the slot is found, the latest index unit can be found, and all earlier index units can be reached through it.

indexNo is a sequence number within an indexFile, ascending from 0. It is not stored in the index unit itself; it is obtained by counting the unit's position within indexes.

Each index unit is 20 bytes and contains the following four attributes:

  • keyHash: The hash value of the business key specified in the message
  • phyOffset: The commitlog offset of the message corresponding to the current key
  • timeDiff: The difference between the storage time of the message corresponding to the current key and the creation time of the current indexFile
  • preIndexNo: The indexNo of the index unit that preceded the current index unit in the current slot
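The slot-to-chain relationship can be sketched with a toy in-memory index. The field widths (4 + 8 + 4 + 4 bytes), the crc32-based stand-in hash, the sparse dict used for slots, and the -1 marker for "no previous unit" are all assumptions of this sketch rather than details taken from the text.

```python
import struct
import zlib

INDEX_UNIT = struct.Struct(">iqii")  # keyHash, phyOffset, timeDiff, preIndexNo (20 bytes)
NO_PREVIOUS = -1                     # sketch-only marker for the end of a chain


def key_hash_of(key: str) -> int:
    # Stand-in non-negative hash that fits in 4 bytes.
    return zlib.crc32(key.encode()) & 0x7FFFFFFF


class ToyIndexFile:
    def __init__(self, slot_count: int = 5_000_000):
        self.slot_count = slot_count
        self.slots = {}           # slot number -> indexNo of the newest unit
        self.units = bytearray()  # index units appended in arrival order

    def put(self, key: str, phy_offset: int, time_diff: int = 0) -> int:
        key_hash = key_hash_of(key)
        slot = key_hash % self.slot_count
        pre_index_no = self.slots.get(slot, NO_PREVIOUS)
        index_no = len(self.units) // INDEX_UNIT.size  # implied by position
        self.units += INDEX_UNIT.pack(key_hash, phy_offset, time_diff, pre_index_no)
        self.slots[slot] = index_no                    # slot points at the newest unit
        return index_no

    def query(self, key: str):
        """Follow the chain from the slot, newest first, collecting matching offsets."""
        key_hash = key_hash_of(key)
        index_no = self.slots.get(key_hash % self.slot_count, NO_PREVIOUS)
        found = []
        while index_no != NO_PREVIOUS:
            h, phy, _diff, pre = INDEX_UNIT.unpack_from(self.units, index_no * INDEX_UNIT.size)
            if h == key_hash:  # different keys can share a slot, so compare hashes
                found.append(phy)
            index_no = pre
        return found
```

With slot_count set to 1, every key lands in the same slot, which makes it easy to see the chain: the slot holds the newest indexNo and each unit links back to the one before it.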

2 Creating an indexFile

The file name of an indexFile is the timestamp at which the file was created. What is this timestamp used for?

When you query by business key, you must specify a timestamp in addition to the key; that is, you query the latest messages stored before the specified timestamp. Using the timestamp as the file name simplifies the query and makes it more efficient. More on that later.

When is an indexFile created? There are two occasions:

  • When the first message with a key arrives and no indexFile exists, the system creates the first indexFile

  • When the number of index units attached to an indexFile reaches 20,000,000, a new indexFile is created. When a message with a key arrives, the system finds the latest indexFile and reads the indexCount from the last 4 bytes of its indexHeader. If indexCount >= 20,000,000, a new indexFile is created.

    It follows that the maximum size of an indexFile is (40 + 5,000,000 * 4 + 20,000,000 * 20) bytes = 420,000,040 bytes

3 Query Process

When consumers query messages by business key, the query goes through a relatively complex process. Before analyzing that process, it helps to know a few positioning calculations:

Compute the slot number for a given message key: slot number = hash of the key % 5,000,000 (formula 1)
Compute the starting position of slot n in the indexFile: slot(n) = 40 + (n - 1) * 4 (formula 2)
Compute the position of the index unit whose indexNo is m in the indexFile: index(m) = 40 + 5,000,000 * 4 + (m - 1) * 20 (formula 3)

40 is the number of bytes of the indexHeader in the indexFile

5,000,000 * 4 is the number of bytes occupied by all slots
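The three formulas can be transcribed directly into Python. The sketch follows the formulas exactly as written, including the (n - 1) and (m - 1) terms, and makes no claim about how the actual source code counts slots and index units; the constant and function names are hypothetical.

```python
HEADER_SIZE = 40              # bytes in indexHeader
SLOT_COUNT = 5_000_000        # slots per indexFile
SLOT_SIZE = 4                 # bytes per slot
INDEX_UNIT_SIZE = 20          # bytes per index unit
MAX_INDEX_UNITS = 20_000_000  # index units per indexFile


def slot_number(key_hash: int) -> int:
    """Formula 1: slot number for a key hash."""
    return key_hash % SLOT_COUNT


def slot_position(n: int) -> int:
    """Formula 2: starting byte of slot n, as written in the text."""
    return HEADER_SIZE + (n - 1) * SLOT_SIZE


def index_position(m: int) -> int:
    """Formula 3: starting byte of index unit m, as written in the text."""
    return HEADER_SIZE + SLOT_COUNT * SLOT_SIZE + (m - 1) * INDEX_UNIT_SIZE


# Header, all slots, and the maximum number of index units together.
MAX_INDEX_FILE_SIZE = HEADER_SIZE + SLOT_COUNT * SLOT_SIZE + MAX_INDEX_UNITS * INDEX_UNIT_SIZE
```

All three regions have fixed-width records, so every position is a constant-time calculation and no scan of the file is needed to find a slot or an index unit.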

The specific query process is as follows: