Hi, I’m Howie. Nice to meet you again.

In the second part of this series, we took an in-depth, evolutionary look at how Kafka processes requests and at the design details of its high-concurrency network architecture. Today, in this third article, we turn to a question everyone cares about in a Kafka production environment.

Does Kafka ever lose data? And if data is lost, how do we deal with it?

With these questions in mind, we will be better prepared to handle failures in Kafka production environments and to serve the business more reliably.

After reading this article, I’m sure you’ll have a better understanding of how Kafka solves the problem of lost data.

This article is packed with practical content, so I hope you can read it patiently.

01 Overview

More and more Internet companies are using message queues to support their core businesses. Precisely because these are core businesses, message delivery is generally required to lose as little data as possible; if data is lost somewhere along the way, it draws complaints from users and drags down year-end performance reviews.

So does Kafka actually lose data? And if it does, what can we do about it? Beyond after-the-fact compensation, we should fully account for all kinds of abnormal situations during system design, so that we end up with a stable and reliable messaging system.

As we all know, Kafka has a simple distributed architecture consisting of Producers, Brokers, and Consumers. We will analyze the loss scenarios from these three parts.

02 Semantic analysis of messaging

Before we dive into the message loss scenario, let’s talk about what “messaging semantics” really is.

The semantics of messaging are the guarantees that Kafka provides for messaging between producers and consumers. There are three main types, as shown in the figure below:

1) First, after the Producer sends data to the Broker, the data is committed. Once the commit succeeds, the message will not be lost thanks to the Replica mechanism. However, if communication is interrupted by a network problem after the Producer sends the data, the Producer cannot accurately determine whether the message was committed, and resending to be safe leads to at least once semantics.

2) Prior to Kafka 0.11.0.0, if the Producer did not receive a commit response, it could only resend the message to make sure it had actually reached the Broker. Since version 0.11.0.0, the Producer supports idempotent delivery, which guarantees that resending does not create duplicate entries in the log. To achieve this, the Broker assigns each Producer an ID and de-duplicates messages using that ID together with a per-message sequence number. Transactional semantics are also supported, ensuring that a group of messages sent to multiple Topic partitions is either written successfully as a whole or fails as a whole; this is mainly used for exactly once semantics across topics.

To enable idempotence, set the Producer property enable.idempotence = true.

To enable transaction support, set the property transactional.id = "a specified value".
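As a rough sketch using the standard Java client (the broker address, topic names, and transactional.id value below are placeholders), enabling idempotence and transactions looks roughly like this:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");      // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");              // idempotent delivery (Kafka 0.11.0.0+)
props.put("transactional.id", "demo-txn-producer");   // placeholder id; enables transactions

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic-a", "k", "v"));   // placeholder topics
    producer.send(new ProducerRecord<>("topic-b", "k", "v"));
    producer.commitTransaction();    // both writes succeed together or neither becomes visible
} catch (Exception e) {
    producer.abortTransaction();     // on failure, abort so none of the messages are exposed
}
producer.close();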

3) From the Consumer's perspective, we know that the Offset is maintained by the Consumer itself. If the Consumer commits the Offset as soon as it receives a message but crashes before processing it, the new Consumer that takes over will resume from the committed position, producing at most once semantics (messages can be lost but are never reprocessed).

4) If the Consumer processes the message first and then commits the Offset, a crash before the commit means the new Consumer that takes over will pull the same messages again from the old Offset. This yields at least once semantics (messages are not lost, but may be processed repeatedly).

Summary: By default Kafka provides "at least once" messaging semantics, and users can get "at most once" semantics by committing offsets before processing messages. If we can make consumption idempotent ourselves, the system's messaging is effectively "exactly once": nothing is lost and every message takes effect exactly once, but this is difficult to achieve end to end.

From the overall Kafka architecture diagram we can see that there are three message delivery processes:

1) The Producer sends messages to the Kafka Broker.

2) Kafka Broker synchronizes messages and persists data.

3) The Consumer pulls the message from the Kafka Broker and consumes it.

Data loss can occur in each of these three steps, so how can Kafka ensure that messages are not lost?

From the above three steps, we can see that Kafka only makes “maximum persistence guarantees against loss” for “committed” messages.

How should we interpret this sentence?

1) First, "committed" messages: when N Brokers have successfully received a message and written it to their log files, they tell the Producer that the message was committed successfully, and from then on the message counts as "committed" in Kafka.

What do we make of the N brokers here? This depends on the definition of “committed”, where the message can be considered committed as long as one Broker successfully saves the message, or as long as all brokers successfully save the message.

2) The second is "maximum persistence against loss", which means Kafka does not promise that data will never be lost under any circumstances. The no-loss guarantee is conditional: if your message is stored on N Brokers, then at least one of those N Brokers must stay alive for the message to survive.

This means that Kafka does not lose data, but only if the messages are “committed” and meet certain conditions.

Knowing the semantics of Kafka messaging and the circumstances under which data loss can be guaranteed, let’s take a closer look at why data loss occurs at each stage and how to minimize data loss.

03 Analysis of Message Loss Scenarios

Producer-side loss scenario analysis

Before we analyze data loss on the Producer side, let's look at how the Producer sends messages. If you are not familiar with the Kafka Producer, check out the earlier Kafka Producer article.

The message sending process is as follows:

1) The first thing we need to know is that the Producer directly interacts with the Leader Partition of the Broker. Therefore, during the Producer side initialization, the Partitioner needs to obtain the metadata of the Leader Partition corresponding to the related Topic from the Kafka cluster.

2) Send the message directly after obtaining the metadata of the Leader Partition.

3) The Leader Partition corresponding to Kafka Broker will write the message to the Page Cache first and flush the disk periodically for persistence (write the message sequentially to the disk).

4) The Follower Partition pulls the message from the Leader Partition and keeps the same data as the Leader Partition. After the pull is complete, the Follower Partition sends an ACK message to the Leader Partition.

5) After the Kafka Leader synchronizes data with the Follower Partition and receives ACK messages from Replica copies in the ISR, the Leader Partition sends an ACK message to the Producer.

From the figure above and the sending process, we can conclude that, to improve sending efficiency and reduce I/O, the Producer merges multiple records into a RecordBatch, wraps batches into Requests, and sends them asynchronously (sending can also be triggered automatically at a time interval). So most messages lost on the Producer side are lost because they were never delivered to the Kafka Broker at all.
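For reference, the batching behaviour described above is controlled by a few Producer-side parameters; a sketch of the relevant settings (the values shown are the usual defaults, which you may want to tune):

batch.size = 16384        (maximum size, in bytes, of one RecordBatch)
linger.ms = 0             (how long to wait for more records before sending a batch)
buffer.memory = 33554432  (total memory available for records not yet sent)

Any records still sitting in this in-memory buffer when the Producer process dies never reach the Broker, which is exactly the loss scenario described above.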

There are the following reasons why the message fails to be sent on the Producer:

  • Network: Data is not sent to the Broker at all due to network jitter.
  • Data cause: The Broker rejects a message because the message body is too large for the Broker to handle.

Kafka Producer can also be configured to verify that a message is successfully produced:

The default value of acks on the Kafka Producer is 1, which corresponds to at least once semantics and does not guarantee exactly once semantics.

The Producer's acks setting determines how much acknowledgment it waits for, and therefore how much data can be lost:

  • acks = 0: the Producer treats a send as successful as soon as the request is written out, without waiting for any acknowledgment. If network jitter occurs, the message is simply lost and cannot be retried.
  • acks = 1: as long as the Leader Partition does not crash, the Leader's copy of the data is not lost. However, if the Leader Partition crashes before the Follower Partitions have finished synchronizing and acknowledging, the unreplicated data is lost.
  • acks = -1 or all: a send is considered successful only after the Leader Partition and all Follower Partitions in the ISR have acknowledged the message. Even this does not fully guarantee against loss: if the ISR has shrunk to only the Leader Partition, it degenerates into the acks = 1 situation.

Broker-side loss scenario analysis

Next, let's look at the loss scenario for persistent storage on the Broker side. If you are not familiar with the Broker, see the earlier article on the Kafka Broker. The storage process is shown below:

After receiving data, the Kafka Broker cluster persists it to disk. To improve throughput and performance, it uses an "asynchronous batch flush" strategy: data is flushed after a certain number of messages or at certain time intervals. The data first goes into the PageCache, and the operating system decides, according to its own policies, when to flush it to disk, unless fsync is called to force a flush. If the Broker crashes before the flush and a Follower Partition is elected as the new Leader Partition, the messages that Follower was lagging behind on are lost.

Since the message store is asynchronously batch flushed, there may be data loss here!!

  • Since Kafka does not provide a "synchronous flush" option, a single Broker can lose data on its own (the flush-related settings are sketched after this list).
  • Kafka relies on its multi-partition, multi-replica mechanism to keep data from being lost. If data has been written to the PageCache but not yet to disk, and the Broker suddenly goes down or fails, that data can be lost in extreme cases.
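The Broker does expose flush-related parameters, but by default they are effectively disabled and flushing is left to the operating system; a sketch of the relevant settings:

log.flush.interval.messages   (flush after this many messages; effectively unlimited by default)
log.flush.interval.ms         (flush after this many milliseconds; unset by default)

Lowering these values forces more frequent flushes at a significant throughput cost, which is why the multi-partition, multi-replica mechanism is the preferred line of defence.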

Consumer-side loss scenario analysis

Finally, let's look at the Consumer-side data loss scenario. If you are not familiar with the Consumer, see the earlier article on the Kafka Consumer. The consumption process is as follows:

1) Before the Consumer pulls the data, just like the Producer sends the data, it needs to obtain the cluster metadata through the subscription relationship and find the metadata of the Leader Partition corresponding to the related Topic.

2) The Consumer then uses Pull mode to actively Pull messages from the Kafka cluster.

3) This process involves the concept of a Consumer Group (see the linked article above if you're not familiar with it). Multiple Consumers can form a Consumer Group, identified by a group id. Consumers in the same Consumer Group can consume data from different Partitions of the same Topic, but a given Partition is consumed by at most one Consumer in the group.

4) After a message is pulled, the business logic is processed. Once processing completes, the consumption is acknowledged by committing the Offset, i.e., recording the consumption progress.

5) Offsets are saved to the __consumer_offsets Topic in the Kafka Broker cluster, and each Consumer keeps its own Offset progress.

According to the figure above and the message consumption process, consumption can be divided into two stages:

  • Get metadata and pull data from the Kafka Broker cluster.
  • Process the message, mark it as consumed, and commit the Offset record.

Since the Consumer must commit the Offset after pulling messages, data can be lost here!!

  • Offsets may be committed automatically ("automatic Offset commit"), in which case the commit can happen before the message is actually processed.
  • After pulling the message, "commit the Offset first and process the message later". If message processing then fails unexpectedly, the Offset has already been committed, so after the Consumer restarts it resumes from the position after the committed Offset and the unprocessed message is never handled again. From the Consumer's point of view, the message is lost.
  • After pulling the message, “first process the message and then submit the Offset”. If an abnormal outage occurs before the submission, and the Offset is not successfully submitted, the message will be pulled again from the last Offset after the next Consumer restarts, without message loss. But there will be repeated consumption, where only the business itself can guarantee idempotency.

04 Message Loss Solutions

We have examined the possible data loss scenarios from producers, brokers, and consumers. Here we look at how to minimize data loss.

Producer-side solutions

When analyzing the Producer-side loss scenario, we concluded that the Producer sends data "asynchronously". If it sends in "fire and forget" mode, that is, calling producer.send(msg) and returning immediately with no callback, the Broker may never receive the message because of network problems, and the message is silently lost.

Therefore, we can solve the problem of message loss on Producer side from the following aspects:

4.1.1 Changing the Invocation Mode:

Instead of sending and immediately forgetting, use the send method with a callback notification, that is, producer.send(msg, callback). That way, as soon as a send fails, we can handle it specifically.

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

(1) If the message is lost due to network jitter, the Producer can retry.

(2) If the message size does not meet the Broker's requirements, it can be adjusted and resent. In this way, as many messages as possible are delivered successfully (a minimal callback sketch follows below).
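For instance, a minimal callback sketch (assuming a KafkaProducer<String, String> configured as above; the topic name is a placeholder) might handle both outcomes like this:

producer.send(new ProducerRecord<>("topic-a", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        // send failed: decide here whether to retry, alert, or persist to a local fallback store
        System.err.println("send failed: " + exception.getMessage());
    } else {
        // send succeeded: metadata tells us exactly where the record landed
        System.out.println("sent to " + metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
    }
});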

4.1.2 ACK Confirmation Mechanism:

This parameter defines what counts as a "committed" message: it controls how many replicas must receive a message before the send is considered successful.

acks needs to be set to -1 (or all), which means all replicas in the ISR must receive the message before it counts as committed.

There are two very typical cases for acks = -1/ all:

(1) Data is sent to the Leader Partition and all ISR members have completed data synchronization. If the Leader Partition crashes abnormally, a new Leader Partition will be elected and data will not be lost, as shown in the following figure:

(2) Data is sent to the Leader Partition and only some ISR members have synchronized it. If the Leader Partition crashes abnormally, one of the remaining Follower Partitions is elected as the new Leader Partition. The Producer receives a failure response and resends the data later, which may produce duplicates, as shown in the following figure:

Therefore, through the above analysis, we also need to configure other parameters to ensure:

replication.factor >= 2

min.insync.replicas > 1

This is the configuration of the Broker side, described in more detail below.

4.1.3 Retries:

This parameter indicates the number of retries that the Producer sends messages.

You need to set retries to a value greater than 0; in Kafka 2.4 it defaults to Integer.MAX_VALUE. In addition, to preserve message ordering when retrying, configure the following parameters:

retries = Integer.MAX_VALUE

max.in.flight.requests.per.connection = 1

The Producer then retries until the Broker returns an ACK, and only one in-flight request per connection is allowed, which keeps messages in order.

4.1.4 Retry Interval retry.backoff.ms:

This parameter sets the interval between two retries after a send times out, to avoid frequent but useless retries. The default is 100 ms; 300 ms is recommended.
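Putting the Producer-side settings above together, a reference configuration might look like this (a sketch only; exact values should be tuned for your workload):

acks = all
retries = 2147483647                          (Integer.MAX_VALUE)
max.in.flight.requests.per.connection = 1
retry.backoff.ms = 300
enable.idempotence = true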

Broker-side solutions

When analyzing the Broker-side loss scenario, we saw that the Broker uses an "asynchronous batch flush" strategy: data goes to the PageCache first and is flushed to disk asynchronously. Since no "synchronous flush" strategy is provided, Kafka relies on "multiple partitions and multiple replicas" to minimize data loss.

We can ensure this through the following parameters:

4.2.1 unclean.leader.election.enable:

This parameter indicates which followers are eligible to be elected as the Leader. If a Follower’s data is too far behind the Leader’s, the data will be lost once it is elected as the new Leader. Therefore, we need to set it to false to prevent this from happening.

4.2.2 replication.factor:

This parameter indicates the number of partition copies. You are advised to set replication.factor >=3 so that if the Leader copy crashes unexpectedly, the Follower copy will be elected as the new Leader copy to continue providing services.

4.2.3 min.insync.replicas:

This parameter indicates the minimum number of replicas that a message must be successfully written to the ISR to be considered “committed”. It is recommended that you set min.insync.replicas > 1 to improve message persistence and prevent data loss.

Note that replication.factor should not simply equal min.insync.replicas: if they are equal, losing a single replica makes the partition unwritable. The recommended setting is replication.factor = min.insync.replicas + 1, which keeps the system available while still meeting the durability requirement.
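In summary, a reference Broker/Topic-level configuration for the durability settings discussed above might look like this (values are illustrative):

unclean.leader.election.enable = false
replication.factor = 3          (set per Topic at creation time, or via default.replication.factor on the Broker)
min.insync.replicas = 2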

Consumer-side solutions

When analyzing the Consumer-side loss scenario, we concluded that the Consumer commits its Offset after pulling messages. So, to avoid losing data, the correct approach is: pull the data, process the business logic, and only then commit the consumed Offset.

We also need to set the parameter enable.auto.commit = false and commit the offset manually.

In addition, since messages may be consumed repeatedly, the business itself must guarantee idempotency, so that processing a message more than once has the same effect as processing it exactly once.
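As a minimal sketch using the standard Java client (the broker address, group id, topic name, and the handle() business method are placeholders), "process first, commit later" with manual offset commits looks roughly like this:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");   // placeholder broker address
props.put("group.id", "demo-consumer-group");     // placeholder group id
props.put("enable.auto.commit", "false");         // disable automatic offset commits
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic-a"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        handle(record);        // business processing first; must be idempotent in case of redelivery
    }
    consumer.commitSync();     // commit offsets only after every record in the batch was processed
}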

05 Summary

Here, let’s summarize the main points of this article.

1. Overview of possible data loss from the overall Kafka architecture.

2. A walkthrough of "messaging semantics": Kafka only provides "maximum persistence against loss" for "committed" messages.

3. Analyze the scenarios that may lead to data loss at the Producer, Broker and Consumer end and specific highly reliable solutions.

If this article is helpful to you, please like, view, and forward it. Thank you very much!

I'll keep summarizing and publishing high-quality articles. Follow my WeChat public account: Hua Zai chat technology