1 Overview

Originally developed by LinkedIn as a distributed messaging system and later donated to Apache, Kafka is written in Scala and is widely used for its horizontal scalability and high throughput. A growing number of open source distributed processing systems, such as Cloudera, Apache Storm, and Spark, support integration with Kafka.

With these advantages, Kafka is increasingly favored by Internet companies, and Vipshop uses it as one of its core internal messaging engines. For commercial-grade messaging middleware, the importance of message reliability is self-evident. How do we ensure messages are transmitted accurately? How do we ensure they are stored accurately? How do we ensure they are consumed correctly? All of these questions need to be considered. This article starts from the Kafka architecture: it first covers the basic principles of Kafka, then analyzes its reliability step by step through the Kafka storage mechanism, replication principle, synchronization principle, and reliability and persistence guarantees, and finally uses benchmarks to reinforce the perception of Kafka's high reliability.

2 Kafka architecture

As shown in the figure above, a typical Kafka architecture consists of several producers (server logs, business data, page views generated at the front end, and so on), several brokers (Kafka supports horizontal scaling; generally, the more brokers, the higher the cluster throughput), several consumer groups, and a Zookeeper cluster. Kafka uses Zookeeper to manage cluster configuration, elect the leader, and rebalance when a consumer group changes. Producers publish messages to brokers in push mode, and consumers subscribe to and consume messages from brokers in pull mode.

Terminology:

2.1 Topic & Partition

A topic can be thought of as a class of messages. Each topic is divided into multiple partitions, and each partition is an append log at the storage level. Any message published to a partition is appended to the end of its log file, and each message's position in the file is called its offset. The offset is a long value that uniquely identifies a message within the partition. Messages are appended to a partition and written sequentially, which is very efficient (sequential writes are much faster than random writes, and this is an important guarantee of Kafka's high throughput).

Each message sent to the broker is stored in a partition chosen according to the partitioning rules. If the partitioning rules are set properly, all messages are distributed evenly across partitions, achieving horizontal scaling. (If a topic corresponded to a single file, the I/O of the machine holding that file would become the topic's performance bottleneck; partitioning solves this problem.) The number of partitions can be specified in $KAFKA_HOME/config/server.properties when the topic is created (shown below), and it can of course also be modified after the topic has been created.

# The default number of log partitions per topic. More partitions allow greater

# parallelism for consumption, but this will also result in more files across

# the brokers.

num.partitions=3

When sending a message, the producer can specify a key for the message, and the producer decides which partition the message is sent to based on this key and the partitioning mechanism. The partitioning mechanism can be customized by specifying the producer's partitioner.class parameter; the specified class must implement the kafka.producer.Partitioner interface.
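
The kafka.producer.Partitioner interface above belongs to the old Scala producer API. As a minimal sketch of the same idea against the newer Java client interface (org.apache.kafka.clients.producer.Partitioner), assuming a plain array hash of the key rather than the murmur2 hash used by the default partitioner:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Arrays;
import java.util.Map;

// A minimal key-hash partitioner: records with the same key always land in the same partition.
public class SimpleKeyPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // no custom configuration needed for this sketch
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0; // keyless records fall back to partition 0 in this simplified example
        }
        // mask the sign bit so the result is a valid partition index
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {
        // nothing to release
    }
}

Such a class is registered on the producer via the partitioner.class configuration property.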

For more details on topics and partitions, see the section “Kafka File Storage Mechanism” below.

3 High reliability storage analysis

Kafka's high reliability guarantee comes from its robust replication policy. By tuning the replication-related parameters, Kafka can strike a balance between performance and reliability. Starting with version 0.8.x, Kafka provides partition-level replication; the replication factor can be configured in $KAFKA_HOME/config/server.properties (default.replication.factor).

We start from the Kafka file storage mechanism, to understand Kafka's storage details from the bottom up and gain a micro-level view of its storage. The macro-level concepts are then explained through the Kafka replication principle and synchronization mode. Finally, the ISR, HW, leader election, and data reliability and persistence guarantees are discussed to round out the picture of Kafka's reliability.

3.1 Kafka File Storage mechanism

Messages in Kafka are organized by topic: producers send messages to Kafka brokers by topic, and consumers read data by topic. Logically a topic is a single entity, but physically a topic can be divided into several partitions. How are topics and partitions stored? A partition can be further subdivided into segments; physically, a partition is composed of multiple segments. What are these segments? Let's find out.

For the sake of illustration, assume there is only one Kafka cluster and that the cluster has only one Kafka broker, i.e. only one physical machine. In the Kafka broker configuration ($KAFKA_HOME/config/server.properties), set log.dirs=/tmp/kafka-logs to define the Kafka message file storage directory, and create a topic named topic_zzh_test with 4 partitions ($KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 4 --topic topic_zzh_test --replication-factor 1). Four directories are then generated under /tmp/kafka-logs:

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-0

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-1

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-2

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-3

In Kafka, a topic has multiple partitions, and each partition corresponds to a directory. Partition directories are named as the topic name plus an ordered serial number; the first serial number starts at 0, and the largest is the number of partitions minus 1. A partition is a physical concept, while a topic is a logical concept.

A partition can be further divided into segments. If the partition were the smallest storage unit, we can imagine that as Kafka producers keep sending messages, the partition file would grow without bound, which would seriously complicate maintenance of the message files and cleanup of messages that have already been consumed. So a partition is subdivided into segments. Each partition is like a huge file evenly divided into multiple data files of equal size (although the number of messages in each segment file is not necessarily equal). This also makes it easy to delete old segments, i.e. to clean up messages that have already been consumed and improve disk utilization. The segment lifecycle is determined by server configuration parameters (log.segment.bytes, log.roll.{ms,hours}, and so on).

A segment consists of an .index file and a .log file, which are the segment's index file and data file respectively. Each segment file is named after the offset of the last message of the previous segment file. The name is a 64-bit long value padded with leading zeros to 20 digits, as follows:

00000000000000000000.index

00000000000000000000.log

00000000000000170410.index

00000000000000170410.log

00000000000000239430.index

00000000000000239430.log

Take segment 00000000000000170410 as an example to illustrate the mapping between its .index file and .log file.

As shown in the figure above, the .index file stores metadata and the .log file stores the messages; metadata in the index file points to the physical offset of the corresponding message in the data file. Taking the entry [3, 348] in the .index file as an example, it refers to the third message in the .log file, i.e. message 170410+3=170413 in the global partition, whose physical offset within the data file is 348.

How do you find a message from a partition by offset?

As shown in the figure above, to read the message at offset=170418, first locate the segment file: 00000000000000000000.index is the first file; the second file is 00000000000000170410.index (starting offset 170410+1=170411); and the third file is 00000000000000239430.index (starting offset 239430+1=239431). So offset=170418 falls into the second file. Subsequent files are named and arranged by their actual offsets in the same way, so binary search can quickly locate the correct file. Then, according to the entry [8, 1325] in 00000000000000170410.index, the read starts at position 1325 in the 00000000000000170410.log file.
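
A toy sketch of this lookup, assuming segments are represented only by the numeric values in their file names (following the convention above that a segment named N starts at offset N+1); it illustrates just the binary search over segment names, not the index scan inside the chosen segment:

// Toy model of locating the segment file that holds a given offset.
// A segment named N is assumed to start at offset N+1, per the description above.
public class SegmentLookup {

    // segmentNames: base names of the segment files, sorted ascending
    static long findSegment(long[] segmentNames, long targetOffset) {
        int lo = 0, hi = segmentNames.length - 1, answer = 0;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (segmentNames[mid] < targetOffset) {
                answer = mid;      // candidate: last name strictly below the target offset
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return segmentNames[answer];
    }

    public static void main(String[] args) {
        long[] names = {0L, 170410L, 239430L};
        // prints 170410, i.e. the message lives in 00000000000000170410.log
        System.out.println(findSegment(names, 170418L));
    }
}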

If you start reading the message at offset=170418 from position 1325 in 00000000000000170410.log, how do you know where this message ends and the next one begins?

This is determined by the message's physical structure, which is fixed and includes: offset (8 bytes), message size (4 bytes), CRC32 (4 bytes), magic (1 byte), attributes (1 byte), key length (4 bytes), key (K bytes), payload (N bytes), and so on; these fields determine the size of a message.
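
As a quick illustration of the size arithmetic implied by this field list (exact layouts differ between Kafka message format versions, so treat the 22-byte fixed part as an approximation built from the fields named above):

// offset(8) + size(4) + crc32(4) + magic(1) + attributes(1) + key length(4) = 22 fixed bytes
public class MessageSize {

    static long messageEntrySize(long keyBytes, long payloadBytes) {
        return 22 + keyBytes + payloadBytes;
    }

    public static void main(String[] args) {
        // a 16-byte key and a 1 KB payload occupy roughly 1062 bytes on disk
        System.out.println(messageEntrySize(16, 1024));
    }
}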

3.2 Replication Principle and Synchronization Mode

A topic in Kafka maintains a write-ahead log file for each partition. Although a partition can be further subdivided into several segments, for upper-layer applications a partition can be regarded as the minimal storage unit (a "huge" file composed of multiple segment files). Each partition consists of a series of ordered, immutable messages that are continuously appended to it.

There are two new terms in the figure above: HW and LEO. LEO is an abbreviation of LogEndOffset and indicates the position of the last message in each partition's log. HW is short for HighWatermark and is the highest position in the partition that a consumer can see. The HW involves multiple replicas; it is only mentioned here and is discussed in detail in the following sections.

To improve message reliability, each Kafka partition has N replicas, where N (greater than or equal to 1) is the topic's replication factor. Kafka implements automatic failover through this multi-replica mechanism, which keeps the service available when a broker in the cluster fails. Among the N replicas, one is the leader and the others are followers. The leader handles all read and write requests for the partition, while followers passively replicate data from the leader.

As shown in the figure below, a Kafka cluster has four brokers, a topic has three partitions, and the replication factor, i.e. the number of copies, is 3:

Kafka provides a data replication algorithm to guarantee that if the leader fails or goes down, a new leader is elected and the messages that were successfully written are acknowledged to the client. Kafka ensures that the new leader is elected from the set of in-sync replicas, or that the followers have caught up with the leader's data. The leader is responsible for maintaining and tracking the lag status of all followers in the in-sync replica set (ISR, see the following section). After the producer sends a message to the broker, the leader writes the message and replicates it to all followers; the message is committed only after it has been successfully copied to all in-sync replicas. Message replication latency is bounded by the slowest follower, so it is important to detect slow replicas quickly: if a follower falls too far behind or fails, the leader removes it from the ISR.

3.3 ISR

The previous section mentioned the ISR (in-sync replicas), i.e. the replica synchronization queue. The number of replicas affects Kafka throughput, but it greatly enhances availability. By default, the number of replicas in Kafka is 1, i.e. each partition has a single leader. To ensure message reliability, this value (specified by the broker parameter offsets.topic.replication.factor) is usually set to greater than 1 in practice, for example 3. All the replicas together are called Assigned Replicas (AR), and the ISR is a subset of the AR. The leader maintains the ISR list. Followers lag somewhat when synchronizing data from the leader (the lag used to be bounded both by time, replica.lag.time.max.ms, and by message count, replica.lag.max.messages; recent versions only support replica.lag.time.max.ms). Any follower that exceeds the threshold is removed from the ISR and placed in the OSR (out-of-sync replicas) list, and newly added followers also start out in the OSR. AR = ISR + OSR.

The replica.lag.max.messages parameter was removed in Kafka 0.9.x, leaving only replica.lag.time.max.ms as the replica management parameter for the ISR. Why? replica.lag.max.messages meant that a follower was removed from the ISR by the leader if the number of messages it lagged behind exceeded this value. Suppose replica.lag.max.messages=4. If the producer sends fewer than 4 messages to the broker at a time, then since followers start pulling the messages after the leader receives them, the followers lag by no more than 4 messages and no follower is moved out of the ISR, so this setting of replica.lag.max.messages seems reasonable. However, when the producer hits an instantaneous traffic peak and sends more than 4 messages at once, i.e. more than replica.lag.max.messages, the followers are considered out of sync with the leader replica and are kicked out of the ISR. But these followers are actually alive and have no performance problems; they soon catch up with the leader and rejoin the ISR. So what happens is that they are repeatedly removed from and re-added to the ISR, which adds unnecessary performance loss. Moreover, this parameter is broker-global: setting it too large delays the removal of genuinely lagging followers, while setting it too small causes followers to move in and out frequently. No value of replica.lag.max.messages fits all cases, so newer versions of Kafka removed this parameter.

Note: ISR includes leader and follower.

Another concept mentioned in the previous section is the HW (HighWatermark). The HW is the smallest LEO among the replicas in a partition's ISR, and a consumer can consume at most up to the HW position. Each replica has its own HW, and the leader and followers are each responsible for updating their own HW state. For a message newly written to the leader, the consumer cannot consume it immediately: the leader waits until the message has been synchronized by all replicas in the ISR before updating the HW, and only then can the message be consumed. This ensures that if the leader's broker fails, the message can still be retrieved from the newly elected leader. Fetch requests from internal brokers are not restricted by the HW.

The following figure details the flow of ISR, HW and LEO as producer produces messages to brokers:

Thus, Kafka's replication mechanism is neither fully synchronous nor purely asynchronous. Fully synchronous replication requires all working followers to have copied a message before it is committed, which greatly limits throughput. With asynchronous replication, followers copy data from the leader asynchronously and data is considered committed as soon as the leader has written its log; in this case, if the followers have not finished replicating and are behind the leader when the leader suddenly goes down, data is lost. Kafka's ISR approach strikes a balance between data loss and throughput.

Kafka's ISR management is ultimately reflected in a Zookeeper node located at /brokers/topics/[topic]/partitions/[partition]/state. This node is currently maintained in two places:

  1. Maintained by the Controller: one broker in the Kafka cluster is elected as the Controller, which is responsible for partition management and replica state management, as well as administrative tasks such as reassigning partitions. Under certain conditions, the LeaderSelector inside the Controller elects a new leader, writes the ISR and the new leader_epoch and controller_epoch to the relevant Zookeeper nodes, and sends a LeaderAndIsrRequest to notify all replicas.
  2. Maintained by the leader: the leader has a dedicated thread that periodically checks whether followers in the ISR have fallen out of it. If the ISR changes, the new ISR information is written back to the relevant Zookeeper node.

3.4 Data reliability and persistence

When the producer sends data to the leader, the request.required.acks parameter can be used to set the data reliability level:

  • 1 (default): the producer sends the next message after the leader in the ISR has successfully received the data and acknowledged it. If the leader goes down before the followers have replicated the data, the data is lost.
  • 0: the producer does not wait for acknowledgement from the broker and keeps sending the next batch of messages. Data transfer efficiency is highest, but data reliability is lowest.
  • -1: the producer waits until all followers in the ISR have confirmed receipt of the data before counting a send as complete. This gives the highest reliability. However, it still does not guarantee that no data is lost; for example, when the ISR contains only the leader (as described in the ISR section above, ISR membership may shrink under certain circumstances until only the leader is left), the behavior degenerates to acks=1.

To maximize data reliability, request.required.acks=-1 needs to be used together with the min.insync.replicas parameter (which can be set at the broker or topic level). min.insync.replicas specifies the minimum number of replicas in the ISR; its default value is 1, and it only takes effect when request.required.acks=-1. If the number of replicas in the ISR is less than min.insync.replicas, the client receives an exception: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
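
A minimal producer sketch with this configuration, using the newer Java client (the broker address and topic name are placeholders, and acks=all is the Java-client spelling of request.required.acks=-1):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.NotEnoughReplicasException;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ReliableProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("acks", "all");                          // wait for all in-sync replicas
        props.put("retries", 3);                           // retry transient failures
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            try {
                // synchronous send: block until the broker acknowledges the write
                producer.send(new ProducerRecord<>("topic_zzh_test", "key", "value")).get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof NotEnoughReplicasException) {
                    // the ISR has fewer replicas than min.insync.replicas, so the write was rejected
                    System.err.println("Not enough in-sync replicas: " + e.getCause().getMessage());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}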

Next, let's look in detail at the acks=1 and acks=-1 cases.

1. request.required.acks=1

The producer sends data to the leader, the leader writes its local log successfully, and success is returned to the client. If the leader goes down before the replicas in the ISR have had a chance to pull the message, the message that was already acknowledged as sent is lost.

2. request.required.acks=-1

With a replica count >=2 and min.insync.replicas>=2, data will not be lost.

There are two typical cases. With acks=-1 (unless otherwise specified, acks below stands for request.required.acks): data is sent to the leader, all followers in the ISR complete data synchronization, and then the leader goes down; no data is lost.

With acks=-1, if only some of the ISR replicas have finished synchronizing when the leader goes down, for example follower1 has synchronized the data while follower2 has not, then either follower1 or follower2 may become the new leader; the producer receives an exception and resends the data, so the data may be duplicated.

Of course, in the figure above, if the follower that has not synchronized any of the new data is elected as the new leader, the resent message will not be duplicated.

Note: Kafka only handles fail/recover problems; it does not handle Byzantine faults.

3.5 Further discussion on HW

Consider another case from the figure above (i.e. acks=-1 and only part of the ISR has synchronized): if, when the leader goes down, follower1 has synchronized messages 4 and 5 while follower2 has only synchronized message 4, and follower2 is then elected as the new leader, what should follower1 do with the extra message 5?

This is where the HW comes in. As mentioned earlier, in a partition's ISR, the leader's HW is the smallest LEO among the ISR replicas. Like the barrel principle, the water level is determined by the shortest plank.

As shown in the figure above, a partition of a topic has three replicas, A, B, and C. A, as the leader, necessarily has the highest LEO, B is next, and machine C synchronizes most slowly because of its low specification and poor network. At this point machine A goes down and B becomes the leader. Without an HW, when A recovers it would perform the makeFollower operation and simply keep appending after the end of its crash-time log file; if B's LEO had already reached A's, the data would become inconsistent. The HW is used to avoid this situation.

With the HW, during the synchronization A first truncates its log file to its previous HW position, namely 3, and then pulls messages from B to catch up.

If a failed follower recovers, it first truncates its log file to the HW position recorded at its last checkpoint and then synchronizes messages from the leader. If the leader fails, a new leader is elected, and the remaining followers first truncate to their own HW positions, as instructed by the new leader, before pulling new messages.
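
A toy model of this recovery step, purely for illustration (replica logs are plain in-memory lists of message numbers, not real broker state):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of a follower recovering: truncate to its checkpointed HW,
// then copy the missing suffix of the log from the current leader.
public class HwTruncation {

    static void recover(List<Integer> followerLog, int checkpointedHw, List<Integer> leaderLog) {
        // 1. drop everything beyond the high watermark (it was never committed)
        while (followerLog.size() > checkpointedHw) {
            followerLog.remove(followerLog.size() - 1);
        }
        // 2. re-fetch the rest from the leader
        followerLog.addAll(leaderLog.subList(followerLog.size(), leaderLog.size()));
    }

    public static void main(String[] args) {
        List<Integer> leader = new ArrayList<>(Arrays.asList(0, 1, 2, 3, 4));   // new leader's log
        List<Integer> follower = new ArrayList<>(Arrays.asList(0, 1, 2, 3, 9)); // has an extra, uncommitted message
        recover(follower, 4, leader);                                           // HW checkpoint was 4
        System.out.println(follower);                                           // [0, 1, 2, 3, 4]
    }
}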

When the LEOs of the replicas in the ISR are not the same and the leader fails, the new leader is elected according to the order of replicas in the ISR rather than by the size of their LEOs.


A message is considered committed only after all followers in the ISR have copied it from the leader. This avoids the situation where some data is written to the leader and lost because the leader fails before any follower has copied it. On the producer side, one can choose whether to wait for the message to be committed, which is set via request.required.acks. This mechanism ensures that a committed message is not lost as long as there is at least one surviving follower in the ISR.

A very important problem is how to elect a new leader from among the followers when the leader goes down. Since followers may lag far behind or may themselves have crashed, the "most up-to-date" follower must be chosen as the new leader. A basic rule is: if the old leader is gone, the new leader must have all of the messages committed by the original leader. This involves a trade-off: if the leader waits for more followers to acknowledge a message before declaring it committed, more followers are eligible to become the new leader after it dies, but throughput drops.

A popular method of electing a leader is majority vote ("the minority yields to the majority"), but Kafka does not use it. In this mode, with 2f+1 replicas, f+1 replicas must complete replication before a commit, and no more than f replicas may fail for a new leader to be elected correctly. A big advantage of this approach is that the system's latency depends on the fastest machines: with 3 replicas, latency depends on the faster follower rather than the slowest one. Majority vote also has disadvantages: to keep leader election working it tolerates relatively few failures. Tolerating one follower failure requires at least 3 replicas; tolerating two requires at least 5. In other words, guaranteeing a high fault-tolerance level in production requires a large number of replicas, and a large number of replicas causes a sharp performance drop under heavy data volumes. This is why the algorithm is more common in systems that store shared cluster configuration, such as Zookeeper, than in systems that need to store large amounts of data. HDFS's HA feature is also based on majority vote, but its data storage does not use this approach.

In fact, there are many leader election algorithms, such as Zookeeper's Zab, Raft, and Viewstamped Replication. Kafka's leader election algorithm is closer to Microsoft's PacificA algorithm.

Kafka dynamically maintains an ISR for each partition in Zookeeper; all replicas in the ISR have caught up with the leader, and only members of the ISR can be elected leader (when unclean.leader.election.enable=false). In this mode, for f+1 replicas, a Kafka topic can tolerate the failure of f replicas without losing committed messages, which is advantageous in most usage scenarios. In fact, to tolerate the failure of f replicas, majority vote and the ISR approach must wait for the same number of replicas before committing, but the total number of replicas required by the ISR approach is almost half that of majority vote.

As mentioned above, Kafka ensures that committed data is not lost as long as at least one follower remains in the ISR. However, if all replicas of a partition go down, Kafka can no longer guarantee that data is not lost. In this case there are two possible approaches:

  • Wait for a replica in the ISR to come back to life and choose it as the leader
  • Choose the first replica that comes back to life (not necessarily one in the ISR) as the leader

This requires a simple trade-off between availability and consistency. If we must wait for a replica in the ISR to come back, the partition may be unavailable for a relatively long time; and if all the replicas in the ISR can no longer be recovered, or their data has been lost, the partition will never become available. If instead the first replica that comes back is chosen as the leader, then even though it is not in the ISR and is not guaranteed to contain all committed messages, it becomes the leader and the consumers' data source. By default Kafka uses the second strategy, i.e. unclean.leader.election.enable=true; this parameter can be set to false to use the first strategy.

The unclean.leader.election.enable parameter therefore has a crucial impact on leader election and on the trade-off between system availability and data reliability. Let's analyze a few typical scenarios.

Assume a partition has 3 replicas, replica-0, replica-1, and replica-2, stored on Broker0, Broker1, and Broker2 respectively; AR=(0,1,2), ISR=(0,1). Set request.required.acks=-1, min.insync.replicas=2, unclean.leader.election.enable=false. Initially Broker0 holds the leader and Broker1 holds the follower.

  • When replica-0 in the ISR goes down, Broker1 elects the new leader [ISR=(1)]. Because min.insync.replicas=2, writes stop being served, but reads continue to work normally. Recovery options in this case:
  1. Try to recover (restart) replica-0. If it recovers, the system returns to normal.
  2. If replica-0 cannot be recovered, set min.insync.replicas to 1 to restore the write function.
  • When replica-0 in the ISR goes down and then replica-1 also goes down, then [ISR=(1), leader=-1] and the partition cannot serve requests at all. Recovery options in this case:
  1. Try to recover replica-0 and replica-1. If both recover, the system recovers.
  2. If replica-0 comes back but replica-1 cannot, a leader still cannot be elected, because with unclean.leader.election.enable=false the leader can only be elected from the ISR, and once all replicas in the ISR have failed, the leader can only be elected after the replica that failed last recovers; here replica-0 failed first and replica-1 failed later, so replica-1 must recover first. The conservative recommendation is to set unclean.leader.election.enable to true; this may lose data but restores the read service. In addition, set min.insync.replicas to 1 to restore the write function.
  3. If replica-1 comes back but replica-0 does not, the read service is available; set min.insync.replicas to 1 to restore the write function.
  4. If neither replica-0 nor replica-1 can be recovered, refer to option 2 above.
  • When replica-0 and replica-1 in the ISR go down at the same time, then [ISR=(0,1)] and the partition cannot serve requests. In this case, try to recover replica-0 and replica-1; the read service becomes available as soon as either replica recovers, and the write service is restored only when both recover, or when min.insync.replicas is set to 1.

3.7 Kafka Sending Mode

Kafka's sending mode is set by the producer.type parameter on the producer side. This parameter specifies whether messages are sent synchronously or asynchronously in a background thread. The default is synchronous, i.e. producer.type=sync. If set to asynchronous, producer.type=async, the producer can push data in batches, which greatly improves broker performance but increases the risk of losing data. To guarantee message reliability, producer.type must be set to sync.

For asynchronous mode, there are also four supporting parameters, as follows:

Pushing data in batches greatly improves processing efficiency. The Kafka producer can accumulate a certain number of messages in memory and send them to the broker as one batch request. The batch size can be controlled by the producer parameter batch.num.messages. Increasing the batch size reduces the number of network requests and disk I/Os, but the exact settings are a trade-off between efficiency and timeliness. Newer versions also provide the batch.size parameter.
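
producer.type belongs to the old Scala producer; with the newer Java client the same trade-off appears as blocking on each send versus letting the client batch in the background via batch.size and linger.ms. A rough sketch of both styles (broker address and topic name are placeholders):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SendModes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", "16384"); // bytes accumulated per partition before a batch is sent
        props.put("linger.ms", "5");      // wait up to 5 ms to fill a batch (favors throughput)

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // "sync" style: block on every send, lowest risk of silent loss
            producer.send(new ProducerRecord<>("topic_zzh_test", "k1", "v1")).get();

            // "async" style: fire-and-forget with a callback; batching happens in the background
            producer.send(new ProducerRecord<>("topic_zzh_test", "k2", "v2"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace(); // the message may be lost if this is ignored
                        }
                    });
        }
    }
}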

4 High reliability Analysis

4.1 Message transmission Guarantee

We have seen above how Kafka stores data efficiently and how producers and consumers work. The next topic is how Kafka ensures that messages are transmitted between producer and consumer. There are three possible delivery guarantees:

  • At most once: A message may be lost, but it is never transmitted again
  • At least once: A message is never lost but may be transmitted repeatedly
  • Exactly once: Every message must be sent once and only once

Kafka's message transfer guarantee mechanism is straightforward. When the producer sends a message to the broker, once the message is committed it will not be lost, thanks to replication. However, if the producer runs into a network problem after sending and does not receive an acknowledgement, it cannot tell whether the message has been committed. Kafka cannot determine what happened during the network failure, but the producer can retry multiple times to ensure that the message has reached the broker, so Kafka currently provides at least once delivery.

After the consumer reads messages from the broker, it can commit; the commit stores in Zookeeper the offset the consumer has read in that partition, and the next read of that partition starts from the following message. If the consumer does not commit, the next read starts from the same position as after the last commit. It is also possible to set the consumer to auto-commit, so that it commits automatically as soon as it has read the data. Considering only this read process, Kafka achieves exactly once; but if a message is duplicated between the producer and the broker for some reason, the overall guarantee is at least once.

Consider the case where the consumer reads a message, commits, and only then processes the message. In this mode, if the consumer crashes after the commit but before processing, it cannot read back the message that was committed but not processed when it restarts; this corresponds to at most once.

Now consider reading the message, processing it, and committing afterwards. In this mode, if the consumer crashes after processing but before committing, the message is processed again on restart; this corresponds to at least once.
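
A minimal at-least-once consumer sketch along these lines, using the newer Java consumer with auto-commit disabled and a manual commit issued only after processing (this client commits offsets to Kafka itself rather than to Zookeeper as the older consumer described above did; broker address, group and topic names are placeholders):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "demo-group");               // placeholder consumer group
        props.put("enable.auto.commit", "false");          // commit manually, after processing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic_zzh_test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // a crash here means the message is re-delivered: at least once
                }
                consumer.commitSync(); // commit only after every record in the batch is processed
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}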

To achieve exactly once, a message deduplication mechanism needs to be introduced.

4.2 Message deduplication

As mentioned in the previous section, Kafka may duplicate messages both on the producer side and on the consumer side, which requires deduplication.

The Kafka documentation mentions a GUID (Globally Unique Identifier) concept: a unique ID is generated for each message by a client-side algorithm and can be mapped to the address where the message is stored on the broker, which also makes idempotence guarantees easier for the sender. This deduplication module would need to be provided on the broker, which is not supported in current versions.

If deduplication by GUID is done on the client side, a centralized cache has to be introduced, which inevitably increases dependency complexity, and it is hard to decide how large the cache should be.

At present, not only Kafka but also commercial-grade middleware such as RabbitMQ and RocketMQ only guarantee at least once and cannot deduplicate messages by themselves. We therefore recommend that applications deduplicate according to their own business characteristics, for example by relying on the idempotence of the business messages themselves, or with the help of other products such as Redis.
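
As a rough illustration of the client-side approach, here is a minimal in-process deduplication sketch keyed on a business-level message ID; in practice the "seen" set would typically live in an external store such as Redis, as suggested above, rather than in local memory:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Toy consumer-side deduplication: process each business message ID at most once.
public class Deduplicator {
    private final Set<String> seenIds = ConcurrentHashMap.newKeySet();

    // returns true if the message was processed, false if it was a duplicate
    public boolean processOnce(String messageId, Runnable businessLogic) {
        if (!seenIds.add(messageId)) {
            return false; // this ID was already handled, skip the duplicate
        }
        businessLogic.run();
        return true;
    }

    public static void main(String[] args) {
        Deduplicator dedup = new Deduplicator();
        System.out.println(dedup.processOnce("order-42", () -> System.out.println("handled"))); // true
        System.out.println(dedup.processOnce("order-42", () -> System.out.println("handled"))); // false
    }
}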

4.3 High Reliability Configuration

Kafka offers great flexibility in data redundancy. For scenarios requiring very high data reliability, the replica count, min.insync.replicas, and related settings can be increased, but this affects performance; conversely, performance improves as reliability is relaxed. Users need to make their own trade-off based on their business characteristics.

To ensure that data is written to Kafka securely and reliably, the following configuration is required:

  • Topic configuration: replication.factor>=3, i.e. at least 3 replicas; 2<=min.insync.replicas<=replication.factor
  • Broker configuration: leader election condition unclean.leader.election.enable=false
  • Producer configuration: request.required.acks=-1 (all), producer.type=sync
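
As a sketch of applying the topic-level part of this configuration programmatically, using the AdminClient available in Kafka clients 0.11 and later (newer than the 0.10.1.0 used in the benchmark below); the topic name, partition count, and broker address are placeholders, and in these newer versions unclean.leader.election.enable can also be overridden per topic:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ReliableTopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address

        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put("min.insync.replicas", "2");                // 2 <= min.insync.replicas <= replication.factor
            configs.put("unclean.leader.election.enable", "false"); // only ISR members may become leader

            NewTopic topic = new NewTopic("reliable_topic", 12, (short) 3) // 12 partitions, 3 replicas
                    .configs(configs);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}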

5 Benchmark

Kafka has a long history at Vipshop. According to information from the VMS team, the Kafka cluster it runs supports nearly 2,000 topics, with daily request volumes reaching hundreds of billions. Here we take Kafka's high reliability as the reference point and explore its behavior in several different scenarios, to deepen the understanding of Kafka and provide a basis for using it efficiently in the future.

5.1 Test Environment

The Kafka brokers run on 4 machines, each configured as follows:

  • CPU: 24 core / 2.6 GHZ
  • Memory: 62G
  • Network: 4000Mb
  • OS/kernel: CentOs release 6.6 (Final)
  • Disk: 1089G
  • Kafka version: 0.10.1.0

Broker JVM parameter settings: -Xmx8G -Xms8G -server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/apps/service/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999

Client machine configuration:

  • CPU: 24 core / 2.6 GHZ
  • Memory: 3G
  • Network: 1000Mb
  • OS/kernel: CentOs release 6.3 (Final)
  • Disk: 240G

5.2 Test in different scenarios

Scenario 1: Test the impact of different replica counts, min.insync.replicas values, and request.required.acks policies (acks policy for short) on TPS.

Specific configuration: one producer; sending mode sync; message body size 1KB; 12 partitions; replica counts of 1/2/4; min.insync.replicas of 1/2/4; acks of -1(all)/1/0.

min.insync.replicas only takes effect when acks=-1:

Analysis of test results:

  • The client's acks policy has a great impact on send TPS: TPS ranks acks_0 > acks_1 > acks_-1.
  • The more replicas, the lower the TPS. With the same number of replicas, different min.insync.replicas values do not affect TPS.
  • With acks=0/1, TPS is not affected by the min.insync.replicas parameter or by the number of replicas; only the acks policy matters.

To further examine the impact of different acks policies, min.insync.replicas values, and replica counts on the send rate, the number of partitions was set to 1; see Scenario 2 and Scenario 3 for details.

Scenario 2: When the number of partitions is fixed to 1, test the impact of different replicas and the min.insync.replicas policy on the sending speed.

Specific configuration: one producer; sending mode sync; message body size 1KB; producer-side acks=-1(all); replica counts 2/3/4; min.insync.replicas set to 1/2/4.

The test results are as follows:

Analysis of test results: the more replicas, the lower the TPS (consistent with the conclusion of Scenario 1), although with a single partition the differences are small; min.insync.replicas does not affect TPS.

Scenario 3: When the number of partitions is fixed to one, test the impact of acks policies and the number of copies on the sending speed.

Specific configuration: one producer; sending mode sync; message body size 1KB; min.insync.replicas=1; topic replica counts 1/2/4; acks of 0/1/-1.

The test results are as follows:

Analysis of test results (consistent with Scenario 1):

  • The more replicas, the lower the TPS;
  • The client's acks policy has a great impact on send TPS: TPS ranks acks_0 > acks_1 > acks_-1.

Scenario 4: Test the impact of different partition numbers on the sending rate

Specific configuration: one producer; message body size 1KB; sending mode sync; topic replica count 2; min.insync.replicas=2; acks=-1. The number of partitions is set to 1/2/4/8/12.

Test results:

Analysis of test results: the number of partitions affects TPS. As the partition count increases, TPS rises, but not always proportionally; beyond a certain critical value, increasing the partition count makes TPS decline slightly.

Scenario 5: Test the impact on the client and on message persistence when some brokers in the cluster are made unavailable.

Specific configuration: one producer; message body size 1KB; sending mode sync; topic replica count 4; min.insync.replicas set to 2; acks=-1; retries=0/100000000; 12 partitions.

Specific test data are shown in the following table:

Error message:

  • Error 1: on the client, some of the data may have been persisted while the rest failed: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
  • Error 2: [WARN]internals.Sender — Got error produce response with correlation ID 19369 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left). Error: NETWORK_EXCEPTION
  • Error 3: [WARN]internals.Sender — Got error produce response with correlation ID 77890 on topic-partition default_channel_replicas_4_1-8, retrying (999999859 attempts left). Error: NOT_ENOUGH_REPLICAS
  • Error 4: [WARN]internals.Sender — Got error produce response with correlation ID 77705 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left). Error: NOT_ENOUGH_REPLICAS_AFTER_APPEND

Analysis of test results:

  • After killing two brokers, the client can continue sending. With fewer brokers, the partition leaders are redistributed over the remaining two brokers, and TPS decreases.
  • After killing three brokers, the client can no longer send and Kafka's automatic retry kicks in. Once enough brokers have recovered that the number of in-sync replicas is greater than or equal to min.insync.replicas, sending can continue.
  • When retries is not 0, duplicate messages appear. All messages for which the client received a success response were persisted; when exceptions occur, some of the affected messages may also have been persisted.

Scenario 6: Testing the sending latency of a single producer and the end-to-end latency.

Specific configuration: one producer; message body size 1KB; sending mode sync; topic replica count 4; min.insync.replicas set to 2; acks=-1; 12 partitions.

Test data and results (unit: ms):

Test summary of each scenario:

  • With acks=-1, the TPS of the Kafka sender is limited by the number of replicas (in the ISR) of the topic: the more replicas, the lower the TPS;
  • acks=0 gives the highest TPS, acks=1 the next, and acks=-1 the worst, i.e. TPS: acks_0 > acks_1 > acks_-1;
  • The min.insync.replicas parameter does not affect TPS;
  • The number of partitions affects TPS: as the partition count increases, TPS rises, but not always proportionally; beyond a certain critical value, increasing the partition count makes TPS decline slightly;
  • With acks=-1 and min.insync.replicas>=1, Kafka offers high reliability: all messages for which success was returned were persisted.
