Preface

If this is your first time seeing this article, it is suggested you catch up on the previous five foundational articles first; they will be very helpful for your understanding

Episode: Kafka in plain English

Episode: Kafka producer principle and important parameter description

Episode: Analysis of Kafka’s producer case and consumer principles

Episode: Kafka running process summary and source preparation

Episode: Kafka source warm-up – Java NIO

I have been putting off this source-code series since the end of last year, and it is finally all coming together. Honestly, part of the delay was that I was afraid I could not organize it well and that it would not be interesting, so I spent some time preparing; I also hope to make progress together with everyone. Note that this article is very long, and it is recommended to read it with the navigation sidebar on a PC for a better experience. OK! Without further ado, let's begin!

2. The core initialization process of the Producer

Import the source code. You will need some time here for the dependencies to download; once the import finishes, you can see that the overall source structure looks like this

Going through it class by class would be very confusing, so let's be scenario-driven instead. Conveniently, I didn't even have to write the scenario myself. See the examples package in the source code? Most big data frameworks are open source, and to promote themselves, the official documentation is written in detail and good example packages are provided.

2.1 The Producer.java example that ships with the source code

If we look at Producer.java, we can see that the code in its constructor is somewhat familiar, even very familiar

/**
 * Initialize the producer object
 *
 * @param topic
 * @param isAsync
 */
public Producer(String topic, Boolean isAsync) {

    // Create a configuration file
    Properties props = new Properties();

    // Used to pull Kafka metadata
    props.put("bootstrap.servers", "localhost:9092");

    // This parameter can be ignored for now (client.id is used to manage permissions)
    props.put("client.id", "DemoProducer");

    // Set the serialization classes for key and value
    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // Initialize a KafkaProducer with the arguments above
    producer = new KafkaProducer<>(props);
    this.topic = topic;
    this.isAsync = isAsync;
}


When we simulated producers earlier, we did more configuration because there were more parameters to tune. This is just the most basic setup.

A note about "pulling Kafka metadata":

props.put("bootstrap.servers", "localhost:9092"); in case there is any doubt, two more sentences here. The metadata held by every broker in the cluster is consistent. The localhost:9092 I specified is not necessarily where a leader partition lives; once I obtain the cluster metadata, I know where the leader partitions are, and therefore where to send. So the purpose of specifying an address (there can be multiple) is just to find a broker and fetch the metadata, so we know where the leaders are.


The run method used as a scenario driver

This is followed by a run method that simulates sending data
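From memory of the examples package, the run method looks roughly like this (a sketch; details such as DemoCallBack and the message format may differ slightly between versions). It just loops forever, producing numbered messages either asynchronously with a callback or synchronously by blocking on the returned Future:

public void run() {
    int messageNo = 1;
    while (true) {
        String messageStr = "Message_" + messageNo;
        long startTime = System.currentTimeMillis();
        if (isAsync) { // asynchronous: hand the message over with a callback and move on
            producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
                    new DemoCallBack(startTime, messageNo, messageStr));
        } else {       // synchronous: block on the Future until the send completes
            try {
                producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
                System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        ++messageNo;
    }
}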

2.2 The KafkaProducer initialization method

If we focus on this initialization and look at what it's doing, we jump into KafkaProducer.java, near line 188

To find KafkaProducer's constructor, click here

2.3 The KafkaProducer constructor (principle analysis)

At this point, let's set the source code aside and draw a schematic diagram. The whole process is the same as in our earlier producer analysis.

This was also mentioned in Kafka running process summary and source preparation; if you are not familiar with this part, you can jump back to those two articles first to sort out the ideas

2.3.1 Operations before the message enters the buffer

First of all, we are now initializing a KafkaProducer, right? Then there are the ProducerInterceptors, which filter our messages according to certain rules. Honestly, this component is rarely useful, because the same filtering can be done with a simple if-else in our own code, so it's a bit lame. So the order is: the interceptors filter a message before it is sent, then the message is serialized, and once serialization is complete, the Partitioner (which needs to know which partition on which server to send to) decides the partition.

So the four key names we have so far are: KafkaProducer, ProducerInterceptors, the Serializer, and the Partitioner

2.3.2 Buffer structure

The RecordAccumulator is a buffer containing multiple deque queues. Kafka does not send messages one by one; instead, they are packed up and sent in batches (each batch defaults to 16K). The packed message batches in these queues are sent to the different partitions in turn (only partitions 1, 2, and 3 are shown), as shown in the figure below

The first deque sends only to partition 1, the second deque only to partition 2... and so on
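To make the structure concrete, here is a minimal sketch of the idea (an illustration only, not the real RecordAccumulator, which adds memory pooling, per-deque locking, and compression):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class BufferSketch {
    static final int BATCH_SIZE = 16 * 1024; // the default batch.size of 16K

    static class Batch {
        final byte[] buf = new byte[BATCH_SIZE];
        int used = 0;

        boolean tryAppend(byte[] msg) {
            if (used + msg.length > BATCH_SIZE) return false; // batch is full
            System.arraycopy(msg, 0, buf, used, msg.length);
            used += msg.length;
            return true;
        }
    }

    // in the real source this is a ConcurrentMap<TopicPartition, Deque<RecordBatch>>
    final ConcurrentMap<Integer, Deque<Batch>> batches = new ConcurrentHashMap<>();

    void append(int partition, byte[] msg) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = dq.peekLast();
        if (last == null || !last.tryAppend(msg)) {
            Batch fresh = new Batch();   // current batch full: seal it, start a new one
            fresh.tryAppend(msg);
            dq.addLast(fresh);           // the Sender ships sealed batches from the head
        }
    }
}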

2.3.3 Sender thread structure

The Sender thread is actually sending the data, as shown below

When the Sender starts up, it creates ClientRequests, and the ClientRequests here are not all identical, because different servers should get different requests. Once a request is created, it is handed to the NetworkClient, a very important component that manages Kafka's network. It holds the requests inside it; we'll explain why that is needed later.

The KafkaChannel in the selector is basically the SocketChannel we talked about in NIO. The selector then sends the message to Kafka: the client sends a message to the server, the server (Kafka) returns a response, the response is likewise received by the KafkaChannel and handed back to the NetworkClient, and after processing, the NetworkClient returns the result to the client.

2.3.4 Overall schematic diagram

So the whole process looks like this. The flow chart is marked with steps 1 to 12; of course, you could add one more:

13. NetworkClient returns the result to the client


This figure is a very rough description of the process; the details in the Kafka source are much finer than the figure. So if you feel a bit lost here, that's normal; combined with the source code it will become much clearer.

2.4 Constructor source code

To put the source code in context: what we are covering now is the first step in the diagram we just drew, the initialization of KafkaProducer. The source code is very long, so we will take shortcuts through it. Back to KafkaProducer.java

2.4.1 Configuring user-defined parameters (not critical)

2.4.2 clientId (not critical)

2.4.3 Metrics (not critical)

Metrics cover the monitoring side, not the logic we care about

2.4.4 Partitioner

As we mentioned before, we can give each message a key or leave it unspecified, and that affects which partition of a topic the message is sent to. That is exactly what partitioners are designed to do. In case you forgot, here is an excerpt from an earlier article, Analysis of Kafka's producer case and consumer principles.

So it is highly recommended that you read the previous few foundational articles; I believe they will help a lot in understanding these operations.
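To make the idea concrete, here is a minimal sketch of hash-based partitioning. This is not the real DefaultPartitioner (which uses a murmur2 hash and round-robins only over available partitions), but the principle is the same:

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class PartitionerSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    int partition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // no key: spread messages round-robin across the partitions
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
        // with a key: the same key always lands on the same partition
        // (the sign-bit masking mirrors Kafka's Utils.toPositive)
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }
}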

2.4.5 Retry backoff time (not critical)

If you're curious about this parameter you can click in and see for yourself; suffice it to say the default is 100 milliseconds

2.4.6 Serializer (not critical)

These are the two serializers we configured at the start

2.4.7 Interceptor (not critical)

2.4.8 The Metadata unit

The next four parameters will be mentioned separately

The first parameter
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);


The METADATA_MAX_AGE_CONFIG parameter, which defaults to 5 minutes, makes the producer re-pull metadata from the cluster every 5 minutes, because in order to send messages we have to make sure the metadata is accurate.

Second parameter
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);


The MAX_REQUEST_SIZE_CONFIG parameter is the maximum size of a message the producer may send to the server; messages exceeding it will not be sent. The default is 1M, which is a bit small and usually needs to be raised in production, for example to 10M, though of course this should be adapted to your company's actual situation.

The third parameter
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);


The BUFFER_MEMORY_CONFIG parameter is the size of the buffer, that is, of the RecordAccumulator. The default of 32M is generally sufficient

The fourth parameter
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));


The COMPRESSION_TYPE_CONFIG parameter defaults to no compression, but it can be set; the options include none, gzip, snappy, and lz4, which is the one we usually use. These can all be seen in the source code; I won't click into it here.

After compression, more messages fit into each send, so throughput naturally goes up, but it puts a certain burden on the CPU. Think it over before using it; a combined example of these four settings is sketched below.
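To tie these four together, here is how they would look if set explicitly on the Properties from the earlier demo (the values are the defaults or the illustrative numbers mentioned above, not recommendations):

props.put("metadata.max.age.ms", "300000");  // METADATA_MAX_AGE_CONFIG: refresh metadata every 5 minutes (default)
props.put("max.request.size", "10485760");   // MAX_REQUEST_SIZE_CONFIG: raise the 1M default to, say, 10M
props.put("buffer.memory", "33554432");      // BUFFER_MEMORY_CONFIG: the 32M default buffer
props.put("compression.type", "lz4");        // COMPRESSION_TYPE_CONFIG: the default is "none"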

2.4.9 Initializing the buffer according to the parameters previously provided

2.4.10 Obtaining the address of metadata information in a cluster

The parameter BOOTSTRAP_SERVERS_CONFIG is the same as the demo code we wrote earlier

props.put("bootstrap.servers","hadoop1: 9092.hadoop2: 9092.hadoop3: 9092 ");


BOOTSTRAP_SERVERS_CONFIG is exactly this "hadoop1:9092,hadoop2:9092,hadoop3:9092" string; its role is to point producers at the addresses from which they can retrieve the cluster metadata.

2.4.11 The update method

It looks as though the addresses are passed in as arguments here, like a method that fetches or updates metadata information. Let's verify this guess later

2.4.12 Initializing NetworkClient

There are several parameters to be aware of as well

— ① CONNECTIONS_MAX_IDLE_MS_CONFIG

The maximum idle time for a network connection; once it is exceeded, the connection is closed automatically. The default is 9 minutes

Normally we set it to -1, in which case connections are never recycled for being idle

— ② (Major) MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION

The maximum number of requests per network connection that can be "in flight", that is, sent but not yet acknowledged. The default value is 5

With the default of 5: the producer uses a separate network connection to each server, and on each connection up to 5 requests may be awaiting responses; until those responses come back, a 6th request cannot be sent on that connection.

Note: since Kafka's retry mechanism can cause messages to become out of order, set MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to 1 if messages must stay in order.

For example, order systems and membership-points systems are classic scenarios: an order can only be cancelled after it has been created, and the corresponding points must be added before they can be deducted. If this ordering cannot be guaranteed, the system will have problems.

So don't assume that giving messages a key, so that messages from the same scenario land in the same partition, is by itself enough to guarantee ordering; the MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION parameter must also be set to 1, as sketched below
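In configuration terms, strict ordering therefore needs both pieces (a sketch using the standard producer config keys):

// same key => same partition, AND no concurrent in-flight requests => no reordering on retry
props.put("max.in.flight.requests.per.connection", "1");
props.put("retries", "3"); // retries stay order-safe only together with the line above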

— ③ SEND_BUFFER_CONFIG and RECEIVE_BUFFER_CONFIG

These two weren't very important, so they got lumped together; they're just some NIO things

SEND_BUFFER_CONFIG is the size of the socket send buffer. Default: 128K (review NIO if you've forgotten)

RECEIVE_BUFFER_CONFIG is the size of the socket receive buffer. Default: 32K

2.4.13 Initialization process of the Sender thread

The only important parameter here is RETRIES_CONFIG, the number of retries. The default is no retries, which is rough: it makes the program fragile, and the slightest hiccup kills it. Big data systems are distributed, and letting a bit of network instability bring the whole system down is simply not worth it. As I've said before, 95% of the problems in a program can be solved by retrying

Of course the ACKS_CONFIG parameter is also very important, but we already covered it in the earlier producer articles; if you don't believe me, here's the screenshot

See, I wasn't lying to you. Let me emphasize once more: the foundational articles up front are useful, so it's best to catch up on them!

So if an interview asks how to ensure data is not lost, ACKS_CONFIG is the key parameter: set it to -1. There is one more related parameter, which will come up later.
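For reference, the setting itself is one line. The "one more parameter" deferred above is, I assume (based on the usual data-loss checklist), the broker/topic-level min.insync.replicas that pairs with acks = -1:

props.put("acks", "-1"); // equivalent to "all": the leader waits for the full ISR before acknowledging
// assumed companion setting (broker/topic side): min.insync.replicas >= 2, so "all" means more than just the leader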

2.4.14 Starting the Sender thread

Here you'll find that the Kafka source handles some details quite well: what gets newed up is a KafkaThread, which you can click into and take a look at

It simply marks the thread as a background (daemon) thread there; it doesn't start it on the spot. The reason the Sender is passed in as a Runnable is to isolate the business code from the thread-related code: if you later need to add parameters to this thread, you only add them in KafkaThread.java. From small details like this you can see the code is well written.
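A simplified sketch of KafkaThread (the real class also installs an uncaught-exception handler that logs fatal errors):

public class KafkaThread extends Thread {

    public KafkaThread(String name, Runnable runnable, boolean daemon) {
        super(runnable, name);
        // all thread setup lives here; the business logic stays in the Runnable (the Sender)
        setDaemon(daemon);
    }
}

KafkaProducer then creates and starts it with roughly new KafkaThread(ioThreadName, this.sender, true) followed by ioThread.start().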

At this point the producer constructor is pretty much covered, but we still have the key component, Metadata, left unexpanded

2.5 How does Metadata manage Metadata

Let's go to Metadata.java

Let me briefly describe the fields here

2.5.1 refreshBackoffMs

The minimum interval between two metadata update requests, default 100 ms. This exists because requesting metadata does not always succeed, and without the metadata we cannot find the leader partitions.

2.5.2 metadataExpireMs

How often is the metadata automatically updated? The default is 5 minutes

2.5.3 version

For the producer side, the metadata has a version number, which will be updated every time the metadata is updated.

2.5.4 lastRefreshMs

The time when metadata was last updated

2.5.5 lastSuccessfulRefreshMs

The time when metadata was last successfully updated

2.5.6 Cluster (Most Important)

Kafka cluster metadata

2.5.7 needUpdate

Whether to update the identity of the metadata

2.5.8 topics

Represents the topics that currently exist
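Putting 2.5.1 through 2.5.8 together, the class looks roughly like this (a simplified view; modifiers and exact types vary slightly across Kafka versions):

public final class Metadata {
    private final long refreshBackoffMs;   // 2.5.1 min interval between update requests (100 ms)
    private final long metadataExpireMs;   // 2.5.2 auto-refresh interval (5 minutes)
    private int version;                   // 2.5.3 bumped on every successful update
    private long lastRefreshMs;            // 2.5.4 time of the last update attempt
    private long lastSuccessfulRefreshMs;  // 2.5.5 time of the last successful update
    private Cluster cluster;               // 2.5.6 the cluster metadata itself (most important)
    private boolean needUpdate;            // 2.5.7 flag: an update has been requested
    private final Set<String> topics;      // 2.5.8 the topics we currently care about
}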

2.6 Cluster – Metadata in a Kafka Cluster

2.6.1 nodes

The nodes of the Kafka cluster, that is, the Kafka servers

2.6.2 unauthorizedTopics and internalTopics

Topics related to Kafka permissions, and Kafka's internal topics; it's enough to know these exist

2.6.3 Some encapsulated data structures

Not all of these data structures are necessarily used

private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;

private final Map<String, List<PartitionInfo>> partitionsByTopic;

private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;

private final Map<Integer, List<PartitionInfo>> partitionsByNode;

private final Map<Integer, Node> nodesById;

private final ClusterResource clusterResource;


Map<TopicPartition, PartitionInfo> partitionsByTopicPartition maps each partition to its associated information. We can click into PartitionInfo to see; for ease of reading, I'll write the comments in directly

So PartitionInfo is the information corresponding to one partition, roughly as follows
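Roughly, with the comments written in (matching the fields of org.apache.kafka.common.PartitionInfo; newer versions add a few more):

public class PartitionInfo {
    private final String topic;           // which topic this partition belongs to
    private final int partition;          // the partition number
    private final Node leader;            // the node the leader replica lives on
    private final Node[] replicas;        // all replicas of this partition
    private final Node[] inSyncReplicas;  // the replicas currently in sync with the leader (the ISR)
}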

Map<String, List<PartitionInfo>> partitionsByTopic represents the partitions of each topic

Map<String, List<PartitionInfo>> availablePartitionsByTopic represents which of a topic's partitions are currently available

Map<Integer, List<PartitionInfo>> partitionsByNode records which partitions live on each server (servers are identified by server id)

private final Map<Integer, Node> nodesById records the servers by server number (numbered from 0)

private final ClusterResource clusterResource is the Kafka cluster id

The structure of Metadata should be broadly clear by now. Back to KafkaProducer.java

Now we know that it is the data structures we mentioned above that maintain metadata information

2.7 Verifying our guess that update pulls metadata

This expands on 2.4.11

Click into update and scroll down to around line 204

We can all look at the default values for each of these conditions

Cluster.bootstrap(addresses) obviously doesn't fetch anything; it just wraps the addresses. So our first guess was wrong, and the conclusion is:

The producer does not retrieve metadata information during initialization

On second thought, though: when we send messages we must obtain the cluster metadata anyway, to know where the leaders in the cluster are; so we just need to look for that logic around the message-sending path.

At this point, initialization of the producer is done.

2.8 What we've learned

During the initialization of KafkaProducer, many important parameters and several core components are initialized (RecordAccumulator, Sender, NetworkClient), which is what allowed us to draw the rough diagram. The Sender thread is actually started as soon as it is created. And no metadata is pulled during initialization.

3. The core process of sending messages

Go back to where the dream began, the example in the source code

The send method is used to send messages

As you can see, the code is quite long; it jumps to roughly line 454 of KafkaProducer, starting from the try block, whose steps we already roughly know

Let's take the first five steps for now

3.1 Pulling metadata

Putting the comments through Baidu Translate makes this perfectly clear: waitOnMetadata waits synchronously for the metadata to be pulled before the message is sent. maxBlockTimeMs is the maximum time to wait for that pull, because the code blocks here while pulling, so there has to be a time limit to release it.

It then calculates the remaining time and updates the metadata of the cluster.

3.2 Serialize the key and value of messages

3.3 Select the partition to which the message should be sent based on the partition divider

int partition = partition(record, serializedKey, serializedValue, cluster);

int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);


Because now that we’ve got the metadata, we can start doing calculations based on the metadata to get the results.

3.4 Verify that the message size exceeds the maximum value

ensureValidRecordSize(serializedSize);


When KafkaProducer was initialized, a parameter was specified for the maximum size of a message the producer may send; the default is 1M

3.5 Encapsulate partition objects based on metadata information

tp = new TopicPartition(record.topic(), partition);

long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();

log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);


3.6 Bind callback functions to messages

Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);


Since we are sending asynchronously, we need the callback function for confirmation, roughly as used below
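A minimal usage sketch of such a callback (the handler body here is hypothetical; Callback and onCompletion are the standard producer API):

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace(); // the send ultimately failed (possibly after retries)
        } else {
            System.out.println("sent to partition " + metadata.partition()
                    + " at offset " + metadata.offset());
        }
    }
});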

3.7 The Message is Stored in RecordAccumulator

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
return result.future;


The RecordAccumulator is the buffer, 32M by default. Messages then need to be packed into batches for sending; if a batch is full, or a new batch has just been created, the Sender thread is woken up to send the data.

4. How does waitOnMetadata work

4.1 Store the current topic into metadata

// add topic to metadata topic list if it is not there already and reset expiry

metadata.add(topic);


Adds the topic to the list of metadata topics (if it doesn’t already exist) and resets the expiration time

4.2 The fetch operation

Cluster cluster = metadata.fetch();


Here, fetch simply takes the existing metadata from the cache. But from the analysis we just did, we know this cluster object holds no real data yet; only the addresses from our configuration are in it. Following our scenario driver, the first execution happens exactly when KafkaProducer initialization has just completed, and at that moment the cluster has not obtained any metadata

4.3 Viewing Partition Information

Integer partitionsCount = cluster.partitionCountForTopic(topic);


The partition information for the current topic is looked up from the cluster, but again, on the first execution there is no data; the cluster has nothing

4.4 Returning the metadata information and time

if (partitionsCount != null && (partition == null || partition < partitionsCount))
    return new ClusterAndWaitTime(cluster, 0);


Likewise, on the first execution we would never reach this point, because we haven't obtained metadata yet

4.5 Pulling Metadata from the Server

On the first execution, 4.2 through 4.4 are effectively wasted effort, and the metadata has to be pulled starting here. First, three time-related variables are defined

// Record the current time
long begin = time.milliseconds();

// How much time is left; starts at the maximum waiting time just mentioned
long remainingWaitMs = maxWaitMs;

// Time already spent
long elapsed;


A do... while loop is then used to retrieve the metadata, roughly of the shape sketched below
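Condensed, the loop has roughly this shape (simplified from the source; some checks are omitted):

do {
    int version = metadata.requestUpdate();          // set needUpdate = true, remember the version we saw
    sender.wakeup();                                 // the Sender thread does the actual pulling
    metadata.awaitUpdate(version, remainingWaitMs);  // block until the version number moves past ours
    cluster = metadata.fetch();                      // re-read the (hopefully updated) cache
    elapsed = time.milliseconds() - begin;
    if (elapsed >= maxWaitMs)
        throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
    remainingWaitMs = maxWaitMs - elapsed;
    partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null);                   // keep trying until the topic's partitions are known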

4.5.1 The version number problem

int version = metadata.requestUpdate();


At this point, the first operation is to obtain the metadata version. For the producer, metadata carries a version number that is bumped on every successful metadata update. The requestUpdate method sets the needUpdate flag from 2.5.7 to true and returns the current metadata version.

4.5.2 sender.wakeup()

Here we see the Sender thread being put to work: it is actually the Sender thread that pulls the metadata, so we wake the Sender up and wait synchronously for the metadata to arrive, which we can see from while (partitionsCount == null)

4.5.3 Trying to Obtain some parameters

If that succeeded, we should now have the metadata, so we can try to read some parameter information

4.5.4 awaitUpdate: waiting synchronously for metadata

In 4.5.2 we mentioned synchronous waiting for the arrival of metadata through this method

Although we haven't read the Sender thread's source yet, we can guess that completing the metadata update will wake up this wait. Most of the other code here should already be understandable.
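For reference, awaitUpdate is essentially a classic synchronized wait loop (simplified from the source; argument checks omitted):

public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    // wait until the Sender bumps the version past the one captured by requestUpdate()
    while (this.version <= lastVersion) {
        if (remainingWaitMs != 0)
            wait(remainingWaitMs); // released by the notifyAll() on the update side
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}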

4.6 How does Sender pull metadata

Go to Sender.java and find the run method

Cluster cluster = metadata.fetch(); we've seen this many, many times now, and I think you already know there is no metadata yet. The run method is very long, but on the first execution, without metadata, everything up to around line 236 does not execute; only the following line does

The client, if we click on it, is a KafkaClient, and the implementation class for KafkaClient is NetworkClient

To see the logic of the poll method, click on NetworkClient’s poll

4.6.1 Poll method of NetworkClient

We'll talk about Kafka's network design in detail later, so if this part feels heavy to read, it's fine to come back to it afterwards.

— Step 1: Encapsulate a pull-metadata request

long metadataTimeout = metadataUpdater.maybeUpdate(now);


Click on maybeUpdate to check it out

Clicking in again, we see a wrapped request, and the next line of code after the request is built is doSend

Clicking into that, it's a ClientRequest

Then we hit send (to be honest, it gets a little dizzying here), and we jump into Selectable.java, where we find that send is an abstract method whose implementation is in Selector.java

We won't look at all the parameters here (I'll expand on them in the next chapter); we just want to find the thing that fetches the metadata. Its send method is at around line 240

Click setSend to go to the setSend method in KafkaChannel

See, even the SelectionKey is exactly the same. If this is unclear, please jump back to the NIO non-blocking network communication section for a review

Fall back to NetworkClient's poll method

— Step 2: Perform network I/O operations

This part is all NIO knowledge, so I won't expand on it here; it's the kind of thing that isn't difficult but jumps around a lot, and screenshotting it all is a bit of a hassle. If you have questions, feel free to ask; I'll skip the NIO steps here. Later we'll see a writeTo method that sends the request to the server; knowing that is enough

— Step 3: Receive the response and process it

The above request is sent and we naturally want to receive the response from the server

Click into handleCompletedSends

The maybeHandleCompletedReceive method is what processes the response

4.6.2 Response processing logic and metadata extraction

MetadataResponse response = new MetadataResponse(body);


Because the server sends back a binary data structure, the producer here parses it and encapsulates it as a MetadataResponse object

Cluster cluster = response.cluster();


The response contains the metadata information; now the Cluster object is extracted from it


If cluster.nodes().size() > 0, the metadata object was obtained successfully, and update is applied: version += 1. The key point is that there is also a notifyAll(), which wakes up the thread that was just waiting synchronously for the metadata, letting the code exit the while loop; a sketch of that update follows.
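So the update side, the counterpart of awaitUpdate, looks roughly like this (simplified; the real method also notifies registered listeners):

public synchronized void update(Cluster cluster, long now) {
    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;        // the version bump that awaitUpdate is watching for
    this.cluster = cluster;   // store the freshly parsed metadata
    notifyAll();              // wake every thread blocked in awaitUpdate
}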

So at this point, it’s a complete process of getting metadata.

Finally

Honestly, writing this far has made my head spin; source-code walkthroughs like this are hard to explain, jumping back and forth. Still, I hope you got something out of it. Up to now, we've covered what's in this figure

Even serialization hasn't started yet; there is a long way to go… 🤣🤣🤣

The next article will expand on Kafka's network design. Interested friends can follow along (WeChat official account: "say your wish"). If you felt the article was decent, a small thumbs-up would be appreciated. Thank you.