Reprinted from the Apache Kylin official account (also written by me): link to the original article. This article summarizes the Kylin on Parquet introduction and quick start shared at a Meetup on April 18, 2020. Since Kylin on Parquet is still under continuous iterative development, some parts of the original text have been updated here:
- CountDistinct, TopN, and Percentile, which are discussed in the build section, are now supported. See Kylin-4462 for more details.
- Added the slide for the automatic retry section.
- The build performance data was collected in an early comparison during the project; a more recent and detailed comparison has since been made. For details, see “Go to HBASE, how is Kylin on Parquet’s performance?”
The Apache Kylin on Apache HBase solution has become relatively mature after a long period of development, but it has some limitations. At present, most of the computation on a Kylin query node is done on a single machine, which makes that node a single-point bottleneck. In addition, because HBase is not a true columnar store, Cuboid data has to be compressed and encoded when it is written, and then deserialized and split when it is read from HBase, which adds computational pressure. HBase is also difficult to operate and maintain, so it is not easy to move to the cloud.
To address these problems, Kyligence introduced the Kylin on Parquet solution. Here, Rupeng Wang, a big data R&D engineer at Kyligence, explains its architecture and principles, and how to develop and debug it.
This article covers the following: first the architecture design, then why we are building Kylin on Parquet, then the new build and query engines and their performance compared with Kylin 3.0, and finally a live demo that shows how to use the product and how to debug the code.
Apache Kylin has long had a pluggable architecture, so that one module can easily be replaced without affecting the others.
Kylin on Parquet implements a new query engine, build engine, and storage module on top of Kylin’s original architecture. The query engine is implemented with Spark, so computational tasks can be submitted to YARN for distributed processing.
Cube builds are also handled entirely by Spark; MapReduce builds are no longer supported. Both Hive and local CSV files are supported as data sources, so a debug environment can now be set up from local CSV files without depending on the sandbox. The storage layer removes HBase, and the built Cube data is stored directly in the file system as Parquet files.
Why Kylin on Parquet?
First, the original HBase-dependent architecture of Kylin has a single-point bottleneck at query time, because after the data is fetched through the Coprocessor, the rest of the query is processed on the single machine of the query node.
Second, HBase is not a true columnar store. It locates the data of each row through a rowKey, and it is called “columnar” only because it organizes columns into column families. https://en.wikipedia.org/wiki….
As the Cube logical view below shows, Kylin 3.0 and earlier versions compress all dimensions and measures into a single column for storage. Queries then have to deserialize and split that column, which adds computational pressure.
Finally, HBase is difficult to operate and maintain.
In the original query process, Calcite parses the SQL into a physical execution plan tree, and the code for the computation logic is generated by Calcite. This generated code is difficult to debug, which makes problems hard to locate.
Kylin on Parquet can now run distributed queries via Spark. We convert the Calcite-generated execution plan into a Spark execution plan, and we can set a breakpoint at each layer to inspect the data.
For example, if we suspect a problem in the Agg step, we can add a breakpoint there to check whether the data meets our expectations. On the storage side, HBase is replaced with Parquet, and each dimension and measure is stored in its own column. The storage structure is described in more detail later.
Cube builds and queries
The build engine
Here’s a look at the new build engine and how it works.
- The key features
Here are the key features:
- The build engine runs entirely on Spark, and every intermediate step can be monitored in the Spark UI. If the build runs into problems, you can also inspect the task execution there.
- The build engine adds automatic parameter tuning, which adjusts Spark parameters according to the size of the build task, so users can run tasks more efficiently without configuring Spark parameters by hand.
- The build engine implements a distributed build of the global dictionary.
- The build engine adds automatic recovery of failed tasks. When a task fails, the build engine analyzes the cause of the failure and applies a different handling strategy for each kind of failure.
- Interface design
As mentioned at the beginning, Kylin has a pluggable architecture, so the upper-layer interfaces, from AbstractExecutable to CubingJob, are all original Kylin interfaces. We submit a task to build a Segment by calling SparkCubingJob’s create method. The job is abstracted into two steps, resource exploration and Cube building, both of which are covered in more detail later. Finally, the two steps are chained together and submitted as a Spark task that runs on the cluster or locally.
The build consists of two steps: resource exploration and the Cube build. Resource exploration does three things. First, it estimates the size of the current data source table. Second, it uses that estimate for automatic parameter tuning. Third, it builds the global dictionary.
The Cube build is broadly similar to the original build engine: it creates a flat table with Spark, builds the Cube layer by layer, stores the result as Parquet, and finally updates the metadata. We put this much processing into one step because Spark processes most of the data in memory; splitting the work into multiple steps would require persisting intermediate data, which would reduce efficiency. The figure on the right shows a build task running in the front end.
- Automatic parameter tuning
Automatic parameter tuning is turned on by default and only works in cluster mode, and manual configuration takes precedence over automatic tuning. It estimates the computational resources required for the current build task based on, among other things, the size of the data source, and ultimately adjusts Executor-related parameters in the Spark task.
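The precedence rules above can be sketched roughly as follows. This is only an illustration and not Kylin's implementation: the function name suggest_executor_settings and all size thresholds and values are hypothetical. Only the Spark property names (spark.executor.memory, spark.executor.cores, spark.executor.instances) are real.

```python
GIB = 1024 ** 3


def suggest_executor_settings(source_bytes, manual=None, cluster_mode=True):
    """Suggest Spark executor settings for a build task.

    Hypothetical helper: the thresholds and values below are made up for
    illustration and are not the ones Kylin uses.
    """
    manual = dict(manual or {})
    if not cluster_mode:
        # Automatic tuning only applies in cluster mode.
        return manual

    if source_bytes < 1 * GIB:
        tuned = {"spark.executor.memory": "1g",
                 "spark.executor.cores": "1",
                 "spark.executor.instances": "1"}
    elif source_bytes < 10 * GIB:
        tuned = {"spark.executor.memory": "4g",
                 "spark.executor.cores": "2",
                 "spark.executor.instances": "4"}
    else:
        tuned = {"spark.executor.memory": "8g",
                 "spark.executor.cores": "4",
                 "spark.executor.instances": "8"}

    # Manual configuration takes precedence over the tuned values.
    tuned.update(manual)
    return tuned
```

Calling suggest_executor_settings(5 * GIB, {"spark.executor.memory": "8g"}) keeps the manual memory setting and fills in the remaining executor parameters from the size-based estimate.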
- Global dictionary
The global dictionary has two major improvements over Kylin 3.0: it can be built in a distributed way, and it is no longer limited by the maximum value of the integer type. In fact, Kylin 3.0 also added a distributed dictionary build, but its default is still the single-machine build.
The specific steps are as follows:
- Create a flat table using Spark and get DISTINCT values for the corresponding columns
- Spread data across buckets
- Encode the data in each bucket
- Save the dictionary files and metadata (the number of buckets and the offset of each bucket)
The first time the dictionary is built, the values in each bucket are encoded starting from 1. After encoding, the global dictionary value of each entry is assigned according to the offset of its bucket.
When a second Segment build task is submitted, the values are distributed to the buckets again, new values are encoded relative to the values already in each bucket, and the dictionary values in each bucket are then updated to global values based on the new offsets.
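The bucket-and-offset idea can be sketched in plain Python. This is a simplified single-process illustration, not Kylin's distributed implementation. The names bucket_of and extend_dictionary are hypothetical, and the sketch assumes that values already in the dictionary keep their IDs, with new values numbered after the current maximum.

```python
import zlib


def bucket_of(value, num_buckets):
    """Pick a bucket with a stable hash, so a value always lands in the same bucket."""
    return zlib.crc32(value.encode("utf-8")) % num_buckets


def extend_dictionary(dictionary, values, num_buckets):
    """Return a new {value: global_id} dictionary that also covers `values`.

    Assumption for this sketch: existing IDs never change, and new values are
    encoded after the largest ID assigned so far.
    """
    # Steps 1 and 2: take the distinct values and spread the new ones across buckets.
    new_by_bucket = [[] for _ in range(num_buckets)]
    for value in sorted(set(values)):
        if value not in dictionary:
            new_by_bucket[bucket_of(value, num_buckets)].append(value)

    # Step 3: encode each bucket locally from 1, then shift by the bucket's offset.
    result = dict(dictionary)
    offset = max(dictionary.values(), default=0)
    for bucket_values in new_by_bucket:
        for local_id, value in enumerate(bucket_values, start=1):
            result[value] = offset + local_id
        # The next bucket starts after everything encoded so far.
        offset += len(bucket_values)
    return result


first = extend_dictionary({}, ["beijing", "shanghai", "beijing", "hangzhou"], num_buckets=2)
second = extend_dictionary(first, ["hangzhou", "shenzhen"], num_buckets=2)
```

In a real distributed build each bucket would be encoded by a separate Spark task, and only the bucket count and offsets would need to be shared, which is why they are saved as metadata.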
The directory structure saved on disk is shown in the figure.
- Automatic retry
The automatic retry feature analyzes the exception or error that caused the build task to fail and applies a different handling strategy for each case.
- When an OutOfMemoryError is encountered, the engine checks whether the Spark task has AUTO_BroadcastJoin_THRESHOLD turned on. This feature can cause Spark tasks to run out of memory, so the engine disables it and then resubmits the build task.
- If a ClassNotFoundException is encountered, the build engine simply terminates the current task and throws an exception.
- For other exceptions, the build engine tries to adjust the number of executor cores and the allocated memory size, and then resubmits the task.
This feature is enabled by default and retries up to three times. To disable it, set kylin.engine.max-retry-time to 0 or any negative number.
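The decision logic above can be sketched as a small function. This is an illustration under stated assumptions, not the build engine's code: the name plan_retry, the configuration keys, and the halve-cores and double-memory adjustment are all hypothetical.

```python
def plan_retry(error_name, conf, attempt, max_retry=3):
    """Return the configuration for the next attempt, or None to give up.

    `conf` uses hypothetical keys: "auto_broadcast_join" (bool),
    "executor_cores" (int) and "executor_memory_mb" (int).
    `attempt` is the number of retries already made.
    """
    if max_retry <= 0 or attempt >= max_retry:
        return None  # retry is disabled, or the retries are used up
    if error_name == "ClassNotFoundException":
        return None  # resubmitting cannot fix a missing class

    next_conf = dict(conf)  # never modify the caller's configuration
    if error_name == "OutOfMemoryError" and conf.get("auto_broadcast_join", True):
        # Broadcast joins can exhaust memory, so turn them off first.
        next_conf["auto_broadcast_join"] = False
        return next_conf

    # Other failures (assumption: also an OOM with broadcast join already off):
    # use fewer cores per executor and give each executor more memory.
    next_conf["executor_cores"] = max(1, conf["executor_cores"] // 2)
    next_conf["executor_memory_mb"] = conf["executor_memory_mb"] * 2
    return next_conf
```

A caller would loop: run the build, and on failure call plan_retry with the error name and the current attempt count, resubmitting with the returned configuration until it returns None.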
- Measures
The build process handles all measures; the processing logic can be seen in the CuboidAggregator.scala file. Because the query engine still has some compatibility problems, TopN, CountDistinct, and Percentile cannot be queried yet, but an issue is in progress. (Note: all of the above measures are supported in the latest version of Kylin on Parquet.)
- Parquet storage
Suppose the Cuboid we generate has three dimensions and two measures; the corresponding schema of the Parquet file then looks like the middle diagram. We map the dimension names to unique numbers to further optimize storage. We can download a Parquet file locally and read it with Spark to see the schema of the saved Cuboid file.
The directory structure stored on disk is shown in the figure above. All files are grouped by project, including the dictionaries, the temporary files produced by the build, and all Cuboids completed by the build. Each Segment directory has its own signature to prevent write conflicts and similar problems.
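The name-to-number mapping can be illustrated with a few lines of Python. This is only a sketch: the helper names, the example column names, and the choice to number measures after dimensions are assumptions, and the real numbering used in the Parquet files may differ.

```python
def cuboid_columns(dimensions, measures):
    """Map each dimension and measure name to a short numeric column name."""
    names = list(dimensions) + list(measures)
    return {name: str(index) for index, name in enumerate(names)}


def to_storage_row(row, columns):
    """Rename the keys of one Cuboid row to their numeric column names."""
    return {columns[name]: value for name, value in row.items()}


# Hypothetical Cuboid with three dimensions and two measures.
columns = cuboid_columns(["SELLER_ID", "CATEGORY", "REGION"], ["COUNT", "SUM_PRICE"])
stored = to_storage_row(
    {"SELLER_ID": 1001, "CATEGORY": "toys", "REGION": "east", "COUNT": 3, "SUM_PRICE": 59.7},
    columns,
)
```

Here stored has the keys "0" to "4", so the file schema carries only short numeric names while the metadata keeps the mapping back to the real column names.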
- The performance comparison
We compared the new build engine with Kylin 3.0’s build engine (MapReduce). The tests ran on a cluster with four compute nodes, with 400 gigabytes of memory and 128 cores available to YARN. We used an internal version of Spark, because we have made some optimizations to the Spark source code; the community version of Spark is not currently supported. The test dataset is the standard SSB dataset.
On the left is the final storage footprint, which the new build engine reduces by about half. On the right is the build time comparison, which shows that the new build engine is also much faster than Kylin 3.0.
The query engine
After a query request is issued, Calcite parses the SQL into an Abstract Syntax Tree (AST). After the AST is validated and optimized, it is converted into an execution plan tree (RelNodes). The new query engine converts all RelNodes into a Spark execution plan, and the whole query is then executed by Spark.
The query engine converts each piece of computation logic into the corresponding Spark logic. This conversion takes quite a bit of work: Calcite and Spark each have their own type systems, and we have to map between them. Some of Calcite’s functions also have to be implemented on the Spark side.
As mentioned at the beginning, we can set a breakpoint on each DataFrame to debug and inspect intermediate values, which makes troubleshooting easier. The first time it receives a query request, the query engine creates a long-running process in YARN that is dedicated to handling query tasks.
The query engine also isolates its dependencies, mainly to prevent class conflicts with external dependencies.
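The idea of converting a plan tree step by step and inspecting each intermediate result can be shown with a toy example. Plain Python lists stand in for Spark DataFrames and small dictionaries stand in for Calcite RelNodes, so this only illustrates the approach and is not the query engine's code; the node layout and the execute function are hypothetical.

```python
from collections import defaultdict


def execute(node, tables, trace=None):
    """Recursively evaluate a plan node and record each step's output in `trace`."""
    op = node["op"]
    if op == "scan":
        rows = list(tables[node["table"]])
    elif op == "filter":
        child = execute(node["input"], tables, trace)
        rows = [row for row in child if node["predicate"](row)]
    elif op == "aggregate":
        child = execute(node["input"], tables, trace)
        sums = defaultdict(int)
        for row in child:
            sums[row[node["group_by"]]] += row[node["sum"]]
        rows = [{node["group_by"]: key, node["sum"]: total}
                for key, total in sorted(sums.items())]
    else:
        raise ValueError("unsupported plan node: " + op)
    if trace is not None:
        # Plays the role of a breakpoint: keep the intermediate data of this step.
        trace.append((op, rows))
    return rows


tables = {"sales": [{"city": "A", "amt": 1}, {"city": "B", "amt": 5}, {"city": "A", "amt": 3}]}
plan = {"op": "aggregate", "group_by": "city", "sum": "amt",
        "input": {"op": "filter", "predicate": lambda row: row["amt"] < 5,
                  "input": {"op": "scan", "table": "sales"}}}
trace = []
result = execute(plan, tables, trace)
```

After the run, trace holds the output of the scan, filter, and aggregate steps in order, so a suspicious Agg result can be compared against the rows that were fed into it.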
- The performance comparison
The performance of the query engine was also compared with Kylin 3.0. The test environment was the same as for the build performance test, so I won’t repeat it here. We ran the comparison on both the SSB dataset and the TPCH dataset.
The SSB dataset is about 60 million rows in size, but most of the standard SQL for SSB is relatively simple, so the queries we see are basically completed in less than a second.
The TPCH dataset is about 12 million rows in size, and its standard SQL is more demanding. As you can see, Kylin 3.0 takes a long time on these queries, while the new query engine is much faster because of some optimizations for complex queries.
The Demo presentation
Please click the link below to play the live review video and drag the progress bar to 26:35 to start watching.