Kafka Producer blocks sending messages asynchronously? After reading the source code again, I found that the buffer pool design is very good, and its design idea is elegant, so I can not help but share a wave with you.

In the new version of Kafka Producer, a message buffer pool is designed. When Producer is created, a default buffer pool of 32 megabytes is created. The buffer pool size can also be specified using the buffer.memory parameter. The size of the memory block is the batch.size that is transmitted when Producer is created. The default size is 16384, and each batch contains a batch.size memory block in which messages are stored. The structure of the buffer pool is shown below:

The client adds messages to a Batch in the corresponding topic partition. If the Batch is full, a new Batch is created and a Batch. size memory block is applied to the RecordAccumulator for storing messages.

When a Batch message is sent to the Broker, Kafka Producer removes the Batch. Since the Batch holds a chunk of memory, there are bound to be GC issues as follows:

The above, frequent memory requisition, then discarded, inevitably leads to frequent GC, resulting in serious performance problems. So how does Kafka avoid frequent GC’s?

As mentioned earlier, the buffer pool is logically divided into blocks of equal size. When a message is sent, it is returned to the buffer pool to avoid being recycled.

The memory holding class of the BufferPool is BufferPool. Let’s look at the members of the BufferPool.

public class BufferPool {
  // Total memory size
  private final long totalMemory;
  // The size of each memory block, i.e. Batch.size
  private final int poolableSize;
  // Lock the synchronization lock of the memory requisition and return methods
  private final ReentrantLock lock;
  // Free memory block
  private final Deque<ByteBuffer> free;
  // Need to wait for the event of free memory block
  private final Deque<Condition> waiters;
  /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */
  // The unallocated free memory in the buffer pool, from which the newly allocated memory block is obtained
  private long nonPooledAvailableMemory;
	// ...
}
Copy the code

The BufferPool is actually made up of bytebuffers. The BufferPool holds these memory blocks and stores them in member free. The total size of free is limited by totalMemory. NonPooledAvailableMemory indicates how much memory is left in the buffer pool that has not been allocated.

When Batch messages are sent, the memory blocks held by Batch are returned to FREE, so that the next Batch does not create new Bytebuffers when applying for memory blocks. In this way, the memory blocks can be fetched from free, avoiding the problem of memory blocks being reclaimed by JVM.

Next, we will analyze how to apply memory and return memory.

1. Apply for memory

Access to memory:

org.apache.kafka.clients.producer.internals.BufferPool#allocate

1) Sufficient memory

When the user requests memory, if the user finds free memory in free, the user directly obtains the memory from free:

if (size == poolableSize && !this.free.isEmpty()){
  return this.free.pollFirst(); 
}
Copy the code

Where size is the requested memory size, It is Math. Max (enclosing batchSize, AbstractRecords. EstimateSizeInBytesUpperBound (maxUsableMagic, compression will key, value, headers));

If your message size is smaller than batchSize, the memory size is batchSize. If your message size is equal to batchSize and free is not free, the memory size is batchSize.

In order to get free memory blocks from free, Kafka must request memory size equal to batchSize.

The buffer pool size is fixed, which is equal to the batchSize. If the buffer pool size is larger than the batchSize, the buffer pool size is larger than the batchSize. Let’s move on:

// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
  // we have enough unallocated or pooled memory to immediately
  // satisfy the request, but need to allocate the buffer
  freeUp(size);
  this.nonPooledAvailableMemory -= size;
}
Copy the code

FreeListSize: Refers to the total size of free memory blocks that have been allocated and reclaimed in free;

NonPooledAvailableMemory: the unallocated free memory in the buffer pool from which the newly allocated memory block is obtained.

Enclosing nonPooledAvailableMemory + freeListSize: total free memory space in the buffer pool.

If the buffer pool memory space is larger than the requested memory size, freeUp(size) is called; Method, then subtract the requested memory size from the free memory size.

private void freeUp(int size) {
  while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
    this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}
Copy the code

The freeUp method is an interesting one. The idea is this:

