Kafka’s status on the Meituan data platform

Thanks to its excellent I/O optimizations and its heavily asynchronous design, Kafka delivers higher throughput than other message queue systems while keeping latency low, which makes it a good fit for the whole big data ecosystem.

Kafka currently serves as the data buffering and distribution layer of the Meituan data platform. As shown in the figure below, business logs, Nginx logs from the access layer, and online DB data are sent to Kafka through the data collection layer. Downstream, the data is either consumed and processed by users’ real-time jobs or flows through the ODS layer of the data warehouse for warehouse production. Part of the data also enters the company’s unified log center to help engineers troubleshoot online problems.

Current Meituan online Kafka scale:

  • Cluster size: 6000+ nodes, 100+ clusters.
  • Cluster load: 60,000+ topics and 410,000+ partitions.
  • Message volume: 8 trillion messages are currently processed per day, with peak traffic of 180 million messages per second.
  • Scale of services provided: Currently, the downstream real-time computing platform runs over 30,000 jobs, most of which are sourced from Kafka.

Kafka online pain point analysis and core goal

Currently, Kafka supports a large number of real-time jobs, and a large number of topics and partitions are hosted on a single machine. A common problem in this scenario is that different partitions on the same machine compete for PageCache resources and affect each other, leading to higher processing latency and lower throughput of the whole Broker.

Next, we will analyze Kafka’s pain points online by combining the process of Kafka’s read and write requests with online statistics.

Principle analysis

For a Produce request, the I/O thread on the server side writes the data in the request to the operating system’s PageCache and returns immediately. When the number of messages reaches a certain threshold, the Kafka application itself or the operating system kernel triggers a forced flush to disk (as shown in the flowchart on the left).

For a Consume request, the operating system’s zero-copy mechanism does most of the work. When the Kafka Broker receives a read request, it issues a sendfile system call to the operating system. The operating system first tries to get the data from PageCache (as shown in the middle flowchart). If the data is not there, a page fault is triggered and the data is read from disk into a temporary buffer (as shown in the flowchart on the right). The data is then copied directly into the NIC buffer through a DMA operation to await TCP transmission.

In summary, Kafka has good throughput and latency for a single read or write request. When processing a write request, the Broker returns as soon as the data is written to PageCache, and the data is flushed to disk asynchronously in batches. This keeps latency low for most write requests, and batched sequential flushing is also friendlier to the disk. When processing read requests, real-time consumption jobs can read data directly from PageCache with low request latency. Meanwhile, the zero-copy mechanism reduces switches between user mode and kernel mode during data transmission, which greatly improves transfer efficiency.

However, when multiple consumers exist on the same Broker, they may compete for PageCache resources and all be delayed as a result. Let’s take two consumers as an example:

As shown in the figure above, the Producer sends data to the Broker, and PageCache caches this data. When all consumers have sufficient consumption capacity, all data is read from PageCache and all consumer instances see low latency. If one of the consumers then falls behind (Consumer Process2 in the figure), its read requests trigger disk reads, and while data is read from disk, part of it is also read ahead into PageCache. When PageCache runs out of space, data is evicted according to the LRU policy, so the data read by the delayed consumer replaces the real-time data cached in PageCache. When subsequent real-time consumption requests arrive, the data they need is no longer in PageCache, which results in unexpected disk reads. This leads to two consequences:

  1. Consumers with sufficient consumption capacity lose the performance benefit of PageCache.
  2. Consumers affect each other, increasing unexpected disk reads and HDD load.
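
The pollution effect described above can be reproduced with a toy LRU cache. This is a minimal sketch, not a model of the real Linux PageCache: the class `LruPageCache`, the function `simulate`, and the tiny capacity are all hypothetical, and read-ahead is reduced to "a miss caches the page that was read".

```python
from collections import OrderedDict


class LruPageCache:
    """A toy LRU cache of page ids; capacity is counted in pages."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.pages = OrderedDict()

    def write(self, page):
        # A producer write lands in the cache without touching the disk.
        self._insert(page)

    def read(self, page):
        # A hit refreshes recency; a miss stands for a disk read that
        # also pulls the page into the cache.
        if page in self.pages:
            self.pages.move_to_end(page)
            return True
        self._insert(page)
        return False

    def _insert(self, page):
        self.pages[page] = True
        self.pages.move_to_end(page)
        while len(self.pages) > self.capacity:
            self.pages.popitem(last=False)  # evict the least recently used page


def simulate(lagging_reads_per_step, steps=10, capacity=2):
    """Return how many real-time reads miss the cache."""
    cache = LruPageCache(capacity)
    realtime_misses = 0
    old_page = 0
    for n in range(steps):
        cache.write(("new", n))                # producer appends fresh data
        for _ in range(lagging_reads_per_step):
            cache.read(("old", old_page))      # delayed consumer reads old data
            old_page += 1
        if not cache.read(("new", n)):         # real-time consumer reads fresh data
            realtime_misses += 1
    return realtime_misses


print(simulate(0), simulate(2))  # 0 10
```

With no delayed consumer every real-time read is a hit. Once the delayed consumer touches enough old pages per step to fill the cache, every real-time read becomes a disk read, even though the real-time consumer itself never fell behind.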

We conducted gradient tests on the performance of HDD and the impact of read/write concurrency, as shown in the figure below:

As you can see, as read concurrency increases, the IOPS and bandwidth of the HDD will decrease significantly, further affecting the overall Broker throughput and processing latency.

Online Statistics

Currently, Kafka cluster TP99 traffic is 170MB/s, TP95 traffic is 100MB/s, and TP50 traffic is 50-60MB/s, and each machine is allocated 80GB of PageCache on average. Taking TP99 traffic as the reference, the maximum time span of data that PageCache can hold at this traffic level and allocation is 80*1024/170/60 = 8min. The overall Kafka service therefore has a very low tolerance for delayed consumption: once some jobs fall behind in consumption, real-time consumption jobs may be affected.
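
The same arithmetic can be written out for the other traffic percentiles. The helper name `cache_window_minutes` is hypothetical, and the sketch assumes, as the calculation above does, that all 80GB of PageCache holds Kafka data written at a constant rate.

```python
def cache_window_minutes(page_cache_gb, traffic_mb_per_s):
    """Minutes of freshly written data that fit in PageCache at a steady write rate."""
    return page_cache_gb * 1024 / traffic_mb_per_s / 60


print(f"TP99: {cache_window_minutes(80, 170):.1f} min")  # TP99: 8.0 min
print(f"TP95: {cache_window_minutes(80, 100):.1f} min")  # TP95: 13.7 min
print(f"TP50: {cache_window_minutes(80, 50):.1f} min")   # TP50: 27.3 min
```

Even at median traffic the window is under half an hour, so a job that lags by more than that is reading from disk.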

At the same time, we collected the consumption delay distribution of online real-time jobs. Jobs with a delay in the 0-8min range (real-time consumption) account for only 80%, which means 20% of online jobs are currently in a delayed-consumption state.

Pain point analysis and summary

Summarizing the above principle analysis and online data statistics, at present, online Kafka has the following problems:

  1. Real-time consumption and delayed consumption jobs compete at the PageCache level, causing real-time consumption to produce unexpected disk reads.
  2. The performance of traditional HDDs deteriorates dramatically as read concurrency increases.
  3. 20% of online jobs are in a delayed-consumption state.

Given the current PageCache allocation and online cluster traffic, Kafka cannot provide a stable quality-of-service guarantee for real-time consumption jobs, and this pain point needs to be solved urgently.

Goal

Based on the above analysis of pain points, our goal is to ensure that real-time consumption jobs are not affected by delayed consumption jobs through PageCache competition, so that Kafka provides a stable quality-of-service guarantee for real-time consumption jobs.

Solution

Why SSD

According to the analysis of the above reasons, the solution to the current pain points can be considered from the following two directions:

  1. Eliminate the PageCache competition between real-time consumption and delayed consumption, for example by keeping data read by delayed consumption jobs out of PageCache, or by increasing the PageCache allocation.
  2. Add a new device between the HDD and memory that has better read/write bandwidth and IOPS than an HDD.

As for the first direction, PageCache is managed by the operating system. Changing its eviction policy would be difficult to implement and would also break the semantics the kernel exposes to applications. In addition, memory is expensive and cannot be expanded indefinitely, so the second direction needs to be considered.

SSD technology is increasingly mature. Compared with HDDs, SSDs offer an order of magnitude improvement in IOPS and bandwidth, which makes them well suited to absorb part of the read traffic once PageCache competition occurs in the scenario above. We also tested SSD performance, and the results are shown in the figure below:

As can be seen from the figure, the IOPS and bandwidth of the SSD do not decrease significantly as read concurrency increases. Based on this conclusion, we can use SSD as a cache layer between PageCache and HDD.

Architectural decisions

After SSD is introduced as the cache layer, the key problems to be solved in the next step include data synchronization among PageCache, SSD and HDD as well as data routing of read and write requests. Meanwhile, our new cache architecture needs to fully match the characteristics of read and write requests of Kafka engine. This section describes how the new architecture addresses the issues mentioned above in the selection and design.

The Kafka engine has the following features in read and write behavior:

  • The frequency of data consumption varies with time: the older the data is, the less frequently it is consumed.
  • Only the Leader of each Partition provides read and write services.
  • For a given client, consumption is linear and data is not consumed repeatedly.

Two alternatives are presented below, followed by our selection criteria and architectural decision.

Alternative 1: Implementation at the operating system kernel layer

At present, open source caching technologies include FlashCache, BCache, DM-Cache, and OpenCAS. BCache and DM-Cache have been integrated into Linux but have kernel version requirements. Constrained by our kernel version, we could only choose between FlashCache and OpenCAS.

As shown in the figure below, FlashCache and OpenCAS share similar core design ideas, and both architectures rest on the principle of “data locality”. The SSD and HDD are divided into fixed management units of the same granularity, and space on the SSD is then mapped to multiple HDD devices (a logical or physical mapping). The access flow resembles that of a CPU accessing its cache and main memory: the cache layer is tried first, and on a cache miss the HDD layer is accessed. Meanwhile, following the principle of data locality, the data that was read is written back to the cache layer. If the cache space is full, some of the data is replaced according to an LRU policy.

FlashCache/OpenCAS provide four caching strategies: WriteThrough, WriteBack, WriteAround, and WriteOnly. Since the fourth does no read caching, we only look at the first three here.

Write:

  • WriteThrough: A write is written to the SSD and to the backend storage at the same time.
  • WriteBack: A write returns as soon as the data is written to the SSD, and the cache policy flushes it to the backend storage later.
  • WriteAround: A write goes directly to the backend storage, and the corresponding cache entry on the SSD is invalidated.

Read:

  • WriteThrough/WriteBack/WriteAround: The SSD is read first. On a miss, the backend storage is read and the data is then filled into the SSD cache.
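
The write and read behavior listed above can be sketched with two dictionaries standing in for the SSD and the backend storage. This is a toy model and not FlashCache or OpenCAS code: the class `TieredCache`, its method names, and the mode strings are hypothetical, and eviction is left out entirely.

```python
class TieredCache:
    """Toy block cache: self.ssd is the cache, self.hdd is the backend storage."""

    MODES = ("write_through", "write_back", "write_around")

    def __init__(self, mode):
        if mode not in self.MODES:
            raise ValueError(f"unknown mode: {mode}")
        self.mode = mode
        self.ssd = {}
        self.hdd = {}
        self.dirty = set()  # blocks on the SSD that are not yet on the HDD

    def write(self, block, data):
        if self.mode == "write_through":
            self.ssd[block] = data        # both devices are updated together
            self.hdd[block] = data
        elif self.mode == "write_back":
            self.ssd[block] = data        # acknowledged once the SSD has it
            self.dirty.add(block)
        else:                             # write_around
            self.hdd[block] = data        # bypass the cache
            self.ssd.pop(block, None)     # and invalidate any stale copy

    def flush(self):
        # Background step for write_back: persist dirty blocks to the HDD.
        for block in sorted(self.dirty):
            self.hdd[block] = self.ssd[block]
        self.dirty.clear()

    def read(self, block):
        if block in self.ssd:
            return self.ssd[block], "ssd"
        data = self.hdd[block]
        self.ssd[block] = data            # a read miss fills the cache in every mode
        return data, "hdd"


cache = TieredCache("write_around")
cache.write(7, b"payload")
print(cache.read(7)[1], cache.read(7)[1])  # hdd ssd
```

The last line of `read` is the behavior that matters for Kafka: any read that misses the SSD, including a read of old data by a delayed consumer, places that data into the cache.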

For more implementation details, see the official documentation for both:

  • FlashCache
  • OpenCAS

Alternative 2: Kafka In-App Implementation

The core theoretical basis of the first alternative, the principle of “data locality”, is not fully consistent with Kafka’s read and write characteristics, and its write-back of read data into the cache still introduces cache pollution. The LRU-based eviction strategy of that architecture also conflicts with Kafka’s read and write characteristics: when multiple consumers consume concurrently, LRU may mistakenly evict near-real-time data, causing performance jitter in real-time consumption jobs.

It can be seen that the first alternative cannot completely solve Kafka’s current pain points, so the change has to be made inside the application. The overall design idea is as follows: data is distributed across devices by the time dimension, and near-real-time data is cached on the SSD. In this way, when PageCache competition occurs, real-time consumption jobs read their data from the SSD, which ensures that they are not affected by delayed consumption jobs. The following diagram shows how the architecture implemented at the application layer processes a read request:

When a consumption request arrives at the Kafka Broker, the Broker uses the mapping it maintains between message offsets and devices to fetch the data directly from the corresponding device. Data read from the HDD while serving a read request is not written back to the SSD, which prevents cache pollution. The access path is also unambiguous, so there is no extra access overhead caused by cache misses.
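
One way to picture the offset-to-device mapping is a sorted list of segment base offsets searched with binary search. The sketch below is an assumption about the shape of such a mapping, not the code running in Meituan’s Kafka; `SegmentIndex` and the device labels are hypothetical.

```python
import bisect


class SegmentIndex:
    """Maps a message offset to the device that holds its segment."""

    def __init__(self):
        self.base_offsets = []  # sorted base offsets, one per segment
        self.devices = []       # devices[i] holds the segment starting at base_offsets[i]

    def add_segment(self, base_offset, device):
        i = bisect.bisect_left(self.base_offsets, base_offset)
        self.base_offsets.insert(i, base_offset)
        self.devices.insert(i, device)

    def device_for(self, offset):
        # The owning segment is the last one whose base offset is <= offset.
        # The lookup never modifies the mapping, so serving a read from the
        # HDD does not move old data back onto the SSD.
        i = bisect.bisect_right(self.base_offsets, offset) - 1
        if i < 0:
            raise KeyError(f"offset {offset} is below the first segment")
        return self.devices[i]


index = SegmentIndex()
index.add_segment(0, "HDD")
index.add_segment(1000, "HDD")
index.add_segment(2000, "SSD")
print(index.device_for(1500), index.device_for(2500))  # HDD SSD
```

Because the device is decided before any I/O is issued, a request never probes the SSD first and then falls through to the HDD, which is the extra cost a miss incurs in the kernel-layer caches.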

The following table provides a more detailed comparison of the different candidates:

Finally, considering how well each option matches Kafka’s read and write characteristics, the overall workload, and other factors, we adopted the Kafka application-layer implementation, because it fits Kafka’s own read and write characteristics more closely and solves Kafka’s pain points more thoroughly.

New architecture design

Overview

Based on the analysis of Kafka’s read and write characteristics above, we set the following design goals for the SSD-based application-layer cache architecture:

  • Data is distributed across devices by the time dimension: near-real-time data resides on SSDs and is evicted to HDDs over time.
  • All data in the Leader partition is written to the SSD.
  • Data read from the HDD is not written back to the SSD.

According to the above objectives, we present the implementation of Kafka cache architecture based on SSD in the application layer:

A Partition in Kafka consists of several LogSegments, each containing two index files and one log message file. The LogSegments of a Partition are ordered by offset.

Following the design idea in the previous section, we first mark LogSegments with different states. As shown in the upper part of the figure, a LogSegment is in one of three resident states according to the time dimension: OnlyCache, Cached, and WithoutCache. The transitions between the three states and how the new architecture handles read and write operations are shown in the lower part of the figure. A LogSegment marked OnlyCache is stored only on the SSD. A background thread periodically synchronizes Inactive LogSegments (those with no write traffic) from the SSD to the HDD, and LogSegments that have completed synchronization are marked Cached. Finally, the background thread periodically checks SSD space usage. When usage reaches the threshold, the thread removes the oldest LogSegments from the SSD, and these segments are marked WithoutCache.

Write requests still write data to PageCache first, and the data is flushed to the SSD once the threshold condition is met. For read requests (when the data is not found in PageCache), if the LogSegment that contains the requested offset is in the Cached or OnlyCache state, the data is returned from the SSD (LC2-LC1 and RC1 in the figure). If that LogSegment is in the WithoutCache state, the data is returned from the HDD (LC1 in the figure).
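
The segment lifecycle and read routing can be summarized as a small state machine. This is a sketch under simplifying assumptions, not the actual implementation: the classes `Segment` and `Partition` are hypothetical, SSD capacity is counted in segments rather than bytes, and the two background steps only flip states instead of copying or deleting files.

```python
from enum import Enum


class SegmentState(Enum):
    ONLY_CACHE = "OnlyCache"        # stored on the SSD only
    CACHED = "Cached"               # stored on the SSD and synchronized to the HDD
    WITHOUT_CACHE = "WithoutCache"  # removed from the SSD, stored on the HDD only


class Segment:
    def __init__(self, base_offset):
        self.base_offset = base_offset
        self.state = SegmentState.ONLY_CACHE  # new data is always written to the SSD
        self.active = True                    # still receiving writes


class Partition:
    def __init__(self, ssd_capacity_segments):
        self.segments = []  # ordered by base offset, oldest first
        self.ssd_capacity_segments = ssd_capacity_segments

    def roll(self, base_offset):
        if self.segments:
            self.segments[-1].active = False
        self.segments.append(Segment(base_offset))

    def sync_inactive(self):
        # Background step 1: inactive SSD-only segments are copied to the HDD.
        for seg in self.segments:
            if not seg.active and seg.state is SegmentState.ONLY_CACHE:
                seg.state = SegmentState.CACHED

    def evict_if_needed(self):
        # Background step 2: over the threshold, drop the oldest synchronized
        # segments from the SSD. Unsynchronized segments are never dropped.
        on_ssd = [s for s in self.segments if s.state is not SegmentState.WITHOUT_CACHE]
        excess = len(on_ssd) - self.ssd_capacity_segments
        for seg in on_ssd:
            if excess <= 0:
                break
            if seg.state is SegmentState.CACHED:
                seg.state = SegmentState.WITHOUT_CACHE
                excess -= 1

    def device_for_read(self, offset):
        # Routing for a read that was not served from PageCache.
        owners = [s for s in self.segments if s.base_offset <= offset]
        if not owners:
            raise KeyError(offset)
        return "HDD" if owners[-1].state is SegmentState.WITHOUT_CACHE else "SSD"


p = Partition(ssd_capacity_segments=2)
for base in (0, 100, 200, 300):
    p.roll(base)
p.sync_inactive()
p.evict_if_needed()
print([s.state.value for s in p.segments])
# ['WithoutCache', 'WithoutCache', 'Cached', 'OnlyCache']
print(p.device_for_read(50), p.device_for_read(350))  # HDD SSD
```

Eviction walks the segments oldest first, which matches the observation that older data is consumed less often, and it needs no access statistics of the kind an LRU policy relies on.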

For data synchronization of Follower replicas, whether to write to SSD or HDD can be configured according to the latency and stability requirements of the Topic.

Key optimization points

The preceding section introduced the design outline and core ideas of the SSD-based Kafka application-layer cache architecture, including the read and write flows, internal state management, and the new background thread. This section describes the key optimization points of the solution, which are closely tied to service performance: LogSegment synchronization and the optimization of the log append flush strategy.

LogSegment synchronization

LogSegment Synchronization refers to the process of synchronizing data from SSD to HDD. This mechanism is designed with the following two key points:

  1. Synchronization mode: The synchronization mode determines how soon SSD data becomes visible on the HDD, which affects the timeliness of failure recovery and LogSegment cleanup.
  2. Synchronization rate limit: A rate-limiting mechanism is applied during LogSegment synchronization to keep normal read and write requests from being affected.

Synchronization mode

For the LogSegment synchronization mode, we considered three alternative schemes. The following table describes the three schemes and their advantages and disadvantages:

Finally, considering the cost of maintaining consistency, implementation complexity, and other factors, we chose to synchronize Inactive LogSegments in the background.

Synchronization rate limit

LogSegment synchronization is essentially a data transfer between devices. It generates additional read and write traffic on both devices at the same time and consumes their read and write bandwidth. Also, because we chose to synchronize Inactive data, whole segments have to be synchronized at a time. If the synchronization process is not throttled, it greatly affects overall service latency, mainly in the following two ways:

  • From the perspective of single-disk performance, because the SSD is much faster than the HDD, the HDD write bandwidth is saturated during the transfer and other read and write requests experience latency spikes. If delayed consumers are reading from the HDD or Followers are synchronizing data to the HDD at that moment, the service jitters.
  • From the perspective of single-machine deployment, each machine is deployed with 2 SSDs and 10 HDDs, so during synchronization one SSD has to supply the write volume of 5 HDDs. The SSD therefore also shows performance spikes during synchronization, which affects the response latency of normal requests.

For these two reasons, we added a rate-limiting mechanism to LogSegment synchronization. The overall principle is to synchronize as fast as possible without affecting the latency of normal read and write requests, because if synchronization is too slow, SSD data cannot be cleaned up in time and the SSD eventually fills up. For flexibility, the limit is exposed as a configuration parameter at single-Broker granularity.
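
A common way to throttle this kind of bulk copy is a token bucket. The article does not say which algorithm is used, so the sketch below is only one plausible shape: `TokenBucket`, `sync_segment`, and their parameters are hypothetical, and the clock and sleep functions are injectable so the behavior can be checked without waiting.

```python
import time


class TokenBucket:
    """Byte-rate limiter; tokens may go negative, which represents owed wait time."""

    def __init__(self, rate_bytes_per_s, burst_bytes, clock=time.monotonic):
        self.rate = rate_bytes_per_s
        self.capacity = burst_bytes
        self.tokens = burst_bytes
        self.clock = clock
        self.last = clock()

    def delay_for(self, nbytes):
        """Reserve nbytes and return how many seconds the caller should wait first."""
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        self.tokens -= nbytes
        if self.tokens >= 0:
            return 0.0
        return -self.tokens / self.rate


def sync_segment(src, dst, bucket, chunk_size=1 << 20, sleep=time.sleep):
    """Copy src to dst chunk by chunk, pausing whenever the bucket is exhausted."""
    copied = 0
    while True:
        chunk = src.read(chunk_size)
        if not chunk:
            return copied
        wait = bucket.delay_for(len(chunk))
        if wait > 0:
            sleep(wait)
        dst.write(chunk)
        copied += len(chunk)
```

For example, `sync_segment(open(ssd_path, "rb"), open(hdd_path, "wb"), TokenBucket(20 << 20, 4 << 20))` would cap the copy at roughly 20MB/s with a 4MB burst. Both numbers and both paths are placeholders; the rate is the value a per-Broker configuration parameter would supply.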

Log append flush strategy optimization

In addition to synchronization, the flush mechanism used while writing data also affects the read and write latency of the service. The design of this mechanism affects not only the performance of the new architecture but also native Kafka.

The following diagram shows the processing flow of a single write request:

When processing a Produce request, the Broker first decides whether to roll the LogSegment based on the current position of the LogSegment and the data in the request. It then writes the request data to PageCache, updates the LEO and statistics, and finally uses the statistics to decide whether to trigger a flush. If a flush is needed, it is forced through FileChannel.force; otherwise the request returns directly.

In the entire flow, all operations except log rolling and flushing are in-memory operations and do not cause performance problems. Log rolling involves file system operations; Kafka currently provides a jitter parameter for log rolling to prevent many segments from rolling at the same time and putting pressure on the file system. For flushing, the mechanism Kafka currently provides is to force a flush after a fixed number of messages (currently 50,000 online). This mechanism only guarantees that messages are flushed at a steady frequency when inbound traffic is constant; it cannot bound the amount of data flushed to disk each time and therefore cannot effectively limit the disk load.

The figure below shows the instantaneous write_bytes values of a disk during the afternoon peak. Because write traffic increases during the peak, flushing produces a large number of spikes whose values come close to the maximum write bandwidth of the disk, which causes read and write request latency to jitter.

To address this problem, we modified the flush mechanism, replacing the original limit on the number of messages with a limit on the actual flush rate. For a single segment, the flush rate limit is 2MB/s. This value takes into account the actual average message size online: if it is set too small, topics with large individual messages flush too frequently, which increases average latency when traffic is high. At present, this mechanism is in a small-scale grayscale rollout online. The figure on the right shows the write_bytes metric for the same period after the grayscale rollout. Compared with the figure on the left, the flush rate is significantly smoother, with a peak of only about 40MB/s.
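
The difference between the two triggers can be illustrated with a small model. The article does not describe how the rate limit is implemented, so the sketch below rests on an assumption: it reads the limit as a byte budget per segment and forces a flush once the unflushed bytes reach that budget. `FlushPolicy`, `flush_sizes`, and the thresholds are hypothetical.

```python
class FlushPolicy:
    """Decides after each append whether the pending data must be flushed."""

    def __init__(self, max_messages=None, max_bytes=None):
        self.max_messages = max_messages
        self.max_bytes = max_bytes
        self.pending_messages = 0
        self.pending_bytes = 0

    def on_append(self, message_bytes):
        """Record one appended message; return the bytes flushed now (0 if none)."""
        self.pending_messages += 1
        self.pending_bytes += message_bytes
        by_count = self.max_messages is not None and self.pending_messages >= self.max_messages
        by_bytes = self.max_bytes is not None and self.pending_bytes >= self.max_bytes
        if not (by_count or by_bytes):
            return 0
        flushed = self.pending_bytes
        self.pending_messages = 0
        self.pending_bytes = 0
        return flushed


def flush_sizes(policy, message_sizes):
    """Feed messages through the policy and collect the size of every flush."""
    sizes = []
    for size in message_sizes:
        flushed = policy.on_append(size)
        if flushed:
            sizes.append(flushed)
    return sizes


# Count-based trigger: the flush size scales with the message size.
print(max(flush_sizes(FlushPolicy(max_messages=1000), [100] * 3000)))     # 100000
print(max(flush_sizes(FlushPolicy(max_messages=1000), [10_000] * 3000)))  # 10000000

# Byte-based trigger: every flush stays close to the budget.
budget = 2 * 1024 * 1024
print(max(flush_sizes(FlushPolicy(max_bytes=budget), [10_000] * 3000)))   # 2100000
```

Under the count-based trigger, a topic with messages 100 times larger produces flushes 100 times larger, which is where the write_bytes spikes come from. Under the byte-based trigger the flush size is bounded, at the cost of more frequent flushes for topics with large messages, which is the trade-off noted above for a budget that is set too small.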

The same problem exists in the new SSD cache architecture, so the flush rate is limited there as well.

Solution testing

Test objectives

  • Verify that the application-layer SSD cache architecture prevents real-time jobs from being affected by delayed jobs.
  • Verify that the application-layer SSD architecture has lower read and write latency under different traffic levels than the cache architectures based on the operating system kernel layer.

Test scenario description

  • Four clusters were built: the new architecture cluster, the normal HDD cluster, the FlashCache cluster, and the OpenCAS cluster.
  • Each cluster has three nodes.
  • With a fixed write traffic rate, compare read and write latency.
  • Delayed consumption setting: only data 10-150 minutes behind the current time is consumed (beyond what PageCache can hold, but within what the SSD can hold).

Test content and key indicators

  • Case1: When there is only delayed consumption, observe the production and consumption performance of the cluster.

    • Key indicators: write time and read time, which together reflect read and write latency.
    • Hit rate indicators: HDD read volume, HDD read ratio (HDD read volume / total read volume), and SSD read hit rate, which together reflect the hit rate of the SSD cache.
  • Case2: When delayed consumption exists, observe the performance of real-time consumption.

    • Key metrics: the share of real-time jobs’ SLA (quality of service) that falls into each of five time intervals.

Test results

From a single Broker request latency perspective:

Before the flush mechanism was optimized, the new SSD cache architecture had a significant advantage over the alternatives in all scenarios.

After the flush mechanism was optimized, the other schemes improved in latency, and at lower traffic the advantage of the new architecture over the other schemes shrank as a result. When single-node write traffic is high (> 170MB), the advantage is obvious.

From the perspective of the impact of delayed jobs on real-time jobs:

In every scenario tested, delayed jobs had no impact on real-time jobs under the new cache architecture, which met expectations.

Summary and future outlook

Kafka serves as the unified data buffering and distribution layer of the Meituan data platform. To address the pain point that real-time jobs are affected by delayed jobs because of PageCache pollution and the resulting PageCache competition, we developed an SSD-based application-layer cache architecture for Kafka. This article mainly introduced the design ideas of the new architecture and compared it with other open source solutions. The new cache architecture has significant advantages over a normal cluster:

  1. Reduced read and write time: Compared to normal clusters, the read and write time of the new architecture cluster is reduced by 80%.
  2. Real-time consumption is not affected by delayed consumption: Compared with ordinary clusters, the real-time read and write performance of the new architecture cluster is stable and not affected by delayed consumption.

At present, this cache architecture has been verified and is in the grayscale stage, and it will be deployed to high-priority clusters first. The code involved will also be submitted to the Kafka community as a contribution back, and you are welcome to discuss it with us.

About the authors

Shiji and Shi Lu are both Meituan data platform engineers.
