Terminology

Leader Epoch: a monotonically increasing 32-bit number that identifies each leader replica's era (a generation of leadership); it is stamped on every message the leader writes.

Leader Epoch Start Offset: the offset of the first message written in each leader epoch.

Leader Epoch Sequence File: a file in which each replica records the Leader Epoch Start Offset for every leader epoch change.

Leader Epoch Request: a request by which a follower asks the leader for the end offset of a given leader epoch, i.e. the start offset of the first message of the following epoch. If the leader has no epoch newer than the requested one, or no sequence file exists for the partition, the leader's Log End Offset (LEO) is returned instead.
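To make these definitions concrete, here is a minimal sketch of the epoch sequence a replica keeps. The class and method names (LeaderEpochCache, assign, endOffsetFor) are illustrative assumptions, not Kafka's actual API; it shows the [LeaderEpoch => StartOffset] vector and how a Leader Epoch Request is answered from it:

```java
import java.util.TreeMap;

public class LeaderEpochCache {
    // The [LeaderEpoch => StartOffset] vector, ordered by epoch.
    private final TreeMap<Integer, Long> epochToStartOffset = new TreeMap<>();
    private long logEndOffset = 0L;

    // Record the start of a new leader epoch (called on leadership change).
    public void assign(int epoch, long startOffset) {
        epochToStartOffset.put(epoch, startOffset);
    }

    // Track the log end offset as messages are appended.
    public void onAppend(long newLogEndOffset) {
        this.logEndOffset = newLogEndOffset;
    }

    // Answer a Leader Epoch Request: the end offset of the requested epoch
    // is the start offset of the next-higher epoch, or the LEO if the
    // requested epoch is still the current one.
    public long endOffsetFor(int requestedEpoch) {
        Integer nextEpoch = epochToStartOffset.higherKey(requestedEpoch);
        return nextEpoch == null ? logEndOffset : epochToStartOffset.get(nextEpoch);
    }
}
```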

Motivation

In the original replication mechanism, logs were truncated based on the high watermark (HW), which in some cases led to data loss and to inconsistent data across replicas.

Scenario 1: Message loss

Kafka's replication protocol has two phases. In the first phase, the follower fetches messages from the leader; say it fetches message m2. In the second phase, on the next round of RPC calls, the follower confirms that it has received m2. Assuming all other follower replicas have also confirmed receipt, the leader advances its high watermark (HW). The updated HW is then piggybacked on the response the next time the follower fetches from the leader. In other words, the leader controls the advancement of the HW, and the new value reaches followers only in subsequent RPC calls.
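As a sketch of that flow, the leader-side bookkeeping might look like the following. The names here are assumptions, not Kafka's actual API; the key point is that the HW is the minimum LEO acknowledged across the leader and its in-sync followers, and moves only forward:

```java
import java.util.Collection;

public class HighWatermarkTracker {
    private long highWatermark = 0L;

    // Called after each fetch round with the LEOs the followers reported.
    public void maybeAdvance(long leaderLeo, Collection<Long> followerLeos) {
        long min = leaderLeo;
        for (long leo : followerLeos) {
            min = Math.min(min, leo);
        }
        // The HW never moves backwards.
        highWatermark = Math.max(highWatermark, min);
    }

    // Returned in the next fetch response; the follower records it locally.
    public long highWatermark() {
        return highWatermark;
    }
}
```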

The protocol also includes a truncation step: when a follower (re)initializes, it first truncates its log to the HW it recorded locally, to ensure its log is a prefix of the leader's, and only then starts fetching from the leader. The bug lives in this step: if the follower is elected leader right after truncating, the truncated messages are permanently lost.
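A sketch of that pre-KIP-101 restart path, with invented names; the comments mark where the bug bites:

```java
import java.util.List;

public class FollowerStartup {
    // log is the follower's local log, one entry per offset.
    static void restartWithHwTruncation(List<String> log, long localHighWatermark) {
        // BUG (pre-KIP-101): the local HW can lag the leader's HW by one
        // fetch round, so entries above it may already be committed.
        while (log.size() > localHighWatermark) {
            log.remove(log.size() - 1);   // committed data can be deleted here
        }
        // ...the follower then starts fetching from the current leader; if it
        // is elected leader before re-fetching, the deleted data is gone.
    }
}
```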

Let’s look at an example:

Assume we have two brokers, A and B, and broker B currently hosts the leader replica. Follower A has fetched message m2 from leader B but has not yet confirmed to B that m2 is committed (phase 2 has not happened yet; in phase 2, follower A would confirm receipt of m2 and update its own HW). At this point follower A restarts. During initialization, follower A truncates its log to its locally recorded HW, which still precedes m2, and then begins fetching from leader B. Before it can re-fetch m2, follower A is elected leader, and m2 is permanently lost.
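The timeline can be replayed as a toy simulation; the offsets and message names follow the text, and everything else is illustrative:

```java
import java.util.ArrayList;
import java.util.List;

public class Scenario1 {
    public static void main(String[] args) {
        // Offsets: m1 at 0, m2 at 1. Leader B has committed m2, so its HW = 2.
        List<String> followerA = new ArrayList<>(List.of("m1", "m2"));
        long localHwOfA = 1; // A fetched m2 but has not yet learned HW = 2

        // A restarts and truncates to its stale local HW, deleting m2.
        while (followerA.size() > localHwOfA) {
            followerA.remove(followerA.size() - 1);
        }

        // A is then elected leader, so the committed m2 is lost for good.
        System.out.println("New leader A's log: " + followerA); // [m1]
    }
}
```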

The root cause of this message-loss problem is that the follower needs an extra round of RPC to learn the updated HW. In the window between the two phases, a follower that truncates its log can lose data if leadership changes. A couple of simple fixes come to mind. One is for the leader to wait until the followers have updated their HWs before advancing its own, but that adds an extra round of RPC calls. Another is to skip the truncation step until the follower has synchronized data from the leader; that would avoid the loss, but it introduces other problems, as the following example shows.

Scenario 2: Divergent messages on different replicas

Suppose we still have the two replicas above, and both go down due to a power outage. Unfortunately, the logs on the two machines can end up divergent and inconsistent; in the worst case, replica synchronization can get stuck.

One underlying problem is that Kafka flushes to disk asynchronously, which means that after a crash each replica may have persisted a different number of messages. Once the machines recover, either replica can become the leader. If the machine hosting the new leader stored the fewest messages, some data will be lost.

Suppose follower B is fetching message m2 from leader A when both brokers go down. The machine hosting replica B restarts first, so replica B becomes the leader; it receives a new message m3 and advances its HW. Broker A then restarts and its replica becomes follower A. During initialization, follower A truncates its log according to the leader's HW, but since the two replicas' HWs are equal, follower A truncates nothing. The result is that the two replicas hold divergent, inconsistent messages at the same offsets.
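Replaying this timeline as a toy simulation (purely illustrative) makes the divergence visible:

```java
import java.util.ArrayList;
import java.util.List;

public class Scenario2 {
    public static void main(String[] args) {
        // Before the crash, leader A held [m1, m2]; follower B's asynchronous
        // flush had only persisted [m1].
        List<String> replicaA = new ArrayList<>(List.of("m1", "m2"));
        List<String> replicaB = new ArrayList<>(List.of("m1"));

        // B restarts first, becomes leader, and appends a new message m3.
        replicaB.add("m3");
        long hw = 2; // both replicas now report HW = 2

        // A restarts as follower; its log length already equals the HW, so
        // HW-based truncation removes nothing, and the divergence at offset 1
        // (m2 on A vs m3 on B) is never repaired.
        System.out.println("A: " + replicaA + " vs B: " + replicaB);
    }
}
```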

The solution

Both problems are solved by introducing the Leader Epoch: an identifier assigned to each leader era, which the leader adds to every message it writes. Each replica keeps a [LeaderEpoch => StartOffset] vector that records where each leadership era began. When a follower needs to truncate its log, this vector, rather than the high watermark, becomes the reference for truncation. The follower asks the leader for the end offset of the follower's latest epoch and truncates away any messages the leader does not have. In other words, based on the Leader Epoch, the leader can tell a follower exactly which offset it needs to truncate to.
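A minimal sketch of this truncation rule, assuming the [LeaderEpoch => StartOffset] vector is held in a NavigableMap; the names are illustrative, not Kafka's exact API:

```java
import java.util.Map;
import java.util.NavigableMap;

public class EpochBasedTruncation {
    // epochStartOffsets is the leader's [LeaderEpoch => StartOffset] vector.
    static long truncationOffset(NavigableMap<Integer, Long> epochStartOffsets,
                                 long leaderLeo,
                                 int followerLastEpoch,
                                 long followerLeo) {
        // The leader's end offset for the follower's epoch: the start offset
        // of the next-higher epoch, or the leader's LEO if none is newer.
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(followerLastEpoch);
        long leaderEndOffset = (next == null) ? leaderLeo : next.getValue();
        // Keep only the prefix both logs are guaranteed to agree on.
        return Math.min(leaderEndOffset, followerLeo);
    }
}
```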

Let's see how the Leader Epoch scheme handles each scenario.

Solving the problem in Scenario 1:

In Scenario 1, after follower A restarts, it sends a LeaderEpochRequest to leader B to learn the end offset of its current leader epoch. Since follower A and leader B are in the same era (leader epoch 0), leader B returns its own LEO, i.e. 2, to follower A. That value equals follower A's own LEO, so unlike with high-watermark truncation, follower A truncates nothing and m2 is retained intact. If follower A is later elected leader, every committed message is preserved, and the log-loss problem is solved.
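Plugging the Scenario 1 numbers into the epoch scheme, as a toy replay under the same assumptions as the sketches above:

```java
import java.util.TreeMap;

public class Scenario1Fixed {
    public static void main(String[] args) {
        TreeMap<Integer, Long> leaderEpochs = new TreeMap<>();
        leaderEpochs.put(0, 0L);   // epoch 0 started at offset 0 on leader B
        long leaderLeo = 2L;       // B holds m1 (offset 0) and m2 (offset 1)

        int followerEpoch = 0;     // follower A restarts, still in epoch 0
        long followerLeo = 2L;     // A also holds m1 and m2

        // LeaderEpochRequest: no epoch newer than 0, so B answers its LEO.
        Integer next = leaderEpochs.higherKey(followerEpoch);
        long leaderEndOffset = (next == null) ? leaderLeo : leaderEpochs.get(next);

        long truncateTo = Math.min(leaderEndOffset, followerLeo);
        System.out.println("Truncate to offset " + truncateTo); // 2: nothing is cut, m2 survives
    }
}
```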

Solving the problem in Scenario 2:

Replica A starts out as leader A. When both brokers crash, broker B restarts first and replica B becomes leader B; it begins a new leader epoch, LE1, and receives message m3. Broker A then restarts, and its replica naturally becomes follower A. Follower A sends leader B a LeaderEpochRequest to determine where its own era ends, and leader B returns the start offset of the LE1 era, which is 1 (the offset of m3). On receiving this response, follower A truncates its log to offset 1, discarding the divergent m2, and then fetches from offset 1, so the two logs converge.
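And the Scenario 2 numbers, replayed the same way (again a toy illustration):

```java
import java.util.TreeMap;

public class Scenario2Fixed {
    public static void main(String[] args) {
        TreeMap<Integer, Long> leaderEpochs = new TreeMap<>();
        leaderEpochs.put(0, 0L); // LE0: m1 at offset 0 (and, on A only, m2 at 1)
        leaderEpochs.put(1, 1L); // LE1: new leader B writes m3 at offset 1

        int followerEpoch = 0;   // A restarts still believing it is in LE0
        long followerLeo = 2L;   // A holds m1 and the divergent m2

        // Leader B answers with the start offset of the epoch after LE0.
        Integer next = leaderEpochs.higherKey(followerEpoch); // LE1
        long leaderEndOffset = leaderEpochs.get(next);        // 1

        long truncateTo = Math.min(leaderEndOffset, followerLeo); // 1
        System.out.println("A truncates to " + truncateTo + ", drops m2, then fetches m3");
    }
}
```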

References

KIP-101 – Alter Replication Protocol to use Leader Epoch rather than High Watermark for Truncation