Authors: Dong Tingting, Xu Ming

Abstract: As a platform for short-video sharing and live streaming, Kuaishou applies Flink in many business scenarios, including quality monitoring of short videos and live streams, user growth analysis, real-time data processing, and CDN scheduling for live streaming. This article mainly introduces the application and optimization of Flink in real-time multi-dimensional analysis scenarios at Kuaishou. The main contents include:

  1. Flink application scenarios and scale at Kuaishou
  2. Kuaishou's real-time multidimensional analysis platform
  3. SlimBase: a more IO-efficient embedded shared state store

Tip: the author's original slides and recorded talk are available at ververica.cn/developers/…

Flink application scenarios and scale at Kuaishou

Let's first look at the application scenarios and scale of Flink at Kuaishou.

1. Application scenarios of Kuaishou

Kuaishou's real-time computing pipeline ingests data from DB/Binlog and web service logs into Kafka in real time, which then flows into Flink for real-time computation, covering the real-time data warehouse, real-time analysis, and real-time training. The results are stored in Druid, Kudu, HBase, or ClickHouse. At the same time, Kafka data is dumped to the Hadoop cluster in real time, where Hive, MapReduce, or Spark performs offline computation. Both the real-time and offline results are finally presented through KwaiBI, the internal BI tool.

Typical application scenarios of Flink in Kuaishou are mainly divided into three categories:

  • 80% statistical monitoring: real-time statistics of various data metrics and alerting on monitored items, used to assist real-time business analysis and monitoring;
  • 15% data processing: data cleaning, splitting, joins and other logical processing, such as splitting and cleaning large topics;
  • 5% business processing: real-time processing of specific business logic, such as real-time CDN scheduling.

Typical scenarios of Flink application in Kuaishou include:

  • Kuaishou is a short-video sharing and live-streaming platform, and the quality of its short videos and live streams is monitored through real-time statistics in Flink, such as the number of live viewers and anchors, the lag rate, the playback failure rate, and other quality-related metrics;
  • User growth analysis: real-time statistics on the new users acquired through each channel, with the delivery volume of each channel adjusted in real time according to its effectiveness;
  • Real-time data processing: real-time join of the ad impression stream and the click stream, splitting of client logs, etc.;
  • Real-time CDN scheduling: real-time monitoring of each CDN vendor's quality, with Flink used to train and adjust the traffic ratio among CDN vendors in real time.

2. Flink Cluster scale

At present, Kuaishou's clusters comprise about 1,500 machines and process about 3 trillion records per day, with a peak of roughly 300 million records per second. All clusters are deployed on YARN; real-time and offline clusters are physically isolated using YARN labels, and the real-time clusters are Flink-dedicated clusters deployed for services that require high isolation and stability. Note: the figures in this article only reflect what was shared by the speakers.

Kuaishou's real-time multidimensional analysis platform

This section focuses on Kuaishou's real-time multidimensional analysis platform.

1. Kuaishou real-time multidimensional analysis scenario

Kuaishou has the following application scenario: the daily data volume is in the tens of billions of records, and the business side needs to model any combination of five dimensions across all dimension combinations, computing cumulative PV (Page View), UV (Unique Visitor), new-user and retention metrics, and then display the computed metrics in real-time graphical reports for business analysts.

2. Scheme selection

The community already has OLAP tools for real-time analytics such as Druid and ClickHouse; Kuaishou currently adopts the Flink+Kudu scheme. During the preliminary research, the three schemes were compared against the real-time multidimensional query scenario in terms of computing capability, group-aggregation capability, query concurrency, and query latency.

  • Computing capability: Druid supports Sum and Count, and supports Count Distinct for numeric types but not for character types; ClickHouse supports all of these aggregations; Flink, as a real-time computing engine, naturally has all of these capabilities.
  • Group-aggregation capability: Druid's group-aggregation capability is mediocre, while ClickHouse and Flink both offer strong group aggregation.
  • Query concurrency: ClickHouse's indexing is relatively weak and it does not support high query concurrency; Druid and Flink both support high concurrency, and Kudu, used as the storage layer, also offers strong indexing and supports high concurrency.
  • Query latency: Druid and ClickHouse both compute at query time, whereas Flink+Kudu performs the computation in Flink in real time and writes the results directly to Kudu, so queries read results from Kudu without any computation and latency is low.

The main idea of the Flink+Kudu scheme borrows from Kylin: Kylin pre-computes specified dimensions and metrics offline and stores the results in HBase, while Kuaishou's solution computes the metrics in real time with Flink and writes them to Kudu in real time.

3. Scheme design

The overall process of real-time multidimensional analysis is as follows:

  • Users configure a Cube data model on KwaiBI, the BI analysis tool developed in-house at Kuaishou, specifying the dimension columns, the metric columns, and what calculation to perform on each metric;
  • The data table selected during configuration is stored, after processing, in the real-time data warehouse platform;
  • Then, according to the configured calculation rules, a Flink job pre-computes the modeling metrics and stores the results in Kudu;
  • Finally, KwaiBI queries the data from Kudu for real-time dashboard display.

Next, the main modules of real-time multidimensional analysis are described in detail.

■ Data preprocessing

The data tables selected in KwaiBI for dimension modeling are first pre-processed:

  • First, there is an internal meta-information system that provides a unified schema service, abstracting all information into logical tables;
  • For example, metadata such as Kafka topics and Redis and HBase tables is extracted into schemas and stored;
  • The physical data in Kuaishou's Kafka is mostly in Protobuf or JSON format, and the schema service also supports mapping it to logical tables (a simple sketch of such a mapping follows this list);
  • Once the logical tables are built, the data can be cleaned and filtered in the real-time data warehouse.
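The schema service itself is internal, but the general idea of mapping a physical Kafka format onto a logical table can be sketched with a plain Flink DeserializationSchema. The JSON field names and the three-column logical schema below are assumptions made for illustration only:

```java
import java.io.IOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

/**
 * Hypothetical mapping of a JSON Kafka topic onto a 3-column logical table
 * (user_id BIGINT, device_id STRING, event_time BIGINT).
 */
public class JsonToLogicalTableSchema implements DeserializationSchema<Row> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public Row deserialize(byte[] message) throws IOException {
        JsonNode node = MAPPER.readTree(message);
        // Pull the physical JSON fields into the logical-table column order.
        return Row.of(
                node.get("userId").asLong(),
                node.get("deviceId").asText(),
                node.get("eventTime").asLong());
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false; // Kafka topics are unbounded.
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return new RowTypeInfo(Types.LONG, Types.STRING, Types.LONG);
    }
}
```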

■ Modeling metric calculation

After data preprocessing, the most important step is computing the modeling metrics. Cube and GroupingSet dimension combinations are supported to compute UV (Unique Visitor), new-user, and retention metrics accumulated by hour or by day, and results can be output at a fixed interval according to the user's configuration. In the dimension-aggregation logic, reducing dimensionality layer by layer makes the DAG extremely complicated (as shown in the model in the upper right corner of the figure). Kuaishou therefore designed a two-layer dimensionality-reduction model, split into a full-dimension layer and a residual-dimension layer; the residual-dimension layer reuses the aggregation results of the full-dimension layer, which also greatly simplifies the DAG.

Taking the UV metric calculation as an example, the two yellow dashed boxes correspond to the two computing modules: full-dimension computation and dimensionality-reduction computation.

  • The full-dimension computation has two steps. To avoid data skew, the first step is pre-aggregation by dimension, hashing records with the same dimension values across subtasks. Because UV requires exact deduplication, a Bitmap is used: each one-minute window computes the Bitmap of the data in that incremental window and sends it downstream, aggregated by the full dimension combination. In the full aggregation, the incremental Bitmaps are merged into the full Bitmap, giving the exact UV value. A natural question is that this works for numeric values such as user IDs, but what about character types such as DeviceId? In fact, at the source, character-typed values are converted into unique Long values by a dictionary service before the data reaches dimension aggregation, and UV is then computed with Bitmaps as before.
  • The dimensionality-reduction computation pre-aggregates the results of the full-dimension computation, then performs full aggregation, and finally outputs the results.

To recap the key points of the modeling-metric computation: to avoid dimension data skew, the computation is split into pre-aggregation (records with the same dimension values are hash-scattered across subtasks) and full aggregation (the scattered partial results are merged per dimension combination).
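The production jobs are internal, but this two-step pattern can be sketched with Flink's DataStream API. A minimal sketch, assuming dictionary-encoded integer user ids, RoaringBitmap as the bitmap implementation, a salt of 64 sub-keys, and processing-time windows for brevity; none of these choices are taken from Kuaishou's code:

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.roaringbitmap.RoaringBitmap;

public class SkewFreeUv {

    /** Minimal event: a dimension-combination string plus a dictionary-encoded user id. */
    public static class Event {
        public String dims;   // e.g. "os=android|province=bj" (hypothetical encoding)
        public int userId;    // assumed already mapped to an int by the dictionary service
    }

    public static DataStream<Tuple2<String, Long>> uvPerDims(DataStream<Event> events) {
        // Step 1: pre-aggregation. Salt the key with userId % 64 so that hot dimension
        // combinations are spread over 64 sub-keys; each 1-minute window emits an
        // incremental RoaringBitmap for its (dims, salt) slice.
        DataStream<Tuple2<String, RoaringBitmap>> partial = events
                .keyBy(e -> e.dims + "|" + (e.userId % 64))
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .aggregate(new PartialBitmapAgg());

        // Step 2: full aggregation. Re-key by the dimension combination alone and merge
        // the incremental bitmaps into the full bitmap, emitting the exact UV.
        return partial
                .keyBy(t -> t.f0)
                .reduce((a, b) -> {
                    a.f1.or(b.f1);
                    return a;
                })
                .map(t -> Tuple2.of(t.f0, t.f1.getLongCardinality()))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
    }

    /** Builds one incremental bitmap per window pane; also remembers the un-salted dims key. */
    public static class PartialBitmapAgg
            implements AggregateFunction<Event, Tuple2<String, RoaringBitmap>, Tuple2<String, RoaringBitmap>> {
        @Override
        public Tuple2<String, RoaringBitmap> createAccumulator() {
            return Tuple2.of("", new RoaringBitmap());
        }
        @Override
        public Tuple2<String, RoaringBitmap> add(Event e, Tuple2<String, RoaringBitmap> acc) {
            acc.f0 = e.dims;          // identical for every record under this salted key
            acc.f1.add(e.userId);
            return acc;
        }
        @Override
        public Tuple2<String, RoaringBitmap> getResult(Tuple2<String, RoaringBitmap> acc) {
            return acc;
        }
        @Override
        public Tuple2<String, RoaringBitmap> merge(Tuple2<String, RoaringBitmap> a, Tuple2<String, RoaringBitmap> b) {
            a.f1.or(b.f1);
            return a;
        }
    }
}
```

The rolling reduce in step 2 emits an updated exact UV whenever a new incremental bitmap arrives; the real pipeline outputs at a configured interval instead.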

To achieve exact UV deduplication, as mentioned above, Bitmaps are used, with the dictionary service converting String values to Long values that can be stored in a Bitmap. Because UV statistics need historical data, for example accumulated by day, the Bitmap grows over time, and in RocksDB state storage, reading and writing an overly large KV hurts performance. Therefore, a custom BitmapState was defined internally to store the Bitmap in blocks: each KV stored in RocksDB stays small, and an update only rewrites the affected block, identified by its blockId, rather than the full Bitmap.
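The custom BitmapState is not public, but the blocking idea can be illustrated on top of Flink's MapState: split the id space into fixed-size blocks, keep one small serialized bitmap per block id, and rewrite only the touched block. The class name, block size, and use of RoaringBitmap below are assumptions:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.flink.api.common.state.MapState;
import org.roaringbitmap.RoaringBitmap;

/**
 * Hypothetical blocked bitmap on top of MapState<blockId, serialized bitmap>.
 * Each RocksDB KV stays small and only the touched block is rewritten.
 */
public class BlockedBitmapState {

    private static final long BLOCK_SIZE = 1 << 20;   // ~1M ids per block (assumption)

    private final MapState<Long, byte[]> blocks;      // e.g. from getRuntimeContext().getMapState(...)

    public BlockedBitmapState(MapState<Long, byte[]> blocks) {
        this.blocks = blocks;
    }

    /** Adds one dictionary-encoded id; only the block containing it is read and rewritten. */
    public void add(long id) throws Exception {
        long blockId = id / BLOCK_SIZE;
        int offset = (int) (id % BLOCK_SIZE);
        RoaringBitmap block = deserialize(blocks.get(blockId));
        block.add(offset);
        blocks.put(blockId, serialize(block));
    }

    /** Exact cardinality = sum over all blocks. */
    public long cardinality() throws Exception {
        long total = 0;
        for (byte[] bytes : blocks.values()) {
            total += deserialize(bytes).getLongCardinality();
        }
        return total;
    }

    private static RoaringBitmap deserialize(byte[] bytes) throws IOException {
        RoaringBitmap bitmap = new RoaringBitmap();
        if (bytes != null) {
            bitmap.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
        }
        return bitmap;
    }

    private static byte[] serialize(RoaringBitmap bitmap) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        bitmap.serialize(new DataOutputStream(bos));
        return bos.toByteArray();
    }
}
```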

Next, consider the computation of the new-user metric. The only difference from UV is the need to determine whether a user is new, which is done by asynchronously querying an external historical-user service; the new-user UV is then computed from the stream of new users with the same logic as the UV calculation.
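A sketch of how such an asynchronous lookup is typically wired up with Flink's Async I/O operator; the HistoricalUserClient and its existsAsync method are placeholders for the real (internal) service client, and the timeout and capacity values are arbitrary:

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class NewUserTagging {

    /** Asynchronously asks the (hypothetical) historical-user service whether the id was seen before. */
    public static class IsNewUserFunction extends RichAsyncFunction<Long, Tuple2<Long, Boolean>> {
        private transient HistoricalUserClient client;   // placeholder for the external service client

        @Override
        public void open(Configuration parameters) {
            client = new HistoricalUserClient();
        }

        @Override
        public void asyncInvoke(Long userId, ResultFuture<Tuple2<Long, Boolean>> resultFuture) {
            client.existsAsync(userId)                   // CompletableFuture<Boolean>, hypothetical API
                  .thenAccept(exists ->
                          resultFuture.complete(Collections.singleton(Tuple2.of(userId, !exists))));
        }
    }

    public static DataStream<Tuple2<Long, Boolean>> tagNewUsers(DataStream<Long> userIds) {
        // Unordered async wait: up to 100 in-flight requests, 1 s timeout (assumptions).
        return AsyncDataStream.unorderedWait(
                userIds, new IsNewUserFunction(), 1, TimeUnit.SECONDS, 100);
    }

    /** Stand-in for the real historical-user service client. */
    public static class HistoricalUserClient {
        public CompletableFuture<Boolean> existsAsync(long userId) {
            return CompletableFuture.completedFuture(Boolean.FALSE); // placeholder
        }
    }
}
```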

Unlike UV, retention needs not only the current day's data but also the previous day's historical data. Internally, a dual-buffer state store is used, and retention is computed by dividing the data in the two buffers.
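Leaving out the Flink state plumbing, the dual-buffer idea can be sketched as two bitmaps, one for the previous day and one for the current day, with retention computed as the size of their intersection divided by the size of the previous day's set. The class and method names are illustrative:

```java
import org.roaringbitmap.RoaringBitmap;

/** Dual-buffer sketch for day-over-day retention. */
public class RetentionBuffers {

    private RoaringBitmap previousDay = new RoaringBitmap(); // users active yesterday
    private RoaringBitmap currentDay = new RoaringBitmap();  // users active today (so far)

    /** Record an active (dictionary-encoded) user for the current day. */
    public void markActive(int userId) {
        currentDay.add(userId);
    }

    /** Retention = |yesterday ∩ today| / |yesterday|. */
    public double retention() {
        if (previousDay.isEmpty()) {
            return 0.0;
        }
        long retained = RoaringBitmap.and(previousDay, currentDay).getLongCardinality();
        return (double) retained / previousDay.getLongCardinality();
    }

    /** At day rollover, today's buffer becomes yesterday's and a fresh buffer starts. */
    public void rollover() {
        previousDay = currentDay;
        currentDay = new RoaringBitmap();
    }
}
```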

■ Kudu storage

Finally, after the computation above, the results are stored in Kudu. Kudu offers low-latency random reads and writes and fast columnar scans, which suits real-time interactive analysis. For the storage layout, dimensions are encoded first, and time + dimension combination + dimension values together form the primary key; the table is then partitioned by dimension combination, dimension values, and time, which improves query efficiency and speeds up data retrieval.
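As a sketch of such a layout (the table name, column names, types, and bucket count are assumptions, not the production schema), the Kudu Java client can express a "time + dimension combination + dimension values" primary key with hash and range partitioning roughly as follows:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;

public class CubeResultTable {
    public static void create(String masters) throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder(masters).build();

        // Primary key: event time + encoded dimension combination + encoded dimension values.
        List<ColumnSchema> columns = Arrays.asList(
                new ColumnSchema.ColumnSchemaBuilder("event_time", Type.INT64).key(true).build(),
                new ColumnSchema.ColumnSchemaBuilder("dim_combo", Type.INT32).key(true).build(),
                new ColumnSchema.ColumnSchemaBuilder("dim_values", Type.STRING).key(true).build(),
                new ColumnSchema.ColumnSchemaBuilder("uv", Type.INT64).build());

        // Hash-partition by dimension combination and values, range-partition by time,
        // so a dashboard query touches few tablets and scans a narrow time range.
        CreateTableOptions options = new CreateTableOptions()
                .addHashPartitions(Arrays.asList("dim_combo", "dim_values"), 8)
                .setRangePartitionColumns(Arrays.asList("event_time"));

        client.createTable("cube_uv_result", new Schema(columns), options);
        client.close();
    }
}
```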

4. KwaiBI display

The screenshot shows the interface for configuring the Cube model: columns are configured and typed, the metric-calculation logic is described with a SQL statement, and the final results are displayed in KwaiBI.

SlimBase: a more IO-efficient embedded shared state store

SlimBase is an embedded shared state storage engine that uses less IO than RocksDB.

1. Challenges

First, let's look at the problems Flink encountered when using RocksDB, starting with the application scenario: the real-time join of the ad impression stream with the click stream. When opening the Kuaishou app, a user may be shown ad videos recommended by the ad service, and may click on the displayed ads.

This behavior produces two data streams on the backend: the ad impression log and the client click log. The two streams are joined in real time, the join results are used as sample data for model training, and the trained model is pushed to the online ad service.

In this scenario, a click within 20 minutes of the impression is considered valid, so the real-time join logic is: when a click arrives, join it against the impressions of the last 20 minutes. The impression stream is large, with more than 1 TB of data every 20 minutes; the checkpoint interval is set to 5 minutes and the state backend is RocksDB.
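The join in this job is built on keyed state, but the shape of the logic, matching each click against the impressions of the preceding 20 minutes, can be expressed with Flink's built-in interval join. This is only a sketch of the semantics, not Kuaishou's implementation; the event classes are placeholders and event-time watermarks are assumed to be assigned upstream:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class AdJoin {

    public static class Impression { public String adId; public long ts; }
    public static class Click      { public String adId; public long ts; }
    public static class Sample     { public String adId; public long impressionTs; public long clickTs; }

    public static DataStream<Sample> join(DataStream<Impression> impressions, DataStream<Click> clicks) {
        // A click is valid if it arrives within 20 minutes after the impression,
        // i.e. for each click look back 20 minutes on the impression stream.
        return clicks.keyBy(c -> c.adId)
                .intervalJoin(impressions.keyBy(i -> i.adId))
                .between(Time.minutes(-20), Time.seconds(0))
                .process(new ProcessJoinFunction<Click, Impression, Sample>() {
                    @Override
                    public void processElement(Click click, Impression imp, Context ctx, Collector<Sample> out) {
                        Sample s = new Sample();
                        s.adId = click.adId;
                        s.impressionTs = imp.ts;
                        s.clickTs = click.ts;
                        out.collect(s);  // training sample for the ad model
                    }
                });
    }
}
```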

In this scenario, disk I/O utilization reaches 70%, of which 50% comes from compaction; during checkpoints, disk I/O reaches 100% and a checkpoint takes 1 to 5 minutes, sometimes even longer than the checkpoint interval, and the job visibly suffers backpressure. Analysis revealed the following problems:

  • First, checkpointing produces a fourfold data copy: all data is read out of RocksDB and then written to HDFS as three replicas.
  • Second, RocksDB's default level compaction incurs significant IO amplification under large-scale writes.

2. Solutions

To avoid the data-copy problem caused by checkpointing, the general idea is to write directly to shared storage while the data is being written, and to adopt a more IO-friendly compaction, for example SizeTieredCompaction, or a FIFOCompaction that exploits the time-series nature of the data. Comparing the candidates on shared storage, SizeTieredCompaction, event-time-based FIFOCompaction, and the technology stack, the first consensus was to replace RocksDB with HBase:

  • Shared storage is supported by HBase but not by RocksDB;
  • RocksDB does not support SizeTieredCompaction by default, while HBase does;
  • Event-time-based FIFOCompaction is not supported by RocksDB, but is relatively easy to develop on HBase;
  • In terms of technology stack, RocksDB is written in C++ and HBase in Java, so HBase is more convenient to modify.

But HBase is worse than RocksDB in some ways:

  • HBase is a heavyweight distributed system that relies on ZooKeeper and contains a Master and RegionServers, while RocksDB is just an embedded library and is very lightweight.
  • In terms of resource isolation, HBase is harder because its memory and CPU are shared by multiple containers, while RocksDB is easier because its memory and CPU are naturally bound to its own container.
  • In terms of network overhead, HBase is a distributed system, so its overhead is much higher than that of the embedded RocksDB.

For these reasons, Kuaishou reached a second consensus: slim HBase down and turn it into an embedded shared storage system.

3. Implementation scheme

Next, the implementation of transforming HBase into SlimBase is introduced; it is divided into two layers:

  • One layer is SlimBase itself, which in turn has three parts: the slimmed-down HBase, an adaptation layer, and an interface layer;
  • The other layer is SlimBaseStateBackend, which mainly implements ListState, MapState, ValueState, and ReducingState.

The following describes the HBase slimming, the adaptation layer and operation interfaces, and the SlimBaseStateBackend implementation.

■ HBase slimming

The HBase transformation is discussed in terms of what was cut (slimming) and what had to be added (bulking up).

  • The HBase Client, ZooKeeper, and Master are removed, and only the RegionServer is retained;
  • The RegionServer is then trimmed further, removing the ZK Listener, Master Tracker, RPC, WAL, and MetaTable;
  • Only the Cache, MemStore, Compaction, Flusher, and FS modules of the RegionServer are retained.

In terms of additions:

  • The HFileCleaner used to clean up HFiles is migrated from the Master into the RegionServer;
  • RocksDB supports merge but HBase does not, so a merge operation was implemented for SlimBase.

The interface layer mainly provides the following three things (a hypothetical sketch follows this list):

  • Like RocksDB, the logical view has two levels: DB and ColumnFamily;
  • Basic interfaces are supported: put/get/delete/merge and snapshot;
  • In addition, a restore interface is provided for recovering from a snapshot.
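SlimBase is not open source, so the following is a purely hypothetical Java sketch of the two-level DB/ColumnFamily view and the interfaces listed above, just to make the shape of the API concrete; none of these names come from the actual code:

```java
/**
 * Hypothetical sketch of the SlimBase interface layer described above.
 * Two-level logical view (DB -> ColumnFamily), RocksDB-like basic operations,
 * plus snapshot/restore against shared storage. Not the real SlimBase API.
 */
public interface SlimDb extends AutoCloseable {

    /** A ColumnFamily in SlimBase is adapted to an HBase table underneath. */
    ColumnFamilyHandle createColumnFamily(String name) throws Exception;

    byte[] get(ColumnFamilyHandle cf, byte[] key) throws Exception;

    void put(ColumnFamilyHandle cf, byte[] key, byte[] value) throws Exception;

    void delete(ColumnFamilyHandle cf, byte[] key) throws Exception;

    /** Merge had to be added on top of HBase, since only RocksDB supports it natively. */
    void merge(ColumnFamilyHandle cf, byte[] key, byte[] value) throws Exception;

    /** Persist the current state to a shared storage path (e.g. an HDFS directory). */
    void snapshot(String sharedStoragePath) throws Exception;

    /** Rebuild state from a previously taken snapshot. */
    void restore(String sharedStoragePath) throws Exception;

    /** Opaque handle for a column family. */
    interface ColumnFamilyHandle {
        String name();
    }
}
```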

The adaptation layer involves two mappings:

  • A SlimBase instance is adapted to an HBase namespace;
  • A SlimBase ColumnFamily is adapted to an HBase table.

The SlimBaseStateBackend implementation is mainly reflected in two aspects:

  • First, the various State implementations support multiple data structures, such as ListState, MapState, ValueState, and ReducingState;
  • Second, the Snapshot and Restore processes were rebuilt; as the following two figures show, SlimBase saves a great deal of disk I/O during Snapshot and Restore and avoids the multiple-copy problem.

4. Test conclusions

After online comparison testing, the conclusions are as follows:

  • Checkpoint and Restore latency drops from minutes to seconds;
  • Disk I/O is reduced by 66%;
  • Disk write throughput is reduced by 50%;
  • CPU overhead is reduced by 33%.

5. Follow-up optimization

Following the plan above, a new compaction algorithm was then developed: a FIFOCompaction driven by OldestUnexpiredTime, designed so that compaction generates no disk I/O.

FIFOCompaction is a TTL-based, IO-free compaction: if OldestUnexpiredTime is set to T2, all data generated before T2 is considered expired and can be cleaned up by compaction. Such an event-time-based FIFOCompaction theoretically incurs no disk I/O overhead.
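For reference, stock HBase already ships a TTL-driven FIFOCompactionPolicy that can be enabled per table; the OldestUnexpiredTime (event-time) variant described here is a Kuaishou-side extension on top of that idea. Enabling the stock policy looks roughly like this (table name, column family, and TTL are examples):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class FifoCompactionTable {
    public static void create() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {

            HTableDescriptor table = new HTableDescriptor(TableName.valueOf("join_state"));
            // Switch this table's compaction policy to the TTL-based FIFO policy:
            // expired HFiles are simply dropped, so compaction rewrites no data.
            table.setConfiguration(
                    "hbase.hstore.defaultengine.compactionpolicy.class",
                    "org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy");

            HColumnDescriptor cf = new HColumnDescriptor("d");
            cf.setTimeToLive(20 * 60);   // 20-minute TTL, matching the join window
            table.addFamily(cf);

            admin.createTable(table);
        }
    }
}
```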

There are four further optimizations; the first three are HBase-side, and the last one targets HDFS:

  • SlimBase uses InMemoryCompaction to reduce memory flush and compaction overhead;
  • SlimBase supports prefix BloomFilters to improve scan performance;
  • SlimBase supports short-circuit reads;
  • HDFS replica write-path modification: non-local replicas are written with DirectIO to improve the page-cache hit rate for local reads; this addresses the observation during testing that single-replica reads and writes were more efficient than multi-replica ones.

Plan for the future

Compared with RocksDB in seven respects, namely language, storage, compression strategy, event-time push-down, garbage collection, checkpoint time, and reload time, SlimBase is a better fit for developing Kuaishou's real-time computing tasks. The plan going forward is to further optimize SlimBase's performance, with the vision of replacing RocksDB with SlimBase in all of Kuaishou's Flink business scenarios.

About the authors:

Dong Tingting leads the Kuaishou big data architecture team and the real-time computing engine team, and is currently responsible for the development and application of the Flink engine at Kuaishou. She graduated from Dalian University of Technology in 2013 and previously worked at Qihoo 360 and 58 Group. Her fields include distributed computing, scheduling, and distributed storage.

Xu Ming is an R&D engineer on the Kuaishou big data architecture team. He graduated from Nankai University and is responsible for the HBase engine and the maintenance and development of its surrounding ecosystem.