This article takes about 30 minutes to read and is packed with dense, substantive material; I hope you will read it patiently.

Hello, I am Hua Zai. We meet again on this special 1024 Programmers' Day.

Starting with this article, I will do a series of deep dives into Kafka. Today I will talk about the design of Kafka's storage architecture. Speaking of storage systems, you are probably familiar with MySQL and know that MySQL uses the B+ tree as its index data structure.

What is Kafka's storage based on? Why is it designed this way? What problems does it solve, and how does it solve them? What techniques does it use?

With these questions in mind, let's explore the thinking and implementation behind Kafka's storage architecture.

After reading this article, I'm sure you will have a better understanding of the Kafka storage architecture, and you will also be better equipped to compare it with the architectures of other storage systems.

1. Kafka storage scenario analysis

Before we dive into Kafka's storage solution, let's look at Kafka's official definition:

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Kafka veterans know that it was incubated inside LinkedIn. From the beginning, Kafka was built to handle real-time big-data log streams, processing hundreds of billions of log records a day. Log streams have two main characteristics: 1) data is generated in real time, and 2) massive volumes of data must be stored and processed. Kafka is therefore bound to face the distributed-system challenges of high concurrency, high availability, and high performance.

From this background we can conclude that any discussion of architecture design divorced from its business scenario is meaningless.

To sum up, the storage requirements for Kafka are as follows:

  1. It mainly stores message streams (which can be plain text or any other format; the Broker does not care about the content of the data itself)
  2. It must support efficient storage and durable persistence of massive amounts of data (data must not be lost after a restart)
  3. It must support efficient retrieval of massive amounts of data (consumers must be able to query by offset or timestamp efficiently)
  4. It must guarantee data safety and stability, with failover and fault tolerance

2. Kafka storage selection

With those requirements in mind, let's look at what Kafka's storage is based on. Could we implement it directly with a relational database we already know? Let's dig deeper.

2.1 Storage basics

Let's first cover some storage basics. The relative speeds of common storage media are roughly as shown in the figure below: the higher the level, the faster the medium. Disks clearly sit in an awkward position; however, a disk can actually be faster or slower than we expect, depending on how we use it.

As for disk and memory I/O speed, the benchmark results below show that the sequential I/O throughput of an ordinary mechanical disk is 53.2M values/s, while the random I/O throughput of memory is 36.7M values/s. The conclusion seems to be that sequential disk I/O can outperform random memory I/O.

In addition, in terms of overall read and write performance, there are different approaches to optimizing for read speed versus write speed:

  1. Optimize for reads: use indexes to speed up queries. But once an index exists, a large volume of writes must also maintain that index, which reduces write efficiency. Relational databases such as MySQL are the typical example.
  2. Optimize for writes: this generally means log-structured storage, where sequential appends raise the write speed. Because there is no index, queries cannot be answered quickly; in the worst case the log has to be scanned record by record. Most big-data systems are built this way, as sketched below.
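To make the append-only idea concrete, here is a minimal sketch (not Kafka's actual code) of a log-structured write path; the file name and record framing are made up for illustration:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Minimal append-only log writer: every record is simply appended to the
// end of the file, so the disk only ever sees sequential writes.
public class AppendOnlyLog {
    private final FileChannel channel;

    public AppendOnlyLog(Path file) throws IOException {
        this.channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
    }

    public long append(byte[] record) throws IOException {
        long position = channel.size();               // byte position where this record starts
        ByteBuffer buf = ByteBuffer.allocate(4 + record.length);
        buf.putInt(record.length).put(record).flip(); // frame: [length][payload]
        channel.write(buf);
        return position;
    }

    public static void main(String[] args) throws IOException {
        AppendOnlyLog log = new AppendOnlyLog(Path.of("demo.log"));
        long pos = log.append("hello kafka".getBytes(StandardCharsets.UTF_8));
        System.out.println("record appended at file position " + pos);
    }
}
```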

2.2 Analysis of Kafka storage scheme

Having covered storage basics, the I/O speeds of different storage media, and the read/write trade-offs of storage system implementations, let's now look at which approach Kafka's storage actually takes.

In the case of Kafka, which is mainly used to process massive data streams, the main features of this scenario include:

  1. Writes: write concurrency is extremely high, on the order of millions of TPS; records are appended to the log sequentially, and updates never need to be considered.
  2. Reads: relatively simple compared with writes, as long as messages can be queried efficiently by certain criteria (offset or timestamp).

Based on this analysis, a sequentially appended log satisfies Kafka's requirement of millions of writes per second. But how do we query these logs efficiently? Could we simply store them with MySQL's B+ tree data structure? Let's break it down.

If we used a B+ tree index, every write would have to maintain the index, extra space would be needed to store it, and we would incur the "page split" operations common in relational databases. All of this is too heavy for a high-concurrency system like Kafka.

Among database indexes, however, one seems to fit this scenario perfectly: the hash index (implemented with a hash table). To speed up reads we would only need to maintain a mapping in memory: every time we query a message by offset, we look the offset up in the hash table to get its file position, then read the file to quickly locate the data. However, a hash index usually has to reside entirely in memory, which is impractical for Kafka writing millions of messages per second; it would easily exhaust memory and cause an OOM.
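For illustration only, this is what such a dense, fully in-memory hash index would look like; it is easy to see why one entry per message cannot scale to millions of writes per second:

```java
import java.util.HashMap;
import java.util.Map;

// A naive, purely illustrative hash index: one in-memory entry per message.
// With millions of messages per second the map itself would exhaust memory,
// which is exactly why Kafka does not index every single offset this way.
public class DenseHashIndex {
    private final Map<Long, Long> offsetToFilePosition = new HashMap<>();

    public void put(long offset, long filePosition) {
        offsetToFilePosition.put(offset, filePosition);
    }

    public Long lookup(long offset) {
        return offsetToFilePosition.get(offset); // file position to read from, or null
    }
}
```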

Instead, imagine that the message offset is designed as an ordered field, so messages are stored in the log file in offset order. Then there is no need for an extra hash table: we can simply divide the messages into blocks and, for each block, index only the offset of its first message. A lookup then resembles a binary search: first find the block that covers the target offset, then scan sequentially within that block. As shown below:

This allows you to quickly locate the message you are looking for. In Kafka, we call this index structure a “sparse index.”
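Here is a minimal sketch of the sparse-index idea, assuming a simple in-memory block index; the class and method names are illustrative, not Kafka's internal API:

```java
import java.util.TreeMap;

// A minimal sketch of a sparse index: only the first offset of each block is
// indexed, and a floor lookup finds the block to scan from.
public class SparseIndex {
    // baseOffset of each block -> byte position of that block in the log file
    private final TreeMap<Long, Long> blockIndex = new TreeMap<>();

    public void addBlock(long firstOffsetInBlock, long filePosition) {
        blockIndex.put(firstOffsetInBlock, filePosition);
    }

    /** Returns the file position to start scanning from for the target offset. */
    public long startPositionFor(long targetOffset) {
        var entry = blockIndex.floorEntry(targetOffset); // "binary search" over block starts
        if (entry == null) {
            return 0L;                                   // before the first indexed block
        }
        return entry.getValue();                         // scan forward from here
    }
}
```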

3. Kafka storage architecture design

Having analyzed Kafka's origins, its storage scenario, the I/O characteristics of storage media, and the selection of a storage scheme, we arrive at Kafka's final storage implementation: a sequentially appended log plus a sparse index.

Let’s take a look at the Kafka log storage structure:

As you can see from the image above, Kafka's storage is based on a "topic + partition + replica + segment + index" structure:

1. In Kafka, messages are organized by Topic, which is a logical concept. On disk, storage is actually organized by Partition: each Topic is divided into multiple Partitions, and the number of Partitions can be specified when the Topic is created (see the sketch after this list).

2. Partitions exist to let Kafka's storage scale horizontally. If all of a Topic's messages were stored on a single Kafka Broker, it would be very hard for Kafka to sustain millions of writes per second; that Broker would inevitably become a bottleneck, and a failure would make the data unrecoverable. So Kafka splits a Topic's messages into multiple Partitions and distributes them evenly across the Broker cluster.

3. Each message in a Partition is assigned a unique message ID, its Offset. Kafka therefore only guarantees ordering within each Partition, not global ordering.

4. Each Partition is further divided into multiple LogSegments. To prevent a Log from growing too large, Kafka introduces the LogSegment concept and splits the Log into several LogSegments. This is equivalent to splitting one large file evenly into relatively small files, which makes messages easier to find, maintain, and clean up; when cleaning up historical data, the old LogSegment files can simply be deleted.

5. Physically, a Log exists only as a folder, and each LogSegment corresponds on disk to one log file, two index files, and possibly other files (such as snapshot index files with the ".snapshot" suffix).
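As referenced in item 1 above, the partition count is fixed when the topic is created. A minimal sketch using the Kafka AdminClient; the topic name, partition count, and bootstrap address are purely illustrative:

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

// Create a topic with 4 partitions and replication factor 3, so its messages
// are spread over partition directories topic-order-0 .. topic-order-3.
public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("topic-order", 4, (short) 3);
            admin.createTopics(List.of(topic)).all().get(); // block until creation completes
        }
    }
}
```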

You can also refer to the storage mechanism section of the earlier Kafka Basics article, which explains this in detail.

4. Kafka log system architecture design

Now that we understand Kafka's storage selection and storage architecture, let's analyze the architecture of Kafka's log system in depth.

Kafka messages are grouped by Topic. Each Topic is logically independent, and each Topic can be divided into one or more partitions. Each message will be appended to the specified partition according to partition rules when it is sent, as shown in the following figure:

4.1 Log directory layout

So what does the on-disk log directory layout look like for Kafka messages? Experienced Kafka users know that each partition's log corresponds to a folder named <topic>-<partition>. For example, suppose we have a topic named "topic-order" with four partitions; the actual physical storage then appears as the directories "topic-order-0", "topic-order-1", "topic-order-2", and "topic-order-3".

As the figure above shows, messages written to the Log are appended sequentially, but only the last LogSegment can accept writes; all earlier LogSegments are read-only. We call this last LogSegment the "activeSegment", the currently active log segment. As messages keep arriving, once the activeSegment meets certain conditions (for example, it reaches a size limit), a new activeSegment is created and subsequent messages are written into it.

For efficient message retrieval, each log file in a LogSegment (with the ".log" suffix) has corresponding index files: an offset index file (".index"), a timestamp index file (".timeindex"), and possibly a snapshot index file (".snapshot"). Each LogSegment has a baseOffset, which is the offset of the first message in that LogSegment. The offset is a 64-bit long integer, and the log and index files are named after the baseOffset as a fixed 20-digit number, zero-padded on the left. For example, the first LogSegment has a baseOffset of 0, and its log file is 00000000000000000000.log.
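A tiny sketch of how a segment file name can be derived from its baseOffset, assuming the fixed 20-digit zero-padded naming described above:

```java
// Derive segment file names from a baseOffset: a fixed 20-digit, zero-padded
// decimal number plus the file suffix.
public class SegmentFileNames {
    static String fileName(long baseOffset, String suffix) {
        return String.format("%020d%s", baseOffset, suffix);
    }

    public static void main(String[] args) {
        System.out.println(fileName(0L, ".log"));           // 00000000000000000000.log
        System.out.println(fileName(12768089L, ".index"));  // 00000000000012768089.index
    }
}
```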

For example, after writing a certain number of messages to the topic "topic-order", the layout of the topic-order-0 directory at some point looks like this:

The baseOffset of the second LogSegment here is 12768089, meaning the offset of its first message is 12768089; it also implies that the first LogSegment contains 12768089 messages (the messages with offsets 0 through 12768088).

Note that a LogSegment does not only contain ".log", ".index", and ".timeindex" files. It may also contain files such as ".snapshot", ".txnindex", and "leader-epoch-checkpoint", as well as temporary files such as ".deleted", ".cleaned", and ".swap".

In addition, consumers save their committed offsets in Kafka's internal __consumer_offsets topic. If you are not familiar with this, refer to the committed-offsets section of the earlier Kafka Consumer article. Now let's look at the overall log directory structure:

4.2 Log format evolution

For a mature messaging middleware, the log format affects not only how functionality can be extended but also how performance can be optimized. As Kafka has evolved, so has its log format, which has gone through three major versions: V0, V1, and V2.

We know that a Kafka Partition is ultimately made up of individual messages, so if the log format is poorly designed, both functionality and performance suffer.

4.2.1 Version V0

Kafka versions prior to 0.10.0 used this log format. In this version, each message carries an offset and a message size: the offset indicates the message's position within the Partition, and the message size indicates how large the message is. Together they take 12 bytes and are known as the log header. The log header plus the Record is treated as one message. As shown below:

1. crc32 (4B): the CRC32 checksum of the message; the checked range runs from magic through value.

2. magic (1B): the log format version number; for this version the magic value is 0.

3. attributes (1B): message attributes. The lowest three bits indicate the compression type: 0 means NONE, 1 means GZIP, 2 means SNAPPY, and 3 means LZ4 (introduced in Kafka 0.9.x). The remaining bits are reserved.

4. key length (4B): the length of the message key. A value of -1 means no key is set.

5. key: optional field.

6. value length (4B): the actual length of the message body. A value of -1 means the message body is empty.

7. value: the message body.

As you can see from the figure above, V0 has a minimum size of 14 bytes. Kafka considers messages smaller than 14 bytes to be illegal.

Take a message whose key is "hello" and whose value is "world" as an example; each field takes the following value:

  • crc: the CRC value computed over the message;
  • magic: 0;
  • attributes: 0x00 (no compression is used);
  • key length: 5;
  • key: hello;
  • value length: 5;
  • value: world.

The length of the message is: 4 + 1 + 1 + 4 + 5 + 4 + 5 = 24 bytes.
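A quick sanity check of that arithmetic, assuming the V0 field widths listed above (the helper below is illustrative, not Kafka's serializer):

```java
// Back-of-the-envelope V0 record size (excluding the 12-byte log header of
// offset + message size). Field widths follow the list above.
public class V0RecordSize {
    static int v0RecordSize(byte[] key, byte[] value) {
        int crc = 4, magic = 1, attributes = 1, keyLength = 4, valueLength = 4;
        int keyBytes = (key == null) ? 0 : key.length;
        int valueBytes = (value == null) ? 0 : value.length;
        return crc + magic + attributes + keyLength + keyBytes + valueLength + valueBytes;
    }

    public static void main(String[] args) {
        // key = "hello", value = "world" -> 4 + 1 + 1 + 4 + 5 + 4 + 5 = 24 bytes
        System.out.println(v0RecordSize("hello".getBytes(), "world".getBytes()));
    }
}
```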

4.2.2 Version V1

As Kafka evolved, users found that with the V0 format Kafka could not determine when a message was produced, because the format stores no time information. Log cleanup could therefore only rely on the modification time of the log files, which risks deleting logs by mistake.

The log format used from version 0.10.0 until 0.11.0 is V1, which adds a timestamp field recording the message's timestamp. As shown below:

V1 has one more field than V0: an 8-byte timestamp. The timestamp serves two purposes. Internally, it affects log retention and segment-rolling strategies; externally, it enables extensions such as message auditing and end-to-end latency measurement.

As you can see from the figure above, the minimum size of a V1 message is 22 bytes. Kafka considers a message smaller than 22 bytes to be an invalid message.

In total, a V1 message is 8 bytes larger than its V0 counterpart; for the example message above, the V1 size is 24 + 8 = 32 bytes.

4.2.3 Design defects of V0 and V1 versions

From the analysis and diagrams of the V0 and V1 log formats above, we can identify several design flaws:

  1. Poor space efficiency: whether or not a key or value exists, a fixed 4 bytes is spent storing its length. With enough messages, a great deal of storage space is wasted.

  2. The total message length is not stored: it has to be recomputed on the fly for each message, which is inefficient.

  3. Only the latest message's offset is saved: in a compressed message set, the wrapper message records only the offset of the last inner message.

  4. Redundant CRC checks: even when messages are sent in batches, a CRC is stored separately for every message.

4.2.4 Version V2

Kafka rebuilt the log format in 0.11.0.0: it uses variable-length types to address the space-efficiency problem, adds a total message length field, stores timestamps and offsets as deltas, and extracts some common fields into the RecordBatch.

As can be seen from the above figure, the RecordBatch of V2 has the following changes compared with V0 and V1:

1. The CRC value is removed from the message and extracted into the message batch.

2. Producer ID, producer epoch, and sequence number fields are added to support idempotent and transactional messages.

3. Timestamps and offsets are stored as deltas relative to the batch's base values.

4. The minimum size of a message batch is 61 bytes, much larger than a single V0 or V1 message; however, when messages are sent in batches, throughput improves and space usage drops.

To sum up, the V2 log format uses variable-length encoding to improve the space efficiency of the message format and lifts shared fields into the RecordBatch. A RecordBatch can also hold multiple messages, which saves a great deal of disk space when messages are sent in batches.
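To see why variable-length encoding saves space, here is a minimal sketch of zig-zag varint encoding, the style of encoding V2 relies on for fields such as offset and timestamp deltas (this is an illustration, not Kafka's encoder):

```java
import java.io.ByteArrayOutputStream;

// Zig-zag varint encoding: small values (e.g. an offsetDelta of 3) take a
// single byte instead of a fixed 4 or 8 bytes.
public class VarintSketch {
    static byte[] encodeZigZagVarint(int value) {
        int v = (value << 1) ^ (value >> 31);      // zig-zag so small negatives stay small
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80);          // 7 payload bits + continuation bit
            v >>>= 7;
        }
        out.write(v);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(encodeZigZagVarint(3).length);      // 1 byte for a small delta
        System.out.println(encodeZigZagVarint(100000).length); // 3 bytes instead of a fixed 4
    }
}
```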

4.3 Log cleanup mechanism

Kafka stores messages on disk, and as data keeps being written, disk usage grows and grows. To keep disk usage under control, messages need to be cleaned up. As analyzed above in the Kafka log storage structure, each Replica corresponds to a Log, and a Log is divided into multiple LogSegments, which makes log cleanup convenient for Kafka.

Kafka provides two log cleaning strategies:

1. Log Retention (deletion): log segments that no longer meet the retention conditions are deleted according to certain retention policies.

2. Log Compaction: messages are consolidated by key; when several messages have the same key but different values, only the latest version is retained.

The Kafka Broker parameter log.cleanup.policy sets the log cleanup policy; the default value is "delete". To use the log compaction policy, setting log.cleanup.policy to "compact" is not enough: log.cleaner.enable (default true) must also be true.

To use both cleanup policies at the same time, simply set log.cleanup.policy to "delete,compact".
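A minimal sketch of enabling compaction at the topic level via the AdminClient; the topic name and bootstrap address are assumptions, and broker-wide defaults would instead go into server.properties:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

// Create a compacted topic. At topic level the property is "cleanup.policy";
// at broker level the corresponding default is "log.cleanup.policy".
public class CompactedTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("user-profile-latest", 3, (short) 3)
                    .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                    TopicConfig.CLEANUP_POLICY_COMPACT)); // or "delete,compact" for both
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```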

4.3.1 Deleting Logs

Time based strategy

Kafka's LogManager has a dedicated log deletion task that periodically detects and deletes log segments that no longer meet the retention conditions. The check period is configured with the Broker parameter log.retention.check.interval.ms, whose default value is 300000 ms, i.e. 5 minutes.

There are three retention strategies in Kafka:

The log deletion task periodically checks whether the retention time of the current log file exceeds the specified threshold (retentionMs) to find the set of log segment files that can be deleted.

retentionMs is determined by the Broker parameters log.retention.ms, log.retention.minutes, and log.retention.hours, with priority log.retention.ms > log.retention.minutes > log.retention.hours. By default only log.retention.hours is configured, with a value of 168, so log segment files are retained for 7 days by default.

Note that deleting expired log segment files is not simply based on the modification time of the segment file, but on the largest timestamp (largestTimeStamp) in that segment. To determine it, the timestamp index file of the segment is consulted first and its last index entry is read; if that timestamp value is greater than 0 it is used, otherwise lastModifiedTime is used instead.
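An illustrative sketch of that time-based check, assuming a simplified segment descriptor (this is not Kafka's internal code):

```java
import java.util.ArrayList;
import java.util.List;

// A segment is deletable when now - largestTimestamp > retentionMs, where the
// timestamp falls back to the file's last-modified time if no index entry > 0.
class SegmentInfo {
    final long largestTimestampFromIndex; // last entry of the .timeindex, or 0 if none
    final long lastModifiedTime;          // file modification time fallback

    SegmentInfo(long largestTimestampFromIndex, long lastModifiedTime) {
        this.largestTimestampFromIndex = largestTimestampFromIndex;
        this.lastModifiedTime = lastModifiedTime;
    }

    long effectiveTimestamp() {
        return largestTimestampFromIndex > 0 ? largestTimestampFromIndex : lastModifiedTime;
    }
}

public class TimeBasedRetention {
    static List<SegmentInfo> deletableSegments(List<SegmentInfo> segments,
                                               long nowMs, long retentionMs) {
        List<SegmentInfo> deletable = new ArrayList<>();
        for (SegmentInfo segment : segments) {
            if (nowMs - segment.effectiveTimestamp() > retentionMs) {
                deletable.add(segment);
            }
        }
        return deletable;
    }
}
```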

[Delete steps] :

  1. First, the segments to be deleted are removed from the skip list of segments maintained by the Log object, so that no thread can still read them.
  2. Then the suffix ".deleted" is appended to all files belonging to those log segments, including the index files.
  3. Finally, a delayed task named "delete-file" deletes the files carrying the ".deleted" suffix; the delay defaults to one minute and can be configured with file.delete.delay.ms.

Log size based policy

The log deletion task periodically checks whether the current log size exceeds the specified threshold (retentionSize) to find the set of log segment files that can be deleted.

retentionSize is set with the Broker parameter log.retention.bytes; the default value is -1, meaning unlimited.

Note that log.retention.bytes limits the total size of all log segment files in a Log, not the size of a single segment. The size of a single log segment is set with log.segment.bytes, which defaults to 1 GB.

[Delete steps] :

  1. First, the difference between the total log size and retentionSize is computed, i.e. the total amount of log data that needs to be deleted.

  2. Then, starting from the first log segment in the Log, the set of deletable log segments (deletableSegments) is collected.

  3. Once the set has been collected, those segments can be deleted (see the sketch after this list).
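An illustrative sketch of the size-based selection under the same simplifying assumptions (segments are represented only by their sizes, oldest first; this is not Kafka's internal code):

```java
import java.util.ArrayList;
import java.util.List;

// Walk the segments from oldest to newest, collecting them until enough bytes
// would be freed to bring the log back under retentionSize.
public class SizeBasedRetention {
    static List<Long> deletableSegmentSizes(List<Long> segmentSizes, long retentionSize) {
        long totalSize = segmentSizes.stream().mapToLong(Long::longValue).sum();
        long diff = totalSize - retentionSize;          // bytes that must be removed
        List<Long> deletable = new ArrayList<>();
        for (long size : segmentSizes) {                // oldest segment first
            if (diff <= 0) {
                break;
            }
            deletable.add(size);
            diff -= size;
        }
        return deletable;
    }
}
```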

Log start offset based policy

This policy checks whether the baseOffset of the next log segment is less than or equal to logStartOffset; if so, the current log segment can be deleted.

[Delete steps as shown in the figure below] :

  1. Traverse the log segments from the beginning. The start offset of the log segment after segment 1 is 20, which is smaller than logStartOffset, so segment 1 is added to deletableSegments.

  2. The start offset of the log segment after segment 2 is 35, also smaller than logStartOffset, so segment 2 is added to deletableSegments.

  3. The start offset of the log segment after segment 3 is 50, also smaller than logStartOffset, so segment 3 is added to deletableSegments.

  4. The start offset of the log segment after segment 4 lies to the right of logStartOffset, so segment 4 and all later segments are not added to deletableSegments.

  5. After the deletable set has been collected, those segments can be deleted.

4.3.2 Log compaction

Log Compaction keeps only the latest version of each key: when multiple messages have the same key but different values, the older versions are rewritten away. If your application only cares about the latest value for each key, you can enable this cleanup policy; Kafka then periodically merges messages with the same key and retains only the most recent value.

Log Compaction is analogous to Redis's RDB persistence model. Imagine a scenario in which Kafka stores a message every time some state changes; if the application crashes and needs to recover quickly, a compacted log means only the latest value of each key has to be replayed.
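As a mental model of what a compacted topic guarantees, replaying the log and keeping only the last value per key reconstructs the latest state. This sketch is consumer-side logic, not Kafka's log cleaner:

```java
import java.util.HashMap;
import java.util.Map;

// Replay a keyed log from the beginning, keeping only the last value per key.
public class CompactionReplay {
    record KeyedMessage(String key, String value) {}

    static Map<String, String> latestState(Iterable<KeyedMessage> log) {
        Map<String, String> state = new HashMap<>();
        for (KeyedMessage msg : log) {
            if (msg.value() == null) {
                state.remove(msg.key());   // a null value (tombstone) deletes the key
            } else {
                state.put(msg.key(), msg.value());
            }
        }
        return state;
    }
}
```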

4.4 Disk data storage

We know that Kafka relies on the file system to store and cache messages and uses the typical sequential-append style of log writes. In addition, it leverages the operating system's PageCache to reduce disk I/O: disk data is cached in memory, turning disk accesses into memory accesses.

Kafka uses PageCache extensively, and this is one of the important factors behind its high throughput. When a process wants to read a file from disk, the operating system first checks whether the requested data page is already in PageCache. If it is (a cache hit), the data is returned directly, avoiding disk I/O. If it is not, the operating system issues a read request to the disk, stores the pages it reads into PageCache, and then returns the data to the process. Similarly, when a process writes data to disk, the operating system checks whether the corresponding data page is in PageCache, adds it if necessary, and then writes the data into that page. The modified page becomes a dirty page, and the operating system flushes dirty pages to disk at an appropriate time to keep the data consistent.

In addition to sequential logging and PageCache, Kafka uses zero-copy technology to further improve system performance, as shown in the following figure:

You can also refer to the high-performance section of the earlier article dissecting Kafka's three-high architecture design.
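On the JVM, the zero-copy path corresponds to FileChannel.transferTo, which maps to sendfile on Linux, so file bytes flow from the page cache to the socket without passing through user space. A minimal sketch with an illustrative file and destination address:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Zero-copy transfer of a log segment to a socket via transferTo (sendfile).
public class ZeroCopySend {
    public static void main(String[] args) throws IOException {
        try (FileChannel file = FileChannel.open(Path.of("00000000000000000000.log"),
                                                 StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9000))) {
            long position = 0;
            long remaining = file.size();
            while (remaining > 0) {
                long sent = file.transferTo(position, remaining, socket); // zero-copy send
                position += sent;
                remaining -= sent;
            }
        }
    }
}
```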

The overall process of messages from production to writing to disk is shown below:

5. Summary

This article started with an analysis of Kafka's storage scenario, compared the options for Kafka's storage selection, then analyzed Kafka's storage architecture design and dug into the details of its log system architecture, step by step unveiling the mystery behind Kafka's storage architecture.

If this article was helpful, please follow, like, and share it. Thank you very much!

A quick plug for myself: follow my public account "Hua Zai Chats Tech", where I keep summarizing and publishing high-quality articles.


Creating content is not easy; your support and recognition are my biggest motivation. See you in the next article!