How Kafka uses the ISR mechanism to ensure message reliability and consistency

What is ISR in Kafka?

ZooKeeper (ZK) stores the AR (Assigned Replicas) list, which contains all replicas of a partition. AR = ISR + OSR.

  • ISR (In-Sync Replicas): the replicas that are currently in sync with the leader. As long as any member of the ISR is alive, only a member of the ISR can become the leader. A message counts as committed only once every replica in the ISR has received it (this is what the producer waits for when acks = all). When the leader fails, a follower from the ISR is elected as the new leader. If a replica in the ISR falls too far behind or fails, it is removed from the ISR, and it rejoins once it has caught up with the leader’s log.
  • OSR (Out-of-Sync Replicas): replicas whose confirmation is not required before a message is committed. Whether a replica in the OSR has caught up with the leader does not affect commits. Followers in the OSR still make a best effort to catch up with the leader’s data.
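The relationship AR = ISR + OSR can be sketched as a small model. This is only an illustration: `PartitionReplicas`, `shrink_isr`, and `expand_isr` are hypothetical names, not Kafka classes or methods.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class PartitionReplicas:
    """Toy model of one partition's replica sets (not Kafka's real classes)."""
    assigned: Set[int]                               # AR: broker ids holding a replica
    in_sync: Set[int] = field(default_factory=set)   # ISR

    @property
    def out_of_sync(self) -> Set[int]:
        # OSR is whatever is assigned but not in sync, so AR = ISR + OSR.
        return self.assigned - self.in_sync

    def shrink_isr(self, broker_id: int) -> None:
        """Drop a lagging follower from the ISR; it remains in the AR."""
        self.in_sync.discard(broker_id)

    def expand_isr(self, broker_id: int) -> None:
        """Re-admit a follower that has caught up with the leader."""
        if broker_id not in self.assigned:
            raise ValueError(f"broker {broker_id} is not an assigned replica")
        self.in_sync.add(broker_id)


replicas = PartitionReplicas(assigned={1, 2, 3}, in_sync={1, 2, 3})
replicas.shrink_isr(3)      # broker 3 falls behind and moves to the OSR
print(replicas.in_sync, replicas.out_of_sync)
replicas.expand_isr(3)      # broker 3 catches up and rejoins the ISR
print(replicas.in_sync, replicas.out_of_sync)
```

The OSR is derived rather than stored, which keeps the two sets from ever disagreeing about a replica.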
How does Kafka control how many replicas must have a message before it returns an acknowledgement to the producer?
  • When writing to Kafka, the producer uses the acks setting to choose how much confirmation to wait for: 0 (do not wait for any acknowledgement), 1 (wait until the leader alone has written the message), or -1, also written all (wait for all replicas, where “all replicas” means the replicas in the ISR).
  • It is important to note that acks=all does not guarantee that all assigned replicas have received the message. By default, when acks=all, the acknowledgement is sent as soon as all replicas currently in sync (the replicas in the ISR) have received the message. So Kafka’s delivery commitment can be read as follows: there is no delivery guarantee for messages that have not been successfully committed, and “successfully committed” messages are guaranteed not to be lost as long as at least one fully synchronized replica in the ISR survives.
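A minimal sketch of the acks decision, assuming the usual producer semantics: acks=0 does not wait at all, acks=1 waits for the leader only, and acks=all (or -1) waits for every replica in the ISR. The function `send_completes` and its parameters are hypothetical and only model the rule; they are not part of any Kafka client.

```python
from typing import List


def send_completes(acks: str, leader_has_it: bool,
                   isr_followers_have_it: List[bool]) -> bool:
    """Return True if the producer would treat the send as done (toy model).

    acks: "0", "1", or "all" (equivalently "-1").
    leader_has_it: whether the leader appended the message to its log.
    isr_followers_have_it: one flag per follower currently in the ISR.
    """
    if acks == "0":
        return True                  # producer does not wait for any response
    if acks == "1":
        return leader_has_it         # the leader's write alone is enough
    if acks in ("all", "-1"):
        # Every replica in the ISR, leader included, must have the message.
        # Replicas outside the ISR are never consulted.
        return leader_has_it and all(isr_followers_have_it)
    raise ValueError(f"unsupported acks value: {acks!r}")


print(send_completes("1", True, [False, False]))   # True
print(send_completes("all", True, [True, False]))  # False
print(send_completes("all", True, []))             # True
```

The last call shows the caveat from the text: if the ISR has shrunk to the leader alone, acks=all is satisfied by a single copy.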
What are the conditions under which a Kafka node is alive?
  • First: the node must maintain its session with ZK through ZK’s heartbeat mechanism.
  • Second: if the node is a follower, that is, a replicating node, it must not fall too far behind the leader. “Falling behind” can be measured in two ways:
    • 1: By data. The follower’s log is many messages behind the leader’s log. This criterion has a disadvantage: a sudden burst of writes to the leader can make a healthy follower look out of sync.
    • 2: By time. Too much time has passed since the follower last fetched from the leader and caught up with it. The threshold is configured with replica.lag.time.max.ms. This criterion avoids the problem caused by the first one.
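The two liveness conditions, and the difference between the two lag criteria, can be sketched as follows. The function names and all threshold values are illustrative assumptions, not Kafka defaults or Kafka code.

```python
def in_sync_by_messages(leader_leo: int, follower_leo: int,
                        max_lag_messages: int) -> bool:
    """Criterion 1: the follower may trail by at most max_lag_messages."""
    return leader_leo - follower_leo <= max_lag_messages


def in_sync_by_time(now_ms: int, last_caught_up_ms: int,
                    max_lag_ms: int) -> bool:
    """Criterion 2: the follower must have caught up within max_lag_ms."""
    return now_ms - last_caught_up_ms <= max_lag_ms


def is_alive(has_zk_session: bool, follower_in_sync: bool) -> bool:
    """A node is alive only if it holds a ZK session and is not lagging."""
    return has_zk_session and follower_in_sync


# A healthy follower that was fully caught up 200 ms ago, just before the
# leader received a burst of 5000 messages.
by_messages = in_sync_by_messages(leader_leo=15_000, follower_leo=10_000,
                                  max_lag_messages=4_000)
by_time = in_sync_by_time(now_ms=1_200, last_caught_up_ms=1_000,
                          max_lag_ms=10_000)
print(is_alive(True, by_messages))  # False: the burst alone trips the check
print(is_alive(True, by_time))      # True: the follower caught up recently
```

The burst case is the disadvantage of the message-count criterion: the follower is healthy, but a fixed message threshold cannot tell a traffic spike from a slow replica.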
How to recover from a Kafka partition failure?

In Kafka there is a Partition recovery mechanism for recovering failed partitions.

Each partition records a RecoveryPoint on disk, which is the largest offset that has been flushed to disk. When a broker restarts after a failure, it runs loadLogs. It first reads the partition’s RecoveryPoint and locates the segment that contains it; that segment and all later segments may not have been fully flushed to disk. The segment recover routine is then called on each of them to re-read the messages in the segment and rebuild its index.

Advantages:

  1. Managing partition data by segment simplifies data life-cycle management and makes it easy to delete expired data.
  2. Recovery after a crash and restart is fast, because only the segments that were not fully flushed to disk need to be recovered.
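The selection of segments to recover can be sketched like this. It is a simplified illustration of the idea described above, assuming each segment is known only by its offset range; `Segment` and `segments_to_recover` are hypothetical names, not Kafka’s internal API.

```python
from typing import List, NamedTuple


class Segment(NamedTuple):
    base_offset: int   # offset of the first message in the segment
    next_offset: int   # one past the offset of the last message


def segments_to_recover(segments: List[Segment],
                        recovery_point: int) -> List[Segment]:
    """Return the segments that may hold data not yet flushed to disk.

    Everything below recovery_point is known to be on disk, so only the
    segment containing recovery_point and the segments after it need to be
    re-read and re-indexed.
    """
    return [s for s in segments if s.next_offset > recovery_point]


log = [Segment(0, 100), Segment(100, 200), Segment(200, 250)]
print(segments_to_recover(log, recovery_point=150))
# [Segment(base_offset=100, next_offset=200), Segment(base_offset=200, next_offset=250)]
```

With the recovery point at 150, the first segment is skipped entirely, which is where the faster restart comes from.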
What causes the replica to be out of sync with the leader?
  • Slow replica: the follower cannot catch up with the leader within a certain period of time. One of the most common causes is an I/O bottleneck that makes the follower append replicated messages more slowly than the leader receives them.
  • Stuck replica: the follower stops sending fetch requests to the leader for a certain period of time. The follower may be stuck because of a GC pause, or because it has failed or died.
  • Newly started replica: when a user increases the replication factor of a topic, the new followers are not in the ISR list until they have completely caught up with the leader’s log.

A follower of a partition that lags far enough behind the leader is considered out of sync and is removed from the ISR list.

As mentioned above, Kafka has used two criteria to decide that a replica is lagging: the maximum number of messages by which the replica may trail the leader (replica.lag.max.messages), and the maximum time the replica may go without catching up with the leader (replica.lag.time.max.ms). The former is meant to detect slow replicas, while the latter is meant to detect stuck or dead replicas. Note that replica.lag.max.messages was removed in Kafka 0.9.0, so current versions rely on replica.lag.time.max.ms alone.
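The sketch below shows how a single time-based check can catch both slow and stuck replicas, assuming the leader remembers when each follower was last fully caught up. `FollowerProgress` is a hypothetical class and the numbers are arbitrary; Kafka’s real bookkeeping is more detailed.

```python
class FollowerProgress:
    """Tracks when a follower was last fully caught up (simplified model)."""

    def __init__(self, start_ms: int) -> None:
        self.last_caught_up_ms = start_ms

    def on_fetch(self, now_ms: int, fetch_offset: int, leader_leo: int) -> None:
        # A fetch that starts at the leader's log end means the follower
        # already holds everything the leader has written so far.
        if fetch_offset >= leader_leo:
            self.last_caught_up_ms = now_ms

    def is_out_of_sync(self, now_ms: int, max_lag_ms: int) -> bool:
        return now_ms - self.last_caught_up_ms > max_lag_ms


MAX_LAG_MS = 500  # stands in for replica.lag.time.max.ms; value is arbitrary

healthy, slow, stuck = FollowerProgress(0), FollowerProgress(0), FollowerProgress(0)
for now_ms in range(100, 1100, 100):
    leader_leo = now_ms                       # leader writes 1 message per ms
    healthy.on_fetch(now_ms, fetch_offset=leader_leo, leader_leo=leader_leo)
    slow.on_fetch(now_ms, fetch_offset=leader_leo // 2, leader_leo=leader_leo)
    if now_ms <= 300:                         # this follower stops fetching
        stuck.on_fetch(now_ms, fetch_offset=leader_leo, leader_leo=leader_leo)

for name, follower in [("healthy", healthy), ("slow", slow), ("stuck", stuck)]:
    print(name, follower.is_out_of_sync(now_ms=1000, max_lag_ms=MAX_LAG_MS))
# healthy False
# slow True
# stuck True
```

The slow follower keeps fetching but never reaches the log end, and the stuck follower stops fetching; in both cases the last caught-up time goes stale, so one threshold covers both.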

What if the replicas in the ISR die?
  • There are two options: either the partition stays unavailable until a replica from the ISR recovers (and one hopes that the recovered replica still has its data), or the first replica to come back (which may or may not be in the ISR) is made the leader right away. This is a tradeoff between availability and consistency.
  • Service-unavailable mode: use this when message loss is not allowed, that is, when consistency matters more than availability. It can be achieved in two ways:
    • Set the minimum number of in-sync replicas (min.insync.replicas). If the current size of the ISR is at least this minimum, the partition accepts writes; if it is below the minimum, the partition rejects writes, so that messages are never committed to too few replicas. This setting only takes effect when acks = all.
    • Disable unclean leader election. When all replicas in the ISR are unavailable, a replica in the OSR cannot become the leader, and the partition has no leader until a replica from the ISR recovers.
  • First-replica-as-leader mode: this suits scenarios where availability matters more than consistency. It was Kafka’s default behavior in older versions when all replicas in the ISR die, and it is controlled by unclean.leader.election.enable (the default changed from true to false in Kafka 0.11.0.0). To prohibit this behavior, take the first approach.
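Both safeguards can be sketched as two small functions. This is a toy model, assuming writes with acks=all are accepted when the ISR size is at least the configured minimum, and assuming that leader candidates are tried in assignment order; `accept_write` and `elect_leader` are hypothetical names, not Kafka APIs.

```python
from typing import List, Optional, Set


def accept_write(acks: str, isr_size: int, min_insync_replicas: int) -> bool:
    """The minimum-ISR check applies only to acks=all (or -1) writes."""
    if acks in ("all", "-1"):
        return isr_size >= min_insync_replicas
    return True


def elect_leader(assigned: List[int], isr: Set[int], alive: Set[int],
                 unclean_allowed: bool) -> Optional[int]:
    """Pick a new leader, or return None if the partition must stay offline."""
    for broker in assigned:              # clean election: live ISR members only
        if broker in isr and broker in alive:
            return broker
    if unclean_allowed:                  # unclean election: any live replica
        for broker in assigned:
            if broker in alive:
                return broker
    return None


print(accept_write("all", isr_size=1, min_insync_replicas=2))             # False
print(elect_leader([1, 2, 3], isr={1, 2}, alive={3}, unclean_allowed=False))  # None
print(elect_leader([1, 2, 3], isr={1, 2}, alive={3}, unclean_allowed=True))   # 3
```

The last two calls are the tradeoff in miniature: with unclean election disabled the partition waits for broker 1 or 2, and with it enabled broker 3 takes over even though it may be missing committed messages.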
So how does ISR achieve synchronization?

Offsets on the broker come in three kinds: base offset, high watermark (HW), and log end offset (LEO).

  • Base offset: the starting offset, that is, the offset of the first message in the replica.
  • HW: the replica’s high watermark, which marks the offset up to which messages have been committed. The leader’s HW defines the range of messages that are actually committed. Every replica keeps an HW value, but only the leader’s HW is authoritative. The HW advances only after a message has been replicated to the follower replicas as required by the configuration. At that point the message will not be lost in theory and can be considered “committed”.
  • LEO: the offset of the next message to be written to the replica. Note that it is the next message to be written, not the last one written. A follower’s LEO indicates its synchronization progress. So HW marks how far the data has been synchronized, LEO marks how far the data has been written, and only the data before HW is visible to consumers.

Now take a look at what happens inside the black box between the moment the broker receives a message and the moment it returns a response.
  1. The broker receives a request from the producer.
  2. The leader writes the message successfully and its LEO increases by 1.
  3. The follower replicas fetch the message from the leader; each follower that writes it successfully increases its own LEO by 1.
  4. Once all replicas in the ISR have written the message, the leader’s HW increases by 1.
  5. The message can now be consumed, and the broker returns a success response to the producer.
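The steps above can be simulated in a few lines. This is a sketch, assuming followers fetch from the leader and that the HW is the smallest LEO among the replicas in the ISR; `Replica` and `Partition` are hypothetical classes, not Kafka’s.

```python
from typing import List


class Replica:
    def __init__(self, broker_id: int) -> None:
        self.broker_id = broker_id
        self.log: List[str] = []

    @property
    def leo(self) -> int:
        return len(self.log)          # offset of the next message to be written


class Partition:
    def __init__(self, leader: Replica, isr_followers: List[Replica]) -> None:
        self.leader = leader
        self.isr_followers = isr_followers
        self.hw = 0

    def produce(self, message: str) -> None:
        self.leader.log.append(message)          # step 2: leader LEO + 1

    def follower_fetch(self, follower: Replica) -> None:
        # Step 3: the follower copies whatever it is missing from the leader.
        follower.log.extend(self.leader.log[follower.leo:])
        # Step 4: the HW is the smallest LEO across the ISR.
        self.hw = min(r.leo for r in [self.leader, *self.isr_followers])

    def consumable(self) -> List[str]:
        return self.leader.log[:self.hw]         # step 5: only data below HW


f2, f3 = Replica(2), Replica(3)
partition = Partition(Replica(1), [f2, f3])
partition.produce("m0")
print(partition.leader.leo, partition.hw, partition.consumable())  # 1 0 []
partition.follower_fetch(f2)
print(partition.hw)                                                # 0
partition.follower_fetch(f3)
print(partition.hw, partition.consumable())                        # 1 ['m0']
```

The message becomes visible only after the slowest ISR follower has it, which is why the HW, not the LEO, bounds what consumers can read.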

The above process can be seen in the figure below:

After solving the previous problem, how does Kafka choose a leader?

A common way to elect a leader is majority voting, as used by Redis for example, but Kafka does not use majority voting to elect partition leaders. Kafka uses its own quorum scheme instead.

A quorum is a voting technique commonly used in distributed systems to ensure data consistency through data redundancy. In Kafka, this role is played by the ISR: it is the set of replicas that are eligible to be elected leader.

  • When the leader goes down, the new leader can only be selected from the ISR list. Whichever replica in the ISR is selected, it has all the data before the HW, so consumers can still see everything committed before the HW after the leader switch.
  • HW truncation: a newly elected leader cannot guarantee that it has all of the previous leader’s data, only the data before the previous HW. So at this point every follower truncates its log to the HW and then synchronizes with the new leader to ensure data consistency. When the old leader recovers and finds that its data differs from the new leader’s, it truncates its own log to its original HW and then synchronizes from the new leader. In other words, the recovered leader synchronizes like any other follower to ensure data consistency.
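HW truncation can be illustrated with plain lists standing in for replica logs. This is a sketch of the HW-based description above only; `resync_with_new_leader` is a hypothetical helper, and newer Kafka versions refine this procedure with leader epochs.

```python
from typing import List


def resync_with_new_leader(local_log: List[str], hw: int,
                           new_leader_log: List[str]) -> List[str]:
    """Truncate a replica's log to its HW, then copy the rest from the new leader."""
    kept = local_log[:hw]                      # entries below the HW are committed
    return kept + new_leader_log[len(kept):]   # fetch everything after that point


hw = 2                                         # m0 and m1 were committed
old_leader = ["m0", "m1", "m2", "m3"]          # m2 and m3 were never committed
new_leader = ["m0", "m1", "m2"]                # elected from the ISR
new_leader.append("n3")                        # the new leader accepts a new write

follower = resync_with_new_leader(["m0", "m1"], hw, new_leader)
recovered = resync_with_new_leader(old_leader, hw, new_leader)
print(follower)    # ['m0', 'm1', 'm2', 'n3']
print(recovered)   # ['m0', 'm1', 'm2', 'n3']
```

The old leader’s uncommitted m3 is discarded and replaced by n3 from the new leader, so every replica ends up with the same log.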

Refer: Effectivecoding blog