This section describes the Rebalance process. Before we introduce the Consumer consumption message flow, let’s introduce the process of Rebalance, which involves starting a Consumer.

As mentioned earlier, Topic is a logical concept under which multiple queues can be divided to increase the parallelism of Consumer consumption. Within a Consumer Group, the relationship between queues and consumers is one-to-many: a Queue can be assigned to at most one Consumer, and a Cosumer can be assigned to multiple queues, as shown below

Rebalance is a protocol that dictates how all consumers in a Consumer Group agree to allocate queues. When a Consumer subscribes to a Topic that changes, or when a Consumer instance in a Consumer Group sends a change, the Rebalance is triggered to reallocation each instance’s Queue.

1.RebalanceService

As mentioned earlier in the client startup process, MQClientInstance starts a series of background tasks in the start method. This includes the Rebalance task, which calls the Start method of RebalanceService. The RebalanceService inherits from ServiceThread. The start method starts a background thread, ensuring that the doRebalance method of MQClientInstance is called once in a while (20 seconds by default). The diagram below:

MQClientInstance. DoRebalance will call MQConsumerInner. DoRebalance. MQConsumerInner is DefaultMQPullConsumerImpl and DefaultMQPushConsumerImp parent interface, as follows, MQClientInstance hands off the doRebalance method to the Consumer instance.

The Consumer instance then calls the doRebalance method of the internal RebalnaceImpl to perform the actual action.

RebalanceService is held by MQClientInstane. An MQClientInstance has only one Rebalance instance. MQClientInstance is managed by MQClientManager and is related to the local IP, process PID. The RebalanceImpl is related to a Consumer instance, where a Consumer instance corresponds to a RebalanceImpl object.

2.RebalanceImpl

This section describes the basic information about the class

1. The attribute

protected final ConcurrentMapConcurrentMap</*Queue*/MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64); / / DefaultMQXxxxConsumerImpl updateTopicSubscribeInfo protected while the final ConcurrentMap < String topic / * * /, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>(); / / DefaultMQXxxxConsumerImpl subscript when adding protected final ConcurrentMap < String topic / * * /, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>(); protected String consumerGroup; // ConsumerGroup protected MessageModel MessageModel; / / message consumption patterns protected AllocateMessageQueueStrategy AllocateMessageQueueStrategy; / / the Queue allocation strategy, is AllocateMessageQueueAveragely by defaultCopy the code

2. Inheritance

Following the content of Part 1, the class’s doRebalce method is called with the following logic:

(1) Rotate all topics subscribed by this instance to obtain topic information by traversing the value of subscriptionInner. The content of this attribute will be added when the client instance calls subscript

Call rebalanceByTopic to rebalance

(3) In broadcast mode, all queues under this topic are obtained from topicSubscribeInfoTable for subsequent updates. In broadcast mode, each client receives all q’s under topic, and the Queue set allocated to the client is a full set.

(4) In cluster mode, all queues under topic will be obtained; Get a list of all client ids under this topic from the broker; Sorted call AllocateMessageAueueStrateg ConsumerGroup were obtained, the client should be assigned to a collection of the Queue. Cluster pattern, each client to q list by AllocateMessageQueueStrategy allocation.

(5) to get the client’s Queue after collection, called updateProcessQueueTableInRebalance updates.

(6) After execution, if there is any change, call messageQueueChanged and give the subclass specific processing.

(7) call truncateMessageQueueNotMyTopic removed the cache is not the instance handle of the Queue.

2.1. Find a list of client ids for this topic from the broker

Get the broker address for that topic from the MQClientInstance cache and call Netty to access the broker directly to get the result

2.2. Allocate the Queue to which the client instance belongs

Said earlier, broadcast mode distribution of each client instance set to the full amount of the Queue, cluster mode AllocateMessageQueueStrategy processing are reviewed. Default is evenly distributed, the implementation class for AllocateMessageQueueAveragely.

See first AllocateMessageQueueStrategy definition