If the unallocated memory size is smaller than the requested memory, the allocated memory can only be retrieved from the list free until nonPooledAvailableMemory is larger than the requested memory.

2) Insufficient memory

In my post “Kafka Producer blocks asynchronously sending messages? As mentioned in this article, when the buffer pool runs out of memory blocks, message appending calls are blocked until there are free memory blocks.

How is the logic for blocking wait implemented?

// we are out of memory and will have to block
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
  long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
  this.waiters.addLast(moreMemory);
  // loop over and over until we have a buffer or have reserved
  // enough memory to allocate one
  while (accumulated < size) {
    long startWaitNs = time.nanoseconds();
    long timeNs;
    boolean waitingTimeElapsed;
    try{ waitingTimeElapsed = ! moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); }finally {
      long endWaitNs = time.nanoseconds();
      timeNs = Math.max(0L, endWaitNs - startWaitNs);
      recordWaitTime(timeNs);
    }

    if (waitingTimeElapsed) {
      throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
    }

    remainingTimeToBlockNs -= timeNs;

    // check if we can satisfy this request from the free list,
    // otherwise allocate memory
    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
      // just grab a buffer from the free list
      buffer = this.free.pollFirst();
      accumulated = size;
    } else {
      // we'll need to allocate memory, but we may only get
      // part of what we need on this iteration
      freeUp(size - accumulated);
      int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
      this.nonPooledAvailableMemory -= got; accumulated += got; }}Copy the code

General logic of the above source code:

Create a waiters Condition and add it to a waiters Condition of type Deque (waiters will wake it up in return memory). The while loop keeps collecting free memory until it exits when it is larger than the requested memory. In the while loop, Condition#await () : Condition#await (); Condition#await () : Condition#await (); Condition#await (); Call the freeUp method to free free memory from free, and then add it until it is larger than the requested memory.

2. Return the memory

Access to memory:

org.apache.kafka.clients.producer.internals.BufferPool#deallocate(java.nio.ByteBuffer, int)

public void deallocate(ByteBuffer buffer, int size) {
  lock.lock();
  try {
    if (size == this.poolableSize && size == buffer.capacity()) {
      buffer.clear();
      this.free.add(buffer);
    } else {
      this.nonPooledAvailableMemory += size;
    }
    Condition moreMem = this.waiters.peekFirst();
    if(moreMem ! =null)
      moreMem.signal();
  } finally{ lock.unlock(); }}Copy the code

The logic for returning memory blocks is simple:

If the returned memory block is equal to batchSize, it is emptied and added to the free of the buffer pool, returning it to the buffer pool, preventing the JVM GC from reclaiming the memory block. What if it doesn’t? You can simply add the memory size to the value of the unallocated and free memory without returning it, wait for the JVM GC to reclaim it, and finally wake up the thread that is waiting for free memory.

After the above source code analysis, we point out a problem that needs to be paid attention to. If set improperly, it will bring serious performance impact on the Producer end:

If your message size is larger than batchSize, instead of recycling the allocated memory block from free, a new ByteBuffer is created and not returned to the buffer pool (JVM GC collection). If nonPooledAvailableMemory is smaller than the message body at this time, then the free block in free is destroyed (JVM GC reclaim) so that there is enough memory in the buffer pool for the user to apply, which can lead to frequent GC problems.

Therefore, batch.size needs to be adjusted appropriately according to the size of the business message to avoid frequent GC.

Author’s brief introduction

The author Zhang Chenghui, good at messaging middleware skills, responsible for the company’s millions of TPS level Kafka cluster maintenance, maintenance of the public number “back-end advanced” irregularly share Kafka, RocketMQ series does not speak of the concept of direct combat summary and details of the source code analysis; At the same time, the author is also a Seata Contributor, an Ali open source distributed transaction framework, so he will share his knowledge about Seata. Of course, the public account will also share WEB related knowledge such as Spring bucket. The content may not be exhaustive, but it must make you feel that the author’s pursuit of technology is serious!

Public number: back-end advanced

Tech blog: objcoding.com/

GitHub:github.com/objcoding/