preface

HHH 🤣). The last article described a case study to explain how to consider the deployment of a Kafka cluster, as a reference. After all, people working in different companies will have their own implementation plan.

Now let’s go back to the question of reason, and this time we will continue the style of the first chapter, and we will lead you to draw the diagram step by step. Relaxed and happy

Kafka’s Producer principle

First we need to have a cluster, and then there are several servers in the cluster. Each server is called a Broker, which is actually a Kafka process

If you remember from the first article, you can easily guess that there will be a controller, multiple followers, and a ZooKeeper cluster. At the beginning, all our brokers will register with our ZooKeeper cluster.

The Controller then listens for changes to the ZooKeeper cluster and changes its metadata information as the cluster changes. Followers also go to their master controller to synchronize metadata, so that metadata is consistent across all servers in a Kafka cluster.

With the above preparations completed, we officially start our producer content

â‘  Noun 1 — ProducerRecord

Before a producer needs to send a message to the cluster, it wraps each message into a ProducerRecord object, which is done internally by the producer. And then it goes through a serialization process. As mentioned in several previous columns, the data that needs to be transmitted over the network is binary bits of data that need to be serialized for transmission.

We need to send messages to a leader partition in a Topic. How do producers get to which leader partition in a Topic?

For those of you who may have forgotten, a controller can be seen as the broker’s head, managing the metadata of a cluster, while a leader partition is used as a load balancer and is distributed across different servers. Both the production and consumption of data in the cluster are performed against the Leader Partition.

â‘¡ NOUN 2 — partitioner

To determine which leader partition is the leader partition, you only need to obtain the metadata.

It’s not hard to get metadata from any server in the cluster (because every server in the cluster has the same metadata).

â‘¢ NOUN 3 – buffer zone

Instead of rushing to send the message, the producer puts it into a buffer

â‘£ noun 4 — Sender

After putting the message into the buffer, at the same time, there is a separate thread Sender to wrap the message Batch by Batch. It is not difficult to imagine that if Kafka is really a message by message transmission, a message is a network connection, then performance will be poor. Batching is used to improve throughput

After finishing each batch, they start to send it to the corresponding host. It then goes through the Kakfa network design model described in the first article, and then writes to OS cache, and then to disk.

The following is the Kafka network design model that we described at the time

⑤ Producer code

1. Set parameters

// Create config file object Properties = new Properties(); // This parameter is used to obtain the metadata of kafka cluster. // This parameter is used to obtain metadata of Kafka cluster. Put ("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092"); // We can set the key for each message, Role then elaborate props. The put (" key. The serializer ", "org.apache.kafka.com mon. Serialization. StringSerializer"); // Serialize the actual message you're sending from a string to a byte array props. Put ("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // props. Put ("acks", "-1"); props.put("retries", 3); props.put("batch.size", 323840); props.put("linger.ms", 10); props.put("buffer.memory", 33554432); props.put("max.block.ms", 3000);Copy the code

2. Create a producer instance

// Create a Producer instance: KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);Copy the code

3. Create a message

ProducerRecord<String, String> record = new ProducerRecord<>(
				"test-topic", "test-value");
Copy the code

Of course, you can also specify a key, which will be explained later

ProducerRecord<String, String> record = new ProducerRecord<>(
				"test-topic", "test-key", "test-value");
Copy the code

4. Send the MESSAGE

With a callback function that returns a successful message if there is no exception

