
In the Kafka producer, it is the Sender thread that issues network requests to the broker. Let's look at the Sender thread's run() method, focusing on steps 5 through 7.

```java
/**
 * Step 5: drain the batches that are ready to send.
 *
 * We may be sending to many partitions, and several leader partitions can sit
 * on the same broker. For example, with a cluster of only 3 brokers:
 *     p0 leader: 0, p1 leader: 0, p2 leader: 1, p3 leader: 2
 * Whenever there are more partitions than brokers, some broker must host more
 * than one leader partition, so the drained result is grouped by broker:
 *     0 -> {p0, p1}, 1 -> {p2}, 2 -> {p3}
 */
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
        result.readyNodes, this.maxRequestSize, now);
if (guaranteeMessageOrder) {
    // Mute all the partitions drained.
    // (If batches is empty, e.g. because no connection has been
    // established yet, this loop does nothing.)
    for (List<RecordBatch> batchList : batches.values()) {
        for (RecordBatch batch : batchList)
            this.accumulator.mutePartition(batch.topicPartition);
    }
}

/**
 * Step 6: handle batches that have timed out.
 */
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
    this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

sensors.updateProduceRequestMetrics(batches);

// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (!result.readyNodes.isEmpty()) {
    log.trace("Nodes with data ready to send: {}", result.readyNodes);
    pollTimeout = 0;
}

/**
 * Step 7: create the produce requests.
 *
 * If we sent one network request per partition, requests would be far too
 * frequent, and network resources in the cluster are precious. So batches
 * destined for partitions on the same broker are combined into a single
 * request and sent together, which reduces the number of network requests.
 */
// If no network connection has been established yet, batches is empty
// and this call does nothing.
sendProduceRequests(batches, now);

// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
//
// NetworkClient is the component that performs all network operations:
// sending requests and receiving (and handling) responses.
this.client.poll(pollTimeout, now);
```
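The pollTimeout rule above is easy to miss in the noise: poll with timeout 0 when some node already has sendable data, otherwise wait until the earliest moment something could become sendable. A tiny sketch (plain Java with hypothetical names, not Kafka code):

```java
public class PollTimeoutSketch {
    // nextReadyCheckDelayMs: earliest linger/backoff expiry across partitions;
    // notReadyTimeout: metadata-driven wait; anyNodeReady: result.readyNodes non-empty
    static long pollTimeout(long nextReadyCheckDelayMs, long notReadyTimeout, boolean anyNodeReady) {
        long timeout = Math.min(nextReadyCheckDelayMs, notReadyTimeout);
        // a ready node means we should loop again immediately
        return anyNodeReady ? 0 : timeout;
    }

    public static void main(String[] args) {
        System.out.println(pollTimeout(5000, 100, false)); // 100
        System.out.println(pollTimeout(5000, 100, true));  // 0
    }
}
```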

drain()

```java
/**
 * Convert the TopicPartition -> Deque<RecordBatch> mapping
 * into a NodeId -> List<RecordBatch> mapping.
 */
public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        int size = 0;
        // get the set of partitions hosted on this node
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<RecordBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0 */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // Only proceed if the partition has no in-flight batches.
            if (!muted.contains(tp)) {
                // fetch the RecordBatch queue for this partition
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // Only drain the batch if it is not during backoff period.
                            if (!backoff) {
                                if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // there is a rare case that a single batch size is larger than the request size due
                                    // to compression; in this case we will still eventually send this batch in a single request
                                    break;
                                } else {
                                    // take the first RecordBatch
                                    RecordBatch batch = deque.pollFirst();
                                    // close the Compressor and its underlying output stream,
                                    // making the MemoryRecords read-only
                                    batch.close();
                                    size += batch.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            // update drainIndex
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        // record the mapping from node id to its drained RecordBatches
        batches.put(node.id(), ready);
    }
    return batches;
}
```

In short, drain() converts the TopicPartition -> Deque&lt;RecordBatch&gt; mapping into a NodeId -> List&lt;RecordBatch&gt; mapping.
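The regrouping can be sketched with simplified stand-in types (String batch names and partition names rather than Kafka's real classes) to make the transformation obvious:

```java
import java.util.*;

public class DrainSketch {

    // leaderByPartition: partition name -> leader node id;
    // batchesByPartition: partition name -> queue of pending batches
    static Map<Integer, List<String>> drain(Map<String, Integer> leaderByPartition,
                                            Map<String, Deque<String>> batchesByPartition) {
        Map<Integer, List<String>> byNode = new TreeMap<>();
        for (Map.Entry<String, Integer> e : leaderByPartition.entrySet()) {
            Deque<String> deque = batchesByPartition.get(e.getKey());
            if (deque == null || deque.isEmpty())
                continue;
            // like the real drain(), take the oldest batch of each partition and
            // append it to the list for that partition's leader node
            byNode.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(deque.pollFirst());
        }
        return byNode;
    }

    public static void main(String[] args) {
        // p0 and p1 lead on node 0, p2 on node 1 (the example from Step 5)
        Map<String, Integer> leaders = new LinkedHashMap<>();
        leaders.put("p0", 0);
        leaders.put("p1", 0);
        leaders.put("p2", 1);

        Map<String, Deque<String>> batches = new HashMap<>();
        batches.put("p0", new ArrayDeque<>(List.of("batch-p0")));
        batches.put("p1", new ArrayDeque<>(List.of("batch-p1")));
        batches.put("p2", new ArrayDeque<>(List.of("batch-p2")));

        System.out.println(drain(leaders, batches)); // {0=[batch-p0, batch-p1], 1=[batch-p2]}
    }
}
```

The sketch omits the real method's size cap, backoff check, and starvation-avoiding rotation; it only shows the per-node regrouping.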

sendProduceRequests()

```java
private void sendProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
    for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());
}

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
    for (RecordBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        produceRecordsByPartition.put(tp, batch.records());
        recordsByPartition.put(tp, batch);
    }

    ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

public class ProduceRequest extends AbstractRequest {
    public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
        private final short acks;
        private final int timeout;
        private final Map<TopicPartition, MemoryRecords> partitionRecords;

        public Builder(short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) {
            super(ApiKeys.PRODUCE);
            this.acks = acks;
            this.timeout = timeout;
            this.partitionRecords = partitionRecords;
        }
    }
}
```

The ClientRequest encapsulates the request parameters: the request header (API key, API version), acks, the request timeout, and then the request body, which carries the corresponding batches of data. Ultimately all of this is serialized into a binary byte array.

So the ClientRequest assembles the data according to the binary wire protocol. A producer may be sending to many topics on one broker, each topic has many partitions, and each partition contributes one batch of data.
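That topic/partition/batch nesting can be sketched with our own simplified types (not Kafka's classes): the produce request body groups serialized batches first by topic, then by partition.

```java
import java.util.*;

public class ProduceBodySketch {

    // a simplified stand-in for Kafka's TopicPartition
    record TopicPartition(String topic, int partition) {}

    // flat (topic, partition) -> serialized batch, regrouped into
    // topic -> (partition -> serialized batch), the shape the wire format uses
    static Map<String, Map<Integer, byte[]>> groupByTopic(Map<TopicPartition, byte[]> batchByPartition) {
        Map<String, Map<Integer, byte[]>> body = new TreeMap<>();
        for (Map.Entry<TopicPartition, byte[]> e : batchByPartition.entrySet())
            body.computeIfAbsent(e.getKey().topic(), t -> new TreeMap<>())
                .put(e.getKey().partition(), e.getValue());
        return body;
    }

    public static void main(String[] args) {
        Map<TopicPartition, byte[]> batches = new HashMap<>();
        batches.put(new TopicPartition("orders", 0), new byte[]{1});
        batches.put(new TopicPartition("orders", 1), new byte[]{2});
        batches.put(new TopicPartition("clicks", 0), new byte[]{3});

        Map<String, Map<Integer, byte[]>> body = groupByTopic(batches);
        System.out.println(body.keySet());               // [clicks, orders]
        System.out.println(body.get("orders").keySet()); // [0, 1]
    }
}
```

This only illustrates the nesting; the real encoding (lengths, versions, CRCs) is handled by Kafka's protocol layer.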

client.send(clientRequest, now)

```java
public void send(ClientRequest request, long now) {
    // TODO: this looks like the key code path
    doSend(request, false, now);
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    String nodeId = clientRequest.destination();
    // ... (version checks omitted)
    Send send = request.toSend(nodeId, header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            header,
            clientRequest.createdTimeMs(),
            clientRequest.destination(),
            clientRequest.callback(),
            clientRequest.expectResponse(),
            isInternalRequest,
            send,
            now);
    // Track the request in the inFlightRequests component: it stores requests
    // that have been sent but have not yet received a response. By default at
    // most 5 requests per connection can be in flight; if strict partition
    // ordering is required, the limit must be set to 1. We can infer that once
    // a request's response arrives, the request is removed from here.
    this.inFlightRequests.add(inFlightRequest);
    // TODO
    selector.send(inFlightRequest.send);
}
```

Adding the request to inFlightRequests records requests that have not yet received responses. By default at most 5 requests per connection can be in flight; if partition ordering must be guaranteed, set this limit to 1. We can infer one thing: once a request has been sent and its response successfully received, the request is removed from this collection.
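A minimal sketch of that bookkeeping (the class and names are ours, not Kafka's real InFlightRequests): sent-but-unacknowledged requests are queued, a connection with maxInFlight pending requests accepts no more, and the oldest request is removed when its response arrives.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class InFlightSketch {
    private final int maxInFlight; // default 5 in Kafka; 1 for strict ordering
    private final Deque<String> pending = new ArrayDeque<>();

    public InFlightSketch(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    public boolean canSendMore() {
        return pending.size() < maxInFlight;
    }

    public void add(String request) {
        if (!canSendMore())
            throw new IllegalStateException("too many in-flight requests");
        pending.addLast(request);
    }

    // called when a response arrives: the oldest pending request completes
    public String completeNext() {
        return pending.pollFirst();
    }

    public static void main(String[] args) {
        InFlightSketch inFlight = new InFlightSketch(5);
        for (int i = 0; i < 5; i++)
            inFlight.add("request-" + i);
        System.out.println(inFlight.canSendMore()); // false: 5 pending
        System.out.println(inFlight.completeNext()); // request-0
        System.out.println(inFlight.canSendMore()); // true again
    }
}
```

With maxInFlight = 1 the sketch shows why ordering holds: a second request cannot be queued until the first one's response has been processed.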

selector.send(inFlightRequest.send)

```java
public void send(Send send) {
    String connectionId = send.destination();
    if (closingChannels.containsKey(connectionId))
        this.failedSends.add(connectionId);
    else {
        // look up the KafkaChannel for this connection
        KafkaChannel channel = channelOrFail(connectionId, false);
        try {
            /*
             * Cache the Send object in the KafkaChannel's send field and register
             * interest in the OP_WRITE event. The Send object (a RequestSend)
             * wraps the complete request data, both header and body.
             * Nothing is transmitted here: this merely records the Send on the
             * KafkaChannel; the actual network write happens in the Selector.
             */
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(connectionId);
            close(channel, false);
        }
    }
}

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    // bind the outgoing request to this KafkaChannel
    this.send = send;
    // here comes the key code: register interest in OP_WRITE.
    // Once the key becomes writable, we can push the request to the server.
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
```

The actual write is performed later, in the Selector's pollSelectionKeys() method:

```java
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        try {
            // ... (connect/read handling omitted)

            /* if channel is ready write to any sockets that have space in their buffer
               and for which we have data */
            // the Selector registered OP_WRITE earlier, so the key is writable here
            if (channel.ready() && key.isWritable()) {
                // this is where the pending request is written to the server
                Send send = channel.write();
                // if the request has been fully sent, record the completed send
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid())
                close(channel, true);
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel, true);
        }
    }
}
```

Next, look at channel.write():

```java
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    // write the request bytes to the transport layer
    send.writeTo(transportLayer);
    // once the network request has been fully sent,
    // remove interest in OP_WRITE
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}
```

This writes the request out to the server and then stops listening for SelectionKey.OP_WRITE.
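The interest-op toggling that frames this whole send path can be demonstrated in isolation (our own demo code, not Kafka's): addInterestOps ORs OP_WRITE into the key's interest set when a Send is staged, and removeInterestOps ANDs it back out once the Send has completed.

```java
import java.nio.channels.SelectionKey;

public class InterestOpsSketch {
    // same bit arithmetic the transport layer performs on the SelectionKey
    static int addInterestOps(int ops, int op) {
        return ops | op;
    }

    static int removeInterestOps(int ops, int op) {
        return ops & ~op;
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ;                          // normally watching for reads
        ops = addInterestOps(ops, SelectionKey.OP_WRITE);        // setSend(): also watch writability
        System.out.println((ops & SelectionKey.OP_WRITE) != 0);  // true
        ops = removeInterestOps(ops, SelectionKey.OP_WRITE);     // send completed
        System.out.println(ops == SelectionKey.OP_READ);         // true
    }
}
```

Keeping OP_WRITE registered only while a Send is pending is what prevents the poll loop from spinning on permanently writable sockets.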

Reference Documents:

Kafka 0.10.2.0 source code (kafka-0.10.2.0-src)

Kafka Technology Insider: an illustrated guide to Kafka source code design and implementation

Kafka source code analysis series