Brief introduction: The "Real-time Data Warehouse Bootcamp" is jointly built by front-line technology and product experts from the Realtime Compute for Apache Flink and Hologres teams, including Alibaba Cloud researcher Wang Feng, Alibaba Cloud senior technical expert Jin Xiaojun, and Alibaba Cloud senior product expert Liu Yiming. Together they designed the bootcamp's curriculum and carefully refined the course content, targeting the pain points students actually encounter and analyzing real-time data warehouse architecture, scenarios, and practical applications from the simple to the deep. Seven excellent courses will help you grow from a beginner to an expert in five days!

This article is compiled from the live session "Hologres Performance Tuning Practices" by Qingfen. Video link: https://developer.aliyun.com/…

Brief content:

I. Hologres table building best practices

II. Analysis and optimization of Hologres performance problems

I. Hologres table building best practices

(1) The necessity of table optimization

Why is Hologres table optimization important?

First, a well-designed table and a poorly designed one differ greatly in both overall query performance and write performance.

Second, table optimization should be done as early as possible, because changing a table's DDL in Hologres may require users to re-import data, and avoiding that rework is exactly why table optimization should be finished sooner rather than later.

Finally, good table design also reduces the user's data storage costs. A poorly designed table can lead to unnecessary indexes being built, which in turn causes redundant storage of data and drives up costs. Table design optimization is therefore very important, which is why it comes first in this article.

(2) Business modeling is the prerequisite for performance optimization



Having covered why table design matters, let us look at business modeling as a whole before diving into table design optimization. When considering Hologres, we need to know which business problems it can solve, and how.

Hologres itself is an HSAP (Hybrid Serving and Analytical Processing) product. When using Hologres, we need to pair it with a business scenario and know whether that scenario is analytical or online serving. Analytical scenarios are better served by Hologres column storage, while online serving scenarios are better served by row storage; the choice depends on the business.

The second point is to play to Hologres' own product strengths. Hologres is built for online serving and interactive analysis; it is not suited to ETL or large-scale batch data processing. So when moving a business onto Hologres, you cannot move every workload over, or Hologres may end up doing work it is not well suited for, and the experience will be poor.

The third point is that trade-offs must be made. To reach the expected performance, some work may need to happen in advance, such as pre-computation or data preprocessing, to reduce the complexity of subsequent computation and speed up queries.

All of this is related to up-front data modeling and overall business expectations.

(3) Selection of storage mode

With the above preparation done, we need to select the storage mode for our Hologres tables.

Hologres itself supports two types of storage, row storage and column storage.

The main use case for row storage is high-QPS queries on the primary key; it is also a good fit when the table is wide and a single query reads a large number of columns.

In addition, dimension-table lookups from Blink must use row storage, because a Blink dimension table typically serves high-QPS, key-based queries, and column storage cannot withstand that kind of pressure.

Column storage suits complex interactive analytical queries, for example a query with joins, aggregations, and various other complex computations. It also covers many scenarios, including filtering and aggregation, making column storage a fairly general-purpose storage mode. In short, row storage mainly serves online serving scenarios, while column storage mainly serves analytical scenarios. That is the difference between the two.
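A minimal sketch of how this choice is declared (the table and columns here are hypothetical; orientation is the Hologres table property that selects row or column storage, set in the same transaction as CREATE TABLE):

begin;
create table user_profile (
    user_id bigint primary key,
    name text,
    age int
);
-- row storage for a high-QPS online serving (point lookup) workload
call set_table_property('user_profile', 'orientation', 'row');
commit;

-- for an analytical workload, column storage would be chosen instead:
-- call set_table_property('user_profile', 'orientation', 'column');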

(4) Optimize the number of shards

Shard_count: shards give the effect of physical sub-tables, and multiple shards serve a query in parallel. Adding shards raises the distributed parallelism of a query, but more shards do not necessarily make queries faster, and they also add scheduling overhead for concurrent queries.

With that said, let's look at the shard count.

When Hologres stores a physical table, the table is divided into shards, and the shards are distributed across all physical nodes according to some distribution scheme; each shard can then serve queries concurrently. The more shards, the higher the query concurrency. However, more shards are not always better, because they bring extra overhead, so the shard count of each table should be designed according to its data volume and query complexity.

Consider cluster scaling: if we start with a 128-core instance and scale it to 256 cores, we need to adjust the total number of shards to actually benefit from the added capacity.

Since overall concurrency is bounded by the shard count, if the instance is scaled up but the shard count stays the same, computation concurrency is effectively unchanged, and query performance will not improve despite the larger instance.

In general, we recommend setting the shard count close to the instance specification, for example 40 or 64 shards for a 64-core instance. As the specification grows, the shard count should grow with it, increasing the overall query concurrency.
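A minimal sketch of setting the shard count at table creation (hypothetical table; shard_count is a Hologres table property, sized near the instance specification per the recommendation above):

begin;
create table orders (
    order_id bigint,
    user_id bigint,
    amount numeric
);
-- e.g. 40 shards on a 64-core instance
call set_table_property('orders', 'shard_count', '40');
commit;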

(5) Optimize the Distribution Key

After the Shard count, let’s take a look at the very important Distribution Key in Hologres, which is used to determine how data is allocated to each Shard.

Distribution_key: distributes data evenly across multiple shards, balancing query load and allowing a query to be routed directly to the shard holding its data.

If a Primary Key index is created (for data updates), it is used as the distribution_key by default; if the distribution_key is empty, the default is Random; and the distribution_key must be a subset of the Primary Key.

A good Distribution Key design first requires that the user’s data be evenly distributed across the Distribution Key.

For example, a user ID or a product ID usually has roughly one row per key value, so data spreads very evenly, making it a good Distribution Key. Columns like age or gender, by contrast, are not suitable, because they may shuffle a large share of the data onto a single node, leaving the distribution uneven.

The main purpose of the Distribution Key is to reduce shuffling of data in associative queries and aggregations.

If the user does not set the Distribution Key, then we default to RANDOM, because we want to ensure that the user’s data is distributed as evenly as possible across all shards.

Now let’s look at the main role of the Distribution Key.

In Hologres, different tables are placed into different table groups; tables with the same shard count all go under the same table group (TG).

Suppose two tables are joined. If their Distribution Key is designed to match the join key, the join can be executed as a LOCAL JOIN, as shown on the left of the figure above: no data needs to be shuffled, the matching shards of each table sit together, and results are produced directly after the join.

If data volume grows and capacity must be expanded later, we want all tables under the TG to be expanded together, preserving consistent data distribution and keeping the LOCAL JOIN intact, rather than losing the LOCAL JOIN because of the expansion.

The performance gap between a Local Join and a non-local Join is very large, usually an order of magnitude. What matters most for a Local Join is the Distribution Key design: if it is unreasonable, a large amount of data may be shuffled during the Join, hurting efficiency.

As shown in the figure above, if tables A and B are to be joined but the join key is not the Distribution Key, the data of both A and B must be shuffled by the join key. That shuffle is very costly and drags down the whole query.

So in general, if tables will be joined, set the join relationship's columns as the Distribution Key, so the tables can be joined locally within the same shard.
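A minimal sketch under these assumptions (hypothetical tables; both are distributed by the join key user_id so the join can run as a Local Join):

begin;
create table orders (user_id bigint, amount numeric);
call set_table_property('orders', 'distribution_key', 'user_id');
create table users (user_id bigint, city text);
call set_table_property('users', 'distribution_key', 'user_id');
commit;

-- matching user_id values land on the same shard, so no shuffle is needed:
select u.city, sum(o.amount)
from orders o
join users u on o.user_id = u.user_id
group by u.city;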

(6) Optimize partitioned tables

Partitioned Table: each partition is also a physical table, with the same shard capabilities, plus the added ability to prune by partition key.

As shown in the figure above, if the query's filter conditions hit only some partitions, the remaining partitions need not be scanned at all, which greatly saves IO for the whole query and speeds it up.

In general, partition key values should be static and not too numerous. Dates make the best partition keys: if the business partitions by day or by hour, queries will typically also filter data by some time range.

With a partitioned table, when the user's query condition includes a time filter, unnecessary partitions are filtered out, which greatly improves query performance.

Usually the date column, or another low-cardinality field (fewer than 10,000 distinct values), is used as the partition field. If there are too many partitions and a query carries no partition filter, performance will degrade.
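A minimal sketch of a partitioned table (hypothetical table; Hologres uses PostgreSQL LIST partitioning, here with a date string as the partition key):

begin;
create table events (
    ds text not null,
    event_id bigint,
    payload text
) partition by list (ds);
commit;

create table events_20210101 partition of events for values in ('20210101');
create table events_20210102 partition of events for values in ('20210102');

-- a time-filtered query scans only the matching partition:
select count(*) from events where ds = '20210101';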

(7) Optimize Segment Key

A Shard is a logical unit of data; physically it is a set of files (the data of the tables distributed to that Shard, along with the corresponding indexes).

Segment Key is mainly used for column storage.

Within a shard there are a number of files, and the Segment Key's job is to let a query skip the files it does not need to scan.

Suppose the Segment Key is set to a time column and data is written in time order: for example, 10:00 to 11:00 lands in one file, 11:00 to 12:00 in another, and 12:00 to 13:00 in a third. If we then query data between 12:15 and 12:35, the Segment Key quickly identifies that the query hits only the 12:00 to 13:00 file, so only that file needs to be opened. In this way unnecessary file scans are skipped, IO is reduced, and the overall query gets faster.

The figure above mainly introduces the entire data writing process to help you understand what the Segment Key is.

As mentioned earlier, data written to Hologres first goes into a memory table; when memory fills up, it is asynchronously flushed to files in append-only fashion, which gives the highest write efficiency. Because, for performance, there is no global sort and no in-place file update at write time, overlap can exist between files.

What does this mean?

The Segment_key marks the boundaries of file blocks, so at query time the file block containing the data can be located quickly from the Segment_key. When the Segment_key has multiple columns, matching is left-aligned (leftmost prefix first).

In the earlier example, 11:00 to 12:00 sitting in exactly one file is the ideal case. In practice it may be: 11:00 to 12:00 in one file, 11:30 to 12:30 in a second, and 12:30 to 13:00 in a third, with overlap between files. A query may then hit multiple files, all of which must be opened.

Therefore, when designing the Segment Key, overlap should be avoided as much as possible and values should increase monotonically. If Segment Key values arrive out of order, such as 123, 678, then 456, the same key ranges can recur across different files, and the Segment Key loses its filtering power entirely.

Therefore, the most critical point in designing the Segment Key is to keep it as monotonic as possible, with no overlap, so that unnecessary data scans can be skipped as much as possible.

The Segment Key is mainly used in Blink real-time write scenarios, with the key set to a time field: since the write time of real-time data keeps increasing, values have little overlap, which suits the Segment Key well.

In addition, in other scenarios, it is not recommended that users set their own Segment Key.
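A minimal sketch for the real-time write case just described (hypothetical table; segment_key is the Hologres table property, set on a monotonically increasing event-time column):

begin;
create table app_events (
    event_time timestamptz not null,
    user_id bigint,
    action text
);
-- event_time grows monotonically under real-time writes, so files barely overlap
call set_table_property('app_events', 'segment_key', 'event_time');
commit;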

(8) Optimize the Clustering Key

Clustering_key expresses sort order. Unlike MySQL's clustered index, in Hologres it lays out the data itself rather than a separate index, so changing the Clustering_key requires re-importing the data, and a table can have only one Clustering_key. The sorting is done in memory as the SST files are generated.

The figure above gives an example. On the left the data is completely unsorted. If Date is made the Clustering Key, the data becomes the upper-right layout: sorted by Date, so a Date query, such as rows with Date greater than 1/1 and less than 1/3, can locate the matching range quickly.

If this index did not exist, all the data would have to be scanned. This is how the Clustering Key accelerates queries.

If Class and Date together form the Clustering Key, the data is sorted first by Class, and within each Class by Date, as shown in the lower right of the figure. The Clustering Key is designed around the leftmost-match principle, and user query conditions are matched against it the same way.

For example, if the query key is Class, the Clustering Key is hit; if the query conditions are Class and Date, it is also hit; but if the condition is only Date, it is not. Leftmost matching means that, reading conditions from left to right, the leftmost column must be matched no matter how many conditions the user supplies.

The main function of the Clustering Key is to speed up filtering, both range filters and point filters. Its limitation is that each table can have at most one Clustering Key, i.e. only one sort order.
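A minimal sketch matching the figure's example (hypothetical table; clustering_key is the Hologres table property, here a composite key sorted by class first, then date):

begin;
create table student (
    class text,
    date date,
    score int
);
-- queries filtering on class, or on class and date, follow the leftmost-match rule
call set_table_property('student', 'clustering_key', 'class,date');
commit;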

(9) Optimize dictionary encoding

Dictionary encoding is effective for string types, especially columns with small cardinality. Encoding values speeds up comparison operations, which benefits Group By and Filter. Since version 0.9, Hologres sets this automatically.

The figure above is an example of dictionary coding.

As shown on the left side of the figure above, there are Card No and Gender columns. Since gender takes only two values, male and female, it is very well suited to dictionary encoding. Encoding gender as 0 and 1 gives the middle part of the figure.

At query time, the filter conditions are encoded as well. For example, to fetch the Card No of all men, the filter condition becomes Gender = 0, and the query proceeds as a numeric comparison.

One drawback of dictionary encoding, however, is that it is very expensive for columns with large cardinality.

This is because the data must be encoded first: if a table has 1 million rows with 990,000 distinct values, 990,000 encoded values are produced. That makes both encoding and querying very expensive, so such columns are not suited to dictionary encoding.

Since Hologres 0.9, dictionary encoding can be set automatically, so users do not have to configure it themselves.
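A minimal sketch of configuring it explicitly (hypothetical table; dictionary_encoding_columns is the Hologres table property, and since 0.9 an automatic mode makes manual configuration optional):

begin;
create table card_info (
    card_no text,
    gender text
);
-- gender has tiny cardinality, a good dictionary-encoding candidate
call set_table_property('card_info', 'dictionary_encoding_columns', 'gender:on');
commit;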

(10) Optimize the bitmap index

Bitmap indexes have an obvious optimization effect on equality-filter scenarios; multiple equality conditions are evaluated as vectorized bitmap comparisons.

A bitmap index essentially records, for each value of a column, which rows contain that value, using a bitmap.

As shown in the figure above, the students' gender and class columns in the left table are turned into the bitmaps on the right. With this bitmap information we can filter quickly.

For example, to fetch all male students, we filter with the male bitmap "1 0" and obtain the four rows with PK 1, 2, 5, and 6 that satisfy the condition in the right-hand figure. To filter the students of Class 3, we build the bitmap "0 0 1", intersect it with the class data, and obtain the rows with PK values 2 and 6.

As you can see, the main application scenario for bitmap indexes is point lookup, for example a query condition of gender = male and age = 32; this kind of query is also very well suited to bitmap acceleration.

Bitmap indexes likewise have a weakness: columns with too high a cardinality produce sparse bitmaps (many bitmaps, each with few set bits), which adds cost while contributing little to query performance.
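A minimal sketch matching the figure's example (hypothetical table; bitmap_columns is the Hologres table property, applied to low-cardinality columns used in equality filters):

begin;
create table students (
    pk bigint primary key,
    gender text,
    class text
);
-- equality filters such as gender = 'male' and class = '3' can use the bitmaps
call set_table_property('students', 'bitmap_columns', 'gender,class');
commit;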

(11) Physical topology

Having covered the individual indexes and the overall storage, let's see how they differ and how the whole picture looks from the user's perspective.

As shown above, after a user submits an SQL query, it is first routed to the correct logical Table object according to the partition key.

The second step is to find the corresponding Shard through the Distribution Key.

The third step is the Segment Key: having found the shard, we must find the shard's files. Since the actual data lives in files, the Segment Key tells us which files to open.

The fourth step uses the Clustering Key to exploit how data is organized inside the file, locating the exact file interval we need.

The fifth step is the Bitmap. Because Hologres stores data batch by batch, within a batch the Bitmap lets us locate a specific row quickly; otherwise all data in the range would have to be scanned. The stages in the diagram run from top to bottom, from broad scope down deeper and deeper into the files.

II. Analysis and optimization of Hologres performance problems

(1) Performance white paper

One of the most common questions users ask is how well Hologres performs, and we have a rough estimate of its performance.

With Hologres, real-time writes reach about 5,000 QPS per core. For offline write scenarios, such as MaxCompute writing into Hologres, throughput is usually about 50,000 QPS per core. For OLAP queries, a single core handles a data volume of about 2 million rows. For point-lookup scenarios, a single core reaches about 10,000 QPS.

Users can use these numbers to estimate how many resources their own queries and business scenarios will require.
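As a rough worked example using the estimates above (the workload numbers are hypothetical, and real throughput depends on schema and query shape): a business needing 20,000 point-lookup QPS plus 50,000 real-time writes per second would need about 20,000 / 10,000 = 2 cores for the lookups and 50,000 / 5,000 = 10 cores for the writes, i.e. on the order of 12 cores before any headroom.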

(2) Real-time writes and point lookups

For different application scenarios, our optimization means are not quite the same.

For real-time write and point-lookup scenarios, first check whether the table design is reasonable. For high-QPS writes and point lookups, we want the Distribution Key to be consistent with the query condition. The Distribution Key locates the owning shard, so under high QPS, if the filter condition matches the Distribution Key, we can route straight to a single shard instead of fanning the query out to every shard, which is a large performance win in this scenario. So the Distribution Key should match the query condition.

Second, the table should preferably be a row-store table, because row-store tables are very friendly to real-time writes and point lookups.

Third, if the table is column-store rather than row-store, we want the PK, the Clustering Key, and the query condition to coincide, so that the Clustering Index can be used.

Beyond table design, the writing and querying code also needs optimization, because poorly designed client code that writes to Hologres carries very high extra cost. Users may find QPS hitting a ceiling while Hologres' internal CPU utilization is actually low, because their own client code is not efficient.

For such problems, we first want users to go through PreparedStatement as much as possible. Its main benefit is saving the cost of execution planning: after an SQL statement is submitted, it is compiled, parsed, and turned into an execution plan that is handed to the execution engine. When the same SQL is executed repeatedly, PreparedStatement avoids regenerating the plan and re-parsing, so the per-request cost drops sharply and both query and write QPS go up.
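Since Hologres speaks the PostgreSQL protocol, the same idea can be sketched at the SQL level with PREPARE/EXECUTE (reusing the hypothetical app_events table from earlier; in JDBC the equivalent is a reused PreparedStatement):

-- parse and plan once
prepare ins_event (timestamptz, bigint, text) as
    insert into app_events values ($1, $2, $3);

-- each execution reuses the prepared plan
execute ins_event(now(), 1001, 'click');
execute ins_event(now(), 1002, 'view');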

The second point is to avoid single-row writes as much as possible. For example, we often see a user send insert into ... values (1), then insert into ... values (2), then insert into ... values (3), firing one tiny SQL statement per row. This incurs very high RPC costs, and overall QPS cannot climb.

Batching makes write performance far better. For example, a single insert into ... values statement can carry 1,000 or 10,000 rows, so 10,000 rows are written in one data transfer. Compared with the row-at-a-time approach, the performance difference can be up to 10,000-fold.
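A minimal sketch of the batched form (same hypothetical table as above; one statement, one round trip, many rows):

insert into app_events (event_time, user_id, action) values
    (now(), 1001, 'click'),
    (now(), 1002, 'view'),
    (now(), 1003, 'scroll');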

The third piece is to use the Holo Client; it suits users who may not know how to optimize their code or cannot easily assemble good batches.

Compared with a plain JDBC client, the Holo Client wraps asynchronous execution and batching for the user, and it avoids SQL-engine overhead such as SQL parsing, so its write and query performance is much better than going through JDBC directly.

The Holo Client is also the built-in plugin Blink uses for writing, so its write performance is better than users' homegrown tools.

One more point: when connecting, use the VPC domain name for data writes and queries whenever possible.

Going directly over the public network gives relatively high RT between endpoints. A VPC network is within the same internal network, so machine-to-machine RT is low, reducing overall network overhead. In many application scenarios this influence is large, so it matters a great deal.

(3) Common problems with offline writes and queries

Let’s take a look at some of the common problems with offline writes and queries.

Why are offline writes and queries grouped together? Because offline writing works the same way as querying: an offline write is executed by running a Query.

The first problem is missing statistics.

Hologres is a distributed engine. It runs the user's SQL on that distributed engine, so an optimizer must generate an execution plan, and the optimizer needs statistics to generate a good one. One of the biggest and most common problems we see online is missing statistics: without them the optimizer essentially loses its input for plan generation.

The second big problem is poor table design. When the table design does not match the query pattern, overall query performance will be very poor.

In addition, although Hologres has a self-developed engine, it stays compatible with the open-source Postgres ecosystem through a federated query mechanism, which lets queries that Hologres' own engine cannot run fall back to Postgres, at some additional performance cost.

(4) Check the execution plan

The quality of the execution plan has a significant impact on query performance. Hologres' query optimizer picks the plan with the lowest estimated COST to execute the user's query, but inevitably some plans are not optimal. Users can inspect the execution plan with the EXPLAIN command; a plan generally contains the following operators (a usage sketch follows the list):

1. Data scanning operators: Seq Scan, Table Scan, Index Scan, etc.

These operators handle data access: Seq Scan scans data sequentially, Table Scan scans a table, and Index Scan accesses data through an index.

2. Join operators: Hash Join and Nested Loop.

A Hash Join joins two tables by building one of them into a hash table and probing that hash table with rows from the other.

A Nested Loop, unlike a Hash Join, treats the two tables as two nested for-loops: the outer loop traverses one table, and for each of its rows the inner loop traverses the other.

3. Aggregation operators: Hash Aggregate and Streaming Aggregate.

Hash Aggregate is an aggregation implementation based on hash lookups; Streaming Aggregate is an implementation based on sorted input.

4. Data movement operators: Redistribute Motion, Broadcast Motion, Gather Motion, etc. Hologres is a distributed engine, so data shuffles are inevitable. Redistribute Motion shuffles data, Broadcast Motion broadcasts data, and Gather Motion pulls data together onto one node; these operators exist to move data around the distributed system.

5. Other operators: Hash, Sort, Limit, Append, etc.
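A minimal illustration of reading a plan (assuming two hypothetical tables tmp and tmp1 with an integer column a, matching the examples used below):

explain select t1.a, count(*)
from tmp t1
join tmp1 t2 on t1.a = t2.a
group by t1.a;
-- the printed plan tree shows operators like Seq Scan (data access),
-- Hash Join (join), Redistribute Motion (shuffle), and Hash Aggregate (aggregation)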

(5) Lack of statistics

One of the problems we often encounter when looking at Query performance is missing statistics. How do we know if statistics are missing?

The figure above shows an EXPLAIN example. When Hologres has no statistics, the default row-count estimate is 1000. Here both the tmp and tmp1 tables show rows=1000, indicating that neither table currently has statistics.

Problems that easily occur when statistics are missing:

Queries running out of memory (OOM)

Poor write performance

(6) Update statistics

So how do we solve the problem of not having statistics?

Through the Analyze command, the user can update statistics.

Continuing the earlier example: after running analyze tmp; and analyze tmp1;, both tables have statistics, as shown above.

From the output we can see that tmp1 has 10 million rows and joins 10 million rows of data. We also find that although the plan used a Hash Join, the join order was wrong: tmp holds very little data while tmp1 is very large, yet tmp1 was placed on the hash-table side, making the hash table huge and the whole Join slow.

After ANALYZE, the Join order can be corrected: the small table tmp goes on the hash side and the large table tmp1 on the probe side, forming a better plan and improving query performance over the previous one.

(7) Select the appropriate distribution column

We need to select an appropriate distribution column; this is the LOCAL JOIN case mentioned earlier. If the user does not set a distribution column, then during a join Hologres must shuffle the data of the two tables by the Join Key to ensure correctness, and if the shuffled data volume is large, query latency becomes very high.

How can we tell if we’re doing a Local Join?

In the example above, looking at the execution plan via EXPLAIN, Redistribute Motion is the shuffle operator: both tmp and tmp1 are shuffled by the join condition before the join, showing that this join is not a Local Join.

The solution is to set the join key as the Distribution Key.

The way to do this is to rebuild the tables with the join keys a and b set as the distribution keys. Looking at the execution plan afterwards, the Redistribute Motion is gone and the join is simply a Local Join.

In this way the join changes from a non-local Join to a Local Join, with a significant performance improvement.
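A minimal sketch of the rebuild (hypothetical schemas; assuming the join condition is tmp.a = tmp1.b, so each table's distribution key is set to its side of the join):

begin;
create table tmp (a int, val text);
call set_table_property('tmp', 'distribution_key', 'a');
create table tmp1 (b int, val text);
call set_table_property('tmp1', 'distribution_key', 'b');
commit;

-- re-check the plan: Redistribute Motion should no longer appear
explain select * from tmp join tmp1 on tmp.a = tmp1.b;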

(8) Determine whether the Clustering Key is used

Next, let's determine whether a query actually uses the Clustering Key.

As shown above, suppose we write a query:

explain select * from tmp where a > 1;

Assuming field a is the Clustering Key, we should see a Cluster Filter in the EXPLAIN output, indicating that the Clustering Key was used.

(9) Determine whether Bitmap is used

Next, determine whether the Bitmap index is used.

As shown in the figure above, our query criteria are:

explain select * from tmp where c = 1;

Here c is the Bitmap Key, and you can see Bitmap Filter: (c = 1) in the execution plan.

(10) Determine whether the Segment Key is used

Next, determine whether the Segment Key is used.

As shown in the figure above, our query criteria are:

explain select * from tmp where b > 1;

If the Segment Key takes effect, you can see Segment Filter: (b > 1) in the execution plan.

If it does not appear, the table may be designed incorrectly, or the query pattern may not match the table design.

(11) Federated query optimization

Hologres contains two computing engines. One is the fully self-developed Holo engine. Its performance is excellent, but precisely because it is fully self-developed, it could not initially support every feature of the open-source Postgres engine, so some Postgres functionality is missing from it.

The other engine is Postgres: completely open source, somewhat slower than the self-developed engine, but fully Postgres-compatible.

Therefore, in Hologres, a query may use both the Holo and Postgres computing engines.

(12) Optimize federated queries

To determine whether federated queries are used, we can use EXPLAIN.

The self-developed Hologres engine does not support NOT IN. For the query:

explain select * from tmp where a not in (select a from tmp1);

The result is as follows:

External SQL (Postgres): an operator indicating that this part of the query runs on the Postgres engine.

Since the Holo engine does not support NOT IN, this part of the computation is executed on Postgres. When users see the External SQL (Postgres) operator, they should be aware that the query is using functionality the Holo engine does not support; it is better to rewrite the query with operators Holo does support, so that query performance improves.

For the scenario above, we can change NOT IN to NOT EXISTS:

explain select * from tmp where not exists (select a from tmp1 where a = tmp.a);

Provided the subquery's column contains no NULL values (so the rewrite is equivalent), NOT IN can be changed to NOT EXISTS. The query then runs on the Holo engine, and the External SQL (Postgres) operator no longer appears.

The plan generated for this rewritten query can be several times faster than the previous plan executed on the Postgres engine.

Through all of the examples above, we have walked through the whole process of Hologres performance tuning and its key points. Anyone interested in Hologres is welcome to follow and use it.

This article is original content from Alibaba Cloud and may not be reproduced without permission.