File parsing

RocketMQ is essentially file-oriented: messages, indexes, and broker state are all encoded into files and read back from them.

This is a familiar pattern. The operating system kernel exposes a virtual file system, and ZooKeeper exposes a virtual directory tree (held in memory).

In terms of I/O efficiency, memory is faster than disk, and disk is faster than network I/O. If memory is the fastest, why is RocketMQ built on disk?

RocketMQ reads and writes its data directly on disk through file I/O, and file I/O is ultimately carried out by the kernel, so RocketMQ relies on kernel facilities to make that I/O efficient. A typical example is memory mapping: the map method of the FileChannel obtained from a RandomAccessFile is backed by the kernel's mmap. RocketMQ supports two ways of persisting data to disk, synchronous flush and asynchronous flush, which an earlier article described briefly. This article focuses on the format of the data stored in each file.

RocketMQ file storage

RocketMQ stores its data in the ${ROCKET_HOME}/store directory:

rocketmq@master-35:~/store$ ll
-rw-rw-r-- 1 rocketmq rocketmq    0 Aug 25 16:37 abort
-rw-rw-r-- 1 rocketmq rocketmq 4096 Sep  7 16:26 checkpoint
drwxrwxr-x 2 rocketmq rocketmq 4096 Aug 25 16:36 commitlog/
drwxrwxr-x 2 rocketmq rocketmq 4096 Sep  7 16:27 config/
drwxrwxr-x 3 rocketmq rocketmq 4096 Aug 25 16:32 consumequeue/
drwxrwxr-x 2 rocketmq rocketmq 4096 Aug 25 16:32 index/
-rw-rw-r-- 1 rocketmq rocketmq    4 Aug 25 16:37 lock

The main storage files and directories are:

  1. abort: a state flag recording whether the broker shut down cleanly. The file is created when the broker starts and deleted on a clean shutdown, so finding it at startup means the previous run ended abnormally.
  2. checkpoint: records the timestamps of the last flush of the commitlog, consumequeue, and index files.
  3. commitlog: this folder contains all messages received by the broker. The commitlog message format is described in more detail below.
  4. config: configuration used by the running broker, together with the consumption progress of consumers.
  5. consumequeue: stores the consumption queues of each topic. Under each topic, a directory is created for each queue and named after the queue ID. The structure and function of ConsumeQueue are described in detail below.
  6. index: index files, which index messages that carry a key. Their function and structure are described below.
  7. lock: a global lock file used at runtime.

CommitLog

Write methods

The MessageStore interface defines several putMessage variants:

// Asynchronous single-message write, returns a CompletableFuture
default CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    return CompletableFuture.completedFuture(putMessage(msg));
}

// Asynchronous batch write, returns a CompletableFuture
default CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
    return CompletableFuture.completedFuture(putMessages(messageExtBatch));
}

// Synchronous single-message write, returns a PutMessageResult
PutMessageResult putMessage(final MessageExtBrokerInner msg);

// Synchronous batch write, returns a PutMessageResult
PutMessageResult putMessages(final MessageExtBatch messageExtBatch);

In RocketMQ, the implementation class is DefaultMessageStore. My previous article covered the logic of synchronous and asynchronous flush, the CompletableFuture asynchronous programming model, and part of the source code that implements synchronous double write. One thing to add: RocketMQ writes messages to disk sequentially, which is the most efficient way to write.

This article covers the data format and write mechanism of the files, as well as expiration cleanup and the replication mechanism.

The data format

A commitlog file name consists of 20 decimal digits and gives the physical offset at which that file starts. The first file is therefore always 00000000000000000000. Each commitlog file has a fixed size of 1 GB, but the message data in it is usually slightly less than 1 GB: a message is written as an indivisible unit, so when the remaining space cannot hold the next message, that space is left unused and the message goes to the next file.
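The naming rule can be sketched in a few lines of Java. This is a minimal illustration, not RocketMQ's own code: the class and method names are hypothetical, and the only facts taken from the text are the 20-digit zero-padded name and the fixed 1 GB file size.

```java
final class CommitLogFileNames {
    // Hypothetical helper: zero-pad a file's start offset to 20 decimal digits.
    static String fileNameForOffset(long startOffset) {
        return String.format("%020d", startOffset);
    }

    // Start offset of the file that contains the given physical offset.
    static long fileStartOffset(long physicalOffset, long fileSize) {
        return physicalOffset - (physicalOffset % fileSize);
    }

    public static void main(String[] args) {
        long fileSize = 1024L * 1024 * 1024; // 1 GB, the fixed commitlog file size
        System.out.println(fileNameForOffset(0));
        System.out.println(fileNameForOffset(fileSize));
        // A message at physical offset 1 GB + 500 lives in the second file.
        System.out.println(fileNameForOffset(fileStartOffset(fileSize + 500, fileSize)));
    }
}
```

Because the name is the start offset, the file holding any physical offset can be found by arithmetic alone, without scanning the directory.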

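Commitlog stores all messages together, regardless of topic. Each message is laid out as follows (in the order written by the encoder):

| Attribute | Description | Type |
| --- | --- | --- |
| msgLen | Total length of the message | Int |
| magicCode | Magic code (CommitLog.MESSAGE_MAGIC_CODE) | Int |
| bodyCrc | CRC checksum of the body | Int |
| queueId | Queue ID | Int |
| flag | User-defined flag | Int |
| queueOffset | Offset in the queue | Long |
| physicalOffset | Offset in the disk file | Long |
| sysFlag | Used to derive the FilterType and transaction status | Int |
| bornTimestamp | Message creation time | Long |
| bornHost | Producer host. IPv4: IP (4) + port (4); IPv6: IP (16) + port (4) | 8 or 20 bytes |
| storeTimestamp | Message storage time | Long |
| storeHostAddress | Broker address | 8 or 20 bytes |
| reconsumeTimes | Number of times the message has been re-consumed | Int |
| preparedTransactionOffset | Offset of the prepared transaction message | Long |
| bodyLength | Body length | Int |
| body | The message body | bodyLength bytes |
| topicLength | Length of the topic name, followed by the topic name itself | Byte |
| propertiesLength | Length of the properties, followed by the properties data | Short |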

These fields correspond to MessageExt in the RocketMQ source code:

private String brokerName;

private int queueId;

private int storeSize;

private long queueOffset;
private int sysFlag;
private long bornTimestamp;
private SocketAddress bornHost;

private long storeTimestamp;
private SocketAddress storeHost;
private String msgId;
private long commitLogOffset;
private int bodyCRC;
private int reconsumeTimes;

private long preparedTransactionOffset;


The encoder writes the fields into a ByteBuffer in the following order:

protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
    ....
    // Initialization of storage space
    this.resetByteBuffer(encoderBuffer, msgLen);
    // 1 TOTALSIZE
    this.encoderBuffer.putInt(msgLen);
    // 2 MAGICCODE
    this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    this.encoderBuffer.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.encoderBuffer.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.encoderBuffer.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET, need update later
    this.encoderBuffer.putLong(0);
    // 7 PHYSICALOFFSET, need update later
    this.encoderBuffer.putLong(0);
    // 8 SYSFLAG
    this.encoderBuffer.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.encoderBuffer.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
    // 11 STORETIMESTAMP
    this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
    // 13 RECONSUMETIMES
    this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.encoderBuffer.putInt(bodyLength);
    if (bodyLength > 0)
        this.encoderBuffer.put(msgInner.getBody());
    // 16 TOPIC
    this.encoderBuffer.put((byte) topicLength);
    this.encoderBuffer.put(topicData);
    // 17 PROPERTIES
    this.encoderBuffer.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.encoderBuffer.put(propertiesData);

    encoderBuffer.flip();
    return null;
}
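To make the layout concrete, the total encoded length can be computed by adding up the widths of the put calls in encode. The sketch below is an illustration under that assumption: the class, method, and parameter names are hypothetical, and the host lengths are 8 bytes for IPv4 or 20 bytes for IPv6 as listed in the table.

```java
final class MessageLength {
    // Sum of the field widths, in the same order as the encode() method above.
    static int calMsgLength(int bornHostLength, int storeHostLength,
                            int bodyLength, int topicLength, int propertiesLength) {
        return 4                       // 1 TOTALSIZE
            + 4                        // 2 MAGICCODE
            + 4                        // 3 BODYCRC
            + 4                        // 4 QUEUEID
            + 4                        // 5 FLAG
            + 8                        // 6 QUEUEOFFSET
            + 8                        // 7 PHYSICALOFFSET
            + 4                        // 8 SYSFLAG
            + 8                        // 9 BORNTIMESTAMP
            + bornHostLength           // 10 BORNHOST (8 or 20)
            + 8                        // 11 STORETIMESTAMP
            + storeHostLength          // 12 STOREHOSTADDRESS (8 or 20)
            + 4                        // 13 RECONSUMETIMES
            + 8                        // 14 Prepared Transaction Offset
            + 4 + bodyLength           // 15 BODY (int length + data)
            + 1 + topicLength          // 16 TOPIC (byte length + data)
            + 2 + propertiesLength;    // 17 PROPERTIES (short length + data)
    }

    public static void main(String[] args) {
        // IPv4 hosts, 5-byte body, 9-byte topic name, no properties.
        System.out.println(calMsgLength(8, 8, 5, 9, 0));
    }
}
```

With IPv4 addresses the fixed overhead is 91 bytes per message; everything else is body, topic name, and properties.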

Write mechanism

An earlier article briefly analyzed the asynchronous programming model and the asyncPutMessage optimizations introduced in version 4.7. Here the focus is on the write mechanism and on when files are created.

Send message request

SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
    return CompletableFuture.completedFuture(null);
}
// The message context can be used to trace messages
mqtraceContext = buildMsgContext(ctx, requestHeader);
// Run the registered send-message hooks
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch()) {
    // Batch message
    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
    // Single message
    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}

private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
        SendMessageContext mqtraceContext, SendMessageRequestHeader requestHeader) {
    int queueIdInt = requestHeader.getQueueId();
    // Look up the topic configuration
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    ...
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    ...
    CompletableFuture<PutMessageResult> putMessageResult = null;
    String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (transFlag != null && Boolean.parseBoolean(transFlag)) {
        // Transaction message storage
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {
        // Ordinary message storage
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}

The default implementation returned by getMessageStore is DefaultMessageStore, and the message is ultimately stored by either CommitLog or DLedgerCommitLog. This article focuses on the CommitLog implementation; DLedgerCommitLog will be covered in a later article on the Raft protocol implementation.

Processing message store

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    ... // validation and field-population logic omitted
    // Get the thread-local message encoder
    PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
    // Encode the message into a ByteBuffer in MessageExtEncoder
    PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
    // Attach the encoded buffer to the message
    msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
    // Acquire the put-message lock (ReentrantLock)
    putMessageLock.lock();
    try {
        // Get the last MappedFile; it may be null
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        ...
        if (null == mappedFile || mappedFile.isFull()) {
            // No MappedFile yet, or the last one is full: create a new one
            mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        }
        ...
        // Append the message to the MappedFile
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        switch (result.getStatus()) {
            // ....
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // The message does not fit in the space left in the file, so create a new file and write again.
                // By default, 8 bytes are reserved at the tail of a commitlog file.
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                // ...
                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                break;
        }
        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }
    ...
}

The code above shows two moments at which a file is created. The first is when the first message is stored and no MappedFile exists yet, in which case mappedFileQueue.getLastMappedFile(0) creates a new one. The second is when the latest MappedFile cannot take the new message (it is full, or the append returns END_OF_FILE), in which case a new MappedFile is created as well.
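The two creation moments can be modelled with a toy append-only log. This is a deliberately simplified sketch: RollingSegments and its methods are hypothetical names, nothing is written to disk, and the reserved bytes at the tail of a real commitlog file are ignored.

```java
import java.util.ArrayList;
import java.util.List;

final class RollingSegments {
    private final int fileSize;
    private final List<Long> fileStartOffsets = new ArrayList<>();
    private int writePos; // write position inside the last file

    RollingSegments(int fileSize) {
        this.fileSize = fileSize;
    }

    // Appends a message of msgLen bytes and returns its physical offset.
    long append(int msgLen) {
        if (msgLen > fileSize) {
            throw new IllegalArgumentException("message larger than one file");
        }
        if (fileStartOffsets.isEmpty()) {
            // Case 1: first write, no file exists yet.
            fileStartOffsets.add(0L);
            writePos = 0;
        } else if (writePos + msgLen > fileSize) {
            // Case 2: the message does not fit; leave the tail unused and roll over.
            long lastStart = fileStartOffsets.get(fileStartOffsets.size() - 1);
            fileStartOffsets.add(lastStart + fileSize);
            writePos = 0;
        }
        long physicalOffset = fileStartOffsets.get(fileStartOffsets.size() - 1) + writePos;
        writePos += msgLen;
        return physicalOffset;
    }

    int fileCount() {
        return fileStartOffsets.size();
    }
}
```

For example, with a file size of 100, appending two 60-byte messages places the first at offset 0 and the second at offset 100, because the second does not fit in the 40 bytes left in the first file.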

Creating the MappedFile

if (mappedFileLast != null && mappedFileLast.isFull()) {
    createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
if (createOffset != -1 && needCreate) {
    // The next file is named after the start offset of the last file plus the file size
    String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
    String nextNextFilePath = this.storePath + File.separator
        + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
    MappedFile mappedFile = null;
    ... // part of the logic omitted
    mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
        nextNextFilePath, this.mappedFileSize);
}

The MappedFile itself is created by AllocateMappedFileService:

private boolean mmapOperation() {
    ....
    // Take an allocation request from the queue
    req = this.requestQueue.take();
    ...
    if (req.getMappedFile() == null) {
        long beginTime = System.currentTimeMillis();
        MappedFile mappedFile;
        // If transientStorePoolEnable is on, data is written to off-heap memory first
        // and later committed from off-heap memory to the FileChannel.
        if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            try {
                mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
            } catch (RuntimeException e) {
                log.warn("Use default implementation.");
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
            }
        } else {
            // Create the MappedFile with mmap (RandomAccessFile#getChannel, then FileChannel#map)
            mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
        }
        // Pre-warm only files at least as large as a commitlog file (1 GB), and only if warm-up is enabled
        if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()
            && this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
        }
        req.setMappedFile(mappedFile);
        this.hasException = false;
        isSuccess = true;
    }
    ...

Two points are worth noting here: transientStorePoolEnable and warmMappedFile.

transientStorePoolEnable only takes effect when the broker uses asynchronous flush and is a master node. With it enabled, data is first written to off-heap memory and then committed to the FileChannel in batches. This affects replication to slave nodes: only data that has been committed is replicated, so slaves may lag behind. The lag is governed by commitIntervalCommitLog and commitCommitLogThoroughInterval, which default to 200 ms.
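For readers unfamiliar with the mmap path, the standard JDK calls involved look roughly like this. It is a standalone sketch, not RocketMQ's MappedFile: the class name, method name, and temp-file usage are illustrative assumptions, while RandomAccessFile, FileChannel.map, and MappedByteBuffer.force are the regular java.io and java.nio APIs.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class MmapSketch {
    // Maps the file, writes text at position 0, flushes, and reads it back from disk.
    static String writeAndReadBack(Path path, int fileSize, String text) throws IOException {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw");
             FileChannel channel = raf.getChannel()) {
            // Mapping in READ_WRITE mode extends the file to fileSize if it is shorter.
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            buffer.put(data);  // lands in the page cache
            buffer.force();    // explicit flush, comparable to a synchronous flush
        }
        byte[] onDisk = Files.readAllBytes(path);
        return new String(onDisk, 0, data.length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("mmap-sketch", ".bin");
        try {
            System.out.println(writeAndReadBack(path, 4096, "hello"));
        } finally {
            Files.deleteIfExists(path);
        }
    }
}
```

Without the force call the data would still reach disk eventually, when the kernel writes back dirty pages; that is the trade-off between synchronous and asynchronous flush.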

warmMappedFile

warmMappedFile pre-warms a newly mapped file. As the code below shows, it works in units of the OS page size (OS_PAGE_SIZE, 4 KB). Every read and write of a file goes through the page cache: writes land in the page cache first, and reads are served from it. Pre-warming touches the whole file up front, so that later writes go straight to pages that are already allocated instead of triggering page faults while pages are allocated on demand. To do this, the loop steps through the file one page at a time and writes a 0 byte into each page, which makes the kernel back that page with memory. With synchronous flush, the buffer is additionally forced to disk each time the number of pages given by the pages parameter has been written. A 1 GB file contains 262,144 pages of 4 KB.

public void warmMappedFile(FlushDiskType type, int pages) {
    long beginTime = System.currentTimeMillis();
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0;
    long time = System.currentTimeMillis();
    for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0);
        // force flush when flush disk type is sync
        if (type == FlushDiskType.SYNC_FLUSH) {
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                mappedByteBuffer.force();
            }
        }

        // prevent gc
        if (j % 1000 == 0) {
            log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
            time = System.currentTimeMillis();
            try {
                Thread.sleep(0);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            }
        }
    }

    // force flush when prepare load finished
    if (type == FlushDiskType.SYNC_FLUSH) {
        log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
            this.getFileName(), System.currentTimeMillis() - beginTime);
        mappedByteBuffer.force();
    }
    log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
        System.currentTimeMillis() - beginTime);

    this.mlock();
}

mlock

The MappedByteBuffer is a DirectBuffer backed by off-heap memory, so the memory address of the buffer can be obtained. Calling mlock on that address range prevents the memory from being swapped out to swap space. This makes frequent reads and writes more efficient, because the pages stay resident in physical memory for as long as the process holds the lock.

madvise

madvise gives the kernel advice about how an address range will be accessed; the kernel may act on the advice and perform some prefetching. MADV_WILLNEED says the range is expected to be accessed in the near future, so the kernel may read the data into the page cache ahead of time.

For example, when a process reads a file, it actually reads from the page cache. The page cache is associated with the open file, and the file in turn maps to real locations on disk. If the requested page is not in the page cache, the kernel raises a page fault, allocates a page, binds it to the file, loads the data from disk into it, and only then returns the data to the process. madvise lets the kernel prefetch an address range into the page cache ahead of time, which reduces page faults and improves the efficiency of the whole system.

public void mlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    {
        int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
        log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret,
            System.currentTimeMillis() - beginTime);
    }

    {
        int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
        log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret,
            System.currentTimeMillis() - beginTime);
    }
}

File expiration time

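Messages are stored sequentially in commitlog files. Because messages vary in size, cleanup does not work on individual messages; instead, whole files are removed based on their age.

DefaultMessageStore schedules a task that checks for expired files at a fixed interval (cleanResourceInterval, 10 seconds by default), starting 60 seconds after startup: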

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.cleanFilesPeriodically();
    }
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

The cleanup covers both commitlog and consumequeue files:

private void cleanFilesPeriodically() {
    this.cleanCommitLogService.run();
    this.cleanConsumeQueueService.run();
}

For the commitlog, the deletion conditions are evaluated as follows:

private void deleteExpiredFiles() {
    int deleteCount = 0;
    // Retention time; the unit in the configuration file is hours
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    ...
    // Whether the configured deletion time (deleteWhen) has been reached
    boolean timeup = this.isTimeToDelete();
    // Whether disk usage exceeds the threshold (75% by default)
    boolean spacefull = this.isSpaceToDelete();
    // Manual deletion
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

    if (timeup || spacefull || manualDelete) {
        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;

        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable()
            && this.cleanImmediately;
        log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
            fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce);

        fileReservedTime *= 60 * 60 * 1000;
        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime,
            deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}

private final double diskSpaceWarningLevelRatio =
    Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));

private final double diskSpaceCleanForciblyRatio =
    Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));

By default files expire after 72 hours (3 days). Cleanup is triggered in the following situations, regardless of whether the messages in a file have been consumed:

  1. At the configured deletion time (4:00 a.m. by default), expired files are cleaned up.
  2. When disk usage exceeds 75%, expired files are cleaned up regardless of whether the deletion time has been reached.
  3. When disk usage exceeds 85%, files are cleaned up according to the preset rule (oldest first by default), regardless of whether they have expired.
  4. When disk usage reaches 90%, the broker rejects new messages.
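The thresholds above can be summarized as a small decision function. This is only a sketch of the rules as listed: the class, enum, and method names are hypothetical, the real broker evaluates several flags separately rather than returning a single action, and the ratios are the defaults quoted in the text.

```java
final class DiskCleanupPolicy {
    enum Action { NONE, DELETE_EXPIRED, DELETE_FORCIBLY, REJECT_WRITES }

    // Returns the most severe action that applies for the given disk usage ratio (0.0 to 1.0).
    static Action decide(double diskUsage, boolean deleteTimeReached) {
        if (diskUsage >= 0.90) {
            return Action.REJECT_WRITES;   // warning level: stop accepting messages
        }
        if (diskUsage >= 0.85) {
            return Action.DELETE_FORCIBLY; // remove oldest files even if not expired
        }
        if (diskUsage >= 0.75 || deleteTimeReached) {
            return Action.DELETE_EXPIRED;  // remove only files past the retention time
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        System.out.println(decide(0.50, false));
        System.out.println(decide(0.50, true));
        System.out.println(decide(0.80, false));
        System.out.println(decide(0.87, false));
        System.out.println(decide(0.95, false));
    }
}
```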

ConsumeQueue

ConsumeQueue stores the consumption queues of each topic. The directory structure is topic/queueId/fileName:

└── TopicTest
    ├── 0
    │   └── 00000000000000000000
    ├── 1
    │   └── 00000000000000000000
    ├── 2
    │   └── 00000000000000000000
    └── 3
        └── 00000000000000000000

Under each topic, one directory is created per queue, named after the queue ID. The files inside a queue directory follow the same naming scheme as commitlog files: 20 decimal digits giving the offset at which the file starts within the queue. The difference is that, because ConsumeQueue files and their entries have a fixed size, the sequence of file names is the same in every queue.

String queueDir = this.storePath
    + File.separator + topic
    + File.separator + queueId;

this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);

Role

RocketMQ consumers consume messages by subscribing to a topic, but the commitlog stores messages without regard to topic, so scanning the commitlog to find a topic's messages would be inefficient. ConsumeQueue solves this by recording, for each queue of a topic, where that queue's messages are located in the commitlog. It can be thought of as an index file over the messages of one queue.

The data format

Each ConsumeQueue entry has a fixed size of 20 bytes, and each file holds 300,000 entries, so the size of a single file is also fixed: 300,000 × 20 = 6,000,000 bytes.

public static final int CQ_STORE_UNIT_SIZE = 20;

// ConsumeQueue file size,default is 30W
private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
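The 20 bytes break down into an 8-byte commitlog offset, a 4-byte message size, and an 8-byte tags code, which is the order in which the read loop later in this article pulls them out of the buffer. A minimal sketch of one entry, with a hypothetical class and method name:

```java
import java.nio.ByteBuffer;

final class ConsumeQueueEntry {
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Encodes one entry: commitlog offset (8) + message size (4) + tags code (8).
    static ByteBuffer encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagsCode);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer entry = encode(1024L, 105, 42L);
        System.out.println("entry bytes = " + entry.remaining());
        System.out.println("offset = " + entry.getLong());
        System.out.println("size = " + entry.getInt());
        System.out.println("tagsCode = " + entry.getLong());
    }
}
```

Fixed-size entries are what make the queue addressable by index: entry n always starts at byte n * 20.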

The following sections cover how a ConsumeQueue is created and recovered, how a specific message is located through it, and how messages are looked up by time.

Creation time

A ConsumeQueue is created when consumeQueueTable does not yet contain an entry for the given topic and queueId:

public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ....
    ConsumeQueue logic = map.get(queueId);
    // Create the ConsumeQueue if it does not exist yet
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(
            topic,
            queueId,
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
            this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
            this);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        } else {
            logic = newLogic;
        }
    }
    return logic;
}

findConsumeQueue is called from many places, for example when messages are fetched from a queue by topic and queueId.

Data recovery

The DefaultMessageStore#load method is triggered at startup to load data from the local disk into memory and restore the disk state to the program memory.

public boolean load() {
    ...
    // load Commit Log
    result = result && this.commitLog.load();
    // load Consume Queue
    result = result && this.loadConsumeQueue();
    ...
}

private long recoverConsumeQueue() {
    long maxPhysicOffset = -1;
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            logic.recover();
            if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
                maxPhysicOffset = logic.getMaxPhysicOffset();
            }
        }
    }
    return maxPhysicOffset;
}

DefaultMessageStore#load calls the loadConsumeQueue method:

private boolean loadConsumeQueue() {
    File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
    // List the topic directories
    File[] fileTopicList = dirLogic.listFiles();
    if (fileTopicList != null) {
        for (File fileTopic : fileTopicList) {
            String topic = fileTopic.getName();
            File[] fileQueueIdList = fileTopic.listFiles();
            if (fileQueueIdList != null) {
                for (File fileQueueId : fileQueueIdList) {
                    int queueId;
                    try {
                        queueId = Integer.parseInt(fileQueueId.getName());
                    } catch (NumberFormatException e) {
                        continue;
                    }
                    ConsumeQueue logic = new ConsumeQueue(
                        topic,
                        queueId,
                        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                        this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                        this);
                    this.putConsumeQueue(topic, queueId, logic);
                }
            }
        }
    }
    log.info("load logics queue all over, OK");
    return true;
}

After the load, recover scans the mapped files to rebuild the in-memory state, such as the maximum physical offset and the flushed and committed positions:

public void recover() {
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Start from the third-to-last file
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;

        int mappedFileSizeLogics = this.mappedFileSize;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        long maxExtAddr = 1;
        while (true) {
            for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                ....
                // Record the maximum physical offset
                this.maxPhysicOffset = offset + size;
                ...
            }
            ...
        }

        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        if (isExtReadEnable()) {
            this.consumeQueueExt.recover();
            log.info("Truncate consume queue extend file by max {}", maxExtAddr);
            this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
        }
    }
}

Getting messages

Because the overall message consumption flow is fairly long, the PullMessageProcessor source code and the full consumption path will be analyzed in a separate article. This article focuses on the role of ConsumeQueue, so the analysis below stays within the ConsumeQueue class.

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    int mappedFileSize = this.mappedFileSize;
    // Locate the entry: byte offset = startIndex * CQ_STORE_UNIT_SIZE
    long offset = startIndex * CQ_STORE_UNIT_SIZE;
    if (offset >= this.getMinLogicOffset()) {
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
            return result;
        }
    }
    return null;
}

The code above is ConsumeQueue#getIndexBuffer, which uses startIndex to find the entries for the messages to be consumed. Because every entry has a fixed size, the byte offset is simply startIndex * 20. The returned SelectMappedBufferResult covers the entries from that starting point onward, and they are then consumed in order in the DefaultMessageStore#getMessages method:

...
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
    int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
    long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
    ...
    SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
}

The snippet above is an excerpt that illustrates how the ConsumeQueue entries are used. As stated earlier, each entry contains the commitlog offset, the message size, and the tags code, so commitLog.getMessage(offsetPy, sizePy) fetches the message at the corresponding location.
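The index-to-position arithmetic can be checked in isolation. The sketch below assumes the default sizes quoted earlier (20-byte entries, 300,000 entries per file); the class and method names are hypothetical, and the real lookup goes through mappedFileQueue.findMappedFileByOffset rather than plain arithmetic.

```java
final class ConsumeQueueLocator {
    static final int CQ_STORE_UNIT_SIZE = 20;
    static final int MAPPED_FILE_SIZE = 300000 * CQ_STORE_UNIT_SIZE;

    // Returns {start offset of the file holding the entry, byte position inside that file}.
    static long[] locate(long startIndex) {
        long offset = startIndex * CQ_STORE_UNIT_SIZE;       // logical byte offset in the queue
        long posInFile = offset % MAPPED_FILE_SIZE;          // same modulo as in getIndexBuffer
        long fileFromOffset = offset - posInFile;            // also the 20-digit file name
        return new long[] {fileFromOffset, posInFile};
    }

    public static void main(String[] args) {
        long[] first = locate(5);
        long[] second = locate(300001);
        System.out.println(first[0] + " / " + first[1]);
        System.out.println(second[0] + " / " + second[1]);
    }
}
```

Entry 300,001 is the second entry of the second file, so it sits at position 20 in the file that starts at offset 6,000,000.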

Get messages by time

ConsumeQueue#getOffsetInQueueByTime finds a queue offset by timestamp. ConsumeQueue entries do not record a message time, so the lookup cannot avoid going to the commitlog to read the store time of each candidate message. Here is how ConsumeQueue implements it:

public long getOffsetInQueueByTime(final long timestamp) {
    MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
    if (mappedFile != null) {
        long offset = 0;
        int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
        int high = 0;
        int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
        long leftIndexValue = -1L, rightIndexValue = -1L;
        long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
        SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
        if (null != sbr) {
            ByteBuffer byteBuffer = sbr.getByteBuffer();
            high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
            try {
                while (high >= low) {
                    // Binary search: the midpoint is aligned to an entry boundary
                    midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
                    // ... data read omitted
                    // Get the store time of the message at this position from the commitlog
                    long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                    if (storeTime < 0) {
                        return 0;
                    } else if (storeTime == timestamp) {
                        targetOffset = midOffset;
                        break;
                    } else if (storeTime > timestamp) {
                        // The message is newer than the target: search the left half
                        high = midOffset - CQ_STORE_UNIT_SIZE;
                        rightOffset = midOffset;
                        rightIndexValue = storeTime;
                    } else {
                        // The message is older than the target: search the right half
                        low = midOffset + CQ_STORE_UNIT_SIZE;
                        leftOffset = midOffset;
                        leftIndexValue = storeTime;
                    }
                }

                if (targetOffset != -1) {
                    offset = targetOffset;
                } else {
                    if (leftIndexValue == -1) {
                        offset = rightOffset;
                    } else if (rightIndexValue == -1) {
                        offset = leftOffset;
                    } else {
                        offset = Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp - rightIndexValue)
                            ? rightOffset : leftOffset;
                    }
                }
                return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
            } finally {
                sbr.release();
            }
        }
    }
    return 0;
}

As the code shows, RocketMQ performs a binary search over the ConsumeQueue entries, reading the store time of the corresponding message from the commitlog at each step and comparing it with the target timestamp.
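Stripped of the file handling, the search reduces to a closest-match binary search over ascending store times. The sketch below replaces the commitlog lookup with an in-memory array, which is an assumption made purely for illustration; the class and method names are hypothetical.

```java
final class TimestampSearch {
    // storeTimes[i] is the store time of entry i, in ascending order.
    // Returns the index of the exact match, or of the entry closest in time; -1 if empty.
    static int findClosest(long[] storeTimes, long timestamp) {
        int low = 0;
        int high = storeTimes.length - 1;
        int left = -1;  // last entry seen that is older than the target
        int right = -1; // last entry seen that is newer than the target
        while (low <= high) {
            int mid = (low + high) >>> 1;
            if (storeTimes[mid] == timestamp) {
                return mid;
            } else if (storeTimes[mid] > timestamp) {
                right = mid;
                high = mid - 1;
            } else {
                left = mid;
                low = mid + 1;
            }
        }
        if (left == -1) {
            return right;
        }
        if (right == -1) {
            return left;
        }
        // Same tie-breaking as the code above: prefer the left neighbour unless it is farther away.
        return Math.abs(timestamp - storeTimes[left]) > Math.abs(timestamp - storeTimes[right]) ? right : left;
    }

    public static void main(String[] args) {
        long[] times = {100, 200, 300, 400};
        System.out.println(findClosest(times, 300));
        System.out.println(findClosest(times, 240));
        System.out.println(findClosest(times, 260));
    }
}
```

Each probe in the real implementation costs a commitlog read, so the logarithmic number of steps matters more than it would for an in-memory array.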

IndexFile

IndexFile indexes messages that carry a key. When a producer sends a message with a key to the broker, the broker records an index entry for that message in an IndexFile. Only messages with keys are indexed, because the purpose of the index is to let RocketMQ query messages by key.

An IndexFile is named after the timestamp at which it was created, for example 20210901183407523.

The file consists of three parts: the header (IndexHeader), the slots, and the list of index units (Indexes).

IndexHeader

| Attribute | Description | Type |
| --- | --- | --- |
| beginTimestamp | Timestamp of the first message | Long |
| endTimestamp | Timestamp of the last message | Long |
| beginPhyOffset | Commitlog offset at which the indexed messages start | Long |
| endPhyOffset | Commitlog offset at which the indexed messages end | Long |
| hashSlotCount | Number of slots in use | Int |
| indexCount | Number of index units | Int |

The Header in the index file contains the above sections, and the size is equal to 8 + 8 + 8 + 8 + 4 + 4 = 40 bytes

    private static int beginTimestampIndex = 0;
    private static int endTimestampIndex = 8;
    private static int beginPhyoffsetIndex = 16;
    private static int endPhyoffsetIndex = 24;
    private static int hashSlotcountIndex = 32;
    private static int indexCountIndex = 36;
    ...
    private AtomicLong beginTimestamp = new AtomicLong(0);
    private AtomicLong endTimestamp = new AtomicLong(0);
    private AtomicLong beginPhyOffset = new AtomicLong(0);
    private AtomicLong endPhyOffset = new AtomicLong(0);
    private AtomicInteger hashSlotCount = new AtomicInteger(0);
    private AtomicInteger indexCount = new AtomicInteger(1);

In the code, each constant ending in Index is the byte position of the corresponding field within the file. The values themselves are held in AtomicLong and AtomicInteger, which are implemented with CAS and are therefore safe to update from multiple threads.
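To make the fixed positions concrete, here is a small sketch that writes and reads a 40-byte header with a plain ByteBuffer. The class IndexHeaderLayout and its method names are made up for illustration; only the byte positions come from the constants shown above.

```java
import java.nio.ByteBuffer;

final class IndexHeaderLayout {
    // Byte positions of each field, matching the constants shown above
    static final int BEGIN_TIMESTAMP = 0;
    static final int END_TIMESTAMP = 8;
    static final int BEGIN_PHY_OFFSET = 16;
    static final int END_PHY_OFFSET = 24;
    static final int HASH_SLOT_COUNT = 32;
    static final int INDEX_COUNT = 36;
    static final int HEADER_SIZE = 40;

    /** Writes the six header fields at their fixed positions. */
    static ByteBuffer write(long beginTs, long endTs, long beginOffset, long endOffset,
                            int hashSlotCount, int indexCount) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE);
        buf.putLong(BEGIN_TIMESTAMP, beginTs);
        buf.putLong(END_TIMESTAMP, endTs);
        buf.putLong(BEGIN_PHY_OFFSET, beginOffset);
        buf.putLong(END_PHY_OFFSET, endOffset);
        buf.putInt(HASH_SLOT_COUNT, hashSlotCount);
        buf.putInt(INDEX_COUNT, indexCount);
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer header = write(1_000L, 9_000L, 0L, 4_096L, 3, 5);
        // Any field can be read back directly from its position
        System.out.println(header.getLong(END_PHY_OFFSET));
        System.out.println(header.getInt(INDEX_COUNT));
    }
}
```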

Slot

A slot is a 4-byte Int. Its value is the position, within the Indexes list, of the most recent index unit whose key falls into that slot. Slots are implemented in IndexFile:

    public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
        final long endPhyOffset, final long endTimestamp) throws IOException {
        int fileTotalSize =
            IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
        ...
        this.hashSlotNum = hashSlotNum;
    }

    private int maxHashSlotNum = 5000000;
    private int maxIndexNum = 5000000 * 4;

fileTotalSize shows that the size of an IndexFile is fixed. maxHashSlotNum sets the maximum number of slots to 5 million, and maxIndexNum sets the maximum number of index units to 20 million.
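As a quick check of that arithmetic, the sketch below plugs in the default counts together with the 40-byte header, the 4-byte slot, and the 20-byte index unit described in this article. The class and method names are made up for illustration.

```java
final class IndexFileSize {
    /** header + slots + index units, computed in long to avoid int overflow for larger settings. */
    static long totalSize(int headerSize, int slotNum, int slotSize, int indexNum, int indexSize) {
        return headerSize + (long) slotNum * slotSize + (long) indexNum * indexSize;
    }

    public static void main(String[] args) {
        long size = totalSize(40, 5_000_000, 4, 5_000_000 * 4, 20);
        // 40 + 20,000,000 + 400,000,000 bytes
        System.out.println(size); // prints 420000040
    }
}
```

So with the default settings each IndexFile occupies roughly 420 MB on disk, regardless of how many entries it currently holds.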

How is the slot of each key calculated? It is the hash code of the key modulo the number of slots (5 million):

    int keyHash = indexKeyHashMethod(key);
    int slotPos = keyHash % this.hashSlotNum;

slotPos is the index of the slot that the current message key falls into.

The value stored in each slot is the position, within the Indexes list, of the latest index unit for that slot.

    // Absolute position of the slot: the 40-byte IndexHeader plus slot index * 4 bytes
    int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    // Read the value currently stored in that slot
    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
    if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
        slotValue = invalidIndex;
    }
    ...
    // Store the current index count in the slot, i.e. the position in the
    // Indexes list of the index unit that is being inserted
    this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
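The two calculations, key to slot index and slot index to byte position, can be tried in isolation. This is a sketch under stated assumptions: the class SlotPosition is made up, and the keyHash helper assumes the hash is simply String.hashCode() forced to be non-negative, which may differ in detail from what indexKeyHashMethod does.

```java
final class SlotPosition {
    static final int HEADER_SIZE = 40; // size of the IndexHeader
    static final int SLOT_SIZE = 4;    // each slot is one int

    /** Assumed hash: String.hashCode() made non-negative so the modulo result is a valid slot index. */
    static int keyHash(String key) {
        int h = Math.abs(key.hashCode());
        // Math.abs(Integer.MIN_VALUE) is still negative, so clamp that case to 0
        return h < 0 ? 0 : h;
    }

    /** Index of the slot the key falls into. */
    static int slotPos(String key, int slotNum) {
        return keyHash(key) % slotNum;
    }

    /** Byte position of that slot within the file. */
    static int absSlotPos(int slotPos) {
        return HEADER_SIZE + slotPos * SLOT_SIZE;
    }

    public static void main(String[] args) {
        int pos = slotPos("a", 5_000_000); // "a".hashCode() is 97
        System.out.println(pos);
        System.out.println(absSlotPos(pos)); // 40 + 97 * 4
    }
}
```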

The basic relationship is shown in the figure below. Next comes the format of the Indexes part of the file.

Index

The data structure of an index unit is as follows

Attribute       Description                                                                     Type
keyHash         Hash code of the message key                                                    Int
phyOffset       Offset of the message in the commitlog file                                     Long
timeDiff        Time difference between this message and the first message of the file, in seconds   Int
preSlotValue    Subscript of the previous index unit in the same slot                           Int

So each index unit occupies 4 + 8 + 4 + 4 = 20 bytes.
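The 20-byte layout and the second-level precision of timeDiff can be seen in a short sketch. The class IndexUnit and its methods are made up for illustration, and the conversion from milliseconds to seconds is a simplification with no bounds checks.

```java
import java.nio.ByteBuffer;

final class IndexUnit {
    static final int SIZE = 20; // 4 (keyHash) + 8 (phyOffset) + 4 (timeDiff) + 4 (previous index)

    /** Writes one index unit at byte position pos. Timestamps are in milliseconds. */
    static void write(ByteBuffer buf, int pos, int keyHash, long phyOffset,
                      long storeTimestamp, long beginTimestamp, int prevIndex) {
        // Only whole seconds since the first message of the file are kept
        int timeDiff = (int) ((storeTimestamp - beginTimestamp) / 1000);
        buf.putInt(pos, keyHash);
        buf.putLong(pos + 4, phyOffset);
        buf.putInt(pos + 4 + 8, timeDiff);
        buf.putInt(pos + 4 + 8 + 4, prevIndex);
    }

    /** Rebuilds the approximate store timestamp from timeDiff. */
    static long readStoreTimestamp(ByteBuffer buf, int pos, long beginTimestamp) {
        return beginTimestamp + buf.getInt(pos + 4 + 8) * 1000L;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(SIZE);
        long begin = 1_630_000_000_000L;
        write(buf, 0, 97, 4_096L, begin + 2_500, begin, 0);
        // The 500 ms remainder is lost because timeDiff is stored in seconds
        System.out.println(readStoreTimestamp(buf, 0, begin) - begin);
    }
}
```

Storing a 4-byte difference instead of a full 8-byte timestamp saves space per unit, at the cost of losing sub-second precision.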

The relationship between the three

The process of querying messages by Key

  1. Get the hash code of the key
  2. Find the slot using the hash code
  3. Read the position of the latest index unit from the slot
  4. Walk back through the index units from there, comparing hash codes; a time filter can also be applied at this step
  5. Take the commitlog offset recorded in each matching index unit and query the corresponding message

The specific implementation code is as follows

    ...
    for (int nextIndexToRead = slotValue; ; ) {
        if (phyOffsets.size() >= maxNum) {
            break;
        }
        // Absolute position of the index unit: header + all slots + index * 20 bytes
        int absIndexPos =
            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                + nextIndexToRead * indexSize;
        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
        if (timeDiff < 0) {
            break;
        }
        // timeDiff is stored in seconds; convert back to milliseconds
        timeDiff *= 1000L;
        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
        if (keyHash == keyHashRead && timeMatched) {
            phyOffsets.add(phyOffsetRead);
        }
        // Stop when the chain ends, points to itself, or goes past the time range
        if (prevIndexRead <= invalidIndex
            || prevIndexRead > this.indexHeader.getIndexCount()
            || prevIndexRead == nextIndexToRead || timeRead < begin) {
            break;
        }
        nextIndexToRead = prevIndexRead;
    }
    ...

Describing the relationship between the three in words sounds roundabout, but it is actually fairly simple:

  1. There are 5 million slots
  2. Take the hash of the key modulo 5 million to find the position of the corresponding slot
  3. The value in the slot is the position of the latest index unit that belongs to that slot
  4. Each index unit stores the position of the previous index unit in the same slot

The reason for this design is that mapping hash codes to slots produces collisions, and colliding entries are chained together in the same way as a linked list. It is similar to HashMap, except that HashMap in JDK 8 can also turn a chain into a red-black tree, and RocketMQ keeps the structure in a file.
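The whole scheme fits in a small in-memory sketch. MiniIndexFile is a made-up class, not RocketMQ's IndexFile: it uses a heap ByteBuffer instead of a mapped file, leaves the header and timeDiff unused, and assumes the same non-negative String.hashCode() hashing as a stand-in for the real hash method.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class MiniIndexFile {
    static final int HEADER_SIZE = 40, SLOT_SIZE = 4, INDEX_SIZE = 20, INVALID = 0;

    private final int slotNum;
    private final int indexNum;
    private final ByteBuffer buf;
    private int indexCount = 1; // position 0 is reserved to mean "no entry"

    MiniIndexFile(int slotNum, int indexNum) {
        this.slotNum = slotNum;
        this.indexNum = indexNum;
        this.buf = ByteBuffer.allocate(HEADER_SIZE + slotNum * SLOT_SIZE + indexNum * INDEX_SIZE);
    }

    private static int hash(String key) {
        int h = Math.abs(key.hashCode());
        return h < 0 ? 0 : h;
    }

    private int slotBytePos(int keyHash) {
        return HEADER_SIZE + (keyHash % slotNum) * SLOT_SIZE;
    }

    private int indexBytePos(int index) {
        return HEADER_SIZE + slotNum * SLOT_SIZE + index * INDEX_SIZE;
    }

    /** Appends an index unit and makes the slot point at it; the old slot value becomes the "previous" link. */
    void put(String key, long phyOffset) {
        if (indexCount >= indexNum) {
            throw new IllegalStateException("index file is full");
        }
        int keyHash = hash(key);
        int slot = slotBytePos(keyHash);
        int prev = buf.getInt(slot);
        int pos = indexBytePos(indexCount);
        buf.putInt(pos, keyHash);
        buf.putLong(pos + 4, phyOffset);
        buf.putInt(pos + 12, 0);    // timeDiff, unused in this sketch
        buf.putInt(pos + 16, prev); // link to the previous unit in the same slot
        buf.putInt(slot, indexCount);
        indexCount++;
    }

    /** Follows the chain from the slot, newest first, collecting offsets whose hash matches. */
    List<Long> query(String key) {
        int keyHash = hash(key);
        List<Long> offsets = new ArrayList<>();
        int next = buf.getInt(slotBytePos(keyHash));
        while (next > INVALID && next < indexCount) {
            int pos = indexBytePos(next);
            if (buf.getInt(pos) == keyHash) {
                offsets.add(buf.getLong(pos + 4));
            }
            next = buf.getInt(pos + 16);
        }
        return offsets;
    }

    public static void main(String[] args) {
        MiniIndexFile index = new MiniIndexFile(4, 16);
        index.put("a", 100L); // "a" and "e" both land in slot 1 when there are 4 slots
        index.put("e", 200L);
        index.put("a", 300L);
        System.out.println(index.query("a"));
        System.out.println(index.query("e"));
    }
}
```

Because only the hash is stored, two different keys with the same hash code would both match, so a caller that needs exact results still has to compare the real key after loading the message from the commitlog.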