Brief introduction: From its birth inside Alibaba Group to its commercialization on the cloud, Hologres has continuously sharpened its core technology as the business has grown and the technology has evolved. To help you understand Hologres better, we plan to publish a series of articles revealing its underlying technical principles, from the high-performance storage engine to the efficient query engine, from high-throughput writes to high-QPS queries. Stay tuned!

Hologres (Chinese product name: Interactive Analytics) is Alibaba Cloud's self-developed one-stop real-time data warehouse. This cloud-native system combines real-time serving with big data analytics, is fully compatible with the PostgreSQL protocol, and integrates seamlessly with the big data ecosystem: a single data architecture supports real-time writes, real-time queries, and federated real-time/offline analysis. Its emergence simplifies business architectures while enabling real-time decision-making, letting big data deliver greater business value.

Previous articles in this series:

  • VLDB 2020 paper "Alibaba Hologres: A Cloud-Native Service for Hybrid Serving/Analytical Processing"
  • Hologres revealed: First time! Alibaba's cloud-native real-time data warehouse core technology revealed
  • Hologres revealed: First unveiling of the cloud-native Hologres storage engine

In this issue, we explain the technical principles behind Hologres' efficient distributed query engine.

As the best practice of HSAP (Hybrid Serving/Analytical Processing), the Hologres query engine is a fully self-developed execution engine. Its core design goal is to support all types of distributed analytical and serving queries with the best possible performance. To achieve this, we studied a variety of distributed query systems, including MPP analytical databases and real-time data warehouses, absorbed the strengths of each, and built a new execution engine from scratch.

Why build a new query engine from scratch? Open-source distributed analytical query systems fall mainly into two categories:

  • One category is traditional MPP (Massively Parallel Processing) systems, which support generic SQL queries but offer limited support and performance for real-time scenarios.
  • The other category comprises systems such as Apache Druid and ClickHouse, which are designed and optimized for real-time scenarios. They handle common single-table real-time queries well, but their performance on complex queries is relatively poor.
  • In addition, MapReduce-based engines in the big data ecosystem are better suited to batch ETL; they are generally unsuitable for online serving and multi-dimensional analysis scenarios, and their performance is also poor.

The Hologres execution engine is a general architecture that supports both complex queries and the high-performance real-time serving queries described above. It first covered the common real-time data warehouse scenarios and, through internal benchmarks, verified that its performance and stability surpassed competing products, including dedicated real-time data warehouses; it was then extended to support other complex queries. During that expansion, as the system inevitably grew more complex, the benchmarks also helped ensure that the performance of simple real-time queries did not regress. Such improvements are hard to retrofit onto an existing query engine, because so many architectural and design choices would already have been locked in.

The Hologres execution engine faced many challenges from development to deployment, but these also gave us the opportunity to combine various recent advances in the field and surpass existing systems, achieving high-performance processing of all query types. Its main characteristics are:

  • Distributed execution model: a distributed execution model built for the storage-compute separation architecture. The execution plan is a directed acyclic graph (DAG) of asynchronous operators that can express a wide variety of complex queries, fits the Hologres data storage model well, and connects cleanly to the query optimizer, allowing the engine to apply query optimization techniques from across the industry.
  • Fully asynchronous execution: an end-to-end asynchronous processing framework that avoids the bottlenecks of high-concurrency systems, makes full use of resources, and minimizes the impact of the read latency introduced by storage-compute separation.
  • Vectorization and columnar processing: operators process data with vectorized execution wherever possible; deep integration with the storage engine, a flexible execution model, full use of the various indexes, and maximal lazy materialization and lazy evaluation avoid unnecessary reads and computation.
  • Adaptive incremental processing: adaptive incremental processing for common repeated query patterns over real-time data.
  • Deep optimization for specific query patterns: unique optimizations for certain query patterns.

Each module is described in the following sections.

Distributed execution model

Hologres is a system that can scale data volume and computing power elastically and without limit, so it needs to support efficient distributed queries.

The Hologres query engine executes distributed execution plans generated by the optimizer. A plan consists of operators. Since the data of a Hologres table is distributed across multiple shards according to its Distribution Key, and each shard can contain many segments, the execution plan reflects this structure and is distributed to the nodes where the data lives for execution. Each table shard is loaded by a compute node, which caches the shard's data in memory and on local storage. Thanks to the storage-compute separation architecture, if a node fails, the shards it served can be reloaded onto any other compute node, just as if a cache had been cleared.

For example, consider a relatively simple query:

select key, count(value) as total from table1 group by key order by total desc limit 100

On a single-node database this could use a simple execution plan. When the data and computation are spread across multiple nodes, a more complex plan is required.

For a distributed table, to execute more efficiently and minimize data transfer, the execution plan can be split into fragments distributed to the corresponding nodes for execution, with some operations pushed down to reduce each fragment's output. The plan might then look like this:

Depending on the characteristics of the data, the optimizer may generate different plans. For example, the local aggregation operator can be omitted when it would not significantly reduce the data volume; and if the key is the Distribution Key, the plan can be optimized to:

As these examples show, Hologres splits an execution plan into fragments according to the characteristics of the data, then distributes and executes them concurrently. Data is exchanged between fragments through an Exchange operator. More complex queries, such as multi-table joins, involve more fragments and more complex data exchange patterns.
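To make the fragment structure concrete, here is a minimal in-memory sketch (illustrative Python, not Hologres code) of the two-phase plan for the aggregation query above: each shard runs a partial-aggregation fragment, an Exchange hash-partitions the partial states by key, and a final fragment merges, sorts, and applies the limit. All function names are invented for the sketch.

```python
from collections import defaultdict

def partial_aggregate(shard_rows):
    """Fragment 1 (runs on each shard): count(value) grouped by key."""
    partial = defaultdict(int)
    for key, value in shard_rows:
        if value is not None:          # count(value) skips NULLs
            partial[key] += 1
    return partial

def exchange_by_key(partials, num_reducers):
    """Exchange operator: hash-partition the partial states by key."""
    buckets = [defaultdict(int) for _ in range(num_reducers)]
    for partial in partials:
        for key, cnt in partial.items():
            buckets[hash(key) % num_reducers][key] += cnt
    return buckets

def final_fragment(bucket, limit=100):
    """Fragment 2: merge counts, then order by total desc limit 100."""
    return sorted(bucket.items(), key=lambda kv: -kv[1])[:limit]

shards = [[("a", 1), ("b", 2)], [("a", 3), ("c", None)]]
buckets = exchange_by_key([partial_aggregate(s) for s in shards], 2)
result = sorted((kv for b in buckets for kv in final_fragment(b)),
                key=lambda kv: -kv[1])[:100]
```

Because each key is routed to exactly one reducer by the Exchange, the final fragments never see overlapping groups, which is what allows them to sort and truncate independently.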

For example, the following SQL:

select sum(value) as total from t1 join t2 on t1.user_id = t2.user_id where ... group by user_name order by total limit 100

In Hologres it could produce an execution plan like this:

If the join key is the same as the Distribution Key, the plan can be optimized as follows to eliminate remote data transfer. Setting the Distribution Key appropriately for the expected queries can therefore significantly improve query performance.
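Why the Exchange disappears can be sketched with a toy hash distribution (illustrative Python; names such as `shard_of` and `local_hash_join` are hypothetical, not Hologres APIs): when both tables hash `user_id` the same way, matching rows are guaranteed to be co-located, so each shard joins locally.

```python
NUM_SHARDS = 4

def shard_of(user_id):
    # Both tables use the same hash distribution => co-located rows.
    return hash(user_id) % NUM_SHARDS

def distribute(rows):
    """Place each row on its shard according to the Distribution Key."""
    shards = [[] for _ in range(NUM_SHARDS)]
    for row in rows:
        shards[shard_of(row["user_id"])].append(row)
    return shards

def local_hash_join(t1_shard, t2_shard):
    """Hash join entirely within one shard: build on t2, probe with t1."""
    built = {}
    for row in t2_shard:
        built.setdefault(row["user_id"], []).append(row)
    return [{**l, **r} for l in t1_shard for r in built.get(l["user_id"], [])]

t1 = [{"user_id": 1, "value": 10}, {"user_id": 2, "value": 20}]
t2 = [{"user_id": 1, "user_name": "alice"}, {"user_id": 2, "user_name": "bob"}]
joined = [row for a, b in zip(distribute(t1), distribute(t2))
          for row in local_hash_join(a, b)]
```

If the join key differed from the Distribution Key, one side (or both) would first have to be redistributed through an Exchange, which is exactly the remote transfer the optimization avoids.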

Depending on filter conditions, statistics, and so on, the optimizer may also generate different optimized plans involving dynamic filtering, local aggregation, and more.

Such a distributed execution plan is general enough to express all SQL queries and some other query types. The plan is structurally similar to most MPP systems, which makes it easy to borrow and integrate applicable optimizations from the industry. What is slightly unique is that many fragment instances are aligned with the storage structure of Hologres, enabling efficient partition and file pruning.

At the same time, Hologres implements PostgreSQL's Explain and Explain Analyze family of statements, which display the execution plan in text form along with the corresponding execution information, so users can understand the plan themselves and make targeted SQL tuning adjustments.

Fully asynchronous execution

In high-concurrency systems, especially those with heavy I/O, frequent waiting and task switching are common bottlenecks. Asynchronous processing is a proven way to avoid these bottlenecks and push the performance of a highly concurrent system to its limit.

The entire back end of Hologres, including the execution engine, storage engine, and other components, uniformly uses the asynchronous, lock-free programming framework provided by the HOS (Hologres Operating System) component, maximizing the benefit of asynchronous execution. Each fragment instance runs on one HOS EC (a logical scheduling unit), which lets all operators and the storage engine within a fragment execute asynchronously and access most resources without locks.

Operators and fragments expose interfaces like this:

future<> Open(const SeekParameters& parameters, ...)
future<RecordBatchPtr, bool> GetNext(...)
future<> Close(...)

Beyond the general benefits of asynchronous processing, the asynchronous operator interface better absorbs the relatively high read latency of the storage-compute separation architecture, and it brings unique benefits to the distributed query execution model itself.

DAG execution engines generally fall into pull-based models (such as the Volcano model) and push-based models (such as the staged execution common in big data systems), each with its own strengths and weaknesses. The asynchronous pull model used by Hologres (which has been patented) captures the benefits of both while avoiding their drawbacks. A common hash join illustrates this:

The Volcano model can simply pull B's data to build the hash table and then stream A's data through, without holding all the data in memory. However, whenever reading A or B blocks on I/O, a naive implementation leaves the CPU idle; keeping it busy requires raising fragment concurrency or introducing complex prefetch mechanisms, both of which bring other performance problems.

In a push-based model, it is relatively easy to issue concurrent read requests and trigger downstream processing on completion, but implementing the join above becomes more complicated. For example, if A pushes a batch to the join operator before B's hash table has been built, that batch must be staged temporarily in memory or on disk, or a backpressure mechanism must be introduced. Similar problems arise at fragment boundaries, causing data staging that a pull model does not need.

With Hologres' asynchronous pull model, operators and fragments can request data from upstream on demand as simply as in the Volcano model, yet read data concurrently as easily as in a push model: an operator simply issues multiple asynchronous GetNext calls upstream, and downstream processing is triggered naturally as each completes. The number and timing of outstanding GetNext calls act as a natural flow-control mechanism, effectively raising CPU utilization and avoiding unnecessary data staging.
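The flow-control idea can be sketched with Python's asyncio (an illustrative model, not the HOS interface): the downstream operator keeps a small window of GetNext calls in flight, so storage reads overlap and completions drive further processing.

```python
import asyncio

class ScanOperator:
    """Toy upstream operator whose GetNext is asynchronous."""
    def __init__(self, batches, io_delay=0.001):
        self._batches = iter(batches)
        self._delay = io_delay

    async def get_next(self):
        await asyncio.sleep(self._delay)   # simulated storage read latency
        return next(self._batches, None)   # None => end of stream

async def aggregate(upstream, window=4):
    """Keep up to `window` GetNext futures in flight; sum all values.
    The window size doubles as a natural flow-control limit."""
    total, done = 0, False
    while not done:
        # Issue several pulls concurrently instead of waiting one by one.
        batches = await asyncio.gather(
            *[upstream.get_next() for _ in range(window)])
        for batch in batches:
            if batch is None:
                done = True
            else:
                total += sum(batch)
    return total

total = asyncio.run(aggregate(ScanOperator([[1, 2], [3], [4, 5, 6]])))
```

With a window of 4, the simulated read delays overlap rather than serialize, which is the concurrency benefit of the push model, while the operator still only asks for data it is ready to consume, which is the simplicity of the pull model.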

Using this asynchronous model, Hologres has implemented a complete query engine that supports all PostgreSQL queries.

Column processing and vectorization

Columnar processing and vectorization are common optimizations in analytical query engines and can greatly improve data-processing efficiency. Hologres is no exception, applying vectorized processing wherever possible.

Hologres also keeps data columnar in memory, which enables more vectorized processing. Another benefit of columnar organization is its friendliness to lazy evaluation. Take `select ... where a = 1 and b = 2 ...`: for a batch of data (generally corresponding to a stored row group), the columns a and b produced by the Hologres scan operator can initially be just lazily-readable references. Column a is actually read only when the a = 1 predicate is evaluated, and if no row in the batch satisfies a = 1, column b for that batch is never read at all.
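A toy model of this lazy materialization, assuming a hypothetical `LazyColumn` wrapper that counts physical reads (names invented for the sketch):

```python
class LazyColumn:
    """Reads the underlying column data only on first access."""
    def __init__(self, read_fn):
        self._read_fn = read_fn
        self._data = None
        self.reads = 0            # count physical reads for the demo

    def values(self):
        if self._data is None:
            self.reads += 1
            self._data = self._read_fn()
        return self._data

def filter_batch(col_a, col_b, a_target, b_target):
    """where a = a_target and b = b_target over one batch (row group)."""
    mask = [v == a_target for v in col_a.values()]
    if not any(mask):
        return []                 # column b is never read for this batch
    return [i for i, (m, b) in enumerate(zip(mask, col_b.values()))
            if m and b == b_target]

batch = {"a": LazyColumn(lambda: [3, 1, 3]), "b": LazyColumn(lambda: [2, 2, 9])}
hits = filter_batch(batch["a"], batch["b"], a_target=1, b_target=2)
```

For a batch where no row passes the first predicate, the second column's `reads` counter stays at zero: the I/O for that column is skipped entirely.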

However, for some row-oriented operators, such as joins, columnar data can cause more CPU cache misses and therefore serious performance problems. Many query engines insert column-to-row and row-to-column conversions at various points, but frequent conversion itself carries considerable overhead, and converting to rows forces the lazily-read columns described above to be read unnecessarily, among other performance issues.

Adaptive incremental processing

Many real-time data applications execute the same query repeatedly over a growing time window. For example, when a monitoring page opens, it issues a query such as select avg(v1) from metrics where d1 = x and d2 = y and ts >= '2020-11-11 00:00:00' and ts < '2020-11-11 00:03:05' group by d3. The next refresh changes only the upper bound to ts < '2020-11-11 00:03:10', the one after that to ts < '2020-11-11 00:03:15', and so on.

Streaming or incremental computation can handle such queries very efficiently. But for interactive queries that users can compose at will, it is usually impossible to configure a streaming or incremental job for every combination. Simply re-executing each query from scratch causes heavy duplicate computation, wasting resources and hurting performance.

Hologres exploits the deep integration of its storage and compute engines, and the fact that most columnar data sits in read-only files, to return query results that include the latest written data while minimizing recomputation. For this class of queries, this significantly improves performance and reduces resource usage.
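One way to picture this is a simplified sketch under the assumption of immutable sealed files plus a small memtable of fresh writes; the cache layout and all names are invented for illustration and say nothing about Hologres internals:

```python
# file_id -> (sum, count) partial aggregate; a real cache would also be
# keyed by the query's filter, which we fix to one predicate here.
_file_cache = {}

def agg_file(file_id, rows, predicate):
    """Aggregate one immutable file once, memoizing the partial result."""
    if file_id not in _file_cache:
        vals = [r["v1"] for r in rows if predicate(r)]
        _file_cache[file_id] = (sum(vals), len(vals))
    return _file_cache[file_id]

def avg_v1(immutable_files, memtable_rows, predicate):
    """avg(v1): cached partials for sealed files + fresh scan of new data."""
    total = cnt = 0
    for fid, rows in immutable_files:
        s, c = agg_file(fid, rows, predicate)
        total, cnt = total + s, cnt + c
    fresh = [r["v1"] for r in memtable_rows if predicate(r)]
    total, cnt = total + sum(fresh), cnt + len(fresh)
    return total / cnt if cnt else None

pred = lambda r: r["d1"] == "x"
files = [("f1", [{"d1": "x", "v1": 2}, {"d1": "y", "v1": 9}])]
first = avg_v1(files, [{"d1": "x", "v1": 4}], pred)    # scans f1
second = avg_v1(files, [{"d1": "x", "v1": 6}], pred)   # f1 served from cache
```

Because the sealed files never change, their partial aggregates stay valid; only the freshly written tail is recomputed on each refresh, which is why repeated dashboard queries get cheaper instead of repeating the full scan.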

Deep optimization for specific query patterns

Hologres applies unique optimizations to certain specific query patterns. Here we take the Filter Aggregate optimization as an example.

Many data applications need open columns: the ability to add logical columns dynamically without changing the table schema. For example, a tags column (in Postgres, an Array can be used) may hold multiple logical column values such as '{c1:v1, c2:u1}'. A common query over regular columns would be:

-- Q1: 
select c1, sum(x) from t1 where c1 in (v1, v2, v3) and name = 'abc' group by c1

With open columns, such a query becomes:

-- Q2: 
select unnest(tags), sum(x) from t1 where name = 'abc' and tags && ARRAY['c1:v1', 'c1:v2', 'c1:v3'] 
group by unnest(tags) 
having unnest(tags) in ('c1:v1', 'c1:v2', 'c1:v3')

For such queries, Hologres can use the bitmap index to evaluate the filter conditions quickly and locate the relevant rows, but the subsequent extraction of values from the multi-valued column cannot be vectorized, so performance falls short of optimal. After investigation, the query's execution can be rewritten as:

-- Q3: 
select 'c1:v1', sum(x) from t1 where tags && ARRAY['c1:v1'] 
UNION ALL 
select 'c1:v2', sum(x) from t1 where tags && ARRAY['c1:v2'] 
UNION ALL ...

In this form, each UNION ALL branch only needs to read the bitmap indexes of name and tags to compute its filter condition, then run a vectorized SUM_IF over column x and that filter to produce its result. The remaining problem is that every branch scans t1, repeatedly reading column x and the name bitmap index, which duplicates work. So a dedicated Filter Aggregate operator was introduced to optimize this common pattern to peak performance: it scans t1 only once, eliminates the duplicated operations, computes the result entirely with vectorized operations, and never reads the tags column data itself. On a table of dozens of TB, measured performance improved by more than 3x.
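A minimal sketch of the single-pass idea (illustrative only; the real operator works on bitmap indexes and vectorized SUM_IF inside the engine, and these names are invented): each tag value contributes a precomputed bitmap mask, and one pass combines every mask with column x.

```python
def filter_aggregate(name_col, tag_masks, x_col, name_target):
    """tag_masks: tag value -> bitmap (list of bools), as if read from the
    tags bitmap index. Returns {tag: sum of x where name matches and the
    tag bit is set}, in a single pass over the name mask and column x."""
    name_mask = [n == name_target for n in name_col]   # from name's index
    return {
        tag: sum(x for x, nm, tm in zip(x_col, name_mask, mask) if nm and tm)
        for tag, mask in tag_masks.items()
    }

name_col = ["abc", "abc", "zzz"]
x_col = [10, 20, 30]
tag_masks = {                      # as if read from the tags bitmap index
    "c1:v1": [True, False, True],
    "c1:v2": [False, True, False],
}
result = filter_aggregate(name_col, tag_masks, x_col, "abc")
```

Note that the per-row tags values never appear: only their bitmaps do, which is what lets the aggregate stay fully vectorizable and skip reading the tags column data.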

In the same spirit, the Hologres execution engine tries to abstract such optimizations into more general operators that apply to more scenarios. The Filter Aggregate operator is also one of Hologres' patents.


The Hologres execution engine combines, in a single architecture, nearly all of the most effective optimizations of related distributed query systems (including various kinds of indexes), along with unique improvements of its own. Deeply integrated with the storage engine, it takes full advantage of the asynchronous model and uses the various indexes efficiently to accelerate queries. All of this adds up to performance beyond existing systems, battle-tested at the data scale of Alibaba's Double 11: in 2020, Double 11 sustained a real-time data peak of 596 million records per second, serving multi-dimensional analysis and point lookups over trillions of rows with 99.99% of queries returning within 80 ms, providing high-concurrency, high-performance distributed HSAP query services.

In the future, we will continue this series unveiling the underlying technical principles of Hologres. The plan is as follows; stay tuned!

  • Hologres revealed: First time! Alibaba's cloud-native real-time data warehouse core technology revealed
  • Hologres revealed: First unveiling of the cloud-native Hologres storage engine
  • Hologres revealed: Deep analysis of the efficient distributed query engine (this article)
  • Hologres revealed: The core principles of transparently accelerating MaxCompute queries
  • Hologres revealed: How MaxCompute-to-Hologres data synchronization runs a hundred times faster
  • Hologres revealed: How __ supports high-throughput Upsert
  • Hologres revealed: How __ supports ultra-high-QPS online serving scenarios
  • Hologres revealed: How __ supports high-concurrency queries
  • Hologres revealed: How __ supports high-availability architecture
  • Hologres revealed: How __ supports resource isolation and multiple workloads
  • Hologres revealed: The principles and practice of the __ vector search engine Proxima
  • Hologres revealed: Reading the __ execution plan to improve query performance tenfold
  • Hologres revealed: How to design shards and table groups in a distributed system
  • Hologres revealed: How __ supports more Postgres ecosystem extensions
  • Hologres revealed: N ways of high-throughput writes to Hologres
  • …