List<MessageQueue> allocate(final String consumerGroup,// The current Consumer instance belongs to the consumerGroup final String CurrentCID,// Current client application ID Final List<MessageQueue> mqAll,// List of queues to be allocated Final List<String> cidAll List of all client application ids under the ConsumerGroup);Copy the code

This method selects the list of queues to which currentCID belongs. AllocateMessageQueueAveragely is according to the position of currentCID belongs to average distribution, process is as follows:

Above in the source code with a corresponding note. As mentioned above, mqAll and cidAll passed in are ordered. This process is to allocate the Queue to which the Queue belongs according to the number of the client. The possibilities are as follows:

About the process is: can be divisible, the average points; If the cid is not divisible, there is one more Queue for the cid in the number of mods, and one less Queue for the CID outside the number of mods.

As mentioned earlier, this allocation policy is implemented to sort the Queue list of topics with clients, resulting in the allocation of the first client to a larger Queue. Consider a situation where the ConsumerGroup subscribles to two topics, Topic_X and Topic_Y. Each topic has two queues and the ConsumerGroup has four client instances. Rebalance is based on topic. So instead of 4 queues being consumed equally, the result is as follows:

Therefore, during initialization, it is best to ensure that the number of clients under ConsumerGroup is less than the number of queues under =Topic.

2.3. updateProcessQueueTableInRebalance

This method is defined as follows:

/** * @param topic * @param mqSet Rebalance The topic of the set of all existing assigned to q * @ param isOrder whether to order * @ return * / private consumption Boolean updateProcessQueueTableInRebalance (final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {}Copy the code

As mentioned earlier, the RebalanceImpl has only one processQueueTable property, which maintains all queues currently being processed by the client and their corresponding consumption progress. UpdateProcessQueueTableInRebalance will update this property.

After the rebalance, check whether any Queue that is not part of the current client instance or has expired needs to be removed. Subclasses of this Queue will change the Queue to which the client instance belongs.

Rebalance the Queue. If so, remove the consumption offset from the cache. Then make a PullRequest object and store it in the list. The Queue marked Rebalane has changed

3 Distribute the PullRequest Queue list that is added after the rebalance. Subclasses will process the Queue list. Pull mode does not process, push mode, the PullMessageService, loop processing, specifically in the dispatchPullRequest method of RebalancePushImpl

@Override public void dispatchPullRequest(List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); }}Copy the code

This step, you will be back DefaultMQPushConsumerImpl executePullRequestImmediately method, to add PullRequest object, to trigger the execution, the Push model of concrete process will be in the next section describes < 1 >.

2.4. messageQueueChanged

Perform this action if the Queue you are processing changes after Rebalance. For the Push pattern, the version number of the client subscription topic is updated (with the current time stamp) and the broker is notified. To Pull mode, would change the callback DefaultMQPullConsumerImpl MessageQueueListener have Queue. Is used in the MQPullConsumerScheduleService, used to Pull mode timing consumption message < 2 >.

2.5. truncateMessageQueueNotMyTopic

Remove topics from the Queue that are not subscribed to by this instance.

The <1>,<2> points described above can be used to trigger the Consumer automatic/timed pull message, as described in the next section during the client consumption process.

3. Notify the Broker to Rebalance

Rebalance as mentioned above is timed by the client itself (20 seconds by default). There is also a situation where the Broker makes active notifications.

The Broker has a ConsumerManager that notifies each client when an instance of the client changes (up or down). When the client receives the notification, it calls rebalanceImmediately of MQClientInstance to perform rebalance directly. This method wakes up ServiceThread so that the RebalanceService does not wait for execution.

RocketMQ Rebalance is similar to Kafka Rebalance. Both Rebalance on the client.

  1. Kafka: One of the consumer instances ina consumer Group is selected as the Group Leader. The Group Leader allocates partitions, and the distribution results are synchronized to other consumers through a Cordinator(a special role broker). Kafka’s partition allocation has only one brain, the Group Leader.
  2. RocketMQ: Each consumer is responsible for assigning its own queue, and each consumer is a brain.

4. The potential dangers of Rebalance

  1. Consumption pause: Consider that only Consumer 1 is responsible for consuming all 5 queues; When the new Consumer 2 triggers the Rebalance, it needs to allocate two queues to consume. Then Consumer 1 needs to stop consuming these two queues and wait until the two queues are allocated to Consumer 2 before they can continue consuming.
  • Repeat consumption: When Consumer 2 consumes the two queues allocated to it, it must continue to consume from the offset that Consumer 1 has previously consumed. By default, however, offset is submitted asynchronously. For example, consumer 1 currently consumes 10 to offset but asynchronously submits 8 to the broker. So if consumer 2 starts consuming at offset of 8, there will be two duplicate messages. That is, Consumer 2 doesn’t wait for Consumer1 to make the offset before making the Rebalance, so the longer the offset, the more duplicate purchases you might make.
  • Rebalance can lead to repeated consumption if there are too many messages to repeat consumption. Or because the rebalance was paused for too long, some messages were backlogged. This can lead to consuming a lot of messages immediately after the rebalance is over.

Attached is a sketch of the notes taken during the reading of the source code at that time:

For more original content, please search our wechat official account doubaotaizi.