(1) How producers work

Under the hood, Kafka moves data from producers to consumers through a pipeline. After the producer calls the send method, the message first passes through a chain of interceptors and then into the serializer, which serializes the message's Key and Value. Next, the message enters the partitioner, which selects the partition the message will be sent to.

After these steps are complete, the message is placed into a buffer queue called the RecordAccumulator, which defaults to 32 MB. When either of the following conditions is met, the Sender thread sends the accumulated messages.

Condition 1: the accumulated batch reaches batch.size (16 KB by default).

Condition 2: the waiting time reaches linger.ms (0 ms by default).

So by default, since the wait time is 0 ms, every message is sent as soon as it arrives.

The Sender thread reads batches from the RecordAccumulator and creates send requests. For each broker in the Kafka cluster, the NetworkClient maintains an InFlightRequests queue, which by default caches up to five unacknowledged requests. The Selector then sends these requests to the Kafka cluster.
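The in-flight mechanism above can be pictured as a bounded queue per broker connection. The sketch below is a conceptual model only, not Kafka's actual implementation; the cap of 5 corresponds to the max.in.flight.requests.per.connection default.

```java
import java.util.ArrayDeque;

// Conceptual model (not Kafka source code): the NetworkClient keeps a bounded
// queue of unacknowledged requests per broker connection; by default at most 5.
public class InFlightRequestsModel {
    static final int MAX_IN_FLIGHT = 5; // max.in.flight.requests.per.connection

    private final ArrayDeque<String> inFlight = new ArrayDeque<>();

    boolean canSend() {
        return inFlight.size() < MAX_IN_FLIGHT;
    }

    void send(String request) {
        if (!canSend()) throw new IllegalStateException("too many in-flight requests");
        inFlight.addLast(request);
    }

    void onAck() {
        inFlight.pollFirst(); // the broker acknowledged the oldest request
    }

    public static void main(String[] args) {
        InFlightRequestsModel conn = new InFlightRequestsModel();
        for (int i = 1; i <= 5; i++) conn.send("request-" + i);
        System.out.println(conn.canSend()); // false: the queue is full
        conn.onAck();
        System.out.println(conn.canSend()); // true: there is room again
    }
}
```

The point of the cap is pipelining: the Sender does not have to wait for each acknowledgment before sending the next request, but it also will not pile up unbounded unacknowledged work on one connection.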

When a request reaches the Kafka cluster, the cluster returns the corresponding acks. The producer can decide how to handle the acknowledgment depending on the situation: for example, whether to wait for a response before sending the next message, or to keep sending regardless of success or failure.

(2) Examples of message sending

Before sending messages with Kafka, first add the client dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

2.1 Simple Asynchronous Sending

The simplest way to send a message is to use Properties to configure Kafka's connection address and the serializers for keys and values, then call send to deliver the message to the specified topic.

public class Producer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.78.128:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        kafkaProducer.send(new ProducerRecord<>("testTopic", "hello"));
        kafkaProducer.close();
    }
}

2.2 Asynchronous Sending with a Callback

The approach above gives no feedback about the send result, so we can instead pass a callback to send:

public class ProducerCallback {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.78.128:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        kafkaProducer.send(new ProducerRecord<>("testTopic", "hello"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                System.out.println(recordMetadata.topic() + " " + recordMetadata.partition());
            }
        });
        kafkaProducer.close();
    }
}

The callback receives metadata about the sent record, such as its topic and partition.

2.3 Synchronous Sending

Message queues are typically asynchronous, but Kafka can also send synchronously if a project requires it: simply call get() on the Future returned by send().

public static void main(String[] args) throws ExecutionException, InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.78.128:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    kafkaProducer.send(new ProducerRecord<>("testTopic", "hello")).get();
    kafkaProducer.close();
}

(3) Partitioning policy when sending messages

3.1 Kafka partitioning policy

Kafka uses partitions to spread large volumes of messages across brokers. When a topic has multiple partitions, the sender can direct each message to a particular partition.

Looking at the different overloads of the ProducerRecord constructor, partition selection works as follows:

When a partition is specified, messages are sent to the specified partition.

If no partition is specified but a Key exists, the hash of the Key is taken modulo the number of partitions to determine the target partition.

If only a Value exists, Kafka uses the Sticky Partitioner: it randomly picks a partition and keeps using it until the current batch is full (batch.size) or the linger.ms time expires, and then randomly picks a different partition.
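The three rules above can be sketched in a few lines. This is a simplified illustration, not Kafka's real partitioner: Kafka hashes the serialized key bytes with murmur2, while here hashCode() stands in to show the "hash mod partition count" idea.

```java
// Simplified sketch of the default partition-selection rules (illustration only).
public class PartitionRuleSketch {
    static int choosePartition(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;                        // rule 1: partition given explicitly
        }
        if (key != null) {
            return Math.abs(key.hashCode()) % numPartitions; // rule 2: hash the key (simplified)
        }
        return -1; // rule 3: no key -> the sticky partitioner picks one and reuses it
    }

    public static void main(String[] args) {
        System.out.println(choosePartition(2, "order-1", 3)); // 2: explicit partition wins
        System.out.println(choosePartition(null, "order-1", 3)); // some partition in 0..2
        System.out.println(choosePartition(null, null, 3));   // -1: sticky partitioner decides
    }
}
```

Note that rule 2 guarantees that all messages with the same key land in the same partition, which is what preserves per-key ordering.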

3.2 Custom Partitions

There are times when you might want to implement custom partitioning rules, such as sending a key to a specified partition when it is a certain value.

Create a new class that implements the Partitioner interface and define your own logic in the partition method. Here, messages whose key is aaa go to partition 0 and all others go to partition 1.

public class MyPartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (key.toString().equals("aaa")) {
            return 0;
        } else {
            return 1;
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> map) {}
}

Next, configure the custom partitioner:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.javayz.kafka.producer.MyPartition");

(4) Producer optimizations

4.1 Improving the Send Throughput

Throughput is mainly tuned through batch.size (the batch size) and linger.ms (the wait time).

By default, batch.size = 16 KB and linger.ms = 0 ms, which means every incoming message is sent to the Kafka cluster immediately, so throughput is not high. You can slightly increase linger.ms so that several messages accumulate and are sent to the cluster together, which improves throughput.

In addition, you can also set the compression method of messages or adjust the size of RecordAccumulator to improve throughput.

// Set the batch size (default 16384 bytes = 16 KB)
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// Set linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
// Set the compression type: gzip, snappy, lz4, or zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// Set the RecordAccumulator buffer size (default 33554432 bytes = 32 MB)
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

4.2 Improving Data Reliability

After data is sent, the Kafka cluster can acknowledge it in one of three ways:

acks=0: the producer does not wait for any acknowledgment; whether the send succeeds does not matter.

acks=1: the message is considered sent successfully once the Leader has acknowledged it.

acks=-1: the message is considered sent successfully only after the Leader and all replicas in the ISR have acknowledged it.

The ISR is the set of replicas, Leader included, that are in sync with the Leader. For example, if the Leader is node 0 and the two Followers are nodes 1 and 2, the ISR is 0, 1, 2. If a Follower fails to catch up with the Leader within a configured time, it is removed from the ISR.
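The ISR membership rule above can be modeled as a simple lag check. This sketch is a conceptual model only, not Kafka's implementation; the 30-second limit reflects the replica.lag.time.max.ms default, and the node IDs and timestamps are made up for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Conceptual model (not Kafka source code) of ISR membership: a follower that
// has not caught up with the leader within the lag limit is dropped from the ISR.
public class IsrModel {
    static Set<Integer> currentIsr(int leaderId, Map<Integer, Long> lastCaughtUpMs,
                                   long nowMs, long maxLagMs) {
        Set<Integer> isr = new TreeSet<>();
        isr.add(leaderId); // the leader is always part of its own ISR
        for (Map.Entry<Integer, Long> e : lastCaughtUpMs.entrySet()) {
            if (nowMs - e.getValue() <= maxLagMs) {
                isr.add(e.getKey()); // this follower is still in sync
            }
        }
        return isr;
    }

    public static void main(String[] args) {
        Map<Integer, Long> followers = new TreeMap<>();
        followers.put(1, 9_800L); // caught up 200 ms ago
        followers.put(2, 0L);     // has not caught up for 10 s
        // leader = 0, now = 10 s, lag limit = 30 s (the replica.lag.time.max.ms default)
        System.out.println(currentIsr(0, followers, 10_000L, 30_000L)); // [0, 1, 2]
        // with a much stricter lag limit, follower 2 falls out of the ISR
        System.out.println(currentIsr(0, followers, 10_000L, 1_000L));  // [0, 1]
    }
}
```

This is why acks=-1 stays live even when a replica dies: the slow or dead follower leaves the ISR, and the Leader then only has to wait for the replicas that remain in it.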

In practice, choose among the three modes based on the scenario: use acks=-1 when the data requires strong reliability, and 0 or 1 when reliability requirements are loose.

// Set acks to 0, 1, or -1
properties.put(ProducerConfig.ACKS_CONFIG,"0");

4.3 Transaction management of messages

In MySQL, transactions are used to ensure that a group of data inserts either all succeed or all fail.

Kafka also supports transactions when sending messages. To use them, first specify a transactional ID:

properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_01");

Transactional messages are then sent through a few transaction APIs:

kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
try {
    kafkaProducer.send(new ProducerRecord<>("testTopic", "aaab", "hello"), new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            System.out.println(recordMetadata.topic() + " " + recordMetadata.partition());
        }
    });
    kafkaProducer.commitTransaction();
} catch (Exception e) {
    kafkaProducer.abortTransaction();
}