The best way to learn is to exchange ideas with others. I recently discussed some questions about RocketMQ message pulling and rebalancing with a few readers online, so I'll write down my summary here.

How the message pull loop works in push mode

In a previous post on rebalancing, I explained that RocketMQ rebalances every 20 seconds. The consumer fetches the list of consumer IDs in its consumer group from one of the Broker nodes and allocates the topic's queues among those consumers based on the subscription information. The allocation result is then wrapped into pullRequest objects and put into the pullRequestQueue, and the pull thread wakes up and executes the pull tasks. The flow chart is as follows:

[Flow chart: rebalancing and message pulling]

That post left out some details, though. For example, does every message pull have to wait 20 seconds? A reader actually asked me exactly that question.

His project evidently consumes messages in push mode. To answer the question, let's start with how RocketMQ pulls messages.

RocketMQ's push mode is implemented as a layer on top of pull mode, so it is not really a “push mode”. In push mode, as soon as the consumer has pulled messages, it immediately starts the next pull task. It does not wait for the next 20-second rebalance before pulling again. To see how this is implemented, let's look for the answer in the source code.

As I explained in my earlier post “Why does RocketMQ guarantee consistency in subscriptions?”, the pull thread takes PullRequest tasks from the pullRequestQueue blocking queue.

PullRequest tasks are put into that queue by the following method:

org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately:

```java
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}
```

The call chain shows that this method is not only called during rebalancing. In push mode, the onSuccess method of the PullCallback object also calls it after a pull returns:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess:

```java
case FOUND:
    // If no messages were found (for example, all were filtered out), put the pullRequest straight back into the blocking queue
    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    } else {
        // Hand the messages to the consumer thread pool for execution
        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
            pullResult.getMsgFoundList(),
            processQueue,
            pullRequest.getMessageQueue(),
            dispathToConsume);
        // Put the pullRequest back into the blocking queue
        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    }
```

Consider what happens after messages have been pulled from the broker for a pullRequest. If the message list is empty (for example, because every message was filtered out), the pullRequest is put straight back into the blocking queue and the loop continues. Otherwise, the messages are handed to the consumer thread pool and then the pullRequest is put back into the blocking queue.

```java
case NO_NEW_MSG:
case NO_MATCHED_MSG:
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
```

The broker may have no new messages, or none that match the subscription. In that case the next pull offset is updated and the pullRequest is put back into the blocking queue, so the pull loop continues.

The logic above shows that the pullRequest goes back into the blocking queue as soon as the pull result has been handled. The consumer does not wait for the messages to finish being consumed. This explains why push mode can keep pulling messages.

After every pull in push mode, the same method puts the PullRequest object back into the pullRequestQueue blocking queue. The consumer therefore keeps pulling messages from the broker continuously, which produces the push effect.
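To make the loop concrete, here is a minimal single-threaded sketch of “push built on pull”. The classes `PushOverPullSketch` and its nested `PullRequest` are hypothetical simplified stand-ins, not RocketMQ's own classes, and the “broker” is just an in-memory list. The only point it shows is that a pull task is re-enqueued right after each pull, whether or not messages were found.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of "push on top of pull"; these are not RocketMQ's classes.
class PushOverPullSketch {
    static final class PullRequest {
        final String queueId;
        long nextOffset;

        PullRequest(String queueId, long nextOffset) {
            this.queueId = queueId;
            this.nextOffset = nextOffset;
        }
    }

    private final List<String> brokerQueue; // simulated broker queue, indexed by offset
    private final BlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    final List<String> consumed = new ArrayList<>();

    PushOverPullSketch(List<String> brokerQueue) {
        this.brokerQueue = brokerQueue;
    }

    // Same role as executePullRequestImmediately: hand the task to the pull thread's queue.
    void executePullRequestImmediately(PullRequest pullRequest) {
        try {
            pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }

    // One iteration of the pull thread. A real pull thread would block on take();
    // poll() keeps this sketch single-threaded and easy to step through.
    boolean pullOnce(int batchSize) {
        PullRequest request = pullRequestQueue.poll();
        if (request == null) {
            return false;
        }
        int from = (int) request.nextOffset;
        int to = Math.min(from + batchSize, brokerQueue.size());
        // Pass the pulled messages on for consumption (here they are just recorded)
        consumed.addAll(brokerQueue.subList(from, to));
        request.nextOffset = to;
        // Found or not, the same request goes straight back into the queue,
        // so the next pull starts right away instead of waiting for a rebalance.
        executePullRequestImmediately(request);
        return true;
    }

    int pendingTasks() {
        return pullRequestQueue.size();
    }
}
```

Start with one enqueued request over five messages and call `pullOnce(2)` four times. The sketch consumes the messages in order, and the single request is still waiting in the queue afterwards, ready for the next pull.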

What happens when rebalancing assigns a queue to another consumer?

Suppose rebalancing assigns a queue to a new consumer. The old consumer must not keep pulling messages from that queue, right?

After rebalancing, RocketMQ checks whether each queue is still in the newly allocated list. If it is not, RocketMQ marks the queue as dropped. Before each pull, isDropped() is called to check whether the pullRequest's processQueue has been dropped:

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage:

```java
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
    log.info("the pull request[{}] is dropped.", pullRequest.toString());
    return;
}
```

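Before pulling messages, the consumer checks whether the queue has been dropped. If it has, the pull task is abandoned. Because the pullRequest is not put back into the blocking queue, the pull loop for that queue ends.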
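The following minimal sketch shows why an early `return` is enough to stop the loop. `DroppedCheckSketch` and its nested classes are hypothetical simplifications, not RocketMQ's real `ProcessQueue` or `PullRequest`. A live request is re-enqueued after each pull. Once its queue is dropped, the request is consumed from the queue once more and then simply not put back.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical simplified classes; they only model the dropped check, not RocketMQ's real ProcessQueue.
class DroppedCheckSketch {
    static final class ProcessQueue {
        // volatile: a flag set by the rebalance thread must be visible to the pull thread
        private volatile boolean dropped = false;

        boolean isDropped() { return dropped; }
        void setDropped(boolean dropped) { this.dropped = dropped; }
    }

    static final class PullRequest {
        final String queueId;
        final ProcessQueue processQueue;

        PullRequest(String queueId, ProcessQueue processQueue) {
            this.queueId = queueId;
            this.processQueue = processQueue;
        }
    }

    final BlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    int pullsIssued = 0;

    // Same shape as the check at the start of pullMessage.
    void pullMessage(PullRequest pullRequest) {
        if (pullRequest.processQueue.isDropped()) {
            // Abandon the task without re-enqueuing it: the pull loop for this queue stops here.
            return;
        }
        pullsIssued++;                       // stand-in for the real pull from the broker
        pullRequestQueue.offer(pullRequest); // keep the loop going
    }

    // Run one task from the queue, if there is one.
    boolean runOnce() {
        PullRequest pullRequest = pullRequestQueue.poll();
        if (pullRequest == null) {
            return false;
        }
        pullMessage(pullRequest);
        return true;
    }
}
```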

So when is a queue marked as dropped?

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

```java
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
    Entry<MessageQueue, ProcessQueue> next = it.next();
    MessageQueue mq = next.getKey();
    ProcessQueue pq = next.getValue();

    if (mq.getTopic().equals(topic)) {
        // If the cached MessageQueue is not in the latest mqSet, drop the queue
        if (!mqSet.contains(mq)) {
            pq.setDropped(true);
            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                it.remove();
                changed = true;
                log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
            }
        } else if (pq.isPullExpired()) {
            // If pulling from the queue has expired, drop it as well
            switch (this.consumeType()) {
                case CONSUME_ACTIVELY:
                    break;
                case CONSUME_PASSIVELY:
                    pq.setDropped(true);
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                            consumerGroup, mq);
                    }
                    break;
                default:
                    break;
            }
        }
    }
}
```

updateProcessQueueTableInRebalance runs during rebalancing to update processQueueTable, the consumer's cache of the queues it is currently consuming. The logic above checks whether each cached MessageQueue is included in the latest mqSet. If it is not, the queue has been allocated to another consumer in this rebalance. In push mode (CONSUME_PASSIVELY), a queue is also dropped if its pull has expired because too much time has passed since the last pull. In both cases setDropped(true) is called to mark the queue as dropped.
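The drop step can be sketched in a few lines. This is a hypothetical simplification (`RebalanceDropSketch`, with string queue IDs and an explicit `nowMillis`/`maxIdleMillis` expiry check). It leaves out the topic filter, the consume-type switch, and `removeUnnecessaryMessageQueue`. It only shows the two conditions that lead to `setDropped(true)` and the safe removal through the iterator.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical simplified model of the drop step in updateProcessQueueTableInRebalance.
// It ignores the topic filter, the consume type switch and removeUnnecessaryMessageQueue.
class RebalanceDropSketch {
    static final class ProcessQueue {
        private volatile boolean dropped = false;
        private final long lastPullTimestamp;

        ProcessQueue(long lastPullTimestamp) { this.lastPullTimestamp = lastPullTimestamp; }

        boolean isDropped() { return dropped; }
        void setDropped(boolean dropped) { this.dropped = dropped; }

        // Expired if nothing has been pulled for longer than maxIdleMillis.
        boolean isPullExpired(long nowMillis, long maxIdleMillis) {
            return nowMillis - lastPullTimestamp > maxIdleMillis;
        }
    }

    // Cached queues the current consumer owns: queue id -> ProcessQueue.
    final Map<String, ProcessQueue> processQueueTable = new HashMap<>();

    // Drops every cached queue that is no longer assigned or whose pull has expired.
    boolean dropStaleQueues(Set<String> mqSet, long nowMillis, long maxIdleMillis) {
        boolean changed = false;
        Iterator<Map.Entry<String, ProcessQueue>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ProcessQueue> next = it.next();
            ProcessQueue pq = next.getValue();
            if (!mqSet.contains(next.getKey()) || pq.isPullExpired(nowMillis, maxIdleMillis)) {
                pq.setDropped(true); // any PullRequest holding this pq will stop pulling
                it.remove();         // safe removal while iterating
                changed = true;
            }
        }
        return changed;
    }
}
```

Take three cached queues where the new allocation keeps `q0` and `q2`. `q1` is dropped because it was reassigned, and `q2` is dropped because its last pull is too old. Only `q0` stays in the table.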

You may wonder what processQueueTable has to do with the processQueue in the pullRequest. Look at the following code:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

```java
// Create a new ProcessQueue
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
    // Put the ProcessQueue into processQueueTable
    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
    if (pre != null) {
        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
    } else {
        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
        PullRequest pullRequest = new PullRequest();
        pullRequest.setConsumerGroup(consumerGroup);
        pullRequest.setNextOffset(nextOffset);
        pullRequest.setMessageQueue(mq);
        // Put the same ProcessQueue into the pullRequest pull task object
        pullRequest.setProcessQueue(pq);
        pullRequestList.add(pullRequest);
        changed = true;
    }
}
```

As you can see, the ProcessQueue object is created during rebalancing, put into the processQueueTable cache, and also set on the pullRequest pull task object. The ProcessQueue in processQueueTable and the one in the pullRequest are therefore the same object, so when rebalancing marks it as dropped, the pull task sees the change.
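The shared-reference point can be checked with a small sketch. `SharedProcessQueueSketch` is hypothetical and heavily simplified: string queue IDs, a fixed `nextOffset` instead of `computePullFromWhere`, and no logging. It creates one `ProcessQueue` per newly assigned queue and hands that same reference to both the table and the new pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical simplified model of the "add new queue" step; not RocketMQ's real classes.
class SharedProcessQueueSketch {
    static final class ProcessQueue {
        private volatile boolean dropped = false;
        boolean isDropped() { return dropped; }
        void setDropped(boolean dropped) { this.dropped = dropped; }
    }

    static final class PullRequest {
        final String messageQueue;
        final long nextOffset;
        final ProcessQueue processQueue;

        PullRequest(String messageQueue, long nextOffset, ProcessQueue processQueue) {
            this.messageQueue = messageQueue;
            this.nextOffset = nextOffset;
            this.processQueue = processQueue;
        }
    }

    final ConcurrentMap<String, ProcessQueue> processQueueTable = new ConcurrentHashMap<>();

    // For each newly assigned queue, create ONE ProcessQueue and give the same
    // reference to both processQueueTable and the new PullRequest.
    List<PullRequest> addNewQueues(Set<String> mqSet, long nextOffset) {
        List<PullRequest> pullRequestList = new ArrayList<>();
        for (String mq : mqSet) {
            ProcessQueue pq = new ProcessQueue();
            ProcessQueue pre = processQueueTable.putIfAbsent(mq, pq);
            if (pre == null) { // the queue was not cached yet
                pullRequestList.add(new PullRequest(mq, nextOffset, pq));
            }
        }
        return pullRequestList;
    }
}
```

Setting the dropped flag through `processQueueTable.get("q0")` is immediately visible through the pull request's `processQueue`, because both point to the same object. A second call with the same queue creates no new pull request.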

Does rebalancing result in repeated message consumption?

Earlier, a member of a discussion group asked me about this.

I told him that RocketMQ normally does not consume a message twice. It turns out, however, that under certain circumstances it does.

When RocketMQ consumes messages, it hands them to the consumer thread pool for execution as follows:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess:

```java
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispathToConsume);
```

The ConsumeMessageService interface encapsulates the message consumption logic and has two implementation classes:

```java
// Implementation class for concurrent message consumption
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// Implementation class for orderly (sequential) message consumption
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
```

Let’s start with the logic related to concurrent message consumption:

ConsumeMessageConcurrentlyService:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run:

```java
if (this.processQueue.isDropped()) {
    log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
    return;
}

// Message consumption logic
// ...

// If the queue has been dropped, the consume result is not processed, so the consumption progress is not submitted
if (!processQueue.isDropped()) {
    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
```

ConsumeRequest is a Runnable implementation that contains the core consumption logic, and the submitConsumeRequest method submits ConsumeRequest objects to the consumer thread pool. Its run method shows what can go wrong. Suppose a new node joins while messages are being consumed, and rebalancing allocates the queue to another node and marks it as dropped. The consumption progress of those messages is then not submitted, even though they have already been consumed. The node that takes over the queue starts from the last committed offset and consumes them again.
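The following single-threaded simulation, `ConcurrentDuplicateSketch`, illustrates the effect. It is hypothetical and assumes a single shared `committedOffset` field in place of RocketMQ's offset store, which in reality persists progress asynchronously. Consumer A runs the business logic but its queue is dropped before the result is processed, so the offset never advances. Consumer B then starts from the same offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded simulation of one queue whose owner changes during rebalancing.
// Real RocketMQ keeps progress in an offset store; here it is a single field.
class ConcurrentDuplicateSketch {
    private final List<String> messages;
    long committedOffset = 0; // progress seen by whichever consumer owns the queue
    final List<String> consumeLog = new ArrayList<>();

    ConcurrentDuplicateSketch(List<String> messages) { this.messages = messages; }

    // A consumer processes messages from the committed offset up to (excluding) end.
    void consume(String consumer, int end, boolean droppedWhileConsuming) {
        for (int i = (int) committedOffset; i < end; i++) {
            consumeLog.add(consumer + ":" + messages.get(i)); // business logic has already run
        }
        if (!droppedWhileConsuming) {
            committedOffset = end; // progress submitted
        }
        // Otherwise the result is discarded, like the isDropped() branch in ConsumeRequest#run.
    }
}
```

With two messages, `consume("A", 2, true)` followed by `consume("B", 2, false)` records `A:m0, A:m1, B:m0, B:m1`. Each message is processed twice, which is why concurrent consumers are usually written to be idempotent.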

Now let's look at the processing logic for orderly consumption:

ConsumeMessageOrderlyService:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run:

```java
public void run() {
    // Check whether the queue has been dropped
    if (this.processQueue.isDropped()) {
        log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
    }

    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        // Consume if in broadcasting mode, or if the queue is locked and the lock has not expired
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            for (boolean continueConsume = true; continueConsume; ) {
                // Check again whether the queue has been dropped
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    break;
                }

                // Take a batch of messages (msgs) from the processQueue
                // ...

                if (!msgs.isEmpty()) {
                    // Message consumption processing logic
                    // ...

                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                } else {
                    continueConsume = false;
                }
            }
        } else {
            if (this.processQueue.isDropped()) {
                log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }
            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
        }
    }
}
```

Orderly consumption in RocketMQ locks the queue, and messages can be consumed only while the queue's lock is held. Suppose a node joins during consumption and rebalancing allocates the queue to another node. The queue is simply dropped, and this does not cause repeated consumption, because the other node cannot start consuming the queue until it acquires the lock.
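A minimal single-threaded sketch of the locking argument follows. `OrderlyLockSketch` is hypothetical. RocketMQ actually combines a broker-side queue lock with a local consume lock on the ProcessQueue, and this sketch folds both into one non-reentrant `Semaphore`. It also assumes that the old owner commits its in-flight batch before releasing the lock. Under those assumptions, the new owner resumes exactly where the old one stopped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

// Hypothetical single-threaded simulation. RocketMQ combines a broker-side queue lock with a
// local consume lock; this sketch folds both into one non-reentrant Semaphore.
class OrderlyLockSketch {
    private final List<String> messages;
    private final Semaphore queueLock = new Semaphore(1); // one permit: one owner at a time
    long committedOffset = 0;
    final List<String> consumeLog = new ArrayList<>();

    OrderlyLockSketch(List<String> messages) { this.messages = messages; }

    boolean tryLock() { return queueLock.tryAcquire(); }

    void unlock() { queueLock.release(); }

    // Must be called while holding the lock: consume up to end and commit BEFORE unlocking.
    void consumeBatch(String consumer, int end) {
        for (int i = (int) committedOffset; i < end; i++) {
            consumeLog.add(consumer + ":" + messages.get(i));
        }
        committedOffset = end;
    }
}
```

In the scenario exercised below, A locks the queue. B's `tryLock()` fails while A is still consuming. A commits its batch and unlocks, and only then does B lock the queue and continue from the committed offset. Each message appears once in the log.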

The public account “Backend Advanced” focuses on sharing back-end technology: Java, Golang, web frameworks, distributed middleware, service governance, and more.