background

The characteristics of sales business are large scale, many fields and close demand. Meituan to store food and beverage giant sales system (hereinafter referred to as the “giant”) as the main carrier of sales data to support, not only the scope of the wide, and facing the technology scene is also very complex (hierarchy data display and authentication, more than one-third of the index needs to be accurate to heavy, peak query has reached tens of thousands of level). In this business context, building a stable and efficient OLAP engine to help analysts make quick decisions has become a core goal at Supreus.

Apache Kylin is an open source OLAP engine built based on Hadoop big data platform. It adopts the multi-dimensional cube prediction technology and uses the method of space for time to improve the query speed to sub-second level, which greatly improves the efficiency of data analysis and brings convenient and flexible query function. Optimus adopted Kylin as our OLAP engine in 2016 based on the technology and business fit, and this system has effectively supported our data and analytics systems over the next few years.

In 2020, Meituan to the restaurant business developed rapidly, and the data indicators also increased rapidly. The system based on Kylin has serious efficiency problems in construction and query, which affects the analysis and decision of data and brings great obstacles to the optimization of user experience. After about half a year, the technical team carried out a series of optimization iterations on Kylin, including dimension cutting, model design and resource adaptation, etc., helping to increase the SLA of sales performance data from 90% to 99.99%. Based on the actual combat, we have settled a set of technical solutions covering “principle interpretation”, “process disassembly” and “implementation route”. It is hoped that these experiences and summaries will help more technical teams in the industry improve the efficiency of data output and business decision making.

Questions and Objectives

As a bridge connecting the platform and merchants, sales includes two business models: sales to stores and telephone visits. It is managed step by step with the organizational structure of war zone and human resources. All analyses need to be viewed according to two sets of organizational levels. Under the requirements of consistent index caliber and timely data output, we combined Kylin’s idea of predictive calculation to design the data architecture. As shown in the figure below:

Kylin’s formula for calculating dimension combinations is 2^N (N is the number of dimensions). The official method of dimension pruning is provided to reduce the number of dimension combinations. However, due to the particularity of meal service, the number of unclippable combinations of single task is still as high as 1000+. In the scenario of requirements iteration and manpower and theater organization changes, it takes a lot of resources and a very high build time to look back all the historical data. The architecture design based on business division can greatly ensure the decoupling of data output and the consistency of indicator caliber, but it puts great pressure on the construction of Kylin, which leads to large resource occupation and long time consumption. Based on the above business status, we summarize the problems existing in Kylin’s MOLAP mode, as follows:

  • Difficulty in hitting efficiency problems (realization principle) : there are many steps in the construction process, and there is a strong correlation between each step. It is difficult to find the root cause of the problem only from the appearance of the problem, and it is impossible to effectively solve the problem.
  • Build engine non-iteration (build process) : The historical task still uses MapReduce as the build engine and does not switch to the more efficient build Spark.
  • Unreasonable utilization of resources (construction process) : resource waste, resource waiting, default platform dynamic resource adaptation mode, resulting in small tasks applied for a large number of resources, unreasonable data segmentation, resulting in a large number of small files, resulting in resource waste, a large number of tasks waiting.
  • Long time of core task (implementation route) : The source table of Optimus Sales transaction performance data indicator has large data volume, multiple dimension combinations and high inflation rate, resulting in the construction time of more than 2 hours per day.
  • SLA quality substandard (implementation route) : The overall SLA achievement rate is not meeting the expected target.

After carefully analyzing the problems and determining the big goal of improving the efficiency, we classified the construction process of Kylin and disassembled the core links that could improve the efficiency in the construction process. Through the means of “principle interpretation”, “layer by layer disassembly” and “from point to surface”, the goal of two-way reduction was achieved. Specific quantitative objectives are shown in the figure below:

Optimization premise – principle interpretation

In order to solve the difficult problems of positioning and attribution in efficiency improvement, we interpreted the Kylin construction principle, including the idea of predictive calculation and the by-layer layer algorithm.

Is a

Combine all possible dimensions according to the dimensions, calculate the indicators that may be used in the multi-dimensional analysis, and save the calculated results into CUBE. Suppose we have four dimensions. Each node in this Cube (called a Cuboid) is a different combination of these four dimensions. Each combination defines a set of dimensions for analysis (such as Group By), and the aggregated results of the metrics are stored on each Cuboid. When querying, we find the corresponding Cuboid according to the SQL, read the value of the index, can return. As shown in the figure below:

By-layer By layer algorithm

An n-dimensional Cube is composed of 1 n-dimensional subcube, N (n-1) -dimensional subcube, N*(n-1)/2 (n-2) -dimensional subcube,…… There are N subcubes of 1 dimension and 1 subcube of 0 dimension, and there are 2 to the N subcubes in total. In the layer-by-layer algorithm, the number of dimensions is reduced layer by layer, and the calculations at each level (except the first level, which is aggregated from the original data) are based on the results at the previous level. For example, the result of Group By [A,B] can be aggregated based on the result of Group By [A,B,C] by removing C, so as to reduce repeated calculation. When the 0-dimensional Cuboid is calculated, the whole calculation of Cube will be completed. As shown in the figure below:

