Talk is cheap. Show me the code.

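import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerDemo {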

  public static void main(String[] args) {

    KafkaProducer<String,String> producer = createProducer();

    // Specify the topic, the key and the value
    ProducerRecord<String,String> record = new ProducerRecord<>("test1", "newkey1", "newvalue1");

    // Send it asynchronously
    producer.send(record);
    producer.close();

    System.out.println("Send completed");

  }

  public static KafkaProducer<String,String> createProducer() {
    Properties props = new Properties();

    // bootstrap.servers must be set
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.131:9092");

    // key.serializer must be set
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // value.serializer must be set
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    //client.id
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-0");

    //retries
    props.put(ProducerConfig.RETRIES_CONFIG, 3);

    //acks
    props.put(ProducerConfig.ACKS_CONFIG, "all");

    //max.in.flight.requests.per.connection
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    
    //linger.ms
    props.put(ProducerConfig.LINGER_MS_CONFIG, 100);

    //batch.size
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10240);

    //buffer.memory
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10240);

    return new KafkaProducer<>(props);
  }
}

The producer API is relatively simple to use. Create a ProducerRecord object (which contains the target topic and the content to be sent, and can also specify a key and a partition) and call send() to send the message. When sending the ProducerRecord, the producer first serializes the key and value objects into byte arrays so that they can be transmitted over the network. Before diving into the source code, here is a diagram of the flow (strictly speaking it belongs at the end), because reading the source with the diagram in mind is easier.

Brief description:

  1. After new KafkaProducer(), a background thread KafkaThread is created (KafkaThread is a wrapper around the Sender, which does the actual work) and scans the RecordAccumulator for messages.

  2. Calling KafkaProducer.send() to send a message actually stores it in the RecordAccumulator, in a Map (ConcurrentMap<TopicPartition, Deque<ProducerBatch>>). The message is added to a record batch (the same topic and the same partition count as the same batch), and all messages in a batch are sent to the same topic and partition.

  3. When the separate background thread scans the RecordAccumulator and finds messages, it sends them to the Kafka cluster (not as soon as a message exists, but once the message is ready).

  4. If the message is sent successfully (that is, written to Kafka), a RecordMetadata object is returned that contains the topic and partition information, as well as the offset of the record in the partition.

  5. If the write fails, an error is returned. On receiving the error the producer tries to resend the message (if retrying is allowed, the message is put back into the RecordAccumulator), and it returns an error if the message still fails after several attempts.
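
The "ready" condition in step 3 can be pictured with a small sketch. This is a simplified model, not Kafka's code: the class BatchReadiness and the method isReady are hypothetical names, and only two conditions are modelled (the batch is full, or it has waited at least linger.ms). The real producer takes further conditions into account.

```java
/** Simplified, hypothetical model of when a record batch becomes sendable. */
final class BatchReadiness {

    /**
     * Treats a batch as ready when it is full or has waited at least lingerMs.
     * Other conditions checked by the real producer are deliberately left out.
     */
    static boolean isReady(int bytesUsed, int batchSizeBytes, long waitedMs, long lingerMs) {
        boolean full = bytesUsed >= batchSizeBytes;
        boolean expired = waitedMs >= lingerMs;
        return full || expired;
    }

    public static void main(String[] args) {
        System.out.println(isReady(10240, 10240, 5, 100)); // full batch: true
        System.out.println(isReady(512, 10240, 100, 100)); // linger time elapsed: true
        System.out.println(isReady(512, 10240, 5, 100));   // neither: false
    }
}
```

With linger.ms=100 and batch.size=10240 as in the demo above, a small batch would wait up to 100 ms for more records before the background thread picks it up.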

Source code analysis

Background thread creation

KafkaClient client = new NetworkClient(...);
this.sender = new Sender(..., client, ...);
String ioThreadName = "kafka-producer-network-thread" + "|" + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

The above code is the core logic of the KafkaProducer constructor. It constructs a KafkaClient that communicates with the broker, constructs a Sender, and starts an asynchronous thread named kafka-producer-network-thread|${clientId}. If you set client.id to myclient when creating the producer, the thread name is kafka-producer-network-thread|myclient.

Sending a message (cached message)

KafkaProducer<String,String> producer = createProducer();

// Specify the topic, the key and the value
ProducerRecord<String,String> record = new ProducerRecord<>("test1", "newkey1", "newvalue1");

// Send asynchronously, you can set the callback function
producer.send(record);
// Synchronize sending
//producer.send(record).get();

There are two ways to send messages: synchronously and asynchronously. Synchronous sending is rarely used because every send() blocks until the result comes back, which costs too much time. With asynchronous sending you can pass a callback that is invoked once the send completes (whether it succeeded or failed).

Sending a message really just caches it. The core code is as follows:

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, 
  serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);


The RecordAccumulator's ConcurrentMap<TopicPartition, Deque<ProducerBatch>> puts the data for the same topic and the same partition into one Deque (a double-ended queue). This is what we meant earlier when we said that all messages in the same record batch go to the same topic and partition. The core source code of the append() method is as follows:

// From batches (ConcurrentMap<TopicPartition, Deque<ProducerBatch>>),
// get the queue for the topic partition; if there is none, a new ArrayDeque<> is created and returned
Deque<ProducerBatch> dq = getOrCreateDeque(tp);

// Calculate the space needed for the record batch. batchSize is set by batch.size
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
	maxUsableMagic, compression, key, value, headers));

// Allocate a buffer for this topic partition. If there is not enough free memory,
// this blocks for up to maxTimeToBlock (the max.block.ms parameter)
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
  // Create the MemoryRecordsBuilder and initialize its appendStream (DataOutputStream) property with buffer
  MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
  ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());

  // Write the key and value to the appendStream (DataOutputStream) of the MemoryRecordsBuilder
  batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());

  // Put the batch that needs to be sent into the queue
  dq.addLast(batch);
}
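
To make the "one deque of batches per topic partition" idea concrete, here is a toy model using only the JDK. It is a sketch under stated assumptions, not Kafka's implementation: ToyAccumulator, Batch and batchCount are hypothetical names, the map key is a plain "topic-partition" string, and the string length stands in for the serialized record size.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Toy accumulator: one deque of batches per "topic-partition" key. Not Kafka's class. */
final class ToyAccumulator {

    /** A batch accepts records until adding another would exceed maxBytes. */
    static final class Batch {
        final List<String> records = new ArrayList<>();
        private final int maxBytes;
        private int usedBytes;

        Batch(int maxBytes) { this.maxBytes = maxBytes; }

        boolean tryAppend(String value) {
            int size = value.length(); // stand-in for the serialized size
            if (!records.isEmpty() && usedBytes + size > maxBytes) {
                return false;          // no room left: the caller opens a new batch
            }
            records.add(value);
            usedBytes += size;
            return true;
        }
    }

    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();
    private final int batchSize;

    ToyAccumulator(int batchSize) { this.batchSize = batchSize; }

    /** Appends to the newest batch of the partition, or starts a new batch at the tail. */
    void append(String topic, int partition, String value) {
        Deque<Batch> dq = batches.computeIfAbsent(topic + "-" + partition, k -> new ArrayDeque<>());
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last == null || !last.tryAppend(value)) {
                Batch batch = new Batch(batchSize);
                batch.tryAppend(value); // the first record of a batch is always accepted
                dq.addLast(batch);
            }
        }
    }

    int batchCount(String topic, int partition) {
        Deque<Batch> dq = batches.get(topic + "-" + partition);
        if (dq == null) {
            return 0;
        }
        synchronized (dq) {
            return dq.size();
        }
    }
}
```

Locking on the individual deque rather than on the whole map mirrors the synchronized (dq) block in the excerpt above: appends to different partitions do not block each other.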


Send a message to Kafka

Now that the message is stored in the RecordAccumulator, let’s see how it is sent. When KafkaProducer is created, an asynchronous thread is started to fetch messages from the RecordAccumulator and send them to Kafka. The code for this is in Sender.java. Sender implements the Runnable interface and runs in the background, sending messages to the appropriate nodes until KafkaProducer is shut down.

/**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
*/
public class Sender implements Runnable {

  public void run() {

    // Run until KafkaProducer.close() is called
    while (running) {
       run(time.milliseconds());
    }

    // Start processing the logic after KafkaProducer is shut down
    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // If this is not a forced close, there may still be requests and data in the accumulator
    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
       run(time.milliseconds());
    }
    if (forceClose) {
        // On a forced close, abort the batches that have not been sent and fail them with an exception
        this.accumulator.abortIncompleteBatches();
    }
    ...
  }
}

KafkaProducer has two shutdown methods: close() and close(long timeout, TimeUnit timeUnit). The timeout parameter is the maximum time to wait for the producer to complete any pending requests; the no-argument close() waits up to Long.MAX_VALUE milliseconds. If timeout=0, the Sender is closed right away (running=false).

In the run(long) method, we skip transactionManager and see the main flow of sending messages as follows:

// Move record batches to the production request list for each node
long pollTimeout = sendProducerData(now);

// Polling for message sending
client.poll(pollTimeout, now);


First look at the sendProducerData() method, whose core logic is in the sendProduceRequest() method (in Sender.java):

for (ProducerBatch batch : batches) {
    TopicPartition tp = batch.topicPartition;

    // Convert the ProducerBatch MemoryRecordsBuilder to MemoryRecords.
    MemoryRecords records = batch.records();
    produceRecordsByPartition.put(tp, records);
}

ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
        produceRecordsByPartition, transactionalId);

// A callback invoked when the request completes
RequestCompletionHandler callback = new RequestCompletionHandler() {
    public void onComplete(ClientResponse response) {
        // Process the response message
        handleProduceResponse(response, recordsByPartition, time.milliseconds());
    }
};

// Construct the ClientRequest from the parameters; at this point the messages to be sent are in requestBuilder
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
        requestTimeoutMs, callback);

// Convert clientRequest to a Send object (Send.java, which holds the buffer with the data to send)
// and set this object on the KafkaChannel. Keep in mind that no data has been sent yet
client.send(clientRequest, now);

The client.send() call above eventually reaches NetworkClient.doSend(). Every request goes through this method to have its Send object set, whether it is a producer request to send messages or a request to fetch metadata. The supported requests are defined in ApiKeys.java, where you can see the request and response data structures for each request type.

The data is actually written to the socket in the pollSelectionKeys() method, which iterates over the selection keys. The core code is as follows:

/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
  // The write method of Java8 GatheringByteChannel is actually called
  channel.write();
}

That completes the sending process. This is the ideal case, though, and sends do sometimes fail (the message is too large, or there is no leader available). So where is a failed message resent? Remember the callback above? The resend is set up in that callback, so let's look at its source first:

private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
  RequestHeader requestHeader = response.requestHeader();
  int correlationId = requestHeader.correlationId();

  if (response.wasDisconnected()) {
    // If the network is disconnected, construct an Errors.NETWORK_EXCEPTION response
    for (ProducerBatch batch : batches.values())
        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);

  } else if (response.versionMismatch() != null) {

    // If the version does not match, construct an Errors.UNSUPPORTED_VERSION response
    for (ProducerBatch batch : batches.values())
        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now, 0L);

  } else {

    if (response.hasResponse()) {
        // If there is a response, handle the normal response
        ...
    } else {

        // If acks=0, construct an Errors.NONE response, because in this case the request is only sent and no result is awaited
        for (ProducerBatch batch : batches.values()) {
            completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now, 0L);
        }
    }
  }
}

In the completeBatch method, we focus on the logical processing of failures. The core source code is as follows:

private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now, long throttleUntilTimeMs) {
  Errors error = response.error;

  // If the sent message is too large, it needs to be split again
  if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
        (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {

    this.accumulator.splitAndReenqueue(batch);
    this.accumulator.deallocate(batch);
    this.sensors.recordBatchSplit();

  } else if (error != Errors.NONE) {
    // An error occurred. Retry if possible (the retry limit has not been reached and the exception is a RetriableException)
    if (canRetry(batch, response)) {
        if (transactionManager == null) {
            // Put the batch back at the head of the queue to be sent again (deque.addFirst(batch))
            reenqueueBatch(batch, now);
        }
    }
  }
  ...
}
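
The retry rule described in the comments above (retry only while attempts remain and the error is retriable, then put the batch back at the head of the queue) can be sketched with plain JDK types. Everything here is illustrative: RetryDecision, TransientException and reenqueue are hypothetical names, and TransientException merely stands in for Kafka's RetriableException.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of the retry decision; names are illustrative, not Kafka's. */
final class RetryDecision {

    /** Marker for transient errors, standing in for Kafka's RetriableException. */
    static class TransientException extends RuntimeException {
        TransientException(String message) { super(message); }
    }

    /** Retry only if attempts remain and the error is of the transient kind. */
    static boolean canRetry(int attempts, int retries, RuntimeException error) {
        return attempts < retries && error instanceof TransientException;
    }

    /** A failed batch goes back to the head so it is sent before newer batches. */
    static <T> void reenqueue(Deque<T> queue, T batch) {
        queue.addFirst(batch);
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.addLast("batch-2");
        if (canRetry(1, 3, new TransientException("leader not available"))) {
            reenqueue(queue, "batch-1");
        }
        System.out.println(queue.peekFirst()); // batch-1
    }
}
```

Re-inserting at the head rather than the tail is what keeps a retried batch ahead of batches that were created after it.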

The process of sending messages has been analyzed. Now it will be clearer if you look back at the flow chart.

More information about the Kafka protocol can be found at this link

Partitioning algorithm

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
    // If the key is null, the Round Robin algorithm is used
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0) {
        int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // no partitions are available, give a non-available partition
        return Utils.toPositive(nextValue) % numPartitions;
    }
} else {
    // Hash according to key
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

There are two cases for the partitioning algorithm in Kafka:

  1. If the key is null and the default partitioner is used, records are spread over the available partitions of the topic. The partitioner uses a round-robin algorithm to distribute the messages evenly across partitions.
  2. If the key is not null and the default partitioner is used, Kafka hashes the key (using Kafka’s own hash algorithm, so the hash value does not change even if the Java version is upgraded) and then maps the message to a specific partition based on the hash value. The same key is always mapped to the same partition (this is not guaranteed if the number of partitions changes). The mapping uses all partitions of the topic, not just the available ones, so an error occurs if the target partition is not available for writing, which is rare.

If you want to implement custom partitioning, just implement the Partitioner interface.
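
The two cases can be tried out with a small stand-alone sketch. It is not the Kafka partitioner: PartitionChoice is a hypothetical class, it does not implement the Partitioner interface, it ignores partition availability, and Arrays.hashCode is used purely as a stand-in for Kafka's murmur2 hash, so the partition numbers it yields will differ from what a real producer would pick.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the two partitioning cases; not Kafka's partitioner. */
final class PartitionChoice {

    private final AtomicInteger counter = new AtomicInteger(0);

    /** Clears the sign bit so the modulo result is never negative. */
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    int partition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // No key: round robin over the partitions
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // Key present: hash the key bytes (stand-in hash, NOT murmur2)
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionChoice chooser = new PartitionChoice();
        byte[] key = "newkey1".getBytes(StandardCharsets.UTF_8);
        // The same key maps to the same partition while numPartitions is unchanged
        System.out.println(chooser.partition(key, 3) == chooser.partition(key, 3)); // true
    }
}
```

Because the keyed branch takes the hash modulo numPartitions, adding partitions to a topic changes where existing keys land, which is the caveat mentioned in case 2.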

Configuration parameters for the producer

After analyzing the KafkaProducer source code, we can see that many parameters are used throughout the message sending process. Here are some configuration parameters used in KafkaProducer.

  1. acks: The acks parameter specifies how many partition replicas must receive the message before the producer considers the write successful. There are three options:

    • acks=0: the producer does not wait for any response from the server. If something goes wrong and the server never receives the message, the producer has no way of knowing and the message is lost. Because it never waits for a response, the producer can send messages as fast as the network allows, which gives very high throughput.

    • acks=1: the producer receives a success response from the server as soon as the partition leader has received the message. If the message cannot reach the leader, the producer receives an error response and resends the message. However, if a node that has not received the message becomes the new leader, the message is still lost.

    • acks=all: the producer receives a success response from the server only when all the participating replicas have received the message. This is the safest mode but has the highest latency.

  2. buffer.memory

Sets the size of the producer's memory buffer. If the application produces messages faster than the producer can send them to the server, the buffer runs out of space and the send() method either blocks or throws an exception. Which one happens depends on max.block.ms: the call can block for up to that long before the exception is thrown.

  3. retries

The error received when sending a message to the server may be temporary (for example, the leader cannot be found). In that case, this parameter determines how many times the producer resends the message. Note: whether a retry actually happens depends on both the retry count and whether the error is a RetriableException.

  4. batch.size

When multiple messages need to be sent to the same partition, the producer will place them in the same batch (Deque). This parameter specifies the amount of memory a batch can use, in bytes. When the batch is filled, all messages in the batch will be sent. However, the producer does not necessarily wait for a batch to be filled before sending it; it is possible to send a half-full batch or even a batch containing only one message.

  5. linger.ms

Specifies how long the producer waits for more messages to join a batch before sending it. KafkaProducer sends a batch when it fills up or when the linger.ms limit is reached. Setting linger.ms to a value greater than zero makes the producer wait a while before sending the batch so that more messages can be added to it. This increases latency but also improves throughput.

  6. max.block.ms

Specifies how long the producer blocks when send() is called or when partitionsFor() is called to retrieve metadata. These methods block when the producer’s send buffer is full or when no metadata is available. When the blocking time reaches max.block.ms, an exception is thrown: new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");

  7. client.id

An arbitrary string used to identify the source of messages. The background thread is named after it: the thread name is kafka-producer-network-thread|{client.id}

  8. max.in.flight.requests.per.connection

This parameter specifies how many messages a producer can send before receiving a response from the server. The higher its value, the more memory it consumes, but also improves throughput. Setting it to 1 ensures that messages are written to the server in the order they were sent, even if retries occur.

  9. timeout.ms, request.timeout.ms and metadata.fetch.timeout.ms

request.timeout.ms specifies how long the producer waits for a response from the server when sending data, and metadata.fetch.timeout.ms specifies how long the producer waits for a response when fetching metadata (such as the leader of the target partition). If waiting for a response times out, the producer either tries sending again or returns an error. timeout.ms specifies how long the broker waits for in-sync replicas to acknowledge a message, and it works together with the acks setting: if no acknowledgement is received from the in-sync replicas within the specified time, the broker returns an error.

  10. max.request.size

This parameter controls the size of the requests sent by the producer. The broker has its own limit on the maximum size of a message it accepts (message.max.bytes). It is therefore best to keep the two settings consistent, so that messages sent by the producer are not rejected by the broker.

  11. receive.buffer.bytes and send.buffer.bytes

These parameters specify the sizes of the TCP socket buffers used for receiving and sending packets, respectively. If they are set to -1, the operating system defaults are used. If the producer or consumer is in a different data center than the broker, these values can be increased appropriately, because networks across data centers tend to have higher latency and lower bandwidth.
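
As a recap, several of the parameters above can be collected into one Properties object using their plain string keys. This is only a sketch: ProducerSettings and reliableOrdered are hypothetical names, the values are the illustrative ones from the demo at the top of the article (plus 60000 ms for max.block.ms), and they are not tuning recommendations.

```java
import java.util.Properties;

/** Hypothetical helper that gathers the producer settings discussed above. */
final class ProducerSettings {

    static Properties reliableOrdered(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");                                 // wait for all replicas
        props.put("retries", "3");                                // resend on transient errors
        props.put("max.in.flight.requests.per.connection", "1");  // keep order across retries
        props.put("linger.ms", "100");                            // wait up to 100 ms to fill a batch
        props.put("batch.size", "10240");                         // bytes per batch
        props.put("max.block.ms", "60000");                       // how long send() may block
        return props;
    }

    public static void main(String[] args) {
        Properties props = reliableOrdered("192.168.239.131:9092");
        System.out.println(props.getProperty("acks")); // all
    }
}
```

The resulting Properties can be passed to new KafkaProducer<>(props) in the same way as in createProducer() at the top of the article.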