The following source code is based on RocketMQ 4.7.0.

1 ConsumeQueue format

As analyzed earlier, each ConsumeQueue entry has a fixed length of 20 bytes, and each file stores 300,000 (30W) entries, i.e. 300,000 × 20 = 6,000,000 bytes per file. The files are organized by topic and queueId. So what is each 20-byte entry used for?

  • The first eight bytes hold the message’s offset in the CommitLog, which locates the message within the CommitLog files
  • The middle four bytes hold the message size; together with the offset they allow the message to be read randomly from the CommitLog
  • The last eight bytes hold the hash code of the message tag, used for tag filtering on the consumer side (see the sketch below)
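
To make that layout concrete, here is a minimal sketch (not RocketMQ source; the class name and values are illustrative) of how one 20-byte entry can be encoded and decoded with a ByteBuffer, in the same order as above:

import java.nio.ByteBuffer;

public class CqUnitSketch {
    static final int CQ_STORE_UNIT_SIZE = 20; // 8 (offset) + 4 (size) + 8 (tagsCode) bytes

    // Encode one ConsumeQueue entry: CommitLog offset, message size, tag hash code
    static ByteBuffer encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagsCode);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        // Decode it back in the same order and print the three fields
        ByteBuffer buf = encode(1048576L, 256, "TagA".hashCode());
        System.out.println("commitLogOffset=" + buf.getLong()
            + ", size=" + buf.getInt()
            + ", tagsCode=" + buf.getLong());
    }
}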

Here we only examine how the ConsumeQueue files are generated; the rest will be covered in subsequent articles.

2 ConsumeQueue Persistence process

ConsumeQueue is one of RocketMQ’s three major persistence structures. Source analysis shows that its persistence is driven by ReputMessageService, an inner class of DefaultMessageStore:

class ReputMessageService extends ServiceThread {

    private volatile long reputFromOffset = 0;
    // omit the rest of the code
}

It inherits RocketMQ’s custom ServiceThread base class, so each such service runs on its own thread.
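
For context, ServiceThread conceptually looks like the abridged sketch below (paraphrased, not the exact 4.7.0 source): it wraps a Thread, and subclasses supply run() and getServiceName().

public abstract class ServiceThread implements Runnable {

    protected Thread thread;                    // the thread backing this service
    protected volatile boolean stopped = false; // cooperative stop flag

    public abstract String getServiceName();

    public void start() {
        // each service owns one thread, named after the service
        this.thread = new Thread(this, this.getServiceName());
        this.thread.start();
    }

    public void shutdown() {
        this.stopped = true;
        // the real implementation also wakes up and joins the thread
    }

    public boolean isStopped() {
        return this.stopped;
    }
}

With that in mind, take a look at the run method: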

        @Override
        public void run() {
            DefaultMessageStore.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    Thread.sleep(1);
                    this.doReput();
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            DefaultMessageStore.log.info(this.getServiceName() + " service end");
        }

The main work is done by calling the doReput method:

        private void doReput() {
            // Get the position to read from -- it defaults to 0 and is adjusted for brokers that have been running for some time
            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                    this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            }
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    break;
                }
                // Get the readable data in the CommitLog starting at reputFromOffset
                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            // Read one message from the buffer and build a DispatchRequest
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);

                            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    // Dispatch the data; this is what generates the ConsumeQueue (and IndexFile) entries
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

                                    // On the Broker master, notify long-polling consumers that a message has arrived
                                    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                    }
                                    // Push the offset forward
                                    this.reputFromOffset += size;
                                    readSize += size;
                                    if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                            .addAndGet(dispatchRequest.getMsgSize());
                                    }
                                } else if (size == 0) {
                                    // Reached the end of the current file, roll to the next one
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                    readSize = result.getSize();
                                }
                            } else if (!dispatchRequest.isSuccess()) {

                                if (size > 0) {
                                    log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                    this.reputFromOffset += size;
                                } else {
                                    doNext = false;
                                    // If user open the dledger pattern or the broker is master node,
                                    // it will not ignore the exception and fix the reputFromOffset variable
                                    if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                        DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                        log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                            this.reputFromOffset);
                                        this.reputFromOffset += result.getSize() - readSize;
                                    }
                                }
                            }
                        }
                    } finally {
                        result.release();
                    }
                } else {
                    doNext = false;
                }
            }
        }
  1. Read the data in the CommitLog that can be processed at the moment (the data may span multiple files).
  2. Loop over the current batch and convert each message into a DispatchRequest (its main fields are sketched below).
  3. Distribute each DispatchRequest through DefaultMessageStore's doDispatch method.
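
For reference, a DispatchRequest carries the per-message metadata that the downstream dispatchers need. An abridged, paraphrased sketch of its main fields (not the full class) looks like this:

public class DispatchRequest {
    private final String topic;             // topic of the message
    private final int queueId;              // queue the message belongs to
    private final long commitLogOffset;     // physical offset of the message in the CommitLog
    private final int msgSize;              // message size in bytes
    private final long tagsCode;            // tag hash code used for filtering
    private final long storeTimestamp;      // time the broker stored the message
    private final long consumeQueueOffset;  // logical offset within the ConsumeQueue
    private final boolean success;          // whether the message was parsed successfully
    // ... plus keys, uniqKey, sysFlag, bitMap, propertiesMap, etc.
}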

Now look at how DefaultMessageStore's doDispatch method distributes the data:

    public void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
    }

The dispatcherList holds implementations of the CommitLogDispatcher interface, which generate the ConsumeQueue and the IndexFile respectively.

CommitLogDispatcher has three implementations in RocketMQ:

  1. CommitLogDispatcherBuildConsumeQueue – builds the ConsumeQueue
  2. CommitLogDispatcherBuildIndex – builds the IndexFile
  3. CommitLogDispatcherCalcBitMap – calculates the filter bit map
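
The interface itself is tiny; stripped of imports it essentially contains a single method:

public interface CommitLogDispatcher {

    // Called once for each message read from the CommitLog
    void dispatch(final DispatchRequest request);
}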

Now look at the CommitLogDispatcherBuildConsumeQueue implementation:

    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }

Messages of the two transaction types TRANSACTION_PREPARED_TYPE and TRANSACTION_ROLLBACK_TYPE are not processed here. All other messages are handled by the call DefaultMessageStore.this.putMessagePositionInfo(request).

    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        cq.putMessagePositionInfoWrapper(dispatchRequest);
    }

It finds the corresponding ConsumeQueue by topic and queueId, then calls putMessagePositionInfoWrapper on it. A simplified sketch of what the lookup amounts to follows.
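
DefaultMessageStore keeps its ConsumeQueues in a two-level table keyed by topic and then queueId. The fragment below is a paraphrased sketch of that lookup, not the exact source; createConsumeQueue is a hypothetical helper standing in for constructing a ConsumeQueue with the configured store path and file size:

    private final ConcurrentMap<String /* topic */, ConcurrentMap<Integer /* queueId */, ConsumeQueue>> consumeQueueTable =
        new ConcurrentHashMap<>();

    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        // Look up (and lazily create) the per-topic map
        ConcurrentMap<Integer, ConsumeQueue> map =
            consumeQueueTable.computeIfAbsent(topic, t -> new ConcurrentHashMap<>(128));

        // Look up (and lazily create) the ConsumeQueue for this queueId
        return map.computeIfAbsent(queueId, id -> createConsumeQueue(topic, id)); // createConsumeQueue is hypothetical here
    }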

    public void putMessagePositionInfoWrapper(DispatchRequest request) {
        final int maxRetries = 30;
        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
        for (int i = 0; i < maxRetries && canWrite; i++) {
            long tagsCode = request.getTagsCode();
            if (isExtWriteEnable()) {
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                cqExtUnit.setFilterBitMap(request.getBitMap());
                cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
                cqExtUnit.setTagsCode(request.getTagsCode());

                long extAddr = this.consumeQueueExt.put(cqExtUnit);
                if (isExtAddr(extAddr)) {
                    tagsCode = extAddr;
                } else {
                    log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}",
                        cqExtUnit, topic, queueId, request.getCommitLogOffset());
                }
            }
            boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
            if (result) {
                if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
                    this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                    this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
                }
                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                return;
            } else {
                // XXX: warn and notify me
                log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                    + " failed, retry " + i + " times");

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.warn("", e);
                }
            }
        }

        // XXX: warn and notify me
        log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
    }

The code above ultimately writes the entry through the putMessagePositionInfo method:

    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {

        if (offset + size <= this.maxPhysicOffset) {
            log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
            return true;
        }

        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {

            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                this.fillPreBlank(mappedFile, expectLogicOffset);
                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
            }

            if (cqOffset != 0) {
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

                if (expectLogicOffset < currentLogicOffset) {
                    log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                    return true;
                }

                if (expectLogicOffset != currentLogicOffset) {
                    LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                    );
                }
            }
            this.maxPhysicOffset = offset + size;
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }

From here on the write path is similar to that of CommitLog data: the 20-byte entry is persisted to disk through MappedFile.
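
To make the expectLogicOffset arithmetic above concrete, here is a small self-contained example (the consume queue offset 400,000 is illustrative): with 20-byte entries and the default 300,000 entries per ConsumeQueue file, entry 400,000 lands 2,000,000 bytes into the second file.

public class CqOffsetMath {
    static final int CQ_STORE_UNIT_SIZE = 20;                    // bytes per ConsumeQueue entry
    static final long FILE_SIZE = 300_000L * CQ_STORE_UNIT_SIZE; // 6,000,000 bytes per ConsumeQueue file

    public static void main(String[] args) {
        long cqOffset = 400_000L;                                 // illustrative consume queue offset
        long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;   // 8,000,000
        long fileIndex = expectLogicOffset / FILE_SIZE;           // 1 -> the second file (files are named by their start offset)
        long positionInFile = expectLogicOffset % FILE_SIZE;      // 2,000,000 bytes into that file
        System.out.printf("logicOffset=%d, fileIndex=%d, positionInFile=%d%n",
            expectLogicOffset, fileIndex, positionInFile);
    }
}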

3 Summary

To summarize how ConsumeQueue is generated:

  1. A ReputMessageService service thread is started.
  2. It reads CommitLog data starting from reputFromOffset (there may or may not be new data).
  3. Each message read is wrapped as a DispatchRequest.
  4. DefaultMessageStore's doDispatch method processes the data, mainly through the CommitLogDispatcher implementations.
  5. After this processing the ConsumeQueue entries have been generated.
  6. Repeat steps 2-4.

After each round of processing the thread sleeps for 1 millisecond. This also means a consumer cannot consume a message the instant the producer sends it; the message only becomes consumable after its ConsumeQueue entry has been built.