Welcome to github.com/hsfxuebao/j… , hope to help you, if you feel ok, please click on the Star

Kafka source analysis 14- How to handle the response message after the Producer successfully sends the message?

Look directly at the completeBatch() method on the Sender thread:

private void completeBatch(RecordBatch batch, ProduceResponse.PartitionResponse response, long correlationId, Long now) {// If the processing is successful, it is successful, but if the server fails, it will send us an exception message. Error = response.error; error = response.error; // If the response contains an exception and the request can be retried if (error! = Errors.NONE && canRetry(batch, error)) { // retry log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", correlationId, batch.topicPartition, this.retries - batch.attempts - 1, error); // re-queue the batch that failed to send. this.accumulator.reenqueue(batch, now); this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount); } else {// Data from here: with exception, but can not retry (1: not allowed to retry at all 2: the number of retries exceeded) // the rest go to this branch. RuntimeException exception; If (error == errors. TOPIC_AUTHORIZATION_FAILED) // Encapsulate an exception message (custom exception) exception = new TopicAuthorizationException(batch.topicPartition.topic()); else exception = error.exception(); //TODO call the result of their request //TODO call the result of their request //TODO call the result of their request // A complete message sending process is completed. batch.done(response.baseOffset, response.logAppendTime, exception); // It looks like this code is going to recycle resources. this.accumulator.deallocate(batch); if (error ! = Errors.NONE) this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); } if (error.exception() instanceof InvalidMetadataException) { if (error.exception() instanceof UnknownTopicOrPartitionException) log.warn("Received unknown topic or partition error in produce request on partition {}. The " + "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition); metadata.requestUpdate(); } // Unmute the completed partition. if (guaranteeMessageOrder) this.accumulator.unmutePartition(batch.topicPartition); }Copy the code

this.accumulator.deallocate(batch); This is called freeing memory.

Reference Documents:

Kafka kafka kafka

Kafka technology insider – Graphic details kafka source code design and implementation

Kafka source code analysis series