
This article describes how data is stored in Kafka. It will help you understand the data expiration mechanism, fault tolerance mechanism, and configuration optimization in Kafka.

The mind map of this article is as follows:

1. Physical storage overview

Kafka stores all topic data, or more precisely all partition replicas, as files. The broker has a parameter called log.dirs whose value is a list of comma-separated directories, such as /first/kafka-logs,/second/kafka-logs,/third/kafka-logs. This is not to be confused with the logs Kafka itself produces; the location of those logs is defined in log4j.properties.
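For illustration, a broker configuration with three data directories might look like this (the paths are the example ones from above):

```properties
# server.properties (example paths)
log.dirs=/first/kafka-logs,/second/kafka-logs,/third/kafka-logs
```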

2. How partitions are allocated

No rack information

When a topic is created, the number of partitions and replicas is specified. Kafka then needs to decide which brokers these replicas should be placed on. The allocation has three goals:

  1. Even: replicas should be spread as evenly as possible across the brokers;
  2. Highly available: replicas of the same partition should be placed on different brokers wherever possible;
  3. If the brokers are configured with rack information, replicas of each partition are also spread across different racks, so that the failure of one rack cannot bring down an entire partition.

For example, suppose we have six brokers with IDs 0 through 5, and we want to create a topic with 3 partitions and 3 replicas. The process is as follows:

  1. Partitions are assigned one by one, starting with partition 0: a broker is chosen at random for its leader. Assuming Broker 3 is chosen, the first follower replica of partition 0 is then placed on Broker 4 and the second on Broker 5.
  2. Partition 1 is allocated next. Since partition 0’s leader is on Broker 3, partition 1’s leader goes on Broker 4, and its two follower replicas go on Brokers 5 and 0.
  3. Finally, partition 2 is allocated, with its leader replica on Broker 5 and its two follower replicas on Brokers 0 and 1.

Let’s graph it:

Here’s a production example of a topic partition in Kafka-Manager:

As you can see, it’s consistent with the theory.
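To make the allocation procedure concrete, here is a minimal Python sketch of the round-robin logic described above. It is a simplification for illustration, not Kafka's actual implementation (which, among other things, also shifts follower placement):

```python
import random

def assign_replicas(brokers, num_partitions, replication_factor):
    # Simplified round-robin assignment: pick a random broker for
    # partition 0's leader, then walk through the broker list.
    n = len(brokers)
    start = random.randrange(n)
    assignment = {}
    for p in range(num_partitions):
        # the leader of each partition sits one broker after the previous
        # partition's leader; followers occupy the next brokers in the list
        replicas = [brokers[(start + p + r) % n]
                    for r in range(replication_factor)]
        assignment[p] = replicas  # replicas[0] is the leader
    return assignment

# With six brokers and a random start of Broker 3, this reproduces the
# example above: {0: [3, 4, 5], 1: [4, 5, 0], 2: [5, 0, 1]}
print(assign_replicas([0, 1, 2, 3, 4, 5], 3, 3))
```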

Rack information

The broker has a parameter called broker.rack, a string that names the rack each broker sits on. Once brokers are configured with this parameter, the basic partition-allocation procedure stays the same, but the arrangement of the brokers changes. Suppose there are now two racks, six brokers, and a topic with 3 partitions and 3 replicas, as shown in the figure below:

Compared with allocation without rack information, only one extra step is needed: the brokers are arranged so that adjacent brokers sit on different racks wherever possible. Because different replicas of the same partition are assigned to adjacent brokers, this guarantees that the replicas end up on different racks, so the failure of a single rack (say, a power or network outage) cannot make an entire partition unavailable. Note that this arrangement is purely logical.
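For example, with two racks the brokers might be configured like this (the rack names are hypothetical); Kafka then logically interleaves brokers from the two racks before assigning replicas:

```properties
# server.properties on brokers 0, 1, 2:
broker.rack=rack-a

# server.properties on brokers 3, 4, 5:
broker.rack=rack-b
```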

The job of selecting brokers for partition replicas is done by the Controller.

Directory selection

Once a partition is assigned to a specific broker, the broker still needs to choose which directory to store the partition’s data in (since log.dirs is typically configured with multiple directories). The rule is simple, even crude: the new partition is stored in the directory that currently holds the fewest partitions. Note that this rule considers neither the total capacity nor the used space of each directory, so imbalances across directories can arise.
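A sketch of this rule in Python, assuming each partition occupies one subdirectory under a data directory (this mirrors the described behavior, not Kafka's source code):

```python
import os

def pick_log_dir(log_dirs):
    # Choose the data directory holding the fewest partition directories.
    # Free or used disk space is deliberately NOT considered, matching
    # the behavior described above.
    def partition_count(d):
        return sum(1 for e in os.listdir(d)
                   if os.path.isdir(os.path.join(d, e)))
    return min(log_dirs, key=partition_count)

# e.g. pick_log_dir(["/first/kafka-logs", "/second/kafka-logs", "/third/kafka-logs"])
```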

3. File management

The data for each partition is stored in a separate directory named after the topic and the partition number. For example, iamabug-0 holds partition 0 of the topic iamabug. Inside this directory the partition’s data is not kept in a single file; it is spread across multiple segment files.

Kafka is a message queue, not a permanent storage system: data carries an expiration time and eventually needs to be deleted, with expiration judged either by time or by size. But deleting expired records from one large file would be slow and error-prone, so a more practical scheme is used: each partition’s data is split across multiple segment files, and the segment serves as the basic unit of expiration. When a segment file expires, it is deleted as a whole.

A segment file has two states, open and closed: a segment is closed once it reaches a certain size or has not been written to for a while. The parameters log.segment.bytes and log.segment.ms control these thresholds; by default, the size limit is 1 GB.
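For reference, a server.properties excerpt with the size-based rolling threshold and the expiration settings discussed above (the values shown are the usual defaults):

```properties
# server.properties excerpt (values shown are common defaults)
log.segment.bytes=1073741824   # close (roll) a segment after 1 GB
log.retention.hours=168        # time-based expiration: 7 days
log.retention.bytes=-1         # size-based expiration per partition; -1 = unlimited
```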

Note: The broker holds a file handle for every segment file, even closed ones, so the operating system’s file-handle limits need to be tuned carefully.

4. File format

Format specification

Within each segment file, messages and their offsets are stored in exactly the same format in which producers send them to the broker and consumers fetch them from it. Keeping the on-disk and on-the-wire formats identical lets the broker use zero-copy when sending messages to consumers, and it avoids compressing and decompressing data on the broker (since the producer already sends messages compressed).

Besides its key, value, and offset, each message also contains the message size, a checksum, the message format version, the compression codec, and a timestamp. The timestamp can be either the time the producer sent the message or the time the broker received it, controlled by the message.timestamp.type parameter, whose allowed values are CreateTime and LogAppendTime.
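As an example, the timestamp behavior can be set per topic (the broker-wide counterpart is log.message.timestamp.type):

```properties
# topic-level configuration
message.timestamp.type=CreateTime       # producer-assigned timestamps
# message.timestamp.type=LogAppendTime  # broker-assigned on append
```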

When a producer sends compressed messages, the whole compressed batch is treated as a single wrapper message, and each original message’s content is stored inside it. The specific format is shown in the figure below:

Field validation

Create a topic named iamabug, write a few messages to it, and then check the contents of the segment.
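One common way to do this, assuming a standard Kafka installation and the example log.dirs path from earlier, is the DumpLogSegments tool:

```
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /first/kafka-logs/iamabug-0/00000000000000000000.log \
  --print-data-log
```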

As you can see, it’s basically the same as above, but with more fields.

5. Index files

Because Kafka allows consumers to start consuming from any specified offset, the broker needs to be able to locate messages quickly, so it builds an index for each segment file. Segment files use the suffix .log and index files the suffix .index; the two share the same prefix, namely the offset of the first message in the segment file. The index file records message offsets together with the byte positions of the corresponding data in the segment file.

To locate a specified offset, first compare the prefixes of the .index files to find the right one, then search that .index file for the offset and its corresponding file position, and finally seek directly to that position in the .log file and read the data. This is a simplification: not every offset can actually be found, because Kafka builds a sparse index. Sometimes only the nearest preceding offset is found, and the reader then scans forward message by message in the .log file.
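To illustrate the sparse-index lookup, here is a small Python sketch (the (offset, byte position) pairs are hypothetical and real index entries are binary, but the search logic is the same idea):

```python
import bisect

def locate(index_entries, target_offset):
    # index_entries: sorted (message_offset, byte_position) pairs from
    # a .index file; return the byte position in the .log file from
    # which to start scanning forward
    offsets = [off for off, _ in index_entries]
    i = bisect.bisect_right(offsets, target_offset) - 1
    if i < 0:
        return 0  # target precedes the first entry: scan from the start
    return index_entries[i][1]

# A sparse index with one entry every few messages:
sparse_index = [(0, 0), (4, 1024), (8, 2048)]
# To read offset 6, seek to byte 1024 and scan forward message by message.
print(locate(sparse_index, 6))  # -> 1024
```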

The index file contains no checksum; when the broker finds the index abnormal or corrupted, it simply regenerates it.

Note: I tried deleting an index file directly under a topic partition; it was not regenerated immediately, nor was it regenerated after producing to and consuming from the topic. Restarting the broker, however, did regenerate the index file.
