preface

From the application examples in the previous two articles, we already have a rough idea of what RocketMQ's architecture looks like, as shown in the figure:

It consists mainly of the following parts:

  • producer
  • consumer
  • broker
  • nameserver

If you’ve ever deployed RocketMQ yourself, this deployment architecture diagram is pretty clear:

This article takes a look at how message storage is designed and implemented in RocketMQ.

Message storage

Prior knowledge

Before we get there, let’s understand some basic concepts:

partition

Messages from the same topic in the message queue may be stored on multiple partitions, as shown below:

offset

On the broker, each partition of a topic's messages is organized as a list of files, and the consumer needs to know where in those files the data it wants to pull sits. This position is called the offset. The consumer works with the offset of the message queue, and the server converts that absolute offset into a relative offset within a specific file to locate the message.
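As a concrete example (using the 1 GB CommitLog files described later): a message at absolute offset 1,073,741,830 lives in the second file, whose name encodes its starting offset of 1,073,741,824, so its relative offset inside that file is 1,073,741,830 - 1,073,741,824 = 6.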

Storage architecture

Message storage is the most complex and important part of RocketMQ.

We can simplify this picture even more:

RocketMQ designs different storage structures for Producer and Consumer, with Producer corresponding to CommitLog and Consumer corresponding to ConsumeQueue.

This is a classic example of "asynchronization", or "offline computing". The use of an asynchronous thread is justified precisely because message queues exist to "buffer messages": as long as a message has been written to the CommitLog, it is not lost, and as long as it is not lost there is plenty of slack for a background thread to slowly synchronize it into the ConsumeQueue, from which the Consumer then consumes it. This is also a classic case of "eventual consistency" inside a message queue: the Producer writes a message to the CommitLog and for a moment the Consumer cannot see it yet, but that does not matter; as long as the message is not lost, it is bound to end up in the ConsumeQueue and become visible to the Consumer.

CommitLog

The CommitLog is the storage body for message content and metadata: it stores the message bodies written by the Producer side, and its entries are not of fixed length.

Generation rules

Each file is 1 GB by default, and the file name is 20 digits long, left-padded with zeros, with the remaining digits being the file's starting offset. For example, 00000000000000000000 is the first file, with a starting offset of 0. When the first file is full, the second file is named 00000000001073741824, with a starting offset of 1073741824, and so on. Messages are written to the log file sequentially, and when one file is full, writing continues in the next file.
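As a quick illustration of this naming rule, here is a minimal sketch (my own code, not RocketMQ's; the broker does the equivalent internally when it rolls over to a new file):

public class CommitLogFileNameSketch {
    // 1 GB per CommitLog file by default (mappedFileSizeCommitLog)
    private static final long FILE_SIZE = 1024L * 1024 * 1024;

    // A file's name is its starting offset, zero-padded to 20 digits.
    static String fileNameFor(long startOffset) {
        return String.format("%020d", startOffset);
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor(0));             // 00000000000000000000 -> first file
        System.out.println(fileNameFor(FILE_SIZE));     // 00000000001073741824 -> second file
        System.out.println(fileNameFor(2 * FILE_SIZE)); // 00000000002147483648 -> third file
    }
}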

The store path

❯ cd ~/store
❯ ll
total 16
-rw-r--r--   1 root  staff     0B Dec  6 10:48 abort
-rw-r--r--   1 root  staff   4.0K Dec  6 15:46 checkpoint
drwxr-xr-x   3 root  staff    96B Sep  7 16:30 commitlog
drwxr-xr-x  12 root  staff   384B Dec  6 15:46 config
drwxr-xr-x   5 root  staff   160B Nov 30 14:06 consumequeue
drwxr-xr-x   3 root  staff    96B Dec  6 11:46 index
-rw-r--r--   1 root  staff     4B Dec  6 11:46 lock

The storage rules

RocketMQ improves performance by using a single commit log: messages from all topics on the same machine are stored in one file sequence, which avoids random disk writes.

RocketMQ's data files are the CommitLog, the Consume Queue, and the Index File. Since memory and disk are limited resources, the Broker cannot hold all data forever, so expired data is deleted periodically. RocketMQ removes stale data files based on a configurable data expiration time.

What kind of files can be deleted?

If a non-current file is not updated again within a certain period of time, the file is considered expired and can be deleted.

RocketMQ does not check whether all messages in the file have already been consumed. The default expiration time of each file is 72 hours.

// The number of hours to keep a log file before deleting it (in hours)
@ImportantField
private int fileReservedTime = 72;

The expiration time (in hours) can be changed by setting fileReservedTime in the Broker configuration file:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

The overall flow of a delete is that a scheduled task is started in DefaultMessageStore to perform the delete:

The task runs every 10 seconds by default; this interval can be changed via the configuration below.

// Resource reclaim interval
// private int cleanResourceInterval = 10000;

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.cleanFilesPeriodically();
    }
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

The logic goes like this:

 private void deleteExpiredFiles() {

    int deleteCount = 0;
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

    boolean timeup = this.isTimeToDelete();
    boolean spacefull = this.isSpaceToDelete();
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

    if (timeup || spacefull || manualDelete) {

        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;

        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

        log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
            fileReservedTime,
            timeup,
            spacefull,
            manualDeleteFileSeveralTimes,
            cleanAtOnce);

        fileReservedTime *= 60 * 60 * 1000;

        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
            destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}

As you can see, the delete operation is performed when one of the following three conditions is met:

  1. The current time is equal to the configured time for deleting files. The default time is 4am

    // When to delete, default is at 4 am
    @ImportantField
    private String deleteWhen = "04";
  2. The disk usage exceeds 85%

    private final double diskSpaceCleanForciblyRatio =
            Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
  3. Manual deletion. This path is reserved and can be triggered by calling the executeDeleteFilesManualy method; RocketMQ does not currently ship a command for manually triggering file deletion. (A sketch of the disk-usage check in condition 2 follows below.)
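For intuition, condition 2 boils down to comparing the disk usage ratio of the commitlog partition against the threshold. A minimal sketch of that check (my own illustration, assuming UtilAll.getDiskPartitionSpaceUsedPercent from rocketmq-common; the real DefaultMessageStore logic also handles warning levels and forcible-cleanup flags):

// Sketch: does disk usage alone make files eligible for deletion?
double usedRatio = org.apache.rocketmq.common.UtilAll
        .getDiskPartitionSpaceUsedPercent("/Users/root/store/commitlog"); // e.g. 0.87 for 87%
boolean spaceFull = usedRatio > 0.85; // default diskSpaceCleanForciblyRatio
if (spaceFull) {
    // expired CommitLog / ConsumeQueue files become candidates for immediate deletion
}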

The data structure

Let's look directly at the logical data structure of CommitLog storage (code from the CommitLog class):

protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
    /** * Serialize message */
    final byte[] propertiesData =
            msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;

    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

    final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

    // Exceeds the maximum message
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                + ", maxMessageSize: " + this.maxMessageSize);
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }

    // Initialization of storage space
    this.resetByteBuffer(encoderBuffer, msgLen);
    // 1 TOTALSIZE
    this.encoderBuffer.putInt(msgLen);
    // 2 MAGICCODE
    this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    this.encoderBuffer.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.encoderBuffer.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.encoderBuffer.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET, need update later
    this.encoderBuffer.putLong(0);
    // 7 PHYSICALOFFSET, need update later
    this.encoderBuffer.putLong(0);
    // 8 SYSFLAG
    this.encoderBuffer.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.encoderBuffer.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
    // 11 STORETIMESTAMP
    this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
    // 13 RECONSUMETIMES
    this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.encoderBuffer.putInt(bodyLength);
    if (bodyLength > 0)
        this.encoderBuffer.put(msgInner.getBody());
    // 16 TOPIC
    this.encoderBuffer.put((byte) topicLength);
    this.encoderBuffer.put(topicData);
    // 17 PROPERTIES
    this.encoderBuffer.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.encoderBuffer.put(propertiesData);

    encoderBuffer.flip();
    return null;
}
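Putting the write order above into numbers, the total length of one stored message can be sketched as follows (my own summary of the fields written by encode(); the real calMsgLength() additionally distinguishes IPv4 from IPv6 host addresses):

// Sketch: total on-disk length of one message, following the field order in encode() above.
// Assumes IPv4 born/store host addresses (4-byte IP + 4-byte port = 8 bytes each).
static int sketchMsgLength(int bodyLength, int topicLength, int propertiesLength) {
    return 4                      // 1  TOTALSIZE
         + 4                      // 2  MAGICCODE
         + 4                      // 3  BODYCRC
         + 4                      // 4  QUEUEID
         + 4                      // 5  FLAG
         + 8                      // 6  QUEUEOFFSET
         + 8                      // 7  PHYSICALOFFSET
         + 4                      // 8  SYSFLAG
         + 8                      // 9  BORNTIMESTAMP
         + 8                      // 10 BORNHOST (IP + port)
         + 8                      // 11 STORETIMESTAMP
         + 8                      // 12 STOREHOSTADDRESS (IP + port)
         + 4                      // 13 RECONSUMETIMES
         + 8                      // 14 Prepared Transaction Offset
         + 4 + bodyLength         // 15 BODY (4-byte length prefix + body)
         + 1 + topicLength        // 16 TOPIC (1-byte length prefix + topic)
         + 2 + propertiesLength;  // 17 PROPERTIES (2-byte length prefix + properties)
}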

Combined with the above table, a message is stored as follows:

Storing all messages together is all of CommitLog, as follows:

Note that the above is the logical structure; in the implementation, the CommitLog is built on top of:

  • MappedFile
  • MappedFileQueue
public class CommitLog {
    // Message's MAGIC CODE daa320a7
    public final static int MESSAGE_MAGIC_CODE = -626843481;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // End of file empty MAGIC CODE cbd43194
    protected final static int BLANK_MAGIC_CODE = -875286124;
    protected final MappedFileQueue mappedFileQueue;
    protected final DefaultMessageStore defaultMessageStore;
  
public class MappedFileQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private static final int DELETE_FILES_BATCH_MAX = 10;

    private final String storePath;

    protected final int mappedFileSize;

    protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();


The relationships among CommitLog, MappedFileQueue, and MappedFile are as follows:

Each MappedFile corresponds one-to-one with a physical file on disk.

This can be seen in the load method of MappedFileQueue:

public boolean load() {

    File dir = new File(this.storePath);
    File[] ls = dir.listFiles();
    if (ls != null) {
        return doLoad(Arrays.asList(ls));
    }
    return true;
}

public boolean doLoad(List<File> files) {
    // ascending order
    files.sort(Comparator.comparing(File::getName));

    for (File file : files) {
        if (file.length() != this.mappedFileSize) {
            log.warn(file + "\t" + file.length()
                    + " length not matched message store config value, ignore it");
            return true;
        }

        try {
            MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

            mappedFile.setWrotePosition(this.mappedFileSize);
            mappedFile.setFlushedPosition(this.mappedFileSize);
            mappedFile.setCommittedPosition(this.mappedFileSize);
            this.mappedFiles.add(mappedFile);
            log.info("load " + file.getPath() + " OK");
        } catch (IOException e) {
            log.error("load file " + file + " error", e);
            return false;
        }
    }
    return true;
}

ConsumeQueue

As mentioned above, RocketMQ stores the messages of all topics in the CommitLog, which gives good write performance because writes are sequential. But what happens when messages need to be queried or read? This structure is efficient for storage, yet it seems inconvenient for reading, so what does RocketMQ do?

If you have a commitLog file, you can read it directly:

 public static ByteBuffer read(String path) throws Exception {
        File file = new File(path);
        FileInputStream fin = new FileInputStream(file);
        byte[] bytes = new byte[(int) file.length()];
        fin.read(bytes);
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        return buffer;
    }

    public static void main(String[] args) throws Exception {
        String filePath = "/Users/xiaohezi/store/commitlog/00000000000000000000";
        ByteBuffer buffer = read(filePath);
        List<MessageExt> messageList = new ArrayList<>();
        while (true) {
            MessageExt decodeMsgs = MessageDecoder.decode(buffer);
            if (decodeMsgs == null) {
                break;
            }
            messageList.add(decodeMsgs);
        }
        for (MessageExt ms : messageList) {
            System.out.println("Subject:" + ms.getTopic() + "News:" +
                    new String(ms.getBody()) + "The queue ID." + ms.getQueueId() + "Storage address:"+ ms.getStoreHost()); }}Copy the code

So how does RocketMQ efficiently retrieve messages?

To clarify this, let's first look at a basic concept.

MessageQueue

To get a sense of it, here’s a picture:

MessageQueue literally means "message queue", and it is the same concept as a "shard" or a "partition": in RocketMQ, partition, shard, and queue all correspond to the MessageQueue.

For example, if a Topic holds 100 messages and has the default 4 queues, then each queue holds roughly 25 messages. These MessageQueues are bound to Brokers, which means each MessageQueue may sit on a different Broker machine, depending on your number of queues and the size of your Broker cluster.

Since there are multiple MessageQueues, a queue has to be selected in some way when a message is sent. By default, round-robin polling is used to pick a message queue.

How this plays out when sending a message is described in the official documentation, quoted below:

When sending a message, the Producer first looks up the TopicPublishInfo for the specified Topic. After obtaining this route information, the RocketMQ client by default calls selectOneMessageQueue() to pick a queue (MessageQueue) from the messageQueueList in TopicPublishInfo and sends the message to it. The concrete fault-tolerance strategy is defined in the MQFaultStrategy class, which has a sendLatencyFaultEnable switch. If it is enabled, brokers that are currently considered unavailable are filtered out on top of the random incremental modulo selection. "latencyFaultTolerance" means backing off from previously failed brokers for a fixed period of time: for example, if the latency of the last request exceeded 550L ms, back off for 3000L ms; if it exceeded 1000L, back off for 60000L. If the switch is off, a queue (MessageQueue) is chosen simply by random incremental modulo. latencyFaultTolerance is the key to achieving high availability of message sending.

public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.incrementAndGet();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

consumeQueue

Having looked at the MessageQueue, let's now turn to its counterpart on the storage side: the ConsumeQueue (consume queue).

On disk, each ConsumeQueue directory is named after the queue ID of its MessageQueue.

Each ConsumeQueue file name (fileName) is 20 digits long, left-padded with zeros, with the rest being the starting offset. For example, 00000000000000000000 is the first file, with a starting offset of 0 and a file size of 6,000,000 bytes (300,000 entries of 20 bytes each). When the first file is full, the second file is named 00000000000006000000 with a starting offset of 6000000, and so on; the third file is 00000000000012000000 with a starting offset of 12000000. When a file is full, entries are written to the next file.

The ConsumeQueue itself does not store messages; the messages live in the CommitLog. Each ConsumeQueue entry stores only the CommitLog offset of a message routed to this queue, the size of the message, and the hash code of the message's tag, and it takes just 20 bytes:

We can print the contents of the ConsumerQueue file in this format:

public static void main(String[] args) throws Exception {

    String path = "/Users/root/store/consumequeue/TopicTest/0/00000000000000000000";
    ByteBuffer buffer = read(path);
    while (true) {
        long offset = buffer.getLong();
        long size = buffer.getInt();
        long code = buffer.getLong();
        if (size == 0) {
            break;
        }
        System.out.println("Message length: " + size + " Message offset: " + offset + " tag hashcode: " + code);
    }
    System.out.println("--------------------------");

}
Message length: 201 Message offset: 201 tag hashcode: 2598919
Message length: 201 Message offset: 1005 tag hashcode: 2598919
Message length: 201 Message offset: 1809 tag hashcode: 2598919
...

In the output above, the difference between consecutive message offsets equals the message length multiplied by the number of queues. In this case:

804 (1005 - 201) = 201 * 4

Why is that? Because the initial offset recorded in each queue is different. Taking my local setup of 4 queues (0-3), each with only one file (00000000000000000000), as an example, the initial offset in each queue is:

  • Queue 0 has offset 201
  • The offset of queue 1 is 402
  • Queue 2 has offset 603
  • Queue 3 has offset 0

When a message is read, the ConsumeQueue is consulted first and then the CommitLog. But how do we know which CommitLog file a message is stored in? Take a look at the following two pieces of code, from the CommitLog and MappedFileQueue classes:

 public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
        if (mappedFile != null) {
            int pos = (int) (offset % mappedFileSize);
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
            return result;
        }

        return null;
    }
/**
    * Finds a mapped file by offset.
    *
    * @param offset                Offset.
    * @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
    * @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
    */
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        MappedFile firstMappedFile = this.getFirstMappedFile();
        MappedFile lastMappedFile = this.getLastMappedFile();
        if (firstMappedFile != null && lastMappedFile != null) {
            if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                        offset,
                        firstMappedFile.getFileFromOffset(),
                        lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                        this.mappedFileSize,
                        this.mappedFiles.size());
            } else {
                int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                MappedFile targetFile = null;
                try {
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                }

                if(targetFile ! =null && offset >= targetFile.getFileFromOffset()
                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }

                for (MappedFile tmpMappedFile : this.mappedFiles) {
                    if (offset >= tmpMappedFile.getFileFromOffset()
                            && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        return tmpMappedFile;
                    }
                }
            }

            if (returnFirstOnNotFound) {
                return firstMappedFile;
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }

    return null;
}


Assume that 1073742827 is a physical offset (that is, a global offset). Its relative offset is 1003 (1003 = 1073742827 - 1073741824), and this offset falls in the second CommitLog file.

According to the code above, the offset recorded in the ConsumeQueue is used to pin down the right CommitLog file in the list of mapped files, and the position within that file is then computed from the offset to locate the corresponding message.

 public SelectMappedBufferResult selectMappedBuffer(int pos) {
    int readPosition = getReadPosition();
    if (pos < readPosition && pos >= 0) {
        if (this.hold()) {
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(pos);
            int size = readPosition - pos;
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        }
    }

    return null;
}


In addition to locating a message by offset, messages can also be looked up by message ID.

The principle is the same, except that the offset is first parsed out of the message ID:

public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
    SocketAddress address;
    long offset;
    int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;

    byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
    byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
    ByteBuffer bb = ByteBuffer.wrap(port);
    int portInt = bb.getInt(0);
    address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);

    // offset
    byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
    bb = ByteBuffer.wrap(data);
    offset = bb.getLong(0);

    return new MessageId(address, offset);
}
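Reading decodeMessageId() backwards also tells us how this message ID is built in the first place: it is simply the hex encoding of the store host address followed by the hex encoding of the CommitLog physical offset. A minimal sketch of the forward direction (my own illustration, not the broker's actual code):

import java.nio.ByteBuffer;

// Sketch: build an IPv4 "offset message ID" = hex(store IP) + hex(port) + hex(commitlog offset).
static String sketchMessageId(byte[] ip4, int port, long commitLogOffset) {
    ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 8);
    buf.put(ip4);                 // 4 bytes: store host IP
    buf.putInt(port);             // 4 bytes: store host port
    buf.putLong(commitLogOffset); // 8 bytes: physical offset in the CommitLog
    StringBuilder sb = new StringBuilder();
    for (byte b : buf.array()) {
        sb.append(String.format("%02X", b));
    }
    return sb.toString(); // 32 hex characters, matching the IPv4 branch of decodeMessageId()
}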

When was the consumeQueue file created and updated?

The ConsumeQueue is the message consume queue file. After messages reach the CommitLog, they are asynchronously forwarded to the message consume queue, where they become available to consumers.

RocketMQ uses a broker-side background service thread, ReputMessageService, to continuously dispatch requests and asynchronously build the ConsumeQueue (and IndexFile) data.
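Conceptually, the dispatch loop looks something like the sketch below (heavily simplified pseudo-Java of my own, not the actual ReputMessageService code; decode(), buildConsumeQueue() and buildIndex() are hypothetical helpers standing in for the real dispatchers): the thread keeps a reputFromOffset cursor into the CommitLog, reads each newly appended message, and hands it to dispatchers that append the ConsumeQueue entry and, if needed, the index entry.

// Simplified sketch of the asynchronous ConsumeQueue/IndexFile build loop.
while (running) {
    SelectMappedBufferResult result = commitLog.getData(reputFromOffset); // read from the cursor
    if (result == null) {               // nothing new in the CommitLog yet
        Thread.sleep(1);
        continue;
    }
    for (DispatchRequest request : decode(result)) { // one request per stored message
        buildConsumeQueue(request);  // append the 20-byte entry: commitlog offset + size + tag hashcode
        buildIndex(request);         // append an IndexFile entry if the message carries keys
        reputFromOffset += request.getMsgSize();     // advance the cursor
    }
    result.release();
}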

indexFile

One of the query types in the figure above is through messageKey.

This is a query model based on Topic + Message Key. Speaking of queries, Alibaba Cloud documents a recommended message query flow:

What is MessageKey, and what does it do?

As the name implies, it is an identifier for a message that can be set by the client when the message is sent. It is mainly used to tell messages apart at the business level; for example, we typically set it to an order ID or user ID so that the data can be distinguished in the business and conveniently queried later.

@GetMapping("/produce")
public void produceMsg(a) {

    Map<String, Object> headers = Maps.newHashMapWithExpectedSize(16);
    headers.put(MessageConst.PROPERTY_TAGS, "test02");
    headers.put(MessageConst.PROPERTY_KEYS,"messageKey");

    Message message = MessageBuilder.createMessage("Hello RocketMQ!".new MessageHeaders(headers));
    output.send(message);
    System.out.println("Message sent" + message);

}
Copy the code

If we want to query messages based on messageKey, what does RocketMQ do?

RocketMQ introduces a hash-based index mechanism to index messages by key, such as the messageKey. The IndexFile is a message index file that stores the mapping between keys and offsets.

The IndexFile provides a way to query messages by key or by time range. Its file name is the timestamp at which it was created. An IndexFile has a fixed size of roughly 400 MB and can hold 2000W (20 million) index entries.

RocketMQ’s index file logical structure is similar to the implementation of HashMap in the JDK. The specific structure of the index file is as follows:

The file consists of the following parts (the total size is worked out right after this list):

  • indexHeader
  • 500W (5 million) hash slots
  • 2000W (20 million) index entries
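Putting the sizes together gives a quick sanity check of the roughly 400 MB figure above:

     40 bytes                                     (indexHeader)
+  5,000,000 x  4 bytes =  20,000,000 bytes       (hash slots)
+ 20,000,000 x 20 bytes = 400,000,000 bytes       (index entries)
= 420,000,040 bytes, roughly 400 MB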

indexHeader

The header of an IndexFile is 40 bytes long and contains the following fields:

  • BeginTimestamp: This IndexFile contains the minimum storage time for messages.
  • EndTimestamp: The maximum storage time of messages in this IndexFile.
  • BeginPhyoffset: The minimum CommitLog file offset that contains messages in this IndexFile.
  • EndPhyoffset: The maximum CommitLog file offset that contains messages in this IndexFile.
  • HashSlotcount: The total number of hash slots contained in the IndexFile.
  • IndexCount: The number of Index entries used in the IndexFile.
public class IndexHeader {
    public static final int INDEX_HEADER_SIZE = 40;
    private static int beginTimestampIndex = 0;
    private static int endTimestampIndex = 8;
    private static int beginPhyoffsetIndex = 16;
    private static int endPhyoffsetIndex = 24;
    private static int hashSlotcountIndex = 32;
    private static int indexCountIndex = 36;
    private final ByteBuffer byteBuffer;
    private AtomicLong beginTimestamp = new AtomicLong(0);
    private AtomicLong endTimestamp = new AtomicLong(0);
    private AtomicLong beginPhyOffset = new AtomicLong(0);
    private AtomicLong endPhyOffset = new AtomicLong(0);
    private AtomicInteger hashSlotCount = new AtomicInteger(0);

    private AtomicInteger indexCount = new AtomicInteger(1);

slot table

The slot table (4 bytes × 500W slots) does not hold the actual index data, but the head of the singly linked list corresponding to each slot.

The index data

The index data area (20 bytes × 2000W entries) holds the real index data; that is, an IndexFile can hold 2000W (20 million) indexes.
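From the putKey() code shown further below, each 20-byte index entry can be read as the following layout (my own field names, summarizing what that code writes):

4 bytes  keyHash   - hash code of the key
8 bytes  phyOffset - physical offset of the message in the CommitLog
4 bytes  timeDiff  - store timestamp minus the IndexFile's beginTimestamp, in seconds
4 bytes  prevIndex - position of the previous entry that hashed to the same slot (the linked-list pointer)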

How to index a message?

  • The hash code is computed from the key and taken modulo 500W, which tells us which hash slot the key falls into. The indexHeader takes up the first 40 bytes of the file, and each hash slot takes 4 bytes, so the slot's location in the file is given by the formula 40 + keyIndex * 4.

  • While the hash slot is determined by the key, the position of the index entry itself is determined by the order in which entries are written, which is what makes this a sequential (append-only) write. Index entries are placed after the indexHeader and the 500W hash slots, and the position of the current entry is calculated as: 40-byte indexHeader + 500W * 4-byte hash slots + current index count * 20 bytes.

How to query index files?

RocketMQ uses the QueryMessageProcessor business processor on the Broker side to query messages by Message Key. Reading a message means finding the matching record in the IndexFile by topic and key, and then reading the physical message content from the CommitLog using the recorded offset.

If a message carries a Message Key or a Unique Key, an index entry is built for each of them. The process by which the index file resolves a key to messages is as follows (a sketch of this read path follows the list):

  1. Compute the hash code of the key and take it modulo slotNum to locate the hash slot (slotNum is the maximum number of slots in an index file, for example slotNum = 500W)
  2. Use the slotValue to find the last item in that slot's index linked list (the slotValue always points to the most recently written item in the list)
  3. Traverse the index list and return the entries that fall within the query time range (by default at most 32 records are returned at a time)
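Before looking at how entries are written (putKey() below), here is a minimal sketch of that read path (my own illustration, not the real RocketMQ code; it follows the 20-byte entry layout described earlier):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch: collect CommitLog offsets for a key within [begin, end] from one IndexFile buffer.
static List<Long> sketchSelectPhyOffsets(ByteBuffer index, String key, long begin, long end,
                                         long fileBeginTimestamp, int hashSlotNum, int maxNum) {
    List<Long> phyOffsets = new ArrayList<>();
    int keyHash = key.hashCode() & 0x7fffffff;             // keep the hash non-negative
    int slotPos = keyHash % hashSlotNum;                   // 1. locate the hash slot
    int absSlotPos = 40 + slotPos * 4;                     // 40-byte header + 4 bytes per slot
    int nextIndex = index.getInt(absSlotPos);              // 2. head of the conflict chain
    while (nextIndex > 0 && phyOffsets.size() < maxNum) {  // 3. walk the linked list backwards
        int absIndexPos = 40 + hashSlotNum * 4 + nextIndex * 20;
        int storedHash = index.getInt(absIndexPos);
        long phyOffset = index.getLong(absIndexPos + 4);
        long storeTime = fileBeginTimestamp + index.getInt(absIndexPos + 4 + 8) * 1000L;
        int prevIndex = index.getInt(absIndexPos + 4 + 8 + 4);
        if (storedHash == keyHash && storeTime >= begin && storeTime <= end) {
            phyOffsets.add(phyOffset);                     // a candidate; the caller still checks the real key
        }
        nextIndex = prevIndex;                             // follow the chain to the previous entry
    }
    return phyOffsets;
}

The putKey() method below shows the corresponding write path: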
 public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        int keyHash = indexKeyHashMethod(key);
        int slotPos = keyHash % this.hashSlotNum;
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;

        try {

            // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
            // false);
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }

            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

            timeDiff = timeDiff / 1000;

            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }

            int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + this.indexHeader.getIndexCount() * indexSize;

            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            if (invalidIndex == slotValue) {
                this.indexHeader.incHashSlotCount();
            }
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e); }}}}else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
    }

    return false;
}

In summary, putKey() computes the hash code from the key and takes it modulo 500W to find the hash slot, reads the slot's current value (the position of the last entry that hashed to this slot, i.e. the head of the conflict chain), stores that value in the new index entry as its previous-entry pointer, appends the new entry, and finally points the slot at it.

conclusion

RocketMQ file storage model hierarchy

RocketMQ stores Commitlog files, ConsumeQueue files, and Index files.

Message storage is carried out by the ConsumeQueue and the CommitLog together; the CommitLog is the file that stores the actual message content. Each of them has its own generation rules, storage path, and data structure, and each is backed by mapped Java structures such as MappedFile, MappedByteBuffer, and MappedFileQueue.

RocketMQ uses a hybrid storage structure: all queues in a single Broker instance share one commit log (the CommitLog) for storage. In this hybrid structure (message entities of all topics are stored in one CommitLog), data and index are kept as separate parts serving the Producer and the Consumer respectively. The Producer sends a message to the Broker, and the Broker flushes and persists it to the CommitLog, either synchronously or asynchronously. As long as the message has been flushed and persisted to a CommitLog file, it will not be lost, and therefore the Consumer is guaranteed to get a chance to consume it. If a pull request finds no message, the consumer can simply wait for the next pull; the server also supports long-polling mode, in which the Broker allows a pull request that finds no message to wait for up to 30 seconds. On top of this, RocketMQ uses a broker-side background service thread, ReputMessageService, to continuously dispatch requests and asynchronously build the ConsumeQueue and IndexFile data.

The ConsumeQueue acts as the index for message consumption, storing each message's starting physical offset in the CommitLog, its size, and the hash code of its Tag. The IndexFile, in turn, simply provides an extra way to query messages by key or by time range.

Finally, take a look at the overall flow of message production, storage, and consumption:

reference

  • github.com/apache/rock…
  • github.com/apache/rock…
  • blog.csdn.net/prestigedin…
  • www.cnblogs.com/zuoyang/p/1…
  • fdx321.github.io/2017/08/22/…
  • juejin.cn/post/684490…
  • help.aliyun.com/document_de…
  • cloud.tencent.com/developer/a…
  • blog.pkspace.cn/article/14
  • blog.csdn.net/wengfuying5…