Note: This series of source code analysis is based on RocketMq 4.8.0, gitee Repository link: gitee.com/funcy/rocke… .

RocketMq message processing flows as follows:

  1. Message reception: Message reception refers to receivingproducerThe message processing class isSendMessageProcessor, writes the message tocommigLogAfter the file, the receiving process is completed;
  2. Message distribution:brokerThe class that handles message distribution isReputMessageService, it will start a thread that will continuouslycommitLongTo the correspondingconsumerQueue, this step writes two files:consumerQueuewithindexFile, the message distribution process is finished.
  3. Message delivery: Message delivery refers to sending a message toconsumerThe process,consumerWill initiate a request for the message,brokerUpon receipt of the request, callPullMessageProcessorClass processing fromconsumerQueueThe file gets the message and returns toconsumerAfter that, the delivery process is finished.

This is the rocketMq message processing process, next we will analyze the message delivery implementation from the source code.

1. The processingPULL_MESSAGErequest

Unlike producer, when a consumer pulls a message from the broker, it sends a request with a PULL_MESSAGE code and a processor with a PullMessageProcessor. We go directly to its processRequest method:

@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    // Call the method
    return this.processRequest(ctx.channel(), request, true);
}
Copy the code

This method simply calls an overloaded method. The extra argument true allows the broker to suspend the request.

/** * continue processing */
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, 
        boolean brokerAllowSuspend)throws RemotingCommandException {

    RemotingCommand response = RemotingCommand
        .createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader 
        = (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) 
        request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

    response.setOpaque(request.getOpaque());

    // Omit the permission verification process
    // 1. RocketMq can set verification information to block connections from unauthorized clients
    ANY(PUB or SUB permission), PUB(send permission), SUB(subscribe permission), etc.
    // You can fine-grained control what the client does to the topic.// Get the subscription group
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager() .findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); .// Get the subscription topic
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager() .selectTopicConfig(requestHeader.getTopic()); ./ / filter processing
    // When a consumer subscribs to a message, it can filter the subscribed message using two methods: tag and SQL92
    // Here we focus on the process of pulling the message, the specific filtering details will be analyzed later.// Get the message
    Obtain the ConsumerQueue file according to topic and queueId
    // 2. Retrieve the message content from the CommitLog according to the information in the ConsumerQueue file
    final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), 
        requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    if(getMessageResult ! =null) {
        // Omit a bunch of validation procedures.switch (response.getCode()) {
            // indicates that the message can be processed, in which case the message content is written to response
            case ResponseCode.SUCCESS:
                ...
                // Process the message content by reading the message from getMessageResult and putting it in Response
                if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                    final long beginTimeMills = this.brokerController.getMessageStore().now();
                    // Convert the message content to a byte array
                    final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); . response.setBody(r); }else {
                    try {
                        // Message conversion
                        FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(
                            getMessageResult.getBufferTotalSize()), getMessageResult);
                        channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                            ...
                        });
                    } catch (Throwable e) {
                        ...
                    }

                    response = null;
                }
                break;

            // No message meeting the condition was found
            case ResponseCode.PULL_NOT_FOUND:
                // If suspend is supported, the current request is suspended
                if (brokerAllowSuspend && hasSuspendFlag) {
                    ...
                    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                        this.brokerController.getMessageStore().now(), offset, subscriptionData, 
                        messageFilter);
                    // No related message found, suspend operation
                    this.brokerController.getPullRequestHoldService()
                        .suspendPullRequest(topic, queueId, pullRequest);
                    response = null;
                    break;
                }

            // omit other types of processing.break;
            default:
                assert false; }}else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store getMessage return null"); }...return response;
}
Copy the code

In the source code, this method is also very long. Here I have erased all the details, leaving only some important processes. The entire process is as follows:

  1. Permission verification:rocketMqYou can set the verification information to block the connection of illegal clients. At the same time, you can set the publishing and subscription permissions of clients and control the access permissions of details.
  2. Get subscription groups, subscription topics, etc., which are mainly retrieved from the contents of the request messagebrokerThe corresponding record in
  3. Create a filter component:consumerWhen subscribing to messages, you can filter the subscribed messages in two ways:tagwithsql92
  4. Get the message: First according totopicqueueIdTo obtainConsumerQueueDocument, according toConsumerQueueFile information fromCommitLogGets the message content in, and message filtering occurs in this step
  5. Transform the message: If the message is obtained, the specific message content is copied toreponseIn the
  6. Suspend request: If no message is received and the current request supports suspend, the current request is suspended

The above code is relatively clear, the relevant process code has been annotated.

The above process is the entire message retrieval process. In this article, we only focus on the steps related to the message retrieval, focusing on the following two operations:

  • To get the message
  • Pending request

