kafka-clients version: 2.2.2

Below is a demo that explains the cause of this problem, followed by ideas for troubleshooting it.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MessageQueueProducer {

    private final KafkaProducer<String, String> kafkaProducer;

    public MessageQueueProducer(KafkaProducer<String, String> kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    public synchronized void send(final String topic, final String message) {
        ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, message);
        kafkaProducer.send(kafkaMessage, (recordMetadata, e) -> {
            if (e == null) {
                // success
            } else {
                // failure: resend to "retry-topic" so it is retried.
                // This callback runs on the producer's I/O thread and calls a synchronized method.
                send("retry-topic", message);
            }
        });
    }

    public synchronized void close() {
        kafkaProducer.flush(); // blocks until every buffered record has been acknowledged
        kafkaProducer.close();
    }
}
```

Cause analysis

  1. Suppose we have a simple wrapper around KafkaProducer, as above.
  2. Suppose there are two threads, threadA and threadB.
  3. threadA calls MessageQueueProducer.close(). The flush() call inside close() is meant to send all buffered data to the broker in one go before the producer is closed, to ensure data integrity.
  4. Both methods are synchronized, so threadA now holds the lock on the MessageQueueProducer object.
  5. flush() waits until all data has been sent and acknowledged by the broker.

This is where the problem arises. Suppose threadB called MessageQueueProducer.send() and the send failed, so Kafka's callback has to send the message to the retry topic so that it is retried asynchronously.

Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads. If you want to execute blocking or computationally expensive callbacks it is recommended to use your own java.util.concurrent.Executor in the callback body to parallelize processing.

As the Kafka documentation says, the callback is invoked by the producer's ioThread. So at this point the ioThread tries to acquire the lock on the MessageQueueProducer object, which threadA already holds, while threadA is waiting for the ioThread to finish all the callbacks so that it knows every message has been sent. Each thread is waiting for the other, so the MessageQueueProducer lock is never released and neither send() nor close() can complete.
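The cycle can be reproduced without a broker. The sketch below is a hypothetical, standard-library-only simulation, not Kafka code: SimulatedProducer stands in for KafkaProducer by running every callback on one single-threaded executor (playing the ioThread), and its flush() waits until every callback queued before it has run. The brokerResponse latch is an assumption of the sketch that holds back the failed response until threadA is already inside close(). Unlike the real flush(), the simulated one takes a timeout so the program can terminate and report the hang.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class FlushDeadlockDemo {

    /** Hypothetical stand-in for KafkaProducer: all callbacks run on one "I/O" thread. */
    static class SimulatedProducer {
        final CountDownLatch brokerResponse = new CountDownLatch(1); // released by the driver
        private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "simulated-io-thread");
            t.setDaemon(true); // a deadlocked daemon thread does not keep the JVM alive
            return t;
        });

        /** Sends to any topic except "retry-topic" fail once the broker "responds". */
        void send(String topic, Consumer<Exception> callback) {
            io.execute(() -> {
                try {
                    brokerResponse.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                callback.accept("retry-topic".equals(topic) ? null : new RuntimeException("send failed"));
            });
        }

        /** Like flush(): waits until every callback queued before this call has run. */
        boolean flush(long timeoutMs) throws InterruptedException {
            CountDownLatch drained = new CountDownLatch(1);
            io.execute(drained::countDown);
            return drained.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    /** Mirrors MessageQueueProducer: synchronized send/close, retry from the callback. */
    static class Wrapper {
        final SimulatedProducer producer = new SimulatedProducer();

        synchronized void send(String topic, String message) {
            producer.send(topic, e -> {
                if (e != null) send("retry-topic", message); // runs on the I/O thread, needs this lock
            });
        }

        /** Returns false if flush() could not finish in time. */
        synchronized boolean close(long timeoutMs) throws InterruptedException {
            return producer.flush(timeoutMs); // the real flush() has no timeout and hangs forever
        }
    }

    /** Returns true if close() timed out because of the lock/flush cycle. */
    static boolean reproduce() throws InterruptedException {
        Wrapper wrapper = new Wrapper();
        wrapper.send("orders", "hello"); // threadB's send; the broker response is still pending

        boolean[] flushed = new boolean[1];
        Thread threadA = new Thread(() -> {
            try {
                flushed[0] = wrapper.close(500);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }, "threadA");
        threadA.start();
        // Wait until threadA holds the lock and is parked inside flush().
        while (threadA.isAlive() && threadA.getState() != Thread.State.TIMED_WAITING) {
            Thread.sleep(10);
        }

        wrapper.producer.brokerResponse.countDown(); // the failed response arrives
        threadA.join();
        return !flushed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("deadlocked: " + reproduce());
    }
}
```

Running main prints "deadlocked: true". With the timeout removed, as in the real flush(), the 500 ms wait becomes a permanent hang: the simulated I/O thread stays BLOCKED on the wrapper's monitor and threadA never returns from close().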

Source analysis of the flush method

```java
// RecordAccumulator.java:690
/**
 * Mark all partitions as ready to send and block until the send is complete
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        // for every incomplete batch, wait for its completion to be confirmed
        for (ProducerBatch batch : this.incomplete.copyAll())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
```

produceFuture is a ProduceRequestResult:

```java
/**
 * Mark this request as complete and unblock any threads waiting on its completion.
 */
public void done() {
    if (baseOffset == null)
        throw new IllegalStateException("The method `set` must be invoked before this method.");
    this.latch.countDown();
}

/**
 * Await the completion of this request
 */
public void await() throws InterruptedException {
    latch.await();
}
```

The CountDownLatch is counted down only in done(). Tracing the callers of done() shows that it is called on the ioThread:

Sender.completeBatch -> ProducerBatch.done -> ProducerBatch.completeFutureAndFireCallbacks -> produceFuture.done

Inside completeFutureAndFireCallbacks, the user callbacks are invoked before produceFuture.done(), so flush() cannot return until every callback has returned.
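The ordering that matters can be shown with a deliberately simplified model. BatchCompletionModel below is hypothetical (it is not a Kafka class); it follows the callbacks-first, done()-last order of completeFutureAndFireCallbacks in the 2.2 source, so a thread parked in await(), as flush() is, cannot resume until every callback has returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Simplified, hypothetical model of a batch and its ProduceRequestResult (not Kafka's classes). */
public class BatchCompletionModel {
    private final CountDownLatch latch = new CountDownLatch(1); // like ProduceRequestResult's latch
    private final List<Runnable> callbacks = new ArrayList<>();
    private volatile Long baseOffset;                            // null until the result is set

    void addCallback(Runnable callback) {
        callbacks.add(callback);
    }

    /** Callbacks run first on the completing (I/O) thread; waiters are released last. */
    void complete(long offset) {
        baseOffset = offset;                     // corresponds to produceFuture.set(...)
        for (Runnable cb : callbacks) cb.run();  // user callbacks
        done();                                  // only now can await() return
    }

    private void done() {
        if (baseOffset == null)
            throw new IllegalStateException("set must be invoked before done");
        latch.countDown();
    }

    /** What awaitFlushCompletion() does for each incomplete batch. */
    void await() throws InterruptedException {
        latch.await();
    }
}
```

If a callback never returns, for example because it is waiting for a lock held by the thread sitting in await(), done() is never reached and neither thread makes progress.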

Troubleshooting ideas

Symptoms

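The obvious symptom is that threads calling MessageQueueProducer.send() hang and cannot continue. In the example above, threadA and threadB both hang: threadB (like the producer's ioThread) is BLOCKED waiting for the MessageQueueProducer lock, while threadA holds that lock and is WAITING inside flush().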

Alerting

Alerts are essential to a program's robustness: without them, a situation like the one above can be hard to discover. Alert metrics can be configured according to the needs of the service.
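One possible alert metric, sketched here as an assumption rather than anything the original setup used: have every send callback record the time of the last completed send, and let a scheduled job raise an alert when nothing has completed for longer than a threshold. In the deadlock above the ioThread stops completing callbacks, so this signal goes stale. SendStallDetector and its threshold are hypothetical; times are passed in explicitly to keep the logic easy to test.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical liveness check for a producer wrapper: send callbacks record progress,
 * and a periodic check reports a stall when nothing has completed for too long.
 */
public class SendStallDetector {
    private final long thresholdMillis;
    private final AtomicLong lastProgressMillis;

    public SendStallDetector(long thresholdMillis, long nowMillis) {
        this.thresholdMillis = thresholdMillis;
        this.lastProgressMillis = new AtomicLong(nowMillis);
    }

    /** Call from the send callback (on success or failure) to show the pipeline is moving. */
    public void recordProgress(long nowMillis) {
        lastProgressMillis.set(nowMillis);
    }

    /** True when no callback has completed within the threshold: time to alert. */
    public boolean isStalled(long nowMillis) {
        return nowMillis - lastProgressMillis.get() > thresholdMillis;
    }
}
```

In a real service, nowMillis would come from System.currentTimeMillis() and isStalled would be polled by a ScheduledExecutorService. A service that is simply idle also looks stalled, so the check works best combined with a count of in-flight sends.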

  1. Start Arthas on the machine and attach it to the running application.
  2. Run the thread --all command to view the state of all threads.
  3. You can see that threadB is BLOCKED, while threadA, which holds the lock, is WAITING.
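For a quick in-process check of the same information, the standard Thread API can list every live thread with its state. This is only a rough analogue of thread --all, not how Arthas implements it; ThreadStates is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: a rough in-process analogue of listing all thread states. */
public class ThreadStates {

    /** Names of all live threads that are currently BLOCKED on a monitor. */
    public static List<String> blockedThreadNames() {
        List<String> names = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getState() == Thread.State.BLOCKED) names.add(t.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            System.out.println(t.getName() + " " + t.getState());
        }
        System.out.println("BLOCKED: " + blockedThreadNames());
    }
}
```

Thread states are snapshots, so a thread seen as BLOCKED once may just be briefly contending; a real hang shows the same threads BLOCKED across repeated samples.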

  1. Run thread -b.
  2. Arthas identifies threadA as the culprit blocking the other threads and prints threadA's stack.
  3. Run jstack -l <PID> to get the stacks of all threads.
  4. Cross-check the number of threads blocked by threadA against the Arthas output.
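The thread -b step can be approximated inside the JVM with java.lang.management: for every thread BLOCKED on a monitor, ThreadInfo reports the id of the thread that owns that monitor. BlockerFinder is a hypothetical helper, not Arthas code. It only sees monitor locks, which is what the demo's synchronized methods use, so in the scenario above it should report threadA as the owner blocking the other threads.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-process approximation of "thread -b" (monitor locks only). */
public class BlockerFinder {

    /** Maps each lock owner's thread id to the number of threads BLOCKED waiting for it. */
    public static Map<Long, Integer> blockerCounts() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        Map<Long, Integer> counts = new HashMap<>();
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info != null && info.getThreadState() == Thread.State.BLOCKED
                    && info.getLockOwnerId() != -1) {
                counts.merge(info.getLockOwnerId(), 1, Integer::sum);
            }
        }
        return counts;
    }

    /** Prints each blocker with its full stack, similar in spirit to thread -b or jstack -l. */
    public static void printBlockers() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        for (Map.Entry<Long, Integer> e : blockerCounts().entrySet()) {
            ThreadInfo owner = mx.getThreadInfo(e.getKey(), Integer.MAX_VALUE);
            if (owner == null) continue; // the owner exited in the meantime
            System.out.println(owner.getThreadName() + " (" + owner.getThreadState()
                    + ") blocks " + e.getValue() + " thread(s):");
            for (StackTraceElement frame : owner.getStackTrace()) {
                System.out.println("    at " + frame);
            }
        }
    }

    public static void main(String[] args) {
        printBlockers();
    }
}
```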