A typical producer workflow involves the following steps:

  • Configure the producer client parameters and create a producer instance
  • Build the message to be sent
  • Send the message
  • Close the producer instance
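
The four steps above can be sketched with the Kafka Java client as follows. The broker address and topic name are placeholders, and actually running this requires the kafka-clients dependency and a reachable broker:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        // 1. Configure client parameters and create the producer instance.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 2. Build the message to be sent ("demo-topic" is a placeholder).
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("demo-topic", "hello");
            // 3. Send the message (asynchronous; send(record).get() would block for the result).
            producer.send(record);
        } // 4. try-with-resources closes the producer instance.
    }
}
```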

The serializer

When sending messages, producers need to use Serializer to convert objects into byte arrays before sending them to Kafka over the network. Similarly, consumers need to use Deserializer to convert byte arrays received from Kafka into corresponding objects.

There needs to be a one-to-one correspondence between the serializers used by producers and the deserializers used by consumers.

The Kafka client provides a variety of serializers and also supports custom serializer implementations (which can optionally be built on generic serialization tools such as Avro, JSON, Thrift, ProtoBuf, or Protostuff).
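
The required one-to-one pairing can be illustrated without Kafka's interfaces: the deserializer below works only because it reads fields in exactly the order the serializer wrote them. The length-prefixed String format is just an example, not a Kafka wire format:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// A producer-side encoder and the matching consumer-side decoder.
public class UserCodec {
    // Encode: 4-byte length prefix followed by UTF-8 bytes.
    static byte[] serialize(String name) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + nameBytes.length);
        buf.putInt(nameBytes.length);
        buf.put(nameBytes);
        return buf.array();
    }

    // Decode: must read the fields in exactly the order they were written.
    static String deserialize(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        byte[] nameBytes = new byte[buf.getInt()];
        buf.get(nameBytes);
        return new String(nameBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(deserialize(serialize("alice"))); // prints "alice"
    }
}
```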

The partitioner

When a message is sent via send(), it may need to pass through an interceptor, a serializer, and a partitioner before it is actually sent to the Kafka broker. Interceptors are generally optional, whereas serializers are required.

After the message passes through the serializer, the partition it will be sent to must be determined. If the partition field is specified in the message's ProducerRecord, the partitioner is not needed, because partition already indicates the target partition number.

If the message does not specify a partition number, the partitioner computes a partition value based on the key field. The purpose of the partitioner is to assign messages to partitions.

The default partition logic provided in Kafka is:

  • If the key is not null, the key is hashed (using the MurmurHash2 algorithm) and the partition number is computed from the hash value; messages with the same key are sent to the same partition
  • If the key is null, the message is sent in a round-robin fashion to the available partitions within the topic
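
The two rules can be sketched in plain Java. Note that Kafka actually hashes the serialized key with MurmurHash2; this self-contained sketch substitutes a simple byte-wise hash, and the class and method names are illustrative:

```java
import java.nio.charset.StandardCharsets;

// Simplified sketch of Kafka's default partitioning rule.
public class DefaultPartitionSketch {
    // Round-robin counter used when the key is null.
    private static int counter = 0;

    static int partition(byte[] keyBytes, int numPartitions) {
        if (keyBytes != null) {
            int hash = 0;
            for (byte b : keyBytes) {
                hash = 31 * hash + b; // stand-in for MurmurHash2
            }
            // Same key -> same hash -> same partition.
            return (hash & 0x7fffffff) % numPartitions;
        }
        // Null key: rotate across the topic's partitions.
        return (counter++ & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(partition(key, 4));  // deterministic for this key
        System.out.println(partition(null, 4)); // rotates: 0, 1, 2, 3, 0, ...
    }
}
```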

As long as the number of topic partitions does not change, the mapping between keys and partitions remains stable; once the number of topic partitions changes (for example, partitions are added), that mapping becomes difficult to guarantee.

In addition to the partitioner provided by default in Kafka, you can use a custom partitioner by implementing the same Partitioner interface that DefaultPartitioner implements.

After implementing a custom partitioner, you need to specify it explicitly through a configuration parameter.
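
Assuming the custom partitioner's fully qualified class name is com.example.MyPartitioner (a placeholder), it would be specified in the producer configuration like this:

```properties
# partitioner.class expects the fully qualified class name of a
# Partitioner implementation; com.example.MyPartitioner is a placeholder
partitioner.class=com.example.MyPartitioner
```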

The interceptor

There are two types of interceptors for Kafka: producer interceptors and consumer interceptors.

Producer interceptors can do preparatory work before a message is sent, such as filtering out messages that do not meet certain rules or modifying message content, and they can also fulfill custom requirements, such as collecting statistics, before the send callback logic runs.

Implementation: the ProducerInterceptor interface, which defines the following three methods:

  • onSend
  • onAcknowledgement
  • close

KafkaProducer calls the producer interceptor’s onSend() method to customize the message before it is serialized and partitioned. (Generally speaking, it is best not to modify the ProducerRecord’s topic, key, or partition fields; otherwise the partition calculation and the broker-side log compaction feature may be affected.)

KafkaProducer calls the producer interceptor’s onAcknowledgement() method when a message is acknowledged or fails to send, before the user-specified Callback. This method runs in the producer’s I/O thread, so the simpler its logic, the better; otherwise it will slow down message sending.

The close() method is primarily used to perform some cleanup of resources when the interceptor is shut down.

Exceptions thrown in these three methods are caught and logged, and are not passed up.
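
As a sketch, a ProducerInterceptor that counts successful and failed sends might look like the following. The class name and counting logic are illustrative, not from the original text, and compiling it requires the kafka-clients dependency:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Counts successful vs. failed sends; registered through the
// "interceptor.classes" producer configuration parameter.
public class CountingInterceptor implements ProducerInterceptor<String, String> {
    private long success = 0;
    private long failure = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Called before serialization and partitioning; avoid modifying
        // topic, key, or partition here.
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Runs on the producer I/O thread: keep this cheap.
        if (exception == null) success++; else failure++;
    }

    @Override
    public void close() {
        // Cleanup hook: report the counters when the interceptor shuts down.
        System.out.printf("success=%d failure=%d%n", success, failure);
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```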

The overall architecture

The Kafka producer client runs with two threads working in coordination: the main thread and the Sender thread.

The message is created by KafkaProducer in the main thread and then, after passing through any configured interceptor, the serializer, and the partitioner, is cached in the message accumulator (RecordAccumulator). The Sender thread is responsible for fetching messages from the message accumulator and sending them to Kafka.

The message accumulator is mainly used to cache messages so that the Sender thread can send them in bulk, thereby reducing the resource consumption of network transfers and improving performance. The size of the message accumulator cache can be specified via the buffer.memory configuration (33554432 B, i.e. 32 MB, by default). If messages are produced faster than they can be sent to the server, the producer will run out of buffer space, and the send() call will either block or throw an exception, depending on the configuration of max.block.ms, which defaults to 60000, i.e. 60 s.
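
The two parameters mentioned above would appear in the producer configuration like this (the values shown are the defaults):

```properties
# Total memory for the RecordAccumulator cache (32 MB default)
buffer.memory=33554432
# How long send() may block when the buffer is full before throwing (60 s default)
max.block.ms=60000
```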

The messages sent from the main thread are appended to a double-ended queue (Deque) inside the RecordAccumulator. The RecordAccumulator maintains one deque per partition, and the elements in each queue are ProducerBatch objects: Deque<ProducerBatch>. When a message is written into the cache, it is appended to the tail of the corresponding deque; when the Sender reads messages, it reads from the head.

Note that a ProducerBatch is not a ProducerRecord: a ProducerBatch can contain one or more ProducerRecords. A ProducerRecord is a message created in the producer, while a ProducerBatch is a batch of messages. Packing ProducerRecords into a ProducerBatch makes byte usage more compact, and assembling smaller ProducerRecords into a larger ProducerBatch also reduces the number of network requests, improving overall throughput. If the producer client needs to send messages to many partitions, the buffer.memory parameter can be increased appropriately to raise overall throughput.

Messages are transmitted over the network in the form of bytes, so a memory area must be created to hold a message before it is sent. In the Kafka producer client, message memory is created and released using java.nio.ByteBuffer. However, frequently creating and releasing ByteBuffers is resource-consuming, so the RecordAccumulator also contains a BufferPool, which is mainly used to reuse ByteBuffers and achieve efficient cache utilization. The BufferPool only manages ByteBuffers of one specific size; other sizes are not cached in the pool. This specific size is set by the batch.size parameter, whose default value is 16384 B (16 KB). The batch.size parameter can be increased appropriately to cache more messages.

The ProducerBatch size is also closely related to the batch.size parameter. When a message (ProducerRecord) flows into the RecordAccumulator, a ProducerBatch is first fetched from the tail of the deque for the message's partition (if none exists, one is created). If the ProducerRecord can still be written into that ProducerBatch, it is appended; if not, a new ProducerBatch is created. When creating a ProducerBatch, the message size is evaluated against the batch.size parameter: if it does not exceed batch.size, the ProducerBatch is created with a buffer of size batch.size, which can be reused through BufferPool management; if it does exceed it, the ProducerBatch is created with the estimated size and will not be reused.
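
The sizing decision just described can be sketched in plain Java. The constant mirrors the batch.size default; the class and helper names are illustrative, not Kafka's actual internals:

```java
// Sketch of the buffer-sizing decision: batches no larger than batch.size
// come from the reusable BufferPool; larger ones get a one-off allocation.
public class BatchSizing {
    static final int BATCH_SIZE = 16384; // batch.size default, 16 KB

    // Size of the buffer to allocate for a new ProducerBatch.
    static int batchBufferSize(int estimatedRecordSize) {
        return Math.max(BATCH_SIZE, estimatedRecordSize);
    }

    // Only buffers of exactly batch.size are returned to the pool for reuse.
    static boolean poolable(int bufferSize) {
        return bufferSize == BATCH_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(batchBufferSize(1_000));  // small record -> 16384, reusable
        System.out.println(batchBufferSize(50_000)); // large record -> 50000, not pooled
    }
}
```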

The Sender reads messages from the accumulator and converts the <partition, Deque<ProducerBatch>> form into <Node, List<ProducerBatch>>, where Node represents a broker node in the Kafka cluster. At the network-connection level, the producer client connects to specific broker nodes and sends messages to them, regardless of which partition a message belongs to; at KafkaProducer's application-logic level, we only care about which partition a message is sent to. Hence a transition is needed from the application-logic level to the network I/O level.

After the conversion to <Node, List<ProducerBatch>>, the Sender further encapsulates requests in the <Node, Request> form, so that it can send Request objects to the various nodes. For message sending, the concrete request is ProduceRequest.

Before requests are sent from the Sender thread to Kafka, they are also saved in InFlightRequests, which stores them in the form Map<NodeId, Deque<Request>>. Its main function is to cache requests that have been sent but have not yet received a response (NodeId is a String representing the broker node's id). InFlightRequests also provides various management methods, and a configuration parameter limits the maximum number of cached requests per connection (that is, between the client and a Node): max.in.flight.requests.per.connection, with a default value of 5. Each connection can therefore cache at most 5 unanswered requests; once that limit is reached, no further requests can be sent over the connection until a cached request receives a response. By comparing the size of a Deque<Request> with this parameter, we can determine whether the corresponding Node has accumulated many unanswered requests. If so, it indicates that the Node is heavily loaded or has a network connection problem, and sending further requests to it increases the likelihood of request timeouts.
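
A plain-Java sketch of this bookkeeping (not Kafka's actual InFlightRequests class; names and request ids are illustrative) might look like:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Per-node deque of unanswered requests, capped by
// max.in.flight.requests.per.connection (default 5).
public class InFlightSketch {
    static final int MAX_IN_FLIGHT = 5;

    private final Map<String, Deque<Integer>> inFlight = new HashMap<>();

    boolean canSendMore(String nodeId) {
        Deque<Integer> q = inFlight.get(nodeId);
        return q == null || q.size() < MAX_IN_FLIGHT;
    }

    void sent(String nodeId, int requestId) {
        inFlight.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addLast(requestId);
    }

    void completed(String nodeId) {
        // Responses on one connection arrive in order: retire the oldest request.
        inFlight.get(nodeId).pollFirst();
    }

    int load(String nodeId) {
        Deque<Integer> q = inFlight.get(nodeId);
        return q == null ? 0 : q.size();
    }

    public static void main(String[] args) {
        InFlightSketch s = new InFlightSketch();
        for (int i = 0; i < 5; i++) s.sent("node-0", i);
        System.out.println(s.canSendMore("node-0")); // false: limit reached
        s.completed("node-0");
        System.out.println(s.canSendMore("node-0")); // true: a slot freed up
    }
}
```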

leastLoadedNode

From InFlightRequests we can also obtain the leastLoadedNode, the least loaded of all nodes. The load is determined by the number of unconfirmed requests each Node has in InFlightRequests; the more unconfirmed requests, the higher the load.
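
Under the assumption that load is simply the in-flight request count per node, the selection can be sketched as:

```java
import java.util.Map;

// Picks the node with the fewest unanswered in-flight requests,
// mirroring the leastLoadedNode selection described above.
public class LeastLoadedNode {
    static String leastLoaded(Map<String, Integer> inFlightCounts) {
        String best = null;
        int min = Integer.MAX_VALUE;
        for (Map.Entry<String, Integer> e : inFlightCounts.entrySet()) {
            if (e.getValue() < min) {
                min = e.getValue();
                best = e.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("Node0", 3, "Node1", 1, "Node2", 4);
        System.out.println(leastLoaded(counts)); // Node1
    }
}
```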

The figure shows three nodes, Node0, Node1, and Node2. It is clear that Node1 has the lowest load; that is, Node1 is the current leastLoadedNode. Selecting the leastLoadedNode lets requests be sent as soon as possible, avoiding network congestion and other problems that could affect overall progress. The leastLoadedNode concept is applied in several places, such as metadata requests and interactions in the consumer group protocol.

If the client lacks metadata it needs to use (for example, the information for a specified topic is missing), or if the metadata has not been updated within metadata.max.age.ms, a metadata update is triggered. The default value of the client parameter metadata.max.age.ms is 300000, that is, 5 minutes. Metadata updates are performed internally and are not visible to users of the client. When metadata needs to be updated, the leastLoadedNode is selected and a MetadataRequest is sent to that Node to obtain the specific metadata. The update operation is initiated by the Sender thread, and after the MetadataRequest is created it is also added to InFlightRequests, following steps similar to sending messages. The metadata is updated by the Sender thread but also read by the main thread; here, synchronization is guaranteed by the synchronized and final keywords.