Introduction

Before diving into how Kafka sends messages, there is one point worth understanding up front: batching logic shows up everywhere in the producer's design. Keep this in mind, and many puzzles in the source code will resolve themselves.

Producer architecture

This is the architecture of a Kafka producer. There are two main threads: the user's main thread, and the sender thread that Kafka uses to actually send messages. The flow is as follows (a minimal usage sketch follows the list):

  • 1. When the user calls KafkaProducer's send method to send a message, the message first passes through a chain of interceptors for data processing. These interceptors are defined by the user.
  • 2. The data processed by the interceptors is then serialized by KafkaProducer's serializer. Users can plug in their own serialization and deserialization logic by passing the corresponding serializer classes when the KafkaProducer object is created.
  • 3. Each topic in Kafka has multiple partitions, so after serialization a suitable partition must be selected for the data; this is the partitioner's job. Users can implement their own partitioning logic, or specify a fixed partition to send to.
  • 4. After processing the data, the main thread calls the RecordAccumulator's append method to add the message to the queue of the corresponding partition, where it waits for the sender thread to pick it up and send it in bulk.
  • 5. The sender thread fetches messages from the RecordAccumulator object.
  • 6. The sender thread creates a ClientRequest.
  • 7. The ClientRequest is handed to the NetworkClient.
  • 8. The NetworkClient passes the request to a KafkaChannel in preparation for network I/O.
  • 9. Network I/O is performed and the response data is received from the server.
  • 10. When the response is received, the ClientRequest's callback is invoked; the user implements the Callback interface to supply this callback function.
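To make the flow concrete, here is a minimal usage sketch of the user-facing side (steps 1-3 and 10). The broker address, topic name, and the commented-out interceptor/partitioner class names are illustrative assumptions, not part of the source being analyzed.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerFlowDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Step 2: serializers for key and value
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Step 1: user-defined interceptors (hypothetical class name)
        // props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.PrefixInterceptor");
        // Step 3: user-defined partitioner (hypothetical class name)
        // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.VipPartitioner");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Step 4: send() only appends to the RecordAccumulator; the sender thread does the I/O
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
                    // Step 10: the callback fires when the broker's response comes back
                    (metadata, exception) -> {
                        if (exception != null)
                            exception.printStackTrace();
                        else
                            System.out.printf("sent to partition %d at offset %d%n",
                                    metadata.partition(), metadata.offset());
                    });
        }
    }
}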

Now let's take a closer look at what Kafka does in each of these steps.

Sending process

This part analyzes the sending flow on the user's main thread.

1.1 onSend()

After send is called, Kafka first invokes the interceptors' onSend method to apply uniform interception logic (a sketch of a custom interceptor follows the snippet):

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
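As a quick illustration of step 1, here is a hypothetical interceptor; the class name and the prefixing logic are invented for this example, not part of the Kafka source. It would be enabled via the interceptor.classes config, as in the earlier sketch.

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PrefixInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs on the user's main thread, before serialization and partitioning
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), "intercepted-" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Runs when the broker's response (or an error) comes back
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}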

1.3 After the interceptors have run, the doSend method is called (parts of the code are omitted where they do not affect the analysis, and exception-handling details are abbreviated to a bare throw exception() placeholder).

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // Wait for metadata
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            /* 1.3.1 Waiting for the metadata update: mainly waiting for the partition
             * metadata of the topic being sent to; the update logic itself lives in
             * the sender thread */
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            throw exception();
        }
        Cluster cluster = clusterAndWaitTime.cluster;

        byte[] serializedKey;
        try {
            // 1.3.2 Serialize the key
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw exception();
        }
        byte[] serializedValue;
        try {
            // 1.3.3 Serialize the value
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw exception();
        }

        // 1.3.4 Select the partition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        // Check whether the size of the message is greater than maxRequestSize and totalMemorySize
        ensureValidRecordSize(serializedSize);

        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        }
        // Wrap the user callback together with the interceptor callbacks
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        // ... omitted ...

        // 1.3.5 Append the message to the accumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

        // 1.3.6 If the accumulator asked for a new batch, re-pick the partition
        if (result.abortForNewBatch) {
            int prevPartition = partition;
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
            result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers,
                    interceptCallback, remainingWaitMs, false, nowMs);
        }

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } // omitted exception handling logic
}
1.3.1 waitOnMetadata() Waits for metadata updates
Kafka's metadata describes the cluster: which broker nodes exist, which partitions each topic has, which nodes hold each partition's replicas, and which node hosts each partition's leader.

Three classes are involved here:

  • Node represents a server (broker) entity. Fields: id: the broker's ID; idString: the broker ID as a string; host: the server's host address; port: the server's port number

  • TopicPartition represents a single partition of a topic. Fields: partition: the partition ID; topic: the topic name

  • PartitionInfo maps a TopicPartition to its placement in the cluster. Fields: topic: the topic name; partition: the partition ID; leader: the node where the leader replica resides; replicas: the nodes where all replicas reside; inSyncReplicas: the nodes holding fully synchronized replicas (the ISR)

Together these three classes describe all the metadata Kafka needs, and this information is stored in the Cluster class.
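As an aside, the same metadata is visible through the producer's public API. A small sketch, reusing the hypothetical producer and topic name from the earlier example:

import java.util.List;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

// Given a KafkaProducer<?, ?> named producer, partitionsFor returns one
// PartitionInfo per partition, backed by the current Cluster snapshot
List<PartitionInfo> partitions = producer.partitionsFor("demo-topic");
for (PartitionInfo info : partitions) {
    Node leader = info.leader();
    System.out.printf("topic=%s partition=%d leader=%s replicas=%d isr=%d%n",
            info.topic(), info.partition(),
            leader == null ? "none" : leader.idString(),
            info.replicas().length, info.inSyncReplicas().length);
}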

We already know that all network I/O is done by the sender thread. The user's main thread essentially just calls requestUpdate() to set an update flag (needFullUpdate or needPartialUpdate). When the sender thread next runs and sees the flag, it knows it must fetch fresh metadata, and once the update is done it bumps the version number. Meanwhile the main thread blocks, checking until the version number changes, and then continues.

public synchronized int requestUpdateForTopic(String topic) {
    if (newTopics.contains(topic)) {
        // sets needPartialUpdate = true internally
        return requestUpdateForNewTopics();
    } else {
        // sets needFullUpdate = true internally
        return requestUpdate();
    }
}

The main thread blocks and waits in the following code:

public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException {
    long currentTimeMs = time.milliseconds();
    long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs;
    time.waitObject(this, () -> {
        // Throw fatal exceptions, if there are any. Recoverable topic errors will be handled by the caller.
        maybeThrowFatalException();
        // The version number is the basis for judging whether the update has completed
        return updateVersion() > lastVersion || isClosed();
    }, deadlineMs);
    if (isClosed())
        throw new KafkaException("Requested metadata update after close");
}

@Override
public void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs) throws InterruptedException {
    synchronized (obj) {
        while (true) {
            if (condition.get()) // Return if the condition is true
                return;
            long currentTimeMs = milliseconds();
            if (currentTimeMs >= deadlineMs)
                throw new TimeoutException("Condition not satisfied before deadline");
            obj.wait(deadlineMs - currentTimeMs);
        }
    }
}
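To see the handshake in isolation, here is a self-contained, simplified sketch of the same version-number pattern. This is not Kafka's actual Metadata class, just the two-thread idea it implements: the main thread snapshots the version and waits, the sender thread bumps it and notifies.

public class VersionHandshakeDemo {
    private int updateVersion = 0;
    private boolean needFullUpdate = false;

    public synchronized int requestUpdate() {
        needFullUpdate = true;               // what the main thread sets
        return updateVersion;                // snapshot to wait against
    }

    public synchronized void awaitUpdate(int lastVersion, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (updateVersion <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                throw new RuntimeException("metadata update timed out");
            wait(remaining);
        }
    }

    public synchronized void update() {      // what the sender thread does after fetching metadata
        needFullUpdate = false;
        updateVersion++;
        notifyAll();                          // wake the blocked main thread
    }

    public static void main(String[] args) throws InterruptedException {
        VersionHandshakeDemo md = new VersionHandshakeDemo();
        int version = md.requestUpdate();
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            md.update();
        }).start();
        md.awaitUpdate(version, 5_000);
        System.out.println("metadata updated");
    }
}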
1.3.2, 1.3.3 Serialize keys and values
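There is little to analyze here beyond the serialize calls in doSend. For illustration, a hypothetical custom serializer (invented for this example, not from the Kafka source) that could be passed in when the KafkaProducer is created:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

public class UpperCaseSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        // Called by doSend in steps 1.3.2/1.3.3 for the key and value respectively
        return data == null ? null : data.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }
}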
1.3.4 Selecting a partitioner

Select a partition for the topic. If the user has specified a partition, that partition takes priority. Otherwise the default partitioner is used: if the key is not empty, Kafka hashes it so that messages with the same key always go to the same partition; if the key is empty, the sticky partition cache picks the partition.

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
    if (keyBytes == null) {
        // Use the sticky partition if the key is empty
        return stickyPartitionCache.partition(topic, cluster);
    }
    // Hash the key so that the same key always maps to the same partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
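As noted in the architecture overview, users can also plug in their own partitioning logic. A hypothetical custom partitioner (the class name and the "vip-" routing rule are invented for illustration) might look like this:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class VipPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Route "vip-" keys to a dedicated partition, hash everything else
        if (key instanceof String && ((String) key).startsWith("vip-"))
            return 0;
        return keyBytes == null ? 0 : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}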
1.3.5 The message is appended to the RecordAccumulator via its append method, where it waits for the sender thread to send it. The append method is described in the next section.
1.3.6 If a new record batch needs to be created, a new partition may be selected. If the message has a key, it is still sent to the same (key-hashed) partition; if it has no key, the sticky partitioner picks a new partition, which avoids starving some partitions. A simplified sketch of the sticky logic follows.
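The sketch below captures the sticky-partition idea in simplified form; it is not Kafka's actual StickyPartitionCache. The point is that keyless records keep going to one partition so batches fill up, and only when a batch completes (onNewBatch) does the producer switch to a different random partition.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

public class SimpleStickyCache {
    private final Map<String, Integer> current = new ConcurrentHashMap<>();

    public int partition(String topic, int numPartitions) {
        // Stick to one partition per topic so the accumulator can build full batches
        return current.computeIfAbsent(topic, t -> ThreadLocalRandom.current().nextInt(numPartitions));
    }

    public void onNewBatch(String topic, int numPartitions, int prevPartition) {
        // The batch for prevPartition just filled up; switch to a different partition
        int next;
        do {
            next = ThreadLocalRandom.current().nextInt(numPartitions);
        } while (numPartitions > 1 && next == prevPartition);
        current.put(topic, next);
    }
}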