Original article: https://mp.weixin.qq.com/s/boT7R-KfmGqStOI3VMc5NA

Rebalance overview

Consumer group rebalance is essentially a set of protocols that dictate how a consumer group agrees on allocating all partitions of the topics it subscribes to.

Suppose there are 20 consumer instances in a group that subscribes to a topic with 100 partitions. Normally, Kafka assigns an average of five partitions to each consumer. This allocation process is called rebalance. After a successful rebalance, each partition of the group's subscribed topics is assigned to exactly one consumer instance within the group.

Unlike older consumer versions, which relied on ZooKeeper to perform rebalance, the new consumer uses a group coordination protocol built into Kafka itself.

For each group, a Kafka broker is elected as the group coordinator.

The coordinator is responsible for managing group state. When membership changes, for example when a new member arrives, the coordinator drives all members of the group to agree on a new partition assignment plan; in other words, the coordinator carries out rebalance operations for the group.

Rebalance trigger conditions

A rebalance is triggered by the following three conditions:

  • Group membership changes, for example when a new consumer joins the group, an existing consumer leaves voluntarily, or an existing consumer crashes.
  • The group's topic subscription changes, for example with regular-expression-based subscriptions (see the sketch after this list): creating a new topic that matches the pattern triggers a rebalance.
  • The number of partitions of a subscribed topic changes, for example after a command-line script increases the partition count.
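
For instance, a regular-expression subscription looks like the following in the Java consumer. Because the client periodically re-evaluates the pattern against cluster metadata, creating a new matching topic eventually triggers a rebalance. The broker address, group id, and topic pattern here are illustrative:

```java
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PatternSubscribeDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "demo-group");              // assumed group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Any topic matching the pattern is consumed; topics created later
            // that match are picked up on a metadata refresh, triggering a rebalance.
            consumer.subscribe(Pattern.compile("orders-.*"));
            while (true) {
                consumer.poll(Duration.ofMillis(500)).forEach(r ->
                        System.out.printf("%s-%d@%d%n",
                                r.topic(), r.partition(), r.offset()));
            }
        }
    }
}
```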

In practice, the most common cause of rebalance is the first condition, especially a consumer crash.

A crash does not necessarily mean that the consumer process has "hung" or that the machine it runs on is down.

When a consumer cannot finish processing messages within the specified time, the coordinator considers that the consumer has crashed, causing another round of rebalance.
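
In recent Kafka versions, this "specified time" is governed mainly by max.poll.interval.ms (the maximum gap allowed between poll() calls), together with session.timeout.ms and heartbeat.interval.ms for heartbeat liveness. A minimal sketch of tuning these, with illustrative values:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class TimeoutTuningDemo {
    static Properties consumerProps() {
        Properties props = new Properties();
        // If poll() is not called again within this interval, the member is
        // considered failed and a rebalance is triggered (illustrative value).
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        // If no heartbeat reaches the coordinator within this window, the
        // member is evicted from the group (illustrative value).
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        // Heartbeats are sent from a background thread at this cadence.
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        // Fetch fewer records per poll() if processing a batch takes too long.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        return props;
    }
}
```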

Rebalance partition assignment

The new-version Kafka consumer provides three assignment strategies by default: range, round-robin, and sticky.

An assignment strategy determines which consumer each partition of a subscribed topic is assigned to.

The range strategy is based, as its name suggests, on the idea of partition ranges.

It lays out all partitions of a single topic in order, divides them into fixed-size spans, and assigns one span to each consumer in turn.
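
A worked sketch of the range arithmetic for a single topic, assuming 10 partitions and 3 consumers: each consumer gets partitions / consumers partitions, and the first partitions % consumers members each get one extra:

```java
public class RangeMathDemo {
    public static void main(String[] args) {
        int partitions = 10, consumers = 3; // assumed sizes
        int base = partitions / consumers;  // 3 partitions each
        int extra = partitions % consumers; // the first consumer gets one more
        for (int c = 0; c < consumers; c++) {
            int start = c * base + Math.min(c, extra);
            int count = base + (c < extra ? 1 : 0);
            // Prints: consumer 0 -> 0..3, consumer 1 -> 4..6, consumer 2 -> 7..9
            System.out.printf("consumer %d -> partitions %d..%d%n",
                    c, start, start + count - 1);
        }
    }
}
```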

The round-robin strategy lays out all partitions of all subscribed topics in order and then assigns them to the consumers one by one, in round-robin fashion.

The sticky strategy addresses a defect of the two strategies above, namely that they completely ignore the historical assignment: it assigns partitions to consumer instances "stickily", avoiding data skew in extreme cases while preserving as much of the previous assignment as possible between two rebalances.

Generally speaking, if all consumer instances in the group have identical subscriptions, round-robin produces a more equitable allocation; otherwise, range works better.

Users can select a strategy via the consumer parameter partition.assignment.strategy.
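
Concretely, the parameter takes a list of assignor class names. The Java client ships RangeAssignor (the default), RoundRobinAssignor, and StickyAssignor in org.apache.kafka.clients.consumer; for example:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

public class AssignmentStrategyDemo {
    static Properties consumerProps() {
        Properties props = new Properties();
        // Switch from the default RangeAssignor to round-robin assignment.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                RoundRobinAssignor.class.getName());
        return props;
    }
}
```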

In addition, Kafka supports custom assignment strategies: users can implement their own assignor, as sketched below.
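
A minimal sketch of a custom assignor, assuming the ConsumerPartitionAssignor interface of newer Java clients (2.4+). For simplicity it assumes all members share the same subscription and deals partitions out round-robin:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Toy assignor: deals every partition of every subscribed topic to the
// members in round-robin order. Assumes identical subscriptions per member.
public class SimpleAssignor implements ConsumerPartitionAssignor {

    @Override
    public String name() {
        return "simple"; // protocol name advertised to the coordinator
    }

    @Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
        List<String> members = new ArrayList<>(subscriptions.keySet());
        Collections.sort(members); // deterministic member order

        Map<String, List<TopicPartition>> byMember = new HashMap<>();
        members.forEach(m -> byMember.put(m, new ArrayList<>()));

        // Gather all partitions of all subscribed topics, then deal them out.
        SortedSet<String> topics = new TreeSet<>();
        subscriptions.values().forEach(s -> topics.addAll(s.topics()));

        int i = 0;
        for (String topic : topics) {
            for (PartitionInfo p : metadata.partitionsForTopic(topic)) {
                byMember.get(members.get(i++ % members.size()))
                        .add(new TopicPartition(topic, p.partition()));
            }
        }

        Map<String, Assignment> assignments = new HashMap<>();
        byMember.forEach((m, parts) -> assignments.put(m, new Assignment(parts)));
        return new GroupAssignment(assignments);
    }
}
```

Such an assignor would be registered the same way as the built-ins, by setting partition.assignment.strategy to its class name.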

Suppose there are two consumers in a consumer group: A and B.

When a third member, C, joins, the first trigger condition discussed earlier is met.

The coordinator performs a rebalance and redistributes the partitions among A, B, and C according to the range assignment policy.

After the rebalance, A, B, and C each consume two partitions (the topic here has six partitions). This is fair: the load on each consumer is the same.

Rebalance Generation

Each consumer group can go through any number of rebalances over its lifetime.

To better isolate the data associated with each rebalance, the new consumer uses a rebalance generation to identify each round of rebalance.

The term is analogous to the "generation" in the JVM's generational garbage collector (strictly speaking, the JVM GC also uses the word generation).

You can think of a generation as a "term" of group membership: each rebalance ushers in a new term. In the consumer, the generation is an integer, usually starting at 0.

One reason Kafka introduced the generation is to isolate invalid offset commits. For example, if a delayed consumer tries to commit offsets carrying an old generation ID after the group has already rebalanced into a new generation, the commit is rejected by the coordinator.

This is why many Kafka users encounter ILLEGAL_GENERATION exceptions when using the consumer.

In fact, after each group rebalance the generation is incremented by 1, indicating that the group has entered a new era.

For example, suppose the coordinator triggers a rebalance and the group enters generation 2; later, member 4 joins the group and triggers another rebalance, and the group enters generation 3.

Rebalance protocol

Rebalance is essentially a set of protocols that the group members and the coordinator use to carry out a rebalance of the group.

The latest Kafka versions provide the following five protocols (request types) for handling rebalance:

  • JoinGroup request: a consumer requests to join the group.
  • SyncGroup request: the group leader synchronizes the assignment plan to all members of the group.
  • Heartbeat request: a consumer periodically reports to the coordinator that it is still alive.
  • LeaveGroup request: a consumer proactively notifies the coordinator that it is about to leave the group.
  • DescribeGroup request: queries group information, including member information, protocol information, the assignment plan, and subscription information. This request type is mainly for administrators; the coordinator does not use it to perform rebalance.

During a rebalance, the coordinator processes the JoinGroup and SyncGroup requests sent by consumers. When a consumer leaves the group voluntarily, it sends a LeaveGroup request to the coordinator.

After a successful rebalance, every consumer in the group periodically sends Heartbeat requests to the coordinator.

Each consumer also checks whether the group has started a new round of rebalance by looking for REBALANCE_IN_PROGRESS in the Heartbeat response.

Rebalance process

Before performing a rebalance, the consumer group must first identify the broker on which its coordinator resides and create a socket connection to that broker.

The algorithm used to locate the coordinator is the same one used to determine which partition of __consumer_offsets a group's offsets are committed to.

The algorithm is as follows (a short sketch in code appears after the list):

  • Compute Math.abs(groupId.hashCode()) % offsets.topic.num.partitions (the default is 50); suppose the result is 10.
  • Find the broker hosting the leader replica of partition 10 of __consumer_offsets; that broker is the group's coordinator.
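
The first step is plain arithmetic; here is a small sketch with an illustrative group id (Kafka internally uses its own absolute-value helper, so this is only an approximation of the broker code):

```java
public class CoordinatorPartitionDemo {
    public static void main(String[] args) {
        String groupId = "demo-group";   // illustrative group id
        int offsetsTopicPartitions = 50; // offsets.topic.num.partitions (default 50)
        int partition = Math.abs(groupId.hashCode()) % offsetsTopicPartitions;
        // The broker hosting the leader replica of __consumer_offsets
        // partition `partition` is this group's coordinator.
        System.out.println("coordinator partition = " + partition);
    }
}
```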

Once connected to the coordinator, the consumer can proceed with the rebalance operation.

Currently, rebalance consists of two steps: joining the group, and synchronizing the assignment plan.

Joining the group: in this step, all consumers in the group (that is, all consumer instances with the same group.id) send a JoinGroup request to the coordinator.

After collecting the JoinGroup requests, the coordinator selects one consumer as the leader of the group and sends all member information, along with their subscription information, to the leader.

It is important to note that the group leader and coordinator are not the same concept.

The leader is a consumer instance, while the coordinator is usually a broker in the Kafka cluster. In addition, it is the leader, not the coordinator, that formulates the assignment plan for all members of the group.

Synchronizing the assignment plan: in this step, the leader produces the assignment plan, deciding, according to the assignment strategy discussed earlier, which partitions of which topics each consumer is responsible for.

Once the plan is ready, the leader wraps it in a SyncGroup request and sends it to the coordinator. Interestingly, every member of the group sends a SyncGroup request, but only the leader's request carries the assignment plan.

After receiving the plan, the coordinator extracts the portion belonging to each consumer and returns it to that consumer in its SyncGroup response.
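
The JoinGroup/SyncGroup exchange happens inside the client library, but an application can observe both sides of a rebalance through a ConsumerRebalanceListener. The broker address, group id, and topic name below are illustrative:

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceListenerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "demo-group");              // assumed group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"),
                    new ConsumerRebalanceListener() {
                        @Override
                        public void onPartitionsRevoked(Collection<TopicPartition> parts) {
                            // Called before a rebalance takes partitions away:
                            // a good place to commit offsets for `parts`.
                            System.out.println("revoked: " + parts);
                        }

                        @Override
                        public void onPartitionsAssigned(Collection<TopicPartition> parts) {
                            // Called after the SyncGroup response delivers the
                            // new assignment plan to this member.
                            System.out.println("assigned: " + parts);
                        }
                    });
            while (true) {
                consumer.poll(Duration.ofMillis(500));
            }
        }
    }
}
```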