Introduction: The concept of the data lake is becoming increasingly popular. This article, written jointly by the OLAP team of Alibaba Cloud Open Source Big Data and the StarRocks data lake analysis team, introduces the principles behind StarRocks' fast data lake analysis.

StarRocks is a powerful data analysis system designed to provide fast, unified, and easy-to-use data analysis capabilities, helping users gain faster insight into the value of their data at lower cost. With a streamlined architecture, an efficient vectorized engine, and a newly designed cost-based optimizer (CBO), StarRocks' analytical performance, especially for multi-table JOIN queries, far outperforms its peers.

To meet more users' needs for rapid data analysis and to apply StarRocks' powerful analysis capabilities to a wider range of data sources, the OLAP team of Alibaba Cloud Open Source Big Data has joined hands with the community to enhance StarRocks' data lake analysis capabilities, enabling it to analyze not only data stored locally in StarRocks, but also data stored in open source data lakes or warehouses such as Apache Hive, Apache Iceberg, and Apache Hudi, with comparable performance.

This article focuses on the technology behind StarRocks' fast data lake analytics capabilities, its performance, and future plans.

1. Overall architecture

In a data lake analysis scenario, StarRocks is responsible for the computation and analysis of data, while the data lake is responsible for the storage, organization, and maintenance of data. The figure above depicts the complete technology stack composed of StarRocks and the data lake.

The StarRocks architecture is simple. The core processes of the system are FE (Frontend) and BE (Backend), which do not rely on any external components, making deployment and maintenance easy. FE is mainly responsible for parsing query statements (SQL), optimizing queries, and scheduling queries, while BE is mainly responsible for reading data from the data lake and performing operations such as Filter and Aggregate.

The data lake itself is a collection of technical concepts; a typical data lake consists of three modules: Table Format, File Format, and Storage. Table Format is the "UI" of the data lake: it organizes structured, semi-structured, and even unstructured data so that it can be stored in a distributed file system such as HDFS or in object storage such as OSS and S3, and it exposes the table-structure semantics to the outside. Table Formats fall into two main schools. One organizes metadata into a series of files and stores them together with the actual data in the distributed file system or object store, as Apache Iceberg, Apache Hudi, and Delta Lake do. The other uses a dedicated metadata service to store metadata separately, as StarRocks local tables, Snowflake, and Apache Hive do.

The main function of a File Format is to provide a data-unit representation that is convenient for efficient retrieval and compression. Common open source File Formats currently include the columnar Apache Parquet and Apache ORC and the row-oriented Apache Avro.

Storage is the module that stores the data of the data lake. Currently, the most commonly used storage options in a data lake are the distributed file system HDFS and object storage such as OSS and S3.

FE

The main function of FE is to convert SQL statements into Fragments that BE can recognize. If the BE cluster is viewed as a distributed thread pool, then a Fragment is a Task in that thread pool. From SQL text to a distributed physical execution plan, FE goes through the following steps (a small EXPLAIN sketch follows the list):

  • SQL Parse: convert the SQL text into an AST (abstract syntax tree)
  • SQL Analyze: perform syntactic and semantic analysis on the AST
  • SQL Logical Plan: convert the AST into a logical plan
  • SQL Optimize: rewrite the logical plan based on relational algebra, statistics, and the cost model, and select the physical execution plan with the "lowest cost"
  • Generate Plan Fragment: convert the physical execution plan selected by the optimizer into Plan Fragments that BE can execute directly
  • Schedule the execution plan
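
As a small illustration of the output of this pipeline, the distributed plan and its Fragments can be inspected with EXPLAIN; the sketch below reuses the lineitem table from the examples later in this article, so treat the table and column names as placeholders.

-- Show the Plan Fragments (Scan, Exchange, ...) that FE generates for a query
explain
select l_quantity
from lineitem
where l_discount = 1;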

BE

Backend (BE) is the backend of StarRocks, responsible for data storage and for executing SQL computation.

All BE nodes of StarRocks are completely equivalent. FE distributes data to the corresponding BE nodes according to certain policies. During data import, data is written directly to the BE nodes rather than being relayed through FE; BE is responsible for writing the imported data into the corresponding formats and generating the related indexes. During SQL computation, a SQL statement is first planned into logical execution units according to its semantics and then split into physical execution units according to the data distribution. The physical execution units run on the nodes where the data resides, avoiding data transfer and copying and thus achieving the best query performance.

2. Technical details

Why is StarRocks so fast

The CBO optimizer

In general, the more complex the SQL, the more tables are joined, and the larger the data volume, the more the query optimizer matters, because the performance gap between different execution plans can be hundreds or even thousands of times. The StarRocks optimizer is based on the Cascades and ORCA papers and has been deeply customized, optimized, and extended together with the StarRocks executor and scheduler. It fully supports the 99 TPC-DS SQL queries and implements important features and optimizations such as common expression reuse, correlated subquery rewriting, Lateral Join, CTE reuse, Join Reorder, Join distributed execution strategy selection, Runtime Filter push-down, and low-cardinality dictionary optimization.

One key to a CBO optimizer is whether the cost estimation is accurate, and one key to accurate cost estimation is whether statistics are collected in a timely and accurate manner. StarRocks currently supports table-level and column-level statistics, with both automatic and manual collection and both full and sampled collection.
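
As a minimal sketch of manual collection on the lineitem table used later in this article: the exact ANALYZE syntax differs between StarRocks versions, so treat the statements below as illustrative assumptions rather than a definitive reference.

-- Manually collect full table- and column-level statistics (illustrative syntax)
ANALYZE FULL TABLE lineitem;

-- Sampled collection restricted to a couple of columns
ANALYZE SAMPLE TABLE lineitem (l_quantity, l_discount);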

MPP execution

MPP is short for Massively Parallel Processing. Its core idea is to split a query plan into many compute instances that can each run on a single node and then execute them in parallel on multiple nodes. The nodes share no CPU, memory, or disk resources, so the query performance of an MPP database can scale as the cluster scales horizontally.

As shown in the figure above, StarRocks logically splits a query into Query Fragments, each of which can have one or more execution instances, and each Fragment execution instance is scheduled onto a BE in the cluster for execution. As the figure also shows, a Fragment can contain one or more Operators; the Fragment in the figure includes Scan, Filter, and Aggregate. Each Fragment can have a different degree of parallelism.

As shown in the figure above, multiple Fragments execute in parallel in memory, in a pipelined fashion, rather than Stage by Stage as in a batch engine.

As shown in the figure above, the Shuffle operation is the key to letting an MPP database's query performance grow as the cluster scales horizontally, and it is also the key to implementing high-cardinality aggregation and large-table joins.
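
As a concrete illustration (assuming the lineitem table used later in this article), a high-cardinality aggregation such as the one below is typically executed by shuffling rows across BE nodes on the GROUP BY key, so that each node aggregates a disjoint subset of keys:

-- l_orderkey has very high cardinality, so partial aggregates are shuffled
-- by l_orderkey and merged on the receiving BE nodes
select l_orderkey, sum(l_quantity) as total_qty
from lineitem
group by l_orderkey;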

Vectorized execution engine

As the bottleneck of database execution has shifted from I/O to CPU, StarRocks re-implemented the entire execution engine based on vectorization in order to squeeze the most out of the CPU. The core of vectorized operator and expression execution is batched, column-oriented execution: compared with row-at-a-time execution, batched execution involves fewer virtual function calls and fewer branch mispredictions, and columnar execution is more CPU-cache-friendly and easier to optimize with SIMD than row-oriented execution.

Vectorized execution is not just a matter of vectorizing every operator and expression in the database; it is a huge and complicated engineering effort that includes organizing data by column on disk, in memory, and on the network, redesigning data structures and algorithms, redesigning memory management, SIMD instruction optimization, CPU cache optimization, C++ optimization, and more. Compared with the previous row-based execution, vectorization improves overall performance by 5 to 10 times.

How does StarRocks optimize data lake analytics

In the field of big data analysis, data is stored not only in data warehouses but also in data lakes. Traditional data lake implementations include Hive/HDFS; in recent years the most popular concept is the LakeHouse, with common implementations including Iceberg, Hudi, and Delta. Can StarRocks help users better mine the value of the data in the data lake? The answer is yes.

In the previous section we explained how StarRocks achieves extremely fast analytics; applying those capabilities to the data lake naturally makes for a much better data lake analytics experience. In this section, we look at how StarRocks implements fast data lake analysis.

Looking at the overall architecture, the main StarRocks modules related to data lake analysis are shown below. Data management is provided by the data lake, and data storage is provided by object storage such as OSS or S3, or by HDFS.

The data lake analytics capabilities already supported by StarRocks can be summarized as follows (a sketch of defining an Iceberg external table appears after the list):

  • Support for Iceberg V1 table queries: github.com/StarRocks/s…
  • Support for Hive external table queries: StarRocks Docs (dorisdb.com)
  • Support for Hudi COW table queries: github.com/StarRocks/s…
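
As referenced above, the following is a rough sketch of what defining and querying an Iceberg external table looks like with the external-table approach of the time. The resource and table names, the metastore address, and the exact property keys (such as "iceberg.catalog.type") are illustrative assumptions and may differ between StarRocks versions; consult the linked docs for the authoritative syntax.

-- Register an Iceberg resource backed by a Hive Metastore catalog
-- (property keys are illustrative and version-dependent)
create external resource "iceberg0"
properties (
    "type" = "iceberg",
    "iceberg.catalog.type" = "HIVE",
    "iceberg.catalog.hive.metastore.uris" = "thrift://<metastore-host>:9083"
);

-- Map an existing Iceberg table into StarRocks as an external table
create external table iceberg_orders (
    o_orderkey bigint,
    o_totalprice decimal(15,2)
) ENGINE=ICEBERG
PROPERTIES (
    "resource" = "iceberg0",
    "database" = "tpch",
    "table" = "orders"
);

-- Query it like any other table
select count(*) from iceberg_orders;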

In terms of query optimization and query execution, let’s take a look at how StarRocks brings the power of rapid analysis to the data lake.

Query optimization

The query optimization part is mainly implemented with the CBO optimizer introduced earlier; the data lake module needs to provide statistics to the optimizer, and based on these statistics the optimizer applies a series of strategies to optimize the query execution plan. Let's look at some common strategies through examples.

Statistics

In this example, the HdfsScanNode in the generated execution plan contains statistics such as cardinality and avgRowSize.

MySQL [hive_test]> explain select l_quantity from lineitem;
PLAN FRAGMENT 0
 OUTPUT EXPRS:5: l_quantity
  PARTITION: UNPARTITIONED

  RESULT SINK

  1:EXCHANGE

PLAN FRAGMENT 1
 OUTPUT EXPRS:
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 01
    UNPARTITIONED

  0:HdfsScanNode
     TABLE: lineitem
     partitions = 1/1
     cardinality = 126059930
     avgRowSize = 8.0
     numNodes = 0

These statistics are computed before the plan formally enters the CBO optimizer. For Hive, for example, we have a MetaData Cache that caches this information; for Iceberg, we compute the statistics from Iceberg's MANIFEST files. With these statistics available, the effectiveness of the subsequent optimization strategies improves greatly.

Partition pruning

Partition pruning is an optimization that applies only when the target table is a partitioned table. By analyzing the filter conditions in the query statement, partition pruning selects only the partitions that may satisfy the conditions and skips the rest, significantly reducing the amount of data to be scanned. For example, in the following example we create a Hive external table partitioned by ss_sold_date_sk.

create external table store_sales(
    ss_sold_time_sk bigint,
    ss_item_sk bigint,
    ss_customer_sk bigint,
    ss_coupon_amt decimal(7,2),
    ss_net_paid decimal(7,2),
    ss_net_paid_inc_tax decimal(7,2),
    ss_net_profit decimal(7,2),
    ss_sold_date_sk bigint
) ENGINE=HIVE
PROPERTIES (
    "resource" = "hive_tpcds",
    "database" = "tpcds",
    "table" = "store_sales"
);
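
The table definition above refers to a Hive resource named hive_tpcds. For completeness, the following is a hedged sketch of how such a resource is typically registered; the metastore address is a placeholder and the property names may vary slightly by version.

-- Register the Hive Metastore that the external table's "resource" property points to
create external resource "hive_tpcds"
properties (
    "type" = "hive",
    "hive.metastore.uris" = "thrift://<metastore-host>:9083"
);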

When the following query is executed, only the data in partitions between 2451911 and 2451941 is read; the data in all other partitions is filtered out, which saves a large part of the network I/O.

select ss_sold_time_sk from store_sales 
where ss_sold_date_sk between 2451911 and 2451941 
order by ss_sold_time_sk;

Join Reorder

The query efficiency of a multi-table join is strongly affected by the order in which the tables are joined. For example, for select * from T0, T1, T2 where T0.a = T1.a and T2.a = T1.a, there are two possible execution orders:

  • Join T0 and T1 first, then join the result with T2
  • Join T1 and T2 first, then join the result with T0

Depending on the data volume and distribution of T0 and T2, the two orders perform differently. To handle this, StarRocks implements both DP-based and greedy Join Reorder mechanisms in the optimizer. Join Reorder is currently supported for Hive data analysis, and support for other data sources is under development. Here is an example:

MySQL [hive_test]> explain select * from T0, T1, T2 where T2.str=T0.str and T1.str=T0.str;
PLAN FRAGMENT 0
 OUTPUT EXPRS:1: str | 2: str | 3: str
  PARTITION: UNPARTITIONED

  RESULT SINK

  8:EXCHANGE

PLAN FRAGMENT 1
 OUTPUT EXPRS:
  PARTITION: HASH_PARTITIONED: 2: str

  STREAM DATA SINK
    EXCHANGE ID: 08
    UNPARTITIONED

  7:HASH JOIN
  |  join op: INNER JOIN (BUCKET_SHUFFLE(S))
  |  hash predicates:
  |  colocate: false, reason:
  |  equal join conjunct: 1: str = 3: str
  |
  |----6:EXCHANGE
  |
  4:HASH JOIN
  |  join op: INNER JOIN (PARTITIONED)
  |  hash predicates:
  |  colocate: false, reason:
  |  equal join conjunct: 2: str = 1: str
  |
  |----3:EXCHANGE
  |
  1:EXCHANGE

PLAN FRAGMENT 2
 OUTPUT EXPRS:
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 06
    HASH_PARTITIONED: 3: str

  5:HdfsScanNode
     TABLE: T2
     partitions = 1/1
     cardinality = 1
     avgRowSize = 16.0
     numNodes = 0

PLAN FRAGMENT 3
 OUTPUT EXPRS:
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 03
    HASH_PARTITIONED: 1: str

  2:HdfsScanNode
     TABLE: T0
     partitions = 1/1
     cardinality = 1
     avgRowSize = 16.0
     numNodes = 0

PLAN FRAGMENT 4
 OUTPUT EXPRS:
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 01
    HASH_PARTITIONED: 2: str

  0:HdfsScanNode
     TABLE: T1
     partitions = 1/1
     cardinality = 1
     avgRowSize = 16.0
     numNodes = 0

Predicate push-down

Predicate push-down pushes the filter expressions of a query down as close to the data source as possible, reducing the cost of data transfer and computation. For data lake scenarios, we implemented pushing Min/Max and other filter conditions down into Parquet, so that unneeded Row Groups can be filtered out quickly when Parquet files are read.

For example, for the following query, the l_discount=1 condition is pushed down to the Parquet side.

MySQL [hive_test]> explain select l_quantity from lineitem where l_discount=1;
PLAN FRAGMENT 0
 OUTPUT EXPRS:5: l_quantity
  PARTITION: UNPARTITIONED

  RESULT SINK

  2:EXCHANGE

PLAN FRAGMENT 1
 OUTPUT EXPRS:
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 02
    UNPARTITIONED

  1:Project
  |  <slot 5> : 5: l_quantity
  |
  0:HdfsScanNode
     TABLE: lineitem
     NON-PARTITION PREDICATES: 7: l_discount = 1.0
     partitions = 1/1
     cardinality = 63029965
     avgRowSize = 16.0
     numNodes = 0

Other strategies

In addition to the strategies introduced above, for data lake analysis we have also adapted strategies such as Limit push-down, TopN push-down, and subquery optimization, which further improve query performance.

Query execution

As mentioned earlier, the StarRocks execution engine is fully vectorized and built on an MPP architecture, which naturally strengthens its ability to analyze data in the data lake. Let's take a look at how StarRocks schedules and executes data lake analysis queries.

Query scheduling

Data in a data lake is generally stored on HDFS or OSS, and BE nodes may or may not be co-located with the storage nodes. To handle both cases, we implemented a load-balancing algorithm for Fragment scheduling:

  • After partition pruning, obtain all the HDFS file blocks to be queried

  • Construct a THdfsScanRange for each block, where hosts contains the addresses of the DataNodes holding all replicas of the block; the result is a list of scan ranges

  • The Coordinator maintains a map of the number of scan ranges already assigned to each BE, a map of the number of blocks to be read already assigned to each disk of every DataNode, and the average number of scan ranges each BE should receive (numScanRangePerBe)

  • If a BE is deployed on a DataNode that holds a replica of the block (co-located deployment):

  • Each scan range is preferentially assigned to the replica-local BE with the fewest assigned scan ranges; if that BE already has more than numScanRangePerBe scan ranges, the remote BE with the fewest scan ranges is chosen instead

  • If several candidate BEs have the same, smallest number of scan ranges, the disks on those BEs are also considered, and the BE whose replica-holding disk has the fewest blocks waiting to be read is selected

  • If no BE is deployed on the DataNodes that hold the block's replicas (separate deployment, or remote reads):

  • The BE with the fewest assigned scan ranges is selected

Query execution

After the plan is scheduled to the BEs, the entire execution process is vectorized. In the Iceberg example below, the BE-side counterpart of IcebergScanNode is currently the vectorized implementation of HdfsScanNode, and the other operators are likewise vectorized implementations on the BE side.

MySQL [external_db_snappy_yuzhou]> explain select c_customer_id customer_id
    -> ,c_first_name customer_first_name
    -> ,c_last_name customer_last_name
    -> ,c_preferred_cust_flag customer_preferred_cust_flag
    -> ,c_birth_country customer_birth_country
    -> ,c_login customer_login
    -> ,c_email_address customer_email_address
    -> ,d_year dyear
    -> ,'s' sale_type
    -> from customer, store_sales, date_dim
    -> where c_customer_sk = ss_customer_sk
    -> and ss_sold_date_sk = d_date_sk;
PLAN FRAGMENT 0
 OUTPUT EXPRS:2: c_customer_id | 9: c_first_name | 10: c_last_name | 11: c_preferred_cust_flag | 15: c_birth_country | 16: c_login | 17: c_email_address | 48: d_year | 70: expr
  PARTITION: UNPARTITIONED

  RESULT SINK

  9:EXCHANGE

PLAN FRAGMENT 1
 OUTPUT EXPRS:
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 09
    UNPARTITIONED

  8:Project
  |  <slot 2> : 2: c_customer_id
  |  <slot 9> : 9: c_first_name
  |  <slot 10> : 10: c_last_name
  |  <slot 11> : 11: c_preferred_cust_flag
  |  <slot 15> : 15: c_birth_country
  |  <slot 16> : 16: c_login
  |  <slot 17> : 17: c_email_address
  |  <slot 48> : 48: d_year
  |  <slot 70> : 's'
  |
  7:HASH JOIN
  |  join op: INNER JOIN (BROADCAST)
  |  hash predicates:
  |  colocate: false, reason:
  |  equal join conjunct: 21: ss_customer_sk = 1: c_customer_sk
  |
  4:Project
  |  <slot 21> : 21: ss_customer_sk
  |  <slot 48> : 48: d_year
  |
  3:HASH JOIN
  |  join op: INNER JOIN (BROADCAST)
  |  hash predicates:
  |  colocate: false, reason:
  |  equal join conjunct: 41: ss_sold_date_sk = 42: d_date_sk
  |
  0:IcebergScanNode
     TABLE: store_sales
     cardinality = 28800991
     avgRowSize = 1.4884362
     numNodes = 0

PLAN FRAGMENT 2
 OUTPUT EXPRS:
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 06
    UNPARTITIONED

  5:IcebergScanNode
     TABLE: customer
     cardinality = 500000
     avgRowSize = 36.93911
     numNodes = 0

PLAN FRAGMENT 3
 OUTPUT EXPRS:
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 02
    UNPARTITIONED

  1:IcebergScanNode
     TABLE: date_dim
     cardinality = 73049
     avgRowSize = 4.026941
     numNodes = 0

3. Benchmarking

TPC-H is a test suite developed by the Transaction Processing Performance Council (TPC) to simulate decision-support applications. It consists of a suite of business-oriented ad-hoc queries and concurrent data modifications.

TPC-H models the data warehouse of a sales system based on a real production environment. The test involves 8 tables, and the data volume can be set from 1 GB to 3 TB. The benchmark contains 22 queries, and the primary metric is the response time of each query, i.e., the time from submitting the query to receiving the result.
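
To give a feel for the workload, below is TPC-H Q6 with its commonly used default substitution parameters; it is shown only as an illustration of the query set, not as the exact parameters of this test.

-- TPC-H Q6: revenue change from eliminating certain small discounts
select
    sum(l_extendedprice * l_discount) as revenue
from
    lineitem
where
    l_shipdate >= date '1994-01-01'
    and l_shipdate < date '1995-01-01'
    and l_discount between 0.05 and 0.07
    and l_quantity < 24;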

The test results

A comparative test was run on the TPC-H 100 GB data set with all 22 queries; the results are as follows:

StarRocks was tested with both local-storage queries and Hive external table queries. StarRocks On Hive and Trino On Hive query the same data, stored in ORC format with zlib compression. The test environment was built on Alibaba Cloud EMR.

In the end, StarRocks took 21 s in total on local storage and 92 s in total via Hive external tables, while Trino took 307 s. StarRocks On Hive far outperforms Trino in query performance, but it still lags well behind local-storage queries. The main reasons are the extra network overhead of accessing remote storage and the fact that the latency and IOPS of remote storage are usually worse than those of local storage. The plan is to close the gap between StarRocks local tables and StarRocks On Hive through caching and other mechanisms.

For details, please refer to the StarRocks vs Trino TPC-H performance comparison report.

4. Future planning

Thanks to core technologies such as the fully vectorized execution engine, the CBO optimizer, and the MPP execution framework, StarRocks already delivers data lake analysis performance that far exceeds comparable products. In the long term, StarRocks' vision for data lake analytics is to provide users with extremely simple, easy-to-use, and fast data lake analysis capabilities. To achieve this goal, StarRocks still has a lot of work to do, including:

  • Integrating the Pipeline execution engine and further reducing query response time through push-based pipeline execution
  • Automatic tiering of hot and cold data: users can keep frequently updated hot data in StarRocks local tables, and StarRocks will periodically and automatically migrate cold data from local tables to the data lake
  • Removing the step of explicitly creating external tables: users only need to create the resource corresponding to the data lake, and the data lake's databases and tables will be synchronized automatically
  • Further improving StarRocks' support for data lake features, including Apache Hudi MOR tables and Apache Iceberg V2 tables, writing directly to the data lake, Time Travel queries, and better Catalog support
  • Using a hierarchical cache to further improve data lake analysis performance

5. More information

Reference links

[1] help.aliyun.com/document_d…

[2] github.com/StarRocks/s…

[3] docs.dorisdb.com/zh-cn/main/…

[4] github.com/StarRocks/s…

[5] StarRocks vs Trino TPC-H performance comparison report

This article is original content from Alibaba Cloud and may not be reproduced without permission.