## Preface

This article looks at partition allocation in Kafka: how a key is mapped to a partition, how partitions are assigned to consumers, and how partition replicas are assigned to brokers/machines.

## 1. Mapping a key to a partition

kafka-clients-0.8.2.2-sources.jar!/org/apache/kafka/clients/producer/internals/Partitioner.java

```java
/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class Partitioner {

    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    /**
     * Compute the partition for the given record.
     *
     * @param record The record being sent
     * @param cluster The current cluster metadata
     */
    public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
        int numPartitions = partitions.size();
        if (record.partition() != null) {
            // they have given us a partition, use it
            if (record.partition() < 0 || record.partition() >= numPartitions)
                throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
                                                   + " is not in the range [0..."
                                                   + numPartitions
                                                   + "].");
            return record.partition();
        } else if (record.key() == null) {
            // no key: round-robin over the available partitions
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
            if (availablePartitions.size() > 0) {
                int part = Utils.abs(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.abs(nextValue) % numPartitions;
            }
        } else {
            // hash the key to choose a partition
            return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
        }
    }
}
```
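Each branch corresponds to how the `ProducerRecord` is constructed. A minimal sketch of the three cases (the broker address, topic name, key, and value are illustrative assumptions):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerBranchesDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

        byte[] key = "user-42".getBytes();  // illustrative key
        byte[] value = "hello".getBytes();  // illustrative value

        // 1. explicit partition: the partitioner returns it unchanged (after a range check)
        producer.send(new ProducerRecord<>("demo-topic", 0, key, value));
        // 2. key but no partition: Utils.abs(murmur2(key)) % numPartitions
        producer.send(new ProducerRecord<>("demo-topic", key, value));
        // 3. neither partition nor key: round-robin over the available partitions
        producer.send(new ProducerRecord<>("demo-topic", value));

        producer.close();
    }
}
```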
Kafka 0.9+ supports custom partitioners, configured through the `partitioner.class` property. The original `Partitioner` class becomes an interface: kafka-clients-0.9.0.1-sources.jar!/org/apache/kafka/clients/producer/Partitioner.java

```java
public interface Partitioner extends Configurable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

}
```

The default implementation: kafka-clients-0.9.0.1-sources.jar!/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java

```java
/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    /**
     * A cheap way to deterministically convert a number to a positive value. When the input is
     * positive, the original value is returned. When the input number is negative, the returned
     * positive value is the original value bit AND against 0x7fffffff which is not its absolute
     * value.
     *
     * Note: changing this method in the future will possibly cause partition selection not to be
     * compatible with the existing messages already placed on a partition.
     *
     * @param number a given number
     * @return a positive number.
     */
    private static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return DefaultPartitioner.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    public void close() {}

}
```
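Since the partitioner is now pluggable, using a custom strategy just means implementing the interface and registering the class via `partitioner.class`. A minimal sketch, where the class name `KeylessToZeroPartitioner` and its policy are made-up illustrations rather than anything in Kafka:

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

// Hypothetical partitioner: keyless records are pinned to partition 0,
// keyed records are hashed the same way DefaultPartitioner hashes them.
public class KeylessToZeroPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        if (keyBytes == null)
            return 0; // illustrative policy; DefaultPartitioner round-robins instead
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % partitions.size();
    }

    @Override
    public void close() {}
}
```

It is enabled on the producer with `props.put("partitioner.class", "com.example.KeylessToZeroPartitioner")`, the package name being another assumption.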

    
## 2. Assigning partitions to consumers

kafka-clients-0.10.2.1-sources.jar!/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java

```java
/**
 * This interface is used to define custom partition assignment for use in
 * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe
 * to the topics they are interested in and forward their subscriptions to a Kafka broker serving
 * as the group coordinator. The coordinator selects one member to perform the group assignment and
 * propagates the subscriptions of all members to it. Then {@link #assign(Cluster, Map)} is called
 * to perform the assignment and the results are forwarded back to each respective member.
 *
 * In some cases, it is useful to forward additional metadata to the assignor in order to make
 * assignment decisions. For this, you can override {@link #subscription(Set)} and provide custom
 * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation
 * can use this user data to forward the rackId belonging to each member.
 */
public interface PartitionAssignor {

    /**
     * Return a serializable object representing the local member's subscription. This can include
     * additional information as well (e.g. local host/rack information) which can be leveraged in
     * {@link #assign(Cluster, Map)}.
     * @param topics Topics subscribed to through {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(java.util.Collection)}
     *               and variants
     * @return Non-null subscription with optional user data
     */
    Subscription subscription(Set<String> topics);

    /**
     * Perform the group assignment given the member subscriptions and current cluster metadata.
     * @param metadata Current topic/broker metadata known by consumer
     * @param subscriptions Subscriptions from all members provided through {@link #subscription(Set)}
     * @return A map from the members to their respective assignment. This should have one entry
     *         for each member in the input subscription map.
     */
    Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);

    /**
     * Callback which is invoked when a group member receives its assignment from the leader.
     * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, Map)}
     */
    void onAssignment(Assignment assignment);

}
```
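To get a feel for the contract, here is a hypothetical assignor sketch (not part of Kafka) written against the 0.10.2 `AbstractPartitionAssignor` shape that the two built-in implementations below also use; it hands every partition of a topic to the lexicographically first subscribed consumer:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;

// Hypothetical assignor, for illustration only: every partition of a topic is
// handed to the lexicographically first consumer subscribed to that topic.
public class FirstSubscriberAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "first-subscriber"; // made-up protocol name
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        for (Map.Entry<String, Integer> topicEntry : partitionsPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            // collect and sort the consumers subscribed to this topic
            List<String> subscribers = new ArrayList<>();
            for (Map.Entry<String, List<String>> s : subscriptions.entrySet())
                if (s.getValue().contains(topic))
                    subscribers.add(s.getKey());
            if (subscribers.isEmpty())
                continue;
            Collections.sort(subscribers);
            // all partitions of the topic go to the first subscriber
            assignment.get(subscribers.get(0))
                      .addAll(AbstractPartitionAssignor.partitions(topic, topicEntry.getValue()));
        }
        return assignment;
    }
}
```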

There are two built-in implementations.

- RangeAssignor: kafka-clients-0.10.2.1-sources.jar!/org/apache/kafka/clients/consumer/RangeAssignor.java

```java
/**
 * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
 * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
 * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
 * divide, then the first few consumers will have one extra partition.
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p1, t1p0, t1p1]
 * C1: [t0p2, t1p2]
 */
public class RangeAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "range";
    }

    private Map<String, List<String>> consumersPerTopic(Map<String, List<String>> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, List<String>> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue())
                put(res, topic, consumerId);
        }
        return res;
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<String> consumersForTopic = topicEntry.getValue();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue;

            Collections.sort(consumersForTopic);

            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }
}
```
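The per-topic arithmetic behind the javadoc example can be checked directly; a small sketch of the same `start`/`length` computation for 3 partitions and 2 consumers:

```java
public class RangeMathDemo {
    public static void main(String[] args) {
        // per-topic range math from the javadoc example: 3 partitions, 2 consumers
        int numPartitions = 3, numConsumers = 2;
        int perConsumer = numPartitions / numConsumers; // 1
        int withExtra = numPartitions % numConsumers;   // 1, so the first consumer gets one extra
        for (int i = 0; i < numConsumers; i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i + 1 > withExtra ? 0 : 1);
            System.out.println("C" + i + " gets partitions [" + start + ".." + (start + length - 1) + "]");
        }
        // prints: C0 gets partitions [0..1]
        //         C1 gets partitions [2..2]
    }
}
```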

- RoundRobinAssignor: kafka-clients-0.10.2.1-sources.jar!/org/apache/kafka/clients/consumer/RoundRobinAssignor.java

```java
/**
 * The round robin assignor lays out all the available partitions and all the available consumers. It
 * then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer
 * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
 * will be within a delta of exactly one across all consumers.)
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p2, t1p1]
 * C1: [t0p1, t1p0, t1p2]
 *
 * When subscriptions differ across consumer instances, the assignment process still considers each
 * consumer instance in round robin fashion but skips over an instance if it is not subscribed to
 * the topic. Unlike the case when subscriptions are identical, this can result in imbalanced
 * assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2,
 * with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0,
 * t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.
 *
 * The assignment will be:
 * C0: [t0p0]
 * C1: [t1p0]
 * C2: [t1p1, t2p0, t2p1, t2p2]
 */
public class RoundRobinAssignor extends AbstractPartitionAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
        for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
            final String topic = partition.topic();
            while (!subscriptions.get(assigner.peek()).contains(topic))
                assigner.next();
            assignment.get(assigner.next()).add(partition);
        }
        return assignment;
    }

    public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        SortedSet<String> topics = new TreeSet<>();
        for (List<String> subscription : subscriptions.values())
            topics.addAll(subscription);

        List<TopicPartition> allPartitions = new ArrayList<>();
        for (String topic : topics) {
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic != null)
                allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
        }
        return allPartitions;
    }

    @Override
    public String name() {
        return "roundrobin";
    }
}
```
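The assignor is chosen per consumer through the `partition.assignment.strategy` config (RangeAssignor is the default). A minimal sketch, with broker address, group id, and topic as illustrative assumptions:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AssignorConfigDemo {
    public static void main(String[] args) {
        // broker address, group id and topic below are illustrative assumptions
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // RangeAssignor is the default; switch to round robin like this
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("t0"));
    }
}
```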

For more detail, see [Kafka consumer groups](http://www.cnblogs.com/huxi2b/p/6223228.html).

## 3. Mapping partitions to brokers/machines

When Kafka creates a topic, the number of partitions and the replication factor must be specified, and these numbers are fixed in advance. So which machine does a partition end up on? See kafka-0.10.2.1-src/core/src/main/scala/kafka/admin/AdminUtils.scala

```scala
/**
 * There are 3 goals of replica assignment:
 *
 * 1. Spread the replicas evenly among brokers.
 * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
 * 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible
 *
 * To achieve this goal for replica assignment without considering racks, we:
 * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
 * 2. Assign the remaining replicas of each partition with an increasing shift.
 *
 * Here is an example of assigning
 * broker-0  broker-1  broker-2  broker-3  broker-4
 * p0        p1        p2        p3        p4       (1st replica)
 * p5        p6        p7        p8        p9       (1st replica)
 * p4        p0        p1        p2        p3       (2nd replica)
 * p8        p9        p5        p6        p7       (2nd replica)
 * p3        p4        p0        p1        p2       (3rd replica)
 * p7        p8        p9        p5        p6       (3rd replica)
 *
 * To create rack aware assignment, this API will first create a rack alternated broker list. For example,
 * from this brokerID -> rack mapping:
 *
 * 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
 *
 * The rack alternated list will be:
 *
 * 0, 3, 1, 5, 4, 2
 *
 * Then an easy round-robin assignment can be applied. Assume 6 partitions with replication factor of 3, the assignment
 * will be:
 *
 * 0 -> 0,3,1
 * 1 -> 3,1,5
 * 2 -> 1,5,4
 * 3 -> 5,4,2
 * 4 -> 4,2,0
 * 5 -> 2,0,3
 *
 * Once it has completed the first round-robin, if there are more partitions to assign, the algorithm will start
 * shifting the followers. This is to ensure we will not always get the same set of sequences.
 * In this case, if there is another partition to assign (partition #6), the assignment will be:
 *
 * 6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0)
 *
 * The rack aware assignment always chooses the 1st replica of the partition using round robin on the rack alternated
 * broker list. For rest of the replicas, it will be biased towards brokers on racks that do not have
 * any replica assignment, until every rack has a replica. Then the assignment will go back to round-robin on
 * the broker list.
 *
 * As the result, if the number of replicas is equal to or greater than the number of racks, it will ensure that
 * each rack will get at least one replica. Otherwise, each rack will get at most one replica. In a perfect
 * situation where the number of replicas is the same as the number of racks and each rack has the same number of
 * brokers, it guarantees that the replica distribution is even across brokers and racks.
 *
 * @return a Map from partition id to replica ids
 * @throws AdminOperationException If rack information is supplied but it is incomplete, or if it is not possible to
 *                                 assign each replica to a unique rack.
 */
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  if (nPartitions <= 0)
    throw new InvalidPartitionsException("number of partitions must be larger than 0")
  if (replicationFactor <= 0)
    throw new InvalidReplicationFactorException("replication factor must be larger than 0")
  if (replicationFactor > brokerMetadatas.size)
    throw new InvalidReplicationFactorException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
  if (brokerMetadatas.forall(_.rack.isEmpty))
    assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
      startPartitionId)
  else {
    if (brokerMetadatas.exists(_.rack.isEmpty))
      throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
    assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
      startPartitionId)
  }
}
```

The comments for this method are already very clear, but I'll repeat them here.

Goals

Replica assignment has three goals:

- Spread the replicas evenly among brokers
- For partitions assigned to a particular broker, spread their other replicas over the other brokers
- If all brokers have rack information, assign each partition's replicas to different racks where possible

Strategy

Kafka 0.10 supports two replica assignment strategies (each partition being composed of N replicas): rack-unaware and rack-aware, where a rack is a physical rack of machines.

- rack-unaware (suppose there are 5 brokers and 10 partitions, each partition with 3 replicas)

  The strategy is:
  - Start from a random position in the broker list and assign the first replica of each partition round-robin
  - Place the remaining replicas of each partition with an increasing shift (staggered by one)

  For example:

        broker-0  broker-1  broker-2  broker-3  broker-4
        p0        p1        p2        p3        p4       (1st replica)
        p5        p6        p7        p8        p9       (1st replica)
        p4        p0        p1        p2        p3       (2nd replica)
        p8        p9        p5        p6        p7       (2nd replica)
        p3        p4        p0        p1        p2       (3rd replica)
        p7        p8        p9        p5        p6       (3rd replica)

  Assume the 10 partitions start from broker-0, each with 3 replicas: the first replica of p0 lands on broker-0 and of p1 on broker-1; for the second replica, p0 lands on broker-1 and p1 on broker-2, staggered by one position.

Code

```scala
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}

// the helper referenced above, also from AdminUtils.scala: places the j-th follower
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}
```
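To see the staggering concretely, here is a small Java transcription of the loop above with `startIndex` and `nextReplicaShift` (which AdminUtils draws at random) pinned to 0; for 5 brokers, 10 partitions, and replication factor 3 it reproduces the staggered table from the comment:

```java
import java.util.ArrayList;
import java.util.List;

public class RackUnawareDemo {

    // same index arithmetic as AdminUtils.replicaIndex
    static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) {
        int shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
        return (firstReplicaIndex + shift) % nBrokers;
    }

    public static void main(String[] args) {
        int nBrokers = 5, nPartitions = 10, replicationFactor = 3;
        int startIndex = 0;        // AdminUtils picks this randomly
        int nextReplicaShift = 0;  // AdminUtils picks this randomly too

        for (int partition = 0; partition < nPartitions; partition++) {
            // after each full pass over the brokers, shift the followers by one more
            if (partition > 0 && partition % nBrokers == 0)
                nextReplicaShift += 1;
            int firstReplicaIndex = (partition + startIndex) % nBrokers;
            List<Integer> replicas = new ArrayList<>();
            replicas.add(firstReplicaIndex);
            for (int j = 0; j < replicationFactor - 1; j++)
                replicas.add(replicaIndex(firstReplicaIndex, nextReplicaShift, j, nBrokers));
            System.out.println("p" + partition + " -> " + replicas);
        }
        // e.g. p0 -> [0, 1, 2], p4 -> [4, 0, 1], p5 -> [0, 2, 3], ...
    }
}
```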
- rack-aware (suppose there are 6 brokers spread over 3 racks and 6 partitions, each partition with 3 replicas)

  First map the broker list to racks, for example:

        0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"

  Then cycle through the racks in turn to get a rack-alternated broker list:

        0(rack1), 3(rack2), 1(rack3), 5(rack1), 4(rack2), 2(rack3)

  Then map partitions to brokers round-robin over that list:

        0 -> 0,3,1
        1 -> 3,1,5
        2 -> 1,5,4
        3 -> 5,4,2
        4 -> 4,2,0
        5 -> 2,0,3

  The three replicas of partition 0 land on broker-0, broker-3, and broker-1; the three replicas of partition 1 land on broker-3, broker-1, and broker-5; and so on.

If the number of partitions is greater than the number of brokers, then once the first round-robin pass completes, the follower replicas of the remaining partitions are shifted so the same sequences do not repeat. For example:

    6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0)

For each partition, the first replica is placed round-robin over the rack-alternated broker list; the remaining replicas are biased towards brokers on racks that do not yet hold a replica of that partition, until every rack has one, after which assignment returns to plain round-robin over the broker list.

When the replication factor is greater than or equal to the number of racks, each rack gets at least one replica; otherwise each rack gets at most one replica. In the ideal case, where the replication factor equals the number of racks and every rack has the same number of brokers, the replica distribution is even across both brokers and racks.
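A simplified Java sketch of building the rack-alternated list described above (racks sorted by name and brokers by id, which matches the example but is not the exact AdminUtils implementation):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class RackAlternatedListDemo {

    static List<Integer> rackAlternated(Map<Integer, String> brokerRackMap) {
        // rack -> its brokers in ascending broker-id order; racks sorted by name
        SortedMap<String, Deque<Integer>> brokersByRack = new TreeMap<>();
        brokerRackMap.entrySet().stream()
            .sorted(Map.Entry.comparingByKey())
            .forEach(e -> brokersByRack
                .computeIfAbsent(e.getValue(), r -> new ArrayDeque<>())
                .add(e.getKey()));

        // take one broker from each rack in turn until all brokers are placed
        List<Integer> result = new ArrayList<>();
        while (result.size() < brokerRackMap.size()) {
            for (Deque<Integer> brokers : brokersByRack.values())
                if (!brokers.isEmpty())
                    result.add(brokers.poll());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> brokerRackMap = new HashMap<>();
        brokerRackMap.put(0, "rack1");
        brokerRackMap.put(1, "rack3");
        brokerRackMap.put(2, "rack3");
        brokerRackMap.put(3, "rack2");
        brokerRackMap.put(4, "rack2");
        brokerRackMap.put(5, "rack1");
        System.out.println(rackAlternated(brokerRackMap)); // [0, 3, 1, 5, 4, 2]
    }
}
```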

Code

```scala
private def assignReplicasToBrokersRackAware(nPartitions: Int,
                                             replicationFactor: Int,
                                             brokerMetadatas: Seq[BrokerMetadata],
                                             fixedStartIndex: Int,
                                             startPartitionId: Int): Map[Int, Seq[Int]] = {
  val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
    id -> rack
  }.toMap
  val numRacks = brokerRackMap.values.toSet.size
  val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
  val numBrokers = arrangedBrokerList.size
  val ret = mutable.Map[Int, Seq[Int]]()
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
    val leader = arrangedBrokerList(firstReplicaIndex)
    val replicaBuffer = mutable.ArrayBuffer(leader)
    val racksWithReplicas = mutable.Set(brokerRackMap(leader))
    val brokersWithReplicas = mutable.Set(leader)
    var k = 0
    for (_ <- 0 until replicationFactor - 1) {
      var done = false
      while (!done) {
        val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
        val rack = brokerRackMap(broker)
        // Skip this broker if
        // 1. there is already a broker in the same rack that has assigned a replica AND there is one or more racks
        //    that do not have any replica, or
        // 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned
        if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
            && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
          replicaBuffer += broker
          racksWithReplicas += rack
          brokersWithReplicas += broker
          done = true
        }
        k += 1
      }
    }
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
```

Docs

- Kafka Consumer Group
- How Partitions are split into Kafka Broker?
- KIP-36 Rack aware replica assignment
- How to Rebalance Topics in a Kafka Cluster
- Kafka Partitioning…
- Kafka redistributes partitions