RocketMQ is a disk-based messaging middleware that offers virtually unlimited message backlog while still delivering high-throughput, low-latency service, and at the heart of these capabilities lies its elegant storage design.

Note: This article is excerpted from the second edition of RocketMQ Technology Inside. One of the biggest changes in that edition is that the core working mechanisms of RocketMQ are first illustrated with diagrams before the source code analysis begins, which lowers the barrier to reading the source and encourages the reader to think along.

1. Storage Overview

RocketMQ's storage is built on three types of files: Commitlog files, ConsumeQueue files, and Index files.

RocketMQ stores the messages of all topics in the same Commitlog file, so messages are written strictly sequentially in the order they are sent, which maximizes availability and throughput on the write path.

However, messaging middleware generally follows a topic-based publish-subscribe model, and messages must be selected by topic at consumption time. Filtering messages by topic directly from the Commitlog files would clearly be inefficient, so to speed up topic-based retrieval RocketMQ introduces the ConsumeQueue file, commonly known as the consumption queue file.

A relational database can retrieve records by field values. RocketMQ, as business-oriented messaging middleware, likewise supports retrieving messages by message attributes. The underlying idea is to build hash indexes over the Commitlog files and store them in Index files.

In RocketMQ, a message is first written to the Commitlog file; the ConsumeQueue and Index files are then built from it asynchronously. The data flow is shown below:

2. Storage file organization

RocketMQ pursues extreme sequential disk writes on the message write path. Messages for all topics are written to a single group of files, the Commitlog, and are appended strictly in the order they arrive; once written, a message is never modified. The Commitlog file layout is shown below:

One big difference between file-based programming and memory-based programming is that in memory we have ready-made data structures such as List and HashMap, which make reading and writing data very convenient. With files, the question becomes: how do we find a message once it has been stored in the Commitlog?

Just as a relational database gives each record an ID field, the file-based programming model gives each message an identity of its own: its physical offset, i.e., the position at which the message is stored, counted from the start of the Commitlog file group.

Building on the physical offset, each Commitlog file is named after the offset of the first message it stores within the whole Commitlog file group. For example, the first Commitlog file is named 00000000000000000000, the second 00000000001073741824 (each file is 1 GB, i.e. 1073741824 bytes), and so on.

The advantage of this naming scheme is that, given the physical offset of any message, say 73741824, a binary search over the file names quickly determines that the message lives in the first file, and subtracting the file's name (its starting offset) from the message's physical offset yields the message's position within that file.
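To make this arithmetic concrete, here is a minimal sketch (not RocketMQ's actual implementation; the file start offsets and class name are hypothetical) that locates the file containing a given physical offset by binary search and then derives the position inside that file:

```java
import java.util.Arrays;

public class CommitlogLocator {
    // Start offsets of the commitlog files, i.e. their 20-digit file names parsed as longs.
    // Hypothetical example: three 1 GB files.
    private static final long[] FILE_START_OFFSETS = {0L, 1073741824L, 2147483648L};

    /** Returns {file index, position within that file} for a given physical offset. */
    public static long[] locate(long physicalOffset) {
        int idx = Arrays.binarySearch(FILE_START_OFFSETS, physicalOffset);
        // If there is no exact match, binarySearch returns (-(insertionPoint) - 1);
        // the containing file is the one just before the insertion point.
        if (idx < 0) {
            idx = -idx - 2;
        }
        long inFilePosition = physicalOffset - FILE_START_OFFSETS[idx];
        return new long[]{idx, inFilePosition};
    }

    public static void main(String[] args) {
        long[] r = locate(73741824L);
        // Offset 73741824 falls in the first file (start offset 0), at position 73741824.
        System.out.println("file index = " + r[0] + ", position in file = " + r[1]);
    }
}
```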

Commitlog files are designed for extreme write performance, but the consumption model is a topic-based subscription mechanism: a group of consumers consumes the messages of a specific topic. Retrieving messages by topic directly from the Commitlog files would be a terrible idea, because the only option would be to scan the files message by message from the beginning, and the performance of that is easy to imagine. To solve topic-based message retrieval, RocketMQ introduces the ConsumeQueue file. The structure of the ConsumeQueue is shown below:

The ConsumeQueue file is the message consumption queue file, essentially a topic-based index over the Commitlog. It is used by consumers to consume messages by topic and is organized into per-topic, per-queue directories (topic/queue), with multiple files in the same queue.

The ConsumeQueue is cleverly designed so that each entry has a fixed length of 20 bytes: an 8-byte Commitlog physical offset, a 4-byte message length, and an 8-byte tag hashcode.

Storing the tag's hashcode rather than the raw tag string keeps every entry at a fixed length, so an entry can be located directly by an array-like subscript, which greatly improves the read performance of the ConsumeQueue file.

Imagine a consumer that tracks its consumption progress per topic and queue as a ConsumeQueue logical offset. To access a message, it multiplies the logical offset by 20 to obtain the entry's starting position within the ConsumeQueue file, then reads the next 20 bytes from that position to get the entry, without ever traversing the ConsumeQueue file.
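As a rough illustration of that lookup (a sketch based on the 20-byte entry layout described above, not RocketMQ's own ConsumeQueue code; the file path and method name are hypothetical):

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;

public class ConsumeQueueReader {
    // Each ConsumeQueue entry is a fixed 20 bytes:
    // 8-byte commitlog physical offset + 4-byte message size + 8-byte tag hashcode.
    private static final int ENTRY_SIZE = 20;

    /** Read the entry at the given logical offset from a ConsumeQueue file. */
    public static void readEntry(String consumeQueueFile, long logicalOffset) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(consumeQueueFile, "r")) {
            byte[] buf = new byte[ENTRY_SIZE];
            // Jump straight to the entry: no scan of the file is needed.
            file.seek(logicalOffset * ENTRY_SIZE);
            file.readFully(buf);

            ByteBuffer entry = ByteBuffer.wrap(buf);
            long commitlogOffset = entry.getLong(); // where the full message lives in the commitlog
            int messageSize = entry.getInt();       // how many bytes to read from the commitlog
            long tagHashcode = entry.getLong();     // used for tag filtering on the broker side

            System.out.printf("commitlogOffset=%d, size=%d, tagHash=%d%n",
                    commitlogOffset, messageSize, tagHashcode);
        }
    }
}
```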

One significant advantage RocketMQ has over Kafka is that it supports retrieving messages by message attributes. The ConsumeQueue file solves the topic-based lookup problem, but it cannot find messages by a particular attribute of the message.

For this, RocketMQ introduces the Index file, which implements a file-based hash index. The storage structure of an IndexFile is shown below:

An IndexFile implements a hash index on top of a physical disk file. The file consists of a 40-byte header, 5 million hash slots of 4 bytes each, and finally 20 million index entries of 20 bytes each. Every entry holds the 4-byte hashcode of the index key, the 8-byte physical offset of the message, a 4-byte timestamp, and the 4-byte position of the previous index entry (a linked list for resolving hash conflicts).

In other words, the IndexFile maps the hashcode of an index key to the physical offset of a message, so a message can be quickly located in the Commitlog by its key. For details on how the hash index works, refer to Section 4.5.3 of the second edition of RocketMQ Technology Inside.
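The lookup can be sketched roughly as follows. This is a simplified illustration, not RocketMQ's IndexFile implementation: the slot and linked-list layout follow the description above, but the sentinel value of -1 for "no entry" and the class name are assumptions of this sketch.

```java
import java.nio.MappedByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class IndexFileLookup {
    // Layout described above: 40-byte header, 5 million 4-byte hash slots, then 20-byte index entries.
    private static final int HEADER_SIZE = 40;
    private static final int HASH_SLOT_NUM = 5_000_000;
    private static final int HASH_SLOT_SIZE = 4;
    private static final int INDEX_ENTRY_SIZE = 20; // 4B keyHash + 8B phyOffset + 4B timestamp + 4B prev entry

    /**
     * Collect the commitlog physical offsets of messages indexed under 'key'.
     * Sentinel convention (an assumption of this sketch): a slot or prev pointer of -1 means "no entry".
     */
    public static List<Long> lookup(MappedByteBuffer indexFile, String key, int maxResults) {
        List<Long> phyOffsets = new ArrayList<>();

        int keyHash = key.hashCode() & 0x7FFFFFFF;              // non-negative hashcode of the index key
        int slotPos = keyHash % HASH_SLOT_NUM;                   // which of the 5 million slots the key maps to
        int slotOffset = HEADER_SIZE + slotPos * HASH_SLOT_SIZE;

        // The slot stores the position of the most recently written index entry for this slot.
        int entryPos = indexFile.getInt(slotOffset);

        // Walk the hash-conflict chain: each entry points back to the previous entry in the same slot.
        while (entryPos >= 0 && phyOffsets.size() < maxResults) {
            int entryOffset = HEADER_SIZE + HASH_SLOT_NUM * HASH_SLOT_SIZE + entryPos * INDEX_ENTRY_SIZE;

            int storedHash = indexFile.getInt(entryOffset);          // 4-byte key hashcode
            long phyOffset = indexFile.getLong(entryOffset + 4);     // 8-byte commitlog physical offset
            // indexFile.getInt(entryOffset + 12) would be the 4-byte timestamp, usable for time filtering
            int prevEntryPos = indexFile.getInt(entryOffset + 16);   // 4-byte link to the previous entry

            if (storedHash == keyHash) {
                // Hashcodes can collide, so the caller still reads the commitlog and compares the raw key.
                phyOffsets.add(phyOffset);
            }
            entryPos = prevEntryPos;
        }
        return phyOffsets;
    }
}
```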

3. Sequential writes

Another design principle RocketMQ relies on to improve disk write performance is writing to disk sequentially.

Sequential disk writes are widely used in file-based storage models. Consider why MySQL's InnoDB storage engine uses redo logs. InnoDB maintains a buffer pool in memory that caches disk pages: changes are applied in memory first, then recorded in the redo log (which is flushed to disk), and the dirty pages in the buffer pool are periodically flushed to disk.

Why not simply write each change directly to the data file it belongs to? A MySQL instance can easily hold hundreds or thousands of tables, each stored in its own file. If every table change were flushed straight to its data file, there would be a huge amount of random writes and performance could not improve. Introducing the redo log means changes are first written sequentially to the redo file, while flushing the data pages is deferred and batched; because the redo log is written sequentially, the performance gain over random writes is significant.
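Applied to the file-based storage model, sequential writing in Java simply means always appending at the end of the current file instead of seeking to arbitrary positions. A minimal sketch (not RocketMQ code; the file name is illustrative):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class AppendOnlyLog {
    public static void main(String[] args) throws IOException {
        // Open (or create) the log file in append mode: every write lands at the current end of file,
        // so writes never jump around the disk.
        try (FileChannel channel = FileChannel.open(Paths.get("demo-commitlog"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            for (int i = 0; i < 3; i++) {
                ByteBuffer msg = ByteBuffer.wrap(("message-" + i + "\n").getBytes(StandardCharsets.UTF_8));
                channel.write(msg); // sequential append, no random seeks
            }
        }
    }
}
```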

4. Memory mapping

While sequential writes greatly improve disk I/O efficiency, the gains are limited if the storage layer uses ordinary Java file APIs such as FileOutputStream. RocketMQ therefore uses memory mapping, which maps disk files into memory so that the disk can be operated on as if it were memory, taking performance up another notch.

Memory-mapped files can be created in Java with FileChannel's map method.
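For example (a minimal demo, not RocketMQ code; the file name and size are illustrative, whereas RocketMQ maps each 1 GB Commitlog file in the same way):

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class MmapExample {
    public static void main(String[] args) throws IOException {
        final long fileSize = 1024 * 1024; // 1 MB for the demo; RocketMQ uses 1 GB commitlog files
        try (RandomAccessFile raf = new RandomAccessFile("demo-mapped-file", "rw");
             FileChannel channel = raf.getChannel()) {
            // Map the file into memory; reads and writes on the buffer go through the OS page cache.
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);

            buffer.put("hello rocketmq".getBytes(StandardCharsets.UTF_8)); // write as if it were memory
            buffer.force(); // explicitly flush the mapped region to disk (what a flush thread would do)
        }
    }
}
```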

On a Linux server, files mapped this way are backed by the operating system's page cache.

Linux's memory strategy is to use as much of the machine's free physical memory as possible to keep file data resident in memory; this is the page cache. When memory runs short, the operating system reclaims infrequently used page-cache pages with a replacement algorithm such as LRU. In other words, the operating system manages this part of memory automatically.

If the RocketMQ broker process exits unexpectedly, the data already in the page cache is not lost, because the operating system periodically persists page-cache data to disk. However, page-cache data can be lost in the event of a machine-level failure such as a power outage.

5. Flexible disk flushing strategies

With sequential writes and memory mapping, RocketMQ achieves excellent write performance, but everything has its trade-offs. With memory mapping and the page cache in play, a message is first written to the page cache without actually being persisted to disk. So should the broker report success to the client once the message reaches the page cache, or only after it has been persisted to disk?

This is a “hard” choice, a trade-off between performance and message reliability. RocketMQ therefore offers two flush strategies: synchronous flush and asynchronous flush.

5.1 Synchronous Disk Flushing

Synchronous flush is implemented in RocketMQ as a group commit: not every message triggers its own flush. Its design is shown in the figure below:

With synchronous flush, each writer thread appends its data to memory, submits a flush request to the flush thread, and then blocks. The flush thread takes a task from the request queue and triggers a flush; the flush persists not only the message belonging to that request, but all messages currently pending in memory, in one batch. It then wakes up the whole group of blocked request threads, achieving group flushing.
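A heavily simplified sketch of the group-commit idea follows. It is not the broker's actual implementation; the class, method names, and use of CompletableFuture are invented for illustration. Writer threads enqueue a flush request and block on it, while a single flush thread drains the queue, calls force() once for the whole batch, and then completes every request it drained:

```java
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class GroupCommitSketch {
    private final FileChannel commitlogChannel;
    private final LinkedBlockingQueue<CompletableFuture<Void>> requests = new LinkedBlockingQueue<>();

    public GroupCommitSketch(FileChannel commitlogChannel) {
        this.commitlogChannel = commitlogChannel;
        Thread flushThread = new Thread(this::flushLoop, "group-flush-thread");
        flushThread.setDaemon(true);
        flushThread.start();
    }

    /** Called by a writer thread after appending its message: block until the data is on disk. */
    public void flushAndWait() throws Exception {
        CompletableFuture<Void> request = new CompletableFuture<>();
        requests.put(request);
        request.get(); // block until the flush thread has persisted everything written so far
    }

    private void flushLoop() {
        while (true) {
            try {
                List<CompletableFuture<Void>> batch = new ArrayList<>();
                batch.add(requests.take());    // wait for at least one pending request
                requests.drainTo(batch);       // grab everything else that piled up meanwhile

                commitlogChannel.force(false); // one force() persists all messages written so far

                for (CompletableFuture<Void> request : batch) {
                    request.complete(null);    // wake up the whole group of blocked writer threads
                }
            } catch (Exception e) {
                // a real implementation would propagate the failure to the waiting writers
            }
        }
    }
}
```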

5.2 Asynchronous Disk Flushing

The advantage of synchronous flush is that messages are not lost: if success is returned to the client, the message has already been persisted to disk, so reliability is very high. The cost is write response latency. Since RocketMQ writes messages to the page cache first, the probability of losing a message is already low, so if you can tolerate a small chance of message loss, you can consider asynchronous flush instead.

With asynchronous flush, the broker writes a message to the page cache and returns success immediately; a background thread then periodically calls FileChannel's force method to flush data from memory to disk, every 500 ms by default.
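A sketch of that flushing loop (the names and the use of a scheduled executor are mine, not RocketMQ's actual flush service): a background task calls force() on the Commitlog's FileChannel every 500 ms, so writer threads never block on disk I/O.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AsyncFlushSketch {
    public static ScheduledExecutorService startAsyncFlush(FileChannel commitlogChannel) {
        ScheduledExecutorService flusher =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "async-flush-thread"));
        // Every 500 ms (the default flush interval), push page-cache data to disk.
        flusher.scheduleWithFixedDelay(() -> {
            try {
                commitlogChannel.force(false);
            } catch (IOException e) {
                e.printStackTrace(); // a real broker would record the failure and retry
            }
        }, 500, 500, TimeUnit.MILLISECONDS);
        return flusher;
    }
}
```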

6. Memory-level read and write separation

To reduce pressure on the page cache, RocketMQ introduces transientStorePoolEnable, a memory-level read/write separation mechanism.

By default, RocketMQ writes messages into the page cache and reads them back from the page cache at consumption time. Under high concurrency this puts heavy pressure on the page cache and can cause transient broker busy errors. With transientStorePoolEnable, RocketMQ instead writes messages into off-heap memory and returns immediately, then asynchronously commits the data from off-heap memory into the page cache, and finally flushes it to disk asynchronously. Its working mechanism is shown in the figure below:

At consumption time, messages are not read from off-heap memory but from the page cache, which yields memory-level read/write separation: writes go primarily to off-heap memory, while reads come primarily from the page cache.

The advantage of this scheme is that messages are written directly to off-heap memory and only later, asynchronously, to the page cache. Above all, data is committed to the page cache in batches, rather than each message being appended to the page cache individually as it arrives.
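Conceptually, the write path with transientStorePoolEnable can be sketched as below. This is a simplification, not RocketMQ's TransientStorePool code; the buffer size, class, and method names are illustrative. Messages are appended to a pre-allocated off-heap buffer and return immediately; a commit step later copies the newly accumulated bytes into the file channel (the page cache) in one batch, and a flush step persists them to disk.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class TransientStoreSketch {
    private final ByteBuffer writeBuffer = ByteBuffer.allocateDirect(64 * 1024 * 1024); // off-heap buffer
    private final FileChannel commitlogChannel;
    private int committedPosition = 0; // how much of the buffer has already been copied to the page cache

    public TransientStoreSketch(FileChannel commitlogChannel) {
        this.commitlogChannel = commitlogChannel;
    }

    /** Write path: append to off-heap memory only and return immediately. */
    public synchronized void appendMessage(byte[] message) {
        writeBuffer.put(message);
    }

    /** Commit step: copy the newly written bytes into the page cache in one batch. */
    public synchronized void commit() throws IOException {
        int writePos = writeBuffer.position();
        if (writePos == committedPosition) {
            return; // nothing new to commit
        }
        ByteBuffer slice = writeBuffer.duplicate();
        slice.position(committedPosition);
        slice.limit(writePos);
        commitlogChannel.write(slice);      // batched write into the page cache
        committedPosition = writePos;
    }

    /** Flush step: persist the page cache to disk. */
    public void flush() throws IOException {
        commitlogChannel.force(false);
    }
}
```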

The disadvantage is that if the broker process exits unexpectedly, the data still held in off-heap memory is lost, whereas data that has already reached the page cache survives an unexpected broker exit without losing messages.

Article: www.codingw.net/posts/3364a…


That's all for this article. Your follows, likes, and comments are the biggest encouragement to me.

Mastering one or two mainstream Java middleware technologies is a must-have skill for getting into BAT and other big tech companies. The material below lays out a Java middleware learning path and helps you level up your career.

A Java advancement ladder: a growth path and learning materials to help you break into the middleware field.

Finally, I'd like to share a core RocketMQ ebook, from which you will gain operational experience with message flows at the scale of billions of messages.

Get it: RocketMQ Ebook.