WeChat official account “Backend Advanced” focuses on sharing back-end technology: Java, Golang, web frameworks, distributed middleware, service governance, and so on.

How is consumption redistributed among consumers when new members join a consumer group, or when partitions are added to a topic? Through rebalancing. This article is all about Kafka’s rebalance mechanism, and I’ll try to make it easy to understand.

The role of rebalancing

Rebalancing is closely tied to the consumer group: it ensures that partitions are distributed fairly among the group’s members, and it is what makes the consumer group model work. The model is as follows:

Several properties of the consumer group model can be seen in the figure:

  1. Within one consumer group, a partition can be consumed by only one consumer, but a consumer can consume multiple partitions. That is, each message is consumed by exactly one consumer in the group, so it is not consumed repeatedly within that group;
  2. A partition can be consumed by different consumer groups. In the special case where each consumer group contains only one consumer, every message in the partition is delivered to all consumers, which gives a broadcast consumption mode.
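
The two rules above can be sketched in a few lines of Python. This is only an illustration: `assign_round_robin` is a hypothetical helper that deals partitions out in turn, not Kafka’s actual assignor, and the group and consumer names are made up.

```python
def assign_round_robin(partitions, consumers):
    """Give every partition to exactly one consumer of a single group."""
    members = sorted(consumers)
    assignment = {member: [] for member in members}
    for i, partition in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(partition)
    return assignment

partitions = [0, 1, 2, 3]

# Group A has two consumers: each partition has exactly one owner in the group.
group_a = assign_round_robin(partitions, ["a1", "a2"])
# Groups B and C have one consumer each: both consumers see every partition (broadcast).
group_b = assign_round_robin(partitions, ["b1"])
group_c = assign_round_robin(partitions, ["c1"])

print(group_a)  # {'a1': [0, 2], 'a2': [1, 3]}
print(group_b)  # {'b1': [0, 1, 2, 3]}
print(group_c)  # {'c1': [0, 1, 2, 3]}
```

Each group is assigned independently, which is why adding a second group never takes partitions away from the first.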

In order to implement the consumer group model, it is necessary to implement dynamic adjustments to maintain the model when the external environment changes, such as adding partitions to the topic, adding new members to the consumer group, etc. The Kafka rebalance mechanism takes care of this.

Rebalancing differences between Kafka and RocketMQ

Kafka’s rebalance implementation differs somewhat from RocketMQ’s, but the ultimate goal is the same: to ensure that partitions (queues, in RocketMQ) are allocated fairly and that, within one consumer group, each is consumed by only one consumer.

Kafka rebalance:

As you can see from the figure, Kafka rebalancing is caused by external triggers. Kafka rebalancing is triggered by the following conditions:

  1. The membership of the consumer group changes: a new consumer joins, a consumer leaves, or a consumer crashes;
  2. The number of topics subscribed to by the consumer group changes;
  3. The number of partitions of a subscribed topic changes.
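
To make the three conditions concrete, here is a small sketch that compares two snapshots of a group and reports which condition fired. `GroupSnapshot` and `rebalance_reasons` are hypothetical names for illustration only; the real check happens inside Kafka’s Coordinator and client.

```python
from dataclasses import dataclass

@dataclass
class GroupSnapshot:
    members: set       # consumer IDs currently in the group
    partitions: dict   # subscribed topic -> partition count

def rebalance_reasons(old, new):
    """Return the trigger conditions that differ between two snapshots."""
    reasons = []
    if old.members != new.members:
        reasons.append("membership changed")
    if old.partitions.keys() != new.partitions.keys():
        reasons.append("subscribed topics changed")
    # Compare partition counts only for topics present in both snapshots.
    common = old.partitions.keys() & new.partitions.keys()
    if any(old.partitions[t] != new.partitions[t] for t in common):
        reasons.append("partition count changed")
    return reasons

before = GroupSnapshot({"c1", "c2"}, {"orders": 3})
after = GroupSnapshot({"c1", "c2", "c3"}, {"orders": 4})
print(rebalance_reasons(before, after))  # ['membership changed', 'partition count changed']
```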

Each consumer maintains a heartbeat with the Coordinator. When any of these conditions occurs, the heartbeat response carries the REBALANCE_IN_PROGRESS error code. The consumer then stops consuming and takes part in the rebalance.

RocketMQ rebalance:

When the RocketMQ consumer starts, it starts two threads, one for pulling messages and the other for timed rebalancing. The pull thread takes its tasks from pullRequestQueue, which is a blocking queue. This means that when pullRequestQueue is empty, the pull thread blocks until a new pull task arrives. So how does a new task get added to the blocking queue? This is where RocketMQ’s rebalancing comes in. Every 20 seconds, the rebalance thread fetches the consumer group’s consumer IDs and subscription information from any Broker node and allocates queues based on that information. Each allocated queue is then wrapped into a pullRequest object and put into pullRequestQueue. The pull thread wakes up and performs the pull task.
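
The interplay between the two threads can be sketched with Python’s standard `queue.Queue`. This is a toy model, not RocketMQ code: `PullRequest`, `rebalance_once`, the `STOP` sentinel and the modulo allocation are all assumptions made for the demo, and the rebalance runs once instead of on a 20-second timer.

```python
import queue
import threading
from dataclasses import dataclass

@dataclass
class PullRequest:
    consumer_group: str
    queue_id: int

pull_request_queue = queue.Queue()   # blocking queue shared by both threads
pulled = []                          # records which queues the pull thread handled
STOP = object()                      # sentinel used only to end this demo

def pull_loop():
    while True:
        request = pull_request_queue.get()   # blocks while the queue is empty
        if request is STOP:
            break
        pulled.append(request.queue_id)      # a real client would pull messages here

def rebalance_once(group, all_queue_ids, consumer_ids, my_id):
    """Allocate queues across consumers and enqueue a PullRequest for each queue owned by my_id."""
    members = sorted(consumer_ids)
    index = members.index(my_id)
    mine = [q for i, q in enumerate(sorted(all_queue_ids)) if i % len(members) == index]
    for q in mine:
        pull_request_queue.put(PullRequest(group, q))
    return mine

puller = threading.Thread(target=pull_loop)
puller.start()
mine = rebalance_once("group-a", [0, 1, 2, 3], ["c1", "c2"], "c1")
pull_request_queue.put(STOP)
puller.join()
print(mine, pulled)  # [0, 2] [0, 2]
```

The point of the design is that the pull thread never decides what to pull: it only reacts to whatever the rebalance step puts into the queue.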

Parameters involved in rebalancing

Certain parameters set when the consumer starts affect the rebalancing mechanism, so you need to tune them according to the characteristics of your business. Otherwise, improper settings may cause frequent rebalancing, which seriously hurts consumption speed.

  • session.timeout.ms

This parameter is the time the Coordinator takes to detect a consumer failure: if the Coordinator receives no heartbeat from a client within this period, it considers the client dead. A lower value means a consumer crash is detected earlier, so rebalancing starts sooner and consumption is delayed less. However, a value that is too low can also lead to frequent rebalancing. The right value depends on the actual business.

  • max.poll.interval.ms

This parameter is the maximum interval allowed between two calls to poll. For some services, it may take a long time to process messages, for example, one minute. In this case, set this parameter to a value greater than one minute. Otherwise, the consumer will be removed from the consumer group and a rebalance will be triggered.

  • heartbeat.interval.ms

This parameter is closely related to session.timeout.ms. As previously mentioned, as long as the consumer keeps sending heartbeats to the Coordinator within session.timeout.ms, the consumer will not be removed from the group. This parameter sets how often heartbeats are sent, so its value must be less than session.timeout.ms for at least one heartbeat to arrive within each session timeout.
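
As a quick sanity check on these three settings, the sketch below validates a configuration against the relationships described above. `check_rebalance_config` is a hypothetical helper and the numbers are illustrative, not recommendations. The one-third guideline comes from the Kafka documentation for heartbeat.interval.ms.

```python
def check_rebalance_config(config, max_processing_ms):
    """Return a list of problems found in the three rebalance-related settings."""
    problems = []
    session = config["session.timeout.ms"]
    heartbeat = config["heartbeat.interval.ms"]
    max_poll = config["max.poll.interval.ms"]
    if heartbeat >= session:
        problems.append("heartbeat.interval.ms must be less than session.timeout.ms")
    elif heartbeat * 3 > session:
        problems.append("heartbeat.interval.ms is usually at most one third of session.timeout.ms")
    if max_poll <= max_processing_ms:
        problems.append("max.poll.interval.ms must exceed the longest processing time between polls")
    return problems

# Illustrative values only: messages take up to one minute to process.
config = {
    "session.timeout.ms": 10_000,
    "heartbeat.interval.ms": 3_000,
    "max.poll.interval.ms": 120_000,
}
print(check_rebalance_config(config, max_processing_ms=60_000))  # []
```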

The following is a graphic representation of the meanings of these three parameters:

Rebalancing process

In the new version, coordination of a consumer group is handled by a node on the Broker side, which is the Coordinator of the consumer group. Each consumer group has exactly one Coordinator, which is responsible for all coordination work within the group, including partition allocation and triggering rebalances. The whole consumer group is controlled through its Coordinator. At every stage of this process, the consumer group is in some state. Kafka defines five states for the consumer group, as follows:

  1. Empty: There is no active consumer in the consumer group;
  2. PreparingRebalance: The consumer group is preparing to rebalance; at this point it may already have accepted join requests from some consumers;
  3. AwaitingSync: All consumers have joined the group and the rebalance is in progress. Each consumer is waiting for the Broker to send it the partition allocation plan;
  4. Stable: The partition allocation plan has been sent to all consumers and they are consuming normally;
  5. Dead: The consumer group has been completely discarded by the Coordinator.
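
The five states can be modelled as a small state machine. The state names follow the list above (as far as I know, newer Kafka versions call AwaitingSync CompletingRebalance); the transition table is my own simplification inferred from the state descriptions, not a copy of Kafka’s source.

```python
from enum import Enum

class GroupState(Enum):
    EMPTY = "Empty"
    PREPARING_REBALANCE = "PreparingRebalance"
    AWAITING_SYNC = "AwaitingSync"
    STABLE = "Stable"
    DEAD = "Dead"

# Assumed transitions, inferred from the state descriptions (a simplification).
TRANSITIONS = {
    GroupState.EMPTY: {GroupState.PREPARING_REBALANCE, GroupState.DEAD},
    GroupState.PREPARING_REBALANCE: {GroupState.AWAITING_SYNC, GroupState.EMPTY, GroupState.DEAD},
    GroupState.AWAITING_SYNC: {GroupState.STABLE, GroupState.PREPARING_REBALANCE, GroupState.DEAD},
    GroupState.STABLE: {GroupState.PREPARING_REBALANCE, GroupState.DEAD},
    GroupState.DEAD: set(),
}

def transition(current, target):
    """Move to target if the table allows it, otherwise raise ValueError."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target

# A first consumer joins an empty group and the rebalance completes.
state = GroupState.EMPTY
for nxt in (GroupState.PREPARING_REBALANCE, GroupState.AWAITING_SYNC, GroupState.STABLE):
    state = transition(state, nxt)
print(state.value)  # Stable
```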

As you can see, rebalancing takes place in the PreparingRebalance and AwaitingSync states, and it consists of the following two steps:

  1. Join the group (JoinGroup): when a consumer’s heartbeat response carries REBALANCE_IN_PROGRESS, it means the consumer group is being rebalanced. At this point, the consumer stops consuming and sends a request to join the consumer group.
  2. Synchronize the allocation plan (SyncGroup): once the Coordinator has received join requests from all group members, it selects one consumer as the Leader and asks the Leader to compute the partition allocation. When the allocation is done, the Leader puts the allocation plan into a SyncGroup request and sends it to the Coordinator. The Coordinator then sends each consumer its part of the allocation plan.
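
The two steps can be walked through with a toy Coordinator. Everything here is a simplified stand-in: the `Coordinator` class, `leader_assign`, and the rule that the first member to join becomes the Leader are assumptions for the demo, and the real protocol exchanges these messages over the network.

```python
class Coordinator:
    """Toy stand-in for the broker-side group Coordinator."""

    def __init__(self, expected_members):
        self.expected = set(expected_members)
        self.joined = []          # members in the order their JoinGroup arrived
        self.assignment = None    # filled in by the Leader's SyncGroup

    def join_group(self, member_id):
        self.joined.append(member_id)

    def all_joined(self):
        return set(self.joined) == self.expected

    def leader(self):
        # Assumption for this sketch: the first member to join becomes the Leader.
        return self.joined[0]

    def sync_group(self, member_id, proposed=None):
        # Only the Leader's request carries a plan; everyone gets back their own share.
        if member_id == self.leader() and proposed is not None:
            self.assignment = proposed
        return self.assignment[member_id]

def leader_assign(members, partitions):
    """Hypothetical Leader-side allocation: deal partitions out in turn."""
    members = sorted(members)
    plan = {m: [] for m in members}
    for i, p in enumerate(sorted(partitions)):
        plan[members[i % len(members)]].append(p)
    return plan

coordinator = Coordinator(["c1", "c2", "c3"])
for member in ["c2", "c1", "c3"]:
    coordinator.join_group(member)                         # step 1: JoinGroup
if not coordinator.all_joined():
    raise RuntimeError("still waiting for members to join")

leader = coordinator.leader()
plan = leader_assign(coordinator.joined, range(6))
results = {leader: coordinator.sync_group(leader, plan)}   # step 2: Leader's SyncGroup carries the plan
for member in coordinator.joined:
    if member != leader:
        results[member] = coordinator.sync_group(member)   # followers send an empty SyncGroup
print(leader, results)
```

Note that in this sketch the Leader syncs before the followers. A real Coordinator holds follower SyncGroup requests until the Leader’s plan has arrived.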

Example of a rebalancing scenario

Depending on what triggers the rebalance, there are roughly the following rebalancing workflows:

A new member joins the consumer group:

A consumer group member crashes:

A consumer group member leaves voluntarily:

A consumer group member commits an offset:
