In the last article, we discussed the implementation of the Kafka broker, the storage structure of data, and the persistence of messages. After a message is stored, it is consumed by consumers.

1) Pull mechanism: the Kafka producer side uses a push mechanism, while the consumer side uses a pull mechanism.
2) Pros and cons of Pull: the advantage is that the consumer controls its own read rate and batch size; the disadvantage is that it does not know whether the server has new data, so it must keep pulling (or pull at intervals), possibly polling many times and waiting.
3) Message delivery semantics: Kafka guarantees at-least-once delivery by default and lets users implement at-most-once; exactly-once depends on the destination storage system.
4) Partition assignment strategies: RangeAssignor assigns by partition range and is the default policy; RoundRobinAssignor assigns partitions round-robin; StickyAssignor, introduced in Kafka 0.11, additionally tries to spread load as evenly as possible while keeping assignments sticky across rebalances.

These are also mentioned in the previous article.
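To make point 4 concrete, here is a minimal sketch of switching the assignor on the Java client via partition.assignment.strategy; the broker address and group id are placeholders:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.StickyAssignor;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");                      // placeholder group id
// partition.assignment.strategy defaults to RangeAssignor; switch to
// StickyAssignor (or RoundRobinAssignor) by class name:
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
```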

Consumer groups

The Consumer Group is a scalable and fault-tolerant consumer mechanism provided by Kafka. With this single mechanism, Kafka implements both the message queue model and the publish/subscribe model of traditional messaging systems.

Ideally, the number of Consumer instances should equal the total number of partitions that the Group subscribes to.

[Consumers and Consumer Groups]

A Kafka consumer belongs to a consumer group, and when multiple consumers form a group to consume a topic, each consumer receives messages from a different set of partitions. Suppose we have a topic T1 with four partitions and a consumer group G1 containing a single consumer, C1. Then C1 receives the messages of all four partitions.

An important feature of Kafka is that a message written once can be read by any number of applications; in other words, every application can read the full set of messages. For that, each application needs its own consumer group. In the example above, if we add a new consumer group G2 with two consumers, each consumer in G2 receives messages from two of the four partitions, while G1's single consumer still receives all four.

Here’s what we should note:

  • A topic can be consumed by multiple consumer groups, and the groups do not interfere with each other; each consumer group consumes the complete set of data.

  • A partition can be consumed by only one consumer within the same consumer group, never by several at once. That is, if a consumer group has more consumers than the topic has partitions, the extra consumers sit idle (a minimal subscription sketch follows this list).
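Here is that sketch with the Java client. The topic name T1, group id G1, and broker address are the assumptions from the example above; running a second copy of this program with group.id set to G2 would give that group its own full copy of the stream.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "G1");                      // consumers sharing this id split T1's partitions
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("T1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            r.partition(), r.offset(), r.value());
                }
            }
        }
    }
}
```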

Process of consumer partition allocation

So let’s see what the allocation process looks like.

  1. Each time we create a consumer group, Kafka assigns a broker to serve as a coordinator for that consumer group.

  2. Register consumers and elect a leader consumer. Once the coordinator is available, consumers start registering with it; the first consumer to register becomes the leader of the consumer group, and subsequent consumers become followers.

  3. After the leader is elected, the leader obtains information about partitions and consumers in real time from the Coordinator, allocates partitions to each consumer based on the partitioning policy, and reports the allocation results to the Coordinator.

  4. Follower consumers obtain their own partition assignments from the coordinator and consume accordingly. A follower knows only its own assignment and is unaware of the other consumers.

  5. At this point, every consumer knows which partitions it consumes, and the assignment process ends. When a partition rebalance occurs, the leader repeats this allocation process.

Refer to the previous article for a detailed flow chart.

About offsets

[Consumer Offset]

  • While consuming messages, each consumer must have a field recording where it currently is in each partition. This field is the Consumer Offset, an indicator of the consumer's consumption progress.

  • Offset looks like a single number, but for a Consumer Group it is actually a group of KV pairs: the key is the partition, and the value is the latest offset the consumer has reached in that partition (TopicPartition -> long).

  • But remember: the consumer offset is the offset of the next message to consume, not the offset of the most recently consumed message.

  • Committed offsets record the consumer's progress, so when a consumer fails and restarts it can read the previously committed offset from Kafka and resume from there, instead of starting the whole consumption from scratch (a small sketch of inspecting these offsets follows this list).
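The next-message semantics can be checked directly from the client: position() returns the offset of the next record to read, while committed() returns the group's last committed offset. A minimal sketch, assuming topic T1, partition 0, and a local broker:

```java
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "G1");                      // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp = new TopicPartition("T1", 0);  // assumed topic and partition
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Set.of(tp));
            consumer.poll(Duration.ofMillis(500));        // fetch something so the position advances
            long next = consumer.position(tp);            // offset of the NEXT record to read
            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(Set.of(tp));
            System.out.println("next offset = " + next + ", committed = " + committed.get(tp));
        }
    }
}
```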

[Where offsets are stored]

When a consumer application commits an offset, it actually commits it to the broker where the coordinator resides. Likewise, when a consumer application starts, it sends requests to that broker, and the coordinator performs metadata-management operations such as consumer-group registration and member bookkeeping.

In the new version of the consumer, the Kafka community redesigned the way consumer groups manage offsets: they are stored in an internal Kafka topic, __consumer_offsets, commonly known as the offsets topic. See my previous article “Basic Concepts, Architectures, and a new version of the Updated Kafka Body of Knowledge 1” for more on why the old ZooKeeper-based offset storage was abandoned.

[Data format of the offsets topic]

Key
  • The key of an offsets-topic message holds three parts: the Group ID, the topic name, and the partition number.

Value
  • The value mainly stores the offset itself, along with metadata such as a timestamp. Remember that you can reset a consumer to start consuming from a given point in time; the timestamp is what makes that possible (see the sketch below).
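A minimal sketch of such a time-based reset, assuming topic T1, partition 0, and a local broker: offsetsForTimes() looks up the earliest offset with a timestamp at or after the target time, and seek() rewinds consumption to it.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class ResetByTime {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "G1");                      // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp = new TopicPartition("T1", 0);  // assumed topic and partition
        long oneHourAgo = Instant.now().minus(Duration.ofHours(1)).toEpochMilli();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Set.of(tp));
            // Find the first offset whose timestamp is >= the target time...
            Map<TopicPartition, OffsetAndTimestamp> found =
                    consumer.offsetsForTimes(Map.of(tp, oneHourAgo));
            OffsetAndTimestamp ot = found.get(tp);
            if (ot != null) {
                consumer.seek(tp, ot.offset()); // ...and rewind consumption to it
            }
        }
    }
}
```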

[Committing offsets]

1. Automatic commit

If enable.auto.commit is set to true, the consumer automatically commits the largest offset received from the poll() method every 5 seconds (the default auto.commit.interval.ms).

Possible problem: messages are read repeatedly.

Suppose we keep the default 5s commit interval and a rebalance occurs 3s after the last commit. After the rebalance, consumers resume reading from the last committed offset, which is now 3s behind, so the messages that arrived in those 3s are processed again. Committing more frequently by shortening the commit interval shrinks the window in which duplicates can occur, but cannot eliminate it completely (a config fragment follows).
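The fragment below shows the two settings involved; merge it into the consumer Properties from the earlier sketch. The interval value here is only an illustration.

```java
// Auto-commit: the consumer commits the latest polled offsets in the background.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// Default is 5000 ms; a shorter interval narrows (but never closes) the duplicate window.
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
```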

2. Manual commit

2.1 Synchronous Commit

Problems with synchronous commit

  • As its name suggests, this is a synchronous operation: the method waits until the offset has been committed successfully before returning, and throws if an exception occurs during the commit.

  • The problem with commitSync() is that the consumer program blocks until the broker responds to the commit. Note that synchronous commit does retry after a failed commit.

  • In any system, blocking caused by the program itself rather than by resource constraints can become a bottleneck, hurting the TPS and throughput of the whole application (a sync-commit loop sketch follows this list).
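A sketch of the synchronous pattern, reusing the consumer from the subscription sketch above with enable.auto.commit set to false; process() stands in for hypothetical business logic:

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Process a batch, then block until the broker confirms the commit.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> r : records) {
        process(r); // hypothetical business logic
    }
    try {
        consumer.commitSync(); // blocks; retries retriable errors internally
    } catch (CommitFailedException e) {
        // Non-retriable, e.g. the group has already rebalanced; log and decide how to recover.
        e.printStackTrace();
    }
}
```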

2.2 Asynchronous Commit

One disadvantage of synchronous commit is that the application blocks until the broker responds to the commit request, which limits throughput. We could raise throughput by committing less frequently, but then a rebalance would create more duplicate messages.

With asynchronous commit, you just send the commit request without waiting for the broker to respond. It does not retry, because by the time the response arrives, a larger offset may already have been committed successfully.

Suppose we send a request to commit offset 2000, but a temporary communication problem means the server never receives it and never responds. Meanwhile we process another batch and successfully commit offset 3000. If commitAsync() retried offset 2000, it might commit it after offset 3000; a rebalance at that point would produce duplicate messages.
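For visibility into such failures, commitAsync() accepts a callback; the sketch below only logs, and deliberately does not retry for exactly the reason just described:

```java
// Fire the commit and keep polling; the callback reports failures without retrying.
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("commit failed for " + offsets + ": " + exception);
    }
});
```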

Problems with asynchronous commit

  • The problem with commitAsync is that it does not retry when something goes wrong. Because the operation is asynchronous, by the time a failed commit could be retried, the offset it carries may already be stale, and retrying could overwrite a newer committed offset. Asynchronous retries are therefore meaningless, so commitAsync does not retry; what matters is that the final commit before the program stops succeeds.

  • If that final commit succeeds, earlier failed commits can be ignored. If it fails, we can manually specify the offset at the next restart.

Combine asynchronous and synchronous commit

Use both commitSync() and commitAsync(): for the regular, periodic manual commits we call commitAsync() to avoid blocking, and just before the consumer closes we call commitSync() to perform one synchronous, blocking offset commit, ensuring the correct offsets are saved before the consumer shuts down (see the sketch below).
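A sketch of the combined pattern, again reusing the consumer from the earlier sketch; running is a hypothetical shutdown flag and process() hypothetical business logic:

```java
// Async commits on the hot path, one final sync commit on shutdown.
try {
    while (running) { // hypothetical shutdown flag
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> r : records) {
            process(r); // hypothetical business logic
        }
        consumer.commitAsync(); // non-blocking; an occasional lost commit is tolerable here
    }
} finally {
    try {
        consumer.commitSync();  // block once so the final offsets are definitely saved
    } finally {
        consumer.close();
    }
}
```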

Rebalance

Transferring ownership of a partition from one consumer to another is called a rebalance. Rebalancing matters because it gives a consumer group high availability and scalability: consumers can be added to or removed from the group with confidence. Three events trigger a rebalance:

  1. A new consumer joins the group and takes over partitions previously read by other consumers, triggering a rebalance.

  2. A consumer leaves the group (shutdown or crash), and the partitions it was reading are taken over by other consumers in the group, triggering a rebalance.

  3. The topic changes, for example a new partition is added, so partitions must be redistributed, triggering a rebalance. (A rebalance-listener sketch follows this list.)
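A consumer can react to these events with a ConsumerRebalanceListener. The sketch below, assuming the T1 consumer from earlier, commits synchronously just before partitions are revoked so the next owner starts from the right place:

```java
import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(List.of("T1"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(); // last chance to save progress before losing these partitions
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("assigned: " + partitions); // e.g. seek to custom offsets here
    }
});
```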

The group cannot consume the topic while a partition rebalance is in progress, so a rebalance that is too slow is a serious problem.

The following points are about avoiding unnecessary partition rebalances caused by incorrect configuration in production; rebalances caused by normal cluster changes are not considered here:

  1. Prevent a consumer from being kicked out of the consumer group because its heartbeats arrive late. Tune the session.timeout.ms session timeout and the heartbeat.interval.ms heartbeat interval; a common rule of thumb is to set the session timeout to three times the heartbeat interval.

  2. Rebalances caused by message processing taking too long. If the consumer cannot finish processing the messages returned by poll() within the allotted time, it is considered faulty and automatically leaves the group. So set max.poll.interval.ms a little longer than your worst-case processing time.

  3. Extending the second point: if the group rebalances frequently, look at how long consumers take to process each batch, paying particular attention to GC pauses. (A tuning sketch follows this list.)
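A config fragment pulling these knobs together, to merge into the consumer Properties from the earlier sketches; the values are illustrative, not recommendations:

```java
// Rule of thumb: session timeout >= 3 x heartbeat interval.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
// Give processing more headroom than the worst observed batch time.
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
// Optionally shrink the batch so each poll() round finishes faster.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200");
```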

Often online problems are also caused by unreasonable configuration.