Kafka has long been known for its high throughput. Yet just last week, a performance monitoring project that uses Kafka Producer to send massive volumes of messages found its asynchronous sends blocking.

Yes, you heard that right: Kafka Producer can block even when it sends messages asynchronously.

Kafka Producer keeps a buffer pool in which all messages sent by the client are stored, and starts a Sender thread that retrieves messages from the buffer pool and sends them to the Broker, as shown in the following figure:

Kafka Producer sends messages asynchronously: send returns a Future object, and if you want the result synchronously, you call Future#get, which blocks until the send result is available. So why did sending block in my project, where I call send directly and never call get?
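To make the relationship between send and the Future concrete, here is a toy model built only from JDK classes; it is not Kafka's API. The hypothetical `ToyProducer` appends each message to a bounded `ArrayBlockingQueue` (standing in for the buffer pool) and returns a `CompletableFuture` that a background thread (standing in for the Sender) completes with a made-up offset.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy model, not Kafka's API: send() appends to a bounded buffer
// and returns a future that a background "sender" thread completes later.
final class ToyProducer {

  // One buffered message plus the future the caller is holding.
  private record Pending(String value, CompletableFuture<Long> result) {}

  private final BlockingQueue<Pending> buffer;
  private final Thread sender;
  private long nextOffset = 0; // only touched by the sender thread

  ToyProducer(int capacity) {
    this.buffer = new ArrayBlockingQueue<>(capacity);
    this.sender = new Thread(this::drain, "toy-sender");
    this.sender.setDaemon(true);
    this.sender.start();
  }

  // Returns immediately unless the buffer is full, in which case put() blocks.
  CompletableFuture<Long> send(String value) throws InterruptedException {
    CompletableFuture<Long> result = new CompletableFuture<>();
    buffer.put(new Pending(value, result));
    return result;
  }

  // Sender loop: take buffered messages and "acknowledge" each with an offset.
  private void drain() {
    try {
      while (true) {
        Pending p = buffer.take();
        p.result().complete(nextOffset++);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Calling `producer.send("a").whenComplete(...)` mirrors the asynchronous style, while `producer.send("b").get()` mirrors the synchronous style. Even in the asynchronous style, `send` itself blocks once the queue is full, because `put` waits for free space.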

When Kafka Producer is built, the buffer.memory parameter defines the size of its buffer pool. The default buffer pool size is 32MB.
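As a sketch, the buffer size can be set explicitly when building the producer. The keys below are the standard producer config names (`ProducerConfig.BUFFER_MEMORY_CONFIG` is `"buffer.memory"` and `ProducerConfig.MAX_BLOCK_MS_CONFIG` is `"max.block.ms"`); `max.block.ms` bounds how long `send` may block waiting for buffer space or metadata, 60 seconds by default. The values shown are simply the defaults, and the helper class name is hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: producer settings related to the buffer pool, written with
// the standard Kafka config keys as plain strings so no Kafka dependency is needed.
final class BufferConfigSketch {

  static Properties producerProps() {
    Properties props = new Properties();
    // Total bytes the producer may use to buffer unsent records (default 33554432 = 32 MB).
    props.put("buffer.memory", 32L * 1024 * 1024);
    // Upper bound on how long send() may block for buffer space or metadata (default 60000 ms).
    props.put("max.block.ms", 60_000L);
    return props;
  }
}
```

These entries can be merged into the same `Properties` object that is passed to the `KafkaProducer` constructor.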

RecordAccumulator is the core accumulator class of Kafka Producer. Its Javadoc states:

The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.

What does this mean?

When the buffer pool runs out of memory blocks, message appending calls are blocked until there are free memory blocks.

Our performance monitoring project sends millions of messages per minute. When the Kafka cluster is overloaded or the network fluctuates even slightly, the Sender thread drains messages from the buffer pool more slowly than the client appends them, so the pool fills up and the client's send calls block.
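This failure mode is simple arithmetic: if the client appends bytes faster than the Sender drains them, the pool fills at the difference of the two rates. A minimal sketch, assuming the pool starts empty and using purely hypothetical rates (the class name is also hypothetical):

```java
// Back-of-the-envelope estimate: how long a full-speed producer can outpace a
// slowed-down Sender before the buffer pool is exhausted (pool assumed empty at start).
final class BufferFillEstimate {

  // Returns seconds until the buffer is full, or +Infinity if the Sender keeps up.
  static double secondsUntilFull(long bufferBytes, double produceBytesPerSec, double drainBytesPerSec) {
    double netFillRate = produceBytesPerSec - drainBytesPerSec;
    if (netFillRate <= 0) {
      return Double.POSITIVE_INFINITY;
    }
    return bufferBytes / netFillRate;
  }
}
```

For example, with a 32 MB pool, a client producing 20 MB/s and a Sender slowed to 12 MB/s, `secondsUntilFull(32 * mb, 20.0 * mb, 12.0 * mb)` (where `mb = 1024L * 1024`) returns 4.0, so sends would start blocking after about four seconds.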

Let me write an example to give you a sense of what blocking is like:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerBlockingDemo {

  public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.ACKS_CONFIG, "0");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
    // Wait up to 1 s for a batch to fill before sending it
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
    // 1 MB batches
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024);
    // Maximum request size of 5 MB
    properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5242880);
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);

    // Pre-generate 1024 random payloads of 1 KB to 9 KB each
    String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    List<byte[]> bytesList = new ArrayList<>();
    Random random = new Random();
    for (int j = 0; j < 1024; j++) {
      int sizeKb = random.nextInt(10);
      if (sizeKb == 0) {
        sizeKb = 1;
      }
      byte[] bytes = new byte[1024 * sizeKb];
      for (int i = 0; i < bytes.length; i++) {
        bytes[i] = (byte) str.charAt(random.nextInt(str.length()));
      }
      bytesList.add(bytes);
    }

    // Send as fast as possible and report every send() call that takes longer than 100 ms
    while (true) {
      long start = System.currentTimeMillis();
      producer.send(new ProducerRecord<>("test_topic", bytesList.get(random.nextInt(bytesList.size()))));
      long elapsed = System.currentTimeMillis() - start;
      if (elapsed > 100) {
        System.out.println("Sending time :" + elapsed);
      }
      // Thread.sleep(10);
    }
  }
}
```

With Thread.sleep(10); commented out, as in the code above, some send calls take a long time:

Profiling with JProfiler shows that the thread is blocked while allocating memory:

Tracing this back to the source code:

In the org.apache.kafka.clients.producer.internals.BufferPool#allocate method, if there is not enough free memory, the buffer pool blocks the allocation until enough memory is freed, or until max.block.ms elapses and the send fails.

If Thread.sleep(10); is uncommented, the Sender thread drains the buffer pool faster than the client fills it, so the pool always has free memory and send does not block.
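The blocking in `BufferPool#allocate` follows the classic bounded-resource pattern: wait on a condition until enough memory is returned, and give up after a deadline. Below is a heavily simplified, hypothetical `SimpleBufferPool` that only tracks a byte budget. Kafka's real `BufferPool` also recycles fixed-size `ByteBuffer`s, keeps a queue of waiting threads, and throws an exception on timeout instead of returning `false`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of a bounded buffer pool: only a byte budget is tracked.
final class SimpleBufferPool {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition memoryFreed = lock.newCondition();
  private final long totalMemory;
  private long available;

  SimpleBufferPool(long totalMemory) {
    this.totalMemory = totalMemory;
    this.available = totalMemory;
  }

  // Reserves size bytes, blocking up to maxBlockMs; returns false on timeout.
  boolean allocate(long size, long maxBlockMs) throws InterruptedException {
    if (size > totalMemory) {
      throw new IllegalArgumentException("request exceeds total pool size");
    }
    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(maxBlockMs);
    lock.lock();
    try {
      // Loop guards against spurious wakeups and other threads grabbing freed memory.
      while (available < size) {
        if (remainingNanos <= 0) {
          return false; // deadline reached: this is where the caller would see a timeout
        }
        remainingNanos = memoryFreed.awaitNanos(remainingNanos);
      }
      available -= size;
      return true;
    } finally {
      lock.unlock();
    }
  }

  // Returns size bytes to the pool and wakes up blocked allocators.
  void deallocate(long size) {
    lock.lock();
    try {
      available += size;
      memoryFreed.signalAll();
    } finally {
      lock.unlock();
    }
  }

  long availableMemory() {
    lock.lock();
    try {
      return available;
    } finally {
      lock.unlock();
    }
  }
}
```

In this model, the Sender thread plays the role of whoever calls `deallocate` after a batch has been sent; if it falls behind, every `allocate` call waits on `memoryFreed`.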

So does Kafka Producer block only when the buffer pool is full? No, in fact there is another place where it can block!

Before Kafka Producer sends a message to a topic for the first time, it needs to obtain the topic's Metadata. The Metadata includes, for each partition of the topic, the node where the partition's Leader resides, the nodes where its replicas reside, and the ISR list. Once Kafka Producer has the Metadata, it sends each message to the Leader of the target partition. The process of obtaining Metadata is as follows:

As shown above, Kafka Producer checks whether the topic's Metadata needs to be updated before sending a message. If so, it wakes up the Sender thread, which sends the Metadata update request, and the Kafka Producer main thread blocks waiting for the Metadata to be updated.

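If the Metadata is never updated, the client stays blocked there until max.block.ms (60 seconds by default) elapses, after which the send fails with a TimeoutException.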
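The metadata wait follows the same wait-until-updated-or-timeout pattern. Here is a minimal, hypothetical model (not Kafka's `Metadata` class): a version number is bumped on every update, and the sending thread blocks until the version moves past the one it last saw, or until `maxWaitMs` runs out. It throws the JDK's `java.util.concurrent.TimeoutException`; the real producer reports Kafka's own `org.apache.kafka.common.errors.TimeoutException`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of "the main thread waits for the Sender to refresh metadata".
final class ToyMetadata {
  private int version = 0;

  // Called by the background (Sender) thread after a metadata response arrives.
  synchronized void update() {
    version++;
    notifyAll();
  }

  synchronized int version() {
    return version;
  }

  // Called by the sending thread; blocks until a newer version is visible or time runs out.
  synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException, TimeoutException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
    while (version <= lastVersion) {
      long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
      if (remainingMs <= 0) {
        throw new TimeoutException("metadata not updated within " + maxWaitMs + " ms");
      }
      wait(remainingMs); // releases the monitor so update() can run
    }
  }
}
```

If no thread ever calls `update()`, `awaitUpdate` keeps the caller blocked for the full `maxWaitMs`, which is exactly the situation described above when the Metadata never arrives.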

About the author

The author, Zhang Chenghui, specializes in messaging middleware and is responsible for maintaining the company's Kafka clusters, which handle millions of TPS. He runs the WeChat Official Account "back-end advanced", where he occasionally shares hands-on Kafka and RocketMQ summaries and detailed source-code analysis rather than abstract concepts. He is also a Contributor to Seata, Alibaba's open-source distributed transaction framework, so he shares his knowledge of Seata as well. The account also covers web topics such as the Spring family. The content may not be exhaustive, but it will show you how seriously the author pursues technology!

WeChat Official Account: back-end advanced

Tech blog: objcoding.com/

GitHub: github.com/objcoding/