Original address: www.wmyskxz.com/2019/07/17/… Author: I don’t have three hearts.

1. Kafka stores messages on the file system

Yes, the first thing you should know is that Kafka messages exist on file systems. Kafka relies heavily on file systems to store and cache messages.

Modern operating systems optimize disk reads and writes to speed up disk access. For example, read-ahead loads a large block of disk data into memory in advance, and write-behind combines many small logical writes into one large physical write. In addition, the operating system uses whatever main memory is left free as a disk cache, and all disk reads and writes go through this unified cache (except for direct I/O, which bypasses it). With these optimizations combined, sequential disk access can in some cases be faster than random memory access, and can even approach network speed.

A Topic is actually a logical concept. What producers and consumers physically work with are partitions, and each Partition ultimately corresponds to a directory that stores all of its message and index files. By default, a Topic is created with only one Partition when the number of partitions is not specified. For example, if I create a Topic named test without specifying the number of partitions, a test-0 folder is created by default, following the naming rule <topic_name>-<partition_id>.

Any messages published to a Partition are appended to the end of the Partition data file. This sequential write to disk makes Kafka very efficient.

Each message sent to the Broker is assigned to a Partition according to the partitioning rules. If the rules are set properly, messages are distributed evenly across the partitions.

2. Underlying storage design in Kafka

If we create two topics, topic1 with one partition and topic2 with two partitions, three folders are created in the data root directory:

    | --topic1-0
    | --topic2-0
    | --topic2-1

In Kafka, a Topic has one or more partitions. Each Partition is a directory, and each directory is divided into multiple Segments with the same configured size. A Segment consists of an index file and a data file, which always come in pairs; the suffixes “.index” and “.log” denote the Segment’s index file and data file respectively.

Now suppose we set each Segment size to 500 MB and start the producer to write a lot of data to topic1. The topic1-0 folder will produce some files like the following:

   | --topic1-0 
       | --00000000000000000000.index 
       | --00000000000000000000.log 
       | --00000000000000368769.index 
       | --00000000000000368769.log 
       | --00000000000000737337.index 
       | --00000000000000737337.log 
       | --00000000000001105814.index 
       | --00000000000001105814.log 
   | --topic2-0 
   | --topic2-1

A Segment is the smallest unit of Kafka file storage. The name of each subsequent Segment is the offset value of the last message in the preceding Segment. The offset is a 64-bit long value (at most 19 decimal digits), and the file name is left-padded with zeros to a fixed width of 20 characters, for example 00000000000000368769.index and 00000000000000368769.log.

Use the Segment File above as an example to illustrate the mapping between index files and data files.

For example, the entry <3, 497> in the index file refers to the third message in the data file (message 368769 + 3 = 368772 in the Partition as a whole), and the physical offset of that message within the data file is 497.

Note that the index file does not start at 0 and does not increase by 1 each time. This is because Kafka uses a sparse index, which adds an index entry only once every certain number of bytes of data. This keeps the index file small enough to be mapped into memory, reducing disk I/O during lookups, while adding very little time to each lookup.

Because the file name is the offset of the last message in the previous Segment, when we need to find the message with a given offset we can locate the Segment it belongs to by a binary search over the Segment file names. We then look up its physical position in that Segment’s index file and read the message from the data file.
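The two-step lookup described above can be sketched as follows. This is a minimal illustration, not Kafka’s implementation: it follows the article’s naming convention (a Segment named N starts right after offset N), the Segment names are taken from the listing above, and every index entry except <3, 497> is made up. The helper names find_segment, find_scan_start and locate are hypothetical.

```python
from bisect import bisect_left, bisect_right

# Segment names from the topic1-0 listing, parsed as integers.
SEGMENT_NAMES = [0, 368769, 737337, 1105814]

# Sparse index per segment: (relative offset, byte position in the .log file).
# Only the <3, 497> entry comes from the text; the others are made up.
INDEXES = {368769: [(1, 0), (3, 497), (6, 1407), (8, 1686)]}


def find_segment(names, offset):
    """Binary-search the sorted segment names for the segment holding `offset`."""
    # Article's convention: a segment named N starts right after offset N,
    # so we want the largest name strictly below `offset`.
    i = bisect_left(names, offset) - 1
    return names[max(i, 0)]


def find_scan_start(index, relative_offset):
    """Return the byte position of the closest index entry at or before the target."""
    keys = [rel for rel, _ in index]
    i = bisect_right(keys, relative_offset) - 1
    return index[i][1] if i >= 0 else 0  # no earlier entry: scan from the file start


def locate(offset, names=SEGMENT_NAMES, indexes=INDEXES):
    """Map a partition-wide offset to (segment name, byte position to scan from)."""
    segment = find_segment(names, offset)
    position = find_scan_start(indexes.get(segment, []), offset - segment)
    return segment, position


print(locate(368772))  # exact hit on the <3, 497> entry
print(locate(368774))  # no entry for relative offset 5, so scanning starts at entry 3
```

Because the index is sparse, the second step usually lands slightly before the target, and the reader scans forward in the data file from that byte position.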

Sequential disk I/O is central to Kafka’s high performance: messages are read and written sequentially in a Partition’s Segment data files and are not deleted after consumption (the deletion policy applies to expired Segment files).

How does Kafka know the offset of each message? Kafka defines a standard storage structure in which every message in a Partition has three properties:

  • Offset: the offset of the message in the current Partition. It is a logical value that uniquely identifies a message within the Partition and can simply be thought of as an ID.
  • MessageSize: the size of the message content data.
  • Data: the content of the message.
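To make the three properties concrete, here is a small sketch that packs and unpacks records laid out as offset, size, data. The field widths (8-byte offset, 4-byte size, big-endian) are an assumption for illustration, and real Kafka records carry additional fields that are omitted here.

```python
import struct

# Assumed layout: 8-byte signed offset, 4-byte signed size, then the payload.
HEADER = struct.Struct(">qi")


def encode(offset, data):
    """Serialize one record as header (offset, size) followed by the payload."""
    return HEADER.pack(offset, len(data)) + data


def decode_all(buf):
    """Walk a byte buffer sequentially and return [(offset, data), ...]."""
    records = []
    pos = 0
    while pos < len(buf):
        offset, size = HEADER.unpack_from(buf, pos)
        pos += HEADER.size
        records.append((offset, buf[pos:pos + size]))
        pos += size  # the size field tells us where the next record starts
    return records


log = encode(368770, b"first") + encode(368771, b"second message")
for offset, data in decode_all(log):
    print(offset, data)
```

The size field is what lets a reader skip from one message to the next without any delimiter, which is what makes scanning forward from an index position possible.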

3. Producer design overview

Different services require different write modes and configurations. We won’t discuss the details here, but let’s take a look at the basic process for producers to write messages:

The process is as follows:

  1. First, we create a ProducerRecord. This object must contain the topic and value of the message, and can optionally specify a key or a partition.
  2. When sending a message, the producer serializes the key and value into byte arrays and passes them to the partitioner.
  3. If we specified a partition, the partitioner simply returns it; otherwise, the partitioner chooses a partition based on the key and returns it.
  4. Once the partition is selected, the producer knows which topic and partition the message belongs to. It adds the record to the batch for that topic and partition, and a separate thread is responsible for sending batches to the corresponding Kafka Broker.
  5. When the broker receives a message, it returns a RecordMetadata object containing the message’s topic, partition, and offset if the write succeeds; otherwise it returns an error.
  6. When the producer receives the result, it may retry if an error was returned.
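Steps 1 to 4 above can be sketched in a few lines. This is a toy model rather than the Kafka client: SketchProducer and its methods are hypothetical names, zlib.crc32 stands in for the real partitioner’s hash function, and sending keyless records to partition 0 is a simplifying assumption.

```python
import zlib
from collections import defaultdict


class SketchProducer:
    """Toy producer: serialize, pick a partition, append to a per-partition batch."""

    def __init__(self, partitions_per_topic):
        self.partitions_per_topic = partitions_per_topic
        self.batches = defaultdict(list)  # (topic, partition) -> [(key, value), ...]

    def choose_partition(self, topic, key_bytes, partition=None):
        if partition is not None:
            return partition  # step 3: an explicit partition wins
        if key_bytes is None:
            return 0  # assumption: keyless records go to partition 0 in this sketch
        # crc32 is a stand-in hash; the same key always maps to the same partition.
        return zlib.crc32(key_bytes) % self.partitions_per_topic[topic]

    def send(self, topic, value, key=None, partition=None):
        key_bytes = key.encode("utf-8") if key is not None else None  # step 2
        value_bytes = value.encode("utf-8")
        chosen = self.choose_partition(topic, key_bytes, partition)
        self.batches[(topic, chosen)].append((key_bytes, value_bytes))  # step 4
        return chosen

    def drain(self):
        """What the separate sender thread would pick up and ship to the brokers."""
        drained = dict(self.batches)
        self.batches.clear()
        return drained


producer = SketchProducer({"T1": 4})
first = producer.send("T1", "login", key="user-1")
second = producer.send("T1", "logout", key="user-1")
print(first == second)  # same key, same partition
```

Keeping the batches keyed by (topic, partition) is what allows one network request to carry many records destined for the same Partition.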

4. Consumer design overview

1. Consumers and consumer groups

A Kafka consumer is part of a consumer group. When multiple consumers form a consumer group to consume a topic, each consumer receives messages from a different set of partitions. Suppose there is a topic T1 with four partitions, and a consumer group G1 with only one consumer, C1. Then consumer C1 receives the messages of all four partitions.

If the group grows to four consumers, each consumer receives the messages of exactly one partition.

But if we keep adding consumers to this consumer group, the extra consumers will be idle and will not receive any messages.

In short, we can scale horizontally and increase consumption capacity by adding consumers to the consumer group. This is why it is recommended to create topics with a fairly large number of partitions, so that more consumers can be added to improve performance under high consumption load. In addition, the number of consumers should not exceed the number of partitions, because the extra consumers are idle and do not help.
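The three scenarios above (one, four, and five consumers on a four-partition topic) can be reproduced with a simple assignment function. Round-robin distribution is an assumption made for this sketch; Kafka’s actual assignment strategies are configurable and may distribute partitions differently, and the assign function is hypothetical.

```python
def assign(partitions, consumers):
    """Distribute partitions over consumers round-robin (an assumed strategy)."""
    assignment = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


t1 = [0, 1, 2, 3]  # topic T1 with four partitions
print(assign(t1, ["C1"]))
print(assign(t1, ["C1", "C2", "C3", "C4"]))
print(assign(t1, ["C1", "C2", "C3", "C4", "C5"]))  # C5 gets nothing
```

Whatever the strategy, a Partition is owned by at most one consumer in the group, which is why a fifth consumer has nothing to do here.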

An important feature of Kafka is that a message is written once and can then be read by any number of applications. In other words, each application can read the full set of messages. For that to work, each application needs its own consumer group. In the example above, suppose we add a new consumer group, G2, with two consumers.

In this scenario, consumer group G1 and consumer group G2 both receive all the messages of topic T1, and logically they belong to different applications.

In the end, it boils down to this: if an application needs to read the full set of messages, give that application its own consumer group; if the application does not have enough consumption capacity, consider adding consumers to that group.

2. Consumer groups and partition rebalancing

As you can see, when a new consumer joins a consumer group, it takes over one or more partitions that were previously the responsibility of other consumers. Likewise, when a consumer leaves a consumer group (because of a restart, an outage, and so on), the partitions it was consuming are reassigned to other consumers. This process is called rebalancing. Rebalancing is an important feature of Kafka that provides high availability and horizontal scaling. However, note that during a rebalance no consumer can consume messages, so the entire consumer group is temporarily unavailable.

A consumer stays alive within a consumer group by periodically sending a heartbeat to the broker that acts as the group coordinator. This broker is not fixed; different consumer groups may have different coordinators. A heartbeat is sent when the consumer pulls messages or commits offsets.

If the consumer sends no heartbeat for longer than a certain time, its session expires, and the group coordinator assumes that the consumer is down and triggers a rebalance. As you can see, there is a gap between the moment the consumer goes down and the moment its session expires, and during this time the consumer’s partitions are not consumed. In general, we can shut down gracefully so that the consumer sends a leave message to the group coordinator, which can then rebalance immediately without waiting for the session to expire.
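The difference between a session expiring and a graceful leave can be sketched with a toy coordinator. The class GroupCoordinatorSketch, its methods, and the use of explicit timestamps instead of a real clock are all assumptions for illustration; this is not the broker’s protocol.

```python
class GroupCoordinatorSketch:
    """Toy group coordinator that tracks heartbeats and counts rebalances."""

    def __init__(self, session_timeout):
        self.session_timeout = session_timeout
        self.last_heartbeat = {}  # consumer -> time of last heartbeat
        self.generation = 0       # bumped on every rebalance

    def _rebalance(self):
        self.generation += 1

    def join(self, consumer, now):
        self.last_heartbeat[consumer] = now
        self._rebalance()

    def heartbeat(self, consumer, now):
        self.last_heartbeat[consumer] = now

    def leave(self, consumer):
        """Graceful shutdown: rebalance immediately, no waiting for expiry."""
        del self.last_heartbeat[consumer]
        self._rebalance()

    def expire(self, now):
        """Drop consumers whose session has timed out; rebalance if any were dropped."""
        dead = [c for c, t in self.last_heartbeat.items()
                if now - t > self.session_timeout]
        for consumer in dead:
            del self.last_heartbeat[consumer]
        if dead:
            self._rebalance()
        return dead


group = GroupCoordinatorSketch(session_timeout=10)
group.join("C1", now=0)
group.join("C2", now=0)
group.heartbeat("C1", now=8)   # C2 has crashed and sends nothing
print(group.expire(now=9))     # [] : C2 is dead but its session has not expired yet
print(group.expire(now=11))    # ['C2'] : only now does the rebalance happen
```

Between time 0 and time 11 the partitions owned by C2 are not consumed, which is the gap the paragraph above describes; calling leave("C2") instead would have triggered the rebalance at once.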

In version 0.10.1, Kafka changed the heartbeat mechanism to decouple sending heartbeats from pulling messages, so the heartbeat frequency is no longer affected by the pull frequency. In addition, newer versions of Kafka let you configure how long a consumer may go without pulling messages while still being considered alive, which helps avoid livelock. Livelock here means that the application has not failed but for some reason cannot continue consuming.

3. Partition and consumption model

In Kafka, messages in a topic are broken up and distributed across multiple partitions. The Consumer Group needs to fetch messages from different partitions. How do we reconstruct the order of messages in a Topic?

The answer is: we cannot. Kafka only guarantees that messages are ordered within a Partition; it makes no guarantee about ordering across the whole Topic.

The next question is: since a Partition can be consumed multiple times by different Consumer Groups, when are consumed messages deleted from it? And how does a Partition know how far each Consumer Group has consumed?

Whether or not a message has been consumed, a Partition never deletes it unless it has expired. For example, if the retention period is set to two days, any Group can consume a message within two days of its publication; after two days, the message is deleted automatically. For each Partition, an offset is recorded per Consumer Group that marks how far that Group has consumed.
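A short sketch shows how time-based retention and per-group offsets work together. PartitionSketch is a hypothetical in-memory model: it expires individual messages for simplicity, whereas the deletion policy described earlier applies to whole expired Segment files, and timestamps are passed in explicitly.

```python
class PartitionSketch:
    """Toy partition: append-only log, time-based expiry, one offset per group."""

    def __init__(self, retention_seconds):
        self.retention_seconds = retention_seconds
        self.log = []          # [(offset, timestamp, value), ...]
        self.next_offset = 0
        self.committed = {}    # group -> next offset that group will read

    def append(self, value, now):
        self.log.append((self.next_offset, now, value))
        self.next_offset += 1

    def expire(self, now):
        """Drop messages older than the retention period, consumed or not."""
        self.log = [m for m in self.log if now - m[1] <= self.retention_seconds]

    def poll(self, group, max_records=10):
        """Return unread messages for `group` and advance only that group's offset."""
        start = self.committed.get(group, 0)
        batch = [m for m in self.log if m[0] >= start][:max_records]
        if batch:
            self.committed[group] = batch[-1][0] + 1
        return [(offset, value) for offset, _, value in batch]


two_days = 2 * 24 * 3600
partition = PartitionSketch(retention_seconds=two_days)
partition.append("a", now=0)
partition.append("b", now=10)
print(partition.poll("G1"))  # G1 reads both messages
print(partition.poll("G2"))  # G2 still sees both: consumption deletes nothing
partition.expire(now=two_days + 5)
print(partition.log)         # only "b" is still within the retention period
```

Because consuming only moves a Group’s own offset, any number of Groups can read the same messages independently until retention removes them.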

4. Why is Kafka a pull model

Should the consumer ask the Broker for data (pull) or should the Broker push data (push) to the consumer? As a messaging system, Kafka follows the traditional pattern of having producers push messages to the broker and consumers pull messages from the broker.

The push model has difficulty accommodating consumers with different consumption rates, because the sending rate is determined by the broker. The goal of push is to deliver messages as fast as possible, but this can easily overwhelm consumers that cannot keep up, typically showing up as denial of service and network congestion. The pull model lets each consumer fetch messages at a rate that matches its own capacity.

For Kafka, the pull model is more appropriate. It simplifies broker design and lets consumers control the rate at which they consume messages as well as the way they consume them (in batches or one at a time), and it lets them choose different commit strategies to implement different delivery semantics.

5. Kafka high-performance throughput

1. Broker

Unlike in-memory message queues such as Redis and MemcacheQ, Kafka is designed to write all messages to slow, high-capacity hard disks in exchange for greater storage capacity.

In practice, using hard disks does not cost much performance. Given the read and write patterns of a messaging system, Kafka only needs sequential I/O on disk, and sequential I/O avoids most of the penalty of slow disk access.

First, Kafka relies heavily on the PageCache provided by the underlying operating system. When an upper-layer write occurs, the operating system simply writes the data to the PageCache and marks the page as dirty. When a read occurs, the PageCache is searched first; if the page is missing, a disk read is scheduled and the required data is returned.

The PageCache in effect uses as much free memory as possible as a disk cache. At the same time, if other processes request memory, reclaiming PageCache is very cheap, which is why all modern operating systems provide a PageCache.

PageCache is just the first step. Kafka uses Sendfile for further performance optimization. Before explaining Sendfile, let’s take a look at the traditional network I/O process, which is generally divided into four steps:

  1. The OS reads data from the hard disk to the PageCache in the kernel.
  2. The user process copies the data from kernel space into a user-space buffer.
  3. The user process then writes data to the Socket, which flows into the kernel’s Socket Buffer.
  4. The OS then copies the data from the Buffer to the network card’s Buffer to complete a transmission.

The whole process involves four context switches and two system calls (one read and one write). The same data is copied back and forth between the kernel buffer and the user buffer, which is inefficient. Steps 2 and 3 are unnecessary, because the data can be copied directly inside the kernel. This is exactly the problem Sendfile solves: with Sendfile, the data goes from the PageCache to the network card without passing through user space.
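For a feel of what this looks like from application code, here is a small Python sketch. It is only an illustration of the system-call idea, not Kafka’s code: a socket pair stands in for the broker-to-consumer connection, the payload and helper names are made up, and Python’s socket.sendfile() uses os.sendfile() where the platform supports it and falls back to ordinary send() calls elsewhere.

```python
import os
import socket
import tempfile


def send_file(path, sock):
    """Send a whole file over a connected stream socket; returns bytes sent."""
    with open(path, "rb") as f:
        # No read() into a user-space buffer followed by write():
        # the kernel moves the bytes when os.sendfile() is available.
        return sock.sendfile(f)


def demo():
    payload = b"kafka-log-bytes" * 100  # 1500 bytes, small enough for the socket buffer
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(payload)
        path = tmp.name
    sender, receiver = socket.socketpair()  # stands in for broker -> consumer
    try:
        sent = send_file(path, sender)
        sender.close()  # signal end of stream to the receiver
        received = bytearray()
        while True:
            chunk = receiver.recv(4096)
            if not chunk:
                break
            received.extend(chunk)
    finally:
        sender.close()
        receiver.close()
        os.remove(path)
    return sent, bytes(received)


if __name__ == "__main__":
    sent, received = demo()
    print(sent, len(received))
```

The application never touches the file contents itself, which is the point: steps 2 and 3 of the traditional path disappear.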

As you can see, Kafka is designed to do whatever it can to exchange data in memory, whether viewed from outside as a complete messaging system or from inside in its interaction with the underlying operating system. If the production and consumption rates of producers and consumers are well matched, data can be exchanged with zero disk I/O. This is why I say that Kafka’s use of hard disks does not cost much performance.

2. Partition

Partitions are the basis for Kafka to scale well and provide high concurrency and Replication.

Scalability

  • First, Kafka allows partitions to move freely between brokers in a cluster, which evens out possible data skew. Second, partitions support custom partitioning algorithms. For example, all messages with the same Key can be routed to the same Partition.
  • Meanwhile, leadership can migrate to any in-sync Replica. All read and write requests to a Partition are handled only by its Leader, so Kafka tries to distribute Leaders evenly across all nodes in the cluster to avoid concentrating network traffic on a few nodes.

Concurrency

  • A Partition can be consumed by only one Consumer in a Consumer Group at a time; conversely, one Consumer can consume multiple partitions simultaneously. Kafka’s very compact Offset mechanism minimizes the interaction between Broker and Consumer, so Kafka’s performance does not degrade as the number of downstream consumers grows, as it does for other message queues of the same kind.
  • In addition, if multiple consumers happen to consume data in nearly the same time order, the PageCache hit ratio can be very high. Kafka can therefore support highly concurrent reads very efficiently; in practice it can basically saturate a single network card.
  • However, more partitions is not always better. The more partitions there are, the more each Broker hosts on average. When a Broker fails, the Controller must re-elect a Leader for every Partition whose Leader was on the failed Broker. Assume each election takes 10 ms: if the Broker led 500 partitions, the elections take 5 s in total, and during that time reads and writes to those partitions can trigger LeaderNotAvailableException.
  • Further, if the failed Broker is also the Controller of the cluster, a new Broker must first be appointed as Controller. The new Controller needs to load the metadata of all partitions from Zookeeper, at roughly 3-5 ms per Partition; with 10,000 partitions this takes 30-50 s. And that is only the time to bring up a new Controller; the Leader election time described above comes on top of it.
  • In addition, on the Broker side, buffers are used for both producers and consumers. The buffer size is configured uniformly, and the number of buffers equals the number of partitions. If there are too many partitions, the buffers used for producers and consumers occupy too much memory.
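The failover estimates in the list above are simple multiplications, reproduced here so the numbers can be adjusted. The function name is hypothetical, and the per-partition costs (10 ms per election, 3-5 ms per metadata load) are the assumed figures from the text, not measurements.

```python
def total_seconds(partitions, ms_per_partition):
    """Total time in seconds when each partition costs a fixed number of milliseconds."""
    return partitions * ms_per_partition / 1000


election = total_seconds(500, 10)          # leader elections on one failed broker
metadata_low = total_seconds(10_000, 3)    # new controller loading metadata, best case
metadata_high = total_seconds(10_000, 5)   # same, worst case

print(f"leader election: {election} s")
print(f"controller metadata load: {metadata_low}-{metadata_high} s")
```

Both costs grow linearly with the number of partitions, which is the core of the argument against creating far more partitions than needed.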

3. Producer

In fact, most messaging systems optimize the Producer side in much the same way: gather small pieces into a whole, and turn synchronous operations into asynchronous ones.

By default, Kafka supports MessageSet, which automatically groups multiple messages into a batch before sending them. This spreads the RTT of each communication across many messages. Moreover, assembling a MessageSet lets the data be reorganized, turning bursty random writes into smooth linear writes.

In addition, the Producer supports end-to-end compression. Data is compressed locally and then sent over the network. The Broker generally does not decompress it (unless deep-iteration is specified); the message stays compressed until it is consumed and decompressed on the client.

You can also do your own compression and decompression at the application layer (the Kafka version discussed here supports only the GZIP and Snappy compression algorithms), but this can be surprisingly inefficient. Kafka’s end-to-end compression works best together with MessageSet, and compressing at the application layer breaks the link between the two. The reason is simple and follows from a basic principle of compression algorithms: the more repeated data, the higher the compression ratio. Regardless of the content or the number of message bodies, in most cases a larger input achieves a better compression ratio.
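The claim that batches compress better than individual messages is easy to check. The sketch below uses Python’s gzip module on made-up JSON messages; the message shape and the function names are assumptions for illustration, and the exact sizes depend on the data.

```python
import gzip
import json


def make_messages(count):
    """Build similar-looking JSON messages, as a typical event stream would."""
    return [
        json.dumps({"user_id": i, "event": "page_view",
                    "url": "/products/list", "status": 200}).encode("utf-8")
        for i in range(count)
    ]


def compressed_sizes(messages):
    """Return (total size when compressed one by one, size when compressed as a batch)."""
    individually = sum(len(gzip.compress(m)) for m in messages)
    batched = len(gzip.compress(b"\n".join(messages)))
    return individually, batched


individually, batched = compressed_sizes(make_messages(200))
print("compressed one by one:", individually, "bytes")
print("compressed as a batch:", batched, "bytes")
```

Compressing each message separately pays the format overhead every time and cannot exploit repetition across messages, while the batch shares one dictionary over all of them.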

However, Kafka’s use of MessageSet also involves some compromise in reliability. Each time the Producer sends data, it calls send() and then assumes the data has been sent. In most cases, however, the message is still sitting in an in-memory MessageSet and has not yet gone out over the network. If the Producer dies at this point, the data is lost.

To address this problem, Kafka 0.8 borrows the ack mechanism from networking. If performance matters most and some message loss is acceptable, set request.required.acks=0 to turn off acks and send at full speed.

If you need sent messages to be acknowledged, set request.required.acks to 1 or -1.

  • This also touches on the number of Replicas discussed earlier. If the value is set to 1, the message only needs to be received and acknowledged by the Leader; the other Replicas can pull it asynchronously without confirming immediately. This gives a reasonable degree of reliability without reducing efficiency.
  • If the value is set to -1, the ack is returned only after the message has been committed to all Replicas in the Partition’s ISR set. This ack is safer, but the latency of the whole process grows with the number of Replicas, so the setting should be tuned to the requirements at hand.
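The three settings of request.required.acks can be summarized as a small decision function. This is a sketch of the semantics described above, not broker code; the function name and its parameters are hypothetical, and isr_written is assumed to count the Leader as well.

```python
def can_acknowledge(acks, leader_written, isr_size, isr_written):
    """Decide whether the producer's send may be considered acknowledged.

    acks: 0, 1 or -1, as in request.required.acks
    leader_written: True once the leader has the message
    isr_size: number of replicas currently in the ISR (leader included)
    isr_written: how many of those replicas have the message
    """
    if acks == 0:
        return True  # fire and forget: the producer never waits
    if acks == 1:
        return leader_written  # followers catch up asynchronously
    if acks == -1:
        return leader_written and isr_written == isr_size
    raise ValueError(f"unsupported acks value: {acks}")


# The leader has the message, but only 2 of the 3 in-sync replicas do so far.
for acks in (0, 1, -1):
    print(acks, can_acknowledge(acks, leader_written=True, isr_size=3, isr_written=2))
```

With acks=-1 the producer waits for the slowest in-sync Replica, which is where the extra latency mentioned above comes from.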

4. Consumer

With Consumer Groups, both the producer-consumer and the queue access modes are supported.

There are two kinds of Consumer API: high level and low level. The former relies heavily on Zookeeper, so it performs worse and offers less freedom, but it is very worry-free. The latter does not depend on Zookeeper and is better in both freedom and performance, but all exceptions (Leader migration, Offset out of range, Broker downtime, and so on) and Offset maintenance must be handled by ourselves.
