What is a Partition

Kafka divides messages within a Topic into “segments” that are stored in different brokers. Kafka calls each of these segments a Partition

Why, one might ask, does Kafka fragment messages within a Topic into different brokers? Since messages in a Topic may be too many to fit on a single machine, they need to be split into multiple pieces and stored on different machines, which also speeds up read and write performance

What is the Replication

It is not safe to store only one Partition. For data security and high availability, Kafka can have multiple partitions stored in different brokers. Kafka calls each Partition Replication

Replication is logically a copy of a Partition, essentially the same as a Partition, and a Topic message.

We say there are three copies of a file, so there should be four files in total, but in Kafka we say there are three replicas of a Partition, so there are only three data points in total. This is a bit different from what we normally understand, because in Kafka Replication is an instance of a real external service!

Therefore, when there is only one Replication in a Partition, it can be understood as the same thing. When there are multiple Replicas in a Partition, the relationship between the two is more like that between a class and an instance. A Partition is the class of the Partition. Replication is an instance of the partition class

To save storage space, the default kafka policy is to have only one Replication

Two roles for Replication

Although a Partition has multiple copies, Kafka specifies that only one copy can provide services. This copy is called Leader Replication by Kafka, and the other copies are called Follower Replication

Data synchronization between Replication

Producer and Consumer only interact with the Leader Replication, while the other Follower Replication synchronizes data from the Leader Replication.

The synchronization between followers and the Leader is delayed and affected by the following two parameters:

  • replica.lag.time.max.ms: Synchronization copy lags andleaderTime of copy
  • zookeeper.session.timeout.ms:borkerzookeeperSession timeout

in-sync Replica

Follower Replication must synchronize messages from the Leader in a timely manner and not “fall too far behind”. “Too far behind” here means that the number of messages sent by Follower Replication to the Leader exceeds the threshold or the followers do not send the fetch request to the Leader for a certain period of time.

The Leader keeps track of a Replication list, called in-Sync Replica(ISR), with which a degree of synchronization is maintained. If a Follower crashes, or falls too far behind, the Leader will remove it from the ISR.

A message from the Leader is considered to have been submitted only if it has been accepted by all followers in the ISR, and only if it has been submitted will it be consumed by consumers. However, for write performance, followers return an ACK to the Leader as soon as they receive data, rather than wait until the data is written to the Log. Therefore, Kafka can only ensure that committed messages are stored in the memory of all followers in the ISR, not persisted to disk

Expansion of in-sync Replica

Kafka has a task called ISR-expiration that periodically checks whether each partition needs to shrink its ISR set. This period is half the value of the replica.lag.time.max.ms parameter. When followers more than up. Lag. Time. Max. Ms millisecond not fetch request or, the message behind Leader rerplica. Lag. Max. Article messages, will be removed from the list of ISR

If the number of followers Replication LEO in the out-of-sync Replica(OSR) collection is greater than or equal to the High Watermark(LW) of the partition, It is added to the In-Sync Replica(ISR) collection

Related Configurations

# Follower maximum request interval rerica. Lag.time.max. ms=10000Copy the code

If the Leader finds that the Follower has not fetched it for more than 10 seconds, it will be removed from the ISR list by the Leader

# followers message behind the maximum number of article, beyond which a few followers will be removed from the list of ISR rerplica. Lag. Max. Messages = 4000Copy the code
Replication min.insync.replicas=1Copy the code

Out-Sync Relipca

Out-of-sync Relipcas(OSR) corresponds to in-sync Replica(ISR), which is a set of out-of-sync replicas or a list of backward replicas

Assigned Repllica

Assigned Relipcas(AR) refers to all copies in the partition, AR = ISR + OSR

Several elections for Leader

If the Leader Replication fails, a new Leader needs to be selected from the followers Replication

Synchronous list election

Kafka is preferentially selected from the ISR because Follower Replication and Leader are synchronized in the ISR, thus achieving the highest data consistency.

Note that data consistency is the highest, rather than data consistency, because the synchronization between the followers and the Leader is delayed, and data replication is completed asynchronously. So data can be lost, but the probability and quantity are very small

It is important to always have a sufficient number of synchronized copies. To promote follower to Leader, it must exist in the list of synchronized replicas. Each partition has a list of synchronized replicas, which are updated by the Leader partition and Controller.

The process of selecting a leader from an ISR is called the Clean Leader election

Asynchronous list elections

Because the ISR is dynamically adjusted, the ISR list may be empty. In this case, you can select the ISR list from outside the ISR.

In general, asynchronous replicas lag far behind the Leader, so it is highly likely that data will be lost if these replicas are chosen as the new Leader.

The process for selecting a leader in an unsynchronized replica is called unclean Leader election

So whether to open the unclean leader election, can pass the Broker side parameter unclean. Leader. Election. Enable to control, of course also according to your specific business to balance!

Enabling unclean Leader election may result in data loss, but the upside is that it keeps the leader in existence without stopping services externally, thus improving high availability. On the other hand, disabling unclean leader election has the benefit of maintaining data consistency and avoiding message loss, but at the expense of high availability. This is what the CAP theory of distributed systems says.

The roles of Leader and follower are switched

In addition to the two elections mentioned above that cause the Leader role to move, there is another situation that also causes the Leader role to move.

When the Controller Broker senses from Zookeeper that a new Broker has been added to the cluster, it uses the Broker ID to check whether Replication exists on the Broker, and if so, The Controller then informs the newly added Broker to synchronize the existing messages corresponding to the leader to its own follower partition. Then, to ensure load balancing, the Controller goes offline from the current leader partition and elects the follower partition on the newly added Broker as the new Leader partition.

This is the role swap between Leader and follower, which occurs when a new Broker joins the cluster to balance the load between partitions

Changing the Leader once has a cost. All the requests from the original Leader partition are routed to the new Leader partition. If you do not want the Leader/follower roles to switch, set this parameter to false in the configuration file

Leader Replication Load balancing

Because KafKa clients only interact with the Leader Replication when reading or writing partitions of a Topic, having too many Leader Replication in one Broker can cause the Broker to become overstressed.

Partition configuration

Configure the default number of copies for topic

default.replication.factor=3
Copy the code

If the specified number of copies is not displayed when creating a topic, the default number of copies is used

When creating a topic, you can use –replication-factor to display the specified number of copies

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
Copy the code