This article is provided by the MaxCompute team at Alibaba and published with authorization by ITDKS (IT大咖说); please credit the source when reproducing.

Word count: 5,502 | Reading time: 14 minutes

Abstract

At present, the Hash Clustering Table feature has been officially released in Alibaba's internal cluster production environment, and several business units, including Ant Financial, the Security Department, and Cainiao, have taken part in the trial. According to feedback from Ant Financial, the improved tasks show significant gains, with running times shortened by 40% to 80% and computing resources reduced by 23% to 67%.

In incremental update scenarios, the new features of MaxCompute 2.0 let you greatly improve performance and save cluster resources with only simple changes to your statements.

Background

In data development, tables are often organized into layers. At the ODS layer, a very common scenario is updating a full snapshot table with a delta table. For example, the snapshot table may store all member information while the delta table contains newly registered members and changes to existing members' attributes; or the snapshot table may store the latest month's order information while the delta table stores new orders, logistics updates, and so on.

Such tasks usually have the following characteristics:

  1. The SNAPSHOT table has a large storage capacity, while the Delta table is relatively small

  2. The snapshot and delta tables have the same schema

  3. Both tables have a primary key, and the keys may overlap (otherwise the job could be done with a simple UNION ALL)

  4. Snapshot of the previous period + delta of the current period => Snapshot of the current period

For this, the corresponding SQL logic typically uses a FULL OUTER JOIN. For simplicity, assume our snapshot and delta tables have only two columns:

```sql
(key string, value string)
```

where key is the primary key.

```sql
INSERT OVERWRITE TABLE snapshot PARTITION (ds='20170102')
SELECT
    CASE WHEN d.key IS NULL THEN s.key ELSE d.key END,
    CASE WHEN d.key IS NULL THEN s.value ELSE d.value END
FROM
    (SELECT * FROM snapshot WHERE ds='20170101') s
FULL OUTER JOIN
    (SELECT * FROM delta WHERE ds='20170102') d
ON s.key = d.key;
```

This statement says: for keys that exist in the delta table, take the delta table's values; for keys that exist only in the snapshot table, keep the snapshot's values.
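To make the merge semantics concrete, here is a toy Python sketch of the same rule (an illustration, not MaxCompute code): delta values win for keys present in the delta, and snapshot values survive otherwise.

```python
# Toy model of the incremental-update pattern modeled by the FULL OUTER JOIN
# above: the new snapshot takes the delta's value whenever a key exists in
# the delta, and keeps the snapshot's value otherwise.

def merge_snapshot(snapshot: dict, delta: dict) -> dict:
    """Snapshot of previous period + delta of current period => new snapshot."""
    merged = dict(snapshot)   # rows only in snapshot keep their old value
    merged.update(delta)      # delta rows win, whether new or changed
    return merged

snap = {"u1": "alice", "u2": "bob", "u3": "carol"}
delta = {"u2": "bob_renamed", "u4": "dave"}       # one update, one new row

new_snap = merge_snapshot(snap, delta)
print(new_snap)  # {'u1': 'alice', 'u2': 'bob_renamed', 'u3': 'carol', 'u4': 'dave'}
```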

The problem

However, in practice, although this is only a simple join, such tasks often take a long time because the snapshot table can be very large (several terabytes to tens of terabytes), and some jobs even need a whole day to produce output. Is there room for optimization here? Let's analyze the execution plan of an actual large-table FULL OUTER JOIN in production.

As the plan shows, M1 reads the snapshot table with nearly 20,000 instances, while M2 reads the delta table with only 9. To perform the join, both sides are shuffled, and stage J3 performs a sort-merge join. In the actual run, M2 takes only a few minutes and M1 about ten, but J3 often needs one to two hours: J3 has only 3,000 instances yet must consume the data produced by nearly 20,000 upstream instances, so with the concurrency cut to roughly 15% of the original and the data volume unchanged, it naturally takes much longer. Moreover, the shuffle from M1 to J3 costs the large table an extra round of reads and writes plus sorting on both sides, and when the data volume is large the sort may spill to disk, degrading performance further.

In this situation, you can shorten execution time by increasing the number of instances in the join stage and giving it more memory to reduce spilling. But the instance count cannot grow indefinitely; past a point the shuffle fan-out becomes too large and puts excessive pressure on the cluster. Tuning parameters therefore only trades resources for time, treating the symptoms rather than the root cause.

To optimize this scenario thoroughly, we want to eliminate the large table's shuffle entirely and merge M1 and J3 into a single stage. That way the large table's data is read and written only once and the intermediate sorting disappears, so execution time could be cut by half or more. A natural idea is map join, but the snapshot table is far too large to fit in memory, and map join does not support FULL OUTER JOIN anyway, so that route is blocked. How, then, do we eliminate the shuffle? Enter the hash clustering table feature we introduce today.

Solution

Hash clustering, in short, shuffles and sorts the data ahead of time, so that at query time the data can be read and fed directly into computation. This pattern is ideal when downstream nodes repeatedly join or aggregate on the same key after the table is produced. Of course, building a hash clustering table has its own cost: an extra shuffle during the generation phase. So the feature does not help every scenario; for example, if the data is read only once, it makes no difference whether the shuffle happens when the table is written or after it is read. But for the right scenarios, the feature makes a significant difference.
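The idea can be sketched in a few lines of Python (an illustration of the principle, not MaxCompute's implementation): if both tables are pre-partitioned by the same hash function into the same number of sorted buckets, matching keys are guaranteed to meet in the same bucket, so a join can proceed bucket-by-bucket with no runtime shuffle.

```python
import zlib

def bucket_of(key: str, n_buckets: int) -> int:
    # Stable stand-in hash (Python's built-in str hash is salted per process).
    return zlib.crc32(key.encode()) % n_buckets

def cluster(rows, n_buckets):
    """Pre-shuffle: place each (key, value) row into its hash bucket, sorted by key."""
    buckets = [[] for _ in range(n_buckets)]
    for key, value in rows:
        buckets[bucket_of(key, n_buckets)].append((key, value))
    for b in buckets:
        b.sort()
    return buckets

# Two tables clustered identically can be joined bucket-by-bucket:
# matching keys always sit in the same bucket index on both sides.
N = 4
left  = cluster([("a", 1), ("b", 2), ("c", 3)], N)
right = cluster([("b", 20), ("c", 30), ("d", 40)], N)

joined = []
for lb, rb in zip(left, right):          # bucket i joins only bucket i
    rmap = dict(rb)
    joined += [(k, v, rmap[k]) for k, v in lb if k in rmap]

print(sorted(joined))  # [('b', 2, 20), ('c', 3, 30)]
```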

For this scenario, let's first convert the snapshot table:

```sql
ALTER TABLE snapshot CLUSTERED BY (key) SORTED BY (key) INTO 100 BUCKETS;
```

Note that the bucket count of 100 here is just an example; set it according to your actual data size.
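As a rough starting point, a sizing helper like the following (a hypothetical heuristic built from the 500 MB to 1 GB-per-bucket and 4096-bucket guidance in the caveats at the end of this article) can suggest a bucket count:

```python
import math

def suggest_buckets(table_bytes: int,
                    target_bucket_bytes: int = 1 << 30,  # ~1 GB per bucket
                    max_buckets: int = 4096) -> int:
    """Heuristic: size each bucket around 500 MB-1 GB, never exceed 4096 buckets."""
    return max(1, min(max_buckets, math.ceil(table_bytes / target_bucket_bytes)))

print(suggest_buckets(100 << 30))  # 100 GB at ~1 GB/bucket -> 100
print(suggest_buckets(10 << 40))   # 10 TB would need 10240 buckets -> capped at 4096
```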

Then rebuild the ds='20170101' partition:

```sql
INSERT OVERWRITE TABLE snapshot PARTITION (ds='20170101')
SELECT key, value
FROM snapshot
WHERE ds='20170101';
```

Note that because of the extra shuffle stage, this process takes longer than an ordinary INSERT OVERWRITE.

  • First attempt: FULL OUTER JOIN

With the data ready, let's execute the FULL OUTER JOIN statement again:

```sql
INSERT OVERWRITE TABLE snapshot PARTITION (ds='20170102')
SELECT
    CASE WHEN d.key IS NULL THEN s.key ELSE d.key END,
    CASE WHEN d.key IS NULL THEN s.value ELSE d.value END
FROM
    (SELECT * FROM snapshot WHERE ds='20170101') s
FULL OUTER JOIN
    (SELECT * FROM delta WHERE ds='20170102') d
ON s.key = d.key;
```

Let’s take a look at the execution plan

Unexpectedly, the plan shows that the large table's data still goes through a shuffle before the final write. Why?

The output partition is itself part of a hash clustering table, so during the write the data must be distributed by the specified key. But the FULL OUTER JOIN may introduce NULLs, so the optimizer cannot guarantee that the join's output still satisfies the clustering property, and a reshuffle is required.

Concretely, the output key is the expression CASE WHEN d.key IS NULL THEN s.key ELSE d.key END, and the optimizer cannot infer that this expression preserves the hash distribution of the join key. So in this form the large table's data still incurs a shuffle, which is not what we want.
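Interestingly, the output key does preserve the distribution in fact; the optimizer just cannot prove it. A small Python check (illustrative only, with crc32 standing in for the platform's hash) shows that the CASE WHEN result always equals the join key and therefore lands in the same bucket:

```python
import zlib

def bucket_of(key: str, n: int) -> int:
    return zlib.crc32(key.encode()) % n

# Rows produced by the full outer join, as (s_key, d_key) pairs:
# one matched row, one snapshot-only row, one delta-only row.
pairs = [("k1", "k1"), ("k2", None), (None, "k3")]

N = 16
for s_key, d_key in pairs:
    out_key = d_key if d_key is not None else s_key   # CASE WHEN d.key IS NULL ...
    join_key = s_key if s_key is not None else d_key
    # The emitted key is always identical to the join key, so it hashes to the
    # same bucket: the output is still correctly clustered in fact, even though
    # the optimizer cannot deduce this and inserts a reshuffle.
    assert bucket_of(out_key, N) == bucket_of(join_key, N)

print("CASE WHEN output preserves the hash distribution")
```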

  • Second attempt: NOT IN + UNION ALL

The question now becomes: how do we let the optimizer recognize that we are not changing the data distribution? Looking closely at the FULL OUTER JOIN, this SQL is really a union of two parts: the snapshot rows whose keys do not appear in the delta, plus all rows of the delta.

The entire SQL can then be split into two parts:

```sql
SELECT a.key, a.value
FROM (SELECT * FROM snapshot WHERE ds='20170101' AND key NOT IN
      (SELECT key FROM delta WHERE ds='20170102')) a  -- snapshot_not_in_delta
UNION ALL
SELECT key, value FROM delta WHERE ds='20170102'      -- delta_all
```

Of these two parts, the first corresponds to the blue portion of the diagram and the second to the green portion. We only filter on the snapshot's key column and never alter the key distribution, so this statement saves the extra shuffle. However, MaxCompute limits NOT IN subqueries to result sets of at most 2,000 rows, which restricts the use of this formulation.
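A quick Python sanity check (a toy model, not MaxCompute code) confirms that the NOT IN + UNION ALL rewrite computes exactly the same result as the FULL OUTER JOIN merge:

```python
def full_outer_merge(snap: dict, delta: dict) -> dict:
    """The original formulation: delta values win on overlapping keys."""
    return {**snap, **delta}

def not_in_union_all(snap: dict, delta: dict) -> dict:
    # Part 1: snapshot rows whose key is NOT IN delta. No key is rewritten,
    # so the hash distribution of the big table is untouched.
    kept = {k: v for k, v in snap.items() if k not in delta}
    # Part 2: UNION ALL with every delta row.
    return {**kept, **delta}

snap = {"u1": "alice", "u2": "bob"}
delta = {"u2": "bob2", "u3": "carol"}
assert not_in_union_all(snap, delta) == full_outer_merge(snap, delta)
print("NOT IN + UNION ALL matches the FULL OUTER JOIN merge")
```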

  • Final solution: LEFT ANTI JOIN + UNION ALL

The anti semi join newly supported by MaxCompute 2.0 also implements NOT IN semantics, with no limit on the size of the subquery's result set:

```sql
INSERT OVERWRITE TABLE snapshot PARTITION (ds='20170102')
SELECT s.key, s.value
FROM (SELECT * FROM snapshot WHERE ds='20170101') s
LEFT ANTI JOIN
     (SELECT * FROM delta WHERE ds='20170102') d ON s.key = d.key
UNION ALL
SELECT key, value
FROM delta
WHERE ds='20170102';
```

After this step, let’s run it and see what happens.

In the plan, M1 reads and shuffles the delta table, while M2 reads the snapshot table, sort-merge-joins it against the shuffled delta data, and writes out the result directly. The final R3 stage only collects statistics and finishes in seconds, so the real processing happens in just two stages. M2 now combines the work of the former M1 and J3, and because one full round of reads and writes, the sorting, and the possible spill of the large table are all eliminated, the actual running time can be cut by half or more.

As noted above, a hash clustering table is of limited use when the data is read or written only once. But in incremental updates, both the input and the output are hash-clustered data, the clustering key is never modified, and the data is only filtered, so the four operations read -> join -> union all -> write all complete within a single stage, greatly reducing running time.
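What one instance of that single stage does can be sketched as a classic two-pointer merge (an illustrative Python model, not actual MaxCompute internals, under the assumption that both bucket files are already sorted by key): snapshot rows without a delta match pass through, delta rows win on a match, and the output stays sorted by key, so the clustering property is preserved for free.

```python
def merge_bucket(snap_rows, delta_rows):
    """One bucket's LEFT ANTI JOIN + UNION ALL as a single sorted-merge pass.

    snap_rows and delta_rows are lists of (key, value), each sorted by key.
    """
    out, i, j = [], 0, 0
    while i < len(snap_rows) and j < len(delta_rows):
        sk, dk = snap_rows[i][0], delta_rows[j][0]
        if sk < dk:
            out.append(snap_rows[i]); i += 1           # snapshot-only row survives
        elif sk > dk:
            out.append(delta_rows[j]); j += 1          # brand-new row from delta
        else:
            out.append(delta_rows[j]); i += 1; j += 1  # delta wins on a match
    out.extend(snap_rows[i:])
    out.extend(delta_rows[j:])
    return out                                         # still sorted by key

snap  = [("k1", "old1"), ("k2", "old2"), ("k4", "old4")]
delta = [("k2", "new2"), ("k3", "new3")]
result = merge_bucket(snap, delta)
print(result)  # [('k1', 'old1'), ('k2', 'new2'), ('k3', 'new3'), ('k4', 'old4')]
```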

Results

At present, the Hash Clustering Table feature has been officially released in Alibaba's internal cluster production environment, and several business units, including Ant Financial, the Security Department, and Cainiao, have taken part in the trial.

According to feedback from Ant Financial, the improved tasks show significant gains, with running times shortened by 40% to 80% and computing resources reduced by 23% to 67%.

After adopting hash clustering, Cainiao's task execution plans changed to skip the shuffle and related operations that the join previously required. Task execution time dropped from about 40 minutes to under 20, effectively improving execution efficiency, shortening run times, and saving resources.

After Fliggy applied hash clustering, its entire computation shortened from 3 hours before optimization to 40 minutes, and a detailed fact-table view can complete one read-and-compute pass within 1 minute. On the storage side, the savings grow linearly with the amount of data expansion; in view form, we saved 80% of the storage at a very small computation cost, which seems well worth it.

The price we pay is merely modifying the table's attributes and performing one data-regeneration run in advance, a one-off operation that pays dividends from then on.

Finally, you are welcome to try hash clustering on your own incremental update tasks. In our experience, the larger the big table, the greater the benefit.

Some caveats

  1. Choosing the bucket count takes some judgment. More buckets means more concurrency and faster runs, but if the table itself is not large, more buckets only means more small files. We recommend sizing each bucket at 500 MB to 1 GB, and in any case not setting a bucket count above 4096.

  2. Rearranging data into a hash clustering table can, in extreme cases, lower the compression ratio of files that previously compressed very well, hurting subsequent performance. You can confirm this by checking the input/output bytes in the summary of the job that generates the table.

  3. We are currently reworking the decimal type, which may change how decimal values are distributed, so avoid using decimal columns as the clustering key.

  4. The schemas of the snapshot and delta tables need not be identical. However, if the key types differ, such as bigint on one side and string on the other, the delta table's key must be cast to the snapshot's key type during the join; otherwise a reshuffle is still required.
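The fourth caveat can be illustrated with a toy experiment (crc32 standing in for the platform's hash; the real hash functions differ, but the point is the same): a value hashed as a string and the "same" value hashed as an integer generally land in different buckets, so a bigint-keyed delta cannot line up with a string-clustered snapshot until the key is cast.

```python
import zlib

N = 16

def bucket_str(key: str) -> int:
    # Hash the key in its string representation.
    return zlib.crc32(key.encode()) % N

def bucket_int(key: int) -> int:
    # Hash the key in a fixed-width integer representation.
    return zlib.crc32(key.to_bytes(8, "little")) % N

# The "same" logical key hashed as string vs integer usually lands in
# different buckets, so without a cast the bucket files of the two
# tables do not correspond and a reshuffle is unavoidable.
mismatched = [k for k in range(100) if bucket_str(str(k)) != bucket_int(k)]
print(f"{len(mismatched)}/100 keys land in different buckets")
```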

That’s all for today’s sharing. Thank you!

Editor: ITDKS (IT大咖说). Please indicate copyright and source when reprinting.