1. Foreword

In our last post we talked about the mechanics of Rebalance, so you should already have a good idea of what rebalancing is all about. As a quick refresher, Rebalance is the process by which all Consumer instances in a Consumer Group agree on how to consume all partitions of the subscribed topics. During a Rebalance, all Consumer instances in the group work together with the Coordinator component to assign the partitions of the subscribed topics. However, none of the instances can consume any messages while this is going on, so Rebalance has a significant impact on Consumer TPS. Does that remind you of JVM GC? We know that frequent JVM GC can be a nightmare for latency-sensitive businesses, which is why we tune the GC so that the JVM doesn't stop the world so often.

Once the Consumer Group rebalances, consumption slows down. If the business is not sensitive to real-time processing, slowing down a bit is acceptable. What is worse is that a Consumer can get kicked out of the group; the application then takes a new thread from its thread pool, turns it into a Consumer, and subscribes to the topic again. That means a new Consumer joins the Consumer Group, which causes another Rebalance, and the new Consumer may be kicked out again before it has processed all of its messages. This cycle is what produces so many Rebalances.

2. The drawbacks of Rebalance

  • Rebalance hurts Consumer TPS. While the Coordinator component is reassigning the subscribed topic partitions, none of the instances in the Consumer Group can consume any messages.
  • If the group contains many Consumer instances, the Rebalance is slow, which can have an impact on your business.
  • Rebalance is not efficient. Kafka's current design requires every member of the Consumer Group to participate in each Rebalance, and locality is usually not considered, even though locality matters a great deal for system performance.

On the third point, don't you also find it unreasonable that every member has to participate? A better approach would be to take the partitions of the Consumer that leaves the group and redistribute them among the other Consumers, while leaving the other Consumers' existing partition assignments untouched. This would minimize the impact of a Rebalance on the remaining Consumer members.

Yes, the community thought of this too. Version 0.11.0.0 introduced StickyAssignor, a sticky partition allocation strategy whose goal is that each Rebalance changes the existing partition assignment as little as possible. Unfortunately, this strategy has some bugs and requires upgrading to 0.11.0.0, so it is not used much in real production environments.
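If you do want to experiment with it, the strategy is enabled purely through consumer configuration. A minimal sketch, added to a consumer Properties object like the one in section 4.2 (StickyAssignor lives in org.apache.kafka.clients.consumer):

// Use the sticky assignor (available since 0.11.0.0), which tries to keep the
// existing partition assignment as unchanged as possible across Rebalances.
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());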

You may ask whether the community has any solution for these problems. Not really; for slow Rebalances in particular, the Kafka community has little to offer. That being the case, how about trying to avoid Rebalance as much as possible, especially the unnecessary ones?

3. When Rebalance is triggered

To avoid Rebalance, start with when Rebalance is triggered. As we discussed earlier, Rebalance is triggered in the following situations:

  • A new Consumer has joined the Consumer Group
  • A Consumer goes down and drops offline. The Consumer does not necessarily have to be truly down: for example, if a Consumer fails to send a HeartbeatRequest to the GroupCoordinator for a long time because of a long GC pause or network delay, the GroupCoordinator considers it offline.
  • Some Consumers voluntarily quit the Consumer Group (by sending a LeaveGroupRequest); for example, the client calls the unsubscribe() method to drop its subscriptions (see the short sketch after this list).
  • The Consumer timed out and did not commit the offset within the specified time.
  • The GroupCoordinator node corresponding to the Consumer Group has changed.
  • The number of topics the Consumer Group subscribes to, or the number of partitions of any subscribed topic, changes.
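As a minimal illustration of the "voluntarily quit" case, reusing the consumer variable from the example in section 4.2: calling unsubscribe() makes the client send a LeaveGroupRequest, and the remaining members go through a Rebalance.

// Subscribe and consume for a while...
consumer.subscribe(Arrays.asList("topic_test"));
consumer.poll(Duration.ofMillis(100));
// ...then give up the subscription voluntarily; the client sends a LeaveGroupRequest
// to the Coordinator and the rest of the group rebalances.
consumer.unsubscribe();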

4. A Rebalance example

A little abstract, isn't it? No worries, here is an example that triggers a Rebalance.

4.1 Producer

/**
 * @author: wechat official account [Lao Zhou Chat structure]
 */
public class KafkaProducerRebalanceTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Send 10 test messages with the same key to topic_test
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic_test", "userName", "riemann_" + i);
            producer.send(record);
        }
        producer.close();
    }
}

4.2 Consumer

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @author: wechat official account [Lao Zhou Chat structure]
 */
public class KafkaConsumerRebalanceTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_test");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "topic_test";
        long pollTimeout = 100;
        // Sleep 2 minutes per record to simulate processing that exceeds max.poll.interval.ms (60 s)
        long sleep = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);

        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                System.out.println("Partition Revoked");
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                System.out.println("New assignment : " + collection.size() + " partitions");
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeout));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                try {
                    Thread.sleep(sleep);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

4.3 Terminal Logs

Consumer log

[Consumer clientId=consumer-consumer_group_test-1, groupId=consumer_group_test] Member consumer-consumer_group_test-1-7d64e140-f0e3-49d2-8230-2621ba1d2061 sending LeaveGroup Request to Coordinator 127.0.0.1:9092 (ID: 2147483643 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

Server Logs

[2022-01-16 16:38:25,315] INFO [GroupCoordinator 4]: Member[group.instance.id None, member.id consumer-consumer_group_test-1-7d64e140-f0e3-49d2-8230-2621ba1d2061] in group consumer_group_test has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2022-01-16 16:38:25,315] INFO [GroupCoordinator 4]: Preparing to rebalance group consumer_group_test in state PreparingRebalance with old generation 37 (__consumer_offsets-17) (reason: removing member consumer-consumer_group_test-1-7d64e140-f0e3-49d2-8230-2621ba1d2061 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)

[2022-01-16 16:38:25,316] INFO [GroupCoordinator 4]: Group consumer_group_test with generation 38 is now empty (__consumer_offsets-17) (kafka.coordinator.group.GroupCoordinator)

Why did the Consumer Group rebalance? As the Consumer log shows, the interval between two poll() calls exceeded max.poll.interval.ms, so the Consumer sent a LeaveGroup request to the Coordinator. The Coordinator treated the Consumer as having left, removed it from the Consumer Group, and that triggered the Rebalance mechanism.

This also has to do with how the Consumer sends heartbeats. Most middleware designs separate the business thread from the heartbeat-sending thread; early versions of Kafka did not, probably for simplicity. If the business logic takes a long time to process each batch, we can raise the parameter max.poll.interval.ms, which is the maximum allowed interval between two poll() calls and defaults to 300000 ms, i.e. 5 minutes. Alternatively, we can reduce the amount of data the Consumer pulls from the Broker in one poll with the parameter max.poll.records, which defaults to 500 records.
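As a sketch (the values are illustrative, not recommendations), both knobs go into the consumer Properties shown in section 4.2:

// Allow up to 10 minutes between two poll() calls (default: 300000 ms, i.e. 5 minutes).
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
// Fetch at most 200 records per poll() instead of the default 500, so one batch
// can comfortably be processed within max.poll.interval.ms.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200");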

In version 0.10.1, Kafka reworked the Consumer heartbeat mechanism by moving heartbeat sending into a dedicated heartbeat thread, rather than relying on the user application thread's regular polling as in earlier versions. The old design made tuning tricky: raising the session timeout gave more time for message processing, but it also meant the Consumer Group took longer to detect failures such as process crashes. Kafka Consumer 0.10.1 therefore introduced max.poll.interval.ms to decouple the processing timeout from the session timeout. The max.poll.interval.ms parameter matters because even when heartbeats are sent normally, that only proves the Consumer process is alive; the Consumer may still be effectively dead, for example stuck in a deadlock and waiting longer than the interval allowed by max.poll.interval.ms.

5. Rebalance triggers and key Consumer parameters

There are several conditions that can trigger the Rebalance mechanism, but they fall into three main categories:

  • The number of group members has changed
  • The number of subscribed topics has changed
  • The number of partitions of a subscribed topic has changed

The latter two are often operational initiatives, so much of the Rebalance they cause is inevitable. Now, let’s focus on how to avoid unnecessary Rebalance — that is, how to avoid changing the number of group members.

If the number of Consumer instances in the Consumer Group changes, this must cause Rebalance. This is the most common cause of Rebalance.

The increase in Consumer instances is easy to understand: when we start a Consumer program configured with the same group.id value, we are actually adding a new Consumer instance to the Consumer Group. The Coordinator accepts the new instance, adds it to the group, and reassigns partitions. Adding a Consumer instance is usually planned, perhaps for higher TPS or for scalability; it is not one of the unnecessary Rebalances we want to avoid.

We are more concerned about the number of instances in the Consumer Group going down. Deliberately stopping some Consumer instances is likewise planned; the real problem is that in some cases a Consumer instance can be mistakenly judged by the Coordinator to have "stopped" and thus be "kicked out" of the Consumer Group. If that is what is causing the Rebalance, we can't just let it go.

When does a Coordinator decide that a Consumer instance has died and needs to be removed from the group? Let's look at a few Consumer configuration parameters:

  • session.timeout.ms: session timeout
  • heartbeat.interval.ms: heartbeat interval
  • max.poll.interval.ms: maximum processing time between two polls
  • max.poll.records: number of messages consumed per poll

5.1 session.timeout.ms

This is the session timeout between the Consumer and the Broker; the default is 10 s. If the Broker does not receive a heartbeat within session.timeout.ms, it removes the Consumer and triggers a Rebalance.

This value must fall within the bounds set by the Broker configuration parameters group.min.session.timeout.ms and group.max.session.timeout.ms.

This parameter and heartbeat.interval.ms allow you to control the frequency of the Rebalance.

5.2 heartbeat.interval.ms

The interval between heartbeats. Heartbeats are sent from the Consumer to the Coordinator; they keep the Consumer's session alive and help coordinate a Rebalance when a Consumer joins or leaves the Consumer Group.

This value must be set lower than session.timeout.ms, because if a Consumer cannot send a heartbeat to the Coordinator for some reason and the elapsed time exceeds session.timeout.ms, the Consumer is considered to have exited, and the partitions assigned to it are allocated to other Consumers in the same Consumer Group.

This value is usually set to no more than 1/3 of session.timeout.ms. The default is 3 s.

5.3 max.poll.interval.ms

The maximum interval between two poll() calls, in milliseconds; the default is 5 minutes. If the Consumer does not poll within that interval, it is removed from the group, triggering a Rebalance that assigns the partitions it held to other Consumers.

Kafka has a dedicated heartbeat thread for sending heartbeats, so the Consumer client may keep sending heartbeats successfully while it is actually in a livelock state and not processing any data. Kafka uses the parameter max.poll.interval.ms to guard against this problem.

5.4 max.poll.records

The maximum number of records returned by a single poll() call. The limit applies to the total across all assigned partitions, not to each partition individually. The default is 500.

In plain terms, it controls how many messages are fetched per poll. The more messages in a batch, the longer the processing takes; make sure a full batch can be processed within the time set by max.poll.interval.ms, otherwise a Rebalance will occur.
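A back-of-the-envelope check makes the relationship concrete; the per-record processing time below is an assumed, illustrative figure:

// Assumed: each record takes about 800 ms to process.
// Worst-case batch time = max.poll.records * per-record time
//                       = 500 * 800 ms = 400 000 ms (about 400 s)
// Default max.poll.interval.ms = 300 000 ms (5 minutes)
// 400 s > 300 s, so the poll loop would overrun the limit and trigger a Rebalance;
// either lower max.poll.records (e.g. 300 * 800 ms = 240 s) or raise max.poll.interval.ms.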

6. How to avoid Rebalance

To put it simply, there are two causes of unnecessary Rebalance:

  • The consumer’s heartbeat timed out, causing Rebalance.
  • Consumers take too long to process, leading to Rebalance.

6.1 Consumer Heartbeat Timeout

We know that the Consumer communicates with the Coordinator through heartbeats. If the Coordinator does not receive heartbeats, it assumes the Consumer is dead and initiates a Rebalance.

Here are the mainstream recommended values in the industry, which you can adjust for your own business:

  • Set session.timeout.ms = 6s.
  • Set heartbeat.interval.ms = 2s.
  • Ensure that the Consumer instance can send at least 3 rounds of heartbeat requests before being judged "dead", i.e. session.timeout.ms >= 3 * heartbeat.interval.ms.
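Applied to the consumer Properties from section 4.2, these recommendations would look roughly like this (session.timeout.ms must also fall between the Broker's group.min.session.timeout.ms and group.max.session.timeout.ms):

// Declare the Consumer dead only after 6 s without a heartbeat...
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000");
// ...while sending a heartbeat every 2 s, so up to 3 heartbeats can be missed
// before the Coordinator gives up on this Consumer (6000 >= 3 * 2000).
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000");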

Why session.timeout.ms >= 3 * heartbeat.interval.ms, rather than 5 or 10 times?

session.timeout.ms is a "logical" indicator. It specifies a threshold, for example 6 seconds, beyond which the Coordinator considers the Consumer dead if it has not heard from it. heartbeat.interval.ms is a "physical" indicator: it tells the Consumer to send a heartbeat packet to the Coordinator every 2 seconds, say. The smaller it is, the more heartbeat packets are sent; it has a real effect on the number of TCP packets on the wire, which is why I call it a "physical" metric.

If the Coordinator fails to receive a Consumer's heartbeat within a single heartbeat.interval.ms cycle, it makes little sense to remove that Consumer from the Consumer Group; that would be like condemning the Consumer over one slip. There could be network latency, or a long GC pause on the Consumer delaying the heartbeat packet, and the next heartbeat might arrive just fine.

There is another use for the heartbeat: the Coordinator notifies Consumers of a Rebalance through it. When a Rebalance starts, the Coordinator puts the REBALANCE_IN_PROGRESS flag into the heartbeat response, so the Consumer knows a Rebalance is happening and can update the set of partitions it is allowed to consume in time. Once session.timeout.ms has been exceeded, the Coordinator already considers the Consumer dead and no longer needs to notify it of the Rebalance.

In Kafka 0.10.1 and later, session.timeout.ms and max.poll.interval.ms are decoupled. A Consumer instance contains two threads: a Heartbeat thread and a Processing thread. The Processing thread is the one that calls the consumer.poll method and runs the message-processing logic, while the Heartbeat thread is a background thread that is "hidden" from the programmer. If the message-processing logic is complex and takes, say, 5 minutes, then max.poll.interval.ms can be set to a value greater than 5 minutes. The Heartbeat thread is governed by the parameter heartbeat.interval.ms: it sends a heartbeat packet to the Coordinator every heartbeat.interval.ms to prove it is still alive. As long as the Heartbeat thread has sent a heartbeat packet to the Coordinator within session.timeout.ms, the Coordinator considers the Consumer alive.

Before Kafka 0.10.1, heartbeat sending and the message-processing logic were coupled. Suppose a message takes 5 minutes to process and session.timeout.ms = 3000 ms. Because there is only one thread, no heartbeat packet can be sent while the message is being processed, so after 3000 ms without a heartbeat the Coordinator moves the Consumer out of the Consumer Group, long before processing finishes. Once the two were separated, the Processing thread runs the message-processing logic while the Heartbeat thread sends heartbeat packets. If a message takes 5 minutes to process, the Consumer can keep processing it without worrying about being removed from the group, as long as the Heartbeat thread sends a heartbeat to the Coordinator within session.timeout.ms. Another benefit is that if something does go wrong with the Consumer, it can be detected within session.timeout.ms instead of having to wait for max.poll.interval.ms.

As for why the recommendation is session.timeout.ms >= 3 * heartbeat.interval.ms rather than 5 or 10 times: the larger session.timeout.ms is set, the longer it takes to detect that a Consumer has hung, which is obviously not reasonable.

6.2 Consumer processing time is too long

If the Consumer takes too long to process, this can also cause the coordinator to assume that the Consumer is dead and initiate a rebalance.

Kafka’s consumer parameters are as follows:

  • max.poll.interval.ms: maximum processing time between two polls
  • max.poll.records: number of messages consumed per poll

In this case, the fix is generally to allow more processing time per batch (i.e., increase max.poll.interval.ms) and to reduce the number of messages handled per batch (i.e., decrease max.poll.records).

That is exactly what happened in the example above: max.poll.interval.ms was set to 60000 ms, and in the poll loop I slept for 2 minutes to simulate processing time. The processing time exceeded max.poll.interval.ms, which caused the Rebalance.

6.3 GC performance at the Consumer end

If tuning at the Kafka parameter level is still not enough to avoid Rebalance, I suggest checking GC behavior on the Consumer side: frequent and long Full GC pauses can also cause Rebalance.