Normal send logic

  1. Configure the producer client parameters and create a producer instance.
  2. Build the message to be sent.
  3. Send the message.
  4. Close the producer instance.

The producer code is as follows:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
</dependency>
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerStart {

    private static final String topic = "topic-demo";
    private static final String brokerList = "localhost:9092";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        // Equivalent configuration using the raw property names:
        // props.put("acks", "all");
        // props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // props.put("client.id", "producer.client.id.demo");
        // props.put("bootstrap.servers", brokerList);
        return props;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello,Kafka!");
        try {
            // Fire-and-forget: the Future returned by send() is ignored
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always close the producer to release its buffer and background I/O thread
            producer.close();
        }
    }
}

Let's start with the Javadoc of the KafkaProducer class. Comments in open source projects like this one tend to be concise and to the point:

 The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources. The {@link #send(ProducerRecord) send()} method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency. The acks config controls the criteria under which requests are considered complete. The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.

The first point is that the producer consists of two parts. One is a pool of buffer space that holds messages not yet delivered to the server. The other is a background I/O thread that turns these messages into requests and delivers them to the cluster. In other words, KafkaProducer does not itself send anything over the network. It stores messages, and a separate thread sends them to the server. Failing to close the producer after use leaks both of these resources.
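To make the buffer-plus-I/O-thread design concrete, here is a toy model in plain Java. It is not the KafkaProducer API. The class BufferedProducer and its CLOSE sentinel are hypothetical, and "transmitting" here just means appending to a list. The model shows the two properties described above. First, send() only enqueues and returns. Second, close() drains the buffer and stops the background thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model, NOT the KafkaProducer API: send() only appends to an in-memory buffer,
// and a background "I/O" thread drains that buffer and "transmits" each record.
class BufferedProducer {
    // Hypothetical sentinel: a unique object that tells the I/O thread to stop.
    private static final Object CLOSE = new Object();

    private final BlockingQueue<Object> buffer = new LinkedBlockingQueue<>();
    private final List<String> delivered = Collections.synchronizedList(new ArrayList<>());
    private final Thread ioThread;

    BufferedProducer() {
        ioThread = new Thread(() -> {
            try {
                while (true) {
                    Object item = buffer.take();   // block until something is buffered
                    if (item == CLOSE) break;       // close() was called
                    delivered.add((String) item);   // stand-in for a network request
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "toy-producer-io");
        ioThread.setDaemon(true);
        ioThread.start();
    }

    // Returns immediately; the caller never waits for the "network".
    void send(String value) {
        buffer.add(value);
    }

    // Queues the sentinel behind all pending records, then waits for the thread to finish.
    // Without close(), the buffer and the thread would simply stay alive (leak).
    void close() {
        buffer.add(CLOSE);
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> delivered() {
        synchronized (delivered) {
            return new ArrayList<>(delivered);
        }
    }
}

class BufferedProducerDemo {
    public static void main(String[] args) {
        BufferedProducer producer = new BufferedProducer();
        producer.send("hello");
        producer.send("kafka");
        producer.close();
        System.out.println(producer.delivered()); // [hello, kafka]
    }
}
```

Because the sentinel is queued after every pending record, close() returns only after all buffered records have been handled. This mirrors why closing the real producer matters.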

The second point is that send() is asynchronous. When called, it adds the record to a buffer of pending sends and returns immediately. This lets the producer batch individual records together for efficiency. So the caller of send() is not the real sender either: it only places the message in the buffer and returns.
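The efficiency gain from batching can be seen in a toy sketch. Records that pile up in the buffer are grouped so that one request carries several of them. ToyBatcher and maxBatchRecords are hypothetical names. The real client groups records per partition and limits each batch by size in bytes (batch.size). It can also wait briefly for a batch to fill (linger.ms).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model, NOT Kafka's internal accumulator: group pending records into batches
// of at most maxBatchRecords, so one request can carry several records.
final class ToyBatcher {
    private ToyBatcher() {}

    static <T> List<List<T>> toBatches(List<T> pending, int maxBatchRecords) {
        if (maxBatchRecords <= 0) {
            throw new IllegalArgumentException("maxBatchRecords must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < pending.size(); start += maxBatchRecords) {
            int end = Math.min(start + maxBatchRecords, pending.size());
            // Copy the slice so each batch is independent of the input list.
            batches.add(new ArrayList<>(pending.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> pending = Arrays.asList("r1", "r2", "r3", "r4", "r5");
        List<List<String>> batches = toBatches(pending, 2);
        // 5 records become 3 requests instead of 5
        System.out.println(batches); // [[r1, r2], [r3, r4], [r5]]
    }
}
```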

The third point is that the acks configuration controls the criteria under which a request is considered complete. The sample code specifies acks=all, which blocks until the record is fully committed. This is the slowest but most durable setting. The ACKS_DOC string next to org.apache.kafka.clients.producer.ProducerConfig#ACKS_CONFIG explains acks in detail. The setting specifies how many replicas of the partition must receive the message before the producer considers it successfully written. acks takes one of three (string) values:

  • acks=0. The producer does not wait for any acknowledgment from the server. The record is added to the socket buffer immediately and is considered sent. There is no guarantee that the server has received the record. The retries setting has no effect here, because the client will generally not learn of any failure. The offset returned for each record is always -1.
  • acks=1. The leader writes the record to its local log and responds without waiting for acknowledgment from the followers. If the leader fails right after acknowledging the record but before the followers have replicated it, the record is lost.
  • acks=-1 or acks=all. The leader waits for the full set of in-sync replicas (ISR) to acknowledge the record. The record is not lost as long as at least one in-sync replica remains alive. acks=all gives the highest reliability.
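The three settings boil down to one question: how many replica acknowledgments must be collected before a request counts as complete? The enum below is a hypothetical model of that rule, not a Kafka class. Its inSyncReplicas parameter stands for the current size of the partition's ISR, leader included.

```java
// Hypothetical model of the acks rule, not a Kafka class: how many replica
// acknowledgments must be collected before a produce request counts as complete.
enum Acks {
    NONE, LEADER, ALL;

    // Maps the string values accepted by the producer's "acks" setting.
    static Acks parse(String value) {
        switch (value.trim()) {
            case "0":
                return NONE;
            case "1":
                return LEADER;
            case "all":
            case "-1":
                return ALL;
            default:
                throw new IllegalArgumentException("Unsupported acks value: " + value);
        }
    }

    // inSyncReplicas is the current ISR size of the partition, leader included.
    int requiredAcks(int inSyncReplicas) {
        switch (this) {
            case NONE:
                return 0;               // the producer does not wait for the broker at all
            case LEADER:
                return 1;               // only the leader's local write
            default:
                return inSyncReplicas;  // every replica currently in the ISR
        }
    }
}

class AcksDemo {
    public static void main(String[] args) {
        for (String setting : new String[] {"0", "1", "all", "-1"}) {
            Acks acks = Acks.parse(setting);
            System.out.println("acks=" + setting + " -> " + acks + ", waits for "
                    + acks.requiredAcks(3) + " of 3 in-sync replicas");
        }
    }
}
```

Treating "all" and "-1" as the same enum constant reflects the text above: the two spellings mean the same thing.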

Sending of messages

There are three main modes for sending messages: fire-and-forget, synchronous (sync), and asynchronous (async). The sample code uses fire-and-forget: it hands the message to Kafka without checking whether it arrived. Most of the time this is fine, but it can occasionally lose messages. This mode has the highest performance but the lowest reliability. KafkaProducer's send() method returns a Future<RecordMetadata>. send() has two overloads, defined as follows:

public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
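The three modes differ only in what the caller does with the returned Future. ToySender below is hypothetical, not KafkaProducer. It mimics the two overloads with substitutes for the real types: a String value stands in for ProducerRecord, a Long offset for RecordMetadata, and a BiConsumer for Callback. The demo then sends once in each mode.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

// Toy sender, NOT KafkaProducer. It mirrors the shape of the two send() overloads:
// both return a Future, and the one-argument form delegates with a null callback.
class ToySender implements AutoCloseable {
    // A single background thread plays the role of the producer's I/O thread.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    private long nextOffset = 0; // only touched by ioThread

    Future<Long> send(String value) {
        return send(value, null);
    }

    // The callback gets (offset, null) on success or (null, exception) on failure.
    Future<Long> send(String value, BiConsumer<Long, Exception> callback) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        ioThread.execute(() -> {
            if (value == null) {
                Exception error = new IllegalArgumentException("value must not be null");
                future.completeExceptionally(error);                  // complete the Future first,
                if (callback != null) callback.accept(null, error);    // then fire the callback
            } else {
                long offset = nextOffset++;
                future.complete(offset);
                if (callback != null) callback.accept(offset, null);
            }
        });
        return future; // returned without waiting for the background thread
    }

    @Override
    public void close() {
        ioThread.shutdown(); // already-submitted sends still run
    }
}

class SendModesDemo {
    public static void main(String[] args) throws InterruptedException {
        try (ToySender sender = new ToySender()) {
            // 1. Fire-and-forget: ignore the returned Future.
            sender.send("fire-and-forget");

            // 2. Sync: block on get() until the "broker" answers.
            try {
                long offset = sender.send("sync").get();
                System.out.println("sync offset " + offset);
            } catch (ExecutionException e) {
                e.getCause().printStackTrace(); // the real failure is wrapped as the cause
            }

            // 3. Async: register a callback and carry on; it runs on the I/O thread.
            sender.send("async", (offset, exception) -> {
                if (exception != null) exception.printStackTrace();
                else System.out.println("async offset " + offset);
            });
        }
    }
}
```

A failure surfaces in one of two ways. With get(), it arrives as an ExecutionException whose cause is the original exception. In the callback, it arrives as the exception argument.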

Synchronous sending can be implemented by blocking on the returned Future object, as in the following example:

try {
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}

In fact, send() itself is still asynchronous. Calling get() on the returned Future blocks until Kafka responds, that is, until the message is sent successfully or an exception occurs. Any exception must be caught and handled by the calling code. Exceptions in KafkaProducer generally fall into two categories. Retriable exceptions include NetworkException and LeaderNotAvailableException. Non-retriable exceptions include RecordTooLargeException. For a retriable exception, if the retries parameter is configured, the exception is not thrown as long as the send succeeds within the configured number of retries. Before Kafka 2.1, retries defaulted to 0. Since Kafka 2.1, including the 2.3.1 client used here, it defaults to Integer.MAX_VALUE, and delivery.timeout.ms bounds the total time spent. retries can be set explicitly as follows:

props.put(ProducerConfig.RETRIES_CONFIG, 3);
// Three retries: if the send still fails after the third retry, the exception is thrown and the caller must handle it
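The retry rule can be sketched as a loop. It makes one initial attempt plus up to `retries` more, but only when the failure is retriable. RetriableSendException and sendWithRetries are hypothetical names. In the real client, retriable exceptions extend org.apache.kafka.common.errors.RetriableException. The retries also happen inside the producer's background I/O thread, so the caller only sees the final outcome.

```java
import java.util.concurrent.Callable;

// Hypothetical exception type for failures that may succeed if simply tried again
// (for example, a partition leader that is temporarily unavailable).
class RetriableSendException extends RuntimeException {
    RetriableSendException(String message) {
        super(message);
    }
}

final class RetryingSend {
    private RetryingSend() {}

    // One initial attempt plus at most `retries` further attempts, for retriable failures only.
    // Any other exception is non-retriable: it is not caught here and propagates at once.
    static <T> T sendWithRetries(Callable<T> attempt, int retries) throws Exception {
        for (int retriesUsed = 0; ; retriesUsed++) {
            try {
                return attempt.call();
            } catch (RetriableSendException e) {
                if (retriesUsed >= retries) {
                    throw e; // retry budget exhausted: the caller must handle it
                }
            }
        }
    }
}

class RetryDemo {
    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        String result = RetryingSend.sendWithRetries(() -> {
            attempts[0]++;
            if (attempts[0] < 3) throw new RetriableSendException("leader not available");
            return "sent";
        }, 3);
        System.out.println(result + " after " + attempts[0] + " attempts"); // sent after 3 attempts
    }
}
```

With retries=3, a retriable failure that persists is attempted four times (1 + 3) before it reaches the caller. A non-retriable failure is reported after the first attempt.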

Synchronous sending performs poorly. Each send blocks until the previous message has been acknowledged, and only then can the next one be sent.

Asynchronous sending mode

For asynchronous sending, pass a Callback to send(). Kafka invokes it when the response arrives, whether the send succeeded or failed, as follows:

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // A non-null exception means the send failed;
            // in a real project, handle it according to the business requirements
            exception.printStackTrace();
        } else {
            // The message was sent successfully
            System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
        }
    }
});

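The two parameters of onCompletion() are mutually exclusive: exactly one of them is non-null. The reason lies in the following method, which completes a batch's future and then fires its callbacks. The surrounding sender logic will be covered next time.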

org.apache.kafka.clients.producer.internals.ProducerBatch#completeFutureAndFireCallbacks
    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
        // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
        produceFuture.set(baseOffset, logAppendTime, exception);

        // execute callbacks
        for (Thunk thunk : thunks) {
            try {
                // Check whether the exception is null
                if (exception == null) {
                    RecordMetadata metadata = thunk.future.value();
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(metadata, null);
                } else {
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(null, exception);
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
            }
        }

        produceFuture.done();
    }

When the message is sent successfully, metadata is non-null and exception is null. When the send fails, metadata is null and exception is non-null.
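The branch above can be retold in a few lines of plain Java to check the invariant. CompletionSketch is a hypothetical simplification. A String stands in for RecordMetadata, a BiConsumer stands in for Callback, and there is no produceFuture. Like the original, it skips absent callbacks and keeps going when a user callback throws.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;

// Simplified retelling of the loop in completeFutureAndFireCallbacks:
// each callback sees exactly one non-null argument.
final class CompletionSketch {
    private CompletionSketch() {}

    // `metadata` is a stand-in String (e.g. "topic-partition:offset"), not a RecordMetadata.
    static void fire(List<BiConsumer<String, Exception>> callbacks,
                     String metadata, RuntimeException exception) {
        for (BiConsumer<String, Exception> callback : callbacks) {
            if (callback == null) continue; // send(record) registers no callback
            try {
                if (exception == null) {
                    callback.accept(metadata, null);   // success: metadata only
                } else {
                    callback.accept(null, exception);  // failure: exception only
                }
            } catch (Exception e) {
                // A failing user callback must not prevent the remaining ones from running.
                System.err.println("Error executing user-provided callback: " + e);
            }
        }
    }
}

class CompletionDemo {
    public static void main(String[] args) {
        BiConsumer<String, Exception> printer = (metadata, exception) ->
                System.out.println("metadata=" + metadata + ", exception=" + exception);
        CompletionSketch.fire(Collections.singletonList(printer), "topic-demo-0:42", null);
        CompletionSketch.fire(Collections.singletonList(printer), null, new RuntimeException("request timed out"));
    }
}
```

The demo prints `metadata=topic-demo-0:42, exception=null` and then `metadata=null, exception=java.lang.RuntimeException: request timed out`. That is why a callback can safely test only `exception != null`.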

Next article: an analysis of the Kafka Sender source code.

If anything is unclear or wrong, feel free to leave a comment or contact me by email at [email protected]

Related references: In-depth Understanding of Kafka’s Core Design and Practice