This is the 19th day of my participation in the August More Text Challenge

preface

In this article, we will see how producer works by introducing and using producer.

1. Configure necessary parameters

When creating a producer, there are many configurations to fill in, among which three parameters are mandatory:

  • Bootstrap. servers: Specifies the list of broker addresses required to connect to a Kafka cluster. You can set one or more
  • Key. serializer and Value. serializer: Messages received in the broker must exist as byte arrays. Therefore, messages must be serialized into byte arrays before being sent

Other parameters:

  • properties.put(“client.id”,”id”); // Set the client ID. If it is not specified, it will be automatically generated, such as producer1, producer2…

  • Properties. Put (“acks”,”all”) : Specifies how many replicas in the partition must receive this message

    • The producer needs the leader to confirm the number of replies received before the request completes. This configuration controls the durability of the sent message and supports the following configurations:

      • Acks =0 If set to 0, the producer will not wait for any message confirmation. The message is immediately added to the socket buffer and considered for sending. There is no guarantee that the message will be received by the server in this case. And the retry mechanism does not work (because the client does not know if the fault occurred). The offset returned by each message is always set to -1.
      • Acks =1, which means that the leader writes a message to the local log and responds immediately, without waiting for all followers to reply. In this case, if the leader fails immediately after the response message but before the follower is replicated, the message will be lost.
      • Acks =all means that the leader will wait for all replicas to be synchronized to reply to the message. This configuration guarantees that messages will not be lost (as long as there is at least one synchronized copy). This is the strongest guarantee of availability. Equivalent to acks=-1.
  • properties.put(“retries”,1); // Number of retries
  • properties.put(“batch.size”,16384); // Batch size
  • properties.put(“linger.ms”,1); // Wait time
  • properties.put(“buffer.memory”, 33554432); // Buffer size

Set producer client parameters

parameter The default value Recommended value instructions
acks 1 High reliability: all high throughput: 1 Number of received confirmation signals from the Server: indicates the number of such confirmation signals procuder needs to receive, and the message is successfully sent. The acks parameter represents the availability of data backups. Common options: acks=0: indicates that producer does not need to wait for any confirmation of receiving information, and the copy is immediately added to the socket buffer and considered to have been sent. There is no guarantee that the server has successfully received the data in this case, and the retry configuration will not take effect (because the client does not know if it failed). The offset returned will always be set to -1. acks=1: This means waiting at least until the leader has successfully written data to the local log, but not until all followers have successfully written. If the followers do not successfully back up the data and the leader is unable to provide the service, the messages will be lost. acks=all: This means that the leader needs to wait for all backups to be successfully written to the log. If only one backup survives, data will not be lost.
retries 0 Adjust according to actual business Number of client message retries. If the value is greater than 0, the client resends the data after the data fails to be sent. Note that these retries are no different from the retries when the client receives a send error. Allow retries to potentially change the order of data. If both records are sent to the same partition, the first message fails and the second message succeeds, and the second message appears before the first.
request.timeout.ms 30000 Adjust according to actual business Set the maximum waiting time for a request. If this time is exceeded, a Timeout exception will be thrown. If the timeout period is set to a larger value, such as 120,000 (120 seconds), the situation of sending failures can be reduced in high-concurrency scenarios.
block.on.buffer.full TRUE TRUE TRUE means that when we run out of memory, we stop receiving new message records or throw errors. By default, this is set to TRUE. However, some blocks may not be desirable, so it is better to throw an error immediately. If set to false, producer throws an exception error: Bufferhaustedexception
batch.size 16384 262144 Default maximum number of bytes for batch processing messages. Producer will attempt to batch message logging to reduce the number of requests. This will improve performance between client and server. No attempt is made to process message bytes larger than this number. The request sent to brokers will contain multiple batch processes, which will contain one request for each partition. Smaller batch processing values are less used and may reduce throughput (0 would be batch processing only). Larger batch values waste more memory space, which requires the allocation of memory for a particular batch value.
buffer.memory 33554432 67108864 The size of memory that producer can use to cache data. If data is generated faster than it is being sent to the broker, producer blocks or throws an exception, indicated by “block.on.buffer.full”. This setting is related to the total memory available to the producer, but is not a hard limit because not all memory used by the producer is used for caching. Some additional memory will be used for compression (if compression is introduced), as well as for maintenance requests.

2. Message sending process

