
A quick introduction to the characteristics of the consumer group:

  1. Kafka provides a scalable, fault-tolerant consumer mechanism
  2. A group contains multiple consumer instances
  3. Consumers specify a group.id in their configuration; all instances sharing the same group.id belong to the same group
  4. The consumers within a group cooperate to consume all partitions of the topics the group subscribes to
  5. Each partition can only be consumed by one consumer instance within the group

Summary: within a group, each partition is consumed by exactly one consumer, but a single consumer can consume multiple partitions, so partitions map to consumers in an N:1 model.

Consumption model

  1. Point-to-point (p2p), i.e., the message queue model
  2. Publish/subscribe (pub/sub)

The Consumer Group unified the two models:

  • If all consumer instances belong to the same group, it implements the message queue (p2p) model;
  • If the consumer instances all belong to different groups, it implements the pub/sub model

Ideally, the number of Consumer instances should be equal to the total number of partitions that the Group subscribes to.
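The sizing intuition can be sketched with a toy assignment routine. This is a hypothetical round-robin assignor, not Kafka's actual assignment strategy (the real client supports pluggable assignors such as range and round-robin): when instances equal partitions, each instance gets exactly one partition; with fewer instances, some must handle several.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy round-robin assignment of partitions to consumer instances in a group.
// Not Kafka's real assignor; just an illustration of the N:1 model.
public class RoundRobinAssign {
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String c : consumers) result.put(c, new ArrayList<>());
        for (int p = 0; p < partitions; p++) {
            // Each partition goes to exactly one consumer instance.
            result.get(consumers.get(p % consumers.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 instances, 3 partitions: each instance gets exactly one partition.
        System.out.println(assign(List.of("c0", "c1", "c2"), 3));
        // 2 instances, 3 partitions: one instance must consume two partitions.
        System.out.println(assign(List.of("c0", "c1"), 3));
    }
}
```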

Offset management

When a consumer consumes messages, it needs to record its progress by committing the offset it has consumed up to, i.e., the offset commit. This applies to an individual consumer.

In the case of a Consumer Group, the group needs to manage its member consumer instances, which means recording the offsets committed by each of those consumers.

It is easy to picture this as a map of KV pairs: the key is the partition, and the value is the latest offset the consumer has committed for that partition.
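A minimal sketch of that bookkeeping, assuming a plain string key for the partition (the real Java client models this as Map&lt;TopicPartition, OffsetAndMetadata&gt;):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of group-level offset bookkeeping: key = partition,
// value = latest committed offset for that partition.
public class GroupOffsets {
    private final Map<String, Long> committed = new HashMap<>();

    // A new commit simply overwrites the previous offset for the partition.
    void commit(String topicPartition, long offset) {
        committed.put(topicPartition, offset);
    }

    Long latest(String topicPartition) {
        return committed.get(topicPartition);
    }

    public static void main(String[] args) {
        GroupOffsets offsets = new GroupOffsets();
        offsets.commit("orders-0", 42L);
        offsets.commit("orders-0", 43L); // only the latest value matters
        System.out.println(offsets.latest("orders-0")); // prints 43
    }
}
```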

More on this later.

Consumer version differences

  • First, the difference:
  1. Older versions of the Consumer Group saved offsets in ZooKeeper
  2. Newer versions commit offsets to Kafka's internal topic, namely __consumer_offsets
  • Why the change?

Offset commits, even asynchronous ones, mean frequent writes, and ZooKeeper is not designed for high-frequency writes. Since Kafka 0.8.2.x, the new consumer stores commits in a dedicated offsets topic. An offset commit has only two requirements:

  1. High durability
  2. Support for high-frequency writes

Obviously, Kafka's own topic design naturally satisfies both requirements.

__consumer_offsets

Offset management via __consumer_offsets is simple: since it is a topic, a consumer's offset commits are sent as ordinary messages to the __consumer_offsets topic. Essentially, __consumer_offsets stores the consumers' offsets.

Although it is a topic, it is an internal topic, so developers do not need to manage it or manipulate it directly.

The message format is a KV-pair structure:

  1. Key: group ID + topic + partition
  2. Value: for simplicity, think of the message body as holding the committed offset value

It is generally recommended to commit manually: set enable.auto.commit = false, and have the application explicitly call the commit method provided by Kafka, such as consumer.commitSync().

One problem with automatic commits is that the consumer process keeps writing commit messages to the __consumer_offsets topic for as long as it is running.
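A manual-commit setup might look like the following sketch. The configuration keys are standard Kafka consumer configs; the broker address and group id are placeholders. With auto-commit disabled, the application calls consumer.commitSync() itself after processing each batch (the poll/commit loop is omitted here since it needs a running broker):

```java
import java.util.Properties;

// Consumer configuration for manual offset commits.
// "localhost:9092" and "my-group" are placeholder values.
public class ManualCommitConfig {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("enable.auto.commit", "false"); // disable auto-commit
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("enable.auto.commit"));
    }
}
```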

Problem: the same offset may be committed more than once. The broker, however, only needs the latest committed offset. This requires Kafka to delete stale offset-commit messages from the __consumer_offsets topic; otherwise:

  1. Too many stale messages would eventually fill the broker's disk
  2. Consumers may re-consume messages (which can, of course, be guarded against in business logic)

So Kafka uses the compact cleanup policy to remove expired messages from the __consumer_offsets topic. Kafka runs a dedicated background thread that periodically inspects topics configured for compaction to see whether there is data eligible for deletion. This background thread is called the Log Cleaner.
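The effect of compaction can be sketched in a few lines: given an append-only log of (key, offset) commit messages, compaction keeps only the newest message per key, which is exactly what __consumer_offsets needs. This is a simplified model, not the Log Cleaner's actual implementation:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of log compaction: keep only the latest value per key.
public class CompactSketch {
    static Map<String, Long> compact(List<Map.Entry<String, Long>> log) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> msg : log) {
            latest.put(msg.getKey(), msg.getValue()); // later messages win
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> log = List.of(
                Map.entry("group1/orders-0", 10L),
                Map.entry("group1/orders-0", 11L), // supersedes offset 10
                Map.entry("group1/orders-1", 7L));
        // One surviving entry per key, holding the latest committed offset.
        System.out.println(compact(log));
    }
}
```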

A few other caveats:

  1. The offsets in the offsets topic are managed by Kafka's internal Coordinator

Rebalance

  • First, meaning:

It is essentially a protocol that specifies how all consumers in a Consumer Group agree to allocate each partition under a subscription Topic.

So when does the Consumer Group make Rebalance? There are three trigger conditions:

  1. The number of consumer instances in the group changes: a new instance joins, or an existing one leaves (e.g., it finished consuming or crashed)
  2. The number of subscribed topics changes: if the group subscribes by regex, e.g. consumer.subscribe(Pattern.compile("t.*c")), and a new matching topic is created, the group must assign the new topic's partitions to its consumer instances
  3. The number of partitions of a subscribed topic changes
  • What are the performance issues?
  1. Rebalance is similar to a stop-the-world (STW) pause: all consumer instances in the group stop consuming and wait for the process to complete
  2. The process makes all consumers participate and reassigns all partitions from scratch. A more efficient design would keep each consumer's existing partitions wherever possible, so that its TCP connections to the brokers hosting those partitions could be reused instead of recreating sockets to other brokers
  3. The whole process is slow. This is the key problem, and combined with [1] it is even worse
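The regex-subscription trigger in [2] above is easy to demonstrate without a broker: with consumer.subscribe(Pattern.compile("t.*c")), any newly created topic whose name matches the pattern joins the group's subscription, which triggers a Rebalance. A pure-regex sketch of which topic names would match:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Shows which topic names a regex subscription like "t.*c" would pick up.
public class RegexSubscription {
    static List<String> matching(List<String> topics, String regex) {
        Pattern p = Pattern.compile(regex);
        return topics.stream()
                .filter(t -> p.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of("topic", "traffic", "orders");
        // Creating a new topic named e.g. "t-metric" would also match,
        // expanding the subscription and triggering a Rebalance.
        System.out.println(matching(topics, "t.*c")); // [topic, traffic]
    }
}
```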

During a Rebalance, all consumer instances in the same Consumer Group work together, with the help of the Coordinator component, to allocate partitions. However, no instance can consume messages during the entire process, which has a significant impact on consumer TPS.

Coordinator: each Consumer Group has a Coordinator serving it, responsible for offset commits and for managing the consumer instances in the group. It also drives Consumer Rebalance.

Coordinator

When each broker starts, it creates and starts a Coordinator component. How, then, does a Consumer Group find its Coordinator, i.e., which broker it lives on? There are two steps:

  1. Compute the target partition of __consumer_offsets: partitionId = Math.abs(groupId.hashCode() % offsetsTopicPartitionCount), i.e., hash the Consumer Group's group.id, then take it modulo the number of partitions of __consumer_offsets (50 by default)
  2. Find the broker hosting the leader replica of the partition from the previous step. For example, partitionId 15 corresponds to partition 15 of __consumer_offsets; the broker hosting that partition's leader is the group's Coordinator

In fact, the above is mainly to help us developers locate problem nodes quickly; at runtime, Kafka determines the Coordinator automatically.
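Step 1 of the lookup can be reproduced directly, using the formula quoted above ("my-group" is a placeholder group id):

```java
// Sketch of step 1 of the Coordinator lookup: which partition of
// __consumer_offsets holds a given group's offsets. 50 is the default
// partition count of __consumer_offsets.
public class CoordinatorLookup {
    static int offsetsPartition(String groupId, int offsetsTopicPartitionCount) {
        return Math.abs(groupId.hashCode() % offsetsTopicPartitionCount);
    }

    public static void main(String[] args) {
        int partition = offsetsPartition("my-group", 50);
        // The broker hosting the leader replica of this partition of
        // __consumer_offsets is the group's Coordinator.
        System.out.println(partition);
    }
}
```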

Now back to the Rebalance problems above. Unfortunately, they cannot be solved outright, only avoided as far as possible. Recall the three trigger scenarios:

  1. The number of group members changes
  2. The topic to which the group subscribed has changed
  3. The number of partitions of a subscribed topic changes

Changes to the subscription or partition count are almost always planned operations: business scaling, throughput requirements, and so on. In practice, most Rebalances are caused by changes in group membership.

This usually happens when we add a consumer instance to an existing group.id, i.e., we start another consumer application ourselves. The Coordinator then accepts the new instance, adds it to the group, and reassigns partitions.

That is where most of the problems start. Adding instances is normal and often intentional; where it can be avoided, one option is to scale consumption with multiple threads (or coroutines) inside existing instances instead of adding new ones.