2. Get the message

Get the message using DefaultMessageStore#getMessage:

public GetMessageResult getMessage(final String group, final String topic, final int queueId, 
        final long offset, final int maxMsgNums, final MessageFilter messageFilter) {
    // Omit some judgments.// Topic and queueId have a ConsumeQueue, which records the commitLog location of messages
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    if(consumeQueue ! =null) {
        minOffset = consumeQueue.getMinOffsetInQueue();
        maxOffset = consumeQueue.getMaxOffsetInQueue();

        if(...). {// Check whether offset meets the requirements. }else {
            Get the message from the consumerQueue file
            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
            if(bufferConsumeQueue ! =null) {...for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; 
                    i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        
                    // Omit a bunch of message filtering operations.// Get messages from commitLong
                    SelectMappedBufferResult selectResult 
                            = this.commitLog.getMessage(offsetPy, sizePy);
                    if (null == selectResult) {
                        if (getResult.getBufferTotalSize() == 0) {
                            status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                        }

                        nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                        continue;
                    }

                    // Omit a bunch of message filtering operations. }}}else {
        status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
        nextBeginOffset = nextOffsetCorrection(offset, 0);
    }

    if (GetMessageStatus.FOUND == status) {
        this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
    } else {
        this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
    }
    long elapsedTime = this.getSystemClock().now() - beginTime;
    this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);

    getResult.setStatus(status);
    // handle offset again
    getResult.setNextBeginOffset(nextBeginOffset);
    getResult.setMaxOffset(maxOffset);
    getResult.setMinOffset(minOffset);
    return getResult;
}
Copy the code

This method is not too long and only the key flow is preserved here. The key flow to get the message is as follows:

  1. According to thetopicwithqueueIdfindConsumerQueue
  2. fromConsumerQueueObtain message information from the corresponding file, such astagthehashCodeAnd the message incommitLogLocation information in
  3. Based on location information, fromcommitLogTo get the full message

After the above steps, the message can be obtained, but before and after the message is obtained, the message will be filtered according to the tag or SQL syntax. About the details of message filtering, we will leave the chapter on message filtering for further analysis.

3. Suspend request:PullRequestHoldService#suspendPullRequest

PullRequestHoldService#suspendPullRequest:

public class PullRequestHoldService extends ServiceThread {

    private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
        new ConcurrentHashMap<String, ManyPullRequest>(1024);

    public void suspendPullRequest(final String topic, final int queueId, 
            final PullRequest pullRequest) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (null == mpr) {
            mpr = new ManyPullRequest();
            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
            if(prev ! =null) { mpr = prev; } } mpr.addPullRequest(pullRequest); }... }Copy the code

In the PendpullRequest method, all you are doing is putting the current request into the pullRequestTable. The pullRequestTable is a ConcurrentMap, the key is topic@queueId, and the value is a pending request.

Once the request is suspended, when is it processed? That’s what the PullRequestHoldService thread does.

3.1 Threads handling pending requests:PullRequestHoldService

After looking at the PullRequestHoldService#suspendPullRequest method, let’s look at the PullRequestHoldService.

PullRequestHoldService, which is a subclass of ServiceThread (last seen subclass of ServiceThread was ReputMessageService), also starts a new thread to handle suspend operations.

Let’s first look at where it starts the PullRequestHoldService thread. BrokerController’s start method start() has this line:

BrokerController#start

public void start(a) throws Exception {...if (this.pullRequestHoldService ! =null) {
        this.pullRequestHoldService.start(); }... }Copy the code

This is the thread operation to start the pullRequestHoldService.

To explore what this thread is doing, enter the PullRequestHoldService#run method:

@Override
public void run(a) {
    log.info("{} service started".this.getServiceName());
    while (!this.isStopped()) {
        try {
            / / wait
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(
                    this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }

            long beginLockTimestamp = this.systemClock.now();
            // Check the operation
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime); }}catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info("{} service end".this.getServiceName());
}
Copy the code

The thread will wait and then call the PullRequestHoldService#checkHoldRequest method.

private void checkHoldRequest(a) {
    for (String key : this.pullRequestTable.keySet()) {
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            final long offset = this.brokerController.getMessageStore()
                .getMaxOffsetInQueue(topic, queueId);
            try {
                // Call notifyMessageArriving
                this.notifyMessageArriving(topic, queueId, offset);
            } catch(Throwable e) { log.error(...) ; }}}}Copy the code

This method calls PullRequestHoldService#notifyMessageArriving(…) , we continue into:

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
    // Continue the call
    notifyMessageArriving(topic, queueId, maxOffset, null.0.null.null);
}