Typically, production logic requires the following steps:

  • 1. Configure producer client parameters and create producer instances that respond
  • 2. Build the message to be sent
  • 3. Send a message
  • 4. Close the producer instance

Sending messages it can be divided into three modes:

  • 1. Fire-and-forget
  • 2. Sync
  • 3. Async

2.1 Send and forget

Public class KafkaProducerAnalysis {private Static Final String brokerList = "192.168.81.101:9092"; private static final String topic = "xiaolei2"; public static Properties initConfig(){ Properties properties = new Properties(); properties.put("bootstrap.servers",brokerList); / / the specified key serializer properties. The put (" key. The serializer ", "org.apache.kafka.com mon. Serialization. StringSerializer"); / / specify the value of the serializer properties. The put (" value. The serializer ", "org.apache.kafka.com mon. Serialization. StringSerializer"); return properties; } public static void main(String[] args) { Properties properties = initConfig(); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i <= 5; i++) { producer.send(new ProducerRecord<>(topic,Integer.toString(i))); } producer.close(); }}Copy the code

In addition to configuration, one important object we can notice is ProducerRecord, whose instance properties are as follows and which we will focus on.

public class ProducerRecord<K, V> { private final String topic; Private final Integer partition; Private final Headers Headers; // Message header, you can add customized message private final K key; // key, which can be used to calculate partition numbers. If no partition is specified, messages with the same key will be divided into the same partition. private final V value; // value private final Long timestamp; // Message timestamp}Copy the code

2.2 Synchronous Sending

Send and forget mode: it does not care whether the message is delivered correctly, which may cause data loss. This mode has the highest performance and the lowest reliability.

The method of synchronous sending can be realized by using the returned Futrune object.

public static void main(String[] args) { Properties properties = initConfig(); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); try{ for (int i = 0; i <= 5; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, Integer.toString(i))); RecordMetadata recordMetadata = future.get(); System.out.println(recordMetadata.topic()+"-"+recordMetadata.partition()+"-"+recordMetadata.offset()); } producer.close(); }catch (Exception e){ e.printStackTrace(); }}Copy the code

Print effect:

xiaolei2-0-6
xiaolei2-0-7
xiaolei2-0-8
xiaolei2-0-9
xiaolei2-0-10
xiaolei2-0-11
Copy the code

Get () gets a RecordMetadata object that contains metadata information about the message, such as the subject of the current message, the partition number, the offset in the partition, and the timestamp.

If this information is not needed, producer.send(record).get() can be directly used.

The reliability of synchronous sending is high, but the performance deteriorates greatly. You need to block and wait for one message to be sent before sending the next one. Either the message is sent successfully or the message fails to be sent, and an exception occurs. Exceptions generally come in two types:

  • A retried exception
  • An exception that cannot be retried.

Repeatable exceptions can be retried multiple times by configuring the REtries parameter and processed in the outer logic if consumption fails.

Exceptions that cannot be retried, such as messages that are too large, are thrown directly.

2.3 Asynchronous Sending

Asynchronous sending calls Callback in the send () method, and Kafka calls back if it gets a response

The callback function is called asynchronously when producer receives an ACK. The method has two parameters: RecordMetadata and Exception. If Exception is null, the message is successfully sent. If Exception is not null, the message fails to be sent.

[Note] : Message sending failure will be automatically retried, we do not need to manually retry in the callback function

public class ProducerAck { public static void main(String[] args) { Properties properties = new Properties(); properties.put("bootstrap.servers","hadoop101:9092"); properties.put("acks","all"); properties.put("retries",1); Properties. Put ("batch.size",16384); // Batch size properties. Put ("linger. Ms ",1); // properties. Put ("buffer.memory", 33554432); / / / / the buffer size of the specified key serializer properties. The put (" key. The serializer ", "org.apache.kafka.com mon. Serialization. StringSerializer"); / / specify the value of the serializer properties. The put (" value. The serializer ", "org.apache.kafka.com mon. Serialization. StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i <= 100; I ++) {producer.send(new ProducerRecord<>("xiaolei", integer.tostring (I)), new Callback() {// This method is called when the Producer receives an ACK, @override public void onCompletion(RecordMetadata RecordMetadata, Exception e) { if (e == null) { System.out.println("success->" + recordMetadata.offset()); } else { e.printStackTrace(); }}}); } producer.close(); }}Copy the code

3. Serialization

The serializer also uses the StringSerializer provided by the client. If the serializer is used, the serializer must be used to convert the object into a byte array before it can be sent to Kafka over the network. ByteArray, ByteBuffer, Bytes, Double, Integer, and Long also implement this interface.

Producers and consumers should serialize the same key value. If the consumer uses IntegerSerializer, the StringSerializer data will not be parsed.

Four, partition

4.1 Partitioning Policies

As you can see, partitioning is not required except for topics. The reason for partitioning is to facilitate horizontal scaling in the cluster and to adjust the concurrency of message production for topics.

So what are the zoning principles?

We send messages using the ProducerRecord object, which defines the principles of partitioning as follows:

