Having covered RocketMQ message sending in the previous article, this article turns to the logic of storing messages once they reach the Broker.

RocketMQ storage file design

RocketMQ's storage consists mainly of three kinds of files: CommitLog, ConsumeQueue, and IndexFile.

CommitLog is the message storage file in which the messages of all topics are stored. Each CommitLog file has a default maximum size of 1 GB; once a file is full, the next CommitLog file is created. By keeping all messages together in the CommitLog, RocketMQ writes them to disk with sequential I/O, and these sequential writes reduce I/O contention and improve storage performance.
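The rolling, append-only write pattern can be illustrated with a minimal in-memory sketch. It assumes byte arrays stand in for the memory-mapped 1 GB files, and `SegmentedLog` and `append` are hypothetical names, not RocketMQ's API. Records from every topic go to the tail of the newest segment, and a new segment is started when a record does not fit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: an append-only log split into fixed-size segments,
// standing in for RocketMQ's chain of 1 GB CommitLog files.
class SegmentedLog {
    private final int segmentSize;
    private final List<byte[]> segments = new ArrayList<>();
    private int writePos; // write position inside the newest segment

    SegmentedLog(int segmentSize) {
        this.segmentSize = segmentSize;
        segments.add(new byte[segmentSize]);
    }

    /** Appends a record (of any topic) and returns its global physical offset. */
    long append(byte[] record) {
        if (record.length > segmentSize) {
            throw new IllegalArgumentException("record larger than a segment");
        }
        if (writePos + record.length > segmentSize) {
            // Not enough room left: roll over to a new segment.
            // In this sketch the unused tail of the old segment is just skipped.
            segments.add(new byte[segmentSize]);
            writePos = 0;
        }
        int segmentIndex = segments.size() - 1;
        System.arraycopy(record, 0, segments.get(segmentIndex), writePos, record.length);
        long physicalOffset = (long) segmentIndex * segmentSize + writePos;
        writePos += record.length; // strictly sequential writes
        return physicalOffset;
    }

    int segmentCount() {
        return segments.size();
    }
}
```

With a 10-byte segment, appending 6, 3, and 4 bytes yields offsets 0, 6, and 10: the third record does not fit in the 1 byte left, so it starts a new segment. In RocketMQ itself, that leftover space at the end of a CommitLog file is marked with a BLANK_MAGIC_CODE record rather than silently skipped.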

File storage structure on the RocketMQ Broker's disk

【 CommitLog 】

Messages are stored in CommitLog in the following format:

The CommitLog stores the full content of every message, and a new CommitLog file is created whenever the current one fills up. Data from all topics is stored together. The logical view is as follows:

CommitLog code

    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    /**
     * MAGIC_CODE - MESSAGE
     * Message's MAGIC CODE: daa320a7
     * [msgId, MESSAGE_MAGIC_CODE, message]
     */
    public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
    /**
     * MAGIC_CODE - BLANK
     * End-of-file blank MAGIC CODE: cbd43194
     * [msgId, BLANK_MAGIC_CODE, blank]
     * Used when the current CommitLog file cannot accommodate a message
     */
    private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
    // Queue of MappedFiles that back the CommitLog
    private final MappedFileQueue mappedFileQueue;
    // The message store that owns this CommitLog
    private final DefaultMessageStore defaultMessageStore;
    /**
     * Flush commitLog thread service
     */
    private final FlushCommitLogService flushCommitLogService;
    /**
     * If TransientStorePool is enabled, we must flush messages to FileChannel at fixed periods.
     * Commit commitLog thread service
     */
    private final FlushCommitLogService commitLogService;
    /**
     * Callback that writes messages into the buffer
     */
    private final AppendMessageCallback appendMessageCallback;
    // topic-queueId -> next logical offset to assign in that queue
    private HashMap<String/* topic-queue_id */, Long/* offset */> topicQueueTable = new HashMap<>(1024);
    /**
     * TODO
     */
    private volatile long confirmOffset = -1L;
    /**
     * Time at which the put-message lock was acquired
     */
    private volatile long beginTimeInLock = 0;
    /**
     * true: can lock, false: locked
     */
    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
    private ReentrantLock putMessageNormalLock = new ReentrantLock(); // Non-fair sync
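The two magic-code expressions are easy to misread. In Java, `+` binds more tightly than `^`, so each constant is `X ^ (1880681586 + 8)`. The hypothetical class below simply evaluates the expressions exactly as written to confirm the hex values quoted in the comments.

```java
// Evaluates the CommitLog magic-code expressions exactly as written in the source.
class MagicCodes {
    // '+' has higher precedence than '^', so this is 0xAABBCCDD ^ (1880681586 + 8)
    static final int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
    static final int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;

    public static void main(String[] args) {
        System.out.println(Integer.toHexString(MESSAGE_MAGIC_CODE)); // daa320a7
        System.out.println(Integer.toHexString(BLANK_MAGIC_CODE));   // cbd43194
    }
}
```

Since 1880681586 + 8 is 0x7018EC7A, XOR-ing it with 0xAABBCCDD and 0xBBCCDDEE gives 0xDAA320A7 and 0xCBD43194, matching the comments in the CommitLog source.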

【 ConsumeQueue 】

ConsumeQueue is the message consumption queue file. After messages reach the CommitLog, they are asynchronously forwarded to the consumption queues, from which consumers read them. Each ConsumeQueue corresponds to one queue of a topic, similar to a partition in Kafka. However, RocketMQ stores messages very differently from Kafka: a ConsumeQueue does not store the messages themselves, which live in the CommitLog. For each message routed to the queue, it stores only the message's CommitLog offset, its size, and the hash code of its tag (tagsCode). Each entry is a fixed 20 bytes: an 8-byte offset, a 4-byte size, and an 8-byte tag hash code.
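Because every entry has the same width, the n-th message of a queue sits at byte n × 20, so a consumer's logical offset maps directly to a file position. The sketch below encodes and decodes one entry with `java.nio.ByteBuffer`; the `CqEntry` class and its method names are hypothetical, and the field order (offset, size, tag hash) follows the description above.

```java
import java.nio.ByteBuffer;

// Hypothetical helper showing the fixed 20-byte ConsumeQueue entry layout:
// 8-byte CommitLog offset + 4-byte message size + 8-byte tag hash code.
final class CqEntry {
    static final int CQ_STORE_UNIT_SIZE = 20;

    final long commitLogOffset;
    final int size;
    final long tagsCode;

    CqEntry(long commitLogOffset, int size, long tagsCode) {
        this.commitLogOffset = commitLogOffset;
        this.size = size;
        this.tagsCode = tagsCode;
    }

    /** Serializes the entry into exactly 20 bytes (big-endian). */
    byte[] encode() {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset).putInt(size).putLong(tagsCode);
        return buf.array();
    }

    /** Reads the three fields back in the same order they were written. */
    static CqEntry decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new CqEntry(buf.getLong(), buf.getInt(), buf.getLong());
    }

    /** Byte position of the n-th entry inside the queue: a fixed stride. */
    static long positionOf(long logicalIndex) {
        return logicalIndex * CQ_STORE_UNIT_SIZE;
    }
}
```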

ConsumeQueue code

    public static final int CQ_STORE_UNIT_SIZE = 20; // bytes per ConsumeQueue entry
    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
    private final DefaultMessageStore defaultMessageStore;
    // Queue of MappedFiles that back this ConsumeQueue
    private final MappedFileQueue mappedFileQueue;
    /**
     * Topic
     */
    private final String topic;
    // Queue ID within the topic
    private final int queueId;
    // Reusable buffer for one 20-byte index entry
    private final ByteBuffer byteBufferIndex;
    // Storage path
    private final String storePath;
    // Size of each mapped file
    private final int mappedFileSize;
    /**
     * Max commitLog storage location
     */
    private long maxPhysicOffset = -1;
    private volatile long minLogicOffset = 0;

ConsumeQueue file organization, as shown in the figure:

Consume Queue file organization diagram

  1. TopicA with queueId=0 forms one ConsumeQueue, and TopicA with queueId=1 forms another.

  2. Retry queues are grouped by the consumer's GroupName. If a consumer fails to consume a message, the message is sent to the retry queue, such as %RETRY%ConsumerGroupA in the figure.

  3. Dead-letter queues are also grouped by the consumer's GroupName. If consumption still fails after the specified number of retries, the message is sent to the dead-letter queue, such as %DLQ%ConsumerGroupA in the figure.

Dead-letter queues generally hold messages that cannot be delivered for some reason, such as messages whose processing failed or that have expired.
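The routing rule in items 2 and 3 can be summarized as a small decision function. This is an illustrative sketch: `RetryRouting`, `nextTopic`, and the `maxReconsumeTimes` parameter are assumptions standing in for the configured retry limit, while the `%RETRY%` and `%DLQ%` prefixes are taken from the queue names above.

```java
// Hypothetical sketch: where a message goes after a failed consumption attempt.
final class RetryRouting {
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    /**
     * @param consumerGroup     the consumer's GroupName
     * @param reconsumeTimes    how many retries the message has already had
     * @param maxReconsumeTimes assumed per-group retry limit
     */
    static String nextTopic(String consumerGroup, int reconsumeTimes, int maxReconsumeTimes) {
        if (reconsumeTimes >= maxReconsumeTimes) {
            return DLQ_PREFIX + consumerGroup;   // retries exhausted: dead-letter queue
        }
        return RETRY_PREFIX + consumerGroup;     // otherwise: this group's retry queue
    }
}
```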

【 IndexFile 】

IndexFile is the message index file; it stores the mapping from message keys to CommitLog offsets.

IndexFile provides a way to query messages by key or by time range.

Each IndexFile is named after the timestamp at which it was created. A single IndexFile has a fixed size of about 400 MB and can store 20 million index entries. Under the hood, the RocketMQ index file is implemented as a hash index.

    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static int hashSlotSize = 4;   // bytes per hash slot
    private static int indexSize = 20;     // bytes per index entry
    private static int invalidIndex = 0;   // slot value meaning "no entry"
    private final int hashSlotNum;         // number of hash slots
    private final int indexNum;            // maximum number of index entries
    private final MappedFile mappedFile;
    private final FileChannel fileChannel;
    private final MappedByteBuffer mappedByteBuffer;
    private final IndexHeader indexHeader; // file header (timestamps, offsets, counts)
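The size figures above follow from the IndexFile layout. Assuming the commonly described layout of a 40-byte header, then `hashSlotNum` 4-byte hash slots, then `indexNum` 20-byte index entries (each holding the key hash, the CommitLog offset, a time difference, and the index of the previous entry in the same slot, which chains collisions together), the positions can be computed as below. `IndexFileLayout` is a hypothetical helper, and `Math.floorMod` is used here to keep slot numbers non-negative.

```java
// Hypothetical helper computing byte positions inside an IndexFile,
// assuming: 40-byte header | hashSlotNum x 4-byte slots | indexNum x 20-byte entries.
final class IndexFileLayout {
    static final int HEADER_SIZE = 40;
    static final int HASH_SLOT_SIZE = 4;  // matches hashSlotSize in the excerpt
    static final int INDEX_SIZE = 20;     // matches indexSize in the excerpt

    static long totalSize(int hashSlotNum, int indexNum) {
        return HEADER_SIZE + (long) hashSlotNum * HASH_SLOT_SIZE + (long) indexNum * INDEX_SIZE;
    }

    /** Position of the hash slot that a key falls into. */
    static long slotPosition(String key, int hashSlotNum) {
        int slot = Math.floorMod(key.hashCode(), hashSlotNum); // always in [0, hashSlotNum)
        return HEADER_SIZE + (long) slot * HASH_SLOT_SIZE;
    }

    /** Position where the n-th index entry (0-based) is written: entries are appended in order. */
    static long entryPosition(int hashSlotNum, int entryIndex) {
        return HEADER_SIZE + (long) hashSlotNum * HASH_SLOT_SIZE + (long) entryIndex * INDEX_SIZE;
    }
}
```

With 5,000,000 slots and 20,000,000 entries (assumed here as the configuration behind the 20 million figure), the file comes to 420,000,040 bytes, roughly 400 MB, consistent with the size quoted above.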

The storage structure of IndexFile:

RocketMQ uses a mixed storage structure: all queues on a single Broker instance share one log data file, the CommitLog. Because the message entities of many topics are stored in one CommitLog, RocketMQ separates the data part (written by producers) from the index part (used by consumers). The Producer sends messages to the Broker, and the Broker persists them to the CommitLog, flushing to disk either synchronously or asynchronously.

As long as a message has been persisted to the CommitLog on disk, a message sent by the Producer will not be lost.

Because of this, consumers will always have the opportunity to consume it. If a pull finds no message, the consumer can simply wait for the next pull. The server also supports long polling: if a pull request finds no message, the Broker can hold it for up to 30 seconds and return new messages to the consumer as soon as they arrive. Meanwhile, RocketMQ uses ReputMessageService, a background service thread on the Broker, to dispatch newly written CommitLog data and build the ConsumeQueue and IndexFile asynchronously.
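Long polling can be sketched with plain Java monitors: a pull that finds nothing is parked until a message arrives or the hold time runs out. This is a simplified illustration, not the Broker's implementation; `LongPollingQueue`, `put`, and `pull` are hypothetical, and the caller passes the hold time (up to 30 seconds in the text).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of long polling: an empty pull waits instead of returning at once.
final class LongPollingQueue<T> {
    private final Deque<T> messages = new ArrayDeque<>();

    synchronized void put(T message) {
        messages.addLast(message);
        notifyAll(); // wake any parked pull requests
    }

    /** Returns the next message, or null if none arrives within timeoutMillis. */
    synchronized T pull(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (messages.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null; // hold time expired: return an empty result
            }
            try {
                wait(remaining); // releases the monitor while parked
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return null;
            }
        }
        return messages.pollFirst();
    }
}
```

Here `put` plays the role of new data becoming available on the Broker; a parked pull returns as soon as it is notified instead of waiting out the full timeout.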

[Message storage from a global perspective]

[Message storage process]

  • After receiving a message, the Broker saves the original message in the MappedFile of the current CommitLog file and (by default) flushes it to disk asynchronously

  • The ReputMessageService thread asynchronously reads messages from the CommitLog's MappedFile and dispatches them to the ConsumeQueue and IndexFile

  • ConsumeQueue and IndexFile hold only index information that points back into the CommitLog
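The dispatch step can be modeled in memory to show why ConsumeQueue and IndexFile are only indexes: they are derived entirely from the CommitLog by replaying records from the last dispatched position. `ReputSketch`, `Msg`, and `doReput` are hypothetical names, and real entries also carry sizes and tag hashes, which are omitted here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory model of the reput/dispatch step.
final class ReputSketch {
    /** A simplified CommitLog record: only the fields the dispatch needs. */
    static final class Msg {
        final long physicalOffset; // where the message starts in the CommitLog
        final String topic;
        final int queueId;
        final String key;          // optional message key, may be null

        Msg(long physicalOffset, String topic, int queueId, String key) {
            this.physicalOffset = physicalOffset;
            this.topic = topic;
            this.queueId = queueId;
            this.key = key;
        }
    }

    final List<Msg> commitLog = new ArrayList<>();                  // the source of truth
    final Map<String, List<Long>> consumeQueues = new HashMap<>(); // "topic-queueId" -> offsets
    final Map<String, List<Long>> index = new HashMap<>();         // key -> offsets
    private int reputFrom = 0;                                     // next record to dispatch

    /** Dispatches every record appended since the previous call. */
    void doReput() {
        while (reputFrom < commitLog.size()) {
            Msg m = commitLog.get(reputFrom++);
            consumeQueues.computeIfAbsent(m.topic + "-" + m.queueId, k -> new ArrayList<>())
                         .add(m.physicalOffset);
            if (m.key != null) {
                index.computeIfAbsent(m.key, k -> new ArrayList<>()).add(m.physicalOffset);
            }
        }
    }
}
```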

Memory mapping and data flushing

[Memory mapping Process]

  • Memory-mapped files (MappedFile) are created by AllocateMappedFileService

  • The creation of a MappedFile follows the classic producer-consumer model

  • When MappedFileQueue calls getLastMappedFile to obtain a MappedFile, it puts an allocation request into a queue

  • The AllocateMappedFileService thread keeps listening on the queue, takes requests from it, and creates the MappedFile objects

  • Finally, the MappedFile is preheated, which calls the underlying force and mlock methods.
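The producer-consumer allocation flow can be sketched with a `BlockingQueue` and one background thread. This is a hypothetical illustration: `AllocateSketch` and `getFile` are made-up names, and the "file" is just a string where the real service would create and preheat a MappedFile. Each call also requests the following file, so that by the time it is needed it has usually been created already.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of producer-consumer file allocation with pre-allocation.
final class AllocateSketch {
    private final ConcurrentMap<String, CompletableFuture<String>> requestTable = new ConcurrentHashMap<>();
    private final BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();

    AllocateSketch() {
        Thread worker = new Thread(this::run, "allocate-sketch");
        worker.setDaemon(true); // do not keep the JVM alive
        worker.start();
    }

    // Consumer side: take requests and "create" the files one by one.
    private void run() {
        try {
            while (true) {
                String path = requestQueue.take();
                // Stand-in for creating and preheating a MappedFile.
                requestTable.get(path).complete("mapped:" + path);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Registers a path once; the future is in the table before the path is queued.
    private void submit(String path) {
        if (requestTable.putIfAbsent(path, new CompletableFuture<>()) == null) {
            requestQueue.add(path);
        }
    }

    // Producer side: request the needed file and pre-request the next one.
    String getFile(String path, String nextPath) {
        submit(path);
        submit(nextPath);
        return requestTable.get(path).join(); // blocks only if not created yet
    }
}
```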

[Disk flushing mechanism]

  1. Asynchronous flushing: the message is written to the in-memory PAGECACHE and a write-success status is returned immediately. When enough messages have accumulated in memory, the data is written to disk. Throughput is high, but messages still in the PAGECACHE can be lost if the machine goes down before they are flushed.

  2. Synchronous flushing: the message is written to the in-memory PAGECACHE, the flush thread is notified immediately, and the writer waits for the flush to complete. When the flush finishes, the flush thread wakes the waiting thread, which then returns a write-success status to the application. Throughput is lower, but messages are not lost.

[Disk flushing process]

  • Messages sent by the producer to the Broker are stored in the MappedFile and then synchronized to disk by the flushing mechanism.

  • Flushing is either synchronous or asynchronous.

  • For asynchronous flushing, a background thread runs at a specified interval.

  • Synchronous flushing also follows the producer-consumer model. After the Broker saves a message to the MappedFile, it creates a GroupCommitRequest, puts it into a list, and blocks waiting. A background thread takes requests from the list and flushes the disk, notifying the waiting thread once the flush succeeds.
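The synchronous group-commit flow in the last bullet can be sketched with `CompletableFuture`. `GroupCommitSketch` is hypothetical, and the real `force()` to disk is replaced by simply advancing `flushedOffset`. What it illustrates is that one flush can release every writer whose data it covered, while writers whose data lies beyond the flushed point keep waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of group commit for synchronous flushing.
final class GroupCommitSketch {
    static final class GroupCommitRequest {
        final long nextOffset; // done once everything before this offset is on disk
        final CompletableFuture<Boolean> done = new CompletableFuture<>();

        GroupCommitRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private List<GroupCommitRequest> requests = new ArrayList<>();
    private long flushedOffset = 0; // touched only by the single flush thread

    /** Writer side: register a request; the caller then blocks on the future. */
    synchronized CompletableFuture<Boolean> submit(long nextOffset) {
        GroupCommitRequest req = new GroupCommitRequest(nextOffset);
        requests.add(req);
        return req.done;
    }

    /** Flush-thread side: one flush can satisfy many waiting writers. */
    void doCommit(long writtenOffset) {
        List<GroupCommitRequest> batch;
        synchronized (this) {       // swap lists so writers are not blocked during the flush
            batch = requests;
            requests = new ArrayList<>();
        }
        flushedOffset = Math.max(flushedOffset, writtenOffset); // stand-in for force()
        List<GroupCommitRequest> pending = new ArrayList<>();
        for (GroupCommitRequest req : batch) {
            if (flushedOffset >= req.nextOffset) {
                req.done.complete(true); // wake the waiting writer
            } else {
                pending.add(req);        // not covered by this flush yet
            }
        }
        synchronized (this) {
            requests.addAll(0, pending);
        }
    }
}
```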

RocketMQ file storage model hierarchy

File storage model hierarchy diagram

The hierarchy of the RocketMQ file storage model is shown in the figure above. By category and function, the conceptual model can be roughly divided into five layers, analyzed one by one below:

  1. Business processor layer: the Broker-side business logic entry for reading and writing messages, covering steps such as pre-checks and validation, constructing the MessageExtBrokerInner object, decoding and deserialization, and constructing the Response object.

  2. RocketMQ data storage component layer: this layer is mainly DefaultMessageStore, RocketMQ's core storage class and the entry point to its message data files. Its putMessage() and getMessage() methods read and write the CommitLog log data files (relying on the methods provided by the CommitLog object model in the next layer). In addition, when this component is initialized it starts many storage-related background service threads, including AllocateMappedFileService (MappedFile pre-allocation), ReputMessageService (message replay and dispatch), HAService (Broker master/slave high-availability synchronization), StoreStatsService (message store statistics), and IndexService (index files).

  3. RocketMQ storage logical object layer: this layer contains the three model classes directly related to RocketMQ data file storage: IndexFile, ConsumeQueue, and CommitLog. IndexFile provides access to index data files, ConsumeQueue provides access to logical message queues, and CommitLog provides access to the log data files where messages are stored. Together these three classes form the overall structure of the RocketMQ storage layer (they are analyzed in depth in a later section).

  4. Encapsulated file memory-mapping layer: RocketMQ mainly uses MappedByteBuffer and FileChannel from JDK NIO to read and write data files. MappedByteBuffer, which maps a disk file into memory, handles reading and writing large files, and RocketMQ wraps it in the MappedFile class. Because of MappedByteBuffer's limitations, discussed earlier, each type of large file (IndexFile, ConsumeQueue, CommitLog) is split into multiple fixed-size files when stored: a single IndexFile is about 400 MB, a single ConsumeQueue file about 5.72 MB, and a single CommitLog file 1 GB. Each file is named after its starting offset, which equals the total number of bytes in all preceding files, so the pieces chain together into one logical large file. Each individual file is read and written through the MappedFile class, which provides sequential writes, random reads, flushing of in-memory data, memory cleanup, and other file-related services.

  5. Disk storage layer: the disks on which the RocketMQ server is deployed. Here one must consider how different disk types (such as SSDs or HDDs) and disk performance parameters (such as IOPS, throughput, and access latency) affect sequential writes and random reads.
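The naming scheme in layer 4 turns any global offset into a file name plus a position within that file. The helper below is a sketch assuming 20-digit, zero-padded file names (the form commonly seen on disk, such as `00000000000000000000` followed by `00000000001073741824` for 1 GB CommitLog files) and a ConsumeQueue file of 300,000 entries × 20 bytes, which is 6,000,000 bytes or about 5.72 MB. `FileNaming` and its methods are hypothetical.

```java
import java.util.Locale;

// Hypothetical helper: map a global offset to the fixed-size file holding it,
// assuming files are named by their 20-digit, zero-padded starting offset.
final class FileNaming {
    static final long COMMITLOG_FILE_SIZE = 1024L * 1024 * 1024; // 1 GB
    static final long CONSUMEQUEUE_FILE_SIZE = 300_000L * 20;   // assumed 300,000 entries x 20 bytes

    /** The file's start offset is the total size of all earlier files. */
    static String fileNameFor(long offset, long fileSize) {
        long fileStart = offset - (offset % fileSize);
        return String.format(Locale.ROOT, "%020d", fileStart);
    }

    /** Position of the offset relative to the start of its file. */
    static long positionInFile(long offset, long fileSize) {
        return offset % fileSize;
    }
}
```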

High availability of file storage

[Distributed storage]

The data of one topic is split into multiple queues spread across different Brokers, and each queue is replicated.

[Replication master/slave synchronization (HA)]

RocketMQ's master/slave synchronization works as follows:

1. The Master starts and listens on the specified port.

2. The client (the Slave) starts, connects to the Master, and establishes a TCP connection.

3. The client pulls messages from the Master at 5-second intervals. On its first pull, the client obtains the maximum offset in its local CommitLog file and uses that offset to pull messages from the Master.

4. The Master parses the request and returns a batch of data to the client.

5. After receiving a batch of messages, the client writes them to its local CommitLog file, reports its pull progress to the Master, and updates the offset for the next pull.

6. The process then repeats from step 3.
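The six steps can be simulated in memory to show how the Slave's local maximum offset drives each pull. This hypothetical model (`HaSketch`, `Master`, `Slave`, `pullOnce`) omits networking, the 5-second interval, and the real wire protocol; each `pullOnce` call is one pass through steps 3 to 5.

```java
import java.util.Arrays;

// Hypothetical, in-memory model of the master/slave replication loop.
final class HaSketch {
    static final class Master {
        byte[] commitLog = new byte[0];
        long slaveAckOffset = 0; // last progress reported by the Slave

        void append(byte[] data) {
            commitLog = concat(commitLog, data);
        }

        /** Step 4: return up to maxBatch bytes starting at the requested offset. */
        byte[] handlePull(long fromOffset, int maxBatch) {
            int from = (int) Math.min(fromOffset, commitLog.length);
            int to = Math.min(commitLog.length, from + maxBatch);
            return Arrays.copyOfRange(commitLog, from, to);
        }
    }

    static final class Slave {
        byte[] commitLog = new byte[0];

        /** Step 3: the local maximum offset is where the next pull starts. */
        long maxOffset() {
            return commitLog.length;
        }

        /** Steps 3 to 5 once; returns how many bytes were replicated. */
        int pullOnce(Master master, int maxBatch) {
            byte[] batch = master.handlePull(maxOffset(), maxBatch);
            commitLog = concat(commitLog, batch); // step 5: write to the local CommitLog
            master.slaveAckOffset = maxOffset();  // step 5: report progress
            return batch.length;
        }
    }

    static byte[] concat(byte[] a, byte[] b) {
        byte[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }
}
```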

Optimization techniques for file storage

The RocketMQ storage layer uses several optimization techniques to mitigate some of PageCache's drawbacks, including memory pre-allocation, file preheating, and the mlock system call.

[Pre-allocating MappedFiles]

During message writing (through CommitLog's putMessage() method), the CommitLog first obtains a MappedFile from the MappedFileQueue and creates a new one if none exists.

Because RocketMQ pre-allocates the next MappedFile in advance, the next fetch can simply return the ready file instead of waiting for a new MappedFile to be created and allocated.

[File preheating && mlock system call]

(1) mlock system call: mlock locks part or all of a process's address space in physical memory so that it cannot be swapped out to swap space. A high-throughput distributed message queue like RocketMQ needs low read/write latency, so it wants to keep as much data as possible in physical memory to make reads and writes efficient.

(2) File preheating: preheating serves two purposes. First, merely allocating memory and calling mlock does not fully lock that memory for the program, because its pages may still be copy-on-write; a dummy value therefore has to be written into every memory page. RocketMQ does this while creating and allocating a MappedFile, writing into the memory space mapped by mmap. Second, after mmap is called, the OS only creates a mapping table from virtual to physical addresses and does not actually load any file data into memory. When the program accesses the data, the OS checks whether the page is already in memory and raises a page fault if it is not. With the standard 4 KB page size on x86 Linux, fully loading a 1 GB CommitLog into physical memory would take about 262,144 page faults (1 GB / 4 KB). RocketMQ therefore calls madvise right after the mmap memory mapping, so that the OS preloads as much of the file data into memory as possible, which achieves the memory preheating effect.
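Preheating can be approximated in plain Java NIO by mapping a file and touching one byte per page. This is a sketch under stated assumptions: `WarmupSketch` is hypothetical, the page size is taken as 4 KB, and mlock/madvise are left out because the JDK does not expose them (they require a native call).

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

// Hypothetical sketch of file preheating with plain Java NIO.
final class WarmupSketch {
    static final int OS_PAGE_SIZE = 4096; // assumed x86 Linux page size

    /** Maps the file and writes one byte per page so every page is faulted in; returns pages touched. */
    static int warm(Path file, int fileSize) {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
             FileChannel channel = raf.getChannel()) {
            // READ_WRITE mapping extends the file to fileSize if it is shorter.
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            int pages = 0;
            for (int i = 0; i < fileSize; i += OS_PAGE_SIZE) {
                buffer.put(i, (byte) 0); // dummy write forces the page into memory
                pages++;
            }
            buffer.force(); // flush the touched pages back to the file
            // mlock/madvise would be native calls here; the JDK has no API for them.
            return pages;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For a 64 KB file, `warm` touches 16 pages; applied to a 1 GB CommitLog, the same loop would touch 262,144.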