
Before reading this article, take a moment to think about the following questions; reading with these questions in mind will give you better results.

  1. When sending a message, can the message body still be written to the message cache if the Broker is down?
  2. Are messages lost if the Producer client dies while they are still in the cache?
  3. What happens when the latest ProducerBatch still has free memory, but the next message is too big to fit into it?
  4. How much memory should be allocated when ProducerBatch is created?

What is a RecordAccumulator

In order to improve the throughput and performance of the Producer client, Kafka temporarily caches messages and sends them to the Broker in batches once certain conditions are met. This reduces the number of network requests and improves throughput.

The class that caches these messages is the RecordAccumulator.

The figure below shows the entire message caching model; we will go through it piece by piece.

Message caching model

The figure above shows a model of the message cache, where produced messages are temporarily stored.

  1. Each message is put into a Deque<ProducerBatch> queue according to its TopicPartition. Messages with the same TopicPartition go into the same Deque.
  2. ProducerBatch represents a batch of messages: messages sent to the Broker are sent in batches, and a batch may contain one or more messages.
  3. If no ProducerBatch queue exists for the message's TopicPartition, a queue is created first.
  4. The Batch at the tail of the ProducerBatch queue is located; if that Batch still has room for this message, the message is inserted directly into it.
  5. If the Batch at the tail of the queue does not have enough remaining memory for this message, a new Batch is created instead.
  6. When the messages are sent successfully, the Batch is released (a simplified code sketch of this flow follows below).
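
Below is a minimal, self-contained sketch of this flow. It is not Kafka's real RecordAccumulator: the class and method names (MiniAccumulator, MiniBatch) are hypothetical, and a real batch stores encoded records rather than strings, but the per-partition Deque and the "append to the tail batch or create a new one" logic mirror the steps above.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical simulation of the accumulation flow above -- NOT Kafka's RecordAccumulator.
    public class MiniAccumulator {

        static final int BATCH_SIZE = 16 * 1024;   // analogous to batch.size (16K by default)

        // Stand-in for ProducerBatch: a fixed-capacity container of messages.
        static class MiniBatch {
            private final StringBuilder records = new StringBuilder();

            boolean tryAppend(String msg) {
                if (records.length() + msg.length() > BATCH_SIZE) {
                    return false;                   // no room left in this batch
                }
                records.append(msg);
                return true;
            }
        }

        // One Deque<MiniBatch> per partition, mirroring Deque<ProducerBatch> per TopicPartition.
        private final Map<String, Deque<MiniBatch>> batches = new HashMap<>();

        public void append(String topicPartition, String msg) {
            // Step 3: find the queue for this TopicPartition, creating it lazily if absent.
            Deque<MiniBatch> deque = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());

            // Step 4: if the batch at the tail of the queue still has room, append to it.
            MiniBatch last = deque.peekLast();
            if (last != null && last.tryAppend(msg)) {
                return;
            }

            // Step 5: otherwise create a new batch and put it at the tail of the queue.
            MiniBatch fresh = new MiniBatch();
            fresh.tryAppend(msg);
            deque.addLast(fresh);
        }

        public static void main(String[] args) {
            MiniAccumulator acc = new MiniAccumulator();
            acc.append("topicA-0", "hello");
            acc.append("topicA-0", "world");        // same partition -> lands in the same batch
            System.out.println(acc.batches.get("topicA-0").size()); // prints 1
        }
    }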

ProducerBatch Memory size

How much memory should be allocated when ProducerBatch is created?

Conclusion: if the estimated size of the message is larger than batch.size, the ProducerBatch is created with the estimated size; otherwise, it is created with the size of batch.size (16K by default).

Let's look at the piece of code that estimates the memory size when a ProducerBatch is created:

RecordAccumulator#append

       // Take the larger of batch.size and the estimated upper bound of this message's size
       int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
       // Allocate that much memory from the BufferPool (may block for up to maxTimeToBlock)
       buffer = free.allocate(size, maxTimeToBlock);




  1. If a message M is produced and no existing ProducerBatch can hold it (the queue does not exist, or the Batch at the tail is full), a new ProducerBatch needs to be created.
  2. The estimated size of the message is compared with batch.size (16384, i.e. 16KB, by default), and the larger of the two values is the amount of memory requested.

Kafka Producer message caching model

So, how is the message size estimated? Is it purely the size of the message body?

DefaultRecordBatch#estimateBatchSizeUpperBound

The required Batch size here is only an estimate, because the additional overhead of the compression algorithm is not taken into account.

    /**
     * Get the upper limit of the batch size with only one record using the given key and value.
     * This is only an estimate, as it does not take into account the additional overhead of the compression algorithm used.
     */
    static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
        return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
    }



  1. The estimate is the size of message M plus RECORD_BATCH_OVERHEAD.
  2. RECORD_BATCH_OVERHEAD is the basic metadata of a Batch and occupies 61B in total.
  3. The size of message M is not just the message body; its total size equals the size of (key, value, headers) + MAX_RECORD_OVERHEAD.
  4. MAX_RECORD_OVERHEAD: the maximum space occupied by a record's header, at most 21B.

That is, to create a ProducerBatch, at least 83B is required.

For example, if I send the message "1", the estimated size is 86B. Compared with batch.size (16384 by default), batch.size is the larger value, so 16384 is the amount of memory requested.
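
A quick back-of-the-envelope check of both branches of the Math.max above (the 86B figure comes from the example; the 1 MB payload below is a hypothetical large message added for illustration):

    // Rough check of the two branches of Math.max(batch.size, estimate).
    int batchSize     = 16384;                      // batch.size default (16K)
    int smallEstimate = 86;                         // tiny message "1" from the example above
    int largeEstimate = 61 + 21 + 1024 * 1024;      // RECORD_BATCH_OVERHEAD + MAX_RECORD_OVERHEAD + a hypothetical 1 MB payload
    System.out.println(Math.max(batchSize, smallEstimate)); // 16384   -> a standard 16K buffer is requested
    System.out.println(Math.max(batchSize, largeEstimate)); // 1048658 -> the larger estimated size is requested instead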

The Batch structure and message structure will be covered in a separate article.

Memory allocation

The RecordAccumulator is an accumulator with a total buffer size of 33554432 bytes (32 MB) by default, controlled by buffer.memory.

If messages are produced faster than they can be sent, this buffer fills up and the Producer may block when writing.
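
These limits are ordinary producer configs. A minimal sketch of the relevant settings with their default values (the broker address is a placeholder, and the snippet assumes the usual KafkaProducer imports):

    // Producer configs that bound the accumulator, shown with their default values.
    // "localhost:9092" is a placeholder broker address.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("buffer.memory", "33554432");   // total accumulator memory, 32 MB default
    props.put("batch.size", "16384");         // target size of one ProducerBatch, 16K default
    props.put("max.block.ms", "60000");       // how long send() may block when the buffer is full
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);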

ProducerBatches are created and released frequently, which would cause frequent GC. So Kafka has the concept of a cache pool (BufferPool): the memory in this pool is reused, but only buffers of a fixed size (batch.size) are pooled.

PS: The following 16K refers to the default value of batch.size.

Batch creation and release

1. The requested memory is 16K, and the cache pool has free memory

① When the Batch is created, a ByteBuffer is taken directly from the cache pool for it.

② When the Batch is released after the messages are sent, ByteBuffer.clear() is called to clear the data and the ByteBuffer is added to the end of the cache pool, so it can be reused next time.

2. The requested memory is 16K, but the cache pool has no free memory

① When the Batch is created, memory is obtained from the non-pooled available memory (nonPooledAvailableMemory): 16K is deducted from nonPooledAvailableMemory and a new ByteBuffer is allocated for the Batch.

② When the Batch is released after the messages are sent, ByteBuffer.clear() is called to clear the data and the ByteBuffer is added to the end of the cache pool, so it can be reused next time (the cache pool thereby gains a reusable 16K buffer).

Kafka Producer message caching model

3. The requested memory is not 16K, and the non-pooled available memory is sufficient

① When the Batch is created, memory is obtained from nonPooledAvailableMemory to create the Batch. Note: nonPooledAvailableMemory is only a bookkeeping number; it is decreased by the requested size while a new ByteBuffer is allocated on the heap. Do not mistake this for an actual transfer of memory.

② When the Batch is released after the messages are sent, its size is simply added back to the non-pooled available memory (again, just a number), and the Batch's ByteBuffer itself is reclaimed by GC.

4. The requested memory is not 16K, and the non-pooled available memory is insufficient

① First, the ByteBuffers in the cache pool are released into the non-pooled memory one by one, until the non-pooled memory is sufficient for the request.

② When the Batch is created, memory is obtained from nonPooledAvailableMemory to create the Batch. Again, nonPooledAvailableMemory is only a number that is decreased; no actual memory is transferred.

③ When the Batch is released after the messages are sent, its size is simply added back to the non-pooled available memory (just a number), and the Batch's ByteBuffer is reclaimed by GC.

For example, suppose we need to create a 48K Batch. Because 48K is larger than 16K, the memory must come from the non-pooled memory, but the non-pooled available memory is 0, so the request cannot be satisfied directly. In this case, we try to release memory from the cache pool into the non-pooled memory.

If releasing the first ByteBuffer (16K) is not enough, a second one is released, and so on, until the non-pooled available memory reaches 48K (three 16K ByteBuffers in this example). Once there is enough memory, the Batch is created.

Note: when we talk about memory being allocated or released in the non-cached pool, we are only referring to a number being increased or decreased; the actual ByteBuffer is allocated by the JVM and freed by GC.
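
The four cases can be condensed into one simplified sketch of the pool logic. This is not Kafka's real BufferPool (the class name MiniBufferPool is hypothetical, and locking, waiter queues and blocking up to max.block.ms are omitted), but the reuse of 16K buffers and the purely numeric bookkeeping of nonPooledAvailableMemory follow the behaviour described above.

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Hypothetical, simplified imitation of the buffer pool behaviour described above.
    // Locking, blocking/waiting when memory is exhausted, and metrics are omitted.
    public class MiniBufferPool {

        private final int poolableSize = 16 * 1024;               // batch.size: the only buffer size that is reused
        private long nonPooledAvailableMemory = 33554432L;         // rest of buffer.memory -- just a bookkeeping number
        private final Deque<ByteBuffer> free = new ArrayDeque<>(); // the 16K cache pool

        public ByteBuffer allocate(int size) {
            // Case 1: a 16K request and the pool has a free buffer -> reuse it directly.
            if (size == poolableSize && !free.isEmpty()) {
                return free.pollFirst();
            }
            // Cases 2-4: if needed, free pooled 16K buffers one by one; their capacity is
            // handed back to the nonPooledAvailableMemory counter until it is large enough.
            while (nonPooledAvailableMemory < size && !free.isEmpty()) {
                nonPooledAvailableMemory += free.pollLast().capacity();
            }
            if (nonPooledAvailableMemory < size) {
                // The real pool would block here, for up to max.block.ms.
                throw new IllegalStateException("not enough memory, would block");
            }
            nonPooledAvailableMemory -= size;                // only a counter changes here...
            return ByteBuffer.allocate(size);                // ...the actual bytes come from the JVM heap
        }

        public void deallocate(ByteBuffer buffer) {
            if (buffer.capacity() == poolableSize) {
                buffer.clear();                              // 16K buffers are cleared and kept for reuse
                free.addLast(buffer);
            } else {
                // Odd-sized buffers: only the number is given back; the buffer itself is left to GC.
                nonPooledAvailableMemory += buffer.capacity();
            }
        }
    }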

Questions and Answers

  1. When sending a message, can the message body still be written to the message cache if the Broker is down?

When the Broker is down, the Producer prints the following warning ⚠️ while sending messages:

    WARN [Producer clientId=console-producer] Connection to node 0 (/172.23.164.192:9090) could not be established. Broker may not be available

However, the message body can still be written to the message cache; it simply stays in the cache.
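
You can observe this behaviour with a callback: even with the Broker down, send() returns immediately because the record has only been written to the RecordAccumulator, and the callback fires later once delivery finally fails. A rough sketch (the topic name is a placeholder, and the exact exception may vary):

    // Even with the Broker down, send() returns a Future right away -- the record
    // has only been written to the RecordAccumulator. The callback fires later,
    // typically with a TimeoutException once delivery.timeout.ms expires.
    // "demo-topic" is a placeholder topic name.
    producer.send(new ProducerRecord<>("demo-topic", "hello"), (metadata, exception) -> {
        if (exception != null) {
            System.err.println("delivery failed: " + exception);
        } else {
            System.out.println("delivered to " + metadata.topic() + "-" + metadata.partition());
        }
    });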

  2. What happens when the latest ProducerBatch still has free memory, but the next message is too big to fit into it?

A new ProducerBatch will be created.

  3. How much memory should be allocated when a ProducerBatch is created?

If the estimated size of the message that triggers the creation of the ProducerBatch is greater than batch.size, the batch is created with the estimated size; otherwise, it is created with batch.size.

One more question for you to consider:

Are messages lost if the Producer client dies while they are still in the cache?