  • 1. If a partition is specified, the specified value is directly used as the partition value.
  • 2. If no partition is specified but there is a key, mod the hash value of the key and the number of partitions of the topic to obtain the partition value
  • 3. If there is neither a partition value nor a key value, the system randomly uses a partition and keeps using the partition as long as possible. When the batch buffer of the partition is full or exceeds the specified time, the system randomly uses another partition.

4.2 partition editor

Kafka implements the partition by default using DefaultPartitioner to implement the Partitioner interface.

The partition method is used to calculate the partition number. We can also customize the calculation method of partition according to our own business. For example, large e-commerce enterprises have multiple warehouses and can use the name of the warehouse as key to store the partition of commodity information.

Kafka provides a default partition for Kafka partitions. Kafka also provides a custom partition for Kafka partitions.

public class DemoPartitioner implements Partitioner { private final AtomicInteger counter = new AtomicInteger(0); @Override public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic); int num = partitionInfos.size(); if(null == bytes){ return counter.getAndIncrement()%num; }else{ return Utils.toPositive(Utils.murmur2(bytes1))% num; } } @Override public void close() { } @Override public void configure(Map<String, ? > map) { } }Copy the code

You need to add a custom divider to the configuration class:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DemoPartitioner.class);
Copy the code

5. Interceptors

Producer interceptors can be used to do some preparatory work before a message is sent, such as filtering messages that do not meet requirements according to a rule, modifying the content of the message, etc., as well as some statistical work.

Producer interceptors are implemented by customizing the ProducerInterceptor interface.

In the following example, we prefix the sent message with “pre-“.

public class ProducerInterceptorPrefix implements ProducerInterceptor { private volatile long sendSuccess = 0; private volatile long sendFailure = 0; @Override public ProducerRecord onSend(ProducerRecord producerRecord) { String modify = "pre-"+producerRecord.value(); return new ProducerRecord(producerRecord.topic(),producerRecord.partition(),producerRecord.timestamp(),producerRecord.key(),modify, producerRecord.headers()); } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { if(e == null){ sendSuccess++; }else{ sendFailure++; } } @Override public void close() { double result = (double)sendSuccess/(sendFailure+sendSuccess); System.out.println(" success, % : "+result*100+"%"); } @Override public void configure(Map<String, ? > map) { } }Copy the code

You also need to specify interceptors in the configuration class:

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
Copy the code

The data on consumer spending are as follows:

At the same time, interceptors can be configured to form multiple interceptor chains. Use commas as follows:

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName()+","+ProducerInterceptorPrefix.class.getName());
Copy the code

The command output is as follows:

6. Overall structure

The diagram above shows the overall architecture of producer client, which mainly includes core classes: KafkaProducer to produce messages, RecordAccumulator, Sender to process send requestor, and Network IO Selector.

  • 1. The entire producer client is coordinated by two threads, the master thread and the Sender thread. The messages are created by KafkaProducer in the main thread and cached in the message accumulator through possible interceptors, serializers, and dividers.
  • 2. Inside the message accumulator is a double-ended queue, which is used to cache messages and compress them, and then provide the Sender thread to send them in batches. The default cache size is 32 megabytes. If producers send messages faster than they can send them to the server, they run out of space, and KafkaProducer’s send calls block or fail.
  • After receiving the message, the Sender converts the data to

    . NodeId represents the broker Node ID, List represents the sent message data set, but it is not the final Request object, Sender is further encapsulated in

    , so that the Request Request can be sent to each Node. Request here refers to various protocols. And then finally the network I/O layer is sent by Selector.
    ,request>
    ,list>
  • 4. Selector is a non-blocking network IO class that supports multiple connections, including NetworkSend and NetworkReceive, which send network requests to Kafka cluster and receive the response from Kafka cluster, respectively.

References:

  • In-depth Understanding of Kakfa core Design and Implementation Principles