I have been studying RocketMQ recently. I have written some notes in my spare time.

RocketMQ has many interesting design points: message sending, message consumption, the NameServer routing center, message filtering, message storage, master-slave synchronization, transactional messages, and more.

This article assumes no prior experience with RocketMQ. It analyzes the RocketMQ store package purely from the standpoint of how message storage is implemented.

0. Thinking about queue file storage

Before we start, let's do a quick thought experiment.

Is it feasible for an MQ to keep a large volume of incoming messages entirely in memory?

Of course not, given the limits of machine memory, so we need storage outside of memory.

Database?

It sounds odd, but ActiveMQ really does support storing messages in a database.

Local disk?

Disk is slow. Even if messages were stored so that they could be read sequentially, supporting the various message features that need non-sequential access would make it much slower.

Local disk is slow, but it has a lot of capacity.

Memory is fast, but its capacity is small.

So is there a compromise that uses both memory and local disk?

I came up with a fill-and-swap algorithm: map fixed-size page files (say 128 MB) into memory on demand, and unmap a file once it has not been accessed for a fixed TTL. With this design, memory can be used more safely and efficiently, and page files that are no longer needed can be deleted to save disk space.
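The fill-and-swap idea can be sketched as a small cache of page buffers. This is a hypothetical illustration, not RocketMQ code: `PageCache`, the injected `mapper` (which in practice might call `FileChannel.map` for page file N), and the injectable clock are all assumptions made to keep the sketch testable.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.IntFunction;
import java.util.function.LongSupplier;

// Hypothetical sketch of "fill and swap": page files are mapped on first access
// and released once they have been idle for longer than a TTL.
class PageCache {
    private static final class Entry {
        final ByteBuffer buffer;
        long lastAccessMillis;

        Entry(ByteBuffer buffer, long now) {
            this.buffer = buffer;
            this.lastAccessMillis = now;
        }
    }

    private final Map<Integer, Entry> pages = new HashMap<>();
    private final IntFunction<ByteBuffer> mapper; // maps page file N into memory
    private final LongSupplier clock;             // injected so the TTL is testable
    private final long ttlMillis;

    PageCache(IntFunction<ByteBuffer> mapper, LongSupplier clock, long ttlMillis) {
        this.mapper = mapper;
        this.clock = clock;
        this.ttlMillis = ttlMillis;
    }

    // "Fill": map the page on first use and refresh its last-access time on every use.
    ByteBuffer page(int pageIndex) {
        long now = clock.getAsLong();
        Entry entry = pages.get(pageIndex);
        if (entry == null) {
            entry = new Entry(mapper.apply(pageIndex), now);
            pages.put(pageIndex, entry);
        }
        entry.lastAccessMillis = now;
        return entry.buffer;
    }

    // "Swap": forget pages idle for longer than the TTL and return how many were dropped.
    int evictIdle() {
        long now = clock.getAsLong();
        int evicted = 0;
        Iterator<Entry> it = pages.values().iterator();
        while (it.hasNext()) {
            if (now - it.next().lastAccessMillis > ttlMillis) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    int mappedPages() {
        return pages.size();
    }
}
```

One caveat: Java has no public API for unmapping a `MappedByteBuffer`, so "unmapping" in this sketch only drops the reference and leaves the actual release to the garbage collector.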

MQ is, as its name implies, a message queue.

A queue is read at the head and appended at the tail, so only the head and the tail need to be in memory while the middle stays on disk. This gives both efficiency and large capacity.

Reads and appends can then always happen in memory, which means enqueue and dequeue operations stay close to O(1).

However, reading data that is on disk is still slow, so we can maintain a separate index that records the offset of each message in the disk file and fetch it through a random-read interface.
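The combination described above, sequential appends plus an offset index that enables random reads, can be sketched in a few lines of Java. This is a hypothetical illustration, not RocketMQ code: `OffsetIndexedLog` keeps its index in memory as (offset, size) pairs and uses `FileChannel`'s positional `write` and `read` methods.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: an append-only data file plus an in-memory index of
// (offset, size) pairs, so any message can be fetched with one positional read.
class OffsetIndexedLog implements AutoCloseable {
    private final FileChannel channel;
    private final List<long[]> index = new ArrayList<>(); // entry i = {offset, size}
    private long writePosition = 0;

    OffsetIndexedLog(Path file) {
        try {
            channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.READ,
                    StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Sequential write at the tail; returns the logical index of the message.
    int append(byte[] message) {
        ByteBuffer src = ByteBuffer.wrap(message);
        try {
            long pos = writePosition;
            while (src.hasRemaining()) {
                pos += channel.write(src, pos);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        index.add(new long[] {writePosition, message.length});
        writePosition += message.length;
        return index.size() - 1;
    }

    // Random read: look up the offset in the index, then read exactly `size` bytes there.
    byte[] read(int logicalIndex) {
        long[] entry = index.get(logicalIndex);
        ByteBuffer dst = ByteBuffer.allocate((int) entry[1]);
        try {
            long pos = entry[0];
            while (dst.hasRemaining()) {
                int n = channel.read(dst, pos);
                if (n < 0) throw new IOException("unexpected end of file");
                pos += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return dst.array();
    }

    @Override
    public void close() {
        try {
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The in-memory index plays the role the ConsumeQueue plays in RocketMQ; a real store would also persist the index so it survives a restart.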

At this point, a practical file-based queue storage design is taking shape.

1. RocketMQ file storage architecture

RocketMQ stores messages in local files so that they can be looked up and recovered in the event of a system failure.

Unlike Kafka, which stores messages per partition, RocketMQ writes all messages to CommitLog files without distinguishing between Topic, Group, and so on.

By default, the files live in the store directory under the user's home directory (on Windows, the user folder on drive C).

Each CommitLog file has a fixed size (1 GB by default).
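Because every CommitLog file has the same fixed size and is named after the global offset of its first byte (a 20-digit, zero-padded number), finding the file and position for any global offset is plain arithmetic. A minimal sketch; `CommitLogNaming` and its methods are hypothetical helpers, not RocketMQ API.

```java
// Hypothetical helpers, assuming fixed-size CommitLog files named after their
// starting global offset, zero-padded to 20 digits.
final class CommitLogNaming {
    static final long DEFAULT_FILE_SIZE = 1024L * 1024 * 1024; // 1 GB

    // A CommitLog file is named after the global offset of its first byte.
    static String fileName(long fileFromOffset) {
        return String.format("%020d", fileFromOffset);
    }

    // Starting offset of the file that contains a global (physical) offset.
    static long fileFromOffset(long physicalOffset, long fileSize) {
        return physicalOffset - (physicalOffset % fileSize);
    }

    // Position of that offset inside its file.
    static long positionInFile(long physicalOffset, long fileSize) {
        return physicalOffset % fileSize;
    }

    public static void main(String[] args) {
        long offset = DEFAULT_FILE_SIZE + 100;
        System.out.println(fileName(fileFromOffset(offset, DEFAULT_FILE_SIZE))); // 00000000001073741824
        System.out.println(positionInFile(offset, DEFAULT_FILE_SIZE));           // 100
    }
}
```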

All producer messages are written to CommitLog files. Since finding a particular topic's messages by scanning the CommitLog on disk would be slow, an index-like file is also required.

This is ConsumeQueue, which is the logical queue.

The CommitLog Offset stored in the ConsumeQueue is the message's position in the CommitLog. It acts as a coordinate, so the message can be located quickly with a random read.

There is also the Size of the message, which is how many bytes to read starting from that CommitLog position.

Finally, the Message Tag Hashcode, the hash of the message's tag, is used to filter messages.

The CommitLog is written to a local file as soon as the broker receives a message, whereas the ConsumeQueue is built asynchronously from the CommitLog and serves as a logical queue.

If you look at the files on local disk, the ConsumeQueue directory structure is interesting.

It is organized hierarchically, with the Topic as the top-level directory and the queue ID as the subdirectory.

As calculated in the figure above, each entry in the ConsumeQueue is 20 bytes:

  1. CommitLog Offset: 8 bytes
  2. Size: 4 bytes
  3. Message Tag Hashcode: 8 bytes
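The fixed 20-byte layout is what makes the ConsumeQueue cheap to index: the n-th entry of a queue always starts at n × 20. A sketch of encoding and decoding one entry with `ByteBuffer`; `ConsumeQueueUnit` and its methods are hypothetical names, and the field order follows the list above.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of one 20-byte ConsumeQueue entry:
// 8-byte CommitLog offset, 4-byte size, 8-byte tag hashcode.
final class ConsumeQueueUnit {
    static final int UNIT_SIZE = 8 + 4 + 8; // 20 bytes

    final long commitLogOffset;
    final int size;
    final long tagHashCode;

    ConsumeQueueUnit(long commitLogOffset, int size, long tagHashCode) {
        this.commitLogOffset = commitLogOffset;
        this.size = size;
        this.tagHashCode = tagHashCode;
    }

    // Serialize in field order (ByteBuffer defaults to big-endian).
    byte[] encode() {
        return ByteBuffer.allocate(UNIT_SIZE)
                .putLong(commitLogOffset)
                .putInt(size)
                .putLong(tagHashCode)
                .array();
    }

    // Read an entry with absolute gets, so the buffer's position is not changed.
    static ConsumeQueueUnit decode(ByteBuffer buf, int position) {
        return new ConsumeQueueUnit(buf.getLong(position), buf.getInt(position + 8), buf.getLong(position + 12));
    }

    // Fixed-size entries put the n-th message of a queue a multiplication away.
    static long positionOf(long queueOffset) {
        return queueOffset * UNIT_SIZE;
    }
}
```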

The ConsumeQueue is also what consumers consume from: a consumer's progress is tracked as a position (queue offset) in its ConsumeQueue rather than as a CommitLog offset.

Putting it all together gives us RocketMQ's storage architecture diagram.

2. Source code analysis

With the concepts of CommitLog and ConsumeQueue in place, we can walk through the message storage code.

RocketMQ's file storage code can run on its own, without starting a NameServer, Broker, or other nodes. For details, see the test cases in the store module of the RocketMQ source.

The first step is to instantiate the store configuration.

It specifies the CommitLog and ConsumeQueue file sizes, whether disk flushing is synchronous or asynchronous, and the interval at which the ConsumeQueue is built asynchronously.
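To make those knobs concrete, here is a hypothetical configuration holder mirroring them. It is not RocketMQ's API: the real settings live in `MessageStoreConfig`, whose field names and defaults vary between versions. Sizing ConsumeQueue files in whole 20-byte entries is an assumption of this sketch, chosen so that no entry straddles two files.

```java
// Hypothetical mirror of the store settings described above; not RocketMQ's API.
final class StoreConfigSketch {
    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    static final int CQ_UNIT_SIZE = 20; // bytes per ConsumeQueue entry

    final int commitLogFileSize;               // bytes per CommitLog file
    final int consumeQueueFileSize;            // bytes per ConsumeQueue file
    final FlushDiskType flushDiskType;         // flush on every write, or in the background
    final int consumeQueueBuildIntervalMillis; // how often the ConsumeQueue is built asynchronously

    StoreConfigSketch(int commitLogFileSize, int consumeQueueEntriesPerFile,
                      FlushDiskType flushDiskType, int consumeQueueBuildIntervalMillis) {
        if (commitLogFileSize <= 0 || consumeQueueEntriesPerFile <= 0) {
            throw new IllegalArgumentException("file sizes must be positive");
        }
        this.commitLogFileSize = commitLogFileSize;
        // Whole entries per file, so an entry never straddles two files.
        this.consumeQueueFileSize = consumeQueueEntriesPerFile * CQ_UNIT_SIZE;
        this.flushDiskType = flushDiskType;
        this.consumeQueueBuildIntervalMillis = consumeQueueBuildIntervalMillis;
    }
}
```

For a local experiment, something like `new StoreConfigSketch(10 * 1024 * 1024, 300_000, StoreConfigSketch.FlushDiskType.ASYNC_FLUSH, 1)` gives small files that fill up quickly.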

The MessageStore implementation used here, DefaultMessageStore, is the core class of RocketMQ's message storage system, and its putMessage method is the entry point for writing messages to disk files.

After instantiating DefaultMessageStore, you can build a simple message and write it with the putMessage method.

This takes us into org.apache.rocketmq.store.DefaultMessageStore#putMessage.

There are six pre-checks here.

  1. Whether the DefaultMessageStore instance is initialized. Starting the messageStore in the previous step initializes the CommitLog file channels, so this check confirms that initialization has completed.
  2. Whether the current node is a slave; only the master node stores messages.
  3. Whether the store is currently writable.
  4. Whether the topic name is too long.
  5. Whether the message properties are too large.
  6. Whether the operating system page cache is busy.
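The six checks boil down to a guard function that returns the first failing status. A minimal sketch: `PutPreChecks` and its `Status` values are hypothetical names, the inputs are passed in as plain flags, and the length limits assume the 4.x source (topic up to `Byte.MAX_VALUE` characters, properties string up to `Short.MAX_VALUE` characters).

```java
// Hypothetical sketch of the six pre-checks in DefaultMessageStore#putMessage.
final class PutPreChecks {
    enum Status {
        OK, SERVICE_NOT_AVAILABLE, NOT_MASTER, NOT_WRITEABLE,
        TOPIC_TOO_LONG, PROPERTIES_TOO_LONG, OS_PAGECACHE_BUSY
    }

    static Status check(boolean shutdown, boolean isSlave, boolean writeable,
                        String topic, String properties, boolean pageCacheBusy) {
        if (shutdown) return Status.SERVICE_NOT_AVAILABLE;        // 1. store not running
        if (isSlave) return Status.NOT_MASTER;                    // 2. only the master stores
        if (!writeable) return Status.NOT_WRITEABLE;              // 3. e.g. disk full
        if (topic.length() > Byte.MAX_VALUE) return Status.TOPIC_TOO_LONG;          // 4.
        if (properties != null && properties.length() > Short.MAX_VALUE) {
            return Status.PROPERTIES_TOO_LONG;                    // 5.
        }
        if (pageCacheBusy) return Status.OS_PAGECACHE_BUSY;       // 6.
        return Status.OK;
    }
}
```

Failing fast in this order means the cheap state checks run before anything touches the CommitLog.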

Once all six checks pass, the message can be stored.

Here is the rest of the DefaultMessageStore#putMessage process, which looks simple enough.

The time is recorded before and after the call to CommitLog#putMessage, and if the elapsed time exceeds 500 milliseconds, a warning is logged. StoreStatsService then updates a series of metrics. It records statistics for all store operations, which some RocketMQ console applications use to display node status and other information.
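The timing-and-warning pattern looks roughly like this. `TimedPut` is a hypothetical helper, not RocketMQ code; the clock and the logger are injected so the 500 ms threshold can be exercised without actually sleeping.

```java
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch: time a store call and log a warning past a threshold.
final class TimedPut {
    static final long WARN_THRESHOLD_MILLIS = 500;

    static <T> T timed(Supplier<T> put, LongSupplier clockMillis, Consumer<String> warnLog) {
        long begin = clockMillis.getAsLong();
        T result = put.get();                       // the actual CommitLog write
        long elapsed = clockMillis.getAsLong() - begin;
        if (elapsed > WARN_THRESHOLD_MILLIS) {
            warnLog.accept("slow putMessage: " + elapsed + " ms");
        }
        return result;
    }
}
```

In production code the clock would simply be `System::currentTimeMillis`; the statistics update would follow the call in the same way.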

Then we enter org.apache.rocketmq.store.CommitLog#putMessage.

At the beginning, a store timestamp is written into the message, and a CRC32 checksum of the message body is computed so that corruption can be detected later.
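A body checksum like this takes only a few lines with the JDK's `java.util.zip.CRC32`. `BodyChecksum` is a hypothetical wrapper; RocketMQ uses its own helper, which may post-process the value (for example, to fit it into an int field), so don't expect identical numbers.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Minimal sketch: compute a CRC32 over the message body when storing it,
// and recompute it on read to detect corruption.
final class BodyChecksum {
    static long crc32(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        return crc.getValue();
    }

    static boolean verify(byte[] body, long expected) {
        return crc32(body) == expected;
    }

    public static void main(String[] args) {
        byte[] body = "123456789".getBytes(StandardCharsets.US_ASCII);
        System.out.printf("%08X%n", crc32(body)); // CBF43926, the standard CRC-32 check value
    }
}
```

A CRC catches accidental damage such as a torn write or a bad sector; it is not a defense against deliberate tampering.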

It then checks the transaction type. For ordinary messages and transaction commit messages, it checks whether the message is delayed (delay level greater than 0), and if so, redirects it to RocketMQ's internal schedule topic.

This brings us to the core class for the RocketMQ message store, MappedFile.

MappedFile is the key to fast random reads. It maps a file into memory, and each MappedFile corresponds to one CommitLog file. Given a message's location (stored in the ConsumeQueue), we can use the MappedFile to jump straight to that offset and read the message. For writes, the MappedFile also tracks how far the current CommitLog has been written, so we only need to position at that point and append the message.
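The core mechanics can be shown with the JDK's memory-mapped files. `MiniMappedFile` is a hypothetical, stripped-down stand-in for RocketMQ's MappedFile: one fixed-size file mapped with `FileChannel.map`, a write position for sequential appends, and offset-based random reads.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of a MappedFile: fixed-size file, write position, random reads.
final class MiniMappedFile {
    private final MappedByteBuffer buffer;
    private int wrotePosition = 0;

    MiniMappedFile(Path path, int fileSize) {
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Mapping past the end grows the file; the mapping stays valid after close.
            this.buffer = ch.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Sequential write at the current write position; returns the offset, or -1 if full.
    int append(byte[] data) {
        if (wrotePosition + data.length > buffer.capacity()) return -1;
        ByteBuffer view = buffer.duplicate(); // independent position, shared content
        view.position(wrotePosition);
        view.put(data);
        int offset = wrotePosition;
        wrotePosition += data.length;
        return offset;
    }

    // Random read: jump straight to a known offset (e.g. taken from the ConsumeQueue).
    byte[] read(int offset, int size) {
        ByteBuffer view = buffer.duplicate();
        view.position(offset);
        byte[] out = new byte[size];
        view.get(out);
        return out;
    }

    // Force dirty pages to disk, which is what a flush does.
    void flush() {
        buffer.force();
    }
}
```

Writes land in the page cache through the mapping, which is why the flush step discussed later matters for durability.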

In short, it combines sequential writes with random reads, so both reading and writing are effectively O(1).

Of course, there is more than one CommitLog, and therefore more than one MappedFile instance, so a MappedFileQueue manages them.

Because my test never needed a second CommitLog, there is only one MappedFile in the queue, and it points to that single CommitLog.

When writing a message, we need the last CommitLog, because new messages are only ever appended to the last file.

So here we call this.mappedFileQueue.getLastMappedFile() to get the last MappedFile.

With the MappedFile in hand, the actual operation on the disk file begins, so a lock is taken first. The official comment says this is either a spin lock or a reentrant lock, depending on the store configuration. If your machine has a powerful CPU, I would recommend the spin lock.
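The two lock flavors differ in how they wait. The sketch below is hypothetical (`PutLock`, `SpinPutLock`, `ReentrantPutLock`, and `PutLockDemo` are illustrative names): the spin lock busy-waits on an `AtomicBoolean` compare-and-set, while the reentrant variant delegates to `java.util.concurrent.locks.ReentrantLock`, which parks waiting threads.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical interface for the lock guarding CommitLog appends.
interface PutLock {
    void lock();
    void unlock();
}

// Spin lock: busy-waits on a CAS instead of parking the thread. Cheap when the
// critical section is short, but burns CPU while waiting.
final class SpinPutLock implements PutLock {
    private final AtomicBoolean free = new AtomicBoolean(true);

    public void lock() {
        while (!free.compareAndSet(true, false)) {
            Thread.onSpinWait(); // Java 9+ hint that we are spinning
        }
    }

    public void unlock() {
        free.set(true);
    }
}

// Reentrant lock: waiting threads are parked, which is gentler under heavy contention.
final class ReentrantPutLock implements PutLock {
    private final ReentrantLock lock = new ReentrantLock();

    public void lock() { lock.lock(); }

    public void unlock() { lock.unlock(); }
}

final class PutLockDemo {
    // Increments a shared counter from several threads under the given lock.
    static long run(PutLock lock, int threads, int iterations) {
        long[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < iterations; i++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }
}
```

Because the append inside the lock is just a memory copy into a mapped buffer, the critical section is short, which is the situation where spinning tends to pay off.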

Inside the locked block, the message's store timestamp is set again (remember it was set earlier), this time under the lock so that store timestamps stay globally ordered. The code then checks whether the mappedFile obtained is null or full. If so, as shown in the following figure, a new CommitLog has to be created.

If it is still null after creation, a write failure is returned.

MappedFile#appendMessage is then called to write the message into the disk file.

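The status of the returned result is then checked, and each status is wrapped into a corresponding result. For example, END_OF_FILE means the current file did not have enough room, so a new CommitLog is created and the message is written again.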
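The get-last-or-create flow, together with the END_OF_FILE retry, can be sketched in memory. `RollingLog` and `Segment` are hypothetical and track only positions, not bytes; a real CommitLog also reserves room for an end-of-file marker, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory sketch of the CommitLog#putMessage flow: take the last file,
// create one if there is none or it is full, append, and on END_OF_FILE roll over and retry.
final class RollingLog {
    enum Status { PUT_OK, END_OF_FILE }

    static final class Segment {
        final long fromOffset; // global offset of byte 0, which is also the file name
        final int size;
        int wrotePosition = 0;

        Segment(long fromOffset, int size) {
            this.fromOffset = fromOffset;
            this.size = size;
        }

        boolean isFull() { return wrotePosition == size; }

        Status append(int msgLen) {
            if (msgLen > size - wrotePosition) {
                wrotePosition = size; // the leftover space is abandoned
                return Status.END_OF_FILE;
            }
            wrotePosition += msgLen;
            return Status.PUT_OK;
        }
    }

    private final List<Segment> files = new ArrayList<>();
    private final int fileSize;

    RollingLog(int fileSize) { this.fileSize = fileSize; }

    Segment getLastFile() { return files.isEmpty() ? null : files.get(files.size() - 1); }

    private Segment createNextFile() {
        Segment last = getLastFile();
        long from = last == null ? 0 : last.fromOffset + last.size;
        Segment s = new Segment(from, fileSize);
        files.add(s);
        return s;
    }

    // Returns the global offset where the message was written.
    long put(int msgLen) {
        if (msgLen > fileSize) throw new IllegalArgumentException("message larger than a file");
        Segment file = getLastFile();
        if (file == null || file.isFull()) file = createNextFile();
        long offset = file.fromOffset + file.wrotePosition;
        if (file.append(msgLen) == Status.END_OF_FILE) {
            file = createNextFile();                        // roll over to a new file
            offset = file.fromOffset + file.wrotePosition;
            file.append(msgLen);                            // and write the message again
        }
        return offset;
    }

    int fileCount() { return files.size(); }
}
```

With 100-byte files, three 40-byte messages land at offsets 0, 40, and 100: the third does not fit in the 20 bytes left, so it rolls into the second file.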

Before going into MappedFile#appendMessage, let's finish CommitLog#putMessage and see what happens after a successful write.

Surprise! It's master-slave synchronization and disk flushing. I won't go into the code here; the goal is just to get a feel for the concepts.

In master-slave synchronization, the master replicates the contents of its CommitLog to the slave nodes. This step runs for every new message, and when the broker role is SYNC_MASTER, the write waits until a slave has received the message, so replication is effectively real-time.

The MappedFile is then flushed to local disk. Whether the flush is synchronous or asynchronous, and at what interval in the asynchronous case, depends on the store configuration you passed when instantiating DefaultMessageStore at the start.
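The difference between the two flush modes can be modeled by tracking only two positions: how far data has been written into the mapped buffer, and how far it has been forced to disk. `FlushSketch` is hypothetical, and the `forceCalls` counter stands in for `MappedByteBuffer.force()`.

```java
// Hypothetical sketch of synchronous vs asynchronous flushing, tracking positions only.
final class FlushSketch {
    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    private final FlushDiskType type;
    private final long intervalMillis;  // background flush interval (async mode)
    private long wrotePosition = 0;     // written into the mapped buffer
    private long flushedPosition = 0;   // forced to disk
    private long lastFlushMillis = 0;
    int forceCalls = 0;                 // stands in for MappedByteBuffer.force()

    FlushSketch(FlushDiskType type, long intervalMillis) {
        this.type = type;
        this.intervalMillis = intervalMillis;
    }

    // Called after each successful append.
    void onAppend(int bytes) {
        wrotePosition += bytes;
        if (type == FlushDiskType.SYNC_FLUSH) {
            force(); // sync mode: the write waits until its data is on disk
        }
    }

    // Called periodically by a background thread in async mode.
    void tick(long nowMillis) {
        if (type == FlushDiskType.ASYNC_FLUSH && nowMillis - lastFlushMillis >= intervalMillis) {
            force();
            lastFlushMillis = nowMillis;
        }
    }

    private void force() {
        if (flushedPosition < wrotePosition) {
            forceCalls++;
            flushedPosition = wrotePosition;
        }
    }

    // Bytes that would be lost if the machine lost power right now.
    long unflushedBytes() {
        return wrotePosition - flushedPosition;
    }
}
```

The trade-off is visible in `unflushedBytes()`: async mode batches many writes into one `force()` at the cost of a window of data that is only in the page cache.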

Now let's go into MappedFile#appendMessage.

If the current write position has already reached the end of the CommitLog file, an error is returned; otherwise, the next steps begin.

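First, get the ByteBuffer to write into. This takes a little care: if the transient store pool is enabled, it is a slice of an off-heap writeBuffer; otherwise, it is a slice of the MappedFile's MappedByteBuffer.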

Next, set the ByteBuffer's position to the current write position.

There are two paths here, one for single messages and one for batches, so let's look at doAppend for a single message.

doAppend takes four arguments: fileFromOffset, which you can think of as the CommitLog file name, since each file is named after its starting offset across all CommitLog files; the ByteBuffer; the file size minus the current write position, i.e. how much space is left in this CommitLog; and finally the message itself.

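```java
// In MappedFile#appendMessage: hand serialization off to the append callback
cb.doAppend(this.getFileFromOffset(),          // starting global offset of this CommitLog (its file name)
        byteBuffer,                           // buffer positioned at the current write position
        this.fileSize - currentPos,           // bytes still free in this file
        (MessageExtBrokerInner) messageExt);  // the message to serialize
```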

The doAppend method is implemented in CommitLog (in its inner class DefaultAppendMessageCallback) and is the last method in this walkthrough of the write path. It serializes the message and writes it into the buffer.

First, compute the message's physical offset: the CommitLog file's starting offset plus the buffer's current position. This is the value later recorded in the ConsumeQueue. Because the ConsumeQueue stores offsets across all CommitLogs rather than within a single file, the file's own starting offset has to be added.

Next, MessageDecoder creates the message ID from the broker's store host address and this physical offset. The message's logical queue offset is then looked up in topicQueueTable, an in-memory table keyed by topic and queue ID.
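Building a message ID this way amounts to packing the store host address and the physical offset into bytes and hex-encoding them. A sketch assuming an IPv4 host (4-byte IP plus 4-byte port) and an 8-byte offset; `MsgIdSketch` is hypothetical, and the exact layout used by MessageDecoder may differ between versions.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a 16-byte message ID: 4-byte IPv4 address, 4-byte port,
// 8-byte physical offset, rendered as 32 uppercase hex characters.
final class MsgIdSketch {
    static String createMessageId(byte[] ipv4, int port, long physicalOffset) {
        if (ipv4.length != 4) throw new IllegalArgumentException("expected an IPv4 address");
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put(ipv4).putInt(port).putLong(physicalOffset);
        StringBuilder hex = new StringBuilder(32);
        for (byte b : buf.array()) {
            hex.append(String.format("%02X", b & 0xFF));
        }
        return hex.toString();
    }

    // The last 16 hex characters are the physical offset in the CommitLog.
    static long physicalOffsetOf(String msgId) {
        return Long.parseUnsignedLong(msgId.substring(16), 16);
    }
}
```

A useful consequence of this design is that the ID itself carries the physical offset, so a broker can locate a message from its ID without an extra index.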

Concrete implementation: see MessageDecoder#createMessageId.

This is followed by a long section that prepares the message for serialization.

Once the total message length has been computed, we know how much space the message will take up in the CommitLog, so we need to check whether the file still has room for it.

The if condition here requires that the message length plus the CommitLog end-of-file marker fit within the remaining space of the CommitLog file.

This end-of-file marker is built around a magic number, a technique found throughout UNIX file formats and even in operating system boot sectors: a special value that marks a boundary on disk. Here the marker takes 8 bytes, a 4-byte length followed by a 4-byte magic code.

```java
// Minimum fixed length of the blank space at the end of a file:
// a 4-byte total size plus a 4-byte magic code
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
```

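If the space is insufficient, doAppend does not simply fail: it fills the remaining space with an end-of-file record (the blank length followed by a blank magic code) and returns END_OF_FILE, which tells CommitLog#putMessage to create a new file and write the message again.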
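The space check and the end-of-file record can be sketched directly on a `ByteBuffer`. `AppendSketch` is hypothetical, and the magic value below is a placeholder, not RocketMQ's real blank magic code.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the space check in doAppend. The magic constant is a
// placeholder value, not RocketMQ's real one.
final class AppendSketch {
    static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4; // total size + magic code
    static final int BLANK_MAGIC_CODE = 0x0BADF00D;      // placeholder

    enum Status { PUT_OK, END_OF_FILE }

    // buffer is positioned at the current write position; maxBlank is the space left in the file.
    // Every successful write leaves at least END_FILE_MIN_BLANK_LENGTH bytes, so the marker always fits.
    static Status append(ByteBuffer buffer, byte[] encodedMsg, int maxBlank) {
        if (encodedMsg.length + END_FILE_MIN_BLANK_LENGTH > maxBlank) {
            // Not enough room: write an end-of-file record covering the rest of the file,
            // so a reader knows to skip ahead to the next CommitLog.
            buffer.putInt(maxBlank);
            buffer.putInt(BLANK_MAGIC_CODE);
            buffer.position(buffer.position() + maxBlank - END_FILE_MIN_BLANK_LENGTH);
            return Status.END_OF_FILE;
        }
        buffer.put(encodedMsg);
        return Status.PUT_OK;
    }
}
```

Reserving the 8 bytes on every write is what guarantees there is always room left for this marker.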

If there is enough space, the message fields are written in order. Refer to the CommitLog message format below.

Compared with the figure above, the write code matches the message format exactly.

Finally, the serialized message is put into the buffer and PUT_OK is returned.

To sum up: the message has been written, but only to the buffer; it is persisted to disk by a subsequent flush.