Kafka defines its own network protocol, which can be implemented in any language to push messages to and pull messages from Kafka clusters. In Kafka 2.8.0, the clients module provides the default Java implementation of the producer and the consumer, and this series focuses on that Java implementation.

Kafka Producer example

The following is a simple example that uses KafkaProducer to send ten messages to test_topic:

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        config.put("client.id", "ProducerDemo");
        // Address of the Kafka broker cluster
        config.put("bootstrap.servers", "localhost:9092");
        // How many replicas must receive a message before the cluster responds; "all" means the entire ISR set
        config.put("acks", "all");
        // Serializers that convert the message key and value into byte arrays
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", StringSerializer.class);

        // try-with-resources closes the producer, flushing any buffered messages
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
            for (int i = 0; i < 10; i++) {
                // Timestamp embedded in the message value
                long startTime = System.currentTimeMillis();
                // ProducerRecord holds the target topic as well as the message key and value
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("test_topic", String.valueOf(i), "YangSizheng_" + startTime);

                // The second argument is a Callback; its onCompletion() method is invoked
                // when the producer receives the Kafka cluster's response (ACK) for this record
                Future<RecordMetadata> future = producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            System.out.println("Send failed for record: " + record
                                    + ", error message: " + e.getMessage());
                        }
                    }
                });
                // send() is asynchronous and returns a Future; calling get() blocks until the send
                // completes, making it synchronous. The RecordMetadata contains the partition the
                // message landed in and the offset assigned to it
                RecordMetadata recordMetadata = future.get();
                System.out.println("partition: " + recordMetadata.partition()
                        + ", offset: " + recordMetadata.offset());
            }
        }
    }
}

Before running ProducerDemo, start a console consumer with kafka-console-consumer.sh:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic

Then run ProducerDemo; its console shows the following output:

The kafka-console-consumer.sh terminal displays the following output:

Overview of the Kafka Producer Architecture

Now that we have covered the basic use of Kafka Producer, let's look at its architecture in depth. The core architecture of Kafka Producer is shown in the figure below.

The core components in the figure involve two threads: our business thread (the main thread in the figure) and the Sender thread. Let's go through them one at a time, starting with the main thread logic:

  1. First, the ProducerInterceptors filter or modify the message.
  2. The Serializer serializes the message key and value.
  3. The Partitioner selects the appropriate partition for the message according to a given policy.
  4. RecordAccumulator is an object holding multiple queues that acts as a buffer for messages, so that messages can be sent in batches.

Let’s look at the logic of the Sender thread:

  1. The Sender thread fetches message data in bulk from RecordAccumulator and builds ClientRequest objects.
  2. It hands the ClientRequest objects to NetworkClient.
  3. NetworkClient puts each request into the buffer of the corresponding KafkaChannel.
  4. NetworkClient performs the network I/O and sends the requests.
  5. NetworkClient receives the response and invokes the ClientRequest's callback, which eventually triggers the callback registered for each message.
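To make the division of labor concrete, here is a minimal sketch of the two-thread design, assuming a single unbounded queue in place of RecordAccumulator and no real network I/O. `TwoThreadProducerSketch` and its methods are hypothetical names, not Kafka APIs: `send()` only enqueues and returns, while a background thread drains whatever has accumulated as one batch and then runs each message's callback, loosely mirroring steps 1 and 5 above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical two-thread producer: send() only buffers, a background "sender" thread ships batches
final class TwoThreadProducerSketch implements AutoCloseable {
    // A buffered message together with the callback to run once it is "acknowledged"
    private static final class Pending {
        final String value;
        final Consumer<String> callback;

        Pending(String value, Consumer<String> callback) {
            this.value = value;
            this.callback = callback;
        }
    }

    private final BlockingQueue<Pending> buffer = new LinkedBlockingQueue<>(); // stands in for RecordAccumulator
    private final List<Integer> batchSizes = new ArrayList<>();              // one entry per "request" sent
    private volatile boolean running = true;
    private final Thread sender = new Thread(this::runSender, "sender");

    TwoThreadProducerSketch() {
        sender.start();
    }

    // Main thread: enqueue and return immediately
    void send(String value, Consumer<String> callback) {
        buffer.add(new Pending(value, callback));
    }

    // Sender thread: wait for data, drain everything available as one batch, then fire callbacks
    private void runSender() {
        while (running || !buffer.isEmpty()) {
            try {
                Pending first = buffer.poll(10, TimeUnit.MILLISECONDS);
                if (first == null) {
                    continue;
                }
                List<Pending> batch = new ArrayList<>();
                batch.add(first);
                buffer.drainTo(batch);
                synchronized (batchSizes) {
                    batchSizes.add(batch.size()); // network I/O would happen here
                }
                for (Pending p : batch) {
                    p.callback.accept(p.value);   // "response received": run per-message callbacks
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    int sentMessageCount() {
        synchronized (batchSizes) {
            int total = 0;
            for (int size : batchSizes) {
                total += size;
            }
            return total;
        }
    }

    // Stop the loop once the buffer is drained, and wait for the sender thread to exit
    @Override
    public void close() {
        running = false;
        try {
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the caller never touches the network, `send()` stays cheap; batching falls out naturally from draining everything that piled up while the previous batch was being handled.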

The core of KafkaProducer.send()

The core logic of the KafkaProducer.send() method is as follows:

  1. The main thread first invokes ProducerInterceptors.onSend() to intercept or modify the message.
  2. Next, waitOnMetadata() makes sure the Kafka cluster metadata is available. If it is missing, this method wakes up the Sender thread to update Metadata, the object that holds the Kafka cluster metadata.
  3. Next, Serializer.serialize() serializes the message key and value.
  4. Then partition() is called to select the appropriate partition for the message.
  5. RecordAccumulator.append() writes the message into RecordAccumulator for temporary storage.
  6. Finally, the Sender thread is woken up and sends messages in batches from RecordAccumulator to the Kafka cluster.
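The order of these steps can be sketched in a few lines. This is a hypothetical, single-threaded model, not KafkaProducer's code: interceptors are plain `UnaryOperator<String>` functions, metadata is assumed to be known already (so step 2 is skipped), the partition comes from a placeholder hash rather than murmur2, and a per-partition `ArrayDeque` stands in for RecordAccumulator. `SendPipelineSketch`, `buffered()`, and `lastValue()` are invented names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model of the order of steps in KafkaProducer.send(); not Kafka's real classes
final class SendPipelineSketch {
    private final List<UnaryOperator<String>> interceptors;
    private final int partitionCount; // assumed already fetched by waitOnMetadata()
    private final Map<Integer, Deque<byte[][]>> accumulator = new HashMap<>();

    SendPipelineSketch(List<UnaryOperator<String>> interceptors, int partitionCount) {
        this.interceptors = interceptors;
        this.partitionCount = partitionCount;
    }

    int send(String key, String value) {
        // 1. Interceptors may rewrite the value, each seeing the previous one's output
        for (UnaryOperator<String> interceptor : interceptors) {
            value = interceptor.apply(value);
        }
        // 2. Metadata is assumed to be available (waitOnMetadata() omitted)
        // 3. Serialize key and value into byte arrays
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
        // 4. Pick a partition; a placeholder hash, not the murmur2 hash DefaultPartitioner uses
        int partition = (Arrays.hashCode(keyBytes) & 0x7fffffff) % partitionCount;
        // 5. Append to that partition's queue; 6. a Sender thread would drain these queues later
        accumulator.computeIfAbsent(partition, p -> new ArrayDeque<>())
                .addLast(new byte[][] {keyBytes, valueBytes});
        return partition;
    }

    int buffered(int partition) {
        Deque<byte[][]> queue = accumulator.get(partition);
        return queue == null ? 0 : queue.size();
    }

    String lastValue(int partition) {
        return new String(accumulator.get(partition).peekLast()[1], StandardCharsets.UTF_8);
    }
}
```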

ProducerInterceptor

ProducerInterceptors maintains a collection of ProducerInterceptor objects. Its onSend(), onAcknowledgement(), and onSendError() methods simply loop over that collection and call the corresponding method on each interceptor. We can intercept or modify messages by implementing ProducerInterceptor.onSend(). We can also preprocess the Kafka cluster response before the user's Callback runs by implementing onAcknowledgement(); ProducerInterceptors.onSendError() routes send failures to this same onAcknowledgement() method.
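The loop inside ProducerInterceptors can be illustrated with a simplified sketch. `MiniInterceptor`, `MiniInterceptors`, and `TaggingInterceptor` are hypothetical stand-ins that work on plain `String` values instead of `ProducerRecord`. The one behavior borrowed from Kafka, as we understand its source, is that an exception thrown by one interceptor is logged (here: recorded in a list) and skipped rather than failing the send.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified mirror of the ProducerInterceptor callbacks (String values only)
interface MiniInterceptor {
    String onSend(String value);

    void onAcknowledgement(String value, Exception exception);
}

// Hypothetical composite that loops over the collection, like ProducerInterceptors
final class MiniInterceptors {
    private final List<MiniInterceptor> interceptors;
    final List<String> warnings = new ArrayList<>(); // stands in for log.warn()

    MiniInterceptors(List<MiniInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    String onSend(String value) {
        String current = value;
        for (MiniInterceptor interceptor : interceptors) {
            try {
                current = interceptor.onSend(current); // each interceptor sees the previous one's output
            } catch (Exception e) {
                // A failing interceptor is recorded and skipped instead of failing the send
                warnings.add("onSend failed: " + e.getMessage());
            }
        }
        return current;
    }

    void onAcknowledgement(String value, Exception exception) {
        for (MiniInterceptor interceptor : interceptors) {
            try {
                interceptor.onAcknowledgement(value, exception);
            } catch (Exception e) {
                warnings.add("onAcknowledgement failed: " + e.getMessage());
            }
        }
    }
}

// Example interceptor: tags each value and counts successful acknowledgements
final class TaggingInterceptor implements MiniInterceptor {
    int acked = 0;

    @Override
    public String onSend(String value) {
        return "[tagged] " + value;
    }

    @Override
    public void onAcknowledgement(String value, Exception exception) {
        if (exception == null) {
            acked++;
        }
    }
}
```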

Kafka Metadata

When we send messages via KafkaProducer, we usually specify only the topic the messages are written to, not the partition.

However, partitions of the same topic may be located on different brokers. The producer therefore needs the metadata of every partition in the topic, including the address (host, port, etc.) of the broker that hosts it, so that it can establish a network connection to that broker and send messages.

In KafkaProducer, the Node, TopicPartition, and PartitionInfo classes are used to record Kafka cluster metadata:

  • Node represents a broker node in the Kafka cluster and maintains its basic information, such as ID, host, and port.
  • TopicPartition abstracts a partition within a topic; it holds the topic name and the partition number.
  • PartitionInfo abstracts information about a partition, where:
    • The leader field records the node hosting the leader replica
    • The replicas field records all the nodes where replicas are located
    • The inSyncReplicas field records all replica nodes in the ISR set
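A stripped-down model of these classes helps show how they fit together. The records below are hypothetical simplifications (the real classes live in `org.apache.kafka.common` and carry more fields), they require Java 16+, and `ClusterSketch` only implements two lookups this article relies on: counting a topic's partitions (returning `null` for an unknown topic, which is what the `waitOnMetadata()` code checks for) and finding the leader node of a partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified versions of the metadata classes
record Node(int id, String host, int port) {}

record TopicPartition(String topic, int partition) {}

record PartitionInfo(String topic, int partition, Node leader,
                     List<Node> replicas, List<Node> inSyncReplicas) {}

// Minimal Cluster-like view: PartitionInfo indexed by topic so lookups are cheap
final class ClusterSketch {
    private final Map<String, List<PartitionInfo>> partitionsByTopic = new HashMap<>();

    ClusterSketch(List<PartitionInfo> partitions) {
        for (PartitionInfo p : partitions) {
            partitionsByTopic.computeIfAbsent(p.topic(), t -> new ArrayList<>()).add(p);
        }
    }

    // Returns null when the topic is unknown, so callers know metadata must be fetched
    Integer partitionCountForTopic(String topic) {
        List<PartitionInfo> partitions = partitionsByTopic.get(topic);
        return partitions == null ? null : partitions.size();
    }

    // The broker the producer must connect to in order to write to this partition
    Node leaderFor(TopicPartition tp) {
        for (PartitionInfo p : partitionsByTopic.getOrDefault(tp.topic(), List.of())) {
            if (p.partition() == tp.partition()) {
                return p.leader();
            }
        }
        return null;
    }
}
```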

Kafka Producer combines these three kinds of information into a Cluster object, which contains the following information:

One level up, the Cluster object is held by Metadata, which also maintains the cluster metadata version number, expiration times, listeners, and so on, as shown in the following figure:

After the above analysis, we can get the following schematic diagram:

With the static data structures covered, let's look at how KafkaProducer.waitOnMetadata() works:

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
    // Get the Cluster object currently cached in Metadata
    Cluster cluster = metadata.fetch();
    if (cluster.invalidTopics().contains(topic))
        throw new InvalidTopicException(topic);
    // Add the topic to ProducerMetadata (or refresh its expiry time)
    metadata.add(topic, nowMs);

    // Get the number of partitions for the target topic from the partitionsByTopic collection
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // If the topic's metadata is already cached and the requested partition (if any) is in range,
    // return the cached Cluster immediately without triggering an update
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);

    long remainingWaitMs = maxWaitMs;
    long elapsed = 0;
    do {
        // Keep the topic registered in ProducerMetadata
        metadata.add(topic, nowMs + elapsed);
        // Request a metadata update for the topic and get the current updateVersion
        int version = metadata.requestUpdateForTopic(topic);
        // Wake up the Sender thread so that it sends the metadata request
        sender.wakeup();
        try {
            // Block until the metadata version is greater than `version`;
            // throws TimeoutException if this takes longer than remainingWaitMs
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            throw new TimeoutException(...);
        }
        cluster = metadata.fetch(); // Get the latest Cluster
        elapsed = time.milliseconds() - nowMs;
        if (elapsed >= maxWaitMs) {
            throw new TimeoutException(partitionsCount == null ?
                    String.format("Topic %s not present in metadata after %d ms.",
                            topic, maxWaitMs) :
                    String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                            partition, topic, partitionsCount, maxWaitMs));
        }
        metadata.maybeThrowExceptionForTopic(topic);
        remainingWaitMs = maxWaitMs - elapsed; // Time left for the next wait
        partitionsCount = cluster.partitionCountForTopic(topic); // Get the number of partitions
    } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

    return new ClusterAndWaitTime(cluster, elapsed);
}

How the Sender thread actually updates the metadata will be covered in detail when we introduce the Sender thread workflow.
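The version handshake in the loop above (requestUpdateForTopic() returns the current version, and awaitUpdate() blocks until the version moves past it) can be sketched with plain `wait()`/`notifyAll()`. `VersionedMetadataSketch` is hypothetical: the "cluster" is just a String, and an `IllegalStateException` stands in for Kafka's `TimeoutException`.

```java
// Hypothetical simplification of the version handshake between the main thread and the Sender thread
final class VersionedMetadataSketch {
    private int updateVersion = 0;       // bumped every time new metadata is installed
    private boolean needUpdate = false;  // flag the Sender thread would check
    private String clusterSnapshot = "empty";

    // Main thread: ask for an update and return the version seen so far
    synchronized int requestUpdate() {
        needUpdate = true;
        return updateVersion;
    }

    synchronized boolean updateRequested() {
        return needUpdate;
    }

    // Sender thread: install new metadata and wake up every waiting thread
    synchronized void update(String newSnapshot) {
        clusterSnapshot = newSnapshot;
        updateVersion++;
        needUpdate = false;
        notifyAll();
    }

    // Main thread: block until the version moves past lastVersion or maxWaitMs elapses
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (updateVersion <= lastVersion) { // loop guards against spurious wakeups
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new IllegalStateException("Metadata not updated after " + maxWaitMs + " ms");
            }
            wait(remaining);
        }
    }

    synchronized String fetch() {
        return clusterSnapshot;
    }
}
```

Comparing versions rather than checking a boolean means the waiter cannot miss an update that lands between `requestUpdate()` and `awaitUpdate()`: if the Sender thread already bumped the version, the loop condition is false and `awaitUpdate()` returns immediately.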

The serializer

Communication between nodes in a distributed system inevitably involves converting between in-memory objects and byte streams, that is, serialization and deserialization.

In Kafka, the Serializer interface converts objects into byte arrays, and the Deserializer interface converts byte arrays back into in-memory objects.

The Serializer and Deserializer interface implementation classes are shown below:

As you can see from the above figure, Kafka comes with the Serializer and Deserializer implementations for common Java types. You can also customize the Serializer and Deserializer implementations to handle complex types.

The following uses the StringSerializer implementation as an example to illustrate the core of a Serializer:

  1. The configure() method sets up the serializer before it is used. For example, StringSerializer.configure() selects the character encoding, which defaults to UTF-8.
  2. The serialize() method is where serialization actually happens: it converts the incoming Java object into a byte[].
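A minimal sketch of this two-method contract follows. `MiniSerializer` and `MiniStringSerializer` are hypothetical stand-ins for Kafka's `Serializer` and `StringSerializer`; the encoding property names mirror the ones StringSerializer reads, to the best of our knowledge, but check the source of your Kafka version before relying on them.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical mini version of Kafka's Serializer contract
interface MiniSerializer<T> {
    default void configure(Map<String, ?> configs, boolean isKey) {}

    byte[] serialize(String topic, T data);
}

// Sketch modeled on StringSerializer: configurable charset, UTF-8 by default
final class MiniStringSerializer implements MiniSerializer<String> {
    private Charset encoding = StandardCharsets.UTF_8;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Key and value serializers look up different property names, then fall back to a shared one
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object name = configs.get(propertyName);
        if (name == null) {
            name = configs.get("serializer.encoding");
        }
        if (name instanceof String) {
            encoding = Charset.forName((String) name);
        }
    }

    @Override
    public byte[] serialize(String topic, String data) {
        // null stays null so that a null value can be sent as-is
        return data == null ? null : data.getBytes(encoding);
    }
}
```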

Partition selection

After the waitOnMetadata() method gets the latest cluster metadata, it’s time to determine which partition to send the message to.

If we specify the destination partition explicitly, the user-specified partition is used. In practice, however, the business code usually does not specify a partition, so the Partitioner computes one from the cluster metadata.

The following figure shows the implementations of the Partitioner interface:

As the name suggests, DefaultPartitioner is the default implementation. Its partition() method works as follows:

  1. If the message has a key, it hashes the key with murmur2 (a fast, low-collision hash algorithm) and takes the result modulo the total number of partitions to obtain the destination partition number. This ensures that messages with the same key go to the same partition.
  2. If the message has no key, the target partition is computed by StickyPartitionCache.partition().
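Key-based selection can be reproduced outside Kafka. The sketch below ports the 32-bit murmur2 routine from Kafka's `Utils` class as we understand it, then masks off the sign bit and takes the result modulo the partition count. `KeyPartitionerSketch` and `partitionForKey()` are hypothetical names; treat this as an illustration and verify against `org.apache.kafka.common.utils.Utils` if you need byte-for-byte compatibility.

```java
import java.nio.charset.StandardCharsets;

// Key-based partition selection in the style of DefaultPartitioner
final class KeyPartitionerSketch {
    // 32-bit murmur2 with the seed Kafka uses
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1-3 bytes (the cases intentionally fall through)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clear the sign bit so the modulo result is never negative
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }
}
```

Masking with `0x7fffffff` rather than calling `Math.abs()` matters: `Math.abs(Integer.MIN_VALUE)` is still negative, which would yield a negative partition number.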

To understand what StickyPartitionCache does, recall that RecordAccumulator is a buffer: messages sent by the main thread are first written into RecordAccumulator, and the Sender thread later sends them in batches once enough messages have accumulated.

There are two main conditions that trigger the Sender thread to send the accumulated messages in bulk:

  1. The message has waited long enough. Our business scenario tolerates some delay, but messages cannot be held on the producer side indefinitely; the linger.ms configuration bounds how long a batch may wait for more messages before it is sent.
  2. Enough messages have piled up to reach the batch threshold, so each request carries a large payload and throughput is high. The threshold is set by batch.size, whose default value is 16KB.
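The two triggers reduce to a simple predicate per batch. `BatchReadiness.isReady()` is a hypothetical helper that ignores the other conditions RecordAccumulator also checks (such as buffer memory exhaustion, explicit flushes, and producer shutdown); only the config names `batch.size` and `linger.ms` come from Kafka.

```java
// Hypothetical per-batch readiness check built from the two triggers described above
final class BatchReadiness {
    private BatchReadiness() {}

    /**
     * @param batchBytes     bytes already accumulated in the batch
     * @param batchCreatedMs when the first message entered the batch
     * @param nowMs          current time
     * @param batchSizeBytes value of batch.size (16384 bytes by default)
     * @param lingerMs       value of linger.ms
     */
    static boolean isReady(int batchBytes, long batchCreatedMs, long nowMs,
                           int batchSizeBytes, long lingerMs) {
        boolean full = batchBytes >= batchSizeBytes;                   // trigger 2: enough data piled up
        boolean waitedLongEnough = nowMs - batchCreatedMs >= lingerMs; // trigger 1: delay limit reached
        return full || waitedLongEnough;
    }
}
```

With linger.ms at 0 (its default in Kafka 2.8), any non-empty batch counts as ready right away, so batches mostly form from messages that accumulate while the Sender thread is busy; raising linger.ms trades a little latency for fuller batches.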

StickyPartitionCache implements "sticky selection": it keeps sending messages to one partition first, so that the batch for that partition fills up quickly and message delivery latency drops. Partition data does not end up unbalanced, because as long as the service runs for a long time, messages are still spread evenly across the partitions.

Now look at the StickyPartitionCache implementation. It maintains a ConcurrentMap (the indexCache field) whose key is the topic and whose value is the partition the topic is currently stuck to.

In the partition() method, StickyPartitionCache first looks up the stuck partition in indexCache; if none exists, it calls nextPartition() to choose one and write it to indexCache. nextPartition() first fetches the available partitions of the target topic and randomly selects one of them to write into indexCache.
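The sticky behavior can be sketched as follows. `StickyPartitionCacheSketch` is a simplified, hypothetical version: it picks uniformly among all partitions rather than only the available ones, and its `nextPartition(topic, prevPartition, numPartitions)` plays the role that onNewBatch() triggers in the doSend() code below, switching partitions only if the topic is still stuck to `prevPartition`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

// Simplified, hypothetical sketch of StickyPartitionCache: one "stuck" partition per topic
final class StickyPartitionCacheSketch {
    private final ConcurrentMap<String, Integer> indexCache = new ConcurrentHashMap<>();

    // Return the partition the topic is currently stuck to, choosing one if none exists yet
    int partition(String topic, int numPartitions) {
        Integer part = indexCache.get(topic);
        return part == null ? nextPartition(topic, -1, numPartitions) : part;
    }

    // Called when a batch fills up: move away from prevPartition to a different random partition
    int nextPartition(String topic, int prevPartition, int numPartitions) {
        Integer oldPart = indexCache.get(topic);
        if (oldPart == null || oldPart == prevPartition) {
            int newPart;
            if (numPartitions == 1) {
                newPart = 0;
            } else {
                do {
                    newPart = ThreadLocalRandom.current().nextInt(numPartitions);
                } while (oldPart != null && newPart == oldPart);
            }
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);            // another thread may have won the race
            } else {
                indexCache.replace(topic, prevPartition, newPart); // switch only if still stuck to prev
            }
        }
        return indexCache.get(topic);
    }
}
```

The compare-and-set style updates (`putIfAbsent`, `replace(key, old, new)`) matter because many threads may call `send()` concurrently: if two of them both see the same full batch, only the first actually moves the topic to a new partition.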

Finally, you may ask: when is the stuck partition updated? Take a look at the KafkaProducer.doSend() method.

// Try to append the message to RecordAccumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
// The current batch of the target partition has no space, so switch to another partition and retry
if (result.abortForNewBatch) {
    int prevPartition = partition;
    // Change the partition that StickyPartitionCache is stuck to
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    // Calculate the new destination partition
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    // Call append() again to write the message to RecordAccumulator.
    // This time the abort-on-new-batch argument is false, so a new batch is created instead of retrying again
    result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}


UniformStickyPartitioner also relies on StickyPartitionCache to implement stickiness.

RoundRobinPartitioner, as its name indicates, computes the target partition with a round-robin strategy. It maintains a ConcurrentMap (topicCounterMap) whose key is the topic name and whose value is an incrementing AtomicInteger.

In RoundRobinPartitioner.partition(), the total number of partitions of the target topic is looked up first; then the topic's AtomicInteger is incremented and its value is taken modulo the total number of partitions to get the destination partition number.
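A minimal, hypothetical sketch of that counter-per-topic scheme follows. Unlike Kafka's RoundRobinPartitioner, `RoundRobinSketch` ignores whether partitions are currently available and simply cycles through `0..numPartitions-1`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical sketch of RoundRobinPartitioner: one counter per topic
final class RoundRobinSketch {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    int partition(String topic, int numPartitions) {
        // The counter starts at 0 and is incremented atomically, so concurrent senders never share a slot
        int next = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0)).getAndIncrement();
        // Mask the sign bit so the modulo stays non-negative after the counter overflows
        return (next & 0x7fffffff) % numPartitions;
    }
}
```

For a topic with 3 partitions, successive calls return 0, 1, 2, 0, 1, and so on, while each other topic keeps its own independent counter.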

Conclusion

In this class, we first introduced the basic use of KafkaProducer, then its core architecture, and finally the core operations the main thread performs in the KafkaProducer.send() method.

In the next class, we will introduce KafkaProducer's RecordAccumulator.

Articles and videos from this class will also be posted on:

  • WeChat official account: Yang Sizheng
  • Bilibili (search for): Yang Sizheng