Kafka is our most commonly used message queue, and we are very happy with its throughput of tens or even hundreds of thousands of messages per second. But as we use it in more and more scenarios, we run into more and more problems. One of the most common is rebalance.

To understand rebalance, you first need to understand consumer groups.

Consumer groups

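A consumer group is a set of consumers that together consume all messages of the topics they subscribe to. Within a group, each partition is consumed by only one consumer. Kafka defines five states for a consumer group: Empty, Dead, PreparingRebalance, CompletingRebalance, and Stable. A group with no members is Empty. When a member joins, the group moves to PreparingRebalance. Once the members have rejoined, it moves to CompletingRebalance while it waits for the partition assignment, and then to Stable. Any membership change sends the group back to PreparingRebalance, and a group whose metadata is removed becomes Dead.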
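The state transitions can be sketched as a small lookup table. This is a simplified Python model written for illustration, based on the transitions the broker-side group coordinator allows. The names GroupState, VALID_PREVIOUS and transition are hypothetical and are not Kafka APIs.

```python
from enum import Enum


class GroupState(Enum):
    EMPTY = "Empty"
    PREPARING_REBALANCE = "PreparingRebalance"
    COMPLETING_REBALANCE = "CompletingRebalance"
    STABLE = "Stable"
    DEAD = "Dead"


# For each target state, the states it may be entered from
# (a simplified model of the coordinator's rules, not Kafka's actual code).
VALID_PREVIOUS = {
    GroupState.EMPTY: {GroupState.PREPARING_REBALANCE},
    GroupState.PREPARING_REBALANCE: {
        GroupState.EMPTY, GroupState.STABLE, GroupState.COMPLETING_REBALANCE},
    GroupState.COMPLETING_REBALANCE: {GroupState.PREPARING_REBALANCE},
    GroupState.STABLE: {GroupState.COMPLETING_REBALANCE},
    GroupState.DEAD: set(GroupState),  # any state, including Dead, can become Dead
}


def transition(current: GroupState, target: GroupState) -> GroupState:
    """Return the target state, or raise if the move is not allowed."""
    if current not in VALID_PREVIOUS[target]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target


# A member joins an empty group and the rebalance completes.
state = GroupState.EMPTY
for nxt in (GroupState.PREPARING_REBALANCE,
            GroupState.COMPLETING_REBALANCE,
            GroupState.STABLE):
    state = transition(state, nxt)
print(state.value)  # Stable
```

Keeping the rules as "valid previous states" makes illegal jumps easy to spot. For example, an Empty group cannot become Stable without first going through a rebalance.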

What is rebalance?

Kafka can be divided into three main parts: producers, Kafka brokers, and consumers.

Rebalance is essentially a protocol that determines how all consumers in a consumer group agree on the assignment of every partition of the subscribed topics. Put simply, a rebalance reassigns partitions among the consumers in the group.
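As a rough illustration of what "assigning partitions" means, here is a Python sketch of a range-style split of one topic's partitions across consumers. It is similar in spirit to Kafka's range strategy but simplified: it handles a single topic and sorts consumers by name. The function name and the consumer ids c1 and c2 are hypothetical.

```python
def range_assign(partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Split partitions 0..partitions-1 of one topic into contiguous ranges."""
    members = sorted(consumers)
    per_member, extra = divmod(partitions, len(members))
    assignment: dict[str, list[int]] = {}
    start = 0
    for i, member in enumerate(members):
        # The first `extra` consumers each take one additional partition.
        count = per_member + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


print(range_assign(5, ["c2", "c1"]))
# {'c1': [0, 1, 2], 'c2': [3, 4]}
```

Every partition ends up with exactly one owner, and consumer loads differ by at most one partition.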

When does rebalance happen?

  • The number of partitions of a subscribed topic changes.
  • The number of subscribed topics changes.
  • The number of members in the consumer group changes, for example when a new consumer instance joins the group or an existing one leaves it (voluntarily, or because it is considered to have left).
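The three triggers can be expressed as a comparison of two snapshots of a group. The Python sketch below assumes a hypothetical GroupView record that holds only what the assignment depends on: the subscribed topics with their partition counts, and the live members. It is an illustration, not how the coordinator actually detects changes.

```python
from dataclasses import dataclass


@dataclass
class GroupView:
    """Hypothetical snapshot of the inputs a partition assignment depends on."""
    topic_partitions: dict[str, int]  # subscribed topic -> partition count
    members: set[str]                 # ids of live consumers in the group


def rebalance_reasons(old: GroupView, new: GroupView) -> list[str]:
    """List which of the three rebalance triggers apply between two snapshots."""
    reasons = []
    if old.topic_partitions.keys() != new.topic_partitions.keys():
        reasons.append("subscribed topics changed")
    for topic in sorted(old.topic_partitions.keys() & new.topic_partitions.keys()):
        if old.topic_partitions[topic] != new.topic_partitions[topic]:
            reasons.append(f"partition count of {topic} changed")
    if old.members != new.members:
        reasons.append("group membership changed")
    return reasons


before = GroupView({"orders": 20}, {"c1", "c2"})
after = GroupView({"orders": 30}, {"c1", "c2", "c3"})
print(rebalance_reasons(before, after))
# ['partition count of orders changed', 'group membership changed']
```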

When a rebalance occurs, all consumer instances in the group are coordinated and take part in it, and the assignment strategy tries to distribute partitions as evenly as possible. But a rebalance can seriously affect consumption. With the classic (eager) rebalance protocol, every consumer in the group stops consuming and waits until the rebalance completes. Each of these triggers is described briefly below.

The number of partitions for the subscribed Topic changes

Suppose a topic used to have 20 partitions and now has 30. The 10 new partitions have no consumer yet, so a rebalance is needed to assign them to consumers in the group. This is one case in which a rebalance occurs.
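A quick check of the arithmetic above: this Python sketch lists the partitions that no consumer owns after the topic grows. The assignment dictionary and the consumer ids c1 and c2 are hypothetical, not output from Kafka.

```python
def unowned_partitions(assignment: dict[str, list[int]], partition_count: int) -> list[int]:
    """Partitions of a topic that no consumer in the current assignment owns."""
    owned = {p for parts in assignment.values() for p in parts}
    return sorted(set(range(partition_count)) - owned)


# Hypothetical assignment made when the topic had 20 partitions.
before = {"c1": list(range(0, 10)), "c2": list(range(10, 20))}
print(unowned_partitions(before, 30))
# [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
```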

The number of subscribed topics changes

Suppose a consumer group was previously subscribed only to topic A, so its consumers consumed only topic A's partitions. If the group now also subscribes to topic B, Kafka must assign topic B's partitions to the consumers in the group. This allocation is also a rebalance.
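To see how a new subscription changes the assignment, here is a Python sketch that deals out every partition of every subscribed topic in round-robin order. This is a simplified version of round-robin assignment chosen for illustration, not Kafka's exact algorithm. Topics A and B and consumers c1 and c2 follow the example above and are hypothetical.

```python
def round_robin_assign(topics: dict[str, int],
                       consumers: list[str]) -> dict[str, list[tuple[str, int]]]:
    """Deal all (topic, partition) pairs to consumers in turn."""
    members = sorted(consumers)
    assignment: dict[str, list[tuple[str, int]]] = {m: [] for m in members}
    all_partitions = [(t, p) for t in sorted(topics) for p in range(topics[t])]
    for i, tp in enumerate(all_partitions):
        assignment[members[i % len(members)]].append(tp)
    return assignment


print(round_robin_assign({"A": 2}, ["c1", "c2"]))
# {'c1': [('A', 0)], 'c2': [('A', 1)]}
print(round_robin_assign({"A": 2, "B": 2}, ["c1", "c2"]))
# {'c1': [('A', 0), ('B', 0)], 'c2': [('A', 1), ('B', 1)]}
```

Subscribing to B changes the input to the assignment, so the whole group has to agree on a new result. That agreement is the rebalance.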

The number of members in the consumer group changes

Consumers in a group jointly consume the messages of a topic. When a consumer leaves or a new one joins, the number of members in the group changes and a rebalance follows. Membership changes in three ways:

  • A new member joins
  • A member leaves voluntarily
  • A member crashes
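Membership changes can be disruptive partly because partitions may move even between consumers that did not change. The Python sketch below counts which partitions change owner when c3 crashes. It uses a deliberately naive modulo assignment, which is an assumption for illustration and not one of Kafka's assignors. Kafka also offers sticky strategies designed to keep more partitions where they were.

```python
def owners(partitions: int, members: list[str]) -> dict[int, str]:
    """Naive assignment: partition p goes to the (p mod n)-th member."""
    ordered = sorted(members)
    return {p: ordered[p % len(ordered)] for p in range(partitions)}


def moved_partitions(partitions: int, before: list[str], after: list[str]) -> list[int]:
    """Partitions whose owner differs between the two memberships."""
    old, new = owners(partitions, before), owners(partitions, after)
    return [p for p in range(partitions) if old[p] != new[p]]


# c3 crashes: only partitions 2 and 5 were c3's, but four partitions move.
print(moved_partitions(6, ["c1", "c2", "c3"], ["c1", "c2"]))
# [2, 3, 4, 5]
```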

Dealing with rebalance

As described earlier, membership changes in three ways: a new member joins, a member leaves voluntarily, or a member crashes.

New members joining and members leaving voluntarily are actions we trigger ourselves, so they are easier to control. A member crash (being considered to have left), however, is unexpected and hard to troubleshoot when it happens. Still, there are some general guidelines for handling it. To deal with this kind of rebalance, you need to understand four Kafka consumer parameters:

  • session.timeout.ms: the session timeout
  • heartbeat.interval.ms: the heartbeat interval
  • max.poll.interval.ms: the maximum time allowed between two polls, i.e., for processing each batch
  • max.poll.records: the maximum number of messages returned by each poll

session.timeout.ms is the timeout for the heartbeats a consumer sends to the broker. For example, session.timeout.ms = 180000 means that if the broker receives no heartbeat from a consumer for 180 seconds, it considers the consumer dead and triggers a rebalance.

heartbeat.interval.ms is the interval at which the consumer sends heartbeats to the broker. For example, heartbeat.interval.ms = 60000 means the consumer sends a heartbeat every 60 seconds. In general, session.timeout.ms should be at least 3 times heartbeat.interval.ms.

max.poll.interval.ms is the maximum interval between two polls by a consumer. Simply put, it is how long the consumer may spend processing each batch of messages. If message processing is heavy, increase this value accordingly. Otherwise, if the consumer has not finished processing when the interval expires, it is considered dead and a rebalance is triggered.

max.poll.records is the maximum number of messages returned by each poll. The more messages per poll, the longer processing takes, so make sure a full batch can be processed within max.poll.interval.ms. Otherwise a rebalance will occur.
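The rules of thumb above can be turned into a small sanity check. This is a hypothetical Python helper, not part of any Kafka client. ms_per_record is an assumed average processing time per message that you would measure yourself. The sample values 300000 and 500 are the Java client's defaults for max.poll.interval.ms and max.poll.records.

```python
def check_consumer_config(session_timeout_ms: int,
                          heartbeat_interval_ms: int,
                          max_poll_interval_ms: int,
                          max_poll_records: int,
                          ms_per_record: float) -> list[str]:
    """Return warnings for settings likely to cause unwanted rebalances."""
    warnings = []
    # Rule of thumb: session timeout >= 3 x heartbeat interval.
    if session_timeout_ms < 3 * heartbeat_interval_ms:
        warnings.append("session.timeout.ms should be at least 3x heartbeat.interval.ms")
    # A full batch must be processed before the next poll is due.
    batch_ms = max_poll_records * ms_per_record
    if batch_ms >= max_poll_interval_ms:
        warnings.append(
            f"a batch of {max_poll_records} records takes ~{batch_ms:.0f} ms, "
            f"exceeding max.poll.interval.ms={max_poll_interval_ms}")
    return warnings


print(check_consumer_config(180_000, 60_000, 300_000, 500, 20))  # []
print(check_consumer_config(10_000, 5_000, 300_000, 500, 1_000))
```

The second call triggers both warnings. The heartbeat ratio is only 2x, and 500 records at about 1 s each would exceed the 5-minute poll interval.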

In a nutshell, a member is considered crashed in two cases:

  • Its heartbeat times out, causing a rebalance.
  • It takes too long to process messages, causing a rebalance.

Consumer heartbeat timeout

The consumer communicates with the coordinator through heartbeats. If the coordinator stops receiving heartbeats, it assumes the consumer is dead and triggers a rebalance. The relevant consumer settings are:

  • session.timeout.ms: the session timeout
  • heartbeat.interval.ms: the heartbeat interval

session.timeout.ms and heartbeat.interval.ms need to be tuned together so that the consumer and the coordinator can keep the heartbeat going. In general, the timeout should be about three times the heartbeat interval. For example, if session.timeout.ms is set to 180 seconds, heartbeat.interval.ms is set to 60 seconds. Why three times? This way several heartbeats are sent within one timeout period, so an occasional heartbeat lost to a network problem does not cause the consumer to be considered dead.
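The reason for the 3x ratio can be checked with a tiny simulation. This Python sketch is a simplified model and not how the coordinator is implemented. It assumes the member joins at t=0, ignores network delay, and expires the session when more than session.timeout.ms passes between heartbeats that actually arrive.

```python
def session_expired(arrivals_ms: list[int], session_timeout_ms: int, now_ms: int) -> bool:
    """Return True if, at any point up to now_ms, more than session_timeout_ms
    passed without a heartbeat reaching the coordinator (simplified model)."""
    last_seen = 0  # assume the member joined at t=0
    for t in sorted(arrivals_ms) + [now_ms]:
        if t - last_seen > session_timeout_ms:
            return True
        last_seen = t
    return False


HEARTBEAT_MS = 60_000
sent = list(range(HEARTBEAT_MS, 600_001, HEARTBEAT_MS))  # 60 s, 120 s, ..., 600 s
one_lost = [t for t in sent if t != 120_000]              # the 120 s heartbeat is dropped

print(session_expired(one_lost, 180_000, 600_000))  # False: 3x the interval absorbs one loss
print(session_expired(one_lost, 60_000, 600_000))   # True: 1x the interval does not
```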

The consumer processing time is too long

If the consumer takes too long to process messages, the coordinator may also assume it is dead and trigger a rebalance. The relevant consumer parameters are:

  • max.poll.interval.ms: the maximum time allowed for processing each batch
  • max.poll.records: the maximum number of messages returned by each poll

In this case, the usual fix is to give the consumer more processing time (increase max.poll.interval.ms) and to process fewer messages per poll (decrease max.poll.records).

The session timeout (session.timeout.ms) is also related to the per-batch processing time (max.poll.interval.ms). In consumer implementations where a single thread both processes messages and sends heartbeats, a thread stuck processing messages cannot send a heartbeat even when one is due. In that case each batch must be processed within session.timeout.ms. Many developers set a long session.timeout.ms when troubleshooting but still see heartbeat timeouts, because they did not account for the relationship between these two parameters. The Java client has sent heartbeats from a background thread since version 0.10.1, so there the two timeouts are enforced separately.

In short, handling rebalance problems comes down to managing heartbeat timeouts and consumption timeouts.

For heartbeat timeouts, increase the session timeout (session.timeout.ms) and adjust the ratio between it and the heartbeat interval (heartbeat.interval.ms). The official Aliyun documentation suggests setting session.timeout.ms to 25 s, and no more than 30 s, with heartbeat.interval.ms no more than 10 s.

For consumption timeouts, increase the time allowed for processing (max.poll.interval.ms) and decrease the number of messages processed per poll (max.poll.records). The official Aliyun documentation suggests keeping max.poll.records much smaller than the consumption capacity of the current consumer group:

max.poll.records < messages consumed per second by a single thread × number of consumer threads × session.timeout.ms (in seconds)
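The Aliyun sizing rule is simple arithmetic, sketched here in Python. The function name is hypothetical. The throughput figures in the example are made-up measurements, not recommendations.

```python
import math


def max_poll_records_upper_bound(records_per_sec_per_thread: float,
                                 consumer_threads: int,
                                 session_timeout_ms: int) -> int:
    """Largest integer strictly below
    per-thread throughput x threads x session timeout (in seconds)."""
    limit = records_per_sec_per_thread * consumer_threads * session_timeout_ms / 1000
    return math.ceil(limit) - 1


# Hypothetical measurements: each thread handles about 20 messages/s,
# the group runs 2 consumer threads, and session.timeout.ms = 25000.
print(max_poll_records_upper_bound(20, 2, 25_000))  # 999
```

Because the guideline asks for a value much smaller than this bound, treat the result as a ceiling rather than a target.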

Reference: kafkajs, Kafka.js.org/docs/1.13.0…