In the last article we covered some of Kafka's basic concepts, but as a mature piece of middleware Kafka offers far more than that, and several of its design ideas are worth studying. In this article we briefly list some of them; more detailed introductions will follow in later articles.

1. Kafka stores messages on the file system

The first thing to know is that Kafka's messages live on the file system. The usual assumption is that "disks are slow," so you might be skeptical of this design. In fact, disks used well can be very fast: sequential disk access can in some cases outperform random memory access and even approach network speeds. Kafka relies heavily on the file system to store and cache messages, and this is an important guarantee of Kafka's high throughput.

The Topic mentioned above is really a logical concept, facing producers and consumers; what is physically stored are Partitions. Each Partition ultimately corresponds to a directory holding all of its messages and index files. By default, a Topic is created with a single Partition when no partition count is specified. For example, if I create a Topic named test without specifying the number of partitions, a folder named test-0 is created by default, following the naming rule:

    <topic name>-<partition id>

Any message published to a Partition is appended to the end of the Partition's data file. It is this sequential write to disk that makes Kafka so efficient.

Each message sent to the Broker is routed to a Partition according to partitioning rules. If the rules are set up properly, all messages can be distributed evenly across the different partitions.
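
To make the routing rule concrete, here is a simplified sketch of key-based partition selection. Kafka's Java client actually uses a murmur2 hash of the serialized key; the plain Java hash below is only a stand-in for illustration:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class NaivePartitioner {
        // Simplified sketch: the same key always maps to the same partition.
        // Kafka's Java client uses murmur2; Arrays.hashCode is a stand-in here.
        static int selectPartition(String key, int numPartitions) {
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            int hash = Arrays.hashCode(keyBytes);
            return (hash & 0x7fffffff) % numPartitions; // mask the sign bit to stay non-negative
        }

        public static void main(String[] args) {
            System.out.println(selectPartition("user-42", 4)); // a fixed key lands in a fixed partition
        }
    }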

2. Underlying storage design in Kafka

Suppose a Kafka cluster has only one Broker, and we create two topics named "topic1" and "topic2" with 1 and 2 partitions respectively. The following folders will then appear in the data directory:

    | --topic1-0    
    | --topic2-0    
    | --topic2-1 

In Kafka, the same Topic can have multiple partitions, and each Partition is a directory. Each Partition is further divided into multiple Segment files of equal size. By default, a Segment is closed when it reaches 1 GB in size (log.segment.bytes) or has accumulated 7 days of data (log.roll.hours), and a new Segment is then opened for writing.

The Segment currently being written to is called the active segment, and it is never deleted. As a result, if the log retention time is set to 1 day but a segment contains 5 days' worth of data, we actually keep all 5 days of data, because the active segment is in use and cannot be deleted before it is closed.

A Segment consists of a .index file, a .log file, and a .timeindex file. The .log file stores the messages themselves, while the .index and .timeindex files are used to locate messages.

Now suppose we set each Segment size to 500 MB and start the producer to write a lot of data to topic1. The topic1-0 folder will produce some files like the following:

    | --topic1-0        
    	| --00000000000000000000.index        
    	| --00000000000000000000.log        
    	| --00000000000000368769.index         
    	| --00000000000000368769.log        
    	| --00000000000000737337.index        
    	| --00000000000000737337.log         
    | --topic2-0    
    | --topic2-1 

The Segment is the smallest unit of Kafka file storage. Each Segment file is named after the offset of the last message in the preceding Segment file. The value is a 64-bit long, rendered as 19 digits, with leading zeros used as padding.

[Figure: mapping between a Segment's index file and data file]

For example, the metadata entry <3, 497> in the index file means: the 3rd message in this data file (message 368769 + 3 = 368772 of the Partition as a whole) is stored at physical offset 497 in the .log file.

Notice that the index file does not begin at zero, nor do its entries increment by 1. This is because Kafka stores a sparse index: an index entry is created only once every certain number of bytes of data. This keeps the index file from taking up too much space, so it can be kept in memory, reducing the disk I/O overhead of lookups while adding very little query cost. The drawback is that a message without an index entry cannot be located in the data file directly: a sequential scan is required, but the range of that scan is very small.

Since each file is named after the offset of the last message in the previous Segment, when we need to find the message with a given offset we can binary-search the file names of all the Segments to find the Segment it belongs to, then use that Segment's index file to find the message's physical position in the .log file and read it out.
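
As a rough illustration of that lookup (a toy sketch, not Kafka's actual code), a sorted map keyed by the offsets encoded in the segment file names can play the role of the binary search:

    import java.util.Map;
    import java.util.TreeMap;

    public class SegmentLookup {
        // Key: the offset encoded in the segment file name (the last offset of the
        // previous segment, per the naming rule above); value: the file name.
        private final TreeMap<Long, String> segments = new TreeMap<>();

        void addSegment(long nameOffset, String fileName) {
            segments.put(nameOffset, fileName);
        }

        // The greatest name strictly below the target offset identifies the segment;
        // TreeMap.lowerEntry is equivalent to a binary search over the sorted names.
        String findSegment(long targetOffset) {
            Map.Entry<Long, String> entry = segments.lowerEntry(targetOffset);
            return entry == null ? segments.firstEntry().getValue() : entry.getValue();
        }

        public static void main(String[] args) {
            SegmentLookup lookup = new SegmentLookup();
            lookup.addSegment(0L, "00000000000000000000.log");
            lookup.addSegment(368769L, "00000000000000368769.log");
            lookup.addSegment(737337L, "00000000000000737337.log");
            System.out.println(lookup.findSegment(368772L)); // -> 00000000000000368769.log
        }
    }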

This sequential disk I/O storage design is important to Kafka's high performance: messages in a Partition's Segment data files are read and written sequentially, and they are not deleted after being consumed (the deletion policy applies to expired Segment files as a whole).

How does Kafka know the offset of each message? Because Kafka defines a standard message storage structure, in which every message in a Partition carries three properties (sketched in code after the list):

  • Offset: indicates the offset of Message in the current Partition. It is a logical value that uniquely identifies a message in the Partition and can be simply considered as an ID.
  • MessageSize: indicates the size of the message content data.
  • Data: the content of the message.
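
As a simplified logical view, such a message could be modeled as below; note that the real on-disk record format carries additional fields (CRCs, timestamps, headers, batch metadata):

    // Simplified logical view of a stored message. The actual Kafka record
    // format also includes CRCs, timestamps, headers, and batch metadata.
    public class StoredMessage {
        final long offset;      // logical position within the Partition; acts as an ID
        final int messageSize;  // size of the message payload in bytes
        final byte[] data;      // the message content itself

        StoredMessage(long offset, byte[] data) {
            this.offset = offset;
            this.data = data;
            this.messageSize = data.length;
        }
    }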

3. Kafka cluster

Membership management

Kafka uses Zookeeper to manage cluster membership. Each broker has a unique ID (either specified in its configuration file or generated automatically), and when the broker starts it registers a corresponding ephemeral node in Zookeeper. If a broker with the same ID already exists in the cluster, the new broker fails to start.

The nodes are registered in Zookeeper under /brokers/ids. Kafka components listen for changes under this path and are notified when a broker joins or leaves. When a broker leaves (possibly due to an outage, a network failure, a long GC pause, and so on), its ephemeral node in Zookeeper disappears, but its broker ID still appears in some data structures. For example, the replica list of each topic contains the broker IDs of the replicas, so if a broker leaves and a new broker starts with the same ID, the new broker takes the old one's place in the cluster and is assigned the same topics and partitions.
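
The registration mechanism can be sketched with the plain ZooKeeper client API (an illustration of the idea only, not Kafka's actual registration code):

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class BrokerRegistration {
        // An ephemeral node lives only as long as the creating session, which is
        // how the rest of the cluster notices that a broker has gone away.
        static void register(ZooKeeper zk, int brokerId, byte[] brokerInfo) throws Exception {
            zk.create("/brokers/ids/" + brokerId, brokerInfo,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }
    }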

Controller

The cluster controller is itself a broker; in addition to its general broker duties, it is responsible for electing partition leader replicas (covered in point 4 below). The first broker in the cluster becomes the controller by creating an ephemeral node under Zookeeper's /controller path. When other brokers start, they also try to create this node, but receive a "node already exists" exception, from which they learn that a controller already exists. These brokers then watch Zookeeper's /controller ephemeral node; when the controller fails and the node disappears, they are notified and each attempts to create the node in order to become the new controller.
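
Sketched with the same ZooKeeper client API (again only an illustration of the mechanism):

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class ControllerElection {
        static boolean tryBecomeController(ZooKeeper zk, byte[] brokerIdBytes) throws Exception {
            try {
                // Only one broker can succeed in creating the ephemeral /controller node.
                zk.create("/controller", brokerIdBytes,
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return true;  // this broker won the election
            } catch (KeeperException.NodeExistsException e) {
                // Another broker is already the controller; watch /controller
                // and retry when its ephemeral node disappears.
                return false;
            }
        }
    }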

When the controller detects that a broker has left the cluster, it checks whether that broker was the leader of any partitions; if so, it elects new leaders for those partitions. The controller picks the new leader from the partition's replica list and sends requests to the new leader and the other followers. This way, the new leader knows it must start handling producer and consumer requests, while the followers know to synchronize messages from the new leader.

If the controller detects that a new broker has joined (it may be one that went down earlier), it checks the broker's ID to see whether the broker holds replicas of any partitions, and if so notifies it to synchronize messages from the corresponding partition leaders.

Finally, each time a new controller is elected, a new, larger controller epoch is generated. Brokers in the cluster check this epoch whenever they receive a message from a controller, and ignore messages carrying an older epoch, thereby avoiding split brain.

4. Replica

Kafka achieves high availability through replication, ensuring that the cluster remains available even if a small number of nodes fail. As mentioned earlier, each topic has several partitions, and each partition has multiple replicas, each residing on a broker. There are two types:

  • Leader replica: each partition has a single leader replica. All producer and consumer requests are handled by the leader replica, which guarantees consistency.
  • Follower replicas: the other replicas in the partition are followers. Followers do not handle producer or consumer requests; they only replicate messages from the leader in order to stay in sync with it.

The leader replica has the additional responsibility of watching the followers' synchronization state. Each follower tries to stay in sync with the leader, but under abnormal conditions (such as network congestion, machine failure, or restart) a follower can fall behind. A follower synchronizes by sending Fetch requests to the leader (just as a consumer does), each containing the offset of the next message it wants, and these offsets are always requested in order. For example, a follower requests message 1, then message 2, then message 3...; if it requests message N, the leader can be certain the follower has already received message N-1 and everything before it. From the offsets in these requests, the leader therefore knows how far behind each follower is. If a replica sends no fetch request for more than 10 seconds (configurable), or its requested offsets lag more than 10 seconds behind, the leader considers that follower out of sync. A follower that has fallen out of sync cannot become the new leader if the leader fails; followers that keep up are in-sync and are eligible to become the new leader.
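
A toy sketch of how a leader might classify its followers; the 10-second threshold corresponds to Kafka's replica.lag.time.max.ms parameter, and the bookkeeping here is heavily simplified:

    import java.util.HashMap;
    import java.util.Map;

    public class FollowerTracker {
        private final long maxLagMillis;  // corresponds to replica.lag.time.max.ms
        private final Map<Integer, Long> lastCaughtUp = new HashMap<>();

        FollowerTracker(long maxLagMillis) {
            this.maxLagMillis = maxLagMillis;
        }

        // Called when a follower's fetch request shows it has caught up to the log end.
        void recordCaughtUp(int followerId, long nowMillis) {
            lastCaughtUp.put(followerId, nowMillis);
        }

        // A follower that has not caught up within maxLagMillis is considered out of sync.
        boolean isInSync(int followerId, long nowMillis) {
            Long last = lastCaughtUp.get(followerId);
            return last != null && nowMillis - last <= maxLagMillis;
        }
    }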

Besides the current leader, each partition also has a preferred leader: the replica that was the partition's leader when the topic was first created. When a topic is created, Kafka spreads the leader and follower replicas of all partitions across the brokers, so when the preferred leaders are the actual leaders, traffic is usually distributed evenly across the cluster. If the preferred leader is in the in-sync state, it automatically becomes the new leader when the current leader fails.

5. Producer design

The usual questions for a message-queue producer are: Is every message critical, so that none can be lost? Is the occasional duplicate message acceptable? Do we care more about message latency or about write throughput? Take a credit-card transaction processing system as an example: it sends a message to Kafka whenever a transaction occurs, and another service reads the message, checks whether the transaction is approved, and returns the result through Kafka. For such a business, messages must be neither lost nor duplicated, throughput needs to be as high as possible given the high transaction volume, and latency can be slightly higher. By contrast, if we are collecting data on user clicks on a web page, a small number of lost or duplicated messages can be tolerated, and latency hardly matters as long as it does not affect the user experience; throughput is determined by the number of concurrent users.

Different services require different write modes and configurations. We can discuss the details later; for now, let's look at the basic flow of a producer writing messages.

The process is as follows:

  1. First, we create a ProducerRecord. This object must contain the topic and the value of the message, and can optionally specify a key or a partition.
  2. When sending the message, the producer serializes the key and value into byte arrays and passes them to the partitioner.
  3. If we specified a partition, the partitioner simply returns that partition; otherwise, the partitioner selects a partition based on the key and returns it.
  4. Once the partition is selected, the producer knows which topic and partition the message belongs to, and it adds the record to a batch of messages for the same topic and partition; a separate thread is responsible for sending these batches to the corresponding Kafka Broker.
  5. When the Broker receives the message, it returns a RecordMetadata object containing the message's topic, partition, and offset if the write succeeds; otherwise it returns an error.
  6. When the producer receives the result, it may retry a failed send; a code sketch of the whole flow follows the list.
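
This flow maps directly onto Kafka's Java producer client. Below is a minimal sketch; the broker address, topic name, key, and value are all placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Step 1: topic and value are required; the key is optional.
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("topic1", "some-key", "some-value");
                // Steps 2-4 (serialization, partitioning, batching) happen inside send().
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();  // step 6: may retry on failure
                    } else {
                        // Step 5: RecordMetadata carries the topic, partition, and offset.
                        System.out.printf("topic=%s partition=%d offset=%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });
            }
        }
    }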

6. Consumer design

Consumers and consumer groups

As mentioned above, consumers work as consumer groups that cooperate to consume messages on the same topic, with each consumer receiving messages from different partitions. Suppose topic T1 has four partitions, and we have a consumer group G1 containing a single consumer, C1. Consumer C1 then receives the messages of all four partitions.

If we add a new consumer C2 to consumer group G1, each consumer will receive messages from two of the partitions.

If we increase the group to four consumers, each consumer will receive messages from exactly one partition.

But if we keep adding consumers to this group, the extra consumers will sit idle and receive no messages at all.

To sum up, it is generally advisable to create topics with a generous number of partitions, so that more consumers can be added to improve performance under heavy consumption load. In addition, there is no point in having more consumers than partitions, because the extra consumers just sit idle.

Another important feature of Kafka is that a message written once can be read by any number of applications; in other words, each application can read the full stream of messages. For every application to read all the messages, the applications need to be in different consumer groups. For the example above, suppose we add a new consumer group G2 containing two consumers.

In this scenario, consumer group G1 and consumer group G2 both receive the full set of messages of topic T1, and logically they belong to different applications.
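
In the Java client, a consumer's group membership is simply its group.id setting. A minimal sketch, with the broker address, group ID, and topic as placeholders:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
            props.put("group.id", "G1");  // consumers sharing this ID split the partitions
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("T1"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("partition=%d offset=%d value=%s%n",
                                record.partition(), record.offset(), record.value());
                    }
                }
            }
        }
    }

Running a second copy of this program with the same group.id splits the partitions between the two instances, while running it with group.id set to "G2" gives the new instance the full message stream, as described above.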

Rebalancing

As we have seen, when a new consumer joins a consumer group, it consumes one or more partitions that were previously the responsibility of other consumers. Likewise, when a consumer leaves the group (because of a restart, an outage, and so on), the partitions it was consuming are reassigned to the other consumers. This process is called rebalancing, and it is what gives consumer groups high availability and horizontal scalability. However, no consumer can consume messages while a rebalance is in progress, so the whole consumer group becomes temporarily unavailable. Rebalancing also invalidates consumers' existing state, forcing them to rebuild it, which further reduces consumption performance.

A consumer keeps itself alive within a consumer group by periodically sending heartbeats to the broker that acts as the group coordinator. This broker is not fixed and may differ from one consumer group to another. A heartbeat is also sent when the consumer pulls messages or commits offsets.

If a consumer fails to send heartbeats for longer than a certain period, its session expires and the group coordinator assumes the consumer is down, triggering a rebalance. Note that there is a window between the consumer going down and the session expiring, during which that consumer's partitions cannot be consumed. This can be avoided with a graceful shutdown, in which the consumer sends a leave-group message to the group coordinator, so that the coordinator can rebalance immediately instead of waiting for the session to expire.
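
With the Java client, a graceful shutdown is usually built on wakeup(), the only KafkaConsumer method that is safe to call from another thread. A sketch, assuming the consumer is configured as in the earlier example:

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    public class GracefulShutdown {
        static void runUntilShutdown(KafkaConsumer<String, String> consumer) {
            // wakeup() interrupts a blocked poll() from another thread.
            Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    // ... process records ...
                }
            } catch (WakeupException e) {
                // Expected during shutdown: wakeup() makes poll() throw so the loop exits.
            } finally {
                consumer.close();  // sends the leave-group message described above
            }
        }
    }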

Partition and consumption model

Consider the following two questions:

  • In Kafka, messages in a topic are broken up and distributed among multiple partitions. The Consumer Group needs to fetch messages from different partitions to consume. How do we reconstruct the order of messages in a topic?

  • Messages in a Partition can be consumed multiple times by different Consumer Groups. So when are consumed messages deleted from the Partition? And how does a Partition know the position at which a Consumer Group is currently consuming?

For the first question, the answer is unfortunately that we cannot: Kafka only guarantees that messages are ordered within a single Partition, and makes no promise about global order.

The second question was partly answered in the introduction: a Partition never deletes messages because they have been consumed; messages are removed only when they expire or the data volume reaches its configured limit. As for the consumption position, in early versions each consumer maintained its consumed offset in ZooKeeper, reporting it at intervals, which easily led to repeated consumption, and ZooKeeper is not suited to frequent, high-volume writes. In newer versions, consumer offsets are maintained directly in the __consumer_offsets topic of the Kafka cluster.
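
For example, a consumer can take explicit control of what gets written to __consumer_offsets by disabling auto-commit (setting enable.auto.commit=false) and committing manually. A sketch building on the earlier consumer setup:

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ManualCommit {
        // Assumes the consumer was created with enable.auto.commit=false.
        static void consumeWithManualCommit(KafkaConsumer<String, String> consumer) {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // ... process the record ...
                }
                consumer.commitSync();  // persists the consumed offsets to __consumer_offsets
            }
        }
    }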

Why the Pull model

Should the consumer ask the Broker for data (pull) or should the Broker push data (push) to the consumer? As a messaging system, Kafka follows the traditional pattern of having producers push messages to the broker and consumers pull messages from the broker.

The push model has difficulty accommodating consumers with different consumption rates, because the send rate is determined by the broker. Push aims to deliver messages as fast as possible, which can easily overwhelm consumers that cannot keep up, typically manifesting as denial of service and network congestion. The pull model, by contrast, lets the Consumer consume messages at a rate suited to its own capacity.

For Kafka, the pull model is more appropriate. It simplifies the broker's design and lets consumers control the rate at which messages are consumed, as well as the manner of consumption (in bulk or one message at a time), while choosing different delivery methods to implement different transport semantics.

7. How does Kafka ensure reliability

Reliability in Kafka is guaranteed by four points:

  • For a single partition, messages are ordered: if a producer writes message A and then message B to a partition, consumers read message A before message B.
  • A message is not considered committed until it has been written to all in-sync replicas. "Written" here may mean written only to the file-system cache, not necessarily flushed to disk. Producers can choose how long to wait for acknowledgement, for example waiting only until the partition leader has written the message, or waiting until all in-sync replicas have written it (see the configuration sketch after this list).
  • Once a message is committed, the data is not lost as long as one replica survives.
  • Consumers can only read committed messages.
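
The acknowledgement level in the second point corresponds to the producer's acks configuration. A small sketch of reliability-oriented producer settings (the property names are real Kafka producer configs; the values shown are common choices, not the only options):

    import java.util.Properties;

    public class ReliableProducerConfig {
        static Properties reliableProps() {
            Properties props = new Properties();
            // acks=0: don't wait at all; acks=1: wait for the partition leader's write;
            // acks=all: wait until every in-sync replica has written the message.
            props.put("acks", "all");   // strongest producer-side guarantee
            props.put("retries", "3");  // retry transient failures rather than dropping data
            return props;
        }
    }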

With these basic guarantees we can build a reliable system, but we must consider just how reliable our application needs to be. Reliability is not free: it trades off against system availability, throughput, latency, and hardware cost. We therefore often need to make trade-offs; blindly pursuing maximum reliability is not practical.