• 👏 Author profile: Hello everyone, I'm Xiao Huang, a Java development engineer at a unicorn company and a rising-star creator in the Java field who loves writing code.
  • 📝 Personal public account: Xiao Huang who loves writing code
  • 📕 Column series: Java design patterns, data structures, and algorithms
  • 📧 If any point in this article is wrong, please correct me! Let's learn and make progress together 👀
  • 🔥 If you find the blogger's articles helpful, please 👍 like, comment, and share 👍 to support the blogger
  • 🍂 The blogger's motto for 2022: dream like a horse, set sail, and chase your dreams

One, the introduction

You may have heard of the High Watermark, but not necessarily the Leader Epoch.

The former is a very important concept in Kafka, while the latter was introduced by the community in version 0.11 to compensate for some shortcomings of the high watermark mechanism.

Given the importance of the high watermark in Kafka and how often interviewers ask about it, today we will focus on the high watermark. Of course, we will also spend some time discussing the Leader Epoch and its role.

This article is fairly long and covers replica synchronization in considerable detail, so it is best to set aside some uninterrupted time for reading.

Two, what is the high watermark

In everyday life, what do we call water level?

  1. Classic textbook definition
    • At moment T, if every event whose creation time (Event Time) T' satisfies T' <= T has already arrived, then T is defined as the watermark
  2. Streaming Systems
    • The watermark is a monotonically increasing timestamp that represents the earliest unfinished work
  3. As shown in the figure above, the blue area labeled Completed has been completed, the red area labeled In-Flight is unfinished (in progress), and the watermark is the boundary line where the two meet.
  4. In Kafka, the watermark is not a timestamp; it is bound to position information, i.e. the message displacement (offset) represents the watermark
    • Kafka also has a Low Watermark, which relates to message deletion and is beyond the scope of this article

Three, the role of the high watermark

In Kafka, the high watermark plays two main roles:

  • Defining message visibility, i.e. telling consumers which messages are available for consumption
  • Helping Kafka complete replica synchronization

A Kafka partition may have many replicas for redundancy, which in turn enables high availability. By role, replicas fall into three types:

  • Leader replica: serves read and write requests from clients
  • Follower replica: passively replicates the Leader replica's contents and does not serve read or write requests from clients
  • ISR replica set: contains the Leader replica and all Follower replicas that are in sync with it

Each Kafka replica object has two important properties: LEO and HW. Note that all replicas (Leader + Follower) have them.

  • LEO: Log End Offset, which records the displacement of the next message to be written to the replica's underlying log.
  • HW: High Watermark. For the same replica object, the HW value never exceeds the LEO.

Suppose the following is a high watermark diagram for the Leader replica of a partition:

The messages below the high watermark are committed messages, and those at or above it are uncommitted. Consumers can consume the committed messages, i.e. the messages marked 0-7 in the figure. Note that the message whose displacement equals the high watermark is also uncommitted; in other words, the message at the high watermark cannot be consumed by consumers.

The position at the end of the log, known as LEO, represents the displacement at which the replica will write its next message. The box at displacement 15 is drawn dashed, meaning the replica currently holds only 15 messages, at displacements 0 to 14; the next new message will be written at displacement 15.

Observe that for the same replica, the high watermark never exceeds its LEO.

Four, the high watermark update mechanism

From the above, we know that each replica object maintains an HW and a LEO.

In fact, Broker0, where the Leader replica resides, also holds the LEO values of the other Follower replicas; these are called Remote Replicas.

While the Kafka replica mechanism is running:

  • Updated

    • The high watermark and LEO of the Follower replica on Broker1

    • The high watermark and LEO of the Leader replica on Broker0, plus the LEOs of all remote replicas

  • Not updated

    • The HWs of the remote replicas on Broker0, marked in grey in the figure.

You may be wondering: why keep the Follower replicas' LEOs on Broker0 at all?

  • To help the Leader replica determine its high watermark, which is also the partition's high watermark

1. Update timing

  • LEO of the Leader replica on Broker0: when the Leader replica receives a message from a producer and writes it to local disk, it updates its LEO value
  • LEO of the Follower replica on Broker1: when the Follower replica pulls a message from the Leader replica and writes it to local disk, it updates its LEO value
  • LEO of the remote replica on Broker0: when the Follower replica pulls messages, it tells the Leader replica which displacement to start pulling from; the Leader replica uses that displacement to update the remote replica's LEO
  • High watermark of the Leader replica on Broker0: updated at two moments, after the Leader replica updates its own LEO and after a remote replica's LEO is updated. The algorithm: take the minimum of the Leader replica's LEO and the LEOs of all remote replicas in sync with the Leader
  • High watermark of the Follower replica on Broker1: after updating its LEO, the Follower replica compares its LEO with the high watermark sent by the Leader replica and takes the smaller of the two as its own high watermark

  • For a Follower replica to be considered in sync with the Leader replica, two conditions must be met
    • The Follower replica is in the ISR
    • The Follower replica's LEO has not lagged behind the Leader replica's LEO for longer than the parameter replica.lag.time.max.ms (default: 10 seconds)

These two conditions may look like one and the same, since the second condition is what currently determines whether a replica can enter the ISR.

Sometimes, however, a Follower replica has "caught up" with the Leader but is not yet in the ISR, for example a replica that has just been restarted. If Kafka judged only the first condition, some replicas could qualify for the ISR without actually being in it yet; the partition's high watermark could then exceed the LEO of a replica in the ISR, and high watermark > LEO is not allowed.
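
The lag-time condition above can be sketched in a few lines. This is an illustrative model, not Kafka's actual internals: the names `isInSync` and `lastCaughtUpTimeMs` are assumptions for the example, and only the comparison against `replica.lag.time.max.ms` comes from the text.

```java
public class IsrCheck {
    // Illustrative rule: a follower counts as in-sync only if it has fully
    // caught up to the leader's LEO within replica.lag.time.max.ms.
    static boolean isInSync(long nowMs, long lastCaughtUpTimeMs, long replicaLagTimeMaxMs) {
        return nowMs - lastCaughtUpTimeMs <= replicaLagTimeMaxMs;
    }

    public static void main(String[] args) {
        long lagMax = 10_000; // the default of replica.lag.time.max.ms is 10 seconds
        System.out.println(isInSync(15_000, 8_000, lagMax)); // lag 7s  -> true
        System.out.println(isInSync(25_000, 8_000, lagMax)); // lag 17s -> false
    }
}
```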

2. Leader and Follower replica processing logic

The Leader replica

  • The logic for handling a producer request is as follows:
    • Write the message to disk
    • Update the LEO value
    • Update the partition's high watermark
      • Get the LEO values of all remote replicas held by the Broker where the Leader replica resides: LEO-1, LEO-2, LEO-3 ...
      • Get the Leader replica's current high watermark: currentHW
      • The updated high watermark is: HW = Math.max(currentHW, Math.min(LEO-1, LEO-2, LEO-3 ...))
  • The logic for handling a Follower replica's fetch request is as follows:
    • Read the message data from disk (page cache)
    • Update the remote replica's LEO with the displacement value in the Follower replica's fetch request
    • Update the partition's high watermark (same algorithm as above)
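
The leader-side formula above can be condensed into a single helper. A minimal sketch following the formula in the text (class and method names are illustrative, not Kafka's actual code):

```java
import java.util.Collections;
import java.util.List;

public class LeaderHighWatermark {
    // Leader-side rule from the text: the partition HW never moves backwards,
    // and never exceeds the slowest remote replica's LEO.
    static long updateHw(long currentHw, List<Long> remoteLeos) {
        // HW = Math.max(currentHW, Math.min(LEO-1, LEO-2, LEO-3 ...))
        return Math.max(currentHw, Collections.min(remoteLeos));
    }

    public static void main(String[] args) {
        System.out.println(updateHw(0, List.of(1L, 2L, 3L))); // slowest remote LEO wins -> 1
        System.out.println(updateHw(5, List.of(1L, 2L)));     // HW never goes backwards -> 5
    }
}
```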

The Follower replica

  • The logic for processing messages pulled from the Leader is as follows:
    • Write the message to local disk and update the LEO value
    • Update the high watermark
      • Get the high watermark sent by the Leader: currentHW
      • Get the LEO value updated in the previous step: currentLEO
      • The updated high watermark is: HW = Math.min(currentHW, currentLEO)
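
The follower-side rule is even shorter. A sketch with illustrative names (not Kafka's actual code):

```java
public class FollowerHighWatermark {
    // Follower-side rule from the text: take the smaller of the HW the leader
    // sent back in the fetch response and the follower's own LEO.
    static long updateHw(long leaderHw, long followerLeo) {
        return Math.min(leaderHw, followerLeo);
    }

    public static void main(String[] args) {
        System.out.println(updateHw(1, 1)); // fully caught up      -> 1
        System.out.println(updateHw(2, 1)); // follower still lags  -> 1
    }
}
```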

3. Replica synchronization mechanism

Let me walk through a practical example of the Kafka replica synchronization process. The example uses a topic with a single partition and two replicas.

How are the high watermarks of the Leader and Follower replicas updated when a producer sends a message?

First, the initial state. Remote LEO refers to the remote replica's LEO on Broker0 mentioned earlier; the Follower replica continuously synchronizes data from the Leader replica via FETCH requests.

3.1 First Synchronization

When the producer sends a message to our topic partition, the status changes to:

Recall the Leader replica's logic for handling a producer request:

  • Write to disk and update LEO = 1
  • Update the high watermark
    • The current high watermark is 0
    • The remote replica's current LEO is 0
    • So: HW = Math.max(0, 0) = 0

The Follower replica then attempts to pull messages from the Leader, and this time there is a message to pull, so the state changes further:

As explained above, the Leader replica handles the Follower replica's fetch request:

  • Read the message data from disk (page cache)
  • Update the remote replica's LEO with the displacement value in the Follower's fetch request
    • Remote LEO = fetchOffset = 0
  • Update the partition's high watermark (unchanged, omitted)

And the Follower replica's logic for processing the pulled message:

  • Write the message to local disk and update the LEO value to 1
  • Update the high watermark
    • Get the high watermark sent by the Leader: currentHW = 0
    • Get the LEO value updated in the previous step: currentLEO = 1
    • The updated high watermark is: HW = Math.min(0, 1) = 0

After this pull, the LEOs of the Leader and Follower replicas are both 1, but their high watermarks are still 0 and have not been updated.

3.2 Second Synchronization

They will be updated in the next fetch round, as shown below:

The Leader replica handles the Follower replica's fetch request:

  • Read the message data from disk (page cache)
  • Update the remote replica's LEO with the displacement value in the Follower's fetch request
    • Remote LEO = fetchOffset = 1
  • Update the partition's high watermark
    • Remote replica LEO: Remote LEO = 1
    • Leader's current high watermark: currentHW = 0
    • The updated high watermark is: HW = Math.max(0, 1) = 1

The Follower replica's logic for processing the pulled message:

  • Write the message to local disk and update the LEO value (no change)
  • Update the high watermark
    • Get the high watermark sent by the Leader: currentHW = 1
    • Get the LEO value updated in the previous step: currentLEO = 1
    • The updated high watermark is: HW = Math.min(1, 1) = 1

At this point, a complete message synchronization cycle is finished. Kafka uses exactly this mechanism to keep the Leader and Follower replicas in sync.
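
The two fetch rounds above can be replayed as a toy simulation of the update rules. This is a model of the rules described in this article, with illustrative names, not Kafka code:

```java
public class ReplicaSyncDemo {
    long leaderLeo = 0, leaderHw = 0, remoteLeo = 0;  // state on Broker0
    long followerLeo = 0, followerHw = 0;             // state on Broker1

    void produce() { leaderLeo++; }                   // producer writes one message

    // One FETCH round: the follower asks for fetchOffset = its own LEO.
    void fetchRound() {
        long fetchOffset = followerLeo;
        remoteLeo = fetchOffset;                      // leader updates the remote LEO
        leaderHw = Math.max(leaderHw, remoteLeo);     // HW = max(currentHW, min(remote LEOs))
        followerLeo += leaderLeo - followerLeo;       // follower appends the fetched messages
        followerHw = Math.min(leaderHw, followerLeo); // HW = min(leader HW, own LEO)
    }

    public static void main(String[] args) {
        ReplicaSyncDemo d = new ReplicaSyncDemo();
        d.produce();
        d.fetchRound(); // round 1: both LEOs reach 1, both HWs still 0
        System.out.println(d.leaderHw + " " + d.followerHw); // -> 0 0
        d.fetchRound(); // round 2: fetchOffset = 1 lets the leader advance its HW
        System.out.println(d.leaderHw + " " + d.followerHw); // -> 1 1
    }
}
```

Note that the high watermark only catches up one fetch round after the data itself, which is exactly the window the next section exploits.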

Five, Leader Epoch makes a dazzling debut

Relying on the high watermark, Kafka not only defines message visibility to the outside world but also implements the replica synchronization mechanism.

As the saying goes: nobody is perfect, and nothing is flawless.

So we need to ask: what are the flaws of this replica synchronization mechanism?

1. Data loss

  • Blue: data that has been flushed to disk
  • Yellow: no data yet

Suppose that during the second round of synchronization, replica B's machine reboots while it is still processing the messages it pulled from the Leader.

After replica B restarts successfully, it performs log truncation based on its high watermark, adjusting its LEO down to its previous high watermark, 1. The message at displacement 1 is deleted from replica B's disk, and replica B's underlying log file now holds only one message: the one at displacement 0.

After truncating its log, replica B pulls messages from replica A for normal synchronization. Replica A then goes down, and replica B becomes the Leader.

When replica A restarts successfully, it checks in with the Leader. On coming back, replica A must perform the same log truncation, adjusting its high watermark to the same value as replica B's, i.e. 1.

After this, the message at displacement 1 is erased from both replicas forever. This is the data loss scenario shown in the figure.
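
The loss can be made concrete with a toy model of high-watermark-based truncation (illustrative code, not Kafka's actual log implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class HwTruncationLoss {
    // Toy model of HW-based truncation: on restart, a replica discards
    // every message at or above its own high watermark.
    static void truncateToHw(List<String> log, long hw) {
        while (log.size() > hw) log.remove(log.size() - 1);
    }

    public static void main(String[] args) {
        // Replica B crashed holding messages at offsets 0 and 1, but with HW still 1.
        List<String> replicaB = new ArrayList<>(List.of("m1", "m2"));
        truncateToHw(replicaB, 1);    // restart: the message at offset 1 ("m2") is deleted
        System.out.println(replicaB); // -> [m1]
        // If the old leader A later truncates to HW = 1 as well, "m2" is gone forever.
    }
}
```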

2. Inconsistent data

Suppose that while replica B is synchronizing messages from replica A, replicas A and B both go down.

Replica B starts successfully first and is elected Leader. The producer then sends data to replica B: the message at displacement 1 in the figure.

When replica A starts successfully, it checks in with the Leader replica and finds that the Leader's LEO and HW match its own, so replica A performs no operation.

Observing the result, we can see that the data on replica A and replica B are inconsistent.

3. Leader Epoch

Simply put, the Leader Epoch is a pair of values: (epoch, offset)

  • Epoch: the current Leader's version number, starting from 0. Each time the Leader changes, the epoch is incremented by 1
  • Offset: the displacement of the first message written by the Leader of that epoch

Consider the following example: (0, 0) and (1, 120)

The first Leader version has version number 0 and wrote 120 messages starting from displacement 0, i.e. displacements [0, 119]

The second Leader version has version number 1 and writes messages starting from displacement 120

The Leader Broker keeps such a cache in memory and periodically writes it to a checkpoint file

When the Leader writes to the underlying log, it tries to update this cache: if this is the first message the Leader writes in its epoch, a new entry is added to the cache; otherwise, nothing is updated

Whenever a replica becomes the Leader again, it queries this cache to obtain the start displacement for its Leader version, thereby avoiding data loss and inconsistency
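
A minimal sketch of such an epoch cache, using the (0, 0) and (1, 120) example from above. The class and method names are assumptions for illustration; Kafka's real implementation differs in detail:

```java
import java.util.TreeMap;

public class LeaderEpochCache {
    // Illustrative cache: epoch -> start offset of the first message
    // written by the leader of that epoch.
    private final TreeMap<Integer, Long> epochs = new TreeMap<>();

    void assign(int epoch, long startOffset) {
        // Only the first write of an epoch adds an entry; later writes change nothing.
        epochs.putIfAbsent(epoch, startOffset);
    }

    // Offset a follower on `epoch` should truncate to: the start offset of the
    // next epoch, or the leader's LEO if the epoch is still the current one.
    long endOffsetFor(int epoch, long leaderLeo) {
        Integer next = epochs.higherKey(epoch);
        return next == null ? leaderLeo : epochs.get(next);
    }

    public static void main(String[] args) {
        LeaderEpochCache cache = new LeaderEpochCache();
        cache.assign(0, 0);    // first leader: epoch 0 starts at offset 0
        cache.assign(1, 120);  // new leader:   epoch 1 starts at offset 120
        System.out.println(cache.endOffsetFor(0, 200)); // epoch 0 covers [0, 119] -> 120
        System.out.println(cache.endOffsetFor(1, 200)); // current epoch -> leader LEO 200
    }
}
```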

3.1 How data loss is avoided

When replica A finishes restarting, it sends a LeaderEpochRequest to Leader replica B to obtain the end offset for its leader epoch.

Since Follower replica A and Leader replica B are in the same epoch (epoch 0), Leader replica B returns its own LEO, i.e. 2, to Follower replica A.

When replica A receives LEO = 2, its own LEO does not exceed 2, so no truncation is required.

This is a significant improvement over the high watermark mechanism: whether a replica truncates its log no longer depends on the high watermark.

3.2 How to Avoid Data Inconsistency

Initially, replica A is the Leader. After both brokers crash and restart, BrokerB comes back successfully first, and Follower replica B becomes the Leader replica.

It starts a new leader epoch, Leader1, and receives message M3.

BrokerA then restarts successfully, and replica A naturally becomes Follower replica A. Replica A sends a LeaderEpochRequest to Leader B to determine which leader epoch it should be in, and Leader B returns the first displacement of the Leader1 epoch, i.e. 1 (the displacement of M3). After receiving this response, Follower A truncates its log at displacement 1: it knows M2 should be discarded, and from then on it synchronizes the log starting at displacement 1.

Six, summary

Through the Leader Epoch mechanism, a replica's log truncation no longer depends solely on the high watermark value.

Quick summary of key points:

  • The high watermark plays two main roles

    • Defining message visibility

    • Helping Kafka complete replica synchronization

  • Messages below a partition's high watermark are considered committed; the rest are uncommitted. Consumers can only consume committed messages.

  • Log End Offset (LEO): indicates the displacement at which the replica will write its next message.

  • Leader Epoch: the Leader's version, consisting of two pieces of data: (epoch, offset)

    • Epoch (a monotonically increasing version number): incremented whenever the partition leadership changes. A Leader with a smaller version number is considered an expired Leader.
    • Offset: the displacement of the first message written by the Leader replica under that epoch value.

Before writing about Kafka, the blogger did not know where to start: the overall architecture felt too broad and the source code too complex, so I finally decided to start with replica synchronization.

That concludes today's look at Kafka replica synchronization; the next installment will cover Kafka's basic architecture.