🍂 bloggers are working on the 2022 project:Take the dream as the horse, set sail, 2022 dream chasers

Producer message partitioning mechanism

When we use Kafka, we want to distribute data evenly across all servers. In this way, our load balancing becomes perfect.

1. Partition cause

In short, Kafka’s message organization structure is topic-partition-replica-message.

A message can be saved to only one partition, but not multiple copies in multiple partitions.

Why do we need to create a partition for Kafka?

The main reason is: to achieve high scalability of the system, different partitions can be placed on machines of different nodes, we can increase the throughput of the overall system by adding machines.

Of course, AKF can also be used to explain:

The AKF cube, also known as The Scala Cube, was first described in The Art of Scalability to provide a systematic extension.

AKF divides system expansion into the following three dimensions:

  • X-axis: Scale the system by directly replicating application processes horizontally.
  • Y-axis: Extend the system by breaking out functionality.
  • Z-axis: Extend the system based on user information.

As opposed to Kafka:

  • X-axis: Use a reliable replica mechanism
  • Y-axis: Different functional breakdowns — Topic
  • Z-axis: Data consumed by different users – partition

The concept of partitioning was introduced a long time ago, for example:

  • MongoDB and Elasticsearch are called Shard shards
  • HBase is called Region
  • Cassandra called vnode

2. Zone policies

Kafka’s partitioning strategy is generally polling, random, and by key. Of course, we can also customize partitioning policies.

2.1 Polling Policy

Order allocation. For example, if there are three partitions under a topic, the message allocation format is as follows:

Kafka is a polling policy by default. The polling strategy has excellent load balancing performance and always ensures that messages are evenly distributed across all partitions as much as possible.

2.2 Random Strategy

Randomly place our message on any partition.

Implementation method:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// Randomly generated according to the size of the partition
return ThreadLocalRandom.current().nextInt(partitions.size());
Copy the code

If you want to pursue uniform distribution of data, it is better to use a polling strategy.

2.3 Press the Message key to save the policy

Kafka allows you to define a message Key, or Key, for each set of messages.

The key can be a string with clear business meanings, such as customer code, department ID, and service ID. You can divide the same key into a partition.

Implementation method:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// Divide the same hash into a partition
return Math.abs(key.hashCode()) % partitions.size();
Copy the code

2.4 Other Zoning Policies

If the above three strategies don’t support your business, then you can try to customize your own.

Public interface Partitioner extends signals, Closeable {} Specifies a different partitioning policy

public class UserDefinePartitioner implements Partitioner {
    private AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Get the number of partitions under this Topic
        List<PartitionInfo> partitioners = cluster.partitionsForTopic(topic);
        int numPartitioners = partitioners.size();
        if (keyBytes == null) {
            // Atomicity increases by 1
            int addIncrement = counter.getAndIncrement();
            / / modulus
            return addIncrement % numPartitioners;
        } else {
            // Hash the keyBytes and modulo them
            returnUtils.toPositive(Utils.murmur2(keyBytes)) % numPartitioners; }}@Override
    public void close(a) {
        System.out.println("close");
    }

    @Override
    public void configure(Map
       
         map)
       ,> {
        System.out.println("configure"); }}Copy the code

2.5 Case Presentation

Right now we have a set of data, and the set of data has causality, so we have to keep the order of the set of data. Otherwise, if we deal with the effect first and then the cause, there will definitely be business problems.

We use the simplest method to create a Topic, which contains only one partition, and send our causes and effects to the Topic to achieve sequential nature. This is sequential, but it loses the high throughput and load balancing capabilities that Kafka brings.

Return math.abs (key.hashcode ()) % partitions. Size (); return math.abs (key.hashcode ()) % partitions. . In this way, our producer will send cause and effect to different partitions, and different consumers will assign different partitions (consumer.assign()) to implement the corresponding business logic.

Second, producer compression algorithm

Comparession is the classic tradeoff idea of trading time for space. Specifically, it is the exchange of CPU time for disk space or network I/O transfers, with the hope that less CPU overhead will result in less disk usage or network I/O transfers.

1. How to compress

Kafka has two class message formats, version V1 and version V2.

Kafka has two message hierarchies: message sets and messages. A message set contains multiple record items, which are where the message is really encapsulated.

V1: message set, message

V2: Record Batch, record

Our V2 version extracts the common parts of the message into the outer message set, eliminating the need to store this data in every message.

In our original V1 release, CRC checks were performed for each message, but in some cases the CRC value of the message may change. For example, on the Broker side, it is possible to update timestamp fields or perform message format transformations to update CRC values, so there is no need to perform CRC checks for every message, wasting both space and time. Therefore, in version V2, CRC validation of messages is moved to the message collection layer.

The method of saving compressed messages in V1 version is to compress multiple messages and save them in the message field of the outer message. Version V2 does this by compressing the entire message set.

Performance test:

2. When to compress

Where compression can occur: on the producer side, and on the Broker side

The producer can configure props. Put (“compression. Type “, “gzip”); To enable gzip compression

It is a natural idea to enable compression on the producer side, so why do I say compression is possible on the Broker side as well?

In most cases, brokers simply save messages as they receive them from the Producer side. There are two exceptions to the rule that may cause the Broker to recompress messages.

  • The Broker specifies a different compression algorithm than the Producer side
    • This is the relationship between the producer and the Broker. Once you have set different compression. Type values on the Broker side, be careful because unexpected compression/decompression operations can occur, usually as a result of a surge in Broker CPU usage.
  • A message format conversion occurred on the Broker side. This is usually the result of V1 and V2 message version conversion. It will lose its zero-copy effect.
    • When a consumer goes to a Broker to consume a message, the version of the Broker message is different from that of the consumer. We need to switch from kernel mode to user mode, format the message, and then send it to our consumers.

3. When to decompress

Decompression usually occurs in the consumer. In general, we encapsulate the compression algorithm into the message set, and the consumer decompresses it through the compression algorithm.

The Producer side compresses, the Broker side holds, and the Consumer side decompresses.

The Broker also decompresses, and each compressed set of messages is decompressed as it is written to the Broker side in order to perform various validations on the message. However, this operation severely degrades CPU performance.

The friends of Jingdong in China have just put forward a Bugfix to the community, suggesting to remove the decompression introduced because of message verification. According to them, the CPU utilization on the Broker side decreased by at least 50 percent after the knowledge compression was removed.

4. Comparison of compression algorithms

The following table compares the results of the Benchmark compression algorithm provided by Facebook Zstandard:

  • Throughput: LZ4 > Snappy > ZSTD and GZIP
  • Compression ratio: ZSTD > LZ4 > GZIP > Snappy
  • Network bandwidth: Snappy occupies the most network bandwidth and ZSTD the least
  • CPU usage: Snappy uses more CPU during compression, while GZIP may use more CPU during decompression.

When you have sufficient CPU resources and limited environmental bandwidth, I recommend that you turn on compression. Bandwidth, after all, is scarcer than CPU. The compression algorithm is usually ZSTD


Previous recommendations:

The interviewer asked me to talk about kafka’s replica synchronization mechanism, and I started crying

Random sampling is dead. Reservoir sampling is king

Take dream as the horse, set sail, double inhuman 2021, swastika dream journey