Kafka was originally developed at LinkedIn as a distributed messaging system and later became an Apache project. Written in Scala, Kafka is widely used for its horizontal scalability and high throughput. More and more open source distributed processing systems and platforms, such as Cloudera's distribution, Apache Storm, and Spark, support Kafka integration.

Kafka differs from traditional messaging systems in the following ways:

  • It is designed as a distributed system, easy to scale out;
  • It provides high throughput for both publish and subscribe;
  • It supports multiple subscribers and automatically rebalances consumers when one fails;
  • It persists messages to disk and can therefore be used for bulk consumption, such as ETL, as well as real-time applications.

Kafka Architecture

The overall architecture of a distributed message queue system based on Kafka and ZooKeeper is as follows:

Basic concepts

In the Kafka architecture, there are a few terms:

  • Producer: producers push messages into brokers (servers) in Kafka clusters.

  • Broker: a Kafka cluster consists of multiple Kafka instances (servers), and each instance is a single Kafka broker.

  • Topic: the messages pushed by producers to a Kafka cluster are grouped into topics. A topic is essentially a logical concept aimed at producers and consumers: a producer only needs to care about which topic it pushes messages to, and a consumer only needs to care about which topic it subscribes to;

  • Partition: each topic is divided into multiple partitions, which are the physical subdivisions of a topic. For load balancing, the partitions of the same topic are spread across multiple brokers in the Kafka cluster, and to improve reliability, Kafka replicas can be used to set how many backup copies each partition has. As shown in the block diagram above, each partition has two backups;

  • Consumer: consumers pull messages from brokers in the Kafka cluster and consume them.

  • Consumer group: in the high-level consumer API, each consumer belongs to a consumer group, and each message can be consumed by only one consumer within that group, but it can be consumed by multiple consumer groups;

  • Replicas: ensure the high availability of a partition.

  • Leader: a role among a partition's replicas; producers and consumers interact only with the leader.

  • Follower: a role in replicas that copies data from the leader and serves as a replica. If the leader dies, a new leader will be elected from its followers.

  • Controller: one of the brokers in the Kafka cluster, responsible for leader election and various failovers;

  • ZooKeeper: Kafka uses ZooKeeper to store cluster metadata.

Topic & Partition

A topic can be thought of as a class of messages. Each topic is divided into multiple partitions, and at the storage level each partition is an append-only log file. Any message published to a partition is appended to the end of its log file, and each message's position in the file is called its offset. The offset is a long value that uniquely identifies a message. Messages are appended to a partition and written sequentially, which is very efficient (sequential writes are far faster than random writes, and this is an important guarantee of Kafka's high throughput).

Why does Kafka partition topics?

In short: load balancing + horizontal scaling.

Each message sent to the broker is stored in a partition chosen according to the partitioning rules. If the partitioning rules are set properly, messages are evenly distributed across the partitions, which achieves horizontal scaling. (If a topic corresponded to a single file, the I/O of the machine holding that file would become the performance bottleneck for that topic; partitioning solves this problem.) The number of partitions can be specified in $KAFKA_HOME/config/server.properties when the topic is created (shown below), and it can of course also be modified after the topic has been created.

num.partitions=3
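For example, on a recent Kafka distribution the partition count can also be set when creating the topic and increased later with the kafka-topics.sh tool (the broker address and topic name below are illustrative):

bin/kafka-topics.sh --create --topic mytopic_test --partitions 4 --replication-factor 1 --bootstrap-server localhost:9092
bin/kafka-topics.sh --alter --topic mytopic_test --partitions 8 --bootstrap-server localhost:9092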

When sending a message, the producer can specify a key for the message, and based on the key and the partitioning mechanism it determines which partition the message is sent to. The partitioning mechanism can be customized through the producer's partitioner.class parameter, which specifies a class that must implement the kafka.producer.Partitioner interface.
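As an illustration, here is a minimal key-hash partitioner written against the modern Java client's org.apache.kafka.clients.producer.Partitioner interface (the newer counterpart of the kafka.producer.Partitioner interface mentioned above); the class name and hashing choice are only an example:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Example partitioner: hash the key bytes onto one of the topic's partitions.
public class KeyHashPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0; // no key: this sketch simply falls back to partition 0
        }
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would be registered by setting the producer property partitioner.class to the fully qualified class name.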

Kafka file storage mechanism

A topic can be divided into several partitions. However, a partition is not the final storage granularity: each partition is further subdivided into segments, so a partition is physically composed of multiple segment files.

For the sake of illustration, assume that there is only one Kafka cluster and that the cluster has only one broker, that is, only one physical machine. Set log.dirs=/tmp/kafka-logs as the Kafka message file storage directory, and create a topic named mytopic_test with the number of partitions set to 4 (see the topic creation command in the previous lesson). Afterwards, four directories appear under /tmp/kafka-logs:

drwxr-xr-x 2 root root 4096 Apr 15 13:21 mytopic_test-0
drwxr-xr-x 2 root root 4096 Apr 15 13:21 mytopic_test-1
drwxr-xr-x 2 root root 4096 Apr 15 13:21 mytopic_test-2
drwxr-xr-x 2 root root 4096 Apr 15 13:21 mytopic_test-3

In file storage, the same topic has multiple partitions, and each partition is a directory. Partition directories are named as topic name + ordered sequence number, where the first sequence number starts at 0 and the largest one equals the number of partitions minus 1. Partition is a physical concept, while topic is a logical concept.

Why can't a partition be the smallest storage unit?

If a partition were the smallest storage unit, the messages continually sent by the Kafka producer would make the partition file grow without bound, which would seriously complicate maintaining the message files and cleaning up messages that have already been consumed. So a partition is further subdivided into segments: each partition is effectively a huge file evenly divided into multiple data files of equal size (although the number of messages in each segment file is not necessarily the same). This also makes it convenient to delete old segments, i.e. to clean up consumed messages, and improves disk utilization. The segment lifecycle is determined by broker configuration parameters (log.segment.bytes, log.roll.{ms,hours}, etc.).
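For example, segment rolling can be tuned with broker settings such as the following (the values are purely illustrative):

log.segment.bytes=536870912
log.roll.hours=168

With these settings a new segment would be rolled once the active one reaches 512 MB or becomes 7 days old, whichever happens first.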

How the segment works

Each segment consists of an .index file and a .log file, which are the segment's index file and data file respectively. The naming rule for the two files is as follows: the segment file is named after its base offset, i.e. the offset of the first message it contains. The value is a 64-bit long, written as 20 digits padded with leading zeros, as follows:

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

Take the segment 00000000000000170410 as an example to show the mapping between the .index file and the .log file.

How do I find a message from a partition by offset?

As pictured above, to read the message at offset=170418, the segment files are searched first: 00000000000000000000.index is the first file, 00000000000000170410.index is the second (base offset 170410), and 00000000000000239430.index is the third (base offset 239430). Since 170410 <= 170418 < 239430, offset=170418 falls into the second file. Because subsequent files are likewise named and ordered by their base offsets, a binary search can quickly locate the right segment file. Next, according to the entry in 00000000000000170410.index (position [5, 8132] in the figure), the message is read starting from position 1325 in 00000000000000170410.log.
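A minimal sketch of this two-step lookup, using plain Java collections rather than Kafka's actual classes (the index entries below are illustrative):

import java.util.NavigableMap;
import java.util.TreeMap;

// Two-step lookup: pick the segment whose base offset is the largest one <= the
// target offset, then use that segment's sparse index to find the nearest
// preceding byte position and scan forward from there.
public class OffsetLookup {
    // segment base offset -> sparse index (relative offset -> byte position in the .log file)
    private final NavigableMap<Long, NavigableMap<Long, Long>> segments = new TreeMap<>();

    public void addIndexEntry(long baseOffset, long relativeOffset, long filePosition) {
        segments.computeIfAbsent(baseOffset, k -> new TreeMap<>()).put(relativeOffset, filePosition);
    }

    public long findStartPosition(long targetOffset) {
        var segment = segments.floorEntry(targetOffset);      // binary search over segment base offsets
        if (segment == null) throw new IllegalArgumentException("offset before the first segment");
        long relative = targetOffset - segment.getKey();      // offset relative to the segment base
        var entry = segment.getValue().floorEntry(relative);  // nearest indexed message at or before it
        return entry == null ? 0L : entry.getValue();         // byte position to start scanning from
    }

    public static void main(String[] args) {
        OffsetLookup lookup = new OffsetLookup();
        lookup.addIndexEntry(0L, 0L, 0L);
        lookup.addIndexEntry(170410L, 5L, 8132L);              // illustrative sparse entry
        lookup.addIndexEntry(239430L, 0L, 0L);
        System.out.println(lookup.findStartPosition(170418L)); // falls into segment 170410
    }
}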

If the message at offset=170418 is read starting from position 1325 in 00000000000000170410.log, how do we know where this message ends (otherwise we would read into the next message)?

This problem is solved by the physical structure of the message, which has a fixed layout, including: offset (8 bytes), message size (4 bytes), CRC32 (4 bytes), magic (1 byte), attributes (1 byte), key length (4 bytes), key (K bytes), payload (N bytes), and other fields that together determine the size of a message.
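The sketch below shows how those fixed-size header fields are enough to walk a segment file message by message; it assumes the legacy (pre-0.11) on-disk format described above, where each entry starts with an 8-byte offset and a 4-byte size, and is intended only as an illustration:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Walk a legacy-format .log segment and print each entry's offset and size,
// using the 8-byte offset + 4-byte size header to find message boundaries.
public class SegmentWalker {
    public static void main(String[] args) throws IOException {
        try (FileChannel ch = FileChannel.open(Path.of(args[0]), StandardOpenOption.READ)) {
            ByteBuffer header = ByteBuffer.allocate(12);          // offset (8 bytes) + message size (4 bytes)
            long position = 0;
            while (position + 12 <= ch.size()) {
                header.clear();
                ch.read(header, position);
                header.flip();
                long offset = header.getLong();                   // logical offset of this message
                int size = header.getInt();                       // size of the message body that follows
                System.out.printf("offset=%d size=%d filePosition=%d%n", offset, size, position);
                position += 12 + size;                            // jump to the start of the next message
            }
        }
    }
}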

Partition replica synchronization and election

Each partition in Kafka has a write-ahead log file. Although a partition is subdivided into several segments, for upper-layer applications a partition can still be thought of as the smallest storage unit (a "giant" file made of multiple segment files spliced together). Each partition consists of an ordered, immutable series of messages that are continuously appended to it.

There are two new terms in the image above: HW and LEO. LEO, short for LogEndOffset, indicates the position of the last message in each partition's log. HW, short for HighWatermark, is the position in the partition up to which a consumer can read; this concept involves multiple replicas, so it is only mentioned here and discussed in detail later.

To improve message reliability, each Kafka partition has N replicas, where N (greater than or equal to 1) is the topic's replication factor. Kafka implements automatic failover through this multi-replica mechanism, which keeps the service available when a broker in the Kafka cluster fails. For a given partition, one replica is the leader and the other replicas are followers. The leader handles all read and write requests for the partition, while the followers passively replicate the data on the leader. As shown in the figure below, a Kafka cluster has four brokers, a topic has three partitions, and the replication factor, i.e. the number of copies, is 3:

How is a leader selected from the replicas?

A partition has multiple replicas which, to improve reliability, are distributed across different brokers. Due to factors such as bandwidth, read/write performance, and network latency, these replicas are usually not all in the same state at a given moment: the followers lag behind the leader. How, then, do we ensure that the newly elected leader is an up-to-date one? In Kafka, the leader maintains and tracks an In-Sync Replicas (ISR) list; the replicas in this list are in (or close to) the same state as the leader. If the new leader is selected from the replicas in the ISR list, it is guaranteed to be an up-to-date candidate. Of course, this is not the only strategy, as we will see below.

ISR replica synchronization

Although replicas greatly enhance availability, the number of replicas affects Kafka's throughput. In a production environment, to ensure message reliability, the replication factor is usually set to a value greater than 1, for example 3 (for Kafka's internal offsets topic this is controlled by the broker parameter offsets.topic.replication.factor). All replicas are collectively called Assigned Replicas (AR), and the ISR is a subset of the AR. The leader maintains the ISR list, and the followers synchronize data from the leader with some delay (the timeout threshold is set by the replica.lag.time.max.ms parameter). Followers whose lag exceeds the threshold are removed from the ISR and placed in the Out-of-Sync Replicas (OSR) list; newly added followers also start in the OSR. AR = ISR + OSR.

Note: the ISR includes the leader plus the followers that are in sync with the leader.

Another concept covered in the previous section is the HW (HighWatermark). The smallest LEO among the replicas in a partition's ISR is taken as the HW, and a consumer can consume at most up to the HW position. Each replica has its own HW, and the leader and followers are each responsible for updating their own HW state. A message newly written to the leader cannot be consumed immediately: the leader waits until the message has been synchronized by all replicas in the ISR before updating the HW, and only then can the message be consumed. This ensures that if the leader's broker fails, the message can still be retrieved from the newly elected leader. Read requests from brokers inside the cluster are not subject to the HW restriction.
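A tiny illustration of the high-watermark rule described above (plain Java, not Kafka code): the HW is the minimum LEO across the ISR, and consumers can only read offsets below it.

import java.util.Arrays;

public class HighWatermarkDemo {
    public static void main(String[] args) {
        long[] isrLeos = {5, 5, 3};                          // leader LEO = 5, one follower still at 3
        long hw = Arrays.stream(isrLeos).min().getAsLong();  // hw == 3
        System.out.println("Consumers can read offsets 0.." + (hw - 1));
    }
}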

The following figure details the flow of ISR, HW and LEO as producer produces messages to brokers:

Thus, Kafka's replication mechanism is neither fully synchronous nor purely asynchronous. Fully synchronous replication requires that all working followers have copied a message before it is committed; this kind of replication is limited by the slowest follower and greatly hurts throughput. In asynchronous replication, the followers copy data from the leader asynchronously and a message is considered committed as soon as the leader has written it to its log; in this case, if the leader crashes before the followers have caught up, data is lost and reliability deteriorates. Kafka's ISR strategy strikes a good balance between reliability and throughput.

Because Kafka uses the ISR mechanism, the replicas it waits for are all active ones (a replica that synchronizes too slowly is kicked out of the ISR into the OSR), so synchronization is very fast.

Data reliability and persistence assurance

When the producer sends data to the leader, the request.required.acks parameter can be used to set the level of data reliability:

request.required.acks = 1

This is the default: the producer sends data to the leader, and as soon as the leader successfully writes its local log it returns success to the client. At this point the other replicas in the ISR may not yet have pulled the message, so if the leader goes down, the message that was just sent is lost.

request.required.acks = 0

The producer keeps sending data to the leader without waiting for any acknowledgment from the leader. Data transmission efficiency is highest in this case, but data reliability is lowest: data may be lost during sending or when the leader is down.

request.required.acks = -1 (all)

The producer sends data to the leader, and the leader returns success to the producer only after all replicas in the ISR list have completed data synchronization (strong consistency). If no success response is received, the producer considers the send to have failed and automatically resends the data. This is the most reliable option, but of course performance suffers.

Note: the min.insync.replicas parameter

To maximize data reliability, request.required.acks = -1 needs to be used together with the min.insync.replicas parameter, which sets the minimum number of replicas in the ISR (default 1) and only takes effect when acks = -1. When the number of replicas in the ISR falls below min.insync.replicas, the client receives an exception: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required. If min.insync.replicas is set to 2 and the actual number of replicas in the ISR is 1 (only the leader), reliability cannot be guaranteed, so the client's write requests are rejected to prevent message loss.
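As an illustration, here is a minimal producer configured for maximum durability with the modern Java client, where the legacy request.required.acks setting corresponds to the acks property (the broker address and topic name are assumptions; min.insync.replicas itself is a broker/topic-level setting, not a producer property):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReliableProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker address
        props.put(ProducerConfig.ACKS_CONFIG, "all");                           // equivalent of acks = -1
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);            // retry on transient failures
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("mytopic_test", "key-1", "hello"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();                        // e.g. NotEnoughReplicasException
                        } else {
                            System.out.printf("stored at partition %d, offset %d%n",
                                    metadata.partition(), metadata.offset());
                        }
                    });
        }
    }
}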

Understanding the HW mechanism in depth

Consider a scenario where acks = -1, some of the ISR replicas have completed synchronization, and the leader crashes, as shown below: follower1 has synchronized messages 4 and 5, follower2 has only synchronized message 4, and at that moment the leader goes down and follower2 is elected as the new leader. What should be done with the extra message 5 on follower1?

This is where the HW comes in. As mentioned earlier, in a partition's ISR list the leader's HW is the smallest LEO among the replicas in the ISR; similar to the barrel principle, the water level is determined by the shortest stave.

If a failed follower recovers, it first truncates its log file to the HW position recorded at its last checkpoint and then synchronizes messages from the leader. If the leader fails, a new election is held, and the new leader instructs the remaining followers to truncate to their own HW positions before pulling new messages.

When the LEOs of the replicas in the ISR are inconsistent and the leader fails at that moment, the new leader is elected according to the order of the replicas in the ISR, not according to which replica has the highest LEO.

Leader election

To ensure reliability, a message is considered committed only after it has been copied from the leader by all followers in the ISR, and only then is success returned to the producer. This avoids losing data that has been written to the leader but not yet replicated by any follower. The producer can choose whether to wait for the message to be committed, which is set with the request.required.acks parameter. This mechanism ensures that a committed message is not lost as long as the ISR still contains at least one live replica.

Question 1: How to avoid throughput degradation while ensuring reliability?

A very important problem is how to elect a new leader from the followers when the leader crashes. Since the followers may lag far behind or crash outright, we must make sure the "most up-to-date" follower is selected as the new leader. A basic principle is: if the leader fails, the new leader must have all the messages that the original leader has committed. Isn't that exactly the property of the replicas in the ISR?

However, there is the question of how large an ISR list should be. In other words, how many followers does the leader have to wait for before a message is considered committed? The more followers the leader waits for, the more replicas stay in sync with it, which means higher reliability, but this also means lower throughput.

The principle of majority voting

A common strategy for electing a leader is "majority rule", but Kafka does not use this method. In this mode, if there are 2f+1 replicas, f+1 replicas must have replicated a message before it is committed, and to guarantee that a new leader can be elected correctly, no more than f replicas may fail. One big advantage of this approach is that the system's latency depends on the fastest machines; that is, if the number of replicas is 3, the latency is determined by the faster follower rather than the slower one.

The "majority rule" strategy also has some disadvantages. To keep leader election working, it can tolerate only a relatively small number of failed followers: tolerating one failed follower requires at least three replicas, and tolerating two requires at least five. In other words, a large number of replicas is needed to guarantee a high fault-tolerance rate in a production environment, and a large number of replicas leads to a sharp performance drop under heavy data volume. This is why the algorithm is more commonly used in systems that store shared cluster configuration, such as ZooKeeper, than in systems that need to handle large amounts of data.

What is Kafka’s strategy for electing a leader?

In fact, there are many algorithms for leader election, such as ZooKeeper’s Zab, Raft, and Viewstamped Replication. Kafka’s leader election algorithm is more like Microsoft’s PacificA algorithm.

Kafka dynamically maintains an ISR for each partition in ZooKeeper. All replicas in the ISR are in sync with the leader, and only members of the ISR can be elected leader (the default behavior when unclean.leader.election.enable = false). In this mode, for f+1 replicas, a Kafka topic can tolerate the failure of f replicas without losing committed messages, which is advantageous in most usage scenarios. In fact, a message is considered committed only after all followers in the ISR have copied it from the leader, and only then is success returned to the producer, which guarantees reliability. Unlike the "majority rule" policy, the number of replicas in a Kafka ISR does not need to exceed half of the total number of replicas.

Leader election strategy in extreme cases

As mentioned above, Kafka ensures that committed messages are not lost as long as at least one replica in the ISR (including the leader) is alive. However, if all replicas of a partition go down, it can no longer be guaranteed that no data is lost. How is leader election handled in that case? There are usually two options:

  • Wait for a replica in the ISR to recover and select it as the leader.
  • Select the first replica to recover (not necessarily a member of the ISR) as the leader.

How to choose? This is a trade-off between availability and consistency. If we must wait for a replica in the ISR to recover, the partition may be unavailable for a relatively long time; and if all replicas in the ISR can no longer recover or their data is lost, the partition will never become available.

If the first replica to recover is selected as the leader and it is not a member of the ISR, it may not have all the committed messages, which results in message loss. Older Kafka versions adopted the second strategy by default, i.e. unclean.leader.election.enable = true; setting this parameter to false enables the first strategy (and newer versions default to false).

The unclean.leader.election.enable parameter therefore has a crucial impact on leader election, system availability, and data reliability, and should be weighed carefully in a production environment.

Recording the consumption progress (offset)

When a consumer consumes messages from a given partition, it needs to record its consumption progress (offset) for that partition on ZooKeeper periodically, so that after the consumer restarts, or another consumer takes over the partition, consumption can continue from the previous position. The offset is recorded in a dedicated ZooKeeper node whose path is:

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

The content of the node is the offset value. PS: Kafka now recommends saving consumer offset information in an internal Kafka topic, namely:

__consumer_offsets (/brokers/topics/__consumer_offsets), and the kafka-consumer-groups.sh script is provided by default for viewing consumer group information (command: kafka-consumer-groups.sh --bootstrap-server <broker> --describe --group <group>). In current versions, the offset can be stored either in a local file or on the broker side, depending on the offset.store.method configuration; the default is the broker side.
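For example (the group name and broker address here are hypothetical):

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group demo-group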

Producer-Kafka-Consumer

The producer publishes messages

The producer uses push mode to publish messages to the broker, and each message is appended to a partition as a sequential disk write (sequential disk writes are more efficient than random writes to memory, which helps guarantee Kafka's throughput). When the producer sends a message to the broker, it selects which partition to store it in based on the partitioning algorithm.

Its routing mechanism is as follows:

  • If a partition is specified, it is used directly.
  • If a key is specified but no partition, a partition is selected by hashing the key.
  • If neither a partition nor a key is specified, a partition is selected by round-robin.
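As an illustration of these three cases with the modern Java client's ProducerRecord constructors (topic and key names are made up; the exact default behavior depends on the client version):

import org.apache.kafka.clients.producer.ProducerRecord;

public class RoutingExamples {
    public static void main(String[] args) {
        // 1. Partition specified explicitly: it is used as-is.
        ProducerRecord<String, String> explicit =
                new ProducerRecord<>("mytopic_test", 2, "order-42", "payload");

        // 2. Key but no partition: the partitioner hashes the key to pick a partition.
        ProducerRecord<String, String> keyed =
                new ProducerRecord<>("mytopic_test", "order-42", "payload");

        // 3. Neither key nor partition: partitions are chosen in a round-robin-like fashion.
        ProducerRecord<String, String> unkeyed =
                new ProducerRecord<>("mytopic_test", "payload");

        System.out.println(explicit.partition() + " " + keyed.key() + " " + unkeyed.key());
    }
}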

Write process:

  • The producer finds the leader of the partition from the ZooKeeper "/brokers/…/state" node.
  • The producer sends the message to the leader.
  • The leader writes the message to its local log.
  • The followers pull the message from the leader, write it to their local logs, and send an ACK to the leader.
  • After receiving ACKs from all replicas in the ISR, the leader advances the HW (high watermark, the offset of the last committed message) and sends an ACK to the producer.

Broker stores messages

Physically, a topic is divided into one or more partitions, and each partition corresponds to a physical folder that stores all the messages and index files for that partition.

The consumer consumes messages

The high-level consumer API provides consumer group semantics: a message can be consumed by only one consumer in the group, and the consumer does not need to track the offset while consuming; the last offset is saved by ZooKeeper (so the next consumer in the group that takes over the partition continues from the recorded offset).

Note:

  • If the number of consumer threads is greater than the number of partitions, some threads will not receive any messages;
  • If the number of partitions is greater than the number of consumer threads, some threads will receive messages from more than one partition;
  • If a thread consumes multiple partitions, there is no guarantee about the order in which messages from different partitions arrive, whereas messages within a single partition are ordered.

The consumer uses pull mode to read data from the broker.

The push model has difficulty adapting to consumers with different consumption rates, because the message sending rate is determined by the broker. Its goal is to deliver messages as fast as possible, which can easily overwhelm consumers, typically showing up as denial of service and network congestion. The pull model lets the consumer fetch messages at a rate appropriate to its own consumption capacity.

For Kafka, the pull model is more appropriate. It simplifies the broker design, lets the consumer control the rate at which messages are consumed, and lets the consumer control how it consumes them, either in batches or one at a time, while also choosing different commit approaches to achieve different delivery semantics.
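To round this off, here is a minimal sketch of the pull model with the modern Java consumer: the client joins a consumer group, subscribes to a topic, and polls at its own pace (the broker address, group id, and topic name are assumptions):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PullConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");               // consumer group semantics
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("mytopic_test"));
            while (true) {
                // Pull: the consumer decides when and how much to fetch.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}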