Process analysis – layer upon layer disassembly

After in understanding the underlying principles of Kylin, we will optimize the lock in the direction of the “engine”, “read data”, “building the dictionary”, “layered construction”, “file conversion” five links, and refine the problem at each stage, after the train of thought and goal, we finally made it to reduce the computing resources at the same time reduce the time-consuming. Details are shown in the following table:

Build engine selection

At present, we have gradually switched the build engine to Spark. Optimus used Kylin as an OLAP engine back in 2016. The historical tasks were not switched and the parameters were optimized only for MapReduce. In fact, in 2017, the official website of Kylin used Spark as the build engine (the official website used Spark as the build engine), which improved the build efficiency by 1 to 3 times compared with MapReduce. The switch can also be selected through Cube design, as shown in the figure below:

Read source data

Kylin reads source data in Hive as an external table with data files in the table (stored in HDFS) as input to the next subtask, which may have a small file problem. At present, the file number distribution of Kylin upstream data wide table is reasonable, so there is no need to set the merge in the upstream. If the merge is forced, it will increase the processing time of the upstream source table data.

For project requirements, to brush back the historical data or increase the dimension combination, it is necessary to rebuild all the data. Usually, the method of building the history by month is adopted to brush back the history. The problem of too many loaded partitions appears small files, which leads to the slow execution of this process. Rewriting configuration files at the Kylin level, merging small files and reducing the number of maps can effectively improve read efficiency.

Merge source table small files: merge the number of small files in Hive source table, control the number of parallel tasks per Job. Adjustment parameters are shown in the following table:

Kylin level parameter override: Sets the file size of the Map read process. Adjustment parameters are shown in the following table:

Build a dictionary

Kylin saves HBase storage resources by calculating the dimension values that appear in the Hive table, creating dimension dictionaries, mapping the dimension values into codes, and saving statistics. Each combination of dimensions is called a Cuboid. Theoretically, an n-dimensional Cube can have 2^N combinations of dimensions.

Combination number view

After pruning the dimension combinations, it is difficult to calculate the actual dimension combinations. You can check the specific number of dimension combinations by executing the log (the screenshot is the log of the last Reduce in the step of extracting the unique column of the fact table). As shown in the figure below:

Global dictionary dependency

Optimus has many business scenarios that need to be accurately de-weighted. When there are multiple global dictionary columns, column dependencies can be set. For example, when there are data indicators of “number of stores” and “number of online stores” at the same time, column dependencies can be set to reduce the calculation of ultra-high base dimension. As shown in the figure below:

Computing resource allocation

When there are multiple precise de-duplication indicators in the index, computing resources can be appropriately increased to improve the efficiency of the construction of high base dimension. Parameter Settings are shown in the following table:

Hierarchical build

This process is at the heart of Kylin’s construction. After switching the Spark engine, the default is to use only the By-Layer Layer algorithm instead of automatic selection (By-Layer Layer algorithm, Fast algorithm). Spark implements the by-layer layer By layer algorithm By working up from the bottom Cuboid layer to the top Cuboid layer (equivalent to executing a query without group By), caching the resulting data from each layer into memory, skipping each read of data. Depends directly on the upper cache data, greatly improving the execution efficiency. The details of Spark’s implementation are as follows.

Stage of the Job

The number of jobs is the number of layers in the By-Layer algorithm tree. Spark takes the output of the result data in each layer as a Job. As shown in the figure below:

Stage Stage

Each Job corresponds to two stages, which can be divided into reading the cached data of the upper layer and caching the calculated result data of this layer. As shown in the figure below:

Task parallelism setting

Kylin calculated the task parallelism according to the estimated size of Cuboid composite data constructed at each layer (the number of dimension combinations can be reduced by means of dimension pruning to reduce the size of Cuboid composite data and improve the construction efficiency, which is not described in detail in this paper) and the parameter values of the partitioning data. The calculation formula is as follows:

  • Task number calculation formula: Min(MapSize/cut-mb, MaxPartition); Max (MapSize/cut – MB, MinPartition)

    • MapSize: the size of Cuboid combination constructed in each layer, that is, Kylin’s estimate of the size of Cuboid combination in each layer.
    • Cut-MB: Partition of data to control the number of parallel tasks, can be set by kylin.engine. Spark. Rdd-Partition-Cut-MB parameter.
    • MaxPartition: Maximum partition, which can be set with the kylin.engin.spark.max-partition parameter.
    • MinPartition: The smallest partition, which can be set with the kylin.engin.spark.min-partition parameter.
  • Calculate the number of output files: each Task will compress the result data after execution and write it into HDFS as the input of the file conversion process. The number of files is the summary of the number of output files of the Task.

Resource Request Calculation

