Welcome to follow my WeChat official account [Old Week Chat Architecture], which covers the principles of the mainstream Java backend technology stack, source-code analysis, architecture, and all kinds of Internet high-concurrency, high-performance, high-availability solutions.

1. Foreword

In the previous articles we covered Kafka's infrastructure and setup; starting with this one, we will analyze the source code. The Client side is implemented in Java and the Server side in Scala. When using Kafka, the Client is the first part a user touches, so that is where we begin, and within the Client we start with the Producer. Today we look at the Producer-side source code.

2. Basic usage of the Producer

Let's start with a code example that shows how KafkaProducer is used. In the example below, we use KafkaProducer to send messages to Kafka. The configuration is first written into a Properties object, with the meaning of each entry explained in a comment; the KafkaProducer object is then constructed from that Properties object; finally, the send method completes the sending. The example covers both synchronous and asynchronous sends.
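The example is reproduced below as a minimal sketch; the broker address, topic name, key, and value are placeholders rather than the article's original values:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker address list used to bootstrap the cluster connection
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serializers for the record key and value
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // How many replica acknowledgements a write requires ("all" = strongest)
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("demo-topic", "key-1", "hello kafka");

            // Synchronous send: block on the returned Future
            RecordMetadata metadata = producer.send(record).get();
            System.out.printf("sync: partition=%d offset=%d%n",
                    metadata.partition(), metadata.offset());

            // Asynchronous send: the callback fires when the send is acknowledged
            producer.send(record, (meta, exception) -> {
                if (exception != null) exception.printStackTrace();
                else System.out.printf("async: partition=%d offset=%d%n",
                        meta.partition(), meta.offset());
            });
        }
    }
}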

Kafka provides a very simple and convenient API for users to use. There are only two steps:

  • Initialize the KafkaProducer instance
  • Call the send() interface to send data

This article mainly focuses on the initialization of the KafkaProducer instance and on how the send() interface sends data.

3. Instantiate KafkaProducer

After understanding the basic usage of KafkaProducer, let's take a closer look at the core logic of its constructor:

public KafkaProducer(Properties properties) {
    this(Utils.propsToMap(properties), (Serializer)null, (Serializer)null, (ProducerMetadata)null, (KafkaClient)null, (ProducerInterceptors)null, Time.SYSTEM);
}
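This public constructor simply delegates to the main internal constructor with the remaining arguments left null. In recent client versions, that internal constructor does the real work: it parses the configuration, instantiates the partitioner, the key/value serializers and the interceptors, builds the RecordAccumulator, and creates the Sender, starting it on a background KafkaThread (see the summary in Section 6).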

4. Message sending process

Users send data directly with producer.send(). Let's look at the implementation of the send() interface:

// Asynchronously send data to a topic
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return this.send(record, (Callback)null);
}

// Send data asynchronously to a topic, invoking the callback when the send is acknowledged
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return this.doSend(interceptedRecord, callback);
}

Both overloads ultimately delegate the actual sending to the Producer's doSend() method.

4.1 The interceptor

The ProducerInterceptors.onSend() method traverses the registered interceptors and calls each one's onSend() method. Interceptors exist to let users process records around the send. Kafka itself ships no default interceptor implementation; to use the feature, you must implement the ProducerInterceptor interface yourself.

4.1.1 Interceptor code
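As a minimal sketch of what such an interceptor can look like (the class name and the prefixing behavior are invented for illustration):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// A custom interceptor sketch: prefixes every value and counts send results.
public class PrefixInterceptor implements ProducerInterceptor<String, String> {
    private long success = 0;
    private long failure = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs on the user's main thread, before serialization and partitioning;
        // topic and partition are passed through unchanged.
        return new ProducerRecord<>(record.topic(), record.partition(),
                record.timestamp(), record.key(),
                "prefix-" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Runs on the producer's I/O thread; keep this lightweight.
        if (exception == null) success++; else failure++;
    }

    @Override
    public void close() {
        // Release resources; also a convenient place to report the counters.
        System.out.printf("sent ok=%d, failed=%d%n", success, failure);
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}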

4.1.2 Interceptor core logic

The ProducerInterceptor interface defines three callback methods:

  • onSend(ProducerRecord<K, V> record): this method is wrapped into KafkaProducer.send() and runs on the user's main thread. It is guaranteed to be called before the message is serialized and its partition computed. The user may do anything to the message here, but it is best not to change the topic or partition the message belongs to, or the target-partition calculation will be affected.
  • onAcknowledgement(RecordMetadata metadata, Exception exception): called when the message is acknowledged or when the send fails, just before the Producer's callback logic is triggered. onAcknowledgement runs on the Producer's I/O thread, so do not put heavy logic in it, or the producer's sending throughput will suffer.
  • close(): closes the interceptor and is used to release resources.

Interceptors may be invoked from multiple threads, so users must ensure thread safety when implementing them. In addition, if multiple interceptors are configured, the producer invokes them in the configured order, and it merely logs any exception an interceptor throws rather than propagating it upward.
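To enable interceptors, list their classes in the producer configuration in the order they should run; continuing the Properties setup from the usage example above (the package name is hypothetical):

// Register interceptors in the order they should run
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.PrefixInterceptor");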

4.2 The Producer's doSend() implementation

In the implementation of the doSend() method, sending one record is divided into the following five steps:

  • Verify that metadata for the target topic is available (the partition leader exists and, if authorization is enabled, the client has the corresponding permission); if it is not available, block to fetch the metadata;
  • Serialize the record's key and value;
  • Determine the partition the record will be sent to (it can be specified explicitly or computed by the partitioner);
  • Append the record to the RecordAccumulator, where it is buffered first;
  • If, after the append, the corresponding RecordBatch has reached batch.size (or the batch lacks space for the next record), wake up the Sender thread to send the data.

The sending process can be summarized in these five points; their concrete implementations are analyzed in detail below.

5. The sending steps in detail

5.1 Obtaining Metadata Information about a Topic

The Producer uses the waitOnMetadata() method to retrieve metadata for the corresponding topic; I will cover it in the next article.

5.2 Serializing the Key and Value

The Producer side serializes the record's key and value, and the Consumer side performs the corresponding deserialization. Kafka ships serializers and deserializers for the common types (String, Integer, Long, Double, ByteArray, ByteBuffer, and so on). We can also implement custom serialization, but in general the built-in ones are sufficient.
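As a minimal sketch of custom serialization (the User type and the "id,name" encoding are invented for this example; configure() and close() have default implementations in recent client versions but are written out for clarity):

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

public class UserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, User user) {
        if (user == null) return null;
        // Encode the record as "id,name" in UTF-8; real code would usually
        // use JSON, Avro, or Protobuf instead.
        return (user.id + "," + user.name).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() { }
}

// A hypothetical payload type, invented for this sketch.
class User {
    final long id;
    final String name;
    User(long id, String name) { this.id = id; this.name = name; }
}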

5.3 Obtaining the Partition to which the Record is to be sent

The partition value is determined in one of three ways:

  • If a partition is explicitly specified in the record, that value is used directly;
  • If no partition is specified but a key is present, the partition is the key's hash modulo the topic's partition count;
  • If neither a partition nor a key is present, older clients used round-robin (a random initial integer, incremented on each call, modulo the number of available partitions), while the client version shown below uses the sticky partitioner, which keeps reusing one partition until its batch is full.

The concrete implementation is as follows:

// If the record carries a partition value, return it; otherwise invoke the
// configured partitioner's partition() method to compute one (KafkaProducer.class).
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

The Producer's default partitioner is org.apache.kafka.clients.producer.internals.DefaultPartitioner, and users can supply a custom partitioning strategy. The following is the default strategy's implementation:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return this.partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
    return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

The core of the default algorithm above is the sticky partition cache: when the record has no key, the partitioner keeps returning the same partition until that partition's batch is full, then switches, which yields fuller batches than pure round-robin.
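As a sketch of how a custom strategy plugs in, here is a minimal Partitioner that routes every keyless record to partition 0 while hashing keyed records the way the default does (the class name and routing rule are invented for illustration):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class KeylessToZeroPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) return 0;  // all keyless records go to partition 0
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override public void close() { }
    @Override public void configure(Map<String, ?> configs) { }
}

It would be registered with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeylessToZeroPartitioner.class.getName()).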

5.4 Appending the Record to the RecordAccumulator

Before we dig into the RecordAccumulator, keep the overall picture in mind (the original article shows it as a diagram): the user thread appends records into the accumulator's buffers, and a separate Sender thread drains them and ships them to the brokers.

The RecordAccumulator plays the role of a buffer. Its total size is controlled by buffer.memory and defaults to 32 MB.

The Kafka Producer does not send messages to the broker one at a time; instead, multiple messages are packed into a ProducerBatch, which the Sender then sends in one go. The batch size is controlled by batch.size, which defaults to 16 KB and can be tuned as needed.
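As an illustration, these knobs (plus linger.ms, which bounds how long a batch may wait to fill) would be set alongside the other producer properties; the values below are examples, not recommendations:

// Total memory the accumulator may use for buffering (default 32 MB)
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
// Target size of one ProducerBatch (default 16 KB)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
// How long to wait for a batch to fill before sending anyway (default 0)
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);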

The most important field in the RecordAccumulator is:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

It is a ConcurrentMap whose key is a TopicPartition (one partition of one topic) and whose value is a double-ended queue of ProducerBatch objects waiting for the Sender thread to ship them to the broker.
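To make the discussion below concrete, here is a self-contained toy version of the double-checked append pattern that RecordAccumulator.append() follows; MiniAccumulator, Batch, and the other names are invented for this sketch and simplify the real client code considerably:

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

public class MiniAccumulator {
    // A toy batch: a fixed-size buffer that records are appended into.
    static final class Batch {
        private final ByteBuffer buffer;
        Batch(ByteBuffer buffer) { this.buffer = buffer; }
        boolean add(byte[] record) {
            if (buffer.remaining() < record.length) return false; // batch is full
            buffer.put(record);
            return true;
        }
    }

    private static final int BATCH_SIZE = 16 * 1024; // analogous to batch.size
    private final Deque<Batch> deque = new ArrayDeque<>();

    public void append(byte[] record) {
        // First attempt: append to the last batch in the queue, under the lock.
        synchronized (deque) {
            if (tryAppendLast(record)) return;
        }
        // No room: allocate a new buffer OUTSIDE the lock. In the real client,
        // BufferPool.allocate() may block until memory is freed, and blocking
        // while holding the deque lock would stall every other appender.
        ByteBuffer buffer = ByteBuffer.allocate(Math.max(BATCH_SIZE, record.length));
        synchronized (deque) {
            // Second attempt: another thread may have created a new batch while
            // we were allocating; if so, use it and simply drop our buffer
            // (the real client returns it to the BufferPool).
            if (tryAppendLast(record)) return;
            Batch batch = new Batch(buffer);
            batch.add(record);
            deque.addLast(batch);
        }
    }

    private boolean tryAppendLast(byte[] record) {
        Batch last = deque.peekLast();
        return last != null && last.add(record);
    }
}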

Do you see anything curious in this code? Why is the memory allocation not performed inside the first synchronized block, even though a second synchronized block must then tryAppend again?

Because while this thread was allocating, another thread may already have created a new RecordBatch; without the second tryAppend, we would create a redundant batch and waste the memory we just requested.

And what would be the problem with doing the allocation inside the synchronized block?

If no memory is free, the allocating thread blocks until some is released. If that happened inside the synchronized block, the thread would block while still holding the Deque's lock, and no other thread could perform any thread-safe synchronized operation on that Deque.

The tryAppend() method itself is simpler: it just tries to append the record to the last ProducerBatch already in the deque.

5.5 Waking Up the Sender Thread to Send the RecordBatch

After a record is successfully written, if the RecordBatch is found to be ready to send (typically, when there is more than one batch in the queue, the batch that was added first must already be full and sendable), the Sender thread is woken up to send it.

The RecordBatch is handled by the Sender thread in its run() method; the core of each loop iteration is org.apache.kafka.clients.producer.internals.Sender#sendProducerData.

Here pollTimeout is the longest time to block waiting for at least one registered channel to become ready; a value of 0 means some batches are already sendable, so the poll should return immediately.

We then move on to org.apache.kafka.clients.producer.internals.RecordAccumulator#ready, and finally to org.apache.kafka.clients.producer.internals.RecordAccumulator#drain, which pulls the data to be sent out of the accumulator's buffers; max.request.size caps how much data can be sent in a single request.

6. Summary

Finally, to give you a big-picture view of the Kafka Producer's architecture (shown as a chart in the original article):

Brief description:

  • After new KafkaProducer(), a background thread KafkaThread is created (KafkaThread wraps the Sender) that scans the RecordAccumulator for messages to send.
  • Calling KafkaProducer.send() appends the message to the RecordAccumulator (the ConcurrentMap<TopicPartition, Deque<ProducerBatch>> described above); messages for the same topic and partition are collected into the same RecordBatch, and all messages in a batch are sent to that one topic-partition.
  • The separate background thread scans the RecordAccumulator and sends batches to the Kafka cluster once they are ready (not as soon as a message arrives, but when a batch is sendable).
  • If a message is sent successfully (written to Kafka), a RecordMetadata object is returned containing the topic and partition information as well as the record's offset within the partition.
  • If the write fails, an error is returned, and on receiving it the producer retries the send (if retries are allowed, the message is still held in the RecordAccumulator); after the configured number of attempts, it gives up and reports the error.

This article analyzed the Kafka Producer's source code. The next article will cover Metadata and the producer-side metadata update mechanism. Stay tuned ~