1. Overall process

(1) Run the custom message interceptors (ProducerInterceptor.onSend), which are rarely used in practice.
(2) Wait synchronously to pull metadata. The first send to a topic must pull its metadata, because metadata is loaded lazily. This retrieves the Cluster object, which holds information about the cluster such as the topic-broker-partition mapping.
(3) Serialize the key and value into byte[] arrays (the serializers also receive the topic).
(4) Use the Partitioner to compute, from the key and value, which partition the record is sent to.
(5) Check that the serialized record size does not exceed the single-request limit (max.request.size) or the total buffer size (buffer.memory).
(6) Bind the user's message callback and the interceptor callback together.
(7) Append the message to the RecordAccumulator.
(8) Wake up the sender thread. If batchIsFull is true (a batch is full) or a new batch has been created, there is data ready to send, so the sender thread is woken up.
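Steps (3) to (8) can be illustrated with a toy model. This is a minimal sketch under stated assumptions, not Kafka's implementation: the class `SendPathSketch`, its size constants and its batch logic are hypothetical, serialization is plain UTF-8, interceptors, metadata and callbacks (steps 1, 2 and 6) are left out, and `Arrays.hashCode` stands in for the murmur2 hash that Kafka's default partitioner applies to the key bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of steps (3)-(8) of the send path; not Kafka's code.
class SendPathSketch {
    static final int MAX_REQUEST_SIZE = 1024; // assumed stand-in for max.request.size
    static final int BUFFER_MEMORY = 4096;    // assumed stand-in for buffer.memory
    static final int BATCH_SIZE = 64;         // assumed stand-in for batch.size

    // A batch collects serialized records bound for one partition.
    static final class Batch {
        final List<byte[]> records = new ArrayList<>();
        int sizeInBytes = 0;

        boolean isFull() { return sizeInBytes >= BATCH_SIZE; }
    }

    // Per-partition queues of batches, loosely modeled on the RecordAccumulator.
    final Map<Integer, Deque<Batch>> batches = new HashMap<>();
    boolean senderWoken = false;

    // Step (4): pick a partition from the key bytes. Arrays.hashCode is a
    // stand-in for the murmur2 hash used by Kafka's default partitioner.
    static int partition(byte[] keyBytes, int numPartitions) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Returns the partition the record was appended to.
    int send(String key, String value, int numPartitions) {
        // Step (3): serialize key and value (plain UTF-8 here).
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] v = value.getBytes(StandardCharsets.UTF_8);
        int p = partition(k, numPartitions);
        // Step (5): reject records larger than either limit (record overhead ignored).
        int size = k.length + v.length;
        if (size > MAX_REQUEST_SIZE || size > BUFFER_MEMORY) {
            throw new IllegalArgumentException("record too large: " + size + " bytes");
        }
        // Step (7): append to the partition's last batch, opening a new one if needed.
        Deque<Batch> queue = batches.computeIfAbsent(p, x -> new ArrayDeque<>());
        Batch last = queue.peekLast();
        boolean newBatchCreated = false;
        if (last == null || last.isFull()) {
            last = new Batch();
            queue.addLast(last);
            newBatchCreated = true;
        }
        byte[] record = new byte[size];
        System.arraycopy(k, 0, record, 0, k.length);
        System.arraycopy(v, 0, record, k.length, v.length);
        last.records.add(record);
        last.sizeInBytes += size;
        // Step (8): wake the sender when a batch is full or a new batch appeared.
        if (last.isFull() || newBatchCreated) {
            senderWoken = true;
        }
        return p;
    }
}
```

Because the partition is a pure function of the key bytes, records with the same key land in the same partition as long as the partition count does not change.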

2. Metadata acquisition

When sending a message, the producer knows only the topic; the first time it sends to that topic, it does not yet have the topic’s metadata.



First, look at the send method of KafkaProducer:

(1) The first step of doSend is waitOnMetadata, which blocks until the metadata for the topic has been obtained:

ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);

(2) Inside waitOnMetadata, the topic to be sent is recorded in the metadata’s topic map.

(3) Get the cluster information from the metadata. On the first send to a topic, its metadata is not available yet; on later sends it is already cached, so the method returns immediately.

(4) If the metadata is not available, set the metadata’s needUpdate flag to true and record the current metadata version for later comparison.

Then wake up the Sender thread and block the calling thread in awaitUpdate:

awaitUpdate(final int lastVersion, final long maxWaitMs)

awaitUpdate itself is simple: it is a while loop that computes the remaining time from the configured timeout and then waits until either the Sender thread wakes it up or the remaining time runs out. Each time it wakes, it checks whether the metadata version has advanced past lastVersion.
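The version handshake between awaitUpdate and the Sender thread can be sketched with Java's built-in monitor methods (`wait`/`notifyAll`). `MetadataSketch` is a hypothetical, simplified class, not Kafka's Metadata; one deliberate difference is that it returns `false` when the time runs out, whereas Kafka throws a timeout exception.

```java
// Hypothetical, simplified version-based handshake; not Kafka's Metadata class.
class MetadataSketch {
    private int version = 0;
    private boolean needUpdate = false;

    // Producer thread: flag that an update is needed and remember the current version.
    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    synchronized boolean updateRequested() {
        return needUpdate;
    }

    synchronized int version() {
        return version;
    }

    // Sender thread: a metadata response arrived, so bump the version and wake waiters.
    synchronized void update() {
        needUpdate = false;
        version += 1;
        notifyAll();
    }

    // Producer thread: wait until version > lastVersion, or give up after maxWaitMs.
    // Returns false on timeout (Kafka throws a TimeoutException instead).
    synchronized boolean awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        while (version <= lastVersion) {
            if (remainingWaitMs <= 0) {
                return false;
            }
            wait(remainingWaitMs); // releases the monitor; may also wake spuriously
            remainingWaitMs = maxWaitMs - (System.currentTimeMillis() - begin);
        }
        return true;
    }
}
```

Re-checking the version inside the loop is what makes spurious wake-ups and unrelated `notifyAll` calls harmless.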



Note that throughout this process the producer tracks a single timeout (maxBlockTimeMs, configured by max.block.ms): it recomputes the remaining time after each wait and throws a timeout exception once the time is used up.
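The following sketch shows how one time budget (the maxBlockTimeMs passed to waitOnMetadata) can be spent across repeated metadata waits. The `MetadataSource` interface and `MetadataTimeoutException` are invented for this example; the real method works against Kafka's Metadata and Sender objects.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of spending one time budget across repeated metadata waits.
class WaitOnMetadataSketch {
    // Invented abstraction over "the cluster view" plus "ask the Sender and wait".
    interface MetadataSource {
        // Partition count for the topic, or null if its metadata is not known yet.
        Integer partitionCount(String topic);

        // Request a refresh and block for at most maxWaitMs.
        void requestUpdateAndAwait(long maxWaitMs);
    }

    // Unchecked, like Kafka's own TimeoutException; the name is invented.
    static final class MetadataTimeoutException extends RuntimeException {
        MetadataTimeoutException(String message) { super(message); }
    }

    // Topics the producer needs metadata for (the "topic map" in the text).
    final Set<String> topics = new HashSet<>();

    // Returns the partition count once known; throws when maxWaitMs is used up.
    int waitOnMetadata(MetadataSource metadata, String topic, long maxWaitMs) {
        topics.add(topic);
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        Integer partitions = metadata.partitionCount(topic);
        while (partitions == null) { // not cached yet, e.g. the first send to this topic
            metadata.requestUpdateAndAwait(remainingWaitMs);
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs) {
                throw new MetadataTimeoutException(
                        "Failed to update metadata after " + maxWaitMs + " ms.");
            }
            remainingWaitMs = maxWaitMs - elapsed; // what is left for the next wait
            partitions = metadata.partitionCount(topic);
        }
        return partitions;
    }
}
```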

The producer’s calling thread is now waiting on the Sender thread. How does the Sender thread wake it up? Look at the Sender thread:

(1) Sender runs in its own I/O thread, which is started when the KafkaProducer is created; its run method is a while loop.

(2) this.client.ready checks whether a connection to the broker has been established, and initiates one if not.

Check the connection: connectionStates.canConnect(node.idString(), now)
Initiate the connection: initiateConnect(Node node, long now)
In Selector.connect, a few key socket parameters are set:

socketChannel.configureBlocking(false); // non-blocking connect
socket.setKeepAlive(true); // enable TCP keepalive (on Linux the default idle time before the first probe is 2 hours)
socket.setTcpNoDelay(true); // disable Nagle's algorithm so small packets are not coalesced, reducing latency
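The socket setup above maps directly onto plain java.nio calls. A minimal sketch, assuming a hypothetical helper class `NonBlockingConnect`: it configures the channel as the text describes and registers for OP_CONNECT, since `connect` on a non-blocking channel usually returns `false` while the handshake is still in progress.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical helper: start a non-blocking connect with the socket options above.
class NonBlockingConnect {
    static SocketChannel initiateConnect(Selector selector, InetSocketAddress address) {
        try {
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);  // connect() returns without waiting
            Socket socket = channel.socket();
            socket.setKeepAlive(true);         // TCP keepalive; probe timing is OS-controlled
            socket.setTcpNoDelay(true);        // disable Nagle's algorithm
            boolean connected = channel.connect(address); // usually false: still in progress
            // If not yet connected, ask the selector to report when the connect can be finished.
            channel.register(selector, connected ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT);
            return channel;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```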

Because the connection is established in non-blocking mode, the connect call returns right after initiating it; completion is checked later, in the selector’s poll.

(3) Because no connection has been established yet, most of the middle of Sender.run has nothing to do on this pass, so skip straight to the end of the run method:

this.client.poll(pollTimeout, now) -> metadataUpdater.maybeUpdate(now);

This is where the metadata pull request is built.

In short, it requests metadata only for the topics the producer is sending to, wraps the request in a ClientRequest, and calls doSend. doSend puts the metadata request on the InFlightRequests queue and sets it as the pending send of the KafkaChannel inside Kafka’s own Selectable implementation (the Selector). A KafkaChannel holds only one in-progress send at a time; this component is also used on the broker side. Setting the send on the KafkaChannel also registers OP_WRITE interest on the underlying Java SocketChannel, so the actual write happens in a later step.
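The queuing step can be modeled as follows. This is a hypothetical sketch: `RequestQueueSketch`, its `ClientRequest` class and the boolean `writeInterest` flag (standing in for OP_WRITE on the SelectionKey) are assumptions, chosen only to show two rules from the text: each request is tracked per node as in flight, and a channel refuses a second send while one is pending.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of queuing a request before the network write; names echo
// Kafka's (ClientRequest, InFlightRequests, KafkaChannel) but this is not its code.
class RequestQueueSketch {
    static final class ClientRequest {
        final String nodeId;
        final int correlationId;
        final String payload;

        ClientRequest(String nodeId, int correlationId, String payload) {
            this.nodeId = nodeId;
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    // Per-node requests that were sent but not yet answered, newest first.
    static final class InFlightRequests {
        private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();

        void add(ClientRequest request) {
            requests.computeIfAbsent(request.nodeId, k -> new ArrayDeque<>()).addFirst(request);
        }

        int count(String nodeId) {
            Deque<ClientRequest> queue = requests.get(nodeId);
            return queue == null ? 0 : queue.size();
        }
    }

    // Holds at most one in-progress send, like KafkaChannel.
    static final class Channel {
        private ClientRequest send;
        boolean writeInterest = false; // stands in for OP_WRITE on the SelectionKey

        void setSend(ClientRequest request) {
            if (send != null) {
                throw new IllegalStateException("a previous send is still in progress");
            }
            send = request;
            writeInterest = true;
        }

        // Called once the bytes are fully written: clear the send and OP_WRITE interest.
        ClientRequest completeSend() {
            ClientRequest done = send;
            send = null;
            writeInterest = false;
            return done;
        }
    }

    final InFlightRequests inFlight = new InFlightRequests();
    final Map<String, Channel> channels = new HashMap<>();

    // doSend in miniature: hand the request to the node's channel, then track it as in flight.
    void doSend(ClientRequest request) {
        channels.computeIfAbsent(request.nodeId, k -> new Channel()).setSend(request);
        inFlight.add(request);
    }
}
```

The sketch sets the channel's send before recording the request as in flight, so a rejected request is never tracked; Kafka instead avoids the conflict by checking beforehand whether the node can accept another request.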

After doSend, the important thing to follow is how the connect and OP_WRITE events registered on this channel are handled:

(4) this.client.poll(pollTimeout, now)

-> this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));

-> pollSelectionKeys

Kafka’s own Selector wrapper handles each scenario by checking which events are ready on the SelectionKey. Look at the connect scenario first:



finishConnect completes the connection (the connect was initiated in non-blocking mode, so it has to be finished here). Then, through the underlying TransportLayer component, the SelectionKey’s OP_CONNECT interest is removed and OP_READ is added.

Because OP_WRITE interest was added in step (3), once the connection is complete the write branch also runs as soon as the socket is writable: the wrapped metadata request is written through the underlying channel, and once it has been fully written it is recorded in completedSends to mark it as sent.
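The connect-then-write sequence can be reproduced on plain java.nio. In this sketch (hypothetical class `ConnectThenWrite`), the pending request bytes are stored as the SelectionKey attachment, whereas Kafka keeps them on the KafkaChannel; the loop finishes the connection, swaps OP_CONNECT for OP_READ, and writes once the selector reports the socket as writable.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical pollSelectionKeys-style loop on plain java.nio; not Kafka's code.
class ConnectThenWrite {
    final List<ByteBuffer> completedSends = new ArrayList<>();

    void poll(Selector selector, long timeoutMs) {
        try {
            if (selector.select(timeoutMs) == 0) {
                return;
            }
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                SocketChannel channel = (SocketChannel) key.channel();
                // Connect branch: finish the handshake, stop watching OP_CONNECT, start watching OP_READ.
                if (key.isConnectable() && channel.finishConnect()) {
                    key.interestOps((key.interestOps() & ~SelectionKey.OP_CONNECT) | SelectionKey.OP_READ);
                }
                // Write branch: runs once the selector reports the connected socket as writable.
                ByteBuffer pending = (ByteBuffer) key.attachment();
                if (pending != null && channel.isConnected() && key.isValid() && key.isWritable()) {
                    channel.write(pending);
                    if (!pending.hasRemaining()) {
                        completedSends.add(pending);
                        key.attach(null);
                        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); // nothing left to send
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```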



(5) Some time later, the broker returns a response. The OP_READ branch then reads the response data and puts it into the stagedReceives queue.
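Kafka frames each response with a 4-byte size prefix, so the read path must cope with bytes arriving in arbitrary chunks. The hypothetical `FrameReader` below sketches that logic: it keeps partial state between reads and queues each complete payload in a `stagedReceives` deque (named after the queue in the text, but not Kafka's implementation).

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical reader for size-prefixed frames: a 4-byte big-endian length, then the payload.
class FrameReader {
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer payload;                  // null until the size prefix is complete
    final Deque<byte[]> stagedReceives = new ArrayDeque<>();

    // Feed whatever bytes one read() produced; partial frames are kept for the next call.
    void onBytes(ByteBuffer chunk) {
        while (chunk.hasRemaining()) {
            if (payload == null) {
                while (size.hasRemaining() && chunk.hasRemaining()) {
                    size.put(chunk.get());
                }
                if (size.hasRemaining()) {
                    return;                      // size prefix still incomplete
                }
                size.flip();
                int n = size.getInt();
                if (n < 0) {
                    throw new IllegalStateException("invalid frame size " + n);
                }
                payload = ByteBuffer.allocate(n);
            }
            while (payload.hasRemaining() && chunk.hasRemaining()) {
                payload.put(chunk.get());
            }
            if (!payload.hasRemaining()) {       // frame complete: stage it and reset
                stagedReceives.addLast(payload.array());
                payload = null;
                size.clear();
            }
        }
    }
}
```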



(6) Back in the core poll method of NetworkClient: maybeUpdate first builds the metadata request, and selector.poll then sends the request and receives the response.

The completed requests and responses are processed next. Only the metadata path matters here, so follow just that:

handleCompletedReceives -> handleResponse -> this.metadata.update(cluster, now)

This updates the cluster information and, most importantly, increments version by 1 (Metadata.update also calls notifyAll to wake the waiting threads). Control then returns to the awaitUpdate part of the producer’s send process: the producer keeps waiting until it observes the new version, and only then can it actually send the message. Fetching metadata is itself a request/response exchange that follows the same path as sending a message. The design wraps NIO in several layers of abstraction, separates network components from business components, and lets them communicate through intermediate queues, which is worth studying.
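The response side of the metadata round trip can be sketched as well. Assumptions: the hypothetical `ResponseHandlingSketch` tracks a single node's in-flight requests, newest first, and represents the cluster as a plain list of partition names. A response must match the correlation id of the oldest in-flight request, and a metadata response replaces the cluster view and increments the version that the producer's wait loop checks.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of response handling for one node; not Kafka's code.
class ResponseHandlingSketch {
    static final class Request {
        final int correlationId;
        final boolean isMetadata;

        Request(int correlationId, boolean isMetadata) {
            this.correlationId = correlationId;
            this.isMetadata = isMetadata;
        }
    }

    static final class Response {
        final int correlationId;
        final List<String> partitions; // stand-in for the cluster data in the response

        Response(int correlationId, List<String> partitions) {
            this.correlationId = correlationId;
            this.partitions = partitions;
        }
    }

    final Deque<Request> inFlight = new ArrayDeque<>(); // newest first (addFirst)
    List<String> cluster = Collections.emptyList();
    int version = 0;

    void handleCompletedReceive(Response response) {
        Request request = inFlight.pollLast(); // responses arrive in request order
        if (request == null || request.correlationId != response.correlationId) {
            throw new IllegalStateException("response does not match the oldest in-flight request");
        }
        if (request.isMetadata) {
            cluster = response.partitions; // refresh the cluster view
            version += 1;                  // what the producer's wait loop is watching
        }
    }
}
```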