1. Motivation
2. Persistence
3. Efficiency
4. Producers
  4.1 Load balancing
  4.2 Asynchronous sending
5. Consumers
  Push vs. Pull
  Consumer location
  Offline data loading

1. Motivation

Kafka is designed to act as a unified platform for handling all the real-time data streams that a large company might need. To do this, we must consider a fairly wide range of use cases.

  • It must have high throughput to support high-volume event flows, such as real-time log aggregation;

  • It must gracefully handle large data backlogs and support periodic data loads from offline systems;

  • The system must be able to publish messages with low latency;

  • It must be able to partition, distribute, and process streams in real time (this motivated our partitioning and consumer models);

  • Finally, when streams are fed into other data systems for serving, the system must be able to guarantee fault tolerance in the presence of machine failures.

Supporting these use cases led us to a design with a number of unique elements, closer to a database log than to a traditional messaging system. We will outline some of the elements of Kafka’s design in the following sections.

2. Persistence

Don’t be afraid of file systems!

Kafka relies heavily on the file system for storing and caching messages. The common perception that “disks are slow” makes people skeptical that a persistent structure can offer competitive performance. In fact, disks are both much slower and much faster than people expect, depending on how they are used; a properly designed on-disk structure can often be as fast as the network.

A key fact about disk performance is that the throughput of hard drives has been diverging from seek latency over the past decade. As a result, linear writes on a JBOD configuration of six 7200rpm SATA drives in a RAID-5 array can reach about 600 MB/sec, while random writes manage only about 100 KB/sec, a difference of more than 6000x.

Linear reads and writes are, for the most part, predictable and heavily optimized by the operating system. Modern operating systems provide read-ahead and write-behind techniques that prefetch data in large block multiples and merge small logical writes into one large physical write. A more in-depth discussion of this issue can be found in this ACM Queue article: https://queue.acm.org/detail.cfm?id=1563874. Its conclusion: sequential disk writes can in some cases be faster than random writes to memory.

To compensate for this performance divergence, modern operating systems use main memory for disk caching ever more aggressively. A modern OS will happily use all available memory for disk caching, with little performance penalty when that memory is reclaimed, and all disk reads and writes go through this unified cache. This feature cannot easily be turned off without using direct I/O. So even if a process maintains an in-process cache of the data, the data will likely also be duplicated in the operating system’s pagecache, effectively storing everything twice.

In addition, Kafka runs on a JVM, and anyone who knows anything about Java memory knows two things:

  • Objects have a very high memory overhead, often doubling (or worse) the size of the data stored.

  • As the data in the heap increases, Java garbage collection becomes more frequent and slower.

Because of these factors, using the file system and relying on pagecache is superior to maintaining an in-memory cache or other structure: we at least double the available cache by getting automatic access to all free memory, and likely double it again by storing a compact byte structure rather than individual objects.

Doing so yields a cache of 28 to 30 GB on a 32 GB server. Moreover, this cache stays warm even if the service process is restarted, whereas an in-process cache would have to be rebuilt (about 10 minutes for 10 GB of data) or else start completely cold, which means very poor initial performance.

This also greatly simplifies the code, because all the logic for keeping the cache and the file system consistent now lives in the operating system. And if your disk usage favors linear reads, read-ahead will very efficiently pre-populate the cache with useful data on each disk read.

This suggests a very simple design: rather than keeping as much data as possible in memory and frantically flushing it all to the file system when space runs out, we invert that. All data is written immediately to a persistent log on the file system, without necessarily being flushed to disk. In practice, this just means it is transferred into the pagecache of the operating system kernel.

This pagecache-centric design style is described in an article about the design of Varnish; see http://varnish-cache.org/wiki/ArchitectNotes for more.

Varnish is a high-performance open-source HTTP accelerator used by sites such as Taobao and Pinduoduo to handle heavy traffic.

Constant time suffices

The persistent data structure used in messaging systems is often a per-consumer queue with an associated BTree or other general-purpose random-access data structure to maintain message metadata.

BTrees are the most versatile data structure available and can support transactional and non-transactional semantics in a wide variety of messaging systems. BTree operations are O(log N), which is usually considered essentially equivalent to constant time.

This is not the case with disks: each seek takes about 10 ms, and each disk can perform only one seek at a time, so parallelism is very limited. Even a small number of disk seeks therefore leads to very high overhead.

Because storage systems mix very fast cached operations with very slow physical disk operations, the observed performance of tree structures with a fixed cache is often superlinear as data grows; that is, doubling the data makes things much worse than twice as slow.

Intuitively, a persistent queue can be built on simple reads from and appends to files, as is commonly the case with logging solutions. This structure has the advantage that all operations are O(1): reading does not block writing, and writing does not block reading. It also completely decouples performance from data size, so there is no degradation as the volume of data grows, and a server can take full advantage of a number of cheap, low-rotational-speed hard drives. Despite their poor seek performance, these disks perform acceptably for large reads and writes, offering roughly three times the capacity at a third of the price.
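
As a rough illustration of this idea (not Kafka’s actual segment implementation), the sketch below appends records to the end of a file in O(1) and reads them back by byte offset; the class name and record framing here are hypothetical:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Minimal append-only log sketch: appends go to the tail of the file (O(1)),
// reads address a byte offset directly, and readers never block the writer.
public class AppendOnlyLog {
    private final FileChannel channel;

    public AppendOnlyLog(Path file) throws IOException {
        this.channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
    }

    // Append a record and return the byte offset at which it was written.
    public synchronized long append(byte[] record) throws IOException {
        long offset = channel.size();
        ByteBuffer buf = ByteBuffer.allocate(4 + record.length);
        buf.putInt(record.length).put(record).flip();
        channel.write(buf, offset); // sequential write at the tail
        return offset;
    }

    // Read the record stored at a given byte offset.
    public byte[] read(long offset) throws IOException {
        ByteBuffer len = ByteBuffer.allocate(4);
        channel.read(len, offset);
        len.flip();
        ByteBuffer data = ByteBuffer.allocate(len.getInt());
        channel.read(data, offset + 4);
        return data.array();
    }
}
```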

Access to almost unlimited disk space without any performance penalty means that we can provide functionality that messaging systems typically do not. For example, in Kafka, instead of trying to delete a message after it has been consumed, we can keep the message for a long time (for example, a week), as we will describe, giving consumers great flexibility.

3. Efficiency

We put significant effort into efficiency. One of our primary use cases is handling web activity data, which is very high volume: each page view may generate more than a dozen messages. Furthermore, we assume that every message published is read by at least one consumer (and often many), so we strive to make consumption as cheap as possible.

In the previous section we discussed disk efficiency. Setting aside poor disk access patterns, there are two other common causes of inefficiency in this kind of system: too many small I/O operations and excessive byte copying. The small-I/O problem occurs between the client and the server as well as in the server’s own persistence operations.

To avoid this problem, our protocol is built around a “message set” abstraction that naturally groups messages together. This allows network requests to group messages and amortize the overhead of the network round trip, rather than sending a single message at a time. The server in turn appends chunks of messages to its log in one go, and the consumer fetches large linear chunks at a time.

This simple optimization produces orders-of-magnitude speedups. Batching leads to larger network packets, larger sequential disk operations, and so on. All of this allows Kafka to turn a bursty stream of random message writes into linear writes that flow to the consumers.

Another inefficiency is byte copying. This is not a problem at low message rates, but the impact can be significant under load. To avoid this problem, we developed a standard binary message format that is shared between producers, brokers, and consumers, so that blocks of data are transferred between them without modification.

The message log is maintained by the broker and is itself just a directory of files, each populated by a sequence of message sets that have been written to disk in the same format used by producers and consumers. Maintaining this common format allows optimization of the most important operation: network transfer of persistent log chunks. Modern Unix operating systems offer a highly optimized code path for transferring data from pagecache to a socket; on Linux this is done with the sendfile system call.

To understand the impact of sendfile, it is important to understand the common data path for transferring data from a file to a socket:

  1. The operating system reads data from the disk into the pagecache of the kernel space.

  2. The application reads data from kernel space into the user-space buffer;

  3. The application writes data back to the socket buffer in kernel space;

  4. The operating system copies data from the socket buffer to the NIC buffer and sends it across the network.

Clearly this is inefficient: there are four copies and two system calls. Using sendfile, this re-copying is avoided by allowing the operating system to send data from the pagecache to the network directly, so in this optimized path only the final copy into the NIC buffer is needed.
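
On the JVM, this optimized path is exposed through FileChannel.transferTo, which maps to sendfile where the operating system supports it. The sketch below is a rough illustration of handing a file to a socket without copying it through user space; the file name and destination address are placeholders:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of the zero-copy path: transferTo lets the kernel move bytes from
// the pagecache to the socket without a trip through a user-space buffer.
public class ZeroCopySend {
    public static void main(String[] args) throws IOException {
        // Hypothetical file and destination, purely for illustration.
        try (FileChannel file = FileChannel.open(Path.of("segment.log"), StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9092))) {
            long position = 0;
            long remaining = file.size();
            while (remaining > 0) {
                long sent = file.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}
```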

We expect a common use case to be multiple consumers on a single topic. With the zero-copy optimization above, data is copied into the pagecache exactly once and reused on each consumption, instead of being stored in memory and copied out to user space every time it is read. This allows messages to be consumed at a rate that approaches the limit of the network connection.

(Figure: the data transfer path before and after the zero-copy optimization.)

The combination of pagecache and sendfile means that a Kafka cluster whose consumers are mostly caught up will show no read activity on the disks at all, since the data is served entirely from cache.

For more background on sendfile and zero-copy support in Java, see this article: https://www.ibm.com/developerworks/linux/library/j-zerocopy/

End-to-end batch compression

In some scenarios the bottleneck is neither CPU nor disk but network bandwidth. This is especially true for data pipelines that need to send messages between data centers over a WAN. Of course, users can always compress their messages themselves without any support from Kafka, but this can lead to very poor compression ratios, because much of the redundancy is due to repetition between messages of the same type (for example, field names in JSON).

Efficient compression requires compressing multiple messages together rather than each message individually.

Kafka supports this with an efficient batching format: a batch of messages is bundled together, compressed, and sent to the Kafka server in that compressed form. The batch is stored in the log in compressed form and is only decompressed by the consumer.

Kafka supports GZIP, Snappy, and LZ4 compression protocols.
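
For illustration, here is a minimal sketch of a producer configured for batch compression, assuming the standard Java client; the broker address and topic name are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch: enabling end-to-end batch compression on the producer side.
public class CompressedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("compression.type", "lz4"); // also: gzip, snappy

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("page-views", "{\"page\":\"/home\"}"));
        }
    }
}
```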

4. Producers

4.1 Load Balancing

The producer sends data directly to the broker that is the leader for the partition, without any intervening routing tier. To help producers do this, every Kafka node can answer a metadata request describing which brokers are alive and where the leader of each topic partition is located.

The client controls which partition it publishes messages to. This can be done at random (a simple form of load balancing) or by some partitioning function. Kafka exposes an interface that lets the user provide a key for each message to determine the target partition (Kafka hashes the key and takes the result modulo the number of partitions). For example, if the key is a userId, all message data for that user is sent to the same partition.

This in turn allows consumers to make locality assumptions about their consumption. This partitioning approach is explicitly designed to allow locality-sensitive processing in consumers.
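
Below is a hedged sketch of the key-to-partition idea and of keyed sends with the standard Java client. The topic name is a placeholder, and Kafka’s default partitioner actually hashes the serialized key with murmur2 rather than hashCode(); the helper here only illustrates the hash-then-modulo principle:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedSend {
    // Illustration of hash(key) mod numPartitions. Kafka's default partitioner
    // uses murmur2 over the serialized key; hashCode() is just for the sketch.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    static void send(KafkaProducer<String, String> producer, String userId, String event) {
        // All events with the same key (userId) land in the same partition,
        // so a single consumer sees that user's events in order.
        producer.send(new ProducerRecord<>("user-activity", userId, event));
    }
}
```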

4.2 Asynchronous Sending

Batching is one of the big drivers of efficiency. The Kafka producer tries to accumulate data in a buffer and then send larger batches in a single request. Batching can be configured along two dimensions: accumulate no more than a fixed amount of data (for example, 16 KB) and wait no longer than a fixed delay bound (for example, 100 ms). This configurable buffering provides a mechanism for trading a small amount of additional latency for better throughput.
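
A minimal sketch of those two knobs as producer configuration, assuming the standard Java client and reusing the example values from the text:

```java
import java.util.Properties;

public class BatchingConfig {
    // The two batching knobs described above. The values mirror the examples
    // in the text (16 KB / 100 ms); good settings depend on the workload.
    static Properties producerBatching() {
        Properties props = new Properties();
        props.put("batch.size", 16384); // send a batch once it reaches ~16 KB of data
        props.put("linger.ms", 100);    // or once it has waited up to 100 ms
        return props;
    }
}
```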

5. Consumers

A Kafka consumer works by issuing fetch requests to the brokers for the partitions it wants to consume. The consumer specifies its log offset in each request and receives back a chunk of the log beginning at that position. The consumer therefore has full control over this position and, if necessary, can rewind and re-consume data it has already read.
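
Here is a minimal sketch of this offset control with the standard Java consumer; the topic, partition number, and starting offset are placeholders. The same seek call is what lets a consumer rewind and re-consume data it has already read:

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch: the consumer controls its own position. It can seek to any offset
// in a partition (including one it has already read) and fetch from there.
public class OffsetControlledConsumer {
    static void consumeFrom(KafkaConsumer<String, String> consumer, long offset) {
        TopicPartition tp = new TopicPartition("page-views", 0); // placeholder topic/partition
        consumer.assign(Collections.singletonList(tp));
        consumer.seek(tp, offset); // rewind (or fast-forward) to the chosen offset
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
        }
    }
}
```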

Push vs. Pull

One of the first questions we considered was whether consumers should pull data from brokers or brokers should push data to consumers. In this respect Kafka follows the more traditional design: producers push data to the broker, and consumers pull data from the broker.

Some log-centric systems, such as Scribe and Apache Flume, follow a completely different push-based model where data is pushed downstream. Both approaches have pros and cons.

A push-based system, however, has a harder time dealing with diverse consumers, because the broker controls the rate of data transfer. Our general goal is for the consumer to consume at the maximum possible rate; unfortunately, in a push system this means the consumer tends to be overwhelmed whenever its rate of consumption falls below the rate of production.

A pull-based system has the nicer property that the consumer simply falls behind and catches up when it can. The push case can be mitigated with some kind of backoff protocol by which the consumer indicates that it is overwhelmed, but getting the transfer rate to fully (yet never over-) utilize the consumer is trickier than it seems. Previous attempts to build systems this way led us to the more traditional pull model.

Another advantage of a pull-based system is that it lends itself to aggressive batching of data sent to the consumer. A push-based system must choose between sending a request immediately and accumulating more data to send later, without knowing whether the downstream consumer will be able to process it right away.

If it is tuned for low latency, this results in sending a single message at a time, which can be very wasteful. A pull-based design does not have this problem: the consumer always pulls all available messages after its current position in the log (or up to some configurable maximum pull size), so it gets optimal batching without introducing unnecessary latency.

The disadvantage of a naive pull-based system is that if the broker has no data, the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive. To avoid this, Kafka’s fetch request has parameters that allow the consumer to block in a “long poll”, waiting until data arrives at the broker (and optionally waiting until a given number of bytes is available, to ensure large transfer sizes).
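
A sketch of those long-poll parameters as consumer configuration, assuming the standard Java client; the values are illustrative only:

```java
import java.util.Properties;

public class LongPollConfig {
    // The long-poll knobs described above: the broker holds the fetch request
    // until at least fetch.min.bytes of data is available, or until
    // fetch.max.wait.ms has elapsed, whichever comes first.
    static Properties consumerLongPoll() {
        Properties props = new Properties();
        props.put("fetch.min.bytes", 64 * 1024); // wait for at least 64 KB of data
        props.put("fetch.max.wait.ms", 500);     // but no longer than 500 ms
        return props;
    }
}
```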

You could imagine other possible designs that are pull only, end to end: the producer would write to a local log, brokers would pull from that, and consumers would pull from the brokers, a kind of “store-and-forward” producer. This is an interesting design, but we decided it was not suitable for our use case of thousands of producers.

Our experience running persistent data systems at scale made us feel that involving thousands of disks across many applications would not actually make the system more reliable and would be a nightmare to operate. In practice, we found that we can run pipelines with strong SLAs at large scale without producer persistence.

Consumer location

Surprisingly, keeping track of what has been consumed is one of the key performance points of a messaging system. Many messaging systems keep metadata on the broker about which messages have been consumed: as a message is handed to a consumer, the broker either records that fact immediately or waits for acknowledgement from the consumer. For a single server it is in fact not clear where else this state could go, and since the storage structures used in many messaging systems scale poorly, it is also a pragmatic choice: the broker knows what has been consumed and can delete it immediately, keeping the data size small.

It may not be obvious, but getting the broker and consumer to agree on what has been consumed is not a trivial problem. If the broker records a message as consumed each time it is handed out over the network, the message will be lost if the consumer fails to actually process it (because it crashed, the request timed out, and so on).

To solve this problem, many messaging systems (such as ActiveMQ) add an acknowledgement mechanism: when a message is sent out, it is marked only as sent, not consumed, and the broker waits for a specific acknowledgement from the consumer before marking it consumed. This strategy fixes the problem of losing messages but creates new ones. First, if the consumer processes the message but fails before sending the acknowledgement, the message will be consumed twice. The second problem is performance: the broker must keep multiple states about every single message and must deal with tricky issues such as what to do with messages that are sent but never acknowledged.

Kafka approaches this problem in a completely different way. Our topic is divided into a set of totally ordered partitions, and at any given time each partition is consumed by exactly one consumer within each subscribing consumer group. This means the position of a consumer in each partition is just a single integer: the offset of the next message to consume. This makes the state about what has been consumed very small, just one number per partition, and it can be checkpointed periodically. This makes the equivalent of message acknowledgement very cheap.
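
A minimal sketch of this checkpointing with the standard Java consumer: after processing a batch, the consumer commits the next offset to consume for each partition. The handle method is a stand-in for application logic:

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch: the consumed position is just one integer per partition, so
// "acknowledging" work amounts to periodically checkpointing that offset.
public class CheckpointingConsumer {
    static void processLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> handle(r.value()));
            consumer.commitSync(); // checkpoint the next offset to consume, per partition
        }
    }

    static void handle(String value) { /* application-specific processing */ }
}
```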

This decision has a side benefit: a consumer can deliberately rewind to an old offset and consume the data again. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if a bug is found in the consumer’s code after some messages have been consumed, the consumer can re-consume those messages once the bug is fixed.

Offline data loading

Scalable persistence also allows for consumers that only consume periodically, such as batch data loads that periodically bulk-load data into an offline system like Hadoop or a relational data warehouse.

In the case of Hadoop, we parallelize the data load by splitting it over individual map tasks, one per node/topic/partition combination, allowing full parallelism in the load. Hadoop provides the task management, and tasks that fail can be restarted without any risk of duplicate data: they simply restart from their original position.

