Configuring Kafka for no message loss

Under what conditions can Kafka guarantee that messages are not lost?

Kafka only makes limited persistence guarantees for committed messages.

  • Committed message: when a Kafka Broker receives a message, writes it to its log file, and tells the producer that the message was committed successfully, the message is called committed.
  • Limited persistence guarantee: Kafka cannot guarantee that no message is ever lost under all circumstances. For example, if every broker in your Kafka cluster fails, messages can still be lost.

Generally speaking, message loss can happen in three places: on the producer side, on the consumer side, and on the Broker side.

1. The producer side

Kafka Producer sends messages asynchronously. A call to producer.send(msg) returns immediately, but that does not mean the message was sent successfully.

This style of sending has a funny name: “fire and forget.” If the message fails to be delivered, we never find out.

Typical causes are network jitter, so that the message never reaches the Broker, or the Broker rejecting the message.

General solution: the Producer should always use the send API with callback notification. Instead of producer.send(msg), use producer.send(msg, callback). The callback tells us whether the message was actually sent successfully and, if not, why it failed.
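The callback pattern can be sketched without the Kafka client library. In the real client the callback is `Callback.onCompletion(RecordMetadata metadata, Exception exception)`, where a null exception means success. The `SendCallback` and `FakeProducer` types below are hypothetical stand-ins that only mimic that shape, and the fake invokes the callback synchronously, whereas the real producer invokes it later from its I/O thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the shape of the producer callback:
// error == null means the send succeeded and offset is meaningful.
interface SendCallback {
    void onCompletion(Long offset, Exception error);
}

// Hypothetical fake producer: "delivers" every message except empty ones.
class FakeProducer {
    private long nextOffset = 0;

    void send(String msg, SendCallback callback) {
        if (msg == null || msg.isEmpty()) {
            callback.onCompletion(null, new IllegalArgumentException("rejected: empty message"));
        } else {
            callback.onCompletion(nextOffset++, null);
        }
    }
}

class CallbackDemo {
    // Sends every message and returns those whose callback reported a failure,
    // so the caller can log, retry, or alert instead of losing them silently.
    static List<String> sendAll(FakeProducer producer, List<String> messages) {
        List<String> failed = new ArrayList<>();
        for (String msg : messages) {
            producer.send(msg, (offset, error) -> {
                if (error != null) {
                    failed.add(msg);
                }
            });
        }
        return failed;
    }
}
```

With plain producer.send(msg) there is no place to put the `error != null` branch, which is exactly why a failed delivery goes unnoticed.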

2. The consumer side

On the Consumer side, data loss shows up as messages that should have been consumed but never were. The Consumer program keeps an “offset” that records its current position in the partition it is consuming.

For example, the offset of Consumer A is 9, and the offset of Consumer B is 11.

The offset is like a bookmark in a book: it marks how many pages we have read so far, and the next time we open the book we start from that position.

If we move the bookmark before we actually read the pages and something interrupts us, we skip the pages we never read. That is a lost message.

The same applies to Kafka: always consume the messages first (read), and only then update the offset (move the bookmark).

The main drawback of this order is that we may read a page twice, which leads to repeated consumption of the same data. We can discuss that later; for now, one approach is to make the consumption logic idempotent.
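One way to make consumption idempotent is to remember which messages have already been applied and skip repeats. The sketch below is a minimal illustration, assuming every message carries a unique id (the `messageId` parameter is hypothetical) and keeping the set of seen ids in memory. A real system would need to persist that set together with the business data.

```java
import java.util.HashSet;
import java.util.Set;

class IdempotentHandler {
    private final Set<String> processedIds = new HashSet<>();
    private int balance = 0;

    // Applies the message only the first time its id is seen.
    // Returns true if it was applied, false if it was a duplicate delivery.
    boolean handle(String messageId, int amount) {
        if (!processedIds.add(messageId)) {
            return false; // already processed: skip the side effect
        }
        balance += amount;
        return true;
    }

    int balance() {
        return balance;
    }
}
```

Delivering the same message twice then leaves the result unchanged, so "consume first, commit the offset afterwards" no longer risks double counting.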

When we consume messages, we might adopt the following structure for speed:

After the Consumer pulls messages, it hands them to multiple threads that run the business logic asynchronously, while the Consumer commits offsets to the Broker automatically. If thread 1 fails, its offset has already been committed, so thread 1’s data is lost.

Solution: when multiple threads process the consumed messages, the Consumer program should turn off automatic offset commits and have the application commit offsets manually. One caveat: multithreaded consumption tends to consume some messages more than once.
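With manual commits and several worker threads, the application has to decide which offset is safe to commit. A common approach, sketched here under the assumption that workers report each finished offset, is to commit only up to the end of the contiguous run of finished offsets. `OffsetTracker` is a hypothetical helper, not part of the Kafka client.

```java
import java.util.TreeSet;

// Tracks offsets finished by worker threads and exposes the position that is safe to commit.
class OffsetTracker {
    private final TreeSet<Long> done = new TreeSet<>();
    private long nextToCommit; // first offset not yet covered by a commit

    OffsetTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    // Called by a worker after its business logic succeeded for this offset.
    synchronized void markDone(long offset) {
        done.add(offset);
    }

    // Advances over the contiguous run of finished offsets and returns the
    // offset to commit, i.e. the next offset that still has to be read.
    synchronized long committable() {
        while (done.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit;
    }
}
```

If offset 1 is still being processed while 0 and 2 are finished, `committable()` returns 1, so a crash causes offset 2 to be processed again rather than offset 1 to be lost. That is the repeated consumption mentioned in the caveat.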

3. The Broker side

Message loss on the Broker side was covered in an earlier post: Kafka replica synchronization

  • Blue: data has been written to disk
  • Yellow: no data

While the replicas are doing their second round of synchronization, the machine hosting replica B reboots.

After replica B restarts successfully, it truncates its log based on the high watermark and adjusts its LEO to the previous high watermark value, which is 1. The message at offset 1 is deleted from replica B’s disk, so the underlying log file of replica B now holds only one message, the one at offset 0.

After truncating its log, replica B would normally pull messages from replica A to resynchronize. But if replica A’s machine reboots at exactly this moment, replica B becomes the Leader.

When replica A restarts successfully, it automatically aligns itself with the Leader. Replica A therefore performs the same log truncation and adjusts its high watermark to the same value as replica B, which is 1.

After this, the message at offset 1 is gone from both replicas for good. This is the data loss scenario shown in the figure.
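The sequence can be replayed with two in-memory logs. This is a toy model of the scenario described above, not broker code: the `Replica` class is hypothetical, and the starting state (both replicas hold offsets 0 and 1, A’s high watermark is 2, B’s is still 1) is an assumption chosen to match the description.

```java
import java.util.ArrayList;
import java.util.List;

class Replica {
    final List<String> log = new ArrayList<>(); // list index plays the role of the offset
    int highWatermark;

    // Drops every message at or above the given high watermark.
    void truncateTo(int hw) {
        while (log.size() > hw) {
            log.remove(log.size() - 1);
        }
        highWatermark = hw;
    }
}

class TruncationDemo {
    // Returns {replica A, replica B} after the two restarts.
    static Replica[] run() {
        Replica a = new Replica(); // initial Leader
        Replica b = new Replica(); // follower
        a.log.add("m0"); a.log.add("m1");
        b.log.add("m0"); b.log.add("m1");
        a.highWatermark = 2; // Leader has already advanced
        b.highWatermark = 1; // follower has not learned the new value yet

        b.truncateTo(b.highWatermark); // B restarts and drops offset 1
        // A goes down before B can fetch offset 1 again, so B becomes Leader.
        a.truncateTo(b.highWatermark); // A returns, follows B, and drops offset 1 too
        return new Replica[] {a, b};
    }
}
```

After `run()`, both logs contain only `m0`, even though `m1` had been written to both disks at the start.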

4. Solutions

  • Don’t use producer.send(msg); use producer.send(msg, callback). Always use the send method that takes a callback.
  • Set acks = -1. acks is a Producer parameter that defines what counts as a “committed” message. It has three values: 1, 0, and -1. With 1, only the Leader needs to confirm that it has received the data; with 0, the Producer does not wait for any confirmation from the Broker; with -1 (also written all), every in-sync replica must confirm.
  • Set retries to a larger value. When network jitter occurs, the message is resent automatically.
  • Set unclean.leader.election.enable = false. This keeps followers that have fallen too far behind the Leader from taking part in leader election.
  • Set replication.factor >= 3. Keep multiple copies of each message.
  • Set min.insync.replicas > 1. It controls the minimum number of replicas a message must be written to before it counts as “committed.”
  • Make sure that replication.factor > min.insync.replicas. If the two are equal, losing a single replica leaves the partition unable to accept writes with acks = -1. replication.factor = min.insync.replicas + 1 is a reasonable choice.
  • On the Consumer side, make sure a message has been consumed before its offset is committed. The relevant parameter is enable.auto.commit: it is best to set it to false and commit offsets manually.
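The client-side settings from this list can be collected in plain `Properties` objects, as in the sketch below. The class and method names are hypothetical, the `retries` value of 10 is an arbitrary illustration of "a larger value", and the broker and topic settings are represented only by a small sanity check rather than real broker configuration.

```java
import java.util.Properties;

class NoLossConfig {
    // Producer-side settings from the list above.
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("acks", "all");   // "all" is equivalent to -1
        p.setProperty("retries", "10"); // assumption: illustrative value only
        return p;
    }

    // Consumer-side setting: the application commits offsets itself.
    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("enable.auto.commit", "false");
        return p;
    }

    // Checks the replication rules: at least 3 copies, more than 1 in-sync
    // replica required, and room to lose one replica without blocking writes.
    static boolean replicationLooksSafe(int replicationFactor, int minInsyncReplicas) {
        return replicationFactor >= 3
                && minInsyncReplicas > 1
                && replicationFactor > minInsyncReplicas;
    }
}
```

For example, `replicationLooksSafe(3, 2)` passes, while `replicationLooksSafe(3, 3)` fails because a single replica outage would stop writes.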

TCP management of producers

1. Why use TCP

From the community’s perspective, developers can take advantage of some advanced features that TCP itself provides, such as multiplexed requests and the ability to poll multiple connections at the same time.

Multiplexing requests means merging two or more data streams onto a single underlying physical connection.

In addition, the community found that the HTTP libraries available in many programming languages are somewhat rudimentary.

2. Kafka producer program

The main object of Kafka’s Java Producer API is KafkaProducer. The steps we take to create a producer are as follows:

  • Construct the parameter object that the producer needs
  • Use the parameter object to create an instance of KafkaProducer
  • Send messages with KafkaProducer’s send method
  • Call KafkaProducer’s close method to release resources

Properties props = new Properties();
props.put("parameter1", "value of parameter1");
props.put("parameter2", "value of parameter2");
...
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<String, String>(...), callback);
    ...
}

3. When to create a TCP connection

There are two moments at which the TCP connection could be created: when we run Producer producer = new KafkaProducer(props), or when we call producer.send(msg, callback).

In fact, when the KafkaProducer instance is created, the producer application creates and starts a background thread called Sender, and this thread begins by creating connections to the Brokers.

If the TCP connections are created at instantiation time, before any message is sent, how does the producer know which Brokers to connect to?

When we create the instance we have to specify bootstrap.servers, the addresses of Brokers in the target cluster. When the Producer starts, it initiates connections to those Brokers. If you list 1000 Brokers, TCP connections to all 1000 are created first.

In practice there is no need to list every Broker in the cluster. Once the Producer is connected to any one Broker, it can fetch the cluster metadata and learn about all the other Brokers.

So is KafkaProducer thread-safe?

The community documentation states that KafkaProducer is thread-safe. This rests on RecordAccumulator: keeping RecordAccumulator thread-safe is what makes KafkaProducer thread-safe.

RecordAccumulator is a buffer of messages waiting to be sent. KafkaProducer puts messages into it, and when a batch fills up, the Sender is notified to send the messages and free the space. RecordAccumulator is like the warehouse at a freight station: goods keep arriving, and every time a box is full the carrier is told to pick it up.

In RecordAccumulator, the main data structure is:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

TopicPartition is the Java object Kafka uses to represent a topic partition, and it is immutable. So whether KafkaProducer is thread-safe comes down to the Deque. RecordAccumulator guards its use of the Deque with synchronized locks to ensure thread safety.

for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
    TopicPartition part = entry.getKey();
    Deque<ProducerBatch> deque = entry.getValue();
    Node leader = cluster.leaderFor(part);
    synchronized (deque) {
        // execution
    }
}

If you are interested in the details, read the source code of RecordAccumulator.
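The locking pattern can be tried out in isolation. The sketch below is a simplified stand-in, not the real RecordAccumulator: `MiniAccumulator` is hypothetical, partitions are keyed by a plain `String` instead of TopicPartition, and messages are stored individually rather than in batches. It keeps the same two-level idea: a concurrent map for lookup, and a synchronized block around each non-thread-safe deque.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class MiniAccumulator {
    // Partition name -> queue of pending messages.
    private final ConcurrentMap<String, Deque<String>> batches = new ConcurrentHashMap<>();

    void append(String partition, String message) {
        Deque<String> deque = batches.computeIfAbsent(partition, k -> new ArrayDeque<>());
        synchronized (deque) { // ArrayDeque itself is not thread-safe
            deque.addLast(message);
        }
    }

    int pending(String partition) {
        Deque<String> deque = batches.get(partition);
        if (deque == null) {
            return 0;
        }
        synchronized (deque) {
            return deque.size();
        }
    }

    // Appends from several threads at once and returns the final queue size.
    static int demo(int threads, int perThread) {
        MiniAccumulator acc = new MiniAccumulator();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    acc.append("topic-0", "msg");
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return acc.pending("topic-0");
    }
}
```

Locking per deque rather than on the whole map means threads writing to different partitions do not block each other.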

At this point it is fair to say that KafkaProducer is thread-safe. However, Brian Goetz, author of Java Concurrency in Practice, points out that starting a thread in an object’s constructor lets the this reference escape.

Besides instantiation, a TCP connection may also be created in two other places: after a metadata update, and when a message is sent.

  • When the Producer updates the cluster’s metadata information, it creates a TCP connection if it finds that there is no current connection to some brokers
  • When sending a message, the Producer finds that there is no connection to the target Broker and creates one

4. When to close the TCP connection

There are two ways a connection gets closed: the user closes it actively, or Kafka closes it automatically.

  • First: active closure in the broad sense. This even includes calling kill -9 to kill the Producer application. The recommended way is producer.close().
  • Second: Kafka closes it for you. This is controlled by the Producer-side parameter connections.max.idle.ms: if no request flows through a TCP connection during that time, Kafka closes it.
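The idle-timeout rule can be illustrated with a small bookkeeping class. This is a sketch of the general technique only, not how the Kafka client implements it: `IdleConnectionTracker` is hypothetical, "closing" a connection is modeled as removing it from a map, and timestamps are passed in explicitly so the behavior is easy to follow.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class IdleConnectionTracker {
    private final long maxIdleMs; // plays the role of connections.max.idle.ms
    private final Map<String, Long> lastUsedMs = new HashMap<>();

    IdleConnectionTracker(long maxIdleMs) {
        this.maxIdleMs = maxIdleMs;
    }

    // Records that a request just flowed through the connection to this broker.
    void recordActivity(String brokerId, long nowMs) {
        lastUsedMs.put(brokerId, nowMs);
    }

    // Closes (here: forgets) every connection idle for longer than maxIdleMs
    // and returns how many were closed.
    int closeIdle(long nowMs) {
        int closed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUsedMs.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > maxIdleMs) {
                it.remove();
                closed++;
            }
        }
        return closed;
    }

    boolean isOpen(String brokerId) {
        return lastUsedMs.containsKey(brokerId);
    }
}
```

With a limit of 540000 ms (9 minutes), a connection last used at time 0 is closed by a check at 600000 ms, while one used at 500000 ms stays open.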
