It took me a long time to put together this small summary, so I hope you will read it patiently. If you spot a mistake, feel free to tell me in the comments or by private message.

I have also put a fully commented version of the code on my GitHub. If you are interested, follow this link to take a look: Github address

If you think what I wrote makes some sense and helps you a little, feel free to give me a like and a follow ~

Enough chatter, let's get started ~

RocketMQ global flowchart

This is a big picture that I'm sure you don't want to look at. (So why put it first? Mainly to give everyone a global impression; later sections can then be reviewed against specific parts of this flowchart.)

So, here are some questions that fill in the details of the RocketMQ workflow. You are welcome to leave your own questions in the comments section.

Message consumption logic

Message consumption can be divided into three main modules

  • Rebalance
  • Pull the message
  • Message consumption

Rebalance

// RebalanceImpl
public void doRebalance(final boolean isOrder) {
  Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  if (subTable != null) {
    // Iterate over the queues of every subscribed topic
    // subTable is modified by subscribe and unsubscribe in DefaultMQPushConsumerImpl
    for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
      final String topic = entry.getKey();
      try {
        // Re-load the queue
        this.rebalanceByTopic(topic, isOrder);
      } catch (Throwable e) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          log.warn("rebalanceByTopic Exception", e);
        }
      }
    }
  }

  this.truncateMessageQueueNotMyTopic();
}

private void rebalanceByTopic(final String topic, final boolean isOrder) {
  switch (messageModel) {
    case BROADCASTING: {
      Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
      if (mqSet != null) {
        boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
        if (changed) {
          this.messageQueueChanged(topic, mqSet, mqSet);
          log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet);
        }
      } else {
        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
      }
      break;
    }
    case CLUSTERING: {
      // topicSubscribeInfoTable: cache of topic -> message queues
      Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
      // Ask the broker for the client ids of all consumers in this consumer group for the topic
      List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
      if (null == mqSet) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
        }
      }

      if (null == cidAll) {
        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
      }

      if (mqSet != null && cidAll != null) {
        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
        mqAll.addAll(mqSet);

        // Sorting gives every consumer in the group the same view, so that one queue is never assigned to several consumers
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        // Allocation strategy (prefer one of the first two)
        // Five built-in strategies: 1) average allocation 2) round-robin average allocation 3) consistent hashing
        // 4) fixed message queues per consumer, set by configuration 5) queues assigned per consumer by the broker's deployment room name
        // Whatever the strategy, if there are more consumers than message queues, some consumers get no queue and consume nothing
        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

        // The queue to which the current consumer is assigned
        List<MessageQueue> allocateResult = null;
        try {
          allocateResult = strategy.allocate(
            this.consumerGroup,
            this.mQClientFactory.getClientId(),
            mqAll,
            cidAll);
        } catch (Throwable e) {
          log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                    e);
          return;
        }

        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
        if (allocateResult != null) {
          allocateResultSet.addAll(allocateResult);
        }

        // Update the message consumption queue. If it is a new message consumption queue, a message pull request is created and immediately executed
        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
        if (changed) {
          log.info(
            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
            allocateResultSet.size(), allocateResultSet);
          this.messageQueueChanged(topic, mqSet, allocateResultSet);
        }
      }
      break;
    }
    default:
      break;
  }
}

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
                                                   final boolean isOrder) {
  boolean changed = false;

  Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
  while (it.hasNext()) {
    Entry<MessageQueue, ProcessQueue> next = it.next();
    MessageQueue mq = next.getKey();
    ProcessQueue pq = next.getValue();

    if (mq.getTopic().equals(topic)) {
      // The current allocated queue does not contain the original queue (the current queue is allocated to another consumer)
      if (!mqSet.contains(mq)) {
        // Mark the ProcessQueue as dropped
        pq.setDropped(true);
        // Remove the current message queue
        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
          it.remove();
          changed = true;
          log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
        }
      } else if (pq.isPullExpired()) {
        switch (this.consumeType()) {
          case CONSUME_ACTIVELY:
            break;
          case CONSUME_PASSIVELY:
            pq.setDropped(true);
            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
              it.remove();
              changed = true;
              log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                        consumerGroup, mq);
            }
            break;
          default:
            break;
        }
      }
    }
  }

  List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
  for (MessageQueue mq : mqSet) {
    // The newly assigned queue is not yet in processQueueTable
    if (!this.processQueueTable.containsKey(mq)) {
      // Request the broker to lock the MessageQueue and the corresponding ProcessQueue locally.
      if (isOrder && !this.lock(mq)) {
        log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
        // Lock failed: skip this queue and try to lock it again on the next rebalance
        continue;
      }

      // Remove the message queue consumption progress from memory
      this.removeDirtyOffset(mq);
      ProcessQueue pq = new ProcessQueue();

      long nextOffset = -1L;
      try {
        nextOffset = this.computePullFromWhereWithException(mq);
      } catch (Exception e) {
        log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
        continue;
      }

      if (nextOffset >= 0) {
        ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
        if (pre != null) {
          log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
        } else {
          // For the first time, build the request to pull the message
          log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
          PullRequest pullRequest = new PullRequest();
          pullRequest.setConsumerGroup(consumerGroup);
          pullRequest.setNextOffset(nextOffset);
          pullRequest.setMessageQueue(mq);
          pullRequest.setProcessQueue(pq);
          pullRequestList.add(pullRequest);
          changed = true;
        }
      } else {
        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
      }
    }
  }

  // Pull messages immediately (for the newly assigned queues)
  this.dispatchPullRequest(pullRequestList);

  return changed;
}

From the flowchart and the code, we can see that queue rebalancing in cluster mode has the following main steps:

  1. Get from the Broker the list of consumers in the group that subscribe to the current topic
  2. Allocate the queues according to the configured strategy
  3. Process the queues assigned to the current consumer
    1. Drop the ProcessQueue of every queue that is no longer assigned to this consumer
    2. Add a ProcessQueue for every newly assigned queue, and create a pull request the first time it is added
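
To make step 2 more concrete, here is a minimal sketch of an average allocation. It is only an illustration written for this article, not RocketMQ's own class: the name AverageAllocationSketch and the use of plain integers as queues are my assumptions. It relies on the same precondition as the source above, namely that every consumer sorts the queue list and the consumer id list in the same way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AverageAllocationSketch {

    /**
     * Returns the contiguous slice of mqAll that belongs to currentCid.
     * mqAll and cidAll must be sorted identically on every consumer.
     */
    static <Q> List<Q> allocate(String currentCid, List<Q> mqAll, List<String> cidAll) {
        List<Q> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0) {
            return result; // this consumer is not part of the group view
        }
        int mod = mqAll.size() % cidAll.size();
        int averageSize;
        if (mqAll.size() <= cidAll.size()) {
            averageSize = 1; // at most one queue per consumer
        } else if (mod > 0 && index < mod) {
            averageSize = mqAll.size() / cidAll.size() + 1; // first consumers take the remainder
        } else {
            averageSize = mqAll.size() / cidAll.size();
        }
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // range becomes <= 0 when there are more consumers than queues
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get(startIndex + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> queues = Arrays.asList(0, 1, 2, 3, 4);
        List<String> consumers = Arrays.asList("A", "B");
        System.out.println(allocate("A", queues, consumers)); // [0, 1, 2]
        System.out.println(allocate("B", queues, consumers)); // [3, 4]
    }
}
```

With three queues and four consumers, the fourth consumer receives an empty list, which is the "some consumers cannot consume" case mentioned in the code comments.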

Pull the message

There is too much message-pulling code to paste here.

I will outline the process and then point out a few things to note.

After the first rebalance, and after every subsequent pull, a pull request is put into pullRequestQueue. A dedicated thread then takes requests from that queue and performs the pull.

Here is one of the highlights of the RocketMQ consumer design.

Pulling messages and consuming messages are decoupled through two task queues, so each module is responsible only for its own job. (Others may find this ordinary, but I still enjoyed reading it.)
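
The two-queue decoupling can be sketched roughly as follows. This is a toy model under my own assumptions, not RocketMQ code: the class name, the use of a bare offset as a "pull request", the fake "msg-N" payloads, and the stop signals are all invented for illustration. One thread only pulls, the other only consumes, and they meet at the queues.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PullConsumeDecouplingSketch {
    private static final long STOP = -1L; // hypothetical stop marker for the demo

    /** Pulls `batches` batches of `batchSize` fake messages (batches >= 1) and returns what was consumed. */
    static List<String> run(int batches, int batchSize) {
        BlockingQueue<Long> pullRequestQueue = new LinkedBlockingQueue<>();         // queue 1: what to pull next
        BlockingQueue<List<String>> consumeTaskQueue = new LinkedBlockingQueue<>(); // queue 2: what to consume next
        List<String> consumed = Collections.synchronizedList(new ArrayList<>());

        Thread puller = new Thread(() -> {
            try {
                while (true) {
                    long offset = pullRequestQueue.take();
                    if (offset == STOP) {
                        consumeTaskQueue.put(Collections.emptyList()); // empty batch = stop signal
                        return;
                    }
                    List<String> batch = new ArrayList<>();
                    for (int i = 0; i < batchSize; i++) {
                        batch.add("msg-" + (offset + i));
                    }
                    consumeTaskQueue.put(batch);                       // hand over to the consume side
                    long next = offset + batchSize;
                    pullRequestQueue.put(next < (long) batches * batchSize ? next : STOP); // re-queue the next pull
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "puller");

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    List<String> batch = consumeTaskQueue.take();
                    if (batch.isEmpty()) {
                        return;
                    }
                    consumed.addAll(batch);                            // "business logic" goes here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");

        puller.start();
        consumer.start();
        try {
            pullRequestQueue.put(0L); // the first pull request, as created after a rebalance
            puller.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(consumed);
    }

    public static void main(String[] args) {
        System.out.println(run(3, 2)); // [msg-0, msg-1, msg-2, msg-3, msg-4, msg-5]
    }
}
```

The point of the design is that the pull side never waits for business logic, and the consume side never knows where the batch came from.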

Also note that both the broker and the consumer filter messages during a pull: the broker filters by the hash code of the tag, while the consumer filters by comparing the actual tag strings. This is why a pull sometimes clearly returns messages and yet there is nothing to consume.
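
A small sketch shows why the second check on the consumer side is needed. The class and method names here are hypothetical, and the two checks are simplified stand-ins for the real filters; the only hard fact used is that two different Java strings can share a hash code ("Aa" and "BB" both hash to 2112).

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class TagFilterSketch {

    /** Broker-style check: only the hash codes of the subscribed tags are compared. */
    static boolean brokerSideMatch(Set<Integer> subscribedTagCodes, String messageTag) {
        return subscribedTagCodes.contains(messageTag.hashCode());
    }

    /** Consumer-style check: the tag strings themselves are compared. */
    static boolean consumerSideMatch(Set<String> subscribedTags, String messageTag) {
        return subscribedTags.contains(messageTag);
    }

    public static void main(String[] args) {
        Set<String> tags = Collections.singleton("Aa");
        Set<Integer> codes = new HashSet<>();
        for (String tag : tags) {
            codes.add(tag.hashCode());
        }
        // "BB" collides with "Aa" on hashCode, so the hash check lets it through...
        System.out.println(brokerSideMatch(codes, "BB"));   // true
        // ...but the string comparison rejects it, leaving nothing to consume.
        System.out.println(consumerSideMatch(tags, "BB"));  // false
    }
}
```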

Speaking of message filtering, RocketMQ supports the following filtering methods:

  • Expression filtering
    • tag
    • SQL92
  • Class filter

Message consumption

Again, just a few points to note for now; a separate article will follow.

(1) Failure handling differs between ordered and non-ordered consumption

(2) Offset update on failure: the offset is updated only after the current batch of messages has been consumed successfully, and it is then set to the offset of the last message in that batch

(3) If a message fails in broadcast mode, only a failure log is written

Supplement: Why must consumers in the same consumer group have the same subscription information

First of all, what does it mean for the subscription information of a consumer group to be the same?

It means that under the same GroupId, every consumer must subscribe to the same Topic + Tag; otherwise messages cannot be consumed normally.

Reference document: Aliyun: Consistent subscription relationship

We can split this problem into two cases:

  • The topics are inconsistent
  • The tags are inconsistent

(1) Inconsistent topics

Suppose consumer A listens to TopicA and consumer B listens to TopicB, but both belong to the same group, groupTest.

In the rebalance phase, when consumer A rebalances TopicA, it queries all consumers under groupTest and gets consumer A and consumer B. The queues of TopicA are then split between A and B (for example, A is assigned queues 1, 2, 3, 4 and B is assigned queues 5, 6, 7, 8). But consumer B has no processing logic for TopicA, so the messages sent to queues 5, 6, 7, 8 are never processed.
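
The scenario above can be replayed with a tiny model. Everything in it is hypothetical (class name, method names, and the simple contiguous split standing in for the real allocation strategy); it only shows which queues end up with a consumer that does not subscribe to the topic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TopicMismatchSketch {

    /** Splits the queue ids into contiguous chunks, one chunk per consumer (consumers sorted by id). */
    static Map<String, List<Integer>> splitEvenly(List<Integer> queues, List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        int chunk = (queues.size() + sorted.size() - 1) / sorted.size(); // ceiling division
        Map<String, List<Integer>> result = new HashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            int from = Math.min(queues.size(), i * chunk);
            int to = Math.min(queues.size(), from + chunk);
            result.put(sorted.get(i), new ArrayList<>(queues.subList(from, to)));
        }
        return result;
    }

    /** Queues of `topic` that are assigned to a group member which does not subscribe to `topic`. */
    static List<Integer> unconsumedQueues(String topic, List<Integer> queues,
                                          Map<String, Set<String>> topicsByConsumer) {
        Map<String, List<Integer>> assignment =
            splitEvenly(queues, new ArrayList<>(topicsByConsumer.keySet()));
        List<Integer> unconsumed = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            if (!topicsByConsumer.get(e.getKey()).contains(topic)) {
                unconsumed.addAll(e.getValue());
            }
        }
        Collections.sort(unconsumed);
        return unconsumed;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> groupTest = new HashMap<>();
        groupTest.put("consumerA", Collections.singleton("TopicA"));
        groupTest.put("consumerB", Collections.singleton("TopicB"));
        List<Integer> topicAQueues = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(unconsumedQueues("TopicA", topicAQueues, groupTest)); // [5, 6, 7, 8]
    }
}
```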

(2) Inconsistent tags

As consumer A and consumer B keep rebalancing, each keeps reporting its latest subscription information (message filtering rules) to the broker. The broker overwrites the stored subscription with every report, so the tag information for the group keeps changing. Because the broker filters by this tag information when a consumer pulls, it ends up filtering out messages that the consumer should have received.
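
The overwrite effect described above can be modelled in a few lines. This is a deliberately simplified sketch based only on the description in this article: the class, the "group@topic" key, and the single-tag subscription are my assumptions and do not reflect the broker's real data structures.

```java
import java.util.HashMap;
import java.util.Map;

public class SubscriptionOverwriteSketch {
    // Hypothetical broker-side table: "group@topic" -> tag currently used for filtering.
    private final Map<String, String> subscriptionTable = new HashMap<>();

    /** A consumer reports its subscription; the last report for the group wins. */
    void report(String group, String topic, String tag) {
        subscriptionTable.put(group + "@" + topic, tag);
    }

    /** Would the broker hand a message with this tag to a consumer of the group? */
    boolean brokerWouldDeliver(String group, String topic, String messageTag) {
        return messageTag.equals(subscriptionTable.get(group + "@" + topic));
    }

    public static void main(String[] args) {
        SubscriptionOverwriteSketch broker = new SubscriptionOverwriteSketch();
        broker.report("groupTest", "TopicA", "TagA"); // consumer A reports
        broker.report("groupTest", "TopicA", "TagB"); // consumer B reports and overwrites A
        // Consumer A pulls now: its TagA messages are filtered out by the broker.
        System.out.println(broker.brokerWouldDeliver("groupTest", "TopicA", "TagA")); // false
        System.out.println(broker.brokerWouldDeliver("groupTest", "TopicA", "TagB")); // true
    }
}
```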

How does a delay queue work

As the flowchart shows, RocketMQ handles delayed messages with a Timer (see the ScheduleMessageService class). Delay tasks are read from the Timer's task queue, and when a message is due it is forwarded from the delay queue to its real business queue.

Note that the Timer mentioned here is the scheduling utility from the Java standard library (java.util.Timer). It has two main parts: a TaskQueue and a TimerThread. You can think of it roughly as a thread pool with a single worker thread.

ScheduleMessageService uses two Timer methods, which I list separately here:

  • this.timer.schedule: as used here, runs the task once after the given delay; the task schedules its own next run when it finishes, so the interval is counted from the previous run
  • this.timer.scheduleAtFixedRate: runs the task at a fixed interval, regardless of how long each run takes
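
The two usages can be tried out with plain java.util.Timer. The sketch below is mine, not taken from ScheduleMessageService; the class and method names are invented, and the counters and latches exist only so the demo can stop after a fixed number of runs.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TimerSchedulingSketch {

    /** One-shot schedule(task, delay): every run schedules the next one itself. */
    static int runSelfRescheduling(int runs, long delayMs) {
        Timer timer = new Timer("sketch-one-shot", true);
        AtomicInteger count = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        timer.schedule(chainedTask(timer, count, done, delayMs), delayMs);
        awaitQuietly(done);
        timer.cancel();
        return count.get();
    }

    private static TimerTask chainedTask(Timer timer, AtomicInteger count,
                                         CountDownLatch done, long delayMs) {
        return new TimerTask() {
            @Override
            public void run() {
                count.incrementAndGet();
                done.countDown();
                if (done.getCount() > 0) {
                    // A TimerTask cannot be reused, so a fresh task is created for the next run.
                    timer.schedule(chainedTask(timer, count, done, delayMs), delayMs);
                }
            }
        };
    }

    /** scheduleAtFixedRate(task, delay, period): the Timer itself repeats the same task. */
    static int runFixedRate(int runs, long periodMs) {
        Timer timer = new Timer("sketch-fixed-rate", true);
        AtomicInteger count = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                if (done.getCount() > 0) {
                    count.incrementAndGet();
                    done.countDown();
                }
            }
        }, 0, periodMs);
        awaitQuietly(done);
        timer.cancel();
        return count.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runSelfRescheduling(3, 20)); // 3
        System.out.println(runFixedRate(3, 20));        // 3
    }
}
```

Because a Timer has a single worker thread, a slow task in either style delays every other task on the same Timer, which is worth keeping in mind when reading the source.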

Enough talk, here is the source (source code can be dry, but I hope you will read it patiently)

// ScheduleMessageService
public void start() {
  if (started.compareAndSet(false, true)) {
    super.load();
    this.timer = new Timer("ScheduleMessageTimerThread", true);
    // Create a scheduled task based on the delay queue
    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
      Integer level = entry.getKey();
      Long timeDelay = entry.getValue();
      Long offset = this.offsetTable.get(level);
      if (null == offset) {
        offset = 0L;
      }

      if (timeDelay != null) {
        // The first run of each task is delayed by one second; later runs follow the corresponding delay time
        // Relationship between delay level and message queue ID: message queue ID = delay level - 1
        // schedule runs the task once; after a run finishes, the next run is scheduled with the corresponding delay
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
      }
    }

    // scheduleAtFixedRate runs the task at the specified interval, regardless of the task execution time
    this.timer.scheduleAtFixedRate(new TimerTask() {

      @Override
      public void run() {
        try {
          if (started.get()) {
            // Persist the processing progress of the delay queue every ten seconds
            ScheduleMessageService.this.persist();
          }
        } catch (Throwable e) {
          log.error("scheduleAtFixedRate flush exception", e);
        }
      }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
  }
}

// DeliverDelayedMessageTimerTask
@Override
public void run() {
  try {
    if (isStarted()) {
      this.executeOnTimeup();
    }
  } catch (Exception e) {
    // XXX: warn and notify me
    log.error("ScheduleMessageService, executeOnTimeup exception", e);
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
      this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
  }
}

public void executeOnTimeup() {
  // Find the consumption queue according to the delay queue topic and delay queue ID
  ConsumeQueue cq =
    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                                                                     delayLevel2QueueId(delayLevel));

  long failScheduleOffset = offset;

  if (cq != null) {
    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
    if (bufferCQ != null) {
      try {
        long nextOffset = offset;
        int i = 0;
        // Traverses the ConsumeQueue. Each standard ConsumeQueue entry is 20 bytes
        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
          long offsetPy = bufferCQ.getByteBuffer().getLong();
          int sizePy = bufferCQ.getByteBuffer().getInt();
          long tagsCode = bufferCQ.getByteBuffer().getLong();

          if (cq.isExtAddr(tagsCode)) {
            if (cq.getExt(tagsCode, cqExtUnit)) {
              tagsCode = cqExtUnit.getTagsCode();
            } else {
              //can't find ext content.So re compute tags code.
              log.error("[BUG] can't find consume queue extend file content! addr={}, offsetPy={}, sizePy={}",
                        tagsCode, offsetPy, sizePy);
              long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
              tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
            }
          }

          long now = System.currentTimeMillis();
          long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

          nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

          // countdown > 0 means the delivery time of the message has not been reached yet
          long countdown = deliverTimestamp - now;

          if (countdown <= 0) {
            MessageExt msgExt =
              ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
              offsetPy, sizePy);

            if (msgExt != null) {
              try {
                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                  log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                            msgInner.getTopic(), msgInner);
                  continue;
                }
                // Forward the message to its real topic so it can be consumed (for retry messages, the %RETRY% + consumer group topic)
                PutMessageResult putMessageResult =
                  ScheduleMessageService.this.writeMessageStore
                  .putMessage(msgInner);

                if (putMessageResult != null
                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                  if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
                                                                                                            putMessageResult.getAppendMessageResult().getWroteBytes());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
                  }
                  continue;
                } else {
                  // XXX: warn and notify me
                  log.error(
                    "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                    msgExt.getTopic(), msgExt.getMsgId());
                  ScheduleMessageService.this.timer.schedule(
                    new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                       nextOffset), DELAY_FOR_A_PERIOD);
                  ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                           nextOffset);
                  return;
                }
              } catch (Exception e) {
                /*
                 * XXX: warn and notify me
                 */
                log.error(
                  "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
              }
            }
          } else {
            // The next task execution time will be set to countdown. That is, the message delay forwarding time - the current time
            ScheduleMessageService.this.timer.schedule(
              new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
              countdown);
            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
            return;
          }
        } // end of for

        // Updates the delay queue pull task progress
        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
          this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
        return;
      } finally {
        bufferCQ.release();
      }
    } // end of if (bufferCQ != null)
    else {
      // Nothing can be read from the consume queue at this offset; treat it as no task to process and skip this round

      long cqMinOffset = cq.getMinOffsetInQueue();
      long cqMaxOffset = cq.getMaxOffsetInQueue();
      if (offset < cqMinOffset) {
        // Next pull task progress update
        failScheduleOffset = cqMinOffset;
        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
                  offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
      }

      if (offset > cqMaxOffset) {
        failScheduleOffset = cqMaxOffset;
        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}", offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
      }
    }
  } // end of if (cq != null)

  // Create a task based on the latency level
  ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                                                failScheduleOffset), DELAY_FOR_A_WHILE);
}
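
The source above iterates over delayLevelTable and relies on the rule "message queue ID = delay level - 1". As a rough sketch of how such a table could be built, the code below parses a space-separated level string. The class name, the parser, and the example level string are my assumptions for illustration only; only the helper name delayLevel2QueueId and the "level - 1" rule come from the source above.

```java
import java.util.HashMap;
import java.util.Map;

public class DelayLevelSketch {

    /** Parses a string such as "1s 5s 10s 30s 1m" into level -> delay in milliseconds (levels start at 1). */
    static Map<Integer, Long> parseDelayLevels(String levels) {
        Map<Integer, Long> table = new HashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long number = Long.parseLong(part.substring(0, part.length() - 1));
            long unitMs;
            switch (part.charAt(part.length() - 1)) {
                case 's': unitMs = 1_000L; break;
                case 'm': unitMs = 60_000L; break;
                case 'h': unitMs = 3_600_000L; break;
                case 'd': unitMs = 86_400_000L; break;
                default: throw new IllegalArgumentException("unknown time unit in: " + part);
            }
            table.put(i + 1, number * unitMs);
        }
        return table;
    }

    /** Each delay level has its own queue in the schedule topic: queue ID = delay level - 1. */
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parseDelayLevels("1s 5s 10s 30s 1m"); // example string, an assumption
        System.out.println(table.get(3));          // 10000
        System.out.println(delayLevel2QueueId(3)); // 2
    }
}
```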