1. Multi-replica data synchronization policy

To improve message reliability, Kafka keeps N replicas of each topic partition. Among the N replicas, one is the Leader and the others are Followers. Follower replicas do not serve client requests in Kafka; all requests are handled by the Leader replica.

The Follower replica’s only job is to continuously pull messages from the Leader and write them to its own log. If a Follower keeps falling behind the Leader’s write rate for longer than replica.lag.time.max.ms, it is considered out of sync with the Leader and can no longer stay in the ISR. At that point Kafka automatically shrinks the ISR set, kicking the replica out of it.

Both the Leader and the Followers maintain their own HW. When a new message is written, consumers cannot consume it immediately; they have to wait until the Followers in the ISR have replicated it from the Leader. Consumers can only consume data before the HW.

Kafka maintains an AR (Assigned Replicas) list for each partition: the set of all replicas of that partition. AR is divided into ISR (in-sync replicas) and OSR (out-of-sync replicas).

Data on the Leader can only be committed, and therefore accessed by consumers, once all replicas in the ISR have synchronized it from the Leader. Whether the replicas in the OSR have synchronized the Leader’s data does not affect the commit; Followers in the OSR simply synchronize from the Leader on a best-effort basis.

At the beginning, all replicas are in the ISR. While Kafka is running, if a replica lags behind the Leader for longer than the threshold specified by replica.lag.time.max.ms, it is kicked out of the ISR and placed in the OSR. If it later catches up again, it can be returned to the ISR.

LEO (LogEndOffset): the latest data offset of the replica.

HW (HighWatermark): data is considered committed only after it has been synchronized to all replicas in the ISR; the HW is then advanced to that position. Data before the HW can be accessed by consumers; data that has not yet been fully synchronized cannot.
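As a rough illustration of how LEO and HW relate, the toy sketch below (not Kafka source code; the class name HighWatermarkSketch and the offsets are made up for this example) computes the HW as the smallest LEO across the replicas in the ISR:

```java
import java.util.List;

// Toy illustration only: the partition HW is (roughly) the smallest LEO
// among the replicas currently in the ISR.
public class HighWatermarkSketch {
    public static void main(String[] args) {
        // Assume the leader's LEO is 5 and the two ISR followers are at 4 and 3.
        List<Long> isrLeos = List.of(5L, 4L, 3L);
        long hw = isrLeos.stream().mapToLong(Long::longValue).min().orElse(0L);
        // Consumers can only read offsets strictly below the HW.
        System.out.println("HW = " + hw + ", consumable offsets: 0.." + (hw - 1));
    }
}
```

With these numbers, consumers would see offsets 0 to 2 even though the Leader already holds data up to offset 4.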

After the Leader breaks down, a new Leader can only be selected from the ISR list. No matter which replica in the ISR becomes the new Leader, it is guaranteed to hold all the data before the HW, which ensures that consumers can continue to see the previously committed data after the Leader switch.

2. Data consistency

The reason why “consumers can only consume data before the HW” is that messages that have not yet been replicated to enough replicas are considered “unsafe”: they are likely to be lost if the Leader crashes and another replica becomes the new Leader.

If we allowed consumers to read these messages, we might break consistency. Imagine that a consumer reads and processes Message4 from the current Leader (replica 0). Then the Leader crashes and replica 1, which has not yet replicated Message4, is elected as the new Leader. When another consumer reads from the new Leader, it finds that the message does not exist. This leads to data inconsistency.

Of course, the High Watermark mechanism also means that if message replication between brokers is slow for some reason, it takes longer for messages to reach consumers (since we wait for replication to complete first). This delay is bounded by the replica.lag.time.max.ms parameter, which specifies the maximum lag a replica is allowed when copying messages before it is removed from the ISR.

3. Handling replica failures

3.1 Follower failure

If a Follower fails, it is temporarily kicked out of the ISR. After the Follower recovers, it reads the last HW recorded on its local disk, truncates the part of its log that is above the HW, and then synchronizes from the Leader starting at the HW. Once the Follower’s LEO is greater than or equal to the partition’s HW, that is, once the Follower has caught up with the Leader, it is re-added to the ISR.

3.2 Leader failure

If the Leader fails, a new Leader is elected from the ISR. The remaining Followers first truncate the parts of their logs that are above their own HW, and then synchronize data from the new Leader.

4. Leader election

Kafka dynamically maintains an ISR for each partition in ZooKeeper. When the Leader fails, a Follower from the ISR is selected as the new Leader. If all the replicas in the ISR happen to have failed as well, there are two options:

  • Wait for a replica in the ISR to recover and select it as the Leader.
  • Select the first replica that recovers as the Leader, even though it may not be in the ISR.

(Figure: Kafka replica mechanism)

5. Producer ACK message acknowledgement mechanism

  • acks = 0: the producer considers the message successfully written to Kafka as soon as it manages to send it over the network.

    Errors can still occur in this case, such as sending an object that cannot be serialized or a network card failing; but if the partition is offline or the entire cluster is unavailable for a long time, no error will be received.

    Running in acks=0 mode is very fast (which is why so many benchmarks are based on this mode), and you can get amazing throughput and bandwidth utilization, but you will definitely lose some messages if you choose this mode.

  • acks = 1: the Leader returns an acknowledgement or an error response once it has received the message and written it to the partition.

    However, it is still possible to lose data in this mode, for example if a message has been successfully written to the Leader but the Leader crashes before the message is replicated to the Follower replicas. The sending client thinks the message was written successfully, and consumers never see the missing message, so the system still appears consistent; but from the producer’s point of view, a message has been lost.

    In this mode, during a normal Leader election the producer will receive a LeaderNotAvailableException, which is a recoverable exception. If the producer handles the error properly and retries sending, the message will eventually arrive safely at the new Leader.

    When an unrecoverable exception is encountered, it is thrown to the caller. In this case, the exception can be caught and the message logged to a database or cache for separate processing.

  • acks = all / -1: the Leader waits for all in-sync replicas to receive the message before returning an acknowledgement or an error response.

    Combined with the min.insync.replicas parameter, this determines how many replicas in the ISR must have synchronized the message before an acknowledgement is returned; the producer keeps retrying until the message is successfully committed. However, this is also the slowest mode, because the producer has to wait for all in-sync replicas to receive the current message before it can continue sending others. A minimal producer configuration sketch for these settings follows this list.
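As a minimal sketch (the bootstrap address is a placeholder), the following Java producer configuration chooses the acks level; with acks=all, the topic-level min.insync.replicas setting controls how many ISR replicas must confirm the write:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // "0": fire and forget; "1": leader ack only; "all"/"-1": wait for all in-sync replicas.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // keep retrying retriable errors

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send records here ...
        }
    }
}
```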

In addition, a producer can also send messages in synchronous (producer.type=sync) or asynchronous (producer.type=async) mode. Setting it to asynchronous greatly improves sending performance but increases the risk of data loss; to ensure message reliability, producer.type must be set to sync.
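The producer.type setting belongs to the older Scala producer; in the current Java client the same distinction is expressed through how send() is used. A hedged sketch (the topic name demo-topic is a placeholder) of synchronous vs. asynchronous sending:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncVsAsyncSend {
    static void sendBoth(KafkaProducer<String, String> producer) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");

        // Synchronous: block on the Future, so failures surface immediately as exceptions.
        RecordMetadata meta = producer.send(record).get();
        System.out.println("written to partition " + meta.partition() + " at offset " + meta.offset());

        // Asynchronous: return immediately and handle the outcome in a callback.
        producer.send(record, (RecordMetadata m, Exception e) -> {
            if (e != null) {
                e.printStackTrace(); // e.g. log and decide whether to retry or persist elsewhere
            }
        });
    }
}
```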

6. Kafka server data loss solution

A common scenario is that a Kafka broker goes down and the partition Leader is re-elected. If some data on the other Followers has not yet been synchronized when the Leader dies, and one of those Followers is elected as the new Leader, that data is lost.

Therefore, at least the following settings are generally required (a configuration sketch follows the list):

  • Topic level: set replication.factor > 1, so that each partition has at least 2 replicas.

  • Broker level: set min.insync.replicas > 1, which requires the Leader to perceive that at least one Follower is still in contact with it, so that at least one Follower remains after the Leader dies. Also disable unclean Leader election: unclean.leader.election.enable = false.

  • Producer level: set acks = all, so that each message must be written to all in-sync replicas before it is considered successfully written. Set retries = MAX, so that a failed write is retried indefinitely. Set the sending mode to synchronous: producer.type = sync.
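A minimal sketch of these settings, assuming a hypothetical topic named orders and a local broker. Here min.insync.replicas and unclean.leader.election.enable are applied as topic-level overrides through the Java AdminClient rather than in the broker’s server.properties, and the producer-level values go into producer properties:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ReliabilitySettings {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(adminProps)) {
            // Topic level: 3 replicas per partition, at least 2 of them must stay in sync,
            // and unclean leader election is disabled for this topic.
            NewTopic topic = new NewTopic("orders", 3, (short) 3)
                    .configs(Map.of(
                            "min.insync.replicas", "2",
                            "unclean.leader.election.enable", "false"));
            admin.createTopics(List.of(topic)).all().get();
        }

        // Producer level: wait for all in-sync replicas and retry failed writes indefinitely.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        // pass producerProps to a KafkaProducer as usual
    }
}
```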

For services that require high consistency, these settings at least ensure that, on the Kafka broker side, data is not lost when the Leader broker fails.

7. Producer retry parameters

There are two types of errors that producers need to handle: those that the producer can handle automatically, and those that developers need to handle manually. Accordingly, error response codes fall into two categories (a callback sketch distinguishing them follows the list):

  • Retriable errors: can be resolved by retrying. For example, if the broker returns LEADER_NOT_AVAILABLE, the producer can try to resend the message; by then a new Leader may have been elected and the message is sent successfully.
  • Non-retriable errors: cannot be resolved by retrying. For example, if the broker returns an INVALID_CONFIG error, the configuration will not change no matter how often the send is retried, so such retries are meaningless.
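A hedged sketch of how a send callback might distinguish the two categories in the Java client, using the RetriableException marker class (the logging and fallback actions are placeholders):

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;

public class ErrorClassifyingCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            return; // write succeeded
        }
        if (exception instanceof RetriableException) {
            // e.g. LeaderNotAvailableException: the client's own retries usually resolve this.
            System.err.println("Retriable error, relying on producer retries: " + exception);
        } else {
            // e.g. configuration or serialization errors: retrying will not help,
            // so record the failed message somewhere for manual handling.
            System.err.println("Non-retriable error, saving message for later: " + exception);
        }
    }
}
```

It would be passed as the second argument of producer.send(record, callback).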

In general, if the goal is not to lose any messages, it is best to let the producer keep retrying when it encounters a retriable error. Problems such as Leader election or network connectivity are usually resolved within seconds, so if the producer keeps retrying, developers do not need to handle them separately.

However, retrying a message that was actually written but whose acknowledgement failed carries some risk. For example, if a producer does not receive an acknowledgement from the broker because of a network problem, but the message was in fact written successfully, the producer will assume there was a temporary network failure and retry sending the message (because it does not know the message has already been written).

In this case, the broker receives two identical messages, so each message may be stored at least once and we need to handle this properly. In practice, many applications make message processing idempotent: add a unique identifier to each message, and have consumers detect and handle duplicates when they read them. That way, even if duplicate messages occur, there is no negative impact on the correctness of the results.

8. Idempotent solution for duplicate producer messages

Kafka version 0.11.0.0 introduced the Idempotent Producer mechanism: the same message may be sent multiple times by the producer, but it is written only once on the broker side. Each message is de-duplicated, with little impact on Kafka’s overhead.

enable.idempotence = true requires acks = all and retries > 1. Note that an idempotent producer can only guarantee that there are no duplicate messages within a single partition.
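A minimal configuration sketch for an idempotent producer (the bootstrap address is a placeholder):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Idempotence requires acks=all and a positive retries value.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // producer.send(...) as usual; duplicates caused by retries are filtered per partition.
        }
    }
}
```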

Idempotence principle:

Each producer has a producer ID, which the server uses to track the state of each producer. Every message from a producer carries a monotonically increasing sequence number, and the server records the current maximum sequence number seen for each producer. The de-duplication key is therefore producerId + sequence.

If a new message’s sequence number is not greater than the current maximum, the message is rejected; otherwise it is accepted and the maximum sequence is updated. In this way, a resent message is rejected by the server and duplication is avoided.
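A toy sketch of this principle (not Kafka broker code; the class and method names are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of broker-side de-duplication: track the highest sequence seen per producerId
// and reject anything that does not advance it.
public class SequenceDedupSketch {
    private final Map<Long, Long> maxSequenceByProducer = new HashMap<>();

    public boolean tryAppend(long producerId, long sequence) {
        long current = maxSequenceByProducer.getOrDefault(producerId, -1L);
        if (sequence <= current) {
            return false; // duplicate or stale resend, rejected
        }
        maxSequenceByProducer.put(producerId, sequence);
        return true; // accepted, maximum sequence advanced
    }

    public static void main(String[] args) {
        SequenceDedupSketch log = new SequenceDedupSketch();
        System.out.println(log.tryAppend(42L, 0)); // true: first message
        System.out.println(log.tryAppend(42L, 1)); // true: next message
        System.out.println(log.tryAppend(42L, 1)); // false: retry of an already-written message
    }
}
```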

In the multi-partition case, we need to ensure atomicity of writes across multiple partitions, that is, messages written to multiple partitions are either all committed or all rolled back. For this you need transactions: set transactional.id to a unique string on the producer side to get atomic writes across multiple partitions.
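A minimal transactional producer sketch under these assumptions (the topic names topic-a/topic-b and the transactional.id value are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx-producer"); // unique per producer

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                // Writes to different topics/partitions commit or abort together.
                producer.send(new ProducerRecord<>("topic-a", "k1", "v1"));
                producer.send(new ProducerRecord<>("topic-b", "k2", "v2"));
                producer.commitTransaction();
            } catch (KafkaException e) {
                producer.abortTransaction(); // roll back everything sent in this transaction
            }
        }
    }
}
```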

Conclusion

  • Idempotence: ensures that a message sent to a single partition is written only once, so no duplicate messages occur.
  • Transactions: guarantee atomic writes across multiple partitions, i.e., messages written to multiple partitions either all succeed or all roll back.
  • Stream processing EOS: stream processing is essentially a read-process-write pipeline, and EOS guarantees the atomicity of the whole pipeline. (Note that this only applies to Kafka Streams.)

9. Solutions for consumer data loss and repeated consumption

The only way a consumer loses data is this: it fetches a message and the offset is committed automatically, so Kafka thinks the message has been consumed, but the consumer crashes before it actually finishes processing the message, and the message is lost.

So as long as you turn off automatic offset commits and commit the offset manually after processing, data will not be lost. However, repeated consumption is still possible: for example, if the consumer crashes right after finishing processing but before committing the offset, the message will definitely be consumed again. A consumer sketch with manual commits follows.
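A minimal sketch of manual offset commits (the topic name, group id, and the processing step are placeholders):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // turn off auto commit

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // do the real work first ...
                }
                consumer.commitSync(); // ... then commit, so a crash cannot skip unprocessed messages
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.key() + " -> " + record.value());
    }
}
```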

The usual solution is to make the downstream processing idempotent: create a table that stores message-consumption records, look the message up by its unique ID, and check whether it has already been consumed. If not, process the message and then update its record to “consumed”. A small sketch of this check follows.
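A hedged sketch of the idempotent check, with an in-memory set standing in for the consumption-record table (in practice this would be a database table keyed by a unique message ID):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for a "message consumption record" table; a real system would use a
// database table with the message ID as primary key instead of an in-memory set.
public class IdempotentHandler {
    private final Set<String> consumedIds = ConcurrentHashMap.newKeySet();

    public void handle(String messageId, String payload) {
        if (!consumedIds.add(messageId)) {
            return; // already consumed: a duplicate delivery is simply ignored
        }
        // First delivery of this ID: do the real processing here.
        System.out.println("processing " + messageId + ": " + payload);
    }
}
```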