Preface

It seems the servers never stay quiet while our ops engineer is away: Nginx had only just been fixed when Kafka started acting up. Today I wanted to sleep in again, but the phone rang again. It was the operations team again: “Hello, Glacier, have you arrived at the company? Please take a look at the server. There’s another problem.” “I’m on my way. Hasn’t the ops guy come back to work yet?” “Still on vacation…” Me: “…” Hey, did this guy run away? Never mind him, the problem still has to be solved.

Problem reproduction

After arriving at the company, I put down my backpack and took out my weapon of choice, my laptop. I opened it, quickly logged in to the monitoring system, and found that the main business system was fine. A non-core service had raised an alarm, though, and the monitoring system showed that it was frequently throwing the following exception.

2021-02-28 22:03:05 131 pool-7-thread-3 ERROR [] - 
commit failed 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]
    at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

From the above output, you can roughly tell what went wrong: after processing a batch of polled messages, the Kafka consumer got an error when it synchronously committed its offset to the broker. The likely reason is that the partition assigned to the current consumer thread had been reclaimed by the broker because Kafka believed the consumer had died, as the following excerpt from the output shows.

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

In other words, Kafka triggered its Rebalance mechanism internally. With the problem identified, we can start analyzing it.

Problem analysis

Since Kafka triggered a Rebalance, let’s first talk about when Kafka triggers one.

What is a Rebalance

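A Rebalance is the process by which all Consumer instances in a Consumer Group agree on how to divide up the partitions of the topics they subscribe to. For a concrete example, say a group of 10 Consumer instances subscribes to a topic with 50 partitions. Normally, Kafka assigns five partitions to each Consumer. This assignment process is the Rebalance.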
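The arithmetic of that example can be sketched in a few lines of Java. This is a simplified model written for illustration, not Kafka's actual assignor code: the class `EvenAssignmentSketch` and its `assign` method are hypothetical names, and the sketch simply hands out contiguous ranges of partition ids as evenly as possible.

```java
import java.util.ArrayList;
import java.util.List;

class EvenAssignmentSketch {
    // Splits partition ids 0..partitionCount-1 into contiguous ranges, one per consumer.
    // The first (partitionCount % consumerCount) consumers receive one extra partition.
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        List<List<Integer>> plan = new ArrayList<>();
        int base = partitionCount / consumerCount;
        int extra = partitionCount % consumerCount;
        int next = 0;
        for (int c = 0; c < consumerCount; c++) {
            int size = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                owned.add(next++);
            }
            plan.add(owned);
        }
        return plan;
    }

    public static void main(String[] args) {
        // 10 consumers, 50 partitions: every consumer owns 5 partitions.
        System.out.println("10 consumers, consumer 0 owns " + assign(50, 10).get(0));
        // One consumer leaves: the partitions are redistributed among the remaining 9.
        System.out.println("9 consumers, consumer 0 owns " + assign(50, 9).get(0));
    }
}
```

When a member leaves, every remaining member may end up with a different set of partitions, which is why a Rebalance affects the whole group and not only the member that changed.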

When a Rebalance is triggered

Kafka triggers a Rebalance when any of the following conditions is met:

  • The number of members in the group changes, for example when a new consumer joins the group or an existing one leaves it. A member leaves the group either by crashing or by leaving voluntarily.
  • The number of subscribed topics has changed.
  • The number of subscribed topic partitions has changed.

The latter two situations can be avoided through deliberate operational control. In practice, the most common cause of a Kafka Rebalance is a change in the membership of the consumer group.

Rebalances caused by Consumers starting and stopping normally are unavoidable. In some cases, however, a Consumer instance gets kicked out of the Group because the Coordinator mistakenly thinks it has stopped, and that also causes a Rebalance.

After the Consumer Group completes a Rebalance, each Consumer instance periodically sends a heartbeat request to the Coordinator to indicate that it is still alive. If a Consumer instance fails to send heartbeat requests in a timely way, the Coordinator considers that Consumer “dead”, removes it from the Group, and starts a new Rebalance. This timeout is configured with the Consumer-side parameter session.timeout.ms. The default value was 10 seconds in the Kafka versions current when this was written (Kafka 3.0 raised it to 45 seconds).

In addition to this parameter, the Consumer provides heartbeat.interval.ms, which controls how often heartbeat requests are sent. The smaller this value, the more frequently the Consumer instance sends heartbeat requests. Frequent heartbeats cost some extra bandwidth, but they let each Consumer instance learn sooner that a Rebalance has started, because the Coordinator notifies the Consumers of a Rebalance through the heartbeat response (in the Kafka protocol, by returning the REBALANCE_IN_PROGRESS error code).

In addition to the above two parameters, there is another Consumer-side parameter that controls how the Consumer’s actual consumption capacity affects Rebalance: max.poll.interval.ms. It limits the maximum interval between two calls to the poll method by the Consumer application. The default value is 5 minutes, which means that if the Consumer cannot finish consuming the messages returned by poll within 5 minutes, the Consumer initiates a “leave the group” request and the Coordinator starts a new round of Rebalance.
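To make the interplay of the two timeouts concrete, here is a minimal Java sketch of the liveness rules described above. It is a toy model, not Kafka code: in a real deployment the session check runs on the Coordinator and the poll-interval check runs inside the consumer client, whereas this hypothetical `ConsumerLivenessSketch` folds both into a single method for readability.

```java
class ConsumerLivenessSketch {
    enum Verdict { ALIVE, SESSION_EXPIRED, POLL_INTERVAL_EXCEEDED }

    // Applies the two rules from the text to a snapshot of timestamps (all in milliseconds).
    static Verdict check(long nowMs, long lastHeartbeatMs, long lastPollMs,
                         long sessionTimeoutMs, long maxPollIntervalMs) {
        // Rule 1: no heartbeat within session.timeout.ms means the member is considered dead.
        if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
            return Verdict.SESSION_EXPIRED;
        }
        // Rule 2: no poll() within max.poll.interval.ms means the member leaves the group.
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            return Verdict.POLL_INTERVAL_EXCEEDED;
        }
        return Verdict.ALIVE;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 10_000;    // 10 seconds, as quoted in the text
        long maxPollIntervalMs = 300_000;  // 5 minutes, as quoted in the text
        // Heartbeats are still flowing, but the last poll() was 6 minutes ago.
        System.out.println(check(360_000, 358_000, 0, sessionTimeoutMs, maxPollIntervalMs));
        // prints POLL_INTERVAL_EXCEEDED
    }
}
```

The example in `main` mirrors the failure in this article: the heartbeat is healthy, yet the member still loses its partitions because message processing kept it away from poll for too long.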

Now let’s look at which Rebalances can be avoided:

The first kind of unnecessary Rebalance happens when a Consumer is “kicked” out of the Group because it failed to send heartbeats in time. In this case, you can tune session.timeout.ms and heartbeat.interval.ms to avoid it. (The following settings are best practices found online; I have not tested them yet.)

  • Set session.timeout.ms to 6s.
  • Set heartbeat.interval.ms to 2s.
  • Ensure that the Consumer instance can send at least 3 rounds of heartbeat requests before being judged “dead”, i.e. session.timeout.ms >= 3 * heartbeat.interval.ms.

The purpose of setting session.timeout.ms to 6s is to let the Coordinator quickly detect failed Consumers and kick them out of the Group.
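As a quick sanity check on the rule of thumb above, the following Java sketch validates a pair of settings. The property names are the real Kafka consumer keys mentioned in the text; the class `HeartbeatConfigSketch` and its methods are hypothetical helpers that use plain `java.util.Properties` and do not touch a real Kafka client.

```java
import java.util.Properties;

class HeartbeatConfigSketch {
    // The values suggested in the text: 6 s session timeout, 2 s heartbeat interval.
    static Properties suggested() {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", "6000");
        props.setProperty("heartbeat.interval.ms", "2000");
        return props;
    }

    // True when at least three heartbeats fit into one session timeout.
    static boolean allowsThreeHeartbeats(Properties props) {
        long sessionTimeoutMs = Long.parseLong(props.getProperty("session.timeout.ms"));
        long heartbeatIntervalMs = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        return sessionTimeoutMs >= 3 * heartbeatIntervalMs;
    }

    public static void main(String[] args) {
        System.out.println("suggested settings valid: " + allowsThreeHeartbeats(suggested()));
    }
}
```

A check like this could run at application startup so that a mis-tuned pair of values fails fast instead of surfacing later as sporadic Rebalances.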

The second kind of unnecessary Rebalance happens when the Consumer spends too long processing messages. Here the value of max.poll.interval.ms is very important. To avoid unexpected Rebalances, set this parameter somewhat larger than the maximum time the downstream processing can take.

In short, leave plenty of time for the business processing logic. That way, the Consumer will not trigger a Rebalance just because it took too long to process a batch of messages.
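One way to judge whether the time allowance is sufficient is to estimate the worst-case time for a whole batch. The sketch below assumes records are processed one after another and that you know a worst-case time per record; both are assumptions for illustration, and `PollBudgetSketch` is a hypothetical helper. The figures 500 (default max.poll.records) and 300000 ms (default max.poll.interval.ms) are Kafka's documented defaults.

```java
class PollBudgetSketch {
    // Worst-case time to work through one batch, assuming strictly sequential processing.
    static long worstCaseBatchMillis(int maxPollRecords, long worstCaseMillisPerRecord) {
        return Math.multiplyExact((long) maxPollRecords, worstCaseMillisPerRecord);
    }

    // True when a full batch can be processed before max.poll.interval.ms runs out.
    static boolean fitsWithin(long maxPollIntervalMs, int maxPollRecords, long worstCaseMillisPerRecord) {
        return worstCaseBatchMillis(maxPollRecords, worstCaseMillisPerRecord) < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // 500 records at up to 1 s each need 500 s, more than the 300 s default interval.
        System.out.println(fitsWithin(300_000, 500, 1_000)); // prints false
        // Reducing the batch to 50 records brings the worst case down to 50 s.
        System.out.println(fitsWithin(300_000, 50, 1_000));  // prints true
    }
}
```

The same inequality can be satisfied from either side: raise max.poll.interval.ms, or lower max.poll.records so each batch is smaller.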

Pull offset and commit offset

Kafka’s offsets are managed by the consumer, and there are two kinds: the pull offset (position) and the commit offset (committed). The pull offset represents how far the current consumer has read in a partition. After messages are consumed, the offset needs to be committed. When committing, Kafka sends the value of the pull offset to the coordinator, where it is stored as the commit offset for the partition.

If an offset is not committed, then the next time a consumer reconnects to the broker, consumption resumes from the last offset that the consumer group did commit to the broker.

So here is the problem: when we take too long to process messages, the consumer has already been removed from the group by the broker, and committing the offset fails with an error. The pull offset therefore never reaches the broker, and a Rebalance takes place. When the partition is reassigned, the new owner starts consuming from the most recently committed offset, which leads to repeated consumption.
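The repeated consumption can be illustrated with a toy model of the two offsets. This is an in-memory sketch, not the Kafka client: `OffsetReplaySketch` and its methods are hypothetical, and a failed commit is represented by a `false` return value where the real client would throw CommitFailedException.

```java
class OffsetReplaySketch {
    long position = 0;   // pull offset: the next offset this consumer will read
    long committed = 0;  // commit offset: the progress recorded on the broker side

    // Simulates poll(): reads `records` messages and advances the pull offset.
    void poll(int records) {
        position += records;
    }

    // Simulates a synchronous commit. It only succeeds while the consumer still owns the partition.
    boolean commit(boolean stillOwnsPartition) {
        if (!stillOwnsPartition) {
            return false; // the real client throws CommitFailedException here
        }
        committed = position;
        return true;
    }

    // After a rebalance, consumption resumes from the last committed offset.
    long resumeAfterRebalance() {
        position = committed;
        return position;
    }

    public static void main(String[] args) {
        OffsetReplaySketch consumer = new OffsetReplaySketch();
        consumer.poll(50);
        consumer.commit(true);   // offsets 0..49 are safely committed
        consumer.poll(50);       // offsets 50..99 are processed, slowly
        consumer.commit(false);  // the partition was reassigned in the meantime
        System.out.println("resume from offset " + consumer.resumeAfterRebalance());
        // prints "resume from offset 50": offsets 50..99 are consumed a second time
    }
}
```

Because of this replay, message handlers in such a setup are usually written to tolerate seeing the same message twice.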

Solution suggested by the exception log

In fact, the exception log output by the Kafka consumer also suggests a solution. Simply put, you can increase max.poll.interval.ms and session.timeout.ms, decrease max.poll.records, and have the consumer commit the offset as soon as it finishes processing a message.

Problem solving

From the previous analysis, we should know how to approach this problem. Note that I integrated Kafka using Spring Boot and a Kafka consumer listener. The main code structure on the consumer side is shown below.

@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    logger.info("topic is {}, offset is {}, value is {}", record.topic(), record.offset(), record.value());
    try {
        Object value = record.value();
        logger.info(value.toString());
        // Acknowledge manually so the offset is committed once the record has been handled
        ack.acknowledge();
    } catch (Exception e) {
        // Pass the exception as the last argument so that the stack trace is logged
        logger.error("Abnormal log consumption", e);
    }
}

The above code logic is relatively simple. The message in Kafka is directly printed to a log file.

First attempt

I first configured the consumer according to the hints in the exception log, adding the following configuration to the Spring Boot application.yml file.

spring:
  kafka:
    consumer:
      properties:
        max.poll.interval.ms: 3600000
        max.poll.records: 50
        session.timeout.ms: 60000
        heartbeat.interval.ms: 3000

After the configuration was in place, I tested the consumer logic again, and it still threw the Rebalance exception.

The final solution

Let’s look at the Kafka consumer problem from another angle: when one module produces messages and another module consumes them, their Kafka clients must not share the same groupId. Changing the groupId of one of them is enough.

Our business project is developed as modules and subsystems: for example, module A produces messages, and module B consumes the messages produced by module A. In this situation, changing a configuration parameter such as session.timeout.ms: 60000 makes no difference.

At this point, I tried changing the groupId of the consumer group. I changed the following code

@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive(ConsumerRecord<?, ?> record, Acknowledgment ack) {

to the code shown below.

@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer-logs", containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive(ConsumerRecord<?, ?> record, Acknowledgment ack) {

Test again, problem solved ~~

What a strange problem this was!! Next, I plan to write a [Kafka series] covering Kafka’s principles, source code analysis, and hands-on practice. What do you think, friends? Feel free to discuss at the end of this article ~~


If you have any questions, you can leave a comment below or add me on WeChat: SUN_shine_LYz. I will add you to the group, where we can exchange ideas on technology and improve together.