Overview: This article provides a detailed overview of the key technologies behind data lake analysis engines and uses StarRocks as an example to help readers further understand the architecture of such a system.

Authors:

Alibaba Cloud EMR Open Source Big Data OLAP Team

StarRocks Community Data Lake Analysis Team

Preface

As digital industrialization and industry digitalization become important driving forces of the economy, enterprise data analysis scenarios are becoming richer, and the requirements placed on data analysis architectures are becoming higher. New data analysis scenarios create new requirements in three main areas:

  • Users want to import and store any amount of relational data (e.g., data from operational databases and line-of-business applications) and non-relational data (e.g., data from mobile applications, IoT devices, and social media) in a cheaper, more real-time manner
  • Users expect their data assets to be well protected
  • Users expect data analytics to be faster, more flexible, and more real-time

The first two requirements are well met by the emergence of data lakes. Users can collect data from multiple sources and store it in its original form in the data lake, in real time and at any volume. Data lakes offer extremely strong horizontal scaling, allowing users to store data of any size, and their underlying layer usually relies on an inexpensive storage scheme, which greatly reduces the cost of data storage. Through sensitive data identification, hierarchical classification, privacy protection, resource permission control, encrypted data transmission, encrypted storage, data risk identification, and compliance auditing, the data lake helps users establish a security early-warning mechanism, strengthen overall security protection, and keep data both usable and compliant.

To further meet user requirements for data lake analytics, we need an analysis engine built for the data lake that leverages more data from more sources in less time and enables users to collaborate on processing and analyzing data in different ways, so that they can make better, faster decisions. This article walks you through the key technologies of such a data lake analysis engine and uses StarRocks to help you understand the architecture of the system.

We will follow up with two more articles covering the core technologies and use cases of this fast data lake analysis engine in more detail:

  • **Code Walkthrough:** Helps readers further understand the principles and implementation of the fast data lake analysis engine by walking through the key data structures and algorithms of the open source analytical database kernel StarRocks.
  • **Case Study:** Introduces how large enterprises use StarRocks to gain real-time, flexible insight into the value of data on the data lake, helping businesses make better decisions and helping readers understand how the theory works in real-world scenarios.

What is a data lake?

What is a data lake? According to Wikipedia, “a data lake is a system or repository of data stored in its natural/raw format, usually object blobs or files”. Generally speaking, a data lake can be understood as a layer on top of cheap object storage or a distributed file system that combines the discrete objects or files in these storage systems into unified semantics, such as the familiar “table” semantics of relational databases.

After understanding the definition of a data lake, it is natural to wonder what unique capabilities a data lake provides and why we would use one.

Before the concept of a data lake emerged, many businesses and organizations were already using HDFS or S3 to store the wide variety of data generated in their daily operations (for example, an app company might want to keep a detailed record of user clicks). Because the value of this data might not be discovered for a while, they would find a cheap storage system and park the data there, hoping to extract valuable information from it at some later date. However, HDFS and S3 provide only relatively simple semantics (HDFS provides file semantics, S3 provides object semantics), and over time engineers may no longer be able to answer the question of what data they actually stored there. To avoid having to parse the raw data just to figure out its meaning every time it is used, clever engineers defined a consistent way of organizing the data and then used additional data to describe it; this additional data is called “metadata”, because it is data that describes data. The metadata can then be parsed later to answer what the underlying data actually means. This was the original purpose of the data lake.

As users became more demanding about data quality, data lakes began to offer richer capabilities. For example, they provide users with database-like ACID semantics, so that users get point-in-time views during continuous data writes and are protected from various errors during reads, and they provide higher-performance data import. Today the data lake has evolved from pure metadata management to offering richer, more database-like semantics.

A loose but intuitive description of a data lake is an “AP database” with cheaper storage. However, a data lake only provides data storage and organization; a complete database needs not only data storage but also data analysis capabilities. How to build an efficient analysis engine for the data lake and give users insight into their data is therefore the focus of this article. The following sections take a step-by-step look at the internals and implementation of a modern OLAP analysis engine:

  • How to achieve extremely fast analytics on a data lake
  • The architecture of a modern data lake analysis engine

How do you achieve extremely fast analytics on a data lake?

Starting with this section, let’s go back to the database textbook. An analysis engine for a data lake and an analysis engine for a database are architecturally the same, and we generally think of them as being divided into the following components:

  • Parser: Parses the user’s query into an abstract syntax tree
  • Analyzer: Checks whether the syntax and semantics of the query statement are correct and conform to the definitions
  • Optimizer: Generates a physical query plan with higher performance and lower cost for the query
  • Execution Engine: Executes the physical query plan, then collects and returns the query results

The Optimizer and the Execution Engine are the two core modules that determine the performance of a data lake analysis engine. Below, we break down the core technical principles of these two modules along three dimensions, compare different technical solutions, and help readers understand the ins and outs of a modern data lake analysis engine.

RBO vs CBO

Fundamentally, the optimizer’s job is to generate the lowest-cost (or at least relatively low-cost) execution plan for a given query. Different execution plans can differ in performance by factors of thousands: the more complex the query and the larger the data volume, the more important query optimization becomes.

Rule Based Optimization (RBO) is a common optimization strategy in traditional analysis engines. The core of RBO is equivalent transformation based on relational algebra: queries are transformed through a set of pre-defined rules to obtain a cheaper execution plan. Common RBO rules include predicate pushdown, limit pushdown, constant folding, and so on. RBO follows a strict set of rules, so for the same query the generated execution plan is fixed regardless of the contents of the data tables. In a real business environment, however, the size of the data can seriously affect query performance, and RBO cannot use this information to obtain a better execution plan.
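
To make this concrete, here is a minimal Python sketch of a rule-based rewrite, using predicate pushdown as the example. The plan-node classes and the `push_down_predicate` rule are illustrative inventions for this article, not StarRocks’ actual optimizer interfaces.

```python
# A minimal sketch of a rule-based (RBO) rewrite: predicate pushdown.
# The plan-node classes and the rule are illustrative, not a real optimizer API.
from dataclasses import dataclass
from typing import Optional, Any

@dataclass
class Scan:
    table: str
    predicate: Optional[str] = None   # predicate evaluated while scanning

@dataclass
class Filter:
    predicate: str
    child: Any

@dataclass
class Join:
    left: Any
    right: Any
    condition: str

def push_down_predicate(plan):
    """If a Filter sits directly on top of a Scan, fold the predicate into the Scan."""
    if isinstance(plan, Filter) and isinstance(plan.child, Scan):
        return Scan(plan.child.table, predicate=plan.predicate)
    if isinstance(plan, Filter):
        return Filter(plan.predicate, push_down_predicate(plan.child))
    if isinstance(plan, Join):
        return Join(push_down_predicate(plan.left),
                    push_down_predicate(plan.right),
                    plan.condition)
    return plan

# logical plan for: SELECT * FROM t JOIN s ON t.id = s.id WHERE t.m < 10
before = Join(Filter("t.m < 10", Scan("t")), Scan("s"), "t.id = s.id")
after = push_down_predicate(before)
print(after)   # the filter now runs inside the scan of t, before the join
```

Note that the rewrite looks only at the shape of the plan, never at statistics about the tables, which is exactly the limitation described above.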

To overcome this limitation of RBO, the Cost Based Optimization (CBO) strategy came into being. CBO estimates the cost of candidate execution plans by collecting statistics on the data, such as data set sizes, row counts, and column cardinalities. For example, suppose we have three tables A, B, and C and want to execute the query A join B join C. Without statistics we cannot judge the difference between different join orders. If we collect statistics and find that tables A and B each contain 1M rows while table C contains only 10 rows, then executing B join C first greatly reduces the size of the intermediate results, a judgment that is essentially impossible to make without statistics.
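
The following toy Python sketch shows how row-count statistics can steer join ordering. The row counts, the selectivity, and the cost formula are all made up for illustration and are not how a production optimizer estimates cost.

```python
# A toy sketch of how row-count statistics can steer join ordering.
# Row counts, selectivity, and the cost formula are made up for illustration.
stats = {"A": 1_000_000, "B": 1_000_000, "C": 10}

def join_output_rows(left_rows, right_rows, selectivity=1e-6):
    # crude estimate: |L| * |R| * selectivity of the join condition
    return max(1, int(left_rows * right_rows * selectivity))

def plan_cost(order):
    """Cost of joining tables left to right in the given order."""
    rows = stats[order[0]]
    cost = 0
    for table in order[1:]:
        out = join_output_rows(rows, stats[table])
        cost += rows + stats[table] + out   # read both inputs, produce the output
        rows = out
    return cost

for order in [("A", "B", "C"), ("B", "C", "A")]:
    print(order, plan_cost(order))
# Joining the tiny table C early keeps the intermediate result small,
# which is exactly the kind of decision RBO cannot make without statistics.
```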

As query complexity increases, the state space of possible execution plans becomes very large. Anyone who has practiced algorithm problems knows that once the state space is huge, brute-force search is no longer feasible, and a good search algorithm becomes particularly important. In general, CBO uses a dynamic programming algorithm to find the optimal plan while avoiding recomputation of overlapping subproblems. Once the state space grows beyond a certain point, we can only fall back to a greedy algorithm or some other heuristic to obtain a locally optimal plan. Essentially, a search algorithm is a tradeoff between search time and result quality.

(Common CBO implementation architecture)
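
To illustrate the search itself, here is a compact sketch of Selinger-style dynamic programming over subsets of tables. As before, the statistics, selectivity, and cost formula are invented for this article and only serve to show the shape of the algorithm.

```python
# A compact sketch of Selinger-style dynamic programming over subsets of tables.
# Statistics, selectivity, and the cost formula are made up for illustration.
from itertools import combinations

stats = {"A": 1_000_000, "B": 1_000_000, "C": 10}

def join_output_rows(left_rows, right_rows, selectivity=1e-6):
    return max(1, int(left_rows * right_rows * selectivity))

def best_plan(tables):
    # best[subset] = (cost, estimated_rows, join_order)
    best = {frozenset([t]): (0, stats[t], (t,)) for t in tables}
    for size in range(2, len(tables) + 1):
        for subset in map(frozenset, combinations(tables, size)):
            candidates = []
            for table in subset:                      # table joined last
                rest_cost, rest_rows, rest_order = best[subset - {table}]
                out = join_output_rows(rest_rows, stats[table])
                cost = rest_cost + rest_rows + stats[table] + out
                candidates.append((cost, out, rest_order + (table,)))
            best[subset] = min(candidates)            # memoize the cheapest sub-plan
    return best[frozenset(tables)]

print(best_plan(["A", "B", "C"]))
# The number of subsets grows as 2^N, which is why real optimizers switch to
# greedy or other heuristic search once a query joins many tables.
```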

Record Oriented vs Block Oriented

An execution plan can be thought of as a chain of operators (relational algebra operators) executed end to end, where the output of one operator is the input of the next. Traditional analysis engines are Row Oriented, that is, the inputs and outputs of operators are rows of data.

As a simple example, suppose we have the following table and query:

CREATE TABLE t (n int, m int, o int, p int); 
 
SELECT o FROM t WHERE m < n + 1; 

Source: GitHub – JordanLewis/Exectoy

When the above query statement is expanded into an execution plan, it is roughly shown as follows:

In general, in a Row Oriented model, the execution process of the execution plan can be represented by the following pseudo-code:

next: 
  for: 
    row = source.next() 
    if filterExpr.Eval(row): 
      // return a new row containing just column o 
      returnedRow = newRow()
      for col in selectedCols: 
        returnedRow.append(row[col]) 
      return returnedRow 

As the paper DBMSs On A Modern Processor: Where Does Time Go? points out, this row-at-a-time execution model suffers from a large number of L2 data cache stalls and L1 instruction cache stalls, and branch prediction efficiency is low.

With the rapid development of disks and other hardware, and with the widespread use of compression algorithms, encoding algorithms, and storage techniques that trade CPU for I/O, CPU performance has gradually become the bottleneck of analysis engines. To address the problems of the Row Oriented implementation, academia began to look for solutions. The paper Block Oriented Processing of Relational Database Operations in Modern Computer Architectures proposed passing a block of data between operators at a time, which amortizes the time spent passing data between operators as well as the work of condition checking and branch prediction. MonetDB/X100: Hyper-Pipelining Query Execution went further and changed the data passed between operators from Row Oriented to Column Oriented, which improves CPU cache efficiency and is also friendlier to compiler optimizations. In the Column Oriented model, the execution process of the execution plan can be represented by the following pseudo-code:

// first compute an n + 1 result for every value in the n column 
projPlusIntIntConst.Next(): 
  batch = source.Next() 
 
  for i < batch.n: 
    outCol[i] = intCol[i] + constArg 
 
  return batch 
 
// then, compare the new column to the m column, putting the result into 
// a selection vector: a list of the selected indexes in the column batch 
 
selectLTIntInt.Next(): 
  batch = source.Next() 
 
  for i < batch.n: 
    if int1Col[i] < int2Col[i]: 
      selectionVector.append(i) 
 
  return batch with selectionVector 
 
// finally, we materialize the batch, returning actual rows to the user, 
// containing just the columns requested: 
 
materialize.Next(): 
  batch = source.Next() 
 
  for s < batch.n: 
    i = selectionVector[s] 
    returnedRow = newRow() 
    for col in selectedCols: 
      returnedRow.append(cols[col][i]) 
    yield returnedRow 

As you can see, the Column Oriented approach has better data locality and instruction locality, which helps improve the CPU cache hit ratio and makes it easier for compilers to apply SIMD and other optimizations.
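
The same example query can be expressed over columnar batches in a few lines of Python, using NumPy as a stand-in for a vectorized execution engine. This is only an illustration of the idea, not how StarRocks implements it internally.

```python
# "SELECT o FROM t WHERE m < n + 1" expressed over columnar batches.
# NumPy stands in for a vectorized execution engine here.
import numpy as np

batch_size = 4096
n = np.random.randint(0, 100, batch_size)
m = np.random.randint(0, 100, batch_size)
o = np.random.randint(0, 100, batch_size)

# projPlusIntIntConst: compute n + 1 for the whole column in one tight loop
n_plus_1 = n + 1

# selectLTIntInt: build a selection vector of the qualifying row indexes
selection_vector = np.nonzero(m < n_plus_1)[0]

# materialize: gather only the requested column at the selected positions
result = o[selection_vector]
print(len(result), "rows selected out of", batch_size)
```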

Pull Based vs Push Based

In a database system, an input SQL statement is typically converted into a series of operators, which together form the physical execution plan that performs the actual computation and returns the results. In the generated physical execution plan, operators are typically pipelined. There are two common pipelining approaches:

  • Push Based (data-driven): the upstream operator pushes data to the downstream operator
  • Pull Based (demand-driven): the downstream operator actively pulls data from the upstream operator. The classic Volcano model is Pull Based.

Compared with the Pull Based model, the Push Based execution model improves cache efficiency and query performance.

Reference: Push vs. pull-based Loop Fusion in Query Engines
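
The following minimal Python sketch contrasts the two styles for a simple scan → filter → sum pipeline; it is purely illustrative and does not correspond to any real engine’s interfaces.

```python
# Pull-based (Volcano-style iterators) vs push-based execution for "scan -> filter -> sum".
rows = [1, 5, 8, 3, 9, 2]

# Pull based: each operator asks its child for the next row.
def scan_pull():
    for r in rows:
        yield r

def filter_pull(child, pred):
    for r in child:
        if pred(r):
            yield r

print("pull:", sum(filter_pull(scan_pull(), lambda r: r > 3)))

# Push based: the scan drives the loop and pushes rows into consumer callbacks,
# so filter and aggregate fuse into one tight loop over the data.
def run_push(consume):
    for r in rows:              # the scan owns the loop
        consume(r)

state = {"sum": 0}
def filter_then_sum(r):
    if r > 3:                   # filter
        state["sum"] += r       # aggregate

run_push(filter_then_sum)
print("push:", state["sum"])
```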

Architecture of modern data lake analysis engines

With the introduction in the previous section, readers should now have a good understanding of the cutting-edge techniques behind data lake analysis engines. In this section we use StarRocks as an example to further explain how a data lake analysis engine combines these advanced techniques and presents them to the user through an elegant system architecture.

As shown in the preceding figure, StarRocks has a simple architecture. The core of the system consists of the Frontend (FE) and the Backend (BE), and it does not rely on any external components, which makes it easy to deploy and maintain. The FE is mainly responsible for parsing query statements (SQL), optimizing queries, and scheduling queries, while the BE is mainly responsible for reading data from the data lake and performing operations such as Filter and Aggregate.

Frontend

The main function of the FE is to transform SQL statements, through a series of transformations and optimizations, into fragments that the BE can execute. A less precise but easy-to-understand analogy: if you think of a BE cluster as a distributed thread pool, then fragments are the tasks in that pool. From SQL text to fragments, the main work of the FE includes the following steps (a sketch of the pipeline follows the list):

  • SQL Parse: converts the SQL text into an AST (abstract syntax tree)
  • Analyze: performs syntactic and semantic analysis based on the AST
  • Logical Plan: converts the AST into a logical plan
  • Optimize: rewrites the logical plan based on relational algebra, statistics, and a cost model, and selects the physical execution plan with the lowest cost
  • Generate Fragments: converts the physical execution plan selected by the Optimizer into fragments that the BE can execute directly
  • Coordinate: dispatches the fragments to the appropriate BEs for execution
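
The sketch below reduces each FE stage to a placeholder Python function so the overall flow from SQL text to dispatched fragments is easy to follow. The function names and data shapes are invented for this article and are not StarRocks’ real interfaces.

```python
# Sketch of the FE pipeline from SQL text to fragments, with each stage as a placeholder.
def parse(sql):                # SQL text -> AST
    return {"stmt": "select", "text": sql}

def analyze(ast):              # syntactic and semantic checks, name binding
    return ast

def to_logical_plan(ast):      # AST -> logical relational-algebra plan
    return ["Scan(t)", "Filter(m < n + 1)", "Project(o)"]

def optimize(logical_plan):    # rewrite rules + cost-based choice of a physical plan
    return ["PhysicalScan(t, pushdown: m < n + 1)", "PhysicalProject(o)"]

def to_fragments(physical_plan):       # split the physical plan into executable fragments
    return [{"fragment_id": i, "operator": op} for i, op in enumerate(physical_plan)]

def coordinate(fragments, backends):   # dispatch each fragment to a BE
    return {f["fragment_id"]: backends[i % len(backends)]
            for i, f in enumerate(fragments)}

sql = "SELECT o FROM t WHERE m < n + 1"
fragments = to_fragments(optimize(to_logical_plan(analyze(parse(sql)))))
print(coordinate(fragments, backends=["be-1", "be-2"]))
```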

Backend

The BE is the StarRocks backend node. It receives fragments from the FE, executes them, and returns the results to the FE. All BE nodes of StarRocks are completely equivalent, and the FE assigns work to the BE nodes according to certain policies. A typical fragment workflow is to read some files from the data lake, call the appropriate reader (e.g., a Parquet Reader for Parquet files or an ORC Reader for ORC files) to parse the data in those files, filter and aggregate the parsed data with the vectorized execution engine, and then return the result to another BE or to the FE.
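
The following sketch shows that workflow in miniature: pick a reader by file format, parse the file into column batches, then filter and aggregate. The readers and data are fake stand-ins invented for illustration; a real BE would do this over Parquet/ORC readers with its vectorized engine rather than a per-row Python loop.

```python
# Sketch of a typical fragment workflow on a BE: choose a reader, parse, filter, aggregate.
def parquet_reader(path):
    yield {"m": [1, 7, 3], "o": [10, 20, 30]}   # pretend columnar batch

def orc_reader(path):
    yield {"m": [9, 2], "o": [40, 50]}

READERS = {".parquet": parquet_reader, ".orc": orc_reader}

def run_fragment(files, predicate):
    """Compute SUM(o) over rows whose m satisfies the predicate."""
    total = 0
    for path in files:
        reader = READERS[path[path.rfind("."):]]       # choose reader by file format
        for batch in reader(path):
            for m, o in zip(batch["m"], batch["o"]):   # real engines do this column-wise
                if predicate(m):        # filter
                    total += o          # aggregate
    return total                        # result is sent back to another BE or the FE

print(run_fragment(["part-0.parquet", "part-1.orc"], lambda m: m > 2))
```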

Conclusion

This article introduced the core technical principles of a fast data lake analysis engine and compared different implementation approaches along multiple dimensions. To support further discussion, it also presented the system architecture of StarRocks, an open source data lake analysis engine. We look forward to discussing and exchanging ideas with our peers.

Appendix

Benchmark

This test uses the TPC-H 100G standard test set to compare the performance of StarRocks local tables, StarRocks On Hive, and Trino (PrestoSQL) On Hive.

The comparison was run on the TPC-H 100G data set with its 22 queries; the results are as follows:

StarRocks was tested with both local storage queries and Hive external table queries. StarRocks On Hive and Trino On Hive query the same data, which is stored in ORC format and compressed with zlib. The test environment was built on Alibaba Cloud EMR.

In the end, the total time of the StarRocks local storage queries was 21s, the total time of the StarRocks Hive external table queries was 92s, and the Trino queries took 307s. StarRocks On Hive performs far better than Trino, but it still lags well behind StarRocks on local storage. The main reasons are the extra network overhead of accessing remote storage, and the fact that the latency and IOPS of remote storage are usually worse than those of local storage. The plan is to close the gap between StarRocks local tables and StarRocks On Hive through caching and other mechanisms.

References

[1] GitHub – jordanlewis/exectoy

[2] DBMSs On A Modern Processor: Where Does Time Go?

[3] Block oriented processing of Relational Database operations in modern Computer Architectures

[4] MonetDB/X100: Hyper-Pipelining Query Execution

[5] help.aliyun.com/document\_d…

This article is original content from Alibaba Cloud and may not be reproduced without permission.