As introduced in the last class, the business thread buffers messages into RecordAccumulator via the KafkaProducer.send() method; no actual network I/O happens there. The real network I/O is done by the Sender thread.

First, we go back to the constructor of KafkaProducer. We can see:

// Create the Sender object, which implements the Runnable interface
this.sender = newSender(logContext, kafkaClient, this.metadata);
// Create and start the I/O thread that executes the logic in Sender.run()
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

The runOnce() method

Since Sender is a Runnable object, the core logic of the entire Sender thread lives in its run() method, and the heart of the run() method is a loop that repeatedly calls the runOnce() method:

while (running) { // The running field indicates whether the current Sender thread should keep running
    try {
        runOnce();
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

The runOnce() method is one cycle of the Sender thread's execution: in each cycle, a batch of requests is sent and the responses are processed. The core implementation is as follows:

void runOnce() {
    ... // Omit transaction message-related processing logic
    long currentTimeMs = time.milliseconds();
    // Create the requests to send to the Kafka cluster
    long pollTimeout = sendProducerData(currentTimeMs);
    // This is where network I/O is actually performed: the requests created above are sent
    // out and any responses received are processed
    client.poll(pollTimeout, currentTimeMs);
}

Before we dive into the runOnce() method, let’s explain the basic components involved and how they work together, as shown below:

The sendProducerData() method

The Sender.sendProducerData() method is the heart of the Sender thread's request creation. Roughly, it works as follows:

  1. The Sender thread first queries the data buffered in RecordAccumulator to see which topic partitions it can currently send messages to.
  2. The Sender thread then uses NetworkClient to check the connections between the current client and each Node, to further filter which Nodes requests should be created for.
  3. The corresponding ClientRequest requests are then generated.
  4. Finally, the NetworkClient.send() method is called to hand the ClientRequest to NetworkClient, which then performs the network I/O.

The description above is only a rough sketch of the request-sending flow; let's dive into the concrete implementation for a detailed analysis. First, look at the sendProducerData() method; the following is its detailed flow chart:

1. First, the Kafka cluster metadata cached in ProducerMetadata is fetched.

2. The RecordAccumulator.ready() method is then called to determine which Nodes currently have buffered data ready to send; it returns a ReadyCheckResult object containing the readyNodes collection.

3. In the ReadyCheckResult returned by Step 2, if the unknown-leader set is not empty, some of the messages to be sent target topics or leaders that are unknown, so the Metadata.requestUpdate() method is called to set needFullUpdate, indicating that the Kafka cluster metadata needs to be updated. (This only sets a flag; it does not block to perform the metadata update.)

4. Next, each Node in the readyNodes set is checked via the NetworkClient.ready() method, which performs the following checks:

  • First, check whether KafkaProducer has established a network connection with the target Node; if not, try to initialize one. If initialization fails, false is returned, indicating that requests should not be sent to this Node yet.
  • Second, check whether the number of requests sent but not yet acknowledged has reached the upper limit. A large backlog may mean the broker is down or unable to keep up.
  • Besides the network checks, the Kafka metadata is checked to see whether it needs updating; if so, requests cannot be sent. After all, sending data based on expired or incorrect metadata will not succeed.

5. After the filtering above, Nodes that are not suitable for receiving requests have been removed from the readyNodes collection.

6. Call RecordAccumulator.drain() to get the set of messages to be sent. The result is a Map<Integer, List<ProducerBatch>>, where the Key is the target NodeId and the Value is the collection of ProducerBatch objects to be sent to that Node.

7. The addToInflightBatches() method is then called to record the ProducerBatch objects obtained in Step 6 into the inFlightBatches collection, which tracks ProducerBatch objects that have been sent but have not yet received a response.

8. There is a small detail here: the guaranteeMessageOrder field is checked, and it is tied to the max.in.flight.requests.per.connection configuration. As the name suggests, max.in.flight.requests.per.connection controls how many in-flight (sent but unacknowledged) requests each network connection may carry. If this configuration is set to 1, guaranteeMessageOrder is true; in that case, right after the Sender thread sends a request, the target partition is added to the RecordAccumulator.muted collection, and that partition's data is skipped by subsequent ready() and drain() calls. When the response comes back, KafkaProducer removes the partition from the muted set, allowing data to be sent to the target partition again.

9. Next, the getExpiredInflightBatches() method and the expiredBatches() method are called:

  • The getExpiredInflightBatches() method collects the ProducerBatch objects in the inFlightBatches collection that have expired.

  • The expiredBatches() method collects the ProducerBatch objects in RecordAccumulator that have expired.

    When these two methods find expired ProducerBatch objects, they delete them from RecordAccumulator directly (they are not deleted from inFlightBatches here) and record them in the returned result set. For ProducerBatch objects that have not expired, the maybeUpdateNextBatchExpiryTime() method is called to update the RecordAccumulator.nextBatchExpiryTimeMs field, which records the earliest upcoming ProducerBatch expiration timestamp.

    Finally, a word on the expiration time: it is specified by the delivery.timeout.ms configuration item and represents the time budget after KafkaProducer.send() is called. It should be greater than or equal to the sum of request.timeout.ms and linger.ms: request.timeout.ms is the maximum time NetworkClient waits for a response, and linger.ms is the maximum time a message may be buffered in KafkaProducer. (A configuration sketch follows this step list.)

10. After receiving the expired ProducerBatch sets returned in Step 9, the Sender thread loops calling the failBatch() method to handle the timed-out ProducerBatch objects. The failBatch() method calls the ProducerBatch.done() method to complete each ProducerBatch (that is, update the ProducerBatch's state and trigger the Callback of every Record in it).

  • If done() returns true (that is, the current thread is the first to modify the ProducerBatch's finalState) and the ProducerBatch is in the inFlightBatches collection, an attempt is made here to delete it from inFlightBatches. The check matters because a concurrently processed response may be deleting the same ProducerBatch from inFlightBatches at the same time.

11. Next, the pollTimeout is calculated; it is the maximum time the subsequent NetworkClient.poll() call may block. If there are ProducerBatch objects about to expire, the Sender thread needs to wake up in time to perform the Step 9-10 processing, so it cannot block in the poll() method for long. The implementation of NetworkClient is described in detail later.

12. The Sender.sendProduceRequests() method is called to encapsulate each group of ProducerBatch objects (grouped by target Node) into a corresponding ClientRequest (created via the NetworkClient.newClientRequest() method, which also registers the callback that will handle the response).

13. Finally, NetworkClient.send() is called to send the ClientRequest.
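Since Steps 8 and 9 touched several producer configuration items, here is a minimal, hypothetical configuration showing how they relate. The broker address is a placeholder, and the constraint delivery.timeout.ms >= request.timeout.ms + linger.ms is validated by KafkaProducer itself:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerTimeoutConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Step 8: with at most 1 in-flight request per connection, guaranteeMessageOrder is true
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        // Step 9: delivery.timeout.ms >= request.timeout.ms + linger.ms must hold
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000); // end-to-end budget for send()
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);   // max wait for one response
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);               // max buffering time in the accumulator
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // producer.send(...) calls go here
        }
    }
}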

That's it for the sendProducerData() method. It involves a lot of detail and configuration, so to recap:

  • Steps 1 to 8 determine the ProducerBatch sets to be sent, based on the Kafka cluster metadata, KafkaProducer's network state, and KafkaProducer's configuration.
  • Steps 9 to 10 handle the ProducerBatch objects that have expired in inFlightBatches and RecordAccumulator, and trigger the Callbacks of the expired records.
  • Steps 11 to 13 encapsulate the ProducerBatch sets to be sent as ClientRequest objects and hand them to NetworkClient.

It's not too complicated, right? I hope this helps you comb through the whole flow.
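To tie the thirteen steps together, the following is a heavily condensed sketch of the sendProducerData() flow. It is illustrative pseudocode in Java form that loosely mirrors the structure described above, not the actual Kafka source:

// Condensed sketch of Steps 1-13; details and error handling omitted
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();                                   // Step 1: cluster metadata
    RecordAccumulator.ReadyCheckResult result =
            this.accumulator.ready(cluster, now);                         // Step 2: nodes with ready data
    if (!result.unknownLeaderTopics.isEmpty())
        this.metadata.requestUpdate();                                    // Step 3: flag metadata update
    Iterator<Node> iter = result.readyNodes.iterator();
    while (iter.hasNext()) {                                              // Steps 4-5: network filtering
        Node node = iter.next();
        if (!this.client.ready(node, now))
            iter.remove();
    }
    Map<Integer, List<ProducerBatch>> batches =
            this.accumulator.drain(cluster, result.readyNodes,
                                   this.maxRequestSize, now);             // Step 6: drain batches
    addToInflightBatches(batches);                                        // Steps 7-8: track in-flight, mute
    ... // Steps 9-10: collect expired batches and fail them
    sendProduceRequests(batches, now);                                    // Steps 12-13: build and send requests
    return pollTimeout;                                                   // Step 11: how long poll() may block
}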

ClientRequest

At the end of the Sender.sendProducerData() method, the ProducerBatch objects to be sent are wrapped into ClientRequest requests. How is the ProducerBatch collection converted into ClientRequest requests, and what does a ClientRequest look like? Let's look at these questions.

We first come to the Sender.sendProduceRequests() method, which loops calling the sendProduceRequest() method to handle the ProducerBatch set destined for each Node. In the sendProduceRequest() method:

  • The ProducerBatch objects are first grouped by TopicPartition into a Map<TopicPartition, ProducerBatch> collection (the recordsByPartition variable), which will be used when the response is received. While constructing the recordsByPartition collection, a ProduceRequestData.TopicProduceDataCollection object is also constructed, in which the data is organized in topic -> partition -> MemoryRecords form.

    The MemoryRecords here is created by the MemoryRecordsBuilder.build() method; the two reuse the same underlying ByteBuffer. In the build() method, MemoryRecordsBuilder closes the appendStream, writes the RecordBatch header into the head of the underlying ByteBuffer (see the Kafka message format from the previous lesson), and flips the ByteBuffer into read mode. This logic lives in the MemoryRecordsBuilder.close() method.

  • The ProduceRequest.Builder object is then created to build the ProduceRequest, which is the actual request sent to the broker. Its format is as follows:

The fields in the Request Header are defined as follows. This Request Header is the common Request Header for all requests in the Kafka Protocol:

  • request_api_key (short): the API identifier
  • request_api_version (short): the API version number
  • correlation_id (int): a serial number generated by the client, monotonically increasing; the server returns it unmodified in the Response
  • client_id (String): the client ID, which may be null

The fields in Produce Request have the following meanings:

  • transactional_id (String): the transaction ID; null if this is not a transactional message
  • acks (short): how many replicas must have stored the message before the server responds to the request. Possible values:
    • 0: KafkaProducer does not wait for the request's response at all
    • 1: the message has at least been stored by the leader replica
    • -1: the message has been replicated to the entire ISR
  • timeout_ms (int): the maximum time to wait for the response
  • topic_data: the data sent to each topic
    • name (String): the topic name
    • partition_data: the data sent to each partition
      • index (int): the partition number
      • records: the record set (MemoryRecords)

The formats of the other Kafka Protocol requests can be found in the official protocol document; the Produce request described here is currently at Version 9.

  • Finally, the NetworkClient.newClientRequest() method is called to create the ClientRequest, which is then handed to NetworkClient via its send() method.

Selector

In the previous section, we learned how to convert data in RecordAccumulator to ClientRequest. The following figure shows the core components that NetworkClient relies on:

Here we see a Selector class. Note that this is not java.nio.channels.Selector from the JDK, but org.apache.kafka.common.network.Selector (to distinguish it from the JDK class, we will call it KSelector below).

KSelector implements the Selectable interface and uses a JDK Selector underneath (the nioSelector field) to implement asynchronous network I/O operations. KSelector serves client-side applications, so there is no complex multithreading here: a single thread manages the network I/O on multiple channels. All network connections handled by KSelector are maintained in a Map<String, KafkaChannel> collection (the channels field), where the Key is the NodeId and the Value is a KafkaChannel object representing the network connection between KafkaProducer and the corresponding Node.

KafkaChannel relies on SocketChannel to read and write data. NetworkSend and NetworkReceive represent the actual data being written and read; both are backed by ByteBuffer. The send field in KafkaChannel holds the NetworkSend object (request data) waiting to be sent, while the receive field holds the NetworkReceive object (response data) currently being read. TransportLayer encapsulates SocketChannel and SelectionKey and provides different implementations for different network protocols: for example, PlaintextTransportLayer is a plain network connection, while SslTransportLayer is a network connection using SSL encryption.

Creating a Network Connection

Now that we know KSelector's data structures, let's look at its core methods. The first is the connect() method which, as the name suggests, is used to create a network connection. It is a very typical NIO operation:

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    ensureNotRegistered(id); // Check that a connection to this Node is not created twice
    // Create a SocketChannel object
    SocketChannel socketChannel = SocketChannel.open();
    // Set the SocketChannel to non-blocking mode and keepalive to true,
    // and specify the sizes of the SO_RCVBUF and SO_SNDBUF buffers
    configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
    // Since the channel is non-blocking, SocketChannel.connect() may return before the
    // connection is fully established; channel.finishConnect() is used later to verify
    // that the connection has actually been established
    boolean connected = doConnect(socketChannel, address);
    // Register this socketChannel with the nioSelector and watch the OP_CONNECT event;
    // the returned SelectionKey has the associated KafkaChannel object attached
    SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);

    if (connected) {
        // If the connection was established immediately, connected is true and OP_CONNECT
        // will never fire, so the key is recorded in immediatelyConnectedKeys instead
        immediatelyConnectedKeys.add(key);
        key.interestOps(0);
    }
}

Data preparation

After the KafkaChannel network connection is created, it's time to send data via KSelector.send(). After draining data from RecordAccumulator and building ClientRequest objects, the Sender thread calls NetworkClient.send(), which in turn calls the KSelector.send() method.

public void send(NetworkSend send) {
    // Get the request's target NodeId
    String connectionId = send.destinationId();
    // Find the KafkaChannel for the target Node. If a KafkaChannel is being closed, it has
    // been moved from the channels map into the closingChannels map in the KSelector.close()
    // method; at that point data can only be read from it, and no more data can be sent
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    if (closingChannels.containsKey(connectionId)) {
        this.failedSends.add(connectionId); // Record the failed send
    } else {
        // Hand the NetworkSend to the KafkaChannel for processing
        channel.setSend(send);
        // If setSend() throws, the current channel will be closed
    }
}

public void setSend(NetworkSend send) {
    if (this.send != null) // Only one NetworkSend object can be staged at a time
        throw new IllegalStateException("...");
    // Record the data to be sent (NetworkSend), waiting for the channel to become writable
    this.send = send;
    // Add the OP_WRITE event to the set of watched events
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

The NetworkClient.send() method is passed a ClientRequest object, so where does the NetworkSend object given to KSelector.send() come from? The NetworkClient.doSend() method performs a series of conversions:

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    // Create the request header, which is the common Kafka Protocol request header described earlier
    RequestHeader header = clientRequest.makeHeader(request.version());
    // Convert the ProduceRequest into a Send object
    Send send = request.toSend(header);
    // Encapsulate the ClientRequest, ProduceRequest, RequestHeader and Send into an InFlightRequest
    InFlightRequest inFlightRequest = new InFlightRequest(
            clientRequest,
            header,
            isInternalRequest,
            request,
            send,
            now);
    // Record the request into the inFlightRequests collection, which holds requests that
    // have been sent but have not yet received a response
    this.inFlightRequests.add(inFlightRequest);
    // Wrap the Send in a NetworkSend layer and hand it to KSelector.send()
    selector.send(new NetworkSend(clientRequest.destination(), send));
}

The request parameter here is declared as AbstractRequest; in the scenario analyzed here its actual type is ProduceRequest. Its toSend() method eventually calls SendBuilder.buildSend():

private static Send buildSend(Message header, short headerVersion, Message apiMessage, short apiVersion) {
    ObjectSerializationCache serializationCache = new ObjectSerializationCache();
    // Calculate the total length of the request
    MessageSizeAccumulator messageSize = new MessageSizeAccumulator();
    header.addSize(messageSize, serializationCache, headerVersion);
    apiMessage.addSize(messageSize, serializationCache, apiVersion);
    // Create the SendBuilder object (4 extra bytes for the length field)
    SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
    // Write the total request length into the SendBuilder
    builder.writeInt(messageSize.totalSize());
    // Write the request header
    header.write(builder, serializationCache, headerVersion);
    // Write the request body
    apiMessage.write(builder, serializationCache, apiVersion);
    return builder.build();
}

As we can see, SendBuilder can have data written into it. It maintains an underlying ByteBuffer (the buffer field) as well as a List collection: the buffer field is used to write primitive data such as lengths and header fields, while the List collection directly reuses the ByteBuffers inside MemoryRecords — the so-called "zero copy", which here simply means reusing the ByteBuffer rather than copying it. The following shows the implementation of the SendBuilder.writeRecords() method:

As we can see from the figure above, the buffer in SendBuilder is just a staging buffer: after primitive data is written, it is cut into segments via the slice() method and the resulting slices are appended as needed. What gets appended next is the large ByteBuffer in MemoryRecords that stores the actual payload. For the write logic, refer to the write*() methods, especially the writeRecords() method that writes MemoryRecords.

Finally, the SendBuilder.build() method first calls the flushPendingSend() method to wrap the pending buffers as a ByteBufferSend object and add it to the sends collection. The build() method then checks that collection: if it contains only one Send object, that object is returned directly; if it contains multiple Send objects, they are wrapped into a MultiRecordsSend object and returned. As mentioned earlier, the Send object returned by build() is wrapped with a layer of NetworkSend and handed to KSelector.send(), waiting to be sent.
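The "zero copy" here is plain ByteBuffer reuse rather than OS-level zero copy. The following standalone toy example (not Kafka code) shows how a slice shares its backing storage with the parent buffer, which is the same trick SendBuilder relies on:

import java.nio.ByteBuffer;

public class SliceDemo {
    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.allocate(16);
        payload.putInt(42);                 // pretend this is a serialized record batch
        payload.flip();                     // switch to read mode

        ByteBuffer slice = payload.slice(); // no copy: shares the same backing storage
        System.out.println(slice.getInt()); // prints 42, read through the shared storage
        // Writing 'slice' to a channel would transmit the original bytes without copying them
    }
}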

Read and write data

Once the data is staged, it's time to perform the actual network I/O, which is implemented in the KSelector.poll() method. There, KSelector calls nioSelector.select() to wait for I/O events to occur. The core logic of the poll() method is as follows:

1. The clear() method is called first to clean up all the state from the previous poll() call, in preparation for this one.

2. Receiving responses consumes memory: the response data read in NetworkReceive.readFrom() is stored into ByteBuffers, and those ByteBuffers are allocated from a MemoryPool.

  • When KafkaChannel reads a response, it tries to obtain from the MemoryPool a ByteBuffer large enough to store the response data. If it cannot, the KafkaChannel is muted: it stops watching the OP_READ event, that is, it stops reading response data.
  • Here (Step 2 of the KSelector.poll() method) the current MemoryPool state is checked; if the MemoryPool has enough space again, the mute state of all muted KafkaChannels is cleared and OP_READ is watched again on all of them, so every KafkaChannel can continue reading response data.

To say a little more about MemoryPool, the following diagram shows the implementation classes of the MemoryPool interface:

  • The SimpleMemoryPool implementation simply maintains an AtomicLong recording the currently available bytes (the availableMemory field); no ByteBuffer space is pre-allocated. Its tryAllocate() method creates a new ByteBuffer directly, and its release() method only increases availableMemory, leaving the ByteBuffer to be reclaimed by GC.
  • GarbageCollectedMemoryPool is a subclass of SimpleMemoryPool. It uses WeakReference + ReferenceQueue to track ByteBuffers reclaimed by GC (together with SimpleMemoryPool's bufferToBeReturned()/bufferToBeReleased() hooks), so that the MemoryPool's capacity does not leak if the user forgets to call release().
  • BatchMemoryPool is similar to the BufferPool introduced earlier, but much simpler: it only caches ByteBuffers of a fixed size (specified by the batchSize field). ByteBuffers released via release() are kept in the BatchMemoryPool.free collection, which likewise only holds batchSize-sized ByteBuffers.
  • MemoryPool.NONE is a MemoryPool implementation with no size limit at all.
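To make the pool semantics concrete, here is a toy pool in the spirit of SimpleMemoryPool. It is an illustration only, not the Kafka class: tryAllocate() just checks and decrements an available-bytes counter before allocating a fresh ByteBuffer, and release() only adds the capacity back, leaving the buffer itself to the GC:

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Toy pool illustrating the SimpleMemoryPool idea: no pre-allocated buffers,
// just an accounting counter of available bytes
public class ToyMemoryPool {
    private final AtomicLong availableMemory;

    public ToyMemoryPool(long sizeBytes) {
        this.availableMemory = new AtomicLong(sizeBytes);
    }

    /** Returns a fresh ByteBuffer, or null if the pool is exhausted. */
    public ByteBuffer tryAllocate(int sizeBytes) {
        while (true) {
            long available = availableMemory.get();
            if (available < sizeBytes)
                return null; // the caller mutes the channel and retries later
            if (availableMemory.compareAndSet(available, available - sizeBytes))
                return ByteBuffer.allocate(sizeBytes);
        }
    }

    /** Gives the capacity back; the ByteBuffer itself is reclaimed by GC. */
    public void release(ByteBuffer buffer) {
        availableMemory.addAndGet(buffer.capacity());
    }
}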

3. The timeout for the nioSelector.select() call is calculated. If the immediatelyConnectedKeys set is not empty (or readable data is still buffered in channels), the timeout is set to 0 and the selectNow() method is called, which does not block at all:

if (!immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
    timeout = 0; // Do not block in select()

The timeout passed into KSelector.poll() takes many things into account; it is computed in the last piece of the Sender.sendProducerData() method and in the NetworkClient.poll() method:

  • When there is data to send, the poll() timeout is set to 0 so data is sent as soon as possible.
  • If RecordAccumulator holds data but the sending conditions are not yet met, the timeout is the time until the next batch becomes sendable or expires.
  • If RecordAccumulator holds no data, the timeout is the Kafka metadata expiration time.
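Put together, the timeout handed to poll() is essentially the minimum over these deadlines, clamped to 0 when data is ready to go. The fragment below is illustrative only and the variable names are assumptions; the real computation is spread across sendProducerData() and NetworkClient.poll():

// Illustrative only: pick the earliest deadline, and do not block if data can be sent now
long pollTimeout = Math.min(nextReadyCheckDelayMs,      // when buffered batches may become sendable
                            notReadyTimeoutMs);         // when a not-ready node should be rechecked
pollTimeout = Math.min(pollTimeout, nextBatchExpiryMs); // when the earliest batch would expire
if (!readyNodes.isEmpty())
    pollTimeout = 0;                                    // there is data to send right now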

In the KSelector.poll() method, the immediatelyConnectedKeys set contains the connections that were established immediately in the connect() method. Since those connections are already established, they should be processed as quickly as possible.

4. The nioSelector.select(timeout) method (or the selectNow() method) is called to wait for network I/O events:

if (timeoutMs == 0L) // If the timeout is 0, selectNow() does not block at all
    return this.nioSelector.selectNow();
else
    return this.nioSelector.select(timeoutMs);

5. The select() call returns the set of SelectionKeys whose channels are ready for I/O.

6. The pollSelectionKeys() method is called to process the set of SelectionKeys obtained in Step 5.

7. Execute the pollSelectionKeys() method again to handle the immediatelyConnectedKeys set.

8. The idleExpiryManager is used to close KafkaChannels that have been idle for a long time.

As the analysis above shows, the KSelector.pollSelectionKeys() method is the core of I/O processing. In simple terms, it does three things: handle OP_CONNECT events, handle OP_READ events, and handle OP_WRITE events. Here is the simplified code:

void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
    // Iterate over all network I/O events received
    for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
        // Get the KafkaChannel from the SelectionKey's attachment
        KafkaChannel channel = channel(key);
        String nodeId = channel.id();
        if (idleExpiryManager != null) // Update the connection's last-used time to prevent it from being closed as idle
            idleExpiryManager.update(nodeId, currentTimeNanos);

        if (isImmediatelyConnected || key.isConnectable()) {
            // For a new connection, finishConnect() updates the set of watched network I/O events:
            // the OP_CONNECT event is removed and the OP_READ event is added
            if (channel.finishConnect()) {
                this.connected.add(nodeId);
                SocketChannel socketChannel = (SocketChannel) key.channel();
            } else {
                continue;
            }
        }
        ... // If the connection is established but the SSL handshake and authentication
            // have not been performed yet, they are performed here

        if (channel.ready() && channel.state() == ChannelState.NOT_CONNECTED)
            channel.state(ChannelState.READY); // Update the KafkaChannel state

        if (channel.ready()
                && (key.isReadable() || channel.hasBytesBuffered()) // An OP_READ event fired or data is buffered
                && !hasCompletedReceive(channel)                    // There is no unprocessed response for this channel
                && !explicitlyMutedChannels.contains(channel)) {    // The channel is not muted
            attemptRead(channel); // Read data
        }

        long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
        try {
            attemptWrite(key, channel, nowNanos); // Send the request
        } catch (Exception e) {
            sendFailed = true;
            throw e;
        }
    }
}

In the attemptRead() method, KafkaChannel's read() method is called to read data, and the resulting NetworkReceive object is recorded in the KafkaChannel.receive field. When data is read from a connection, the size header is read first; it carries the message length, from which a ByteBuffer of the appropriate size is created (see the NetworkReceive.readFrom() method), and then the message body is read. At the end of the read, the attemptRead() method calls the addToCompletedReceives() method to record the mapping between the KafkaChannel and the NetworkReceive object in the completedReceives collection.

In the attemptWrite() method, the NetworkSend held in the send field is written out by calling KafkaChannel's write() method. Once it has been fully sent, the send field is set to null, making room for the next NetworkSend object; the completed NetworkSend is also recorded in the completedSends collection for further processing. There is one extra step here compared with attemptRead(): after the data has been sent, the OP_WRITE event is removed from the KafkaChannel's set of watched events.

NetworkClient

Now that we’ve basically covered NetworkClient’s entry and underlying dependencies, let’s return to NetworkClient itself.

Core data structure

ClusterConnectionStates

First, the state of every KafkaChannel in NetworkClient is maintained by ClusterConnectionStates, using a Map<String, NodeConnectionState> collection where the Key is the NodeId and the Value is a NodeConnectionState object. NodeConnectionState records not only the connection state (the ConnectionState enum) but also the timestamp of the last connection attempt. In the ready() method described earlier, ClusterConnectionStates is used to determine the connection state and decide whether to attempt reconnection.

InFlightRequests

InFlightRequests is another key field in NetworkClient. Its main function is to cache requests that have been sent but have not yet received responses. InFlightRequests is backed by a Map<String, Deque<NetworkClient.InFlightRequest>> collection, where the Key is the NodeId and the Value is the queue of requests sent to the corresponding Node. NetworkClient.InFlightRequest records the request header, the request body, the associated Send object, the response callback, and so on.
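A minimal sketch of this structure may help (an illustration, not the Kafka class): new requests are added at the head of the per-node deque, so the head is always the most recently sent request, while completed responses are taken from the tail because responses on one connection arrive in send order:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the per-node in-flight request queues
public class ToyInFlightRequests<R> {
    private final Map<String, Deque<R>> requests = new HashMap<>();

    public void add(String nodeId, R request) {
        requests.computeIfAbsent(nodeId, k -> new ArrayDeque<>())
                .addFirst(request); // the newest request sits at the head
    }

    /** The most recently sent request: the one KafkaChannel.send currently points at. */
    public R lastSent(String nodeId) {
        return requests.get(nodeId).peekFirst();
    }

    /** The oldest request: responses arrive in order, so completion removes from the tail. */
    public R completeNext(String nodeId) {
        return requests.get(nodeId).pollLast();
    }
}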

Here we focus on the InFlightRequests.canSendMore() method. The NetworkClient.ready() method relies on this method together with the ClusterConnectionStates connection state to decide whether a request can be sent to a Node, further refining the result returned by RecordAccumulator.ready():

public boolean canSendMore(String node) {
    // Get the InFlightRequest queue for the target Node
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||   // There are no in-flight requests at all
            // Check whether the request at the head of the queue has finished sending. If it has
            // not, there may be a network problem, and no more requests should be sent to this
            // Node. Also, the head request is exactly the one the KafkaChannel.send field points
            // to; as described earlier, KafkaChannel can only stage one request at a time, so an
            // uncompleted request would be overwritten by a subsequent one.
            (queue.peekFirst().send.completed()
            // Check whether too many requests have piled up in the InFlightRequests queue. Possible
            // causes are network problems, or producing faster than the broker cluster can handle;
            // either way, sending more requests should stop for now.
            && queue.size() < this.maxInFlightRequestsPerConnection);
}

DefaultMetadataUpdater

The metadataUpdater field (of type DefaultMetadataUpdater) in NetworkClient is responsible for updating the Metadata (Kafka cluster metadata).

The DefaultMetadataUpdater.maybeUpdate() method is used to determine whether the current Metadata needs updating:

1. It first checks the two Metadata flags needFullUpdate and needPartialUpdate via the Metadata.timeToNextUpdate() method; if either flag is true, the Metadata needs to be updated immediately. If neither is true, the time until the next update is calculated from metadataExpireMs and refreshBackoffMs.

2. It then checks the inProgress field to see whether a Metadata update request has already been sent. If no request is in flight, the result of Step 1 is returned; if a request has already been sent, the request timeout is returned as the wait time.

3. If the preceding two steps determine that the Metadata needs to be updated immediately, the update proceeds as follows:

  • First, a Node with the least load is selected via the leastLoadedNode() method; the Metadata update request will be sent to this Node. leastLoadedNode() judges each Node's load by the length of the corresponding InFlightRequests queue: the shorter the queue, the lighter the load.
  • Then the maybeUpdate(long, Node) overload is executed to send the Metadata update request (MetadataRequest) to the selected Node. The actual sending is done by the sendInternalMetadataRequest() method, whose underlying core logic is the NetworkClient.doSend() method described above: the MetadataRequest is encapsulated as a Send object and set on the KafkaChannel.send field, waiting to be sent; the request is also added to the InFlightRequests collection.
  • After the Metadata update request has been sent, information about the requested version is recorded in the inProgress field.
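The node-selection idea in the first point can be sketched as follows. This illustrates only the queue-length criterion; the real leastLoadedNode() also considers connection state and reconnect backoff, and inFlightCount() is a hypothetical helper standing in for the in-flight queue length:

// Illustrative fragment: choose the node with the fewest in-flight requests
Node leastLoaded = null;
int smallest = Integer.MAX_VALUE;
for (Node node : candidateNodes) {          // candidateNodes: the known brokers (assumed variable)
    int inFlight = inFlightCount(node);     // hypothetical helper: in-flight queue length
    if (inFlight < smallest) {
        smallest = inFlight;
        leastLoaded = node;
    }
}
// The MetadataRequest is then sent to leastLoaded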

Now that we understand the Metadata update logic, let's go one step further and look at the specific format of MetadataRequest:

  • topic_id: the unique topic identifier (a UUID)
  • topic_name (String): the topic name
  • allow_auto_topic_creation (boolean): whether topics that do not exist should be created automatically
  • include_topic_authorized_operations (boolean): whether authorized operations should be included in the response

The MetadataRequest is created in the Metadata.newMetadataRequestAndVersion() method: if it is a partial update request (needPartialUpdate = true), only the topics in ProducerMetadata.newTopics are added to the MetadataRequest; if it is a full update request (needFullUpdate = true), the topics in ProducerMetadata.topics are added. The operations on these two topic collections in ProducerMetadata have been described previously and will not be repeated here.

After analyzing the logic of the DefaultMetadataUpdater sending the MetadataRequest, let’s look at its handleSuccessfulResponse() method, which handles the MetadataResponse response. The MetadataResponse format is as follows:

For the exact format, search for Metadata Response (Version: 11) in the Kafka Protocol documentation.

Now back to the DefaultMetadataUpdater.handleSuccessfulResponse() method. As the name suggests, it processes the MetadataResponse; at the bottom it calls the Metadata.handleMetadataResponse() method. The call stack is shown below:

The Metadata.handleMetadataResponse() method parses the MetadataResponse and eventually fills the result into MetadataCache. The core code snippet is as follows:

if (isPartialUpdate)
    // For a partial update, the existing MetadataCache's mergeWith() method merges the new
    // metadata into a new MetadataCache object
    return this.cache.mergeWith(metadataResponse.clusterId(), nodes, partitions,
        unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller(),
        (topic, isInternal) -> !topics.contains(topic) && retainTopic(topic, isInternal, nowMs));
else
    // For a full update, a new MetadataCache object is created directly to record the latest metadata
    return new MetadataCache(metadataResponse.clusterId(), nodes, partitions,
        unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller());

After Metadata is updated, Metadata also:

  • Reset needPartialUpdate, needFullUpdate and other flag fields
  • Update updateVersion, lastRefreshMs, lastSuccessfulRefreshMs, and other version numbers and timestamps
  • Trigger the listeners registered in clusterResourceListeners.

Finally, DefaultMetadataUpdater clears the inProgress field, indicating that the processing of this MetadataResponse is complete.

Core methods

Now that you know the core data structures in NetworkClient, let’s look at the core methods of NetworkClient.

First, look at its ready() method, whose core logic was analyzed in detail in Step 4 of the Sender.sendProducerData() method and won't be repeated here.

The NetworkClient.doSend() method was introduced in the data-preparation part of the Selector section and will not be repeated here either.

The core of the NetworkClient.poll() method is the call to KSelector.poll(), whose core logic has been analyzed above. In addition, NetworkClient.poll() calls multiple handle*() methods to process completed sends, received responses, and so on.

handleCompletedSends

First, look back at the KSelector.attemptWrite() method, which records successfully sent Send objects into the completedSends collection. The handleCompletedSends() method is called immediately after the KSelector.poll() method and handles the requests that were sent successfully during the last poll() call. We also know that InFlightRequests records requests that have been sent but not yet answered; its relationship with the latest request added to the completedSends collection is shown in the following figure:

In the handleCompletedSends() method, the completedSends collection is iterated; if the most recently sent request in the corresponding InFlightRequests queue does not expect a response, it is completed directly and added to the responses collection:

private void handleCompletedSends(List<ClientResponse> responses, long now) {
    for (NetworkSend send : this.selector.completedSends()) {
        // Look at the most recently sent request in the InFlightRequests queue for this Node
        InFlightRequest request = this.inFlightRequests.lastSent(send.destinationId());
        if (!request.expectResponse) { // The request does not expect a response
            // Remove the request from the InFlightRequests collection
            this.inFlightRequests.completeLastSent(send.destinationId());
            // Create the corresponding response for the request and add it to the responses collection
            responses.add(request.completed(null, now));
        }
    }
}

handleCompletedReceives

The attemptRead() method introduced earlier records NetworkReceive objects into the completedReceives collection after reading them. The NetworkClient.handleCompletedReceives() method traverses the completedReceives queue, removes the corresponding requests from InFlightRequests, and then classifies them by response type:

  • If it is a MetadataResponse, the MetadataCache is updated by calling the DefaultMetadataUpdater.handleSuccessfulResponse() method.
  • If it is an ApiVersionsResponse (the response to an ApiVersionsRequest, which is used to obtain the protocol versions of each API supported by the Kafka cluster), it is processed by the handleApiVersionsResponse() method, which updates the NetworkClient.apiVersions collection (maintaining the NodeId -> ApiKey -> ApiVersion mapping). When the NetworkClient.doSend() method creates a request, it uses apiVersions to determine the protocol version supported by the target Node, so a request of the matching version is created.
  • If it is any other response, it is wrapped as a ClientResponse and added to the responses collection for subsequent processing.

handleDisconnections

As described earlier, when a KafkaChannel is closed, the corresponding NodeId is recorded in the disconnected collection. The handleDisconnections() method iterates over the disconnected collection, empties the corresponding queue in InFlightRequests, creates a ClientResponse for each of those requests, and adds them to the responses collection. The ClientResponse objects created here carry the disconnected flag, indicating that they were caused by a closed connection or a network problem.

If a MetadataRequest is among them, the state fields such as inProgress in DefaultMetadataUpdater are cleaned up first; needFullUpdate is then set back to true via the handleServerDisconnect() method, indicating that the Kafka cluster metadata needs to be updated.

handleConnections

When KSelector.pollSelectionKeys() handles a new connection, in addition to calling finishConnect() to switch the watched events to OP_READ, it also adds the corresponding NodeId to the connected collection. The handleConnections() method here then uses the connected collection to change the connection state in ClusterConnectionStates to CONNECTED.

handleTimedOutRequests

handleTimedOutRequests() is a simple method: it iterates over the InFlightRequests collection to find requests that have timed out. Timed-out requests then follow the same logic as handleDisconnections(), so the code is not shown here.

handleTimedOutConnections

The handleTimedOutConnections() method checks ClusterConnectionStates for Nodes whose connection establishment has timed out, closes those connections directly, and then lets them go through the handleDisconnections() logic.

handleInitiateApiVersionRequests

While the handleConnections() method processes new connections, it also adds the NodeIds to the nodesNeedingApiVersionsFetch collection. The handleInitiateApiVersionRequests() method traverses the nodesNeedingApiVersionsFetch collection and calls the doSend() method to send an ApiVersionsRequest to each of those Nodes; the responses are handled in the handleCompletedReceives() method described above.

completeResponses

After this series of handle*() methods has run, all ClientResponse objects generated during this NetworkClient.poll() call have been collected into the responses collection. The completeResponses() method then iterates over the responses collection, invoking the callback (of type RequestCompletionHandler) attached to each corresponding ClientRequest.

To see how the RequestCompletionHandler is bound to the ClientRequest, review the logic in Sender.sendProduceRequest() where the ClientRequest is created: the RequestCompletionHandler used there is an anonymous object whose actual callback logic lives in the Sender.handleProduceResponse() method.

The handleProduceResponse() method calls the completeBatch() method to handle both normal and error responses. The Sender.completeBatch() method branches on the error code in response.error:

private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now) {
    Errors error = response.error;
    if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
            (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) { // --- (3)
        // Omit the code and logs associated with transaction messages
        // The MESSAGE_TOO_LARGE error code indicates the ProducerBatch is too large; it is
        // split and re-enqueued into RecordAccumulator for retry
        this.accumulator.splitAndReenqueue(batch);
        // Remove the failed ProducerBatch from the in-flight tracking and release its buffer
        maybeRemoveAndDeallocateBatch(batch);
    } else if (error != Errors.NONE) {
        if (canRetry(batch, response, now)) { // --- (2)
            // An error occurred, but the maximum retry count has not been reached and the batch
            // has not timed out: the ProducerBatch is re-enqueued at the head of the
            // RecordAccumulator queue via reenqueueBatch()
            reenqueueBatch(batch, now);
        } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
            completeBatch(batch, response); // A duplicate sequence number: the batch can be treated as completed
        } else {
            // For other errors that cannot be handled, call failBatch() directly
            failBatch(batch, response, exception, batch.attempts() < this.retries);
        }
        if (error.exception() instanceof InvalidMetadataException) {
            // On a metadata error, set needFullUpdate so the Metadata gets refreshed
            metadata.requestUpdate();
        }
    } else {
        // Handle a normal response --- (1)
        completeBatch(batch, response);
    }
    if (guaranteeMessageOrder) // In the ordered-send scenario, unmute the partition so subsequent ProducerBatches can be sent
        this.accumulator.unmutePartition(batch.topicPartition);
}

First, look at the branch at (1), which handles normal responses (and the no-response case). The ProducerBatch.done() method is executed, changing the ProducerBatch.finalState field (a CAS operation) and triggering the Callback of each Record via the thunks collection. The ProducerBatch is then deleted from RecordAccumulator and its underlying ByteBuffer is released.

Looking at the branch at (2), the canRetry() method checks the following conditions to decide whether to retry:

  • Whether the ProducerBatch has timed out (deliveryTimeoutMs).
  • Whether the ProducerBatch's retry count has reached the upper limit.
  • Whether the ProducerBatch's state is still incomplete.

After deciding to retry, the ProducerBatch is added back to the head of the corresponding queue in RecordAccumulator in the reenqueueBatch() method; it is also removed from the in-flight tracking (NetworkClient will add it back when it is re-sent).

Finally, look at the branch at (3), which deals with oversized ProducerBatch objects. As introduced earlier, the MemoryRecordsBuilder.hasRoomFor() method is already used during Record writing to check whether there is enough space, so why can an oversized ProducerBatch still appear? The main cause is compression: look at the estimatedBytesWritten() method, which estimates the number of bytes written by multiplying by an estimatedCompressionRatio (the estimated compression ratio), whose initial value is 1. During data preparation, ProducerBatch objects drained from RecordAccumulator.drain() are also set to read-only by calling their close() method; at that point, the actual compression ratio of the ProducerBatch is calculated. The call stack is shown below:

CompressionRatioEstimator maintains, per topic, the compression ratio of each compression algorithm (the COMPRESSION_RATIO field, a ConcurrentMap<String, float[]>). The compression ratio calculated here is normally less than one. Subsequent MemoryRecordsBuilder instances fetch the latest compression ratio from CompressionRatioEstimator for their estimates. The related call stack is as follows:

The relevant code snippet in the ProducerBatch constructor is as follows:

public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
    ... // Initialize other fields
    // Get the estimated compression ratio of the specified compression algorithm under the target topic
    float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
        recordsBuilder.compressionType());
    // Record the compression ratio in the MemoryRecordsBuilder, which uses it for its size estimates
    recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
}
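A quick arithmetic illustration of how the estimate can go wrong (all numbers here are made up):

// Illustrative arithmetic only, not Kafka code
int batchSizeLimit = 16 * 1024;        // the size the batch is expected to stay under
float estimatedRatio = 0.5f;           // compression ratio learned from earlier batches

int rawBytes = 30 * 1024;              // raw record bytes appended to the batch
float estimatedSize = rawBytes * estimatedRatio; // ~15 KB: hasRoomFor() thinks it still fits

float actualRatio = 0.9f;              // these particular records compress poorly
float actualSize = rawBytes * actualRatio;       // ~27 KB: the real batch is much larger
// The broker rejects such a batch with MESSAGE_TOO_LARGE, and splitAndReenqueue() splits it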

After understanding the cause of oversized ProducerBatch objects, we return to the splitAndReenqueue() method at (3):

public int splitAndReenqueue(ProducerBatch bigBatch) {
    // Reset the compression ratio estimate
    CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression,
                      Math.max(1.0f, (float) bigBatch.compressionRatio()));
    // Split the large ProducerBatch according to batchSize, producing multiple small ProducerBatch objects
    Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
    int numSplitBatches = dq.size();
    // Add the split ProducerBatch objects to the head of the target Deque so they are sent as soon as possible
    Deque<ProducerBatch> partitionDequeue = getOrCreateDeque(bigBatch.topicPartition);
    while (!dq.isEmpty()) {
        ProducerBatch batch = dq.pollLast();
        incomplete.add(batch); // Record the ProducerBatch as not yet sent
        synchronized (partitionDequeue) {
            ... // Omit transaction-related processing logic
            partitionDequeue.addFirst(batch); // Add to the head of the queue
        }
    }
    return numSplitBatches;
}

Conclusion

This class focused on the Sender thread in KafkaProducer, analyzing in detail the data structures and methods of core classes such as Sender, KSelector, and NetworkClient, which together show how KafkaProducer sends requests and processes responses.

In the next class, we will introduce Kafka Consumer.

This article will also be published on the author's WeChat public account. Original address: xxxlxy2008.github.io/kafka/…