Commitlog load

After we restart the broker, how does RocketMQ reload the commitlog file on disk again? With that in mind, let’s take a look at the commitlog loading process

The commitlog loading process is as follows:

We focus on the first and second processes here. The third process, after the ConsumeQueue, will be highlighted by IndexFile.

Take the third process a little further. The logic of the third process is to store commitlog to consumeQueue. Commitlogs are messages that are not forwarded and stored in the consumeQueue

DefaultMessageStore initialization

In the world of rocketMQ, we know that commitlog is abstracted as the Commitlog class. Each file is abstracted as an MappedFile. But there is a key class DefaultMessageStore that coordinates the loading of all files.

This class initializes the CommitLog with it

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        
        // ...
        // Initialize commitLog
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            this.commitLog = new DLedgerCommitLog(this);
        } else {
            this.commitLog = new CommitLog(this);
        }
        // ... 
    }
Copy the code

DefaultMessageStore initialization

In RocketMQ, commitlog is abstracted as the Commitlog class. Each file is abstracted as an MappedFile. DefaultMessageStore acts as an ‘aggregate class’ that coordinates the loading of all files.

This class initializes the CommitLog with it

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        
        // ...
        // Initialize commitLog
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            this.commitLog = new DLedgerCommitLog(this);
        } else {
            this.commitLog = new CommitLog(this);
        }
        // ... 
    }
Copy the code

DefaultMessageStore#load() loads files on disk

The laod() method loads related files on disk, such as commitlog, consumeQueue, and indexFile, into memory.

DefaultMessageStore, when executing the load() method, delegates the CommitLog to load the CommitLog into memory. The commitlog #load() method loads the COMMITlog file from the storage directory into memory

public class DefaultMessageStore implements MessageStore {
    public boolean load(a) {
        boolean result = true;

        try {
            //todo If abort exists, it indicates that the current exit is abnormal
            boolean lastExitOK = !this.isTempFileExist();
            log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

            if (null! = scheduleMessageService) { result = result &&this.scheduleMessageService.load();
            }

            // Commission CommitLog to load CommitLog
            result = result && this.commitLog.load();

            // load Consume Queue
            result = result && this.loadConsumeQueue();

            if (result) {
                this.storeCheckpoint =
                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));

                this.indexService.load(lastExitOK);

                // todo Resumes CommitLog based on whether the CommitLog exits normally
                this.recover(lastExitOK);

                log.info("load over, and the max phy offset = {}".this.getMaxPhyOffset()); }}catch (Exception e) {
            log.error("load exception", e);
            result = false;
        }

        if(! result) {this.allocateMappedFileService.shutdown();
        }

        returnresult; }}Copy the code

In this method, we notice that when the abort file is present, the broker is judged to exit abnormally. And after loading the CommitLog file into memory via CommitLog. An recover() is also performed to recover the commitlog information.

Why do YOU need to restore commitlog? The reason is that messages stored in commitlog files may not be correct. When the broker exits abnormally, a message may be saved in half. The recover() method is used to strip out illegal messages.

public class DefaultMessageStore implements MessageStore {
    private void recover(final boolean lastExitOK) {
        // Todo restores consumeQueue to get the maximum offset of commitlogs stored in consumeQueue
        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();

        // The difference between todo's abnormal exit and normal exit is:
        / / todo abnormal exit, according to maxPhyOffsetOfConsumeQueue find which file is damaged, and began to recover from the file
        // Todo exits normally, and the recovery starts directly from the last 3 files
        if (lastExitOK) {
            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
        } else {
            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
        }

        this.recoverTopicQueueTable(); }}Copy the code