/** * this method is finally called */
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, 
    final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {

    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if(mpr ! =null) {
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if(requestList ! =null) {
            List<PullRequest> replayList = new ArrayList<PullRequest>();

            for (PullRequest request : requestList) {
                // Determine whether a new message arrives based on the offset of comsumerQueue and request
                long newestOffset = maxOffset;
                if (newestOffset <= request.getPullFromThisOffset()) {
                    newestOffset = this.brokerController.getMessageStore()
                        .getMaxOffsetInQueue(topic, queueId);
                }

                if (newestOffset > request.getPullFromThisOffset()) {
                    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                    if(match && properties ! =null) {
                        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                    }

                    if (match) {
                        try {
                            // Wake up the operation
                            this.brokerController.getPullMessageProcessor()
                                .executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue; }}// The timeout is up
                if (System.currentTimeMillis() >= 
                        (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        // Wake up the operation
                        this.brokerController.getPullMessageProcessor()
                            .executeRequestWhenWakeup(request.getClientChannel(),
                            request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }

                replayList.add(request);
            }

            if(! replayList.isEmpty()) { mpr.addPullRequest(replayList); }}}}Copy the code

This method is used to check if a new message has arrived. The method is a bit long, but can be summed up in one sentence: Wakeup the pullRquest (call the PullMessageProcessor#executeRequestWhenWakeup method) if a new message arrives or the pullRquest hold time is up.

  • Gets when determining whether a new message has arrivedcomsumerQueueMaximum offset in file, and currentpullRquestIf the former is large, it means that a new message has arrived and needs to be woken uppullRquest
  • As I said before, whenconsumerIf the request does not receive a message,brokerwillholdThis request is made for a period of time (30s), when this time is up, it will also wake uppullRquestAnd then never againholdLive it

3.2 Wake up Request:PullMessageProcessor#executeRequestWhenWakeup

PullMessageProcessor#executeRequestWhenWakeup method

public void executeRequestWhenWakeup(final Channel channel,
    final RemotingCommand request) throws RemotingCommandException {
    // focus on the Runnable#run() method
    Runnable run = new Runnable() {
        @Override
        public void run(a) {
            try {
                // call PullMessageProcessor#processRequest(...) again methods
                final RemotingCommand response = PullMessageProcessor.this
                    .processRequest(channel, request, false); . }catch (RemotingCommandException e1) {
                log.error("excuteRequestWhenWakeup run", e1); }}};// Submit the task
    this.brokerController.getPullMessageExecutor()
        .submit(new RequestTask(run, channel, request));
}
Copy the code

This method prepares a task and submits it to the thread pool for execution with a simple PullMessageProcessor#processRequest(…) call. Method, which is used to handle the consumer pull message mentioned at the beginning of this section.

3.3 Wake up during Message Distributionconsumerrequest

On the analysis of the information distribution process, DefaultMessageStore. ReputMessageService# doReput methods have so a period of:

private void doReput(a) {...// Distribute the message
    DefaultMessageStore.this.doDispatch(dispatchRequest);
    // Long polling: If a message arrives on the master node and long polling is enabled
    if(BrokerRole.SLAVE ! = DefaultMessageStore.this
            .getMessageStoreConfig().getBrokerRole()
            &&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){
        / / call NotifyMessageArrivingListener method of arriving
        DefaultMessageStore.this.messageArrivingListener.arriving(
            dispatchRequest.getTopic(),
            dispatchRequest.getQueueId(), 
            dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); }... }Copy the code

This section is used to wake up hold consumer request actively, we enter the NotifyMessageArrivingListener# arriving method:

 @Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
        msgStoreTime, filterBitMap, properties);
}
Copy the code

Finally it also calls PullRequestHoldService#notifyMessageArriving(…) Methods.

4. To summarize

This article focuses on the broker’s process for handling PULL_MESSAGE requests, and concludes as follows:

  1. brokerTo deal withPULL_MESSAGEtheprocessorforPullMessageProcessor.PullMessageProcessortheprocessRequest(...)This is the entire message retrieval process
  2. brokerWhen retrieving a message, the request is first processedtopicwithqueueIdfindconsumerQueueAnd then according to the requestoffsetParameters fromconsumerQueueFile to find messages incommitLogAnd finally according to the location information fromcommitLogTo get the message content
  3. ifbrokerNo current inconsumerQueueThe message,brokerThe current thread is suspended until it times out (30 seconds by default) or wakes up when a new message is received

Limited to the author’s personal level, there are inevitable mistakes in the article, welcome to correct! Original is not easy, commercial reprint please contact the author to obtain authorization, non-commercial reprint please indicate the source.

This article was first published in the wechat public number Java technology exploration, if you like this article, welcome to pay attention to the public number, let us explore together in the world of technology!