In the previous section, we analyzed the Producer’s core components and ended up with a key component diagram. Do you remember it?

To summarize the above picture:

1) A Metadata component is created, which maintains the cluster metadata via a Cluster object.

2) The RecordAccumulator, the memory buffer for messages waiting to be sent, is initialized.

3) A NetworkClient is created; most importantly, it internally creates a Selector component for NIO.

4) A Sender thread is started; the Sender references all of the above components and executes its run method.
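The four steps above can be sketched in code. The classes below are heavily simplified placeholders meant only to show the wiring order, not the real Kafka types:

```java
// Simplified placeholder components that mirror the wiring order (not the real Kafka classes)
class Metadata { }
class RecordAccumulator { }
class Selector { }

class NetworkClient {
    final Selector selector;               // wraps the NIO Selector component
    NetworkClient(Selector selector) { this.selector = selector; }
}

class Sender implements Runnable {
    final Metadata metadata;               // the Sender references all core components
    final RecordAccumulator accumulator;
    final NetworkClient client;
    volatile boolean started = false;

    Sender(Metadata metadata, RecordAccumulator accumulator, NetworkClient client) {
        this.metadata = metadata;
        this.accumulator = accumulator;
        this.client = client;
    }

    public void run() {
        started = true;                    // the real run() enters the loop analyzed below
    }
}

public class ProducerWiringSketch {
    public static void main(String[] args) throws Exception {
        Metadata metadata = new Metadata();                          // 1) metadata holder
        RecordAccumulator accumulator = new RecordAccumulator();     // 2) memory buffer
        NetworkClient client = new NetworkClient(new Selector());    // 3) network client with an NIO Selector inside
        Sender sender = new Sender(metadata, accumulator, client);   // 4) sender references everything
        Thread ioThread = new Thread(sender, "kafka-producer-network-thread");
        ioThread.start();                                            // run() begins executing
        ioThread.join();
        System.out.println(sender.started);                          // prints: true
    }
}
```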

As you can see at the bottom of the diagram, the last section ended with the execution of the run method. In this section, we will first look at the overall flow of the run method, and then analyze the source code of the Producer’s first core process: metadata pulling.

Let’s get started!

What’s the Sender’s run method doing?

What we want to do in this section is look at what the Sender’s run method does when it starts executing.

    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

The core of the run method is simple: essentially two while loops plus the thread’s shutdown logic, and both while loops call the run(long now) method.

As the comments explain, the second while loop handles a special case: when the first loop exits, there may still be unsent requests, and the second loop keeps running until they are processed, only then letting the thread shut down.
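This main-loop-plus-drain-loop lifecycle can be reproduced in a minimal self-contained form. The sketch below is a generic model of the pattern, not the Kafka code itself:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class TwoLoopShutdownSketch {
    static volatile boolean running = true;
    static final ConcurrentLinkedQueue<String> accumulator = new ConcurrentLinkedQueue<>();

    // stands in for one call to Sender.run(long now)
    static void runOnce() {
        String record = accumulator.poll();
        if (record != null)
            System.out.println("sent " + record);
    }

    public static void main(String[] args) {
        accumulator.add("a");
        accumulator.add("b");
        running = false;                 // simulate close() being called immediately

        // main loop, runs until close is called
        while (running)
            runOnce();

        // drain loop: keep going while unsent records remain (unless force-closed)
        boolean forceClose = false;
        while (!forceClose && !accumulator.isEmpty())
            runOnce();

        System.out.println("shutdown complete");
    }
}
```

Even though `running` is already false here, the two queued records still get "sent" by the drain loop before shutdown completes, which is exactly the guarantee the second while loop provides.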

The overall context is shown in the figure below:

Now, what’s the main thing about the run method?

    /**
     * Run a single iteration of sending
     *
     * @param now The current POSIX time in milliseconds
     */
    void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        List<ClientRequest> requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request, now);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now);
    }

On a first reading, the code above feels unclear, and it is hard to tell what the point is. Fortunately, there are comments, so we can get a general idea of what it is doing.

The skeleton is: Accumulator’s ready(), NetworkClient’s ready(), NetworkClient’s send(), and NetworkClient’s poll().

These seem to mean: preparing the memory buffer, getting the network connections to nodes ready, sending data, and pulling the response results.

But what if we can’t tell just by guessing?

At this point, we can fall back on the debugger. Since this is the producer, we can set breakpoints in the HelloWorld client and step through it.

When you break the run method step by step, you will find:

The logic of Accumulator’s ready(), NetworkClient’s ready(), and NetworkClient’s send() is almost entirely skipped: they either initialize empty objects or return directly from inside the method.

Execution goes all the way to the client.poll method, as shown below:

So we can conclude that on the first pass through the while loop, the core logic of the run method really boils down to one line:

client.poll(pollTimeout, now)

The overall context is as follows:

The NetworkClient’s poll method is the crux of the whole thing:

    /**
     * Do actual reads and writes to sockets.
     *
     * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
     *                must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
     *                metadata timeout
     * @param now The current time in milliseconds
     * @return The list of responses received
     */
    public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
    }

The structure of this method is much clearer. From the method names and comments, we can almost guess its main steps:

1) The Javadoc tells us this is where the actual reads and writes to sockets happen.

2) metadataUpdater.maybeUpdate(): do you still remember NetworkClient’s DefaultMetadataUpdater component? The method name suggests a possible metadata update. This looks crucial.

3) Next it executes the poll method of Selector, another component of NetworkClient, the one that wraps the native NIO Selector underneath. This method should also be key.

4) A series of handle* methods then process the responses. Judging from the names, handleCompletedSends handles completed sends, handleCompletedReceives handles completed receives, handleDisconnections handles disconnected requests, handleConnections handles newly established connections, and handleTimedOutRequests handles timed-out requests. Different situations get different treatments.

5) Finally, there is callback processing for each response: if a callback was registered, it is invoked. This is probably not the key logic.

In short, NetworkClient.poll() performs one round of polling: it reads and writes via the Selector, then dispatches the responses to the various handlers.

As shown below:

Because this is the first loop iteration, accumulator.ready(), client.ready(), and client.send() did essentially nothing. The core of the first pass through the while loop is the NetworkClient.poll method, whose main logic is shown in the figure above.

Does maybeUpdate pull the metadata?

The poll method first executes the maybeUpdate method of DefaultMetadataUpdater, whose name suggests a possible update. Let’s take a look at its logic.

        public long maybeUpdate(long now) {
            // should we update our metadata?
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
            // if there is no node available to connect, back off refreshing metadata
            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                    waitForMetadataFetch);

            if (metadataTimeout == 0) {
                // Beware that the behavior of this method and the computation of timeouts for poll() are
                // highly dependent on the behavior of leastLoadedNode.
                Node node = leastLoadedNode(now);
                maybeUpdate(now, node);
            }

            return metadataTimeout;
        }

    /**
     * The next time to update the cluster info is the maximum of the time the current info will expire and the time
     * the current info can be updated (i.e. backoff time has elapsed); If an update has been requested then the
     * expiry time is now
     */
    public synchronized long timeToNextUpdate(long nowMs) {
        long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

There is a timing check first: several timeouts are combined into metadataTimeout, and only when metadataTimeout is 0 is the real update logic, the overloaded maybeUpdate(now, node), executed.

In cases like this, we can create a breakpoint on metadataTimeout and see how it evaluates, as shown in the following figure:

You can see that on the first pass through the while loop, when poll() calls maybeUpdate, the three values feeding metadataTimeout are computed: two of them are 0, and one is non-zero with a value of 299720. So metadataTimeout is also non-zero, 299720.

That is, on the first pass of the while loop, the real update logic inside maybeUpdate is not executed at all.
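To make the numbers concrete, here is a small self-contained sketch of the same computation. The field values mirror the defaults (metadata.max.age.ms is 5 minutes, retry.backoff.ms is 100 ms); the ~280 ms elapsed time is an assumed value chosen so the result matches the 299720 seen in the debugger:

```java
public class MetadataTimeoutSketch {
    public static void main(String[] args) {
        long metadataExpireMs = 5 * 60 * 1000;  // metadata.max.age.ms default: 300000 ms
        long refreshBackoffMs = 100;            // retry.backoff.ms default
        boolean needUpdate = false;             // nobody has requested an update yet

        long lastSuccessfulRefreshMs = 0;       // set when the producer was constructed
        long lastRefreshMs = 0;
        long nowMs = 280;                       // assume ~280 ms have passed since construction

        // same formula as Metadata.timeToNextUpdate()
        long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        long timeToNextMetadataUpdate = Math.max(timeToExpire, timeToAllowUpdate);

        System.out.println(timeToNextMetadataUpdate);  // prints: 299720
    }
}
```

Only when needUpdate is flipped to true (or the 5 minutes expire) does timeToExpire, and hence metadataTimeout, drop to 0.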

So then we go down to the poll() method of Selector.

    /**
     * Do whatever I/O can be done on each connection without blocking. This includes completing connections,
     * completing disconnections, initiating new sends, or making progress on in-progress sends or receives.
     */
    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");

        clear();

        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;

        /* check ready keys */
        long startSelect = time.nanoseconds();
        // this delegates to NIO's underlying Selector.select(), which blocks listening for events
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        currentTimeNanos = endSelect;
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        // if readyKeys > 0, process the selected SelectionKeys
        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false);
            pollSelectionKeys(immediatelyConnectedKeys, true);
        }

        addToCompletedReceives();

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    }

    private int select(long ms) throws IOException {
        if (ms < 0L)
            throw new IllegalArgumentException("timeout should be >= 0");

        if (ms == 0L)
            return this.nioSelector.selectNow();
        else
            return this.nioSelector.select(ms);
    }

There are two main steps in the method above:

1) select(timeout): delegates to the underlying NIO selector.select(), which blocks waiting for ready events.

2) pollSelectionKeys(): iterates over the selected SelectionKeys and performs the corresponding operations.

That is, on the first pass through the while loop, the Sender thread’s run method ultimately does nothing and ends up blocked in selector.select().

As shown below:
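The blocking behaviour itself is plain java.nio and easy to reproduce standalone (this demo is unrelated to Kafka’s wrapper classes): select() parks the thread until another thread calls wakeup().

```java
import java.io.IOException;
import java.nio.channels.Selector;

public class SelectWakeupDemo {
    public static void main(String[] args) throws Exception {
        final Selector selector = Selector.open();

        Thread ioThread = new Thread(() -> {
            try {
                // With no registered channels and no timeout, this blocks indefinitely,
                // just like the Sender thread's first pass through poll()
                selector.select();
                System.out.println("woken up");
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        ioThread.start();

        Thread.sleep(200);   // give the I/O thread time to block in select()
        selector.wakeup();   // this is what ultimately unblocks it
        ioThread.join();
        selector.close();
    }
}
```

Note that wakeup() is safe even if it races ahead of select(): per the NIO contract, the next select() call then returns immediately.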

After new KafkaProducer()


After one and a half sections, we have finally explained how new KafkaProducer() works. Hopefully you now understand the Kafka Producer a bit better.

What about new KafkaProducer()?

Let’s move on to KafkaProducerHelloWorld. Do you remember the KafkaProducerHelloWorld code?

public class KafkaProducerHelloWorld {

	public static void main(String[] args) throws Exception {
		// Configure some Kafka parameters
		Properties props = new Properties();

		// Create a Producer instance
		KafkaProducer<String, String> producer = new KafkaProducer<>(props);

		// Encapsulate a message
		ProducerRecord<String, String> record = new ProducerRecord<>(
				"test-topic", "test-key", "test-value");

		// Sending messages synchronously will block here until the message is sent
		// producer.send(record).get();

		// Send messages asynchronously without blocking. Set a listener callback function
		producer.send(record, new Callback() {
			public void onCompletion(RecordMetadata metadata, Exception exception) {
				if (exception == null) {
					System.out.println("Message sent successfully");
				} else {
					exception.printStackTrace();
				}
			}
		});

		Thread.sleep(5 * 1000);

		// close the producer
		producer.close();
	}
}

KafkaProducerHelloWorld has three main steps:

1) new KafkaProducer(): we have already analyzed the configuration parsing, the components it creates, and what the Sender thread’s run method executes on its first loop iteration.

2) new ProducerRecord(): creates the message to be sent.

3) producer.send(): sends the message.

First create the message to be sent:

ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "test-key", "test-value");

public ProducerRecord(String topic, K key, V value) {
    this(topic, null, null, key, value);
}

/**
 * Creates a record with a specified timestamp to be sent to a specified topic and partition
 *
 * @param topic The topic the record will be appended to
 * @param partition The partition to which the record should be sent
 * @param timestamp The timestamp of the record
 * @param key The key that will be included in the record
 * @param value The record contents
 */
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
    if (topic == null)
        throw new IllegalArgumentException("Topic cannot be null");
    if (timestamp != null && timestamp < 0)
        throw new IllegalArgumentException("Invalid timestamp " + timestamp);
    this.topic = topic;
    this.partition = partition;
    this.key = key;
    this.value = value;
    this.timestamp = timestamp;
}

As we mentioned earlier, Record represents an abstract encapsulation of a message. This ProducerRecord actually represents a message.

As you can see from the constructor comments, a ProducerRecord can specify which topic and which partition the message goes to, and a timestamp can be set on the message; by default, the partition and timestamp may be left unspecified.

That is really the main information in this piece of source code, and it is relatively simple, so I won’t draw a diagram for it.

Metadata pull is triggered when sending a message

Once both Producer and Record are created, messages can be sent synchronously or asynchronously.

// Sending messages synchronously will block here until the message is sent
// producer.send(record).get();

// Send messages asynchronously without blocking. Set a listener callback function
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.println("Message sent successfully");
        } else {
            exception.printStackTrace();
        }
    }
});

    // send without a callback (the returned Future enables synchronous sending via get())
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return send(record, null);
    }

    // send with a callback (asynchronous sending)
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

The entire sending logic for synchronous and asynchronous is shown below:

As you can see from the figure above, both the synchronous and the asynchronous path call the same underlying method, doSend(). The difference is only whether a Callback is passed; both also run any registered interceptors before the call. So here we will focus on the doSend method.

The doSend method is as follows:

/**
 * Implementation of asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
 * See {@link #send(ProducerRecord, Callback)} for details.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer");
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer");
        }
        int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        tp = new TopicPartition(record.topic(), partition);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

Although this method is fairly long, its structure is still fairly clear. It mainly does the following:

1) waitOnMetadata: waits for the metadata to be pulled.

2) keySerializer.serialize and valueSerializer.serialize: serialize the Record’s key and value into byte arrays.

3) partition: routes the message, choosing one partition of the topic according to a partitioning strategy.

4) accumulator.append: puts the message into the memory buffer.

5) this.sender.wakeup(): wakes the Sender thread out of its selector.select() block so it can start processing the data in the memory buffer.

It can be expressed as follows:

In these two sections we focus on the source code for this metadata pull scenario.

So here we will dig into step 1 and come back to the remaining four steps later.
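As an aside, step 2 is simple enough to show in isolation: Kafka’s StringSerializer essentially does a UTF-8 getBytes. The method below is a simplified stand-in for it, not the real class:

```java
import java.nio.charset.StandardCharsets;

public class SerializeSketch {
    // simplified stand-in for org.apache.kafka.common.serialization.StringSerializer
    static byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] serializedKey = serialize("test-topic", "test-key");
        byte[] serializedValue = serialize("test-topic", "test-value");
        System.out.println(serializedKey.length);    // 8 bytes for "test-key"
        System.out.println(serializedValue.length);  // 10 bytes for "test-value"
    }
}
```

These byte arrays are what ends up in the RecordAccumulator buffer and, eventually, on the wire.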

How does waitOnMetadata wait for metadata to be pulled?

Since the first step in send is to execute the waitOnMetadata method, let’s take a look at its code:

    /**
     * Wait for cluster metadata including partitions for the given topic to be available.
     * @param topic The topic we want metadata for
     * @param maxWaitMs The maximum time in ms for waiting on the metadata
     * @return The amount of time we waited in ms
     */
    private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already.
        if (!this.metadata.containsTopic(topic))
            this.metadata.add(topic);

        if (metadata.fetch().partitionsForTopic(topic) != null)
            return 0;

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        while (metadata.fetch().partitionsForTopic(topic) == null) {
            log.trace("Requesting metadata update for topic {}.", topic);
            int version = metadata.requestUpdate();
            sender.wakeup();
            metadata.awaitUpdate(version, remainingWaitMs);
            long elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            if (metadata.fetch().unauthorizedTopics().contains(topic))
                throw new TopicAuthorizationException(topic);
            remainingWaitMs = maxWaitMs - elapsed;
        }
        return time.milliseconds() - begin;
    }

    /**
     * Get the current cluster info without blocking
     */
    public synchronized Cluster fetch() {
        return this.cluster;
    }

    public synchronized int requestUpdate() {
        this.needUpdate = true;
        return this.version;
    }

    /**
     * Wait for metadata update until the current version is larger than the last version we know of
     */
    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
        if (maxWaitMs < 0) {
            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
        }
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        while (this.version <= lastVersion) {
            if (remainingWaitMs != 0)
                wait(remainingWaitMs);
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
    }

The core of this method is to check whether Cluster metadata is available. If not, it performs the following:

1) metadata.requestUpdate(): sets the needUpdate flag, which feeds into maybeUpdate’s metadataTimeout computation so that metadataTimeout can become 0.

2) sender.wakeup(): wakes the Sender thread out of its earlier nioSelector.select() block so it can continue executing.

3) metadata.awaitUpdate(version, remainingWaitMs): internally calls wait() on the Metadata object, the wait() method every Object has, normally paired with notify or notifyAll.
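The version-based wait/notifyAll handshake in step 3 can be boiled down to a tiny self-contained model. This is a sketch of the mechanism only, not the real Metadata class:

```java
public class MetadataWaitSketch {
    static class Meta {
        private int version = 0;
        private boolean needUpdate = false;

        synchronized int requestUpdate() {
            this.needUpdate = true;    // picked up later by the update path
            return this.version;
        }

        // user thread: block until version advances past lastVersion
        synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
            long begin = System.currentTimeMillis();
            while (this.version <= lastVersion) {
                long elapsed = System.currentTimeMillis() - begin;
                if (elapsed >= maxWaitMs)
                    throw new RuntimeException("Failed to update metadata after " + maxWaitMs + " ms.");
                wait(maxWaitMs - elapsed);   // releases the lock and parks this thread
            }
        }

        // I/O thread: called when the metadata response arrives
        synchronized void update() {
            this.version += 1;
            this.needUpdate = false;
            notifyAll();                     // wake every thread parked in awaitUpdate()
        }
    }

    public static void main(String[] args) throws Exception {
        final Meta meta = new Meta();
        int version = meta.requestUpdate();

        // stands in for the Sender thread delivering the metadata response
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            meta.update();
        }).start();

        meta.awaitUpdate(version, 60_000);   // blocks ~100 ms, then returns
        System.out.println("metadata updated");
    }
}
```

The while loop around wait() also guards against spurious wakeups, the same way Metadata.awaitUpdate does with its version check.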

The whole process is directly represented by a diagram, as follows:

This diagram is the key result of today’s analysis. There are two blocking-and-wake-up mechanisms at work: one is NIO’s select() and wakeup(), and the other is the Metadata object’s wait() and notifyAll(). Together, they coordinate with the Sender thread’s blocking logic.

An interesting technique, isn’t it? Neither thread is coordinated with join, sleep, or park/unpark; everything is done with select()/wakeup() and wait()/notifyAll().


Finally, let’s briefly summarize. In this section, we mainly analyzed the following Producer source-code principles:

1) When KafkaProducer is initialized, the metadata is not pulled. Instead, the Selector component is created and the Sender thread is started, which then blocks in select() waiting for request responses. Because no request has been sent yet, no metadata is actually pulled during initialization.

2) The first call to send() wakes up the Sender thread blocked in select(). The Sender enters the next iteration of its while loop and sends the metadata pull request, while the user thread blocks in Object.wait() for up to 60 seconds. If the metadata is pulled from the Broker in time, the actual produce request continues; otherwise, a metadata pull timeout exception is thrown.

In this section, we have only seen the waiting side: how wait() is used to wait for the metadata pull, and how the Sender’s blocked select() is woken so that it enters the next loop iteration.

How does that next loop iteration send the request to pull metadata, and how does notifyAll() wake the waiting thread when the pull succeeds?

Let’s continue the analysis in the next section. Stay tuned, and see you there!
