Preface

From our previous article, “A Detailed Introduction to Producer Interceptors, Serializers, and Partitioners in Kafka”, we know that the producer client passes a message through a series of modules, including interceptors, serializers, and partitioners, before the message is written to the cache. So today we will look at the design of Kafka’s producer client cache architecture. Let’s start with the complete process and architecture of sending messages to the server:

Message accumulator

In fact, the producer client is coordinated by two threads: the main thread and the Sender thread. Messages are produced by the main thread and cached in the RecordAccumulator; the Sender thread is responsible for fetching messages from the message accumulator and sending them to the Kafka broker. The process is roughly as follows:

So why do we need a message accumulator at all? Wouldn’t it be more direct to send each message immediately? The accumulator exists mainly to cache messages so that the Sender thread can send them in batches, reducing the resource cost of network transmission. Batched writes also let the server persist data in larger chunks, reducing disk I/O and improving disk throughput. The same design can be applied to our everyday business development scenarios as well.
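The saving is easy to see with a back-of-the-envelope calculation: if every record costs one network request, batching divides the request count by the batch size. A minimal illustration in plain Java (not Kafka code, just the arithmetic):

```java
public class BatchingBenefit {
    // Number of network requests needed to send `records` messages
    // when up to `perBatch` messages are packed into each request.
    static int requestsNeeded(int records, int perBatch) {
        return (records + perBatch - 1) / perBatch; // ceiling division
    }

    public static void main(String[] args) {
        int records = 10_000;
        System.out.println("unbatched: " + requestsNeeded(records, 1));   // 10000 requests
        System.out.println("batched:   " + requestsNeeded(records, 100)); // 100 requests
    }
}
```

One hundred times fewer round trips for the same payload is exactly the kind of amortization the accumulator buys.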

So what does the message accumulator look like inside? Internally it maintains a double-ended queue (Deque) for each partition. The elements of the queue are ProducerBatch objects, and each ProducerBatch contains one or more ProducerRecord messages.

ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;

The specific structure is as follows:
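The structure can be sketched in plain Java. This is a simplified model for illustration, not Kafka’s internal code: TopicPartition is reduced to a small key class, and a ProducerBatch to a list of record strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AccumulatorSketch {
    // Simplified stand-in for Kafka's TopicPartition key.
    static final class TopicPartition {
        final String topic; final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        @Override public boolean equals(Object o) {
            return o instanceof TopicPartition
                && ((TopicPartition) o).topic.equals(topic)
                && ((TopicPartition) o).partition == partition;
        }
        @Override public int hashCode() { return Objects.hash(topic, partition); }
    }

    // One double-ended queue of batches per partition, as described above.
    final ConcurrentMap<TopicPartition, Deque<List<String>>> batches = new ConcurrentHashMap<>();

    void append(String topic, int partition, String record) {
        Deque<List<String>> dq = batches.computeIfAbsent(
                new TopicPartition(topic, partition), tp -> new ArrayDeque<>());
        if (dq.isEmpty()) dq.addLast(new ArrayList<>());
        dq.peekLast().add(record); // append to the batch at the tail of the queue
    }
}
```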

When a message (Record) is written to the message accumulator, it is appended at the tail of the corresponding double-ended queue: if the ProducerBatch at the tail still has free space, the ProducerRecord is written into it. The size of a ProducerBatch is governed by the batch.size parameter, which defaults to 16KB.

The process goes like this:

  1. When a message is written to the message accumulator, look up the double-ended queue for its partition (creating the queue if none exists)
  2. Take the ProducerBatch at the tail of the double-ended queue (creating one if the queue is empty)
  3. Check whether that ProducerBatch has room for the message Record; if so, write it there (if not, create a new ProducerBatch)
  4. When a new ProducerBatch is created, check whether the message is larger than batch.size. If not, the batch is allocated a buffer of exactly batch.size; such buffers are managed by the BufferPool and can be reused. If the message is larger, the batch is allocated a buffer of the message’s estimated size; a buffer larger than batch.size is not pooled for reuse and is simply reclaimed afterwards.

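Step 4’s sizing rule can be sketched as follows. This is an illustrative model, not Kafka’s BufferPool code: allocationFor returns the buffer size a brand-new batch would be given, and only standard-sized buffers are eligible for reuse.

```java
public class BatchSizing {
    static final int BATCH_SIZE = 16 * 1024; // batch.size default, 16KB

    // Size of the buffer a brand-new ProducerBatch would be allocated.
    static int allocationFor(int recordSize) {
        // Records no larger than batch.size get a standard, poolable buffer;
        // oversized records get a one-off buffer of exactly their own size.
        return Math.max(recordSize, BATCH_SIZE);
    }

    // Only standard-sized buffers go back to the BufferPool for reuse;
    // oversized ones are left to the garbage collector.
    static boolean reusable(int bufferSize) {
        return bufferSize == BATCH_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(allocationFor(1_000));   // 16384: pooled standard buffer
        System.out.println(allocationFor(100_000)); // 100000: one-off buffer
    }
}
```

This is why batch.size should be tuned with typical message sizes in mind: messages that routinely exceed it defeat the buffer reuse entirely.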
The buffer space of the entire message accumulator is controlled by the buffer.memory parameter, which defaults to 32MB. If the producer client needs to send large volumes of messages to many partitions, this parameter can be raised accordingly to increase overall throughput.
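In code, these two knobs go into the producer’s configuration. The parameter names and defaults below are Kafka’s own; the Properties-based sketch just avoids a dependency on the Kafka client library, and the 64MB value is an arbitrary example:

```java
import java.util.Properties;

public class ProducerTuning {
    public static Properties tunedConfig() {
        Properties props = new Properties();
        // Size of each ProducerBatch buffer; default 16384 bytes (16KB).
        props.put("batch.size", "16384");
        // Total memory the accumulator may use; default 33554432 bytes (32MB).
        // Raised here to 64MB as an example for high-volume, many-partition producers.
        props.put("buffer.memory", "67108864");
        return props;
    }
}
```

These same Properties would be passed to a KafkaProducer constructor in a real client.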

The Sender thread

The Sender thread asynchronously retrieves cached messages from the message accumulator, converts them into ProduceRequest objects in the protocol format, and sends those requests to the corresponding brokers. Before a request is actually sent out, however, it is recorded in InFlightRequests, whose main job is to cache requests that have been sent but have not yet received a response from the server.

One important configuration parameter related to InFlightRequests is max.in.flight.requests.per.connection. It limits the maximum number of cached, unacknowledged requests per connection (the network connection between the client and a broker Node); the default is 5. Once that number is reached, the producer client cannot send any more requests over that connection. Note also that when this parameter is greater than 1, failed retries can cause messages to arrive out of order.
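The throttling behaviour can be modelled with a per-node counter. A simplified sketch, not Kafka’s actual InFlightRequests class:

```java
import java.util.HashMap;
import java.util.Map;

public class InFlightSketch {
    private final int maxPerConnection; // max.in.flight.requests.per.connection, default 5
    private final Map<String, Integer> inFlight = new HashMap<>();

    InFlightSketch(int maxPerConnection) { this.maxPerConnection = maxPerConnection; }

    // May the Sender send another request to this broker node?
    boolean canSendMore(String node) {
        return inFlight.getOrDefault(node, 0) < maxPerConnection;
    }

    void send(String node)     { inFlight.merge(node, 1, Integer::sum);  } // request goes out
    void response(String node) { inFlight.merge(node, -1, Integer::sum); } // response drains it
}
```

When canSendMore returns false, the Sender simply skips that node until a response drains the backlog, which is how the cap turns into back-pressure on a slow broker.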

Conclusion

This time we looked at the two threads of the producer client, the main thread and the Sender thread, and their respective responsibilities. We also examined the send-side caching mechanism of the Kafka producer client. Good design mechanisms like these can be applied to other, similar business development as well.

Writing is not easy, so let’s keep at it together!! Your likes are what keep me going.