1. Overview

1.1 What is an index file

IndexFiles (index files) are files that RocketMQ keeps on disk as part of its message store. Their structure is similar to the JDK's HashMap.

IndexFile storage can be switched on or off with the messageIndexEnable setting.

1.2 What index files are for

Index files serve a narrow purpose: they make it possible to query messages by Message Key. Given a Message Key, the index file yields the physical offset of the message in the CommitLog, and the message itself is then read from the CommitLog at that offset.

2. High-level design

2.1 Index file structure

Like a HashMap, an index file stores key-value pairs:

  • Key: the Message Key. More precisely, the key stored in the index file is an integer hash of the Message Key.
  • Value: the physical offset, that is, the absolute offset of the message in the CommitLog.

When hashes collide, the entries are chained in a linked list, with newer messages at the head of the list.

There can be multiple index files, each of a fixed size, so every IndexFile can hold the same maximum number of index entries.

2.2 How indexes are built

After a message is written to the CommitLog, it is dispatched again (the "reput" step). Dispatching is what builds the message's indexes, namely its ConsumeQueue entry and its IndexFile entries.

The reput thread scans the CommitLog for newly appended messages and, for each one, runs the dispatch logic that builds the message's indexes.
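The reput step can be pictured as a loop that remembers how far it has dispatched and hands every newly appended message to each registered index builder. The sketch below is a simplified model, not RocketMQ's implementation: the CommitLog is an in-memory list, and StoredMessage, Dispatcher, and ReputLoop are hypothetical names introduced for illustration.

```java
import java.util.List;

// Hypothetical stand-in for a message that has been appended to the CommitLog.
final class StoredMessage {
    final long commitLogOffset;
    final String topic;
    final String keys;

    StoredMessage(long commitLogOffset, String topic, String keys) {
        this.commitLogOffset = commitLogOffset;
        this.topic = topic;
        this.keys = keys;
    }
}

// Hypothetical index builder; one implementation per index type (ConsumeQueue, IndexFile, ...).
interface Dispatcher {
    void dispatch(StoredMessage msg);
}

final class ReputLoop {
    private final List<StoredMessage> commitLog; // in-memory model of the CommitLog
    private final List<Dispatcher> dispatchers;
    private int reputFrom = 0;                   // position of the next message to dispatch

    ReputLoop(List<StoredMessage> commitLog, List<Dispatcher> dispatchers) {
        this.commitLog = commitLog;
        this.dispatchers = dispatchers;
    }

    // One scan: hand every message appended since the last scan to each dispatcher,
    // advancing the position so that no message is dispatched twice.
    int doReput() {
        int dispatched = 0;
        while (reputFrom < commitLog.size()) {
            StoredMessage msg = commitLog.get(reputFrom++);
            for (Dispatcher d : dispatchers) {
                d.dispatch(msg);
            }
            dispatched++;
        }
        return dispatched;
    }
}
```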

2.3 How messages are queried

The index file stores, for each Message Key, the CommitLog offset of the message. A query first looks up these offsets in the index file and then reads the messages from the CommitLog at those offsets.

2.4 Disk flushing mechanism

Index files are not flushed periodically. Instead, whenever an index file fills up, a new file is created and the previous, full file is flushed to disk.

3. Detailed design

3.1 Index File Structure

The core of the IndexFile design is its logical structure and its file storage structure. We start with the logical structure.

3.1.1 Logical Structure

As mentioned above, an IndexFile is structured like the JDK's HashMap.

  • Key: IndexService#buildKey(String topic, String key) builds the string topic + "#" + messageKey, which IndexFile#indexKeyHashMethod then hashes to an integer

    Note: hash collisions are possible. Two messages with different topics and keys can produce the same hash value, which leads to incorrect query results. The community reported this as ISSUE#3613, which had not been resolved at the time of writing.

  • Value: entries whose hashes collide form a linked list. Each entry contains:

    • The physical offset of the message in the CommitLog, used to read the message from the CommitLog
    • The integer hash value produced by IndexFile#indexKeyHashMethod(String key)
    • The difference between the message's store time and the earliest store time in this index file, used to search for messages within a time range
    • A pointer to the next entry in the list, which is the previous one in time (the later a message arrives, the earlier it appears in the list)
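A minimal sketch of how a topic and key become a hash slot. It assumes indexKeyHashMethod is String.hashCode() made non-negative, which matches RocketMQ 4.x as far as we know; IndexKeys and its methods are hypothetical names. The main method also shows why the collision note matters: "Aa" and "BB" have the same String.hashCode(), so two different keys on the same topic produce the same hash.

```java
final class IndexKeys {
    // Same shape as IndexService#buildKey described above: topic + "#" + key.
    static String buildKey(String topic, String key) {
        return topic + "#" + key;
    }

    // Assumed hash: String.hashCode(), made non-negative.
    static int indexKeyHash(String key) {
        int h = Math.abs(key.hashCode());
        return h < 0 ? 0 : h; // Math.abs(Integer.MIN_VALUE) is still negative
    }

    // Slot number inside a file with hashSlotNum slots.
    static int slotPos(int keyHash, int hashSlotNum) {
        return keyHash % hashSlotNum;
    }

    public static void main(String[] args) {
        int a = indexKeyHash(buildKey("TopicTest", "Aa"));
        int b = indexKeyHash(buildKey("TopicTest", "BB"));
        // Different keys, identical hash: a lookup cannot tell these entries apart.
        System.out.println(a == b);                                          // true
        System.out.println(slotPos(a, 5_000_000) == slotPos(b, 5_000_000));  // true
    }
}
```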

3.1.2 Storage Structure

The index files are stored using RocketMQ's MappedFile. There can be any number of index files; new ones are added as needed.

Each index file is named after the time it was created, for example 20211209174133951.
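The 17-digit name is a timestamp in yyyyMMddHHmmssSSS form. A small sketch of producing such a name, assuming that format; IndexFileNames and nameFor are hypothetical names, and the time zone is a parameter here (the broker presumably uses its local time). Because the names are fixed-width and zero-padded, sorting them as strings also sorts them by creation time, which is what lets IndexService#load() order the files with a plain Arrays.sort.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class IndexFileNames {
    // Fixed-width pattern matching the example name 20211209174133951.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    // Formats a creation time (epoch milliseconds) as an index file name.
    static String nameFor(long epochMillis, ZoneId zone) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }
}
```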

Each index file has a fixed length and can hold up to 5 million hash slots and 20 million index entries. Once its index entries are used up, a new index file is created to hold further entries. As a result, messages with the same hash value may be spread across different index files.

RocketMQ's storage files follow a common convention for storage formats: Header + Body. The Header is usually fixed-length and holds basic metadata, while the Body holds the data.

The specific storage structure and content are shown in the figure below:

  • Header: fixed size (40 bytes), containing basic metadata
    • BeginTimestamp: the earliest store time of the messages in this file (the store time is when the message was written to the CommitLog)
    • EndTimestamp: the latest store time of the messages in this file
    • BeginPhyoffset: the minimum physical offset (offset in the CommitLog) of the messages in this file
    • EndPhyoffset: the maximum physical offset of the messages in this file
    • HashSlotCount: the number of hash slots in use (incremented when an empty slot receives its first entry)
    • IndexCount: the number of index entries used so far. Note that this value starts at 1, so entry 0 is never used and a slot value of 0 can mean "empty"
  • Hash Slot section: a fixed number of Message Key hash slots (5 million by default, configurable through the broker setting maxHashSlotNum), 4 bytes each
    • Each slot stores the logical index (subscript) of the newest index entry in its list. Because the Header and the Hash Slot section are fixed-length, and every index entry has the same length, the absolute position of an entry in the file can be calculated from its logical index
  • Index Item section: a fixed number of index entries (20 million by default, configurable through the broker setting maxIndexNum). Each 20-byte index entry contains:
    • Key Hash: the integer hash of the message's topic and Message Key (4 bytes)
    • CommitLog Offset: the physical offset of the message in the CommitLog, used to read the message from the CommitLog (8 bytes)
    • Time Diff: the difference, in seconds, between the message's store time and this file's BeginTimestamp, used to query messages by time range (4 bytes)
    • Next Index Offset: the logical index of the next entry in the list, with the same meaning as the logical index stored in a hash slot (4 bytes)
      • Each new entry is inserted at the head of the list, so the further down the list, the older the message. This ordering suits message queues, which generally care more about new messages.
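The fixed layout turns every position into simple arithmetic. The sketch below assumes the field sizes listed above: a 40-byte header (four longs and two ints), 4-byte hash slots, and 20-byte index entries. IndexFileLayout and its methods are hypothetical helpers; RocketMQ performs the same arithmetic inline in IndexFile.

```java
final class IndexFileLayout {
    // Header: BeginTimestamp, EndTimestamp, BeginPhyoffset, EndPhyoffset (longs) + HashSlotCount, IndexCount (ints)
    static final int INDEX_HEADER_SIZE = 8 * 4 + 4 * 2; // 40 bytes
    static final int HASH_SLOT_SIZE = 4;                // one int per slot
    static final int INDEX_SIZE = 4 + 8 + 4 + 4;        // keyHash + phyOffset + timeDiff + next = 20 bytes

    // Total file size for a given number of hash slots and index entries.
    static long fileSize(int hashSlotNum, int indexNum) {
        return INDEX_HEADER_SIZE + (long) hashSlotNum * HASH_SLOT_SIZE + (long) indexNum * INDEX_SIZE;
    }

    // Absolute byte position of hash slot number slotPos.
    static long absSlotPos(int slotPos) {
        return INDEX_HEADER_SIZE + (long) slotPos * HASH_SLOT_SIZE;
    }

    // Absolute byte position of the index entry with the given logical index.
    static long absIndexPos(int hashSlotNum, int logicalIndex) {
        return INDEX_HEADER_SIZE + (long) hashSlotNum * HASH_SLOT_SIZE + (long) logicalIndex * INDEX_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(fileSize(5_000_000, 20_000_000)); // 420000040
        System.out.println(absSlotPos(0));                   // 40
        System.out.println(absIndexPos(5_000_000, 1));       // 20000060
    }
}
```

With the default 5 million slots and 20 million entries, each index file comes to 420,000,040 bytes, roughly 420 MB. The sketch uses long arithmetic so that larger configurations cannot overflow int.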

3.2 Classes involved in index files

IndexService

The index service manages and controls all index files: it loads, creates, flushes, and deletes them, and it is the entry point for all index file operations.

  • private final ArrayList<IndexFile> indexFileList: the list of index files.
  • buildIndex(DispatchRequest req): builds indexes from a message dispatch request. Both the msgId index and the message Key indexes are created here
    1. Create or get the latest index file
    2. Call IndexFile#putKey on that file to create the index entries
  • queryOffset(String topic, String key, int maxNum, long begin, long end): finds message offsets in the index files by topic and message key, restricted to messages stored between begin and end
    1. Traverse indexFileList from newest to oldest, selecting the index files whose [beginTimestamp, endTimestamp] range intersects [begin, end]
    2. Call IndexFile#selectPhyOffset() on each matching file to collect the offsets
  • retryGetAndCreateIndexFile(): gets the latest index file, or creates one if none exists
    • Calls getAndCreateLastIndexFile() to create or get the latest index file
    • If creating a new index file fails, it tries again, making up to three attempts in total
  • getAndCreateLastIndexFile(): gets the last index file; if there are no index files or the last one is full, creates a new file
    1. Check whether the last file in the index file list exists and whether it is full
    2. If it does not exist or is full, create a new file and flush the previous index file asynchronously
    3. If the last file exists and is not full, return it directly
  • flush(): forces an index file to disk, then updates indexMsgTimestamp in the checkpoint file and flushes the checkpoint file
    • If MessageStoreConfig#messageIndexSafe is enabled, then the next time the broker recovers from an abnormal shutdown, recovery takes into account the indexMsgTimestamp saved in the checkpoint, i.e., the time recorded at the last forced index flush
    • flush() is called after an index file is used up and a new one has been created, forcing the full file to disk
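The retry behavior of retryGetAndCreateIndexFile() amounts to "call until the result is non-null, at most three times". A generic sketch follows; Retry and retryUntilNonNull are hypothetical names, and the pause between attempts is a parameter of this sketch rather than a value taken from RocketMQ.

```java
import java.util.function.Supplier;

final class Retry {
    // Calls attempt up to maxTries times, pausing between failed attempts.
    // Returns the first non-null result, or null if every attempt returned null.
    static <T> T retryUntilNonNull(Supplier<T> attempt, int maxTries, long sleepMillis) {
        for (int i = 0; i < maxTries; i++) {
            T result = attempt.get();
            if (result != null) {
                return result;
            }
            if (i + 1 < maxTries) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return null;
                }
            }
        }
        return null;
    }
}
```

A null result after the last attempt tells the caller that no index file is available, which is the case buildIndex() logs as "build index error".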

IndexFile

Represents a single index file: its storage structure and the operations on it.

The underlying storage is the memory-mapped file MappedFile.

  • MappedFile mappedFile: Underlying storage implementation
  • putKey(final String key, final long phyOffset, final long storeTimestamp): Adds an index to the index file
  • selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock): searches this index file for the CommitLog offsets recorded for a key

4. Source code analysis

4.1 IndexService

4.1.1 Loading and creation

  • load(): reloads the index files from disk
```java
/**
 * Reload the index files from disk.
 *
 * @param lastExitOK whether the broker exited normally last time
 * @return whether the load succeeded
 */
public boolean load(final boolean lastExitOK) {
    File dir = new File(this.storePath);
    File[] files = dir.listFiles();
    if (files != null) {
        // File names are creation timestamps, so this sorts the files by creation time (ascending)
        Arrays.sort(files);
        for (File file : files) {
            // Load each index file in turn
            try {
                IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
                f.load();
                // After an abnormal exit, delete every index file that extends past the checkpoint
                if (!lastExitOK) {
                    if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp()) {
                        f.destroy(0);
                        continue;
                    }
                }
                this.indexFileList.add(f);
            } catch (IOException e) {
                // ...
            }
        }
    }
    return true;
}
```
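The recovery rule in load() can be isolated as a pure function. In this sketch, FileInfo is a hypothetical summary of each file (its name and header EndTimestamp), and filesToKeep is a hypothetical helper that returns the names load() would keep, oldest first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class IndexRecovery {
    // Hypothetical summary of one index file on disk: its name and its header's EndTimestamp.
    static final class FileInfo {
        final String name;
        final long endTimestamp;

        FileInfo(String name, long endTimestamp) {
            this.name = name;
            this.endTimestamp = endTimestamp;
        }
    }

    // After an abnormal exit, files whose EndTimestamp is later than the
    // checkpoint's indexMsgTimestamp are dropped; everything else is kept.
    static List<String> filesToKeep(FileInfo[] files, boolean lastExitOK, long checkpointIndexMsgTimestamp) {
        FileInfo[] sorted = files.clone();
        // Names are fixed-width timestamps, so string order is creation order.
        Arrays.sort(sorted, (a, b) -> a.name.compareTo(b.name));
        List<String> kept = new ArrayList<>();
        for (FileInfo f : sorted) {
            if (!lastExitOK && f.endTimestamp > checkpointIndexMsgTimestamp) {
                continue; // load() destroys such files
            }
            kept.add(f.name);
        }
        return kept;
    }
}
```

Per flush() in 4.1.4, the checkpoint's indexMsgTimestamp is updated only after a full index file has been flushed. After a crash, a file whose EndTimestamp is newer than the checkpoint may therefore hold entries that never reached disk; dropping it presumably lets recovery rebuild those entries by dispatching the messages again.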

  • getAndCreateLastIndexFile(): gets the last index file; if the list is empty or the last file is full, creates a new file
    1. Check whether the last file exists and whether it is full
    2. If there is no file or the last file is full, create a new file
    3. Otherwise, return the last file directly
    4. If a new file was created, start a thread that asynchronously flushes the previous, full file:
      • The flush thread forces the file to disk
      • It then sets StoreCheckpoint#indexMsgTimestamp to the EndTimestamp in that file's IndexHeader
```java
/**
 * Get the last index file; if the list is empty or the last file is full, create a new file.
 *
 * Only one thread calls this method, so there is no write contention.
 */
public IndexFile getAndCreateLastIndexFile() {
    IndexFile indexFile = null;
    IndexFile prevIndexFile = null;
    long lastUpdateEndPhyOffset = 0;
    long lastUpdateIndexTimestamp = 0;

    // Try the read lock first
    {
        this.readWriteLock.readLock().lock();
        // Check whether the file list is empty
        if (!this.indexFileList.isEmpty()) {
            IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
            // Check whether the last file is full
            if (!tmp.isWriteFull()) {
                indexFile = tmp;
            } else {
                lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
                lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                prevIndexFile = tmp;
            }
        }
        this.readWriteLock.readLock().unlock();
    }

    // If the list is empty or the last file is full, create a new file under the write lock
    if (indexFile == null) {
        try {
            String fileName = this.storePath + File.separator
                + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
            indexFile = new IndexFile(fileName, this.hashSlotNum, this.indexNum,
                lastUpdateEndPhyOffset, lastUpdateIndexTimestamp);
            this.readWriteLock.writeLock().lock();
            this.indexFileList.add(indexFile);
        } catch (Exception e) {
            log.error("getLastIndexFile exception ", e);
        } finally {
            this.readWriteLock.writeLock().unlock();
        }

        // Every time a new file is created, flush the previous file asynchronously
        if (indexFile != null) {
            final IndexFile flushThisFile = prevIndexFile;
            Thread flushThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    IndexService.this.flush(flushThisFile);
                }
            }, "FlushIndexFileThread");
            flushThread.setDaemon(true);
            flushThread.start();
        }
    }
    return indexFile;
}
```
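The locking pattern in getAndCreateLastIndexFile() can be reproduced with a plain ReentrantReadWriteLock: a read-locked fast path that reuses the last file, and a write-locked slow path that appends a new one and hands the full predecessor to a background daemon thread. Segment and SegmentList are hypothetical stand-ins for IndexFile and IndexService, and, like the original, the sketch assumes a single writer thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for an index file: tracks capacity, usage, and whether it was flushed.
final class Segment {
    final int capacity;
    int used;
    volatile boolean flushed;

    Segment(int capacity) { this.capacity = capacity; }

    boolean isFull() { return used >= capacity; }
}

final class SegmentList {
    private final List<Segment> segments = new ArrayList<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final int capacity;
    private Thread lastFlushThread;

    SegmentList(int capacity) { this.capacity = capacity; }

    Segment getAndCreateLast() {
        Segment prev = null;
        lock.readLock().lock();          // fast path: reuse the last segment if it has room
        try {
            if (!segments.isEmpty()) {
                Segment last = segments.get(segments.size() - 1);
                if (!last.isFull()) {
                    return last;
                }
                prev = last;
            }
        } finally {
            lock.readLock().unlock();
        }

        Segment created = new Segment(capacity);
        lock.writeLock().lock();         // slow path: append a new segment under the write lock
        try {
            segments.add(created);
        } finally {
            lock.writeLock().unlock();
        }

        if (prev != null) {              // flush the full predecessor off the caller's thread
            final Segment toFlush = prev;
            Thread t = new Thread(() -> toFlush.flushed = true, "FlushSegmentThread");
            t.setDaemon(true);
            t.start();
            lastFlushThread = t;
        }
        return created;
    }

    // Waits for the most recent background flush to finish.
    void awaitFlush() {
        if (lastFlushThread == null) {
            return;
        }
        try {
            lastFlushThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int size() {
        lock.readLock().lock();
        try {
            return segments.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

One deliberate difference from the excerpt above: the sketch acquires the write lock before entering try, so its finally block never unlocks a lock it does not hold. In the excerpt, assuming readWriteLock is a ReentrantReadWriteLock, a failure in the IndexFile constructor would make writeLock().unlock() throw IllegalMonitorStateException.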

4.1.2 Insert and Query

  • buildIndex(DispatchRequest req): builds indexes from a message dispatch request. Both the msgId index and the message Key indexes are created here
    1. Create or get the latest index file
    2. Call IndexFile#putKey on that file to create the index entries
      1. Get the uniqKey (that is, the msgId) and create its index entry
      2. Get all keys of the message and create an index entry for each
```java
/**
 * Build index entries from a DispatchRequest.
 *
 * @param req the DispatchRequest delivered to the index service after the message is stored in the CommitLog
 */
public void buildIndex(DispatchRequest req) {
    IndexFile indexFile = retryGetAndCreateIndexFile();
    if (indexFile != null) {
        long endPhyOffset = indexFile.getEndPhyOffset();
        DispatchRequest msg = req;
        String topic = msg.getTopic();
        String keys = msg.getKeys();
        // Messages below the file's EndPhyOffset have already been indexed
        if (msg.getCommitLogOffset() < endPhyOffset) {
            return;
        }

        // Rolled-back transaction messages do not need an index
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                return;
        }

        if (req.getUniqKey() != null) {
            // Create the UniqueKey index, i.e. the msgId index.
            // putKey (a private helper) moves on to a new index file when the current one is full.
            indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
            if (indexFile == null) {
                log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                return;
            }
        }

        // Create an index entry for each message key; a message can carry several keys
        if (keys != null && keys.length() > 0) {
            String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
            for (int i = 0; i < keyset.length; i++) {
                String key = keyset[i];
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, msg, buildKey(topic, key));
                    if (indexFile == null) {
                        log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                        return;
                    }
                }
            }
        }
    } else {
        log.error("build index error, stop building index");
    }
}
```
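The keys that buildIndex() passes to putKey can be computed without any file I/O. This sketch assumes KEY_SEPARATOR is a single space (its value in RocketMQ 4.x MessageConst) and reduces the transaction check to a boolean; IndexKeyExtractor and indexKeys are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class IndexKeyExtractor {
    // Assumed separator between message keys.
    static final String KEY_SEPARATOR = " ";

    // Returns the strings that would be indexed, in buildIndex() order: first the unique key
    // (msgId), then every non-empty message key. Rolled-back transaction messages get none.
    static List<String> indexKeys(String topic, String uniqKey, String keys, boolean rolledBack) {
        List<String> result = new ArrayList<>();
        if (rolledBack) {
            return result;
        }
        if (uniqKey != null) {
            result.add(topic + "#" + uniqKey);
        }
        if (keys != null && keys.length() > 0) {
            for (String key : keys.split(KEY_SEPARATOR)) {
                if (key.length() > 0) { // split yields empty strings for repeated separators
                    result.add(topic + "#" + key);
                }
            }
        }
        return result;
    }
}
```

For a message with msgId MSGID1 and keys "order1  order2" (two spaces), the sketch yields T#MSGID1, T#order1, and T#order2, so one message can occupy several index entries.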
  • queryOffset(String topic, String key, int maxNum, long begin, long end): finds the CommitLog offsets of messages in the index files by topic and message key
```java
/**
 * Find message offsets in the index files by topic and message key.
 *
 * @param topic  message topic
 * @param key    message key
 * @param maxNum maximum number of offsets to return
 * @param begin  earliest store time of the messages to find
 * @param end    latest store time of the messages to find
 * @return the offsets found, plus the last file's EndTimestamp and EndPhyOffset
 */
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
    List<Long> phyOffsets = new ArrayList<Long>(maxNum);

    long indexLastUpdateTimestamp = 0;
    long indexLastUpdatePhyoffset = 0;
    maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
    try {
        this.readWriteLock.readLock().lock();
        if (!this.indexFileList.isEmpty()) {
            // Traverse the index files from newest to oldest, searching those whose
            // time range overlaps [begin, end]
            for (int i = this.indexFileList.size(); i > 0; i--) {
                IndexFile f = this.indexFileList.get(i - 1);
                boolean lastFile = i == this.indexFileList.size();
                if (lastFile) {
                    indexLastUpdateTimestamp = f.getEndTimestamp();
                    indexLastUpdatePhyoffset = f.getEndPhyOffset();
                }

                if (f.isTimeMatched(begin, end)) {
                    // Only the last (still-writable) file is searched with lock = true
                    f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
                }

                // Older files end no later than this one begins, so none of them can match
                if (f.getBeginTimestamp() < begin) {
                    break;
                }

                if (phyOffsets.size() >= maxNum) {
                    break;
                }
            }
        }
    } catch (Exception e) {
        log.error("queryMsg exception", e);
    } finally {
        this.readWriteLock.readLock().unlock();
    }

    return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
}
```
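The traversal in queryOffset() can be modeled on file time ranges alone. This sketch uses a standard interval-overlap test in place of IndexFile#isTimeMatched (which may be written differently) and omits the maxNum cutoff; FileRange and TimeRangeQuery are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class TimeRangeQuery {
    // Hypothetical summary of one index file: its name and header time range.
    static final class FileRange {
        final String name;
        final long beginTimestamp;
        final long endTimestamp;

        FileRange(String name, long beginTimestamp, long endTimestamp) {
            this.name = name;
            this.beginTimestamp = beginTimestamp;
            this.endTimestamp = endTimestamp;
        }

        // True when [beginTimestamp, endTimestamp] and [begin, end] intersect.
        boolean isTimeMatched(long begin, long end) {
            return beginTimestamp <= end && begin <= endTimestamp;
        }
    }

    // Walks files (ordered oldest first) from newest to oldest and returns the names of the
    // files that would be searched. Stops once a file starts before begin, because every
    // older file ends no later than this one begins.
    static List<String> filesToSearch(List<FileRange> files, long begin, long end) {
        List<String> searched = new ArrayList<>();
        for (int i = files.size() - 1; i >= 0; i--) {
            FileRange f = files.get(i);
            if (f.isTimeMatched(begin, end)) {
                searched.add(f.name);
            }
            if (f.beginTimestamp < begin) {
                break;
            }
        }
        return searched;
    }
}
```

For files covering [0, 100], [100, 200], and [200, 300] and a query for [150, 250], the loop searches the newest file, then the middle one, and stops there because that file begins before 150.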

4.1.3 Expired Deletion

  • deleteExpiredFile(long offset): deletes all index files that only contain entries for messages before the given CommitLog offset
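A sketch of the expiry rule, assuming the caller passes the CommitLog's current minimum offset and that files are ordered oldest first with increasing EndPhyOffset. As a design choice, the sketch also assumes the newest file is never deleted, because it may still be receiving entries. ExpiredIndexFiles and countExpired are hypothetical names.

```java
final class ExpiredIndexFiles {
    // Returns how many leading (oldest) files can be deleted: those whose EndPhyOffset is
    // below minCommitLogOffset. The newest file is never counted.
    static int countExpired(long[] endPhyOffsets, long minCommitLogOffset) {
        int n = 0;
        for (int i = 0; i < endPhyOffsets.length - 1; i++) {
            if (endPhyOffsets[i] < minCommitLogOffset) {
                n++;
            } else {
                break; // offsets only grow, so later files are not expired either
            }
        }
        return n;
    }
}
```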

4.1.4 Flushing

  • flush(): forces the data in the memory-mapped file to disk. Called after an index file becomes full
```java
/**
 * Flush an index file; called when a file becomes full.
 *
 * @param f the index file to flush
 */
public void flush(final IndexFile f) {
    if (null == f)
        return;

    long indexMsgTimestamp = 0;

    if (f.isWriteFull()) {
        indexMsgTimestamp = f.getEndTimestamp();
    }

    // Flush the index file
    f.flush();

    // Record the full file's EndTimestamp in the checkpoint and flush the checkpoint file
    if (indexMsgTimestamp > 0) {
        this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp);
        this.defaultMessageStore.getStoreCheckpoint().flush();
    }
}
```
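The flush itself comes down to MappedByteBuffer.force(), which writes the mapping's modified content to the storage device. A self-contained sketch on a temporary file follows; FlushSketch is a hypothetical name, and checkpointIndexMsgTimestamp is a hypothetical in-memory stand-in for StoreCheckpoint#indexMsgTimestamp (the real checkpoint is itself a file that gets flushed).

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class FlushSketch {
    // Hypothetical stand-in for StoreCheckpoint#indexMsgTimestamp.
    static long checkpointIndexMsgTimestamp = 0;

    // Same order as flush(IndexFile): capture the timestamp if full, force the data, then
    // advance the checkpoint only for a full file.
    static void flush(MappedByteBuffer buffer, boolean writeFull, long endTimestamp) {
        long indexMsgTimestamp = writeFull ? endTimestamp : 0;
        buffer.force(); // write the mapping's modified content to the storage device
        if (indexMsgTimestamp > 0) {
            checkpointIndexMsgTimestamp = indexMsgTimestamp;
        }
    }

    // Maps a small temporary file, writes one int, flushes, and reads the int back.
    static int demo(boolean writeFull, long endTimestamp) throws IOException {
        Path tmp = Files.createTempFile("index", ".bin");
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_WRITE, 0, 64);
            buf.putInt(0, 42);
            flush(buf, writeFull, endTimestamp);
            return buf.getInt(0);
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```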

4.2 IndexFile

  • putKey(final String key, final long phyOffset, final long storeTimestamp): inserts a new index entry into the index file
    1. Calculate the absolute hash slot position absSlotPos from the key's hash value
    2. Read the current value of the hash slot, which is the logical index of the newest entry in that slot's list
    3. Insert the new index entry at the head of the slot's linked list
    4. Point the hash slot at the logical index of the new entry
    5. Update the file header
```java
/**
 * Insert a new index entry into the index file.
 * Returns false when the file is full, in which case a new index file must be created.
 */
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    // Insert only while the entry count is below the maximum; otherwise fall through and return false
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        // Compute the hash value of the key
        int keyHash = indexKeyHashMethod(key);
        // Slot position (subscript) = keyHash % hashSlotNum. Different keys can share a slot;
        // their entries then share one list and are told apart by keyHash when querying
        int slotPos = keyHash % this.hashSlotNum;
        // Absolute byte position of the slot: header + slotPos * hashSlotSize
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;

        try {

            // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
            // false);
            // Read the slot: a valid value is the logical index of the current list head;
            // 0 or an out-of-range value means the slot is empty
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }

            // Seconds since the file's BeginTimestamp, clamped to [0, Integer.MAX_VALUE]
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
            timeDiff = timeDiff / 1000;
            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }

            // Absolute position of the new entry: header + all hash slots + indexCount * indexSize
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                    + this.indexHeader.getIndexCount() * indexSize;

            // Write the new entry; its last field points at the previous list head,
            // so the new entry becomes the head of the list
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

            // Point the hash slot at the logical index of the new entry
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

            // First entry in this file: record the begin offset and timestamp
            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            // The slot was empty before, so one more slot is now in use
            if (invalidIndex == slotValue) {
                this.indexHeader.incHashSlotCount();
            }
            // Update the header: index count + 1, new end offset and end timestamp
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
            + "; index max num = " + this.indexNum);
    }

    return false;
}
```
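The Time Diff encoding in putKey() and its decoding in selectPhyOffset() can be checked in isolation. TimeDiffCodec is a hypothetical helper that copies the arithmetic from the two methods.

```java
final class TimeDiffCodec {
    // As in putKey(): whole seconds since the file's BeginTimestamp, clamped to
    // [0, Integer.MAX_VALUE], and 0 when the file has no BeginTimestamp yet.
    static int encode(long storeTimestamp, long beginTimestamp) {
        long timeDiff = (storeTimestamp - beginTimestamp) / 1000;
        if (beginTimestamp <= 0) {
            timeDiff = 0;
        } else if (timeDiff > Integer.MAX_VALUE) {
            timeDiff = Integer.MAX_VALUE;
        } else if (timeDiff < 0) {
            timeDiff = 0;
        }
        return (int) timeDiff;
    }

    // As in selectPhyOffset(): back to milliseconds, then add BeginTimestamp.
    static long decode(int timeDiff, long beginTimestamp) {
        return beginTimestamp + timeDiff * 1000L;
    }
}
```

Because the difference is stored in whole seconds, decoding drops sub-second precision: a message stored 2.5 s after BeginTimestamp reads back as 2 s. Near the lower edge of a query window, a message whose real store time is inside [begin, end] can therefore be filtered out.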

  • selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock): searches this index file for the offsets recorded for a key
    1. Calculate the absolute hash slot position from the key's hash value
    2. Use the logical index stored in the hash slot to locate the head of the index list
    3. Traverse the linked list, reading each entry and comparing its hash and time
    4. Add the offsets of entries whose hash matches and whose time falls within [begin, end] to the result list
```java
/**
 * Find the offsets recorded for a key in this index file.
 *
 * @param phyOffsets list that receives the offsets found
 * @param key        key to search for
 * @param maxNum     maximum number of results
 * @param begin      start of the store-time range to search
 * @param end        end of the store-time range to search
 * @param lock       whether to lock during the search (no longer used)
 */
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
    final long begin, final long end, boolean lock) {
    if (this.mappedFile.hold()) {
        // Calculate the absolute hash slot position from the key's hash
        int keyHash = indexKeyHashMethod(key);
        int slotPos = keyHash % this.hashSlotNum;
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;
        try {
            if (lock) {
                // fileLock = this.fileChannel.lock(absSlotPos,
                // hashSlotSize, true);
            }

            // Read the hash slot: the logical index of the newest entry in this slot's list
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            // if (fileLock != null) {
            // fileLock.release();
            // fileLock = null;
            // }
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                || this.indexHeader.getIndexCount() <= 1) {
                // Empty or invalid slot, or no entries in this file: nothing to find
            } else {
                for (int nextIndexToRead = slotValue; ; ) {
                    if (phyOffsets.size() >= maxNum) {
                        break;
                    }

                    int absIndexPos =
                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + nextIndexToRead * indexSize;

                    int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                    long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                    long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                    int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                    if (timeDiff < 0) {
                        break;
                    }

                    // Convert seconds back to milliseconds and rebuild the store time
                    timeDiff *= 1000L;

                    long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                    boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                    // Only the hash is compared, not the original key string
                    if (keyHash == keyHashRead && timeMatched) {
                        phyOffsets.add(phyOffsetRead);
                    }

                    // Stop at the end of the list, on a self-loop, or once entries are older than begin
                    if (prevIndexRead <= invalidIndex
                        || prevIndexRead > this.indexHeader.getIndexCount()
                        || prevIndexRead == nextIndexToRead || timeRead < begin) {
                        break;
                    }

                    nextIndexToRead = prevIndexRead;
                }
            }
        } catch (Exception e) {
            log.error("selectPhyOffset exception ", e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }

            this.mappedFile.release();
        }
    }
}
```
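Putting putKey() and selectPhyOffset() together, the following sketch models one index file on a heap ByteBuffer with the same slot and entry layout. It is a simplification under stated assumptions, not RocketMQ code: there is no MappedFile and no locking, only BeginTimestamp and IndexCount are tracked (as fields, while the 40-byte header region is left unused), the hash is assumed to be String.hashCode() made non-negative, and MiniIndexFile is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class MiniIndexFile {
    static final int HEADER_SIZE = 40;
    static final int SLOT_SIZE = 4;
    static final int ENTRY_SIZE = 20;
    static final int INVALID_INDEX = 0;

    private final int hashSlotNum;
    private final int indexNum;
    private final ByteBuffer buf;
    private int indexCount = 1;      // entry 0 is never used, so a slot value of 0 means "empty"
    private long beginTimestamp = 0; // kept in a field; the header region is left unused

    MiniIndexFile(int hashSlotNum, int indexNum) {
        this.hashSlotNum = hashSlotNum;
        this.indexNum = indexNum;
        this.buf = ByteBuffer.allocate(HEADER_SIZE + hashSlotNum * SLOT_SIZE + indexNum * ENTRY_SIZE);
    }

    private static int hash(String key) {
        int h = Math.abs(key.hashCode());
        return h < 0 ? 0 : h;
    }

    private int entryPos(int logicalIndex) {
        return HEADER_SIZE + hashSlotNum * SLOT_SIZE + logicalIndex * ENTRY_SIZE;
    }

    // Head insertion: the new entry points at the old list head, then the slot points at the new entry.
    boolean putKey(String key, long phyOffset, long storeTimestamp) {
        if (indexCount >= indexNum) {
            return false; // full: the caller should create a new file
        }
        int keyHash = hash(key);
        int absSlotPos = HEADER_SIZE + (keyHash % hashSlotNum) * SLOT_SIZE;
        int slotValue = buf.getInt(absSlotPos);
        if (slotValue <= INVALID_INDEX || slotValue >= indexCount) {
            slotValue = INVALID_INDEX;
        }
        if (indexCount == 1) {
            beginTimestamp = storeTimestamp;
        }
        long timeDiff = Math.max(0L, Math.min(Integer.MAX_VALUE, (storeTimestamp - beginTimestamp) / 1000));
        int pos = entryPos(indexCount);
        buf.putInt(pos, keyHash);
        buf.putLong(pos + 4, phyOffset);
        buf.putInt(pos + 12, (int) timeDiff);
        buf.putInt(pos + 16, slotValue);
        buf.putInt(absSlotPos, indexCount);
        indexCount++;
        return true;
    }

    // Walk the slot's list newest-first, collecting offsets whose hash matches and whose time is in [begin, end].
    List<Long> selectPhyOffset(String key, int maxNum, long begin, long end) {
        List<Long> result = new ArrayList<>();
        int keyHash = hash(key);
        int next = buf.getInt(HEADER_SIZE + (keyHash % hashSlotNum) * SLOT_SIZE);
        while (next > INVALID_INDEX && next < indexCount && result.size() < maxNum) {
            int pos = entryPos(next);
            long timeRead = beginTimestamp + buf.getInt(pos + 12) * 1000L;
            if (buf.getInt(pos) == keyHash && timeRead >= begin && timeRead <= end) {
                result.add(buf.getLong(pos + 4));
            }
            int prev = buf.getInt(pos + 16);
            if (timeRead < begin || prev == next) {
                break; // older entries cannot match, or the list is corrupt
            }
            next = prev;
        }
        return result;
    }
}
```

Because only the hash is compared, a lookup for T#BB also returns the offset stored under T#Aa (the two strings have the same hashCode()), which is the collision problem noted in 3.1.1.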