Send (record, new Callback() {@override public void onCompletion(RecordMetadata metadata, Exception Exception) {if(Exception == null) {// Message sent successfully System.out.println(" Message sent successfully "); } else {// message failed to send, need to send again}}}); Thread.sleep(10 * 1000); // This is the synchronous send mode (which is not used in general, the performance is poor, the test can be used) // You have to wait for the next series of steps to send the message // after the message response is returned to you, you will exit the method.Copy the code

5. Close the connection

producer.close();
Copy the code

Two, dry goods time: tuning part of the code

The part that separates a thoughtful typist from a thoughtful typist is the part of tuning that hasn’t been covered in 1, which is explained separately, as follows

props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 32384);
props.put("linger.ms", 100);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
Copy the code

â‘  Acks message authentication

props.put("acks", "-1");
Copy the code
acks Check whether the message is successfully sent
– 1 Leader & All follower receive
1 Leader to receive
0 Send the message

The acks parameter has three values: -1,0,1. These values are used by Kafka to determine whether the message was sent successfully. If acks is -1, the messages are written to the leader partition of the Kafka partition. If acks is -1, the messages are synchronized to the leader partition of the Kafka partition. Println (” message sent successfully “). In this case, the performance of sending data deteriorates

If acks is set to 1, the message to be sent is successfully sent as long as it is written into the leader partition. However, data loss may occur in this mode. For example, the leader partition breaks down immediately after the message is successfully sent to the leader partition. No matter who the remaining followers elect as the leader, the message just sent does not exist.

If acks is set to 0, the message is successfully sent as long as it is sent. I don’t care about anything.

â‘¡ Retries Number of retries (Major)

This parameter is still very important and must be set in a production environment to set the number of times the message is resent

props.put("retries", 3);
Copy the code

In kafka may encounter all kinds of abnormal (which can be directly added to the bottom of the exception type), but neither meet what kind of abnormal, messages are sent at this time there were problems, especially the network suddenly appear problem, but every time can’t exception is thrown, cluster may recover in the next second network, so we’re going to set the retry mechanism.

Here’s a sidebar: After retries is set, 95% of the exceptions in the cluster fly away on their own. I’m serious 🤣

If we need to set the retry interval, we can also set it to retry.backoff.ms. Below, I set 100 milliseconds to retry once, i.e. 0.1 seconds

props.put("retry.backoff.ms",100);
Copy the code

â‘¢ Batch size

The batch size is set to 16K by default, but 32K is used here. Setting a larger batch size can improve throughput slightly. Setting the batch size also depends on the size of the message. Therefore, we need to estimate the size of the messages in the cluster beforehand, which is normally set to several times the size.

props.put("batch.size", 32384);
Copy the code

â‘£ Linger. Ms transmission time limit

For example, if I set the batch size to 32K and one message is 2K, three messages have been sent, with a total size of 6K, and no messages have been sent from the producer side. Should I not send to the cluster when 32K is not enough? Ms is to set a fixed time, even if the Batch is not filled, it will be sent. I set 100 milliseconds below, so even if my Batch does not reach 32K, the Batch will be sent to the cluster after 100 milliseconds.

props.put("linger.ms", 100);
Copy the code

⑤ Buffer. memory Buffer size

When our Sender thread is very slow and we’re producing data very fast, if we don’t have enough buffer in the middle, the producer can’t produce any more data, so we need to make the buffer a little bigger. The default size of the buffer is 32 MB, which is basically reasonable.

props.put("buffer.memory", 33554432);
Copy the code

So how do we verify that we should adjust the buffer size at this point, we can test it using the usual Java end time minus start time, and when the end time minus start time is greater than 100ms, we think that the Sender thread is slow and needs to increase the buffer size.

Of course, in general, we do not need to set this parameter, and 32M is sufficient for the general situation.

Long startTime=System.currentTime(); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception Exception) {if(Exception == null) {// Message sent successfully System.out.println(" Message sent successfully "); } else {// message failed to send, need to send again}}}); Long endTime=System.currentTime(); If(endtime-startTime > 100){// The memory is full, indicating that there is a problemCopy the code

}

⑦ Compression. Type Compression mode

Compression. Type, which defaults to None, does not compress. However, lZ4 compression can also be used

props.put("compression.type", lz4);
Copy the code

Was Max. Block. Ms

Left to the source code, is to set the blocking time of several methods

props.put("max.block.ms", 3000);
Copy the code

⑨ max.request.size Indicates the maximum message size

Max. Request. Size: This parameter is used to control the size of the sent message, the default is 1048576 bytes, is 1 m, the general is too small, a lot of news could be more than 1 MB size, so the need to optimize adjustment, set a larger (enterprises generally set to 10 m), otherwise the program runs well suddenly to a 2 m of the message, The system gives you an error, and you lose more than you gain

props.put("max.request.size", 1048576);    
Copy the code

â‘© Request.timeout. ms The request has timed out

Request. A timeout. Ms: A TimeoutException will be thrown if no response is received after 30 seconds. If the company network is not good, adjust this parameter

props.put("request.timeout.ms", 30000); 
Copy the code

Added: exceptions in Kafka

Whether asynchronous or synchronous, you can handle exceptions. Common exceptions are as follows:

1) LeaderNotAvailableException: if this is a machine hang up and a copy of the leader is not available at this time, will cause the failure of you write, to wait for other followers copy after switching to a copy of the leader, can continue to write, can try again to send at this time. If you restart the broker process of kafka at ordinary times, will surely lead to switch leader, will cause you to write an error, is LeaderNotAvailableException

2) NotControllerException: If the Controller Broker is down, there will be a problem and the Controller needs to be reelected

NetworkException We configured a parameter, retries, that automatically retries, but if it fails after a few tries, an Exception is provided. Parameter: retries The default value is 3 parameter: retry.backoff.ms Indicates the interval between two retries

finally

Above from the producer production message to send this process analysis down, so as to lead to the following a variety of parameters about the whole process Settings, if really can clearly understand these basic knowledge, I believe that you must be helpful. And then we’ll bring in a producer case and a consumer. Interested friends can pay attention to, thank you.