By default, the platform dynamically requests computing resources. The computing power of a single Executor includes 1 logical CPU (CPU), 6GB of in-heap memory, and 1GB of out-of-heap memory. The calculation formula is as follows:

  • CPU = kylin. Engine. The spark – the conf. Spark, executor. Cores * number of Executors in the actual application.
  • Memory = (kylin. Engine. The spark – the conf. Spark. The executor, the memory + spark. The yarn. The executor. MemoryOverhead) * number of Executors in the actual application.
  • Ability to execute a single Executor = kylin. Engine. The spark – the conf. Spark. Executor. The memory/kylin. Engine. The spark – the conf. Spark. Executor. Cores, namely: The amount of memory requested during the execution of 1 CPU.
  • Largest number = kylin Executors. Engine. The spark – the conf. Spark. DynamicAllocation. MaxExecutors, dynamic application platform by default, this parameter limits maximum application number.

In the case of sufficient resources, if a single Stage applies for 1000 parallel tasks, it needs to apply for resources up to 7000GB of memory and 1000 CPUs, that is: CPU: 1*1000=1000; Memory :(6+1) *1000=7000GB.

Resource rationalization

Due to the characteristics of the by-layer layer algorithm and the compression mechanism of Spark in the actual execution, the partition data loaded By the actual executed Task is far less than the parameter set value, resulting in ultra-high parallelism of the Task, occupying a large amount of resources, and generating a large number of small files, which affects the downstream file conversion process. Therefore, reasonable segmentation data becomes the key point of optimization. Using Kylin to build the log, you can see the estimated size of the Cuboid composite data at each level and the number of partitioned partitions (equal to the number of tasks actually generated at the Stage). As shown in the figure below:

Combined with Spark UI, you can view the actual execution, adjust the application of memory to meet the required resources for execution, and reduce the waste of resources.

1. The minimum value of the overall resource application is greater than the sum of the cached data at the Stage level TOP1 and TOP2, ensuring that all the cached data are in memory. As shown in the figure below:

Calculation formula: Stage Stage Top1, ranked by levels of the sum of the cached data < kylin. Engine. The spark – the conf. Spark. The executor, the memory kylin, engine, spark – the conf. Spark. The memory. The fraction The spark. Memory. StorageFraction * maximum number of Executors

2. The actual memory and CPU required by a single Task (1 CPU per Task) is less than the execution power of a single Executor. As shown in the figure below:

Calculation formula: Single Task required for memory < kylin. Engine. The spark – the conf. Spark. The executor, the memory kylin. The engine, spark – the conf. Spark. The memory. The fraction Spark. Memory. St orageFraction/kylin, engine, spark – the conf. Spark. Executor. Cores. Parameter description is shown in the following table:

File conversion

Kylin converts the Cuboid files after construction into HFILE files in HTABLE format, and associates the files with HTABLE through BulkLoad, which greatly reduces the load of HBase. This process is accomplished by a MapReduce task, and the number of maps is the number of output files in the hierarchical build phase. The log is as follows:

At this stage, computing resources can be reasonably applied according to the actual input data file size (which can be viewed through the MapReduce log) to avoid resource waste.

Formula: the Map phase resource application = kylin. Job. Mr. Config. Override. Graphs. The Map. The memory. The MB * layered construction phase output file number. Specific parameters are shown in the table below:

Implementation route – from point to surface

Pilot trading practice

Through the interpretation of Kylin’s principle and the dismantling of the construction process, we selected the core tasks of sales transactions for pilot practice. As shown in the figure below:

Comparison of practical results

Based on the core tasks of sales transactions, the practical optimization was carried out, and the actual use of resources and execution time before and after adjustment were compared, so as to achieve the goal of two-way reduction. As shown in the figure below:

Results show

Overall resource situation

Optimus has 20+ Kylin tasks. After half a year of continuous optimization iteration, compared with the monthly average CU usage in Kylin resource queues and the CU usage in Pending tasks, the resource consumption under the same task has been significantly reduced. As shown in the figure below:

Overall SLA achievement rate

After the overall optimization from the point and surface, the SLA achievement rate of Optimus in June 2020 reached 100%. As shown in the figure below:

Looking forward to

Apache Kylin officially became the Apache Foundation’s Top Project in November 2015. It only took 13 months to go from open source to a top level Apache project, and it was the first top level project to be fully contributed to Apache by a Chinese team. At present, Meituan uses the relatively stable version of V2.0. After nearly 4 years of use and accumulation, the dining technology team has accumulated a lot of experience in optimizing query performance and building efficiency. This paper mainly describes the resource adaptation method in the Spark construction process. It is worth mentioning that Kylin officially released the V3.1 version in July 2020 and introduced Flink as the construction engine. The core process of Flink construction was uniformly used, including data reading stage, dictionary construction stage, hierarchical construction stage and file conversion stage. The above four parts accounted for more than 95% of the overall construction time. This update also greatly improves the build efficiency of Kylin. See: Flink Cube Build Engine.

Looking back at the evolution of Kylin’s build engine, from MapReduce to Spark to now Flink, iterations of build tools have been moving towards better mainstream engines, and the Kylin community is full of active and good code contributors who are helping to expand Kylin’s ecosystem and add new features. It’s worth learning. Finally, the Meituan In-Store F&B technical team would like to thank the Apache Kylin project team once again.

Author’s brief introduction

Yue Qing joined Meituan in 2019 and worked as an engineer in the restaurant’s catering R&D center.

To read more technical articles, please follow the official WeChat official account of Meituan Technical Team (Meituantech).