This article is the blogger's study notes; the content comes from the Internet. If anything here infringes on your rights, please contact me for deletion.

Personal Note: github.com/dbses/TechN…

01 | What is a Consumer Group?

The Consumer Group feature

A Consumer Group is a scalable and fault-tolerant consumer mechanism provided by Kafka.

A Group can contain multiple consumers (Consumer instances) that share a common ID, called the Group ID. All consumers within the group coordinate to consume all partitions of the subscribed topics. Of course, each partition can only be consumed by one Consumer instance within the same Consumer Group.

The Consumer Group has the following three features:

  1. A Consumer Group can have one or more Consumer instances. An instance can be a separate process or a thread within the same process.
  2. The Group ID is a string that uniquely identifies the Group within a Kafka cluster.
  3. A single partition of a topic can only be assigned to one Consumer instance within a Consumer Group. Of course, the same partition can also be consumed by other Groups.
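
To make these three features concrete, here is a minimal sketch of a Group member. The topic name, Group ID, and bootstrap address are placeholders; if you run this program twice, both instances join the same Group ("my-group") and Kafka assigns each partition of the topic to exactly one of them.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupMemberDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "my-group");                // same Group ID for every instance
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test-topic"));  // placeholder topic
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records)
                // Each record's partition is owned by exactly one instance of "my-group"
                System.out.printf("partition = %d, offset = %d%n", record.partition(), record.offset());
        }
    }
}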

The Consumer Group versus the traditional messaging engine model

There are two main types of traditional message engine models, namely point-to-point model (message queue model) and publish/subscribe model.

The disadvantage of the traditional message queue model is that once a message is consumed, it is removed from the queue and can only be consumed by a single Consumer downstream. Obviously, the performance of this model is poor because multiple downstream consumers are “grabbing” messages from the shared message queue.

The publish/subscribe model allows messages to be consumed by multiple consumers, but the scalability problem is that each subscriber must subscribe to all partitions of the topic.

(So how does Kafka handle this?)

Kafka’s Consumer Group avoids the drawbacks of both models and has the advantages of both. Consumer groups are independent and can subscribe to the same set of topics without interfering with each other.

If all instances belong to the same Group, it implements the message queue model; if all instances belong to different Groups, it implements the publish/subscribe model. When a Consumer Group subscribes to multiple topics, each instance of the Group is not required to subscribe to all partitions of the topics; each instance only consumes messages from some of the partitions.

How many consumers are suitable for the Group?

In a real-world scenario, how do I know how many Consumer instances there should be in a Group? Ideally, the number of Consumer instances should equal the total number of partitions that the Group subscribes to.

To take a simple example, suppose a Consumer Group subscribes to three topics, A, B, and C, with 1, 2, and 3 partitions respectively. Then it is generally ideal for the Group to have six Consumer instances, because that maximizes scalability.

(Blogger's note: the value in the theoretical description and the value in the example seem to contradict each other!)

You might ask, can I set fewer or more than 6 instances? Of course you can! If you set 3 instances, then on average each instance consumes about 2 partitions (6 / 3 = 2); if you set 8 instances, then unfortunately 2 instances (8 - 6 = 2) will not be allocated any partitions and will stay idle forever. Therefore, it is generally not recommended to set more Consumer instances than the total number of partitions; redundant instances waste resources without any benefit.

How does Kafka manage offsets?

During consumption, each consumer needs to record how far it has consumed, that is, its consumption position. In Kafka, this position information has a dedicated term: Offset.

In fact, for a Consumer Group, the offsets are a group of KV pairs: the Key is the partition, and the Value is the latest offset the Consumer has consumed in that partition. You can think of it as the data structure Map&lt;TopicPartition, Long&gt;.
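
A minimal sketch of that mental model (purely illustrative, not Kafka's internal implementation):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: how a Group's committed offsets can be pictured.
Map<TopicPartition, Long> groupOffsets = new HashMap<>();
groupOffsets.put(new TopicPartition("test-topic", 0), 42L); // partition 0 consumed up to offset 42
groupOffsets.put(new TopicPartition("test-topic", 1), 17L); // partition 1 consumed up to offset 17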

In the new version of the Consumer (0.9 and later), the Kafka community redesigned the way offsets are managed for Consumer Groups: offsets are stored in an internal Kafka topic. This internal topic is __consumer_offsets.

Consumer Group Rebalance

Rebalance is essentially a protocol that dictates how all consumers in a Consumer Group agree on how to allocate each partition of the subscribed topics. For example, a Group with 20 Consumer instances subscribes to a Topic with 100 partitions. Normally, Kafka allocates an average of 5 partitions to each Consumer. This allocation process is called a Rebalance.

There are three conditions that trigger a Rebalance:

  1. The number of Group members changes.
  2. The number of subscribed topics changes.
  3. The number of partitions of a subscribed topic changes.

When a third member C joins the Group, Kafka triggers a Rebalance and reassigns the partitions among A, B, and C according to the default allocation policy, as shown in the following figure:

As you can see, the Rebalance allocation is quite fair.

Rebalance has some drawbacks, though. First, during a Rebalance all Consumer instances stop consuming and wait for the Rebalance to complete. This is very similar to the JVM's stop-the-world garbage collection pause.

Second, in the current design of Rebalance, all Consumer instances participate and all partitions are reassigned. It would be more efficient to minimize changes to the allocation scheme, which would avoid having to re-create the Socket connections to other Brokers.

Finally, Rebalance is too slow. One user had hundreds of Consumer instances in a Group, and a single Rebalance took hours! This is totally intolerable. The sad part is that there is nothing the community can do about it, at least there are no particularly good solutions yet. Perhaps the best "solution" is to avoid Rebalance altogether.

02 | Unveiling the mystery of the "offsets topic"

In the Kafka source code, __consumer_offsets has a more formal name: the Offsets Topic.

Background and reasons for the offsets topic

In older versions of the Consumer, offset management relied on Apache ZooKeeper: offset data was automatically or manually committed to ZooKeeper for storage. When the Consumer restarted, it automatically read the offset data back from ZooKeeper and continued consuming where it had left off. This design meant Kafka Brokers did not need to store offset data, reducing the amount of state the Broker side had to hold and thus, in theory, enabling high scalability.

ZooKeeper, however, is not really suitable for this kind of high-frequency write workload, so starting with the 0.8.2.x release the Kafka community began reworking this design, and finally introduced a new offset management mechanism, including this new offsets topic, in the new Consumer.

Message format of the offsets topic

The offset management mechanism of the new Consumer is also very simple: the Consumer's offset data is submitted to __consumer_offsets as ordinary Kafka messages. Suffice it to say, the main purpose of __consumer_offsets is to store the offset information of Kafka consumers.

So in what format does this topic store messages? The Key of an offsets-topic message holds three parts: &lt;Group ID, topic name, partition number&gt;. The message body can, for simplicity, be thought of as holding the offset value.
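
A purely illustrative sketch of that key structure (this is not Kafka's actual internal class; the real key is a versioned binary format defined inside Kafka):

// Hypothetical model, for illustration only.
class OffsetKey {
    final String groupId;  // which Consumer Group committed the offset
    final String topic;    // which topic
    final int partition;   // which partition of that topic

    OffsetKey(String groupId, String topic, int partition) {
        this.groupId = groupId;
        this.topic = topic;
        this.partition = partition;
    }
}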

In fact, besides the format above, the offsets topic stores two other message formats:

  • Messages used to store Consumer Group information

    They are used to register the Consumer Group.

  • Messages used to delete expired Group offsets, or even to delete the Group itself

    This format has its own name: tombstone messages, also known as delete marks; the two terms refer to the same thing. These messages appear only in the source code and are not exposed to you. Their main characteristic is that the message body is null, i.e. an empty message body.

Once all Consumer instances of a Consumer Group have stopped and their offset data has been deleted, Kafka writes a tombstone message to the corresponding partition of the offsets topic to delete the Group completely.

How is the offsets topic created?

Typically, Kafka automatically creates the offsets topic when the first Consumer in the Kafka cluster starts. The offsets topic is a normal Kafka topic, so it has a corresponding number of partitions. But if Kafka creates it automatically, how is the number of partitions set?

It depends on the Broker-side parameter offsets.topic.num.partitions. Its default value is 50, so Kafka automatically creates the offsets topic with 50 partitions.

Besides the number of partitions, how is the number of replicas (the replication factor) controlled? The answer is simple: another Broker-side parameter, offsets.topic.replication.factor, takes care of that. Its default value is 3.

Of course, you can also create the offsets topic manually using the Kafka API before any consumer is started in the cluster. The advantage of manual creation is that you can create an offsets topic that fits your actual scenario. For example, many people feel that 50 partitions are too many: "I don't want that many partitions, so I'll create it myself and ignore the value of offsets.topic.num.partitions."
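
If you do want to create it manually, a hedged sketch using the Java Admin client might look like the following (the bootstrap address is a placeholder; it keeps 50 partitions and replication factor 3, matching the defaults described above, because of the caveat in the next paragraph):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "localhost:9092"); // placeholder address

try (Admin admin = Admin.create(adminProps)) {
    // 50 partitions, replication factor 3 -- the same values as the Broker defaults.
    NewTopic offsetsTopic = new NewTopic("__consumer_offsets", 50, (short) 3);
    admin.createTopics(Collections.singleton(offsetsTopic)).all().get(); // blocks until created
} catch (Exception e) {
    // handle InterruptedException / ExecutionException appropriately in real code
    e.printStackTrace();
}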

My advice, however, is to let Kafka create it automatically. Parts of the current Kafka source code hard-code 50 partitions, so if you create an offsets topic yourself with a different partition count, you may run into all sorts of strange problems. This is a community bug; the code has been fixed, but it is still under review.

Where is the offsets topic used?

Currently, the Kafka Consumer can commit offsets in two ways: automatically and manually.

The Consumer side has a parameter called enable.auto.commit. If it is true, the Consumer silently commits offsets for you on a regular basis, with a dedicated parameter, auto.commit.interval.ms, controlling the commit interval. One obvious advantage of automatic offset commits is that you don't have to worry about committing offsets yourself to ensure message consumption is not lost. But this is also a disadvantage: because it is so hands-off, you lose a lot of flexibility and control, and you cannot manage offsets yourself on the Consumer side.

There is also a potential problem with automatic offset commits: as long as the Consumer keeps running, it will keep writing messages to the offsets topic indefinitely.

Manual offset commit means setting enable.auto.commit = false. Once it is set to false, the Consumer application takes responsibility for committing offsets. The Kafka Consumer API provides methods for committing offsets, such as consumer.commitSync(). When these methods are called, Kafka writes the corresponding messages to the offsets topic.

How does Kafka remove expired messages from the offsets topic?

How does Kafka remove expired messages from the offsets topic? The answer is Compaction.

Kafka uses the Compact policy to remove expired messages from the offsets topic, preventing it from growing indefinitely. So how is "expired" defined in the Compact policy? For two messages M1 and M2 with the same Key, if M1 was sent earlier than M2, then M1 is the expired message. The Compact process scans all messages in the log, weeds out the expired ones, and compacts the rest together.

This image from the official website illustrates the Compact process.

In the figure, the messages at offsets 0, 2, and 3 all have Key K1. After Compaction, the partition only needs to keep the message at offset 3, because it is the most recently sent one.

Kafka provides a dedicated background thread that periodically inspects topics eligible for Compaction to see whether there is data that can be deleted. This background thread is called the Log Cleaner.

Many real production environments have the problem of the offsets topic growing without bound and taking up too much disk space. If your environment has this problem too, I recommend checking the state of the Log Cleaner thread; usually this happens because that thread has died.
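
As a hedged example, one quick way to check is to dump the Broker JVM's threads and look for the cleaner thread (the exact thread name may vary by Kafka version; "log-cleaner" is an assumption here):

jps | grep -i kafka                            # find the Broker's process id (assumes jps is available)
jstack <broker-pid> | grep -i "log-cleaner"    # a live cleaner thread should show up in the dump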

03 | Can Consumer Group Rebalance be avoided?

To avoid Rebalance, start with the timing of the Rebalance. As we said earlier, there are three times when Rebalance occurs:

  • The number of group members has changed
  • The number of subscribed topics has changed
  • The number of partitions subscribed to the topic changed

If the number of Consumer instances in a Consumer Group changes, it is certain to cause a Rebalance. This is the most common cause of Rebalance. The latter two causes are usually deliberate operational actions, so the Rebalances they trigger are mostly unavoidable. Let's focus on how to avoid Rebalances caused by changes in group membership.

Typically, adding a Consumer instance is planned, perhaps to increase TPS or scalability. It is not one of the "unnecessary Rebalances" we need to avoid.

A reduction in Group instances is what we care about. When does the Coordinator decide that a Consumer instance has died and needs to be removed from the Group?

How does Kafka sense if a Consumer has failed?

After the Consumer Group completes a Rebalance, each Consumer instance periodically sends heartbeat requests to the Coordinator to show that it is still alive. If a Consumer instance fails to send heartbeat requests in a timely way, the Coordinator considers the Consumer "dead", removes it from the Group, and starts a new round of Rebalance. The Consumer side has a parameter, session.timeout.ms, that controls this. Its default value is 10 seconds: if the Coordinator does not receive a heartbeat from a Consumer instance in the Group within 10 seconds, it considers that instance to have died.

In addition to this parameter, Consumer provides a parameter that allows you to control how often heartbeat requests are sent, heartbeat.interval.ms. The smaller this value is set, the more frequently the Consumer instance will send heartbeat requests.

In addition to the above two parameters, there is another parameter on the Consumer side that controls how the Consumer's actual consumption capacity affects Rebalance: max.poll.interval.ms. It limits the maximum interval between two calls to the poll method by the Consumer application. The default value is 5 minutes, which means that if your Consumer cannot finish processing the polled messages within 5 minutes, it will initiate a "leave group" request and the Coordinator will start a new round of Rebalance.

Which Rebalances are unnecessary?

The first type of unnecessary Rebalance happens when a Consumer is "kicked out" of the Group because it fails to send heartbeats in time.

Therefore, you need to set the values of session.timeout.ms and heartbeat.interval.ms carefully. Ensure that the Consumer instance can send at least 3 heartbeat requests before it is judged dead, that is, session.timeout.ms >= 3 * heartbeat.interval.ms.

Here are some recommended values that you can apply “mindlessly” to your production environment.

  • Set session.timeout.ms to 6s.
  • Set heartbeat.interval.ms to 2s.

The purpose of setting session.timeout.ms to 6s is to enable coordinators to quickly locate failed consumers and kick them out of the Group.

The second type of unnecessary Rebalance happens when consumers take too long to consume. I had a client whose Consumer needed to process each message and write it to MongoDB. Obviously, this is very heavy consumption logic: even the slightest instability in MongoDB lengthens consumption time. In this case, the value of max.poll.interval.ms is critical. To avoid unexpected Rebalances, it is best to set this parameter to a value larger than your downstream system's maximum processing time. In the MongoDB example, if the maximum time to write to MongoDB is 7 minutes, you can set this parameter to about 8 minutes.
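
Putting the recommendations above together, a minimal configuration sketch might look like this (the 8-minute value assumes the MongoDB example above; tune it to your own downstream processing time):

Properties props = new Properties();
// Judge an instance dead only after roughly 3 missed heartbeats: 6s >= 3 * 2s
props.put("session.timeout.ms", "6000");
props.put("heartbeat.interval.ms", "2000");
// Give the downstream system more time than its worst case (7 minutes) to process a batch
props.put("max.poll.interval.ms", String.valueOf(8 * 60 * 1000));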

If you have set these parameters properly and Rebalances still occur, I suggest checking the GC behavior on the Consumer side for frequent, long Full GC pauses that cause Rebalance. Why single out GC? Because in real life I have seen many unexpected Rebalances caused by incorrect GC settings that made Full GCs happen too often.

04 | All about offset commits in Kafka

The offset we are going to talk about today is the Consumer's consumption offset, which records the offset of the next message the Consumer is going to consume.

Remember: it is the offset of the next message, not the offset of the latest message already consumed.

Because the Consumer can consume data from multiple partitions at the same time, offset commits actually happen at partition granularity: the Consumer needs to commit its own offset data for each partition assigned to it.

Automatic offset commits

The Consumer side has a parameter enable.auto.commit, which defaults to true; the Java Consumer commits offsets automatically by default. If automatic commit is enabled, another Consumer-side parameter comes into play: auto.commit.interval.ms. Its default value is 5 seconds, which means Kafka automatically commits offsets for you every 5 seconds.

The following code shows how to configure automatic offset commits.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records)
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

By default, the Consumer automatically commits offsets every 5 seconds. Now suppose a Rebalance happens 3 seconds after an offset commit. After the Rebalance, all consumers resume consuming from the offset committed 3 seconds earlier, so all messages consumed in those 3 seconds before the Rebalance have to be consumed again.

While you can increase the commit frequency by reducing auto.commit.interval.ms, doing so only narrows the window for repeated consumption; it cannot eliminate it completely. This is a drawback of the auto-commit mechanism.

Manual synchronous commit

To enable manual offset commits, set enable.auto.commit to false. You can then call KafkaConsumer#commitSync() to commit offsets synchronously.

The following code shows the use of commitSync().

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    process(records); // Process the message
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        handle(e); // Handle the commit failure exception
    }
}

The advantage of manual offset commits is flexibility: you can fully control when and how often offsets are committed.

However, it has a drawback: the Consumer program blocks when calling commitSync() until the remote Broker returns the commit result. In any system, blocking caused by the program itself rather than by resource constraints can become a bottleneck that hurts the TPS of the entire application.

Manual asynchronous commit

To address this problem, the Kafka community provides another API method for manually committing offsets: KafkaConsumer#commitAsync(). A call to commitAsync() returns immediately without blocking, and therefore does not affect the TPS of the Consumer application.

Because it is asynchronous, Kafka provides a callback that lets you run post-commit logic, such as logging or handling exceptions. The following code shows how to call commitAsync().

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    process(records); // Process the message
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            handle(exception); // Handle the commit failure exception
        }
    });
}

Can commitAsync replace commitSync?

The answer is no. The problem with commitAsync is that it does not automatically retry when something goes wrong. Because it is an asynchronous operation, if it automatically retried after a failed commit, the offset it submits on retry might already be "stale", i.e. no longer the latest. Therefore, retrying asynchronous commits doesn't really make sense, and commitAsync does not retry.

Combining synchronous and asynchronous manual commits

Obviously, for manual commits you need to combine commitSync and commitAsync to get the best results, for two reasons:

  1. We can rely on commitSync's automatic retries to ride out transient errors such as network jitter or a Broker-side GC pause. Since these problems are short-lived, an automatic retry usually succeeds, so we don't want to retry them ourselves; we want the Kafka Consumer to do it for us.
  2. We don't want the program to be blocked all the time, which would hurt TPS.

The following code combines the two API methods for manual submission.

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        process(records); // Process the message
        consumer.commitAsync(); // Use asynchronous commit to avoid blocking
    }
} catch (Exception e) {
    handle(e); // Handle exceptions
} finally {
    try {
        consumer.commitSync(); // The last commit uses synchronous blocking commit
    } finally {
        consumer.close();
    }
}

For regular, intermediate commits we call commitAsync() to avoid blocking, and just before the Consumer closes we call commitSync() to perform a final synchronous, blocking offset commit, ensuring the correct offsets are saved before the Consumer shuts down. By combining the two, we get non-blocking offset management and still guarantee correct Consumer offsets.

So, if you need to write your own Kafka Consumer application with manual offset commits, I recommend following the code example above.

More fine-grained offset commits

We have covered automatic and manual commits, and synchronous and asynchronous commits. Is that all there is to Kafka offset commits? Not quite. The Kafka Consumer API also provides a set of more convenient methods that help you achieve more fine-grained offset management.

Imagine a scenario where your poll method returns not 500 messages but 5,000. You probably don't want to process all 5,000 messages before committing offsets, because if something goes wrong partway through, you would have to start over. This is similar to transaction processing in a database: we often want to split one big transaction into several smaller ones that commit separately, which can significantly reduce error-recovery time.

The same is true in Kafka. For a Consumer that has to process many messages at once, the question is whether the community offers a way to commit offsets in the middle of consuming a batch. For example, in the 5,000-message case above, you might want to commit an offset for every 100 messages processed, to avoid re-consuming a large number of messages.

The Kafka Consumer API provides such methods for manual commits: commitSync(Map&lt;TopicPartition, OffsetAndMetadata&gt;) and commitAsync(Map&lt;TopicPartition, OffsetAndMetadata&gt;). Their parameter is a Map whose key is a TopicPartition (the partition being consumed) and whose value is an OffsetAndMetadata object, which mainly holds the offset data.

Back to the example just mentioned: how do you commit an offset for every 100 messages processed? Here I show a piece of code using commitAsync; commitSync is called in exactly the same way.

private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
...
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // Process the message
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        if (count % 100 == 0) {
            consumer.commitAsync(offsets, null); // The callback processing logic is null
        }
        count++;
    }
}

The program first creates a Map to hold the partition offsets to be committed during processing, then processes the messages one by one and builds up the offsets to commit.

Remember when I said earlier that we commit the offset of the next message? That is why the OffsetAndMetadata object here is constructed with the current message's offset plus 1.

The last part of the code performs the offset commit. I set up a counter that commits an offset for every 100 accumulated messages. Instead of calling the no-argument commitAsync, we call the commitAsync overload that takes a Map argument, giving us a fine-grained offset commit. This code therefore commits an offset for every 100 messages processed and is no longer tied to the total number of messages returned by the poll method.

05 | How should CommitFailedException be handled?

CommitFailedException is, as the name suggests, a serious, unrecoverable exception raised when the Consumer client commits an offset. If the failure were a recoverable transient error, the offset-commit API could work around it on its own, because many of the offset-commit methods support automatic retries, such as the commitSync method we discussed in the previous section.

Let's discuss when this exception is thrown. From the source code's perspective, CommitFailedException usually occurs during manual offset commits, that is, when the user explicitly calls KafkaConsumer.commitSync(). In terms of usage scenarios, there are two typical scenarios in which this exception may occur.

Scenario 1: Message processing takes a long time

The Kafka Consumer throws a CommitFailedException when the total time spent processing a batch of messages exceeds the configured value of the max.poll.interval.ms parameter.

The following code shows this exception:

...
Properties props = new Properties();
...
props.put("max.poll.interval.ms", 5000);
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    // Use Thread.sleep to simulate real message processing logic
    Thread.sleep(6000L);
    consumer.commitSync();
}

To prevent this exception from being thrown in this scenario, you need to simplify your message handling logic. Specifically, there are four approaches (a sketch of the fourth approach follows the list).

  • Shorten the processing time of a single message

    For example, if the downstream system used to take 100 milliseconds to consume a message and you optimize it down to 50 milliseconds, the TPS on the Consumer side roughly doubles.

  • Increase the maximum amount of time the Consumer side allows downstream systems to spend consuming a batch of messages

    This depends on the Consumer-side parameter max.poll.interval.ms. In the latest versions of Kafka, its default value is 5 minutes. If your consumption logic cannot be simplified, increasing this value is a good option.

    It is worth noting that Kafka versions before 0.10.1.0 do not have this parameter, so if you are still using a client API older than 0.10.1.0, you need to increase the session.timeout.ms parameter instead. Unfortunately, session.timeout.ms also carries other meanings, so increasing it can have other "side effects". That is one of the reasons the community introduced max.poll.interval.ms in version 0.10.1.0 and stripped this meaning out of session.timeout.ms.

  • Reduce the total number of messages consumed at a time by downstream systems

    This depends on the Consumer-side parameter max.poll.records. Its current default value is 500, which means a single call to KafkaConsumer.poll can return at most 500 messages. In other words, this parameter sets an upper limit on the total number of messages a single poll can return. If the previous two methods don't work for you, lowering this parameter is the easiest way to avoid CommitFailedException.

  • Downstream systems use multithreading to speed up consumption

    This is the "most advanced" and the most difficult solution to implement. The idea is for the downstream system to manually create multiple consuming threads to process the batch of messages returned by the poll method.

    In fact, many mainstream big-data stream-processing frameworks use this approach. For example, when Apache Flink integrates Kafka, it creates multiple KafkaConsumerThread threads to spread data consumption across multiple threads. However, every coin has two sides: this approach is not easy to implement, especially when it comes to handling offset commits across multiple threads. A minimal sketch follows this list.
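
Below is a minimal, hedged sketch of the idea: the poll loop stays single-threaded while message processing is handed off to a thread pool. It deliberately ignores the hard part (coordinating offset commits with in-flight tasks); the processRecord method, the pool size, and the consumer variable (the KafkaConsumer from the earlier examples) are assumptions for illustration.

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Sketch only: a fixed pool of worker threads processes the polled batch.
ExecutorService workers = Executors.newFixedThreadPool(8); // pool size is an assumption

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        // processRecord is a hypothetical, thread-safe handler supplied by your application
        workers.submit(() -> processRecord(record));
    }
    // Caution: committing here only means the batch was *dispatched*, not fully processed.
    // A production design must track per-partition completion before committing offsets.
    consumer.commitSync();
}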

Taking these four methods together, I personally recommend trying method 1 first to prevent this exception: optimizing the consumption logic of the downstream system is a clean solution and, unlike methods 2 and 3, does not involve trading off TPS against latency on the Consumer side. If method 1 is hard to implement, then follow the guidance above to apply methods 2 and 3.

Scenario 2: A consumer group and a standalone consumer use the same group.id at the same time

If your environment runs both a consumer-group program and a standalone consumer program configured with the same group.id, Kafka will immediately throw a CommitFailedException when the standalone consumer manually commits offsets, because Kafka cannot treat the instance with the duplicate group.id as a member of that consumer group; it returns an error indicating the instance is not a legitimate group member.

The Kafka Java Consumer also provides a standalone consumer (Standalone Consumer). It has no concept of a consumer group; each consumer instance works independently and has nothing to do with the others.

However, note that the standalone consumer's offset-commit mechanism is the same as the consumer group's, so its offset commits must also follow the rules above, such as specifying the group.id parameter in order to commit offsets.
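
A hedged sketch of such a standalone consumer: it uses assign() instead of subscribe(), yet still sets group.id so it can commit offsets. If a consumer group with the same group.id ("test" here, a placeholder) is running at the same time, the commitSync() call below is the one that raises CommitFailedException.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder address
props.put("group.id", "test");                    // same group.id as an active consumer group
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> standalone = new KafkaConsumer<>(props);
// assign() makes this a standalone consumer: it picks partitions itself, with no group membership.
standalone.assign(Arrays.asList(new TopicPartition("test-topic", 0)));

while (true) {
    ConsumerRecords<String, String> records = standalone.poll(Duration.ofSeconds(1));
    process(records);        // Process the messages (same placeholder as in the earlier examples)
    standalone.commitSync(); // throws CommitFailedException if the group.id clashes
}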

Although this scenario is unusual, it is not unheard of. In a large company, especially one that uses Kafka as a company-wide messaging engine, every department or team may have its own Consumer application; who can guarantee that the group.id values they configure never collide? If a collision does happen, as described above, none of the previous methods can get around the exception. To make things worse, neither version of the exception message mentions this scenario at all, so if this is the cause of your CommitFailedException, none of the previous four methods will help.