The following source code is based on RocketMQ 4.7.0.

How does RocketMQ recover its data when it restarts after a normal or an abnormal exit? Let's examine the process in the code.

Broker fault recovery

This code is called when the broker is first started or restarted:

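// BrokerController#initialize (excerpt)
public boolean initialize() throws CloneNotSupportedException {
    boolean result = this.topicConfigManager.load();
    // ... further configuration managers are loaded the same way ...
    result = result && this.messageStore.load();
    // ...
    return result;
}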

As the code above shows, broker initialization calls MessageStore#load, whose default implementation is DefaultMessageStore. DefaultMessageStore#load is the entry point of broker recovery:

    public boolean load() {
        boolean result = true;

        try {
            // The abort file exists only if the previous run did not shut down cleanly
            boolean lastExitOK = !this.isTempFileExist();

            if (null != scheduleMessageService) {
                result = result && this.scheduleMessageService.load();
            }

            // Load CommitLog
            result = result && this.commitLog.load();

            // Load ConsumeQueue
            result = result && this.loadConsumeQueue();

            if (result) {
                this.storeCheckpoint =
                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));

                this.indexService.load(lastExitOK);
                // Recovery entry point
                this.recover(lastExitOK);

                log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
            }
        } catch (Exception e) {
            result = false;
        }

        if (!result) {
            this.allocateMappedFileService.shutdown();
        }

        return result;
    }
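The lastExitOK flag in load relies on a marker file. DefaultMessageStore creates a file named abort under the store root directory when it starts and deletes it during a clean shutdown, so if the file is still there at the next start, the previous run ended abnormally; isTempFileExist checks for it. The sketch below models only that convention with java.nio.file. The class AbortFileMarker and its methods are hypothetical, not RocketMQ's API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical model of the abort-file convention; not RocketMQ's actual class. */
final class AbortFileMarker {
    private final Path abortFile;

    AbortFileMarker(Path storeRootDir) {
        // Assumption of this sketch: the marker is a file named "abort" directly under the store root
        this.abortFile = storeRootDir.resolve("abort");
    }

    /** Mirrors lastExitOK = !isTempFileExist(): no leftover marker means a clean exit. */
    boolean lastExitOK() {
        return !Files.exists(abortFile);
    }

    /** Called at startup, after lastExitOK() has been read: create the marker. */
    void onStart() {
        try {
            if (!Files.exists(abortFile)) {
                Files.createFile(abortFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Called on a clean shutdown: delete the marker. */
    void onCleanShutdown() {
        try {
            Files.deleteIfExists(abortFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("store");
        AbortFileMarker marker = new AbortFileMarker(root);
        System.out.println(marker.lastExitOK()); // true: nothing has run yet
        marker.onStart();
        // Simulated crash: onCleanShutdown() is never called, so the next start sees the marker
        System.out.println(marker.lastExitOK()); // false
        marker.onCleanShutdown();
        System.out.println(marker.lastExitOK()); // true
        Files.deleteIfExists(root);
    }
}
```

Because the check happens before the marker is recreated, a crash at any point while the broker is running leaves the file behind, which is exactly what routes the next start to the abnormal recovery path.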

DefaultMessageStore#load delegates the actual recovery to the recover method:

    private void recover(final boolean lastExitOK) {
        // Recover every ConsumeQueue and get the largest CommitLog physical offset they reference (verified with a log print below)
        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();

        if (lastExitOK) {
            // Exit normally
            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
        } else {
            // Abnormal exit processing
            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
        }

        this.recoverTopicQueueTable();
    }

In RocketMQ 4.7.0, CommitLog#recoverAbnormally is marked as deprecated, so it is not analyzed here; the rest of this article follows the normal recovery path.

Restore CommitLog and ConsumeQueue

To find out what value maxPhyOffsetOfConsumeQueue holds, I added a log statement to recover that prints it and then repackaged the source code.

When I started the broker, the printed value was 384.

Next, a client sent a message to the broker.

According to the broker log (another print statement I added), the message occupies 192 bytes in the CommitLog.

After the broker was restarted, maxPhyOffsetOfConsumeQueue had become 576.
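Why does a ConsumeQueue value move in step with the CommitLog? Each ConsumeQueue entry is a fixed 20-byte record: an 8-byte CommitLog physical offset, a 4-byte message size, and an 8-byte tag hash code. The following is a minimal sketch, not RocketMQ's ConsumeQueue class. It assumes, as I understand ConsumeQueue#recover in 4.x, that a queue's maximum physical offset is offset + size of its last valid entry (the CommitLog position just past that message), and it ignores file boundaries and extended tag data. The entry values in main are hypothetical.

```java
import java.nio.ByteBuffer;

/** Hypothetical, simplified model of ConsumeQueue entries; not RocketMQ's class. */
final class ConsumeQueueSketch {
    // 8-byte CommitLog offset + 4-byte message size + 8-byte tags hash code
    static final int UNIT_SIZE = 20;

    /** Appends one 20-byte entry at the buffer's current position. */
    static void putEntry(ByteBuffer buf, long phyOffset, int size, long tagsCode) {
        buf.putLong(phyOffset).putInt(size).putLong(tagsCode);
    }

    /**
     * Reads the entries written so far and returns offset + size of the last valid one,
     * or -1 if there is none. An entry with offset < 0 or size <= 0 ends the scan.
     */
    static long maxPhyOffset(ByteBuffer buf) {
        long max = -1;
        ByteBuffer view = buf.duplicate(); // independent position/limit; buf is untouched
        view.flip();                       // read back everything written so far
        while (view.remaining() >= UNIT_SIZE) {
            long offset = view.getLong();
            int size = view.getInt();
            view.getLong(); // tags hash code, not needed here
            if (offset < 0 || size <= 0) {
                break;
            }
            max = offset + size;
        }
        return max;
    }

    public static void main(String[] args) {
        ByteBuffer cq = ByteBuffer.allocate(UNIT_SIZE * 4);
        putEntry(cq, 0, 192, 0L);
        putEntry(cq, 192, 192, 0L);
        System.out.println(maxPhyOffset(cq)); // 384
        putEntry(cq, 384, 192, 0L);           // one more 192-byte message
        System.out.println(maxPhyOffset(cq)); // 576
    }
}
```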

The value grew by exactly the size of the new message (384 + 192 = 576), which confirms that maxPhyOffsetOfConsumeQueue is a physical offset into the CommitLog. With that established, here is CommitLog#recoverNormally:

public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
    // Whether to verify the CRC of each message during recovery (default: true)
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    // All CommitLog files, oldest first
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    // A non-empty list means CommitLog files exist on disk
    if (!mappedFiles.isEmpty()) {
        // Start from the third-to-last file; with fewer than three files, start from the first
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;

        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        // Starting physical offset of the file being checked
        long processOffset = mappedFile.getFileFromOffset();
        // Bytes of valid messages read so far in the current file
        long mappedFileOffset = 0;
        while (true) {
            // Check one message and return a DispatchRequest describing it
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            // Valid message: advance past it
            if (dispatchRequest.isSuccess() && size > 0) {
                mappedFileOffset += size;
            }
            // End of this file's data: move on to the next file
            else if (dispatchRequest.isSuccess() && size == 0) {
                index++;
                // No more files to check
                if (index >= mappedFiles.size()) {
                    break;
                } else {
                    // Switch to the next file
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                }
            }
            // Check failed (e.g. the unwritten tail of the newest file, or a corrupted message): stop
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }

        processOffset += mappedFileOffset;
        // Set the flush position
        this.mappedFileQueue.setFlushedWhere(processOffset);
        // Set the commit position
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // Truncate dirty CommitLog data beyond processOffset
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        // The ConsumeQueue references data that was not recovered: truncate its dirty entries
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    } else {
        // No CommitLog files (first startup, or the files were deleted):
        // reset the positions to 0 and delete all ConsumeQueue files
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

The code above uses two variables, processOffset and mappedFileOffset. To see their values, add log prints for both and repackage the corresponding module.

Then start the broker and check the printed values.

In my run, processOffset started at 0, the starting offset (getFileFromOffset()) of the first CommitLog file, and mappedFileOffset accumulated the size of each valid message read from the current file. The recovered position is processOffset + mappedFileOffset.
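The scan can be modeled without any RocketMQ types. In this hypothetical sketch, each CommitLog file is reduced to its starting offset and the result of checking each record in order: a positive value is a valid message of that many bytes, 0 is the successful size-0 result that marks the end of a file's data, and a negative value is a failed check. It computes processOffset + mappedFileOffset the same way the loop above does, but performs no CRC checks and reads no real files; MappedFileModel and the sizes in main are invented for illustration.

```java
import java.util.List;

/** Hypothetical, simplified model of the recoverNormally scan loop; not RocketMQ code. */
final class RecoverScanSketch {
    /** A CommitLog file reduced to its starting offset and per-record check results. */
    static final class MappedFileModel {
        final long fromOffset;
        final List<Integer> checkResults;

        MappedFileModel(long fromOffset, List<Integer> checkResults) {
            this.fromOffset = fromOffset;
            this.checkResults = checkResults;
        }
    }

    /** Returns processOffset: the position up to which the CommitLog is considered valid. */
    static long recover(List<MappedFileModel> files) {
        if (files.isEmpty()) {
            return 0; // no CommitLog files: the real code resets the positions to 0
        }
        int index = Math.max(files.size() - 3, 0); // start from the third-to-last file
        MappedFileModel file = files.get(index);
        long processOffset = file.fromOffset;
        long mappedFileOffset = 0;
        int next = 0; // index of the next check result in the current file
        while (true) {
            // Running out of recorded results is treated like a failed check
            int size = next < file.checkResults.size() ? file.checkResults.get(next++) : -1;
            if (size > 0) {
                mappedFileOffset += size;   // valid message
            } else if (size == 0) {
                index++;                    // end of this file's data
                if (index >= files.size()) {
                    break;
                }
                file = files.get(index);
                processOffset = file.fromOffset;
                mappedFileOffset = 0;
                next = 0;
            } else {
                break;                      // failed check: stop here
            }
        }
        return processOffset + mappedFileOffset;
    }

    public static void main(String[] args) {
        // Two hypothetical 1024-byte files; the first ends with the end-of-file marker
        List<MappedFileModel> files = List.of(
                new MappedFileModel(0, List.of(192, 192, 192, 0)),
                new MappedFileModel(1024, List.of(192, -1)));
        System.out.println(recover(files)); // 1216 = 1024 + 192
    }
}
```

Note that processOffset is reset to each new file's starting offset, so the result is always "start of the last file reached plus the valid bytes read in it", never a sum across files.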

Normal recovery handles two cases:

  • CommitLog files exist

    1. Check the messages in the last three files (or in all files, if there are fewer than three)
    2. Set flushedWhere and committedWhere to the recovered offset processOffset
    3. Truncate the dirty CommitLog data beyond processOffset, and truncate the dirty ConsumeQueue entries if maxPhyOffsetOfConsumeQueue >= processOffset
  • No CommitLog file exists (first startup, or the files were deleted)

    Set flushedWhere and committedWhere to 0 and delete all ConsumeQueue files
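The two cases can be condensed into a small decision function. This hypothetical sketch returns the positions and cleanup actions instead of calling setFlushedWhere, setCommittedWhere, truncateDirtyFiles, truncateDirtyLogicFiles, and destroyLogics itself; the Plan type is invented for illustration.

```java
/** Hypothetical summary of what recoverNormally does after its scan; not RocketMQ code. */
final class RecoverPlanSketch {
    static final class Plan {
        final long flushedWhere;
        final long committedWhere;
        final boolean truncateCommitLog;    // stands for truncateDirtyFiles(processOffset)
        final boolean truncateConsumeQueue; // stands for truncateDirtyLogicFiles(processOffset)
        final boolean destroyConsumeQueue;  // stands for destroyLogics()

        Plan(long flushedWhere, long committedWhere, boolean truncateCommitLog,
             boolean truncateConsumeQueue, boolean destroyConsumeQueue) {
            this.flushedWhere = flushedWhere;
            this.committedWhere = committedWhere;
            this.truncateCommitLog = truncateCommitLog;
            this.truncateConsumeQueue = truncateConsumeQueue;
            this.destroyConsumeQueue = destroyConsumeQueue;
        }
    }

    static Plan plan(boolean commitLogFilesExist, long processOffset, long maxPhyOffsetOfConsumeQueue) {
        if (!commitLogFilesExist) {
            // First startup or deleted files: start from 0 and drop every ConsumeQueue file
            return new Plan(0, 0, false, false, true);
        }
        // Positions move to the recovered offset; CommitLog data beyond it is truncated,
        // and the ConsumeQueue is truncated only if it references data at or beyond it
        return new Plan(processOffset, processOffset, true,
                maxPhyOffsetOfConsumeQueue >= processOffset, false);
    }

    public static void main(String[] args) {
        Plan p = plan(true, 576, 384);
        System.out.println(p.flushedWhere + " " + p.truncateConsumeQueue); // 576 false
    }
}
```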

Restore the TopicQueue table

// Rebuild the "topic-queueId" -> queue offset table from the recovered ConsumeQueues
public void recoverTopicQueueTable() {
    HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
    // Smallest physical offset still present in the CommitLog
    long minPhyOffset = this.commitLog.getMinOffset();
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            String key = logic.getTopic() + "-" + logic.getQueueId();
            table.put(key, logic.getMaxOffsetInQueue());
            // Align the queue's minimum offset with data that still exists in the CommitLog
            logic.correctMinOffset(minPhyOffset);
        }
    }
    this.commitLog.setTopicQueueTable(table);
}
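recoverTopicQueueTable needs only three values from each ConsumeQueue: its topic, its queue ID, and getMaxOffsetInQueue(), which, as far as I can tell, is the number of 20-byte entries in the queue and therefore the logical offset the next message in that queue will receive. The sketch below uses a hypothetical QueueInfo type and plain collections to show how the "topic-queueId" keys are formed; the topic name and offsets in main are made up.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of how the topic queue table is keyed; not RocketMQ code. */
final class TopicQueueTableSketch {
    /** A ConsumeQueue reduced to the three values recoverTopicQueueTable reads. */
    static final class QueueInfo {
        final String topic;
        final int queueId;
        final long maxOffsetInQueue; // entry count, i.e. the next logical offset (assumption)

        QueueInfo(String topic, int queueId, long maxOffsetInQueue) {
            this.topic = topic;
            this.queueId = queueId;
            this.maxOffsetInQueue = maxOffsetInQueue;
        }
    }

    /** Builds the "topic-queueId" -> offset map, one entry per queue. */
    static Map<String, Long> rebuild(List<QueueInfo> queues) {
        Map<String, Long> table = new HashMap<>(1024);
        for (QueueInfo q : queues) {
            table.put(q.topic + "-" + q.queueId, q.maxOffsetInQueue);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, Long> table = rebuild(List.of(
                new QueueInfo("TopicTest", 0, 3),
                new QueueInfo("TopicTest", 1, 5)));
        System.out.println(table.get("TopicTest-1")); // 5
    }
}
```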

The topicQueueTable rebuilt by recoverTopicQueueTable is read whenever a message is appended to the CommitLog, in DefaultAppendMessageCallback#doAppend, to assign the message its logical offset within its queue:

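// key is "<topic>-<queueId>" of the message being appended
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
    // First message in this queue
    queueOffset = 0L;
    CommitLog.this.topicQueueTable.put(key, queueOffset);
}

// ... the message is written using queueOffset ...

queueOffset++;
CommitLog.this.topicQueueTable.put(key, queueOffset);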
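Taken together, recoverTopicQueueTable and doAppend explain why the table must be recovered: because it is rebuilt from the ConsumeQueues at startup, queue offsets continue where they stopped instead of restarting at 0. The minimal sketch below shows the get/increment/put pattern, including the null case for a queue with no entry yet. The class QueueOffsetSketch and the values in main are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of queue-offset assignment on append; not RocketMQ code. */
final class QueueOffsetSketch {
    private final Map<String, Long> topicQueueTable;

    /** Seeded with the table rebuilt during recovery, so offsets continue after a restart. */
    QueueOffsetSketch(Map<String, Long> recoveredTable) {
        this.topicQueueTable = new HashMap<>(recoveredTable);
    }

    /** Returns the logical queue offset for the appended message and advances the table. */
    long assignQueueOffset(String topic, int queueId) {
        String key = topic + "-" + queueId;
        Long queueOffset = topicQueueTable.get(key);
        if (queueOffset == null) {
            queueOffset = 0L; // first message in this queue
        }
        topicQueueTable.put(key, queueOffset + 1);
        return queueOffset;
    }

    public static void main(String[] args) {
        QueueOffsetSketch appender = new QueueOffsetSketch(Map.of("TopicTest-0", 3L));
        System.out.println(appender.assignQueueOffset("TopicTest", 0)); // 3
        System.out.println(appender.assignQueueOffset("TopicTest", 0)); // 4
        System.out.println(appender.assignQueueOffset("TopicTest", 1)); // 0
    }
}
```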