
On its way to the broker via send(), a ProducerRecord message goes through:

  1. Interceptor
  2. Serializer
  3. Partitioner

This series of steps runs before the message is actually sent to the broker. The interceptor is optional; the serializer is required.
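For context, a minimal producer setup showing which of these pieces must be configured (the broker address is an assumption, and MyProducerInterceptor stands in for any interceptor you might write):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
// Serializers are mandatory: the producer must know how to turn keys and values into bytes
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Interceptors are optional: the producer works the same with this line removed
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName()); // hypothetical class
KafkaProducer<String, String> producer = new KafkaProducer<>(props);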

After the message is serialized, it is necessary to determine the partition to which it is sent, and this is where the partitioning strategy comes in.

String msg = i + " This is matt's blog.";
// No partition is specified here; the partitioner will choose one
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, msg);
producer.send(record);

ProducerRecord also has constructors that let you specify the target partition directly:

// The partition is chosen when the record is constructed: if you want a
// specific partition, you must specify it here
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {}

All other ProducerRecord constructors delegate to this one. If the partition field is set on the ProducerRecord, the partitioner is never consulted.
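For example, a sketch that pins the record to partition 0 using the (topic, partition, key, value) constructor; topicName, msg, and producer are carried over from the snippet above:

// Partition 0 is used as-is; the partitioner is bypassed entirely
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, 0, null, msg);
producer.send(record);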

Default partitioner

If no partition is specified, the record is still assigned one, following the default partitioning strategy. This is handled by the built-in default partitioner:

The default partitioner Kafka provides is org.apache.kafka.clients.producer.internals.DefaultPartitioner, which implements the org.apache.kafka.clients.producer.Partitioner interface.
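For reference, the shape of that interface (a simplified sketch; see the Kafka source for the full definition):

public interface Partitioner extends Configurable, Closeable {
    // Compute the partition for the given record
    int partition(String topic, Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes, Cluster cluster);
    // Called when the partitioner is closed
    void close();
}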

Let’s take a look at how the default partitioner defines its partitioning strategy:

public int partition(String topic, Object key, byte[] keyBytes, Object value,
                     byte[] valueBytes, Cluster cluster, int numPartitions) {
    if (keyBytes == null) {
        // No key: fall back to the sticky partition cache
        return stickyPartitionCache.partition(topic, cluster);
    }
    // Hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
  1. If key is null: the sticky partitioning strategy is used. stickyPartitionCache picks an available partition within the topic and keeps using it until the current batch is full (earlier client versions used round-robin here).
  2. If key is not null: the key is hashed with MurmurHash2 (fast, with a low collision rate), and the partition number is computed from the resulting hash. Messages with the same key are therefore written to the same partition.

Note ⚠️: as long as the number of partitions of the topic stays the same, the key-to-partition mapping stays the same. Once you add partitions to the topic, however, the mapping between keys and partitions can no longer be guaranteed.
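To see the same-key guarantee in action, a small sketch (topic name and key are made up; the blocking get() call should live in a method that declares throws Exception):

for (int n = 0; n < 3; n++) {
    ProducerRecord<String, String> r = new ProducerRecord<>("demo-topic", "user-42", "msg-" + n);
    RecordMetadata meta = producer.send(r).get();
    // Prints the same partition number on every iteration
    System.out.println("key=user-42 -> partition " + meta.partition());
}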

Custom partitioner

To customize partitioning, implement the Partitioner interface, just as DefaultPartitioner does, and then register the implementation with the producer via configuration:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

// A partitioner covering the strategies discussed below
public class DemoPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (null == keyBytes) {
            // 1. Round-robin strategy: when neither partition nor key is specified,
            //    assign partitions in sequence
            return counter.getAndIncrement() % numPartitions;
        }
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

// Alternative partition() bodies for the other strategies:

// 2. Random strategy
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

// 3. Key-ordering strategy: keeps messages with the same key in order within one partition.
//    There is no need to implement this yourself -- Kafka already implements it internally.
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

// 4. Custom strategy, e.g. by region (north/south data centers) or warehouse location;
//    isSouth(...) is a user-defined predicate
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream()
        .filter(p -> isSouth(p.leader().host()))
        .map(PartitionInfo::partition)
        .findAny()
        .get();

Then register it:

Properties props = new Properties();
// Register the custom partitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DemoPartitioner.class.getName());

Producer<String, String> producer = new KafkaProducer<>(props);