Partition allocation is an important concept in Kafka that affects its overall performance and load balance. The concept appears in three places: producers sending messages, consumers consuming messages, and topic creation. While all three operations can be called “partition allocation,” they actually involve different things.

Partition allocation of producers

When a user sends a message using the Kafka client, the send() method is called and the message is eventually delivered to the broker.

Before it can actually be sent to the broker, a message passes through the interceptors, the serializer, the partitioner, and so on, and its target partition must be determined along the way. If the partition field is specified in the ProducerRecord, the partitioner is not needed, because partition already names the partition the message is to be sent to. If no partition field is specified, the partitioner computes the partition value from the message’s key field. The purpose of the partitioner, in short, is to assign messages to partitions.
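For illustration, the following minimal producer sketch shows both paths: specifying the partition explicitly, and letting the partitioner derive it from the key (the topic name, broker address, and keys are made up for this example):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // partition 0 is given explicitly, so the partitioner is bypassed
            producer.send(new ProducerRecord<>("demo-topic", 0, "key1", "value1"));
            // no partition given: the partitioner computes one from the key
            producer.send(new ProducerRecord<>("demo-topic", "key1", "value2"));
        }
    }
}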

The default partitioner provided in Kafka is DefaultPartitioner, which implements the Partitioner interface (users can implement this interface to customize partitioning). Its partition() method implements the concrete partition allocation logic:

public int partition(String topic, Object key, byte[] keyBytes,
                     Object value, byte[] valueBytes, Cluster cluster);

By default, if the message’s key is not null, the default partitioner hashes the key (using the MurmurHash2 algorithm, which has high performance and a low collision rate) and finally computes the partition number from the hash value; messages with the same key are written to the same partition. If the key is null, the message is assigned round-robin across the topic’s available partitions.

Note: if the key is not null, the computed partition number can be any one of the topic’s partitions; if the key is null, the computed partition number can only be one of the currently available partitions. Note the difference between the two.
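To make this concrete, here is a simplified sketch of the DefaultPartitioner logic just described (an approximation for illustration, not Kafka’s actual source; assume counter is an AtomicInteger field of the partitioner, and Utils.murmur2/Utils.toPositive are the hashing helpers from org.apache.kafka.common.utils.Utils):

// Simplified sketch of DefaultPartitioner's behavior (not the real source)
public int partition(String topic, Object key, byte[] keyBytes,
                     Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        // null key: round-robin, but only over the *available* partitions
        int nextValue = counter.getAndIncrement();
        List<PartitionInfo> available = cluster.availablePartitionsForTopic(topic);
        if (!available.isEmpty()) {
            int part = Utils.toPositive(nextValue) % available.size();
            return available.get(part).partition();
        }
        return Utils.toPositive(nextValue) % numPartitions;
    }
    // non-null key: murmur2 hash mapped onto *all* partitions
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

This also explains the note above: a keyed message can land on any partition, while an unkeyed one is confined to the partitions that are currently available.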

Partition allocation of consumers

By default, each partition can only be consumed by one consumer within the same consumer group. Consumer partition allocation is the process of assigning the partitions of subscribed topics to the consumers in a consumer group.

As shown in the figure, there are four partitions on a topic: P0, P1, P2, and P3. Two consumer groups A and B have subscribed to this topic, with four consumers in consumer group A (C0, C1, C2, and C3) and two consumers in consumer group B (C4 and C5). The default Kafka rule is that each consumer in consumer group A is assigned to one partition, and each consumer in consumer group B is assigned to two partitions, with no effect on each other. Each consumer can only consume messages in the partition to which it is assigned.

Kafka provides three partitioning strategies for consumers: RangeAssignor, RoundRobinAssignor, and StickyAssignor. RangeAssignor is the default partitioning strategy.

RangeAssignor

The principle of the RangeAssignor strategy is to divide the total number of partitions by the total number of consumers to obtain a span, and then allocate the partitions in contiguous spans so that they are distributed as evenly as possible across all consumers. For each topic, the RangeAssignor strategy sorts all the consumers in the consumer group that subscribe to it in lexicographic order by name and allocates each consumer a fixed range of partitions. If the division is not even, the consumers that come first in lexicographic order are each assigned one extra partition.

Assuming n = number of partitions / number of consumers and m = number of partitions % number of consumers, the first m consumers are allocated n+1 partitions each, and the remaining (number of consumers - m) consumers are allocated n partitions each.
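A minimal sketch of this per-topic arithmetic (for illustration only, not the actual RangeAssignor source; consumers are assumed to be pre-sorted lexicographically):

import java.util.*;

// Per-topic range assignment: the first m consumers get n+1 partitions, the rest get n
static Map<String, List<Integer>> rangeAssign(List<String> sortedConsumers, int numPartitions) {
    Map<String, List<Integer>> result = new HashMap<>();
    int n = numPartitions / sortedConsumers.size();
    int m = numPartitions % sortedConsumers.size();
    int start = 0;
    for (int i = 0; i < sortedConsumers.size(); i++) {
        int length = n + (i < m ? 1 : 0);
        List<Integer> partitions = new ArrayList<>();
        for (int p = start; p < start + length; p++) {
            partitions.add(p);
        }
        result.put(sortedConsumers.get(i), partitions);
        start += length;
    }
    return result;
}

The examples below can be reproduced with this helper, applying it once per topic.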

To illustrate the RangeAssignor strategy more concretely, let’s look at a few examples. Suppose there are two consumers C0 and C1 in the consumer group, both subscribed to topics t0 and t1, and each topic has four partitions. Then all the subscribed partitions can be identified as t0p0, t0p1, t0p2, t0p3, t1p0, t1p1, t1p2, and t1p3. The final allocation result is:

Consumer C0: t0p0, t0p1, t1p0, t1p1
Consumer C1: t0p2, t0p3, t1p2, t1p3

This distribution is very even, but does the strategy keep this nice property in other cases? Let’s look at another example. Assuming that both topics above have only three partitions, all subscribed partitions can be identified as t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. The final allocation result is:

Consumer C0: t0p0, t0p1, t1p0, t1p1
Consumer C1: t0p2, t1p2

It is obvious that this distribution is uneven. If this kind of situation is magnified, some consumers may end up overloaded. Let’s look at the RoundRobinAssignor strategy next.

RoundRobinAssignor

The RoundRobinAssignor strategy works by sorting all the consumers in a consumer group and all the partitions of the topics they subscribe to in lexicographic order, and then assigning the partitions to the consumers one by one in round-robin fashion. The assignment.strategy parameter value corresponding to RoundRobinAssignor is org.apache.kafka.clients.consumer.RoundRobinAssignor.
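For example (a minimal configuration sketch; everything except the assignment.strategy parameter and the strategy class is assumed consumer setup):

Properties properties = new Properties();
// ... bootstrap.servers, group.id, deserializers, etc.
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        RoundRobinAssignor.class.getName());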

If all consumers in the same consumer group have the same subscription information, the RoundRobinAssignor strategy allocates partitions evenly. For example, suppose there are two consumers C0 and C1 in the consumer group, both subscribed to topics t0 and t1, and each topic has three partitions. Then all the subscribed partitions can be identified as t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. The final allocation result is:

Consumer C0: t0p0, t0p2, t1p1
Consumer C1: t0p1, t1p0, t1p2

If the consumers in the same consumer group have different subscription information, the partition allocation is no longer a pure round robin, which may make it uneven. If a consumer in the group does not subscribe to a topic, it will not be assigned any partitions of that topic during allocation.

For example, suppose there are three consumers C0, C1, and C2 in the consumer group, and the group as a whole subscribes to three topics t0, t1, and t2, which have 1, 2, and 3 partitions respectively; that is, the group subscribes to six partitions in total: t0p0, t1p0, t1p1, t2p0, t2p1, and t2p2. Specifically, consumer C0 subscribes to topic t0, consumer C1 subscribes to topics t0 and t1, and consumer C2 subscribes to topics t0, t1, and t2, so the final allocation result is:

Consumer C0: t0p0
Consumer C1: t1p0
Consumer C2: t1p1, t2p0, t2p1, t2p2

As you can see, the RoundRobinAssignor strategy is not perfect either. This allocation is not optimal, because it is perfectly possible to assign t1p1 to consumer C1 instead.

StickyAssignor

Take a look at StickyAssignor now; as its name suggests, this strategy is “sticky.” Kafka introduced it in version 0.11.x, and it has two main goals:

  1. Partition distribution should be as even as possible;
  2. Partitions should, as much as possible, stay with the consumer they were assigned to in the previous allocation.

When the two goals conflict, the first takes precedence over the second. Given these two goals, the StickyAssignor is much more complex to implement than the RangeAssignor or the RoundRobinAssignor. Let’s take a look at the StickyAssignor strategy in action.
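Switching to it uses the same assignment.strategy parameter shown earlier (a configuration sketch; the surrounding consumer setup is assumed):

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        StickyAssignor.class.getName());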

Suppose there are three consumers in the consumer group: C0, C1, and C2, each of which subscribes to four topics: t0, t1, t2, and t3, and each topic has two partitions. That is, the entire consumer group subscribes to eight partitions: t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, and t3p1. The final allocation result is as follows:

Consumer C0: t0p0, t1p1, t3p0
Consumer C1: t0p1, t2p0, t3p1
Consumer C2: t1p0, t2p1

This may at first appear to be the same result as RoundRobinAssignor, but is it really? Suppose consumer C1 leaves the consumer group; the group then performs a rebalance and the partitions are reallocated. If RoundRobinAssignor is used, the allocation result is:

Consumer C0: t0p0, t1p0, t2p0, t3p0
Consumer C2: t0p1, t1p1, t2p1, t3p1

As the result shows, the RoundRobinAssignor strategy simply re-runs the round-robin allocation for consumers C0 and C2. If StickyAssignor is used instead, the result is:

Consumer C0: t0p0, t1p1, t3p0, t2p0
Consumer C2: t1p0, t2p1, t0p1, t3p1

As can be seen, this result keeps all of the previous allocation for consumers C0 and C2 and redistributes the “burden” of the departed consumer C1 across the two remaining consumers, with C0 and C2 staying balanced in the end.

If partition reallocation occurs, a partition’s previous consumer and its newly assigned consumer may not be the same, and processing the previous consumer had done halfway may have to be repeated by the new one, which is clearly a waste of system resources. StickyAssignor, like the “sticky” in its name, makes the allocation policy sticky and tries to keep two consecutive allocations as similar as possible, thereby reducing wasted system resources and other anomalies.

So far, we have analyzed cases where all consumers have the same subscription information. Let’s now look at how different subscription information is handled.

For example, there are three consumers in the same consumer group: C0, C1, and C2, and there are three topics in the cluster: t0, t1, and t2, which have 1, 2, and 3 partitions respectively. That is to say, there are six partitions in the cluster: t0p0, t1p0, t1p1, t2p0, t2p1, and t2p2. Consumer C0 subscribes to topic t0, consumer C1 to topics t0 and t1, and consumer C2 to topics t0, t1, and t2.

If RoundRobinAssignor is used, the result will look like this:

[Allocation Result Set 1]
Consumer C0: t0p0
Consumer C1: t1p0
Consumer C2: t1p1, t2p0, t2p1, t2p2

If StickyAssignor is used, the final allocation is as follows:

[Allocation Result Set 2]
Consumer C0: t0p0
Consumer C1: t1p0, t1p1
Consumer C2: t2p0, t2p1, t2p2

As you can see, this is an optimal solution (consumer C0 does not subscribe to topics t1 and t2, so it cannot be assigned any partitions of those topics, and the same reasoning applies to consumer C1). If consumer C0 now leaves the consumer group, the allocation result of the RoundRobinAssignor strategy is:

Consumer C1: t0p0, t1p1
Consumer C2: t1p0, t2p0, t2p1, t2p2

As you can see, the RoundRobinAssignor strategy preserves only three partitions of the original allocation for consumers C1 and C2: t2p0, t2p1, and t2p2 (compared with result set 1). If StickyAssignor is used, the result is:

Consumer C1: t1p0, t1p1, t0p0
Consumer C2: t2p0, t2p1, t2p2

As you can see, StickyAssignor preserves five partitions of the previous allocation for consumers C1 and C2: t1p0, t1p1, t2p0, t2p1, and t2p2.

All in all, StickyAssignor performs much better than the other two strategies, though the code implementing it is also far more complex.

User-defined partition allocation policies

In addition to the three partition allocation strategies provided by default, Kafka also supports user-defined allocation strategies. A custom strategy must implement the org.apache.kafka.clients.consumer.internals.PartitionAssignor interface, which is defined as follows:

Subscription subscription(Set<String> topics);

String name();

Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);

void onAssignment(Assignment assignment);

class Subscription {
    private final List<String> topics;
    private final ByteBuffer userData;
    // (omitted several methods...)
}

class Assignment {
    private final List<TopicPartition> partitions;
    private final ByteBuffer userData;
    // (omitted several methods...)
}

Two internal classes are defined in the PartitionAssignor interface: Subscription and Assignment.

The Subscription class represents a consumer’s subscription information and has two properties, topics and userData, which represent the subscribed topic list and user-defined information respectively. The PartitionAssignor interface sets the consumer’s own subscription information through the subscription() method. Notice that this method has only one parameter, topics, corresponding to the topics field of the Subscription class, but no parameter for userData. To give users more control over the assignment result, the subscription() method can add user-defined information that affects the assignment to userData, such as weights, IP addresses, hosts, or racks.

For example, the subscription() method can provide rack information identifying the rack on which the consumer is deployed, and partition assignment can then take the rack of each partition’s leader replica into account, so that a consumer sits on the same rack as the broker node it pulls messages from. Referring to the figure below, consumer Consumer1 and Broker1 are both deployed on rack Rack1, and consumer Consumer2 and Broker2 are deployed on rack Rack2. If the partition assignment is not rack-aware, it may turn out as in the upper figure, with Consumer1 consuming partitions on Broker2 and Consumer2 consuming partitions on Broker1. If the assignment is rack-aware, the result is as in the lower figure: Consumer1 consumes partitions on Broker1 and Consumer2 consumes partitions on Broker2, which reduces both consumption latency and cross-rack bandwidth usage compared with the previous scenario.
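A custom assignor might encode the rack in userData roughly like this (a hypothetical sketch; the CONSUMER_RACK environment variable is made up for illustration):

// Hypothetical: embed this consumer's rack into userData for rack-aware assignment
@Override
public Subscription subscription(Set<String> topics) {
    String rack = System.getenv("CONSUMER_RACK"); // assumed source of rack info
    ByteBuffer userData = ByteBuffer.wrap(rack.getBytes(StandardCharsets.UTF_8));
    return new Subscription(new ArrayList<>(topics), userData);
}

The matching assign() implementation would then decode each member’s userData and prefer partitions whose leader replica lives on the same rack.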

Turning to the Assignment class, which represents the assignment result, it also has two attributes, partitions and userData, which represent the assigned partition collection and user-defined data respectively. The onAssignment() method of the PartitionAssignor interface is a callback invoked on each consumer when it receives the assignment result from the consumer group leader. In StickyAssignor, for example, this method is used to save the current assignment scheme as a reference for the next rebalance of the consumer group.

The name() method provides the name of the assignment strategy. For the three strategies Kafka provides, the protocol_name of RangeAssignor is “range”, the protocol_name of RoundRobinAssignor is “roundrobin”, and the protocol_name of StickyAssignor is “sticky”. When naming a user-defined strategy, make sure it does not conflict with an existing one. This name identifies the allocation strategy and comes into play when joining a consumer group and when electing the consumer group leader, as described later.

The real partition allocation scheme is implemented in the assign() method, where the metadata parameter represents the cluster’s metadata and subscriptions represents the subscription information of each consumer member in the group. The method returns the allocation for each consumer.

Kafka also provides an abstract class, org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor, which simplifies implementing the PartitionAssignor interface: it implements the assign() method, but strips the userData information from each Subscription before performing the allocation. The three allocation strategies Kafka provides all inherit from this abstract class. If a developer’s custom strategy needs userData to influence the allocation result, it cannot simply inherit from AbstractPartitionAssignor; it has to implement the PartitionAssignor interface directly.

Modeled on the built-in strategies, a custom random allocation strategy, here called RandomAssignor, can be defined as follows:

package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class RandomAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "random";
    }

    @Override
    public Map<String, List<TopicPartition>> assign(
            Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {
        Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet()) {
            assignment.put(memberId, new ArrayList<>());
        }

        Random random = new Random();
        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<String> consumersForTopic = topicEntry.getValue();
            int consumerSize = consumersForTopic.size();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null) {
                continue;
            }

            // All partitions under the current topic
            List<TopicPartition> partitions =
                    AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            // Hand each partition to a randomly chosen consumer subscribed to this topic
            for (TopicPartition partition : partitions) {
                int rand = random.nextInt(consumerSize);
                String randomConsumer = consumersForTopic.get(rand);
                assignment.get(randomConsumer).add(partition);
            }
        }
        return assignment;
    }

    // Get the list of consumers for each topic, i.e. [topic, List[consumer]]
    private Map<String, List<String>> consumersPerTopic(
            Map<String, Subscription> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue().topics()) {
                put(res, topic, consumerId);
            }
        }
        return res;
    }
}

When used, the consumer client needs to add the corresponding Properties parameter as shown in the following example:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
RandomAssignor.class.getName());

The implementation of partition allocation

If two allocation strategies are configured in a consumer client, which one prevails? And if multiple consumers have different allocation strategies configured, which one prevails? Partition allocation among multiple consumers has to be coordinated, so what does this coordination process look like?

In Kafka, the GroupCoordinator coordinates partition allocation for consumer groups; for each consumer group there is a GroupCoordinator on the Kafka server side. The coordination process is as follows:

1. Each consumer submits its proposed allocation strategies to the GroupCoordinator. As shown in the figure below, each consumer’s proposed strategies and subscription information are carried in its JoinGroupRequest.

2. The GroupCoordinator collects the proposals from all consumers and performs two steps: first, it elects the leader of the consumer group; second, it elects the partition allocation strategy of the consumer group.

It is easy to understand why a leader of the consumer group must be elected: the partition allocation strategy ultimately has to be executed by some member, and the leader consumer plays exactly that role. In Kafka, execution of the concrete allocation strategy is delegated to the consumer client, which provides greater flexibility: for example, if the allocation strategy needs to change, only the consumer clients need to be modified and restarted, not the Kafka servers.

How is the leader of the consumer group elected? There are two cases: if there is no leader in the group yet, the first consumer to join the group becomes its leader; if the leader later quits the group for some reason, a new leader is elected, and this re-election is rather “arbitrary.” The relevant code is as follows:

// Scala code
private val members = new mutable.HashMap[String, MemberMetadata]
var leaderId = members.keys.head

To explain these two lines of code: in the GroupCoordinator, consumer information is stored in a HashMap, where the key is the consumer’s name and the value is the consumer’s metadata. leaderId is the name of the leader consumer, and its value is simply the key of the first entry in the HashMap, which is essentially the same as picking at random. Generally speaking, then, the leader election of a consumer group is close to random.

How is the allocation strategy of the consumer group elected? By voting. Each consumer can set its own allocation strategy, and for the group as a whole, a strategy everyone can “live with” has to be elected from those submitted by the members to carry out the overall allocation. This election is not decided by the leader consumer, but by the votes of the consumers within the group. “Decided by the votes of the consumers in the group” does not mean that the GroupCoordinator interacts with each consumer a second time; rather, the GroupCoordinator computes the result from the allocation strategies each consumer has already submitted. The elected strategy can essentially be regarded as the one supported by the most consumers. The specific election process is as follows (a sketch of the tally appears after the list):

1. Collect the allocation strategies supported by all consumers to form a candidate set.

2. Each consumer finds the first strategy it supports within the candidate set and votes for that strategy.

3. Tally the votes for each strategy in the candidate set; the strategy with the most votes becomes the allocation strategy of the current consumer group.

4. If a consumer does not support the elected allocation strategy, an error is raised.
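The following is a simplified sketch of that tally (illustration only; the broker’s real implementation is in Scala and differs in detail):

import java.util.*;

// preferences maps each consumer to its ordered list of supported strategy names
static String electStrategy(Map<String, List<String>> preferences) {
    // 1. candidate set: every strategy that any consumer supports
    Set<String> candidates = new HashSet<>();
    preferences.values().forEach(candidates::addAll);

    // 2 + 3. each consumer votes for the first of its own strategies found
    // among the candidates; tally the votes
    Map<String, Integer> votes = new HashMap<>();
    for (List<String> prefs : preferences.values()) {
        for (String strategy : prefs) {
            if (candidates.contains(strategy)) {
                votes.merge(strategy, 1, Integer::sum);
                break;
            }
        }
    }
    String elected = Collections.max(votes.entrySet(),
            Map.Entry.comparingByValue()).getKey();

    // 4. error if some consumer does not support the elected strategy
    for (Map.Entry<String, List<String>> member : preferences.entrySet()) {
        if (!member.getValue().contains(elected)) {
            throw new IllegalStateException(
                    "member " + member.getKey() + " does not support " + elected);
        }
    }
    return elected;
}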

3. The GroupCoordinator sends a response to each consumer and entrusts the leader consumer with performing the concrete partition allocation.

As shown in the figure above, the JoinGroupResponse contains the allocation strategy elected by the vote that the GroupCoordinator tallied. Moreover, only the leader consumer’s response contains the subscription information of all the consumers, because only the leader needs to perform the concrete allocation based on that subscription information; the rest of the consumers do not.

4. After working out the concrete partition allocation scheme, the leader consumer wraps it in a SyncGroupRequest and submits it to the GroupCoordinator. The GroupCoordinator then extracts the allocation result for each consumer and notifies each of them through a SyncGroupResponse.

Partition allocation at the broker end

Partition allocation for producers specifies the partition each message is sent to; partition allocation for consumers specifies the partitions each consumer may consume from; and partition allocation on the broker side is the partition-replica assignment made when a topic is created, that is, which replicas of which partitions are placed on which brokers. Whether this allocation is balanced affects the overall load balance of Kafka, and it also involves the concept of the preferred replica.

When creating a topic, if the replica-assignment parameter is used, the partition replicas are created according to the specified scheme. If the replica-assignment parameter is not used, the allocation scheme has to be calculated by internal logic. The internal allocation logic of the kafka-topics.sh script falls into two strategies depending on rack information: without rack information and with specified rack information. If none of the broker nodes in the cluster has the broker.rack parameter configured, or if the disable-rack-aware parameter is used when creating the topic, the allocation strategy without rack information is used; otherwise the strategy with specified rack information is used.
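As a closing sketch, the same effect as the replica-assignment parameter can be achieved programmatically with the Java AdminClient (topic name, broker address, and broker ids below are made up for illustration):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.*;

public class ManualReplicaAssignment {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address

        // partition 0 -> brokers 0,1; partition 1 -> brokers 1,2; partition 2 -> brokers 2,0
        Map<Integer, List<Integer>> replicaAssignment = new HashMap<>();
        replicaAssignment.put(0, Arrays.asList(0, 1));
        replicaAssignment.put(1, Arrays.asList(1, 2));
        replicaAssignment.put(2, Arrays.asList(2, 0));

        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singleton(
                    new NewTopic("demo-topic", replicaAssignment))).all().get();
        }
    }
}

The first replica listed for each partition is treated as its preferred replica, which ties back to the load-balancing concern mentioned above.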