Kafka can lose messages, and the loss can happen at the broker, at the producer, or at the consumer.

Broker

Broker-side loss comes from Kafka itself: to achieve higher performance and throughput, Kafka persists data to disk asynchronously and in batches. To improve performance and reduce the number of flushes, Kafka flushes in batches, that is, it writes to disk after a certain number of messages have accumulated or after a time interval has elapsed. This behavior also stems from the Linux operating system: when data is written, it first lands in the page cache and is later flushed to disk (from page cache to file) based on time or other conditions, or forcibly via the fsync call. While the data sits only in the page cache, it is lost if the system fails.

High-speed read/write on the server and synchronization to Replica

The figure above illustrates the process by which the broker writes data and synchronizes it to replicas. The broker writes data only to the page cache, which lives in memory, so this data is lost after a power failure. Page cache data is flushed to disk by the Linux flusher threads. There are three conditions that trigger a flush:

  • sync or fsync is called explicitly (see the sketch after this list)
  • The available memory falls below a threshold
  • Dirty data has been held longer than a time threshold. "Dirty" is a flag on a page-cache page: when data is written into the page cache, the page is marked dirty; once the data has been flushed to disk, the dirty flag is cleared.
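
For the first trigger, here is a minimal Java sketch (not Kafka source code; the file name is illustrative) of forcing a flush with fsync semantics via FileChannel.force:

```java
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ForceFlushExample {
    public static void main(String[] args) throws Exception {
        try (FileChannel channel = FileChannel.open(
                Paths.get("segment.log"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            // The write lands in the OS page cache first; it is not yet durable.
            channel.write(ByteBuffer.wrap("a message".getBytes(StandardCharsets.UTF_8)));
            // force(true) issues fsync: the data (and file metadata) are pushed
            // from the page cache to the physical disk before the call returns.
            channel.force(true);
        }
    }
}
```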

The broker's flush configuration controls when fsync is called to take over the flush. From the point of view of a single broker, any page-cache data that has not yet been flushed can be lost.

Kafka does not provide a synchronous flush option. RocketMQ implements synchronous flush by blocking the asynchronous flush and waiting for its result, much like an Ajax callback or a Java Future. Here is a piece of RocketMQ source code:

```java
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
// Block until the flush completes or the sync-flush timeout is reached
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
```

In other words, on a single broker Kafka theoretically cannot guarantee that messages are never lost; the risk can only be reduced by tuning the flush parameters, for example by shortening the flush interval or lowering the number of messages that accumulate before a flush. The shorter the interval, the worse the performance and the better (though never perfect) the reliability. It is a trade-off.
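
For example, the broker-side flush parameters can be tightened in server.properties; a sketch with illustrative values, not recommendations (by default Kafka leaves flushing to the operating system):

```properties
# Force a flush once this many messages have accumulated on a log partition
log.flush.interval.messages=10000

# Force a flush if a message has been sitting in memory for this long (ms)
log.flush.interval.ms=1000
```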

To deal with this, Kafka mitigates the loss risk on a single broker through cooperation between the producer and the broker. Once the producer discovers that a message has not reached the broker, it automatically retries. Only when the number of retries exceeds a (configurable) threshold is the message considered lost, and at that point the producer client has to handle the failure itself, for example in a send callback, as sketched at the end of this section. So how does the producer detect the loss? Through the ACK mechanism, which is similar to the TCP three-way handshake.

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:

acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.

acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.

kafka.apache.org/20/do…

The above quotation is Kafka's official documentation of the acks parameter (in older versions this parameter was request.required.acks).

  • acks=0: the producer does not wait for any response from the broker, which is the most efficient option, but messages are easily lost.
  • acks=1: the leader broker returns an ACK as soon as it has received the message, without waiting for responses from the other followers; it can also be read as an ACK count of one. If the leader fails before the followers have replicated the message, the message is lost. In terms of the figure above: the leader receives a message, writes it successfully to its page cache, and returns an ACK, so the producer considers the send successful; but the data has not yet been synchronized to the followers, so if the leader loses power at this point, the data is lost.
  • acks=-1: after receiving the message, the leader broker waits until all the followers in the ISR list have returned their results before responding with an ACK; -1 is equivalent to all. With this configuration, the ACK is triggered only after the leader has written the data to its page cache and all ISR members have reported success. If power is lost before that, the producer knows the message was not sent successfully and resends it. If a follower has received the data and returned an ACK and the leader then loses power, the data survives on that follower, and after a new election the new leader holds it. Synchronizing data from the leader to a follower takes two steps:
  1. The data is flushed from the leader's page cache to disk; only data on disk can be synchronized to a replica.
  2. The data is synchronized to the replica and the replica successfully writes it to its own page cache. Once the producer has received the ACK, the data exists at least on the leader's disk, even if all the machines subsequently fail.

As mentioned in the third point above, the followers in the ISR list need to be combined with another parameter for the ACK to be truly effective. The ISR is the In-Sync Replica list, a "reliable follower list" maintained by the broker. The broker configuration includes the parameter min.insync.replicas, which sets the minimum number of replicas in the ISR. If this value is not set, the follower list in the ISR may be empty, and in that case acks=-1 is effectively the same as acks=1.
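
A minimal sketch of that broker-side (or per-topic) setting, with an illustrative value:

```properties
# A write with acks=all (-1) is acknowledged only after at least this many
# in-sync replicas, including the leader, have the record. If the ISR shrinks
# below this number, the broker rejects the write (NotEnoughReplicas) instead
# of silently weakening the guarantee.
min.insync.replicas=2
```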

As shown in the picture above:

  • acks=0: total time f(t) = f(1).
  • acks=1: total time f(t) = f(1) + f(2).
  • acks=-1: total time f(t) = f(1) + max(f(A), f(B)) + f(2).

From top to bottom, performance decreases and reliability increases.
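
Putting the producer-side pieces together, here is a minimal sketch (not taken from the original article; the topic name and parameter values are illustrative) of a producer that uses acks=all plus retries and handles a send that still fails:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ReliableProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Wait for all in-sync replicas before treating a send as successful
        props.put("acks", "all");
        // Let the client retry transient errors before giving up
        props.put("retries", "3");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("foo", "key", "value");
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    // Retries are exhausted (or the error is not retriable):
                    // the application must handle the message itself,
                    // e.g. persist it locally and resend later.
                    exception.printStackTrace();
                } else {
                    System.out.println("Acknowledged at offset " + metadata.offset());
                }
            });
        }
    }
}
```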

Producer

Producer-side message loss happens in the producer client itself.

To improve efficiency and reduce the number of I/O operations, the producer may merge multiple requests before sending. Messages are first cached in a local buffer, in a way similar to the flush mechanism described earlier: the producer can pack requests into "blocks" or send whatever is in the buffer at fixed intervals. With the buffer, the producer can be switched to an asynchronous mode, which improves sending efficiency.

However, data sitting in the buffer is at risk. In the normal case, the client's asynchronous send can handle a failed or timed-out message through a callback. But if the producer is stopped abnormally, the data in the buffer is lost before the broker ever receives it. Alternatively, when the producer client runs out of memory, messages are also lost if the client's strategy is to discard them (the other strategy is to block). Or messages may be produced so fast (asynchronously) that too many pending sends pile up, memory runs out, the program crashes, and the messages are lost.
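
For reference, the buffering behaviour described above is controlled by a few producer parameters. A fragment in the style of the examples later in this article, with illustrative values:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// Up to this many bytes per partition are packed into one batch
props.put("batch.size", "16384");
// Wait at most this long for a batch to fill before sending it anyway
props.put("linger.ms", "10");
// Total memory available for buffering records that have not been sent yet
props.put("buffer.memory", "33554432");
// When the buffer is full, send() blocks for at most this long and then
// throws, instead of silently dropping records
props.put("max.block.ms", "60000");
```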

Schematic diagram: the producer sends messages in batches

A schematic diagram of asynchronous message production speed

Based on the figures above, several solutions come to mind:

  • Change asynchronous sending to synchronous sending, or have the service that generates messages use a blocking thread pool with an upper limit on the number of threads; the overall idea is to control the rate at which messages are produced (see the sketch after this list).
  • Increase the capacity of the buffer. This alleviates the problem but does not eliminate it.
  • Instead of sending messages directly to the buffer (memory), have the service write them to local storage (a database or files) and let another thread (or a small number of sending threads) do the sending. This adds an extra, roomier buffer layer between the service and the in-memory buffer.
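
A minimal sketch of the first option, reusing a producer like the one shown earlier: blocking on the Future returned by send() turns the call into a synchronous send, so the caller cannot outrun the broker and fill the buffer.

```java
ProducerRecord<String, String> record = new ProducerRecord<>("foo", "key", "value");
try {
    // send() is asynchronous; get() blocks until the broker has acknowledged
    // the record (according to acks) or the send has failed for good.
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Stored at offset " + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
    // The message was not accepted by the broker; handle it here
    // (retry, persist locally, alert, ...)
    e.printStackTrace();
}
```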

Consumer

The consumer handles a message in the following steps:

  1. Receive the message
  2. Process the message
  3. Acknowledge that processing is complete (commit the offset)

There are two main ways for the consumer to commit:

  • Automatically commit the offset (Automatic Offset Committing)
  • Manually commit the offset (Manual Offset Control)

With automatic commit, the consumer commits the offsets of received messages at a fixed interval. The commit and the processing are asynchronous with respect to each other, so processing may not yet have succeeded (for example, an exception was thrown) while the offset has already been committed. At that point the message is lost.

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Automatically commit offsets every auto.commit.interval.ms
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    // After 1000 ms the polled offsets are committed automatically
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        insertIntoDB(record); // may take more than 1000 ms
}
```

The above is an example of automatic committing. If insertIntoDB(record) throws an exception at this point, the message is lost. Here is an example of manual committing:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Turn off automatic committing
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        // Commit only after all buffered messages have been processed
        consumer.commitSync();
        buffer.clear();
    }
}
```

By changing the commit type to manual, messages are guaranteed to be consumed “at least once.” However, there may be repeated consumption, which is beyond the scope of this article.

The two examples above use the consumer's high-level API directly, where details such as offset management are transparent to the client. You can also use the low-level API to control the offset manually, which can also ensure that messages are not lost, but it is more complex.

```java
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // Commit, per partition, the offset just past the last processed record
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
    consumer.close();
}
```