MaxCompute table design best practices

Operations that produce a large number of small files

Small files in MaxCompute tables affect both storage and performance, so let's look at the operations that produce large numbers of small files and avoid them when designing tables.

  • The MaxCompute Tunnel SDK is used to upload data, and a file is generated on every commit. If each file is too small (e.g., a few kilobytes) and commits happen frequently (e.g., every 5 seconds), 720 small files are generated in an hour and 17,280 in a day.
  • The MaxCompute Tunnel SDK is used to upload data, and sessions are created and committed without writing data, generating a large number of empty directories (equivalent to small files on the server side).
  • During upload with the Tunnel command of the MaxCompute console command-line tool, a large local file is split into pieces that are too small, so too many files are generated and each file is too small.
  • Data is archived by DataHub. For each shard that DataHub writes to MaxCompute, a commit forms a file when either of two conditions is met: the accumulated data reaches 64 MB, or 5 minutes have elapsed since the last commit. If too many shards are opened (say 20 shards) and each shard receives far less than 64 MB in 5 minutes, for example only a few hundred KB, a large number of small files is generated: 24 × 12 × 20 = 5,760 small files per day.
  • Data is written into a MaxCompute table (or partition) with insert into using a data development tool such as DataWorks. Each insert into generates a file; for example, inserting 10 records at a time and accumulating 10,000 records per day produces 1,000 small files.
  • DTS synchronizes data from databases such as RDS to MaxCompute. DTS creates a full table and an incremental table. Because each batch of inserted rows is small, the incremental table commits a complete synchronization for every batch, which causes small file problems in the incremental table. For example, if 10 rows are synchronized at a time and the daily increment is 10,000 rows, 1,000 small files are generated per day. In this scenario, merge the full table and the incremental table after data synchronization.
  • There are too many source data collection clients writing directly into a partition through Tunnel. Every time a collection client commits, an independent file is generated in the same partition, resulting in a large number of small files.
  • SLS triggers Function Compute at high frequency, which continuously streams small batches of file data into MaxCompute.

Divide project space based on data

A project is the highest-level object in MaxCompute. Resources are allocated, isolated, and managed per project, providing multi-tenant management capability.

  • If multiple applications need to share data, it is recommended to use the same project.
  • If the data required by multiple applications is unrelated, it is recommended to use different projects. Tables and partitions can be shared between projects through package authorization (see the sketch after this list).
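A rough, hypothetical sketch of package-based sharing between two projects (project, package, and table names are made up; check the MaxCompute security documentation for the exact syntax): the data-owning project packages the objects and the consuming project installs the package.

-- In the data-owning project prj_a (hypothetical names):
CREATE PACKAGE shared_pkg;
ADD TABLE shared_table TO PACKAGE shared_pkg;
ALLOW PROJECT prj_b TO INSTALL PACKAGE shared_pkg;

-- In the consuming project prj_b:
INSTALL PACKAGE prj_a.shared_pkg;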

Best practices for dimension table design:

Typically, a table describing attributes is designed as a dimension table. A dimension table can be associated with any table in any table group, and no partition information is required when creating it. However, the amount of data in a single dimension table is limited. Note the following points when designing and using dimension tables:

  • Generally, a dimension table should contain no more than 10 million rows.
  • Dimension table data should not be heavily updated.
  • Mapjoin can be used when dimension tables are joined with other tables (see the sketch after this list).
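For example, a mapjoin hint between a hypothetical fact table and dimension table might look like the following sketch (table and column names are illustrative only):

-- The small dimension table d is loaded into memory and joined on the map side.
SELECT /*+ MAPJOIN(d) */
       o.order_id,
       o.product_id,
       d.product_name
FROM   t_order o
JOIN   t_dim_product d
ON     o.product_id = d.product_id;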

Zipper table design – Extreme storage applications

The extreme storage feature has not yet been released; this section mainly provides design ideas. In data warehouse data model design, we often meet requirements such as the following:

  • Large amount of data. Some fields in the table are updated, such as a user's address, a product description, an order status, or a mobile phone number.
  • Historical snapshots need to be viewed at a point in time or over a period of time (for example, checking the status of an order at a certain point in history, or how many times a user updated a field within a certain period).
  • The proportion and frequency of changes are small. For example, there are 10 million members in total and only about 100,000 are added or changed every day. If the table keeps a full copy every day, each full copy stores a large amount of unchanged information, which is a great waste of storage. Consider using extreme storage: MaxCompute provides the ability to convert tables into extreme storage tables. The following is an example of the extreme storage operations:
  1. Create the source table.
create table src_tbl (key0 STRING, key1 STRING, col0 STRING, col1 STRING, col2 STRING) PARTITIONED BY (datestamp_x STRING, pt0 STRING);
  2. Import data.
  3. Convert src_tbl to an extreme storage table.
set odps.exstore.primarykey=key0,key1;
[set odps.exstore.ignorekey=col0;]
EXSTORE exstore_tbl PARTITION (datestamp_x='20140801');
EXSTORE exstore_tbl PARTITION (datestamp_x='20140802');

For more detailed articles on zipper table design, refer to Alibaba Cloud: yq.aliyun.com/articles/54…

Design of acquisition source table

Data collection methods include streaming writes, batch writes, and periodically scheduled row-by-row inserts. When the data volume is large, ensure that data of the same business unit is placed into its own partitions and tables; when the data volume is small, optimize the collection frequency.

  • Write streaming data.

    • Streaming data is generally collected through many channels, so the relevant collection channels should be clearly distinguished. When the write volume of a single data channel is large, partition the table by time.
    • When the data volume of a collection channel is small, a non-partitioned table can be used, with the terminal type and collection time designed as regular columns.
    • When using Datahub for data writing, plan the number of shards reasonably to avoid low-traffic collection channels being spread over too many shards.
  • Write data in batches. For batch data writes, focus on the write cycle.
  • For periodically scheduled row-by-row inserts: avoid periodic data insertion where possible. If it is required, create a partitioned table and insert into a new partition to reduce the impact on existing partitions (see the sketch after this list).
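A minimal sketch of a time-partitioned acquisition table, using hypothetical table and column names; each scheduled batch lands in a new daily partition and leaves existing partitions untouched.

CREATE TABLE ods_device_log (
    device_id STRING,
    terminal  STRING,   -- terminal type kept as a regular column
    content   STRING
)
PARTITIONED BY (ds STRING);

-- A periodically scheduled batch write goes into a new partition only.
INSERT OVERWRITE TABLE ods_device_log PARTITION (ds='20140801')
SELECT device_id, terminal, content
FROM   ods_device_log_staging;   -- hypothetical staging table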

Design of log tables

A log table is essentially an append-only flow table: records are never updated, each collected record is appended, and many records are stored together. Pay attention to the following points when designing log tables:

  • Consider whether the log needs to be deduplicated.
  • Consider whether you need to extend dimension attributes.

    • Whether to extend the dimension attribute fields of an associated dimension table depends on two points: how frequently the business uses them and the output delay caused by the association.
    • Choose carefully whether to extend the dimension table.
  • Consider differentiating terminal types.

    • Because log tables are large and business analysis usually produces separate statistics for the PC and app terminals, and the two terminals also have separate collection systems, the common practice is to design multiple detailed DWD tables by terminal.
    • If there are many terminal types but little data, for example each terminal has less than 1 TB of data but is collected many times, you can store the terminal information in a regular column instead of partitioning by terminal.

Note:

  • You can partition the log table by day based on the log collection time. Collect and consolidate the data before importing it, and write and commit a batch at a time (usually 64 MB).
  • Log data is rarely written back into old partitions. Insert into can be used to append a small amount of data, but the number of inserts should be limited.
  • If a large number of writes are needed, use insert overwrite to avoid small file problems.
  • Configure proper partitions for log tables, and archive cold data that has not been accessed for a long time (see the sketch after this list).
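A sketch of archiving a cold partition, assuming a hypothetical daily-partitioned log table ods_app_log; the archive operation is intended for data that has not been accessed for a long time.

-- Archive a partition that has not been accessed for a long time.
ALTER TABLE ods_app_log PARTITION (ds='20140801') ARCHIVE;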

Design of interaction tables

An interaction table is a periodic snapshot table that stores a daily snapshot of all favorites records. Problem: a large amount of history accumulates, generating the daily snapshot requires merging the current incremental table with the previous day's full table, which consumes resources, and counting the favorites newly added in the last day requires scanning the whole table. How can resource consumption be reduced? Suggested solution: build a transactional fact table, plus a periodic snapshot table that stores only the currently valid favorites, to meet the statistical analysis needs of various businesses. Note:

  • The most important thing in designing an interaction table is to distinguish stock (full) data from incremental data. Data written into a new partition can be treated as incremental data (see the sketch after this list).
  • Changes and inserts to old partitioned data should be minimized.
  • Prefer insert overwrite over insert into wherever possible.
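A minimal sketch of the suggested approach, with hypothetical tables fav_delta (the day's transactional fact data, with an action column of 'add' or 'cancel') and fav_snapshot (the periodic snapshot of currently valid favorites, partitioned by day); only the new snapshot partition is written, so old partitions stay untouched.

INSERT OVERWRITE TABLE fav_snapshot PARTITION (ds='20140802')
SELECT key1, key2, col1, col2
FROM (
    -- yesterday's snapshot rows that were not touched today
    SELECT s.key1, s.key2, s.col1, s.col2
    FROM   fav_snapshot s
    LEFT OUTER JOIN fav_delta d
           ON s.key1 = d.key1 AND s.key2 = d.key2
    WHERE  s.ds = '20140801' AND d.key1 IS NULL
    UNION ALL
    -- favorites newly added today
    SELECT key1, key2, col1, col2
    FROM   fav_delta
    WHERE  action = 'add'
) t;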

Updating and deleting table data in MaxCompute

The delete/update/merge SQL supported by relational databases can be implemented in MaxCompute as follows. Table preparation:

table1 (key1 string, key2 string, col1 string, col2 string);
table2 (key1 string, key2 string, col1 string, col2 string);
table3 (key1 string, key2 string, col1 string, col2 string);

Update (apply the values of the records in table2 to table1):

insert overwrite table table1
select t1.key1
      ,t1.key2
      ,case when t2.key1 is not null then t2.col1 else t1.col1 end as col1
      ,case when t2.key1 is not null then t2.col2 else t1.col2 end as col2
from table1 t1
left outer join table2 t2 on t1.key1 = t2.key1 and t1.key2 = t2.key2;

Delete (remove the records that appear in table2 from table1):

insert overwrite table table1
select t1.key1
      ,t1.key2
      ,t1.col1
      ,t1.col2
from table1 t1
left outer join table2 t2 on t1.key1 = t2.key1 and t1.key2 = t2.key2
where t2.key1 is null;

Merge (no delete). The result keeps the records of table1 that are not in table2, plus all records of table2:

insert overwrite table table1
select * from (
    select t1.key1, t1.key2, t1.col1, t1.col2
    from table1 t1
    left outer join table2 t2 on t1.key1 = t2.key1 and t1.key2 = t2.key2
    where t2.key1 is null
    union all
    select t2.key1, t2.key2, t2.col1, t2.col2
    from table2 t2
) tt;

Merge (with delete). Assume the records to be deleted are stored in table3. The result keeps the records of table1 that were neither updated (in table2) nor deleted (in table3), plus all new and updated records from table2:

insert overwrite table table1
select * from (
    select t1.key1, t1.key2, t1.col1, t1.col2
    from table1 t1
    left outer join table2 t2 on t1.key1 = t2.key1 and t1.key2 = t2.key2
    left outer join table3 t3 on t1.key1 = t3.key1 and t1.key2 = t3.key2
    where t2.key1 is null
      and t3.key1 is null
    union all
    select t2.key1, t2.key2, t2.col1, t2.col2
    from table2 t2
) tt;

Table creation design example

Scenario: Collect weather information.

  • Basic information: the data includes place names, attributes such as area and basic population, and weather information.
  • The attribute data changes little, while the weather information is collected by many terminals and the data volume is large.
  • The weather information changes frequently, and the traffic is basically stable when the number of terminals is stable.

Table design guidelines:

  • It is recommended to split the data into a basic attribute table and a weather log table, separating data that changes little from data that changes a lot.
  • Because the data volume is huge, the weather log table can be partitioned by region and further partitioned by time, such as by day. This partitioning avoids touching unrelated data when the weather of one place or one time period changes (see the sketch after this list).
  • Datahub is used on the collection terminals for data aggregation, and an appropriate number of shards is chosen according to the stable traffic value. Data is then written into the weather log table in batch mode rather than with insert into.
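A minimal sketch of the two resulting tables, with hypothetical names and columns:

-- Slowly changing attributes go into the basic attribute table.
CREATE TABLE dim_city_info (
    city_name  STRING,
    area       BIGINT,
    population BIGINT
);

-- High-volume weather records go into a log table partitioned by day and region,
-- fed by Datahub in batches rather than with insert into.
CREATE TABLE ods_weather_log (
    city_name    STRING,
    temperature  DOUBLE,
    humidity     DOUBLE,
    collect_time DATETIME
)
PARTITIONED BY (ds STRING, region STRING);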

Special features of MaxCompute tables

The life cycle

MaxCompute tables and partitions provide data lifecycle management. If a table has not changed for a specified period after its last update, MaxCompute reclaims it automatically. This specified period is the lifecycle, and the lifecycle is set at the table level.

create table test_lifecycle (key string) lifecycle 100;
-- or
alter table test_lifecycle set lifecycle 50;

MaxCompute determines whether to reclaim a non-partitioned table, or a partition of a partitioned table, based on its LastDataModifiedTime and its lifecycle setting. MaxCompute SQL provides a touch operation that changes the LastDataModifiedTime of a partition to the current time. Once LastDataModifiedTime changes, MaxCompute considers the table or partition to have changed, and the lifecycle countdown starts over.

ALTER TABLE table_name TOUCH PARTITION (partition_col='partition_col_value', ...);

Note:

  • Planning the life cycle of a table properly and setting the life cycle when creating a table can effectively reduce storage pressure.
  • Any change to the table data affects the time at which the lifecycle reclaims the data, including small file merges.

Avoid full table scan

Table design:

  • Create partitioned tables, or design columns for common scan conditions.
  • Partition data tables properly, and turn common query conditions into column names.
  • Hash-cluster the data on commonly used query conditions.
  • Add partition filter conditions, reduce the number of scanned partitions, or materialize a smaller intermediate table and scan its history partitions to reduce the amount of data scanned.
  • Store the intermediate result of a full-table scan as an intermediate table.
  • If you scan a year of partitions every day, the computation cost is very high. It is recommended to materialize an intermediate table summarized once per day and then scan a year of partitions of that intermediate table; the amount of data scanned is greatly reduced (see the sketch after this list).
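A minimal sketch with hypothetical table names: filter on the partition column so only the needed partitions are scanned, materialize a daily summary, and then scan a year of the much smaller intermediate table.

-- Scan only one partition of the detail table.
SELECT key1, col1
FROM   ods_detail_log
WHERE  ds = '20140801';

-- Summarize each day into an intermediate table
-- (assume mid_daily_summary was created with PARTITIONED BY (ds STRING)).
INSERT OVERWRITE TABLE mid_daily_summary PARTITION (ds='20140801')
SELECT key1, COUNT(*) AS pv
FROM   ods_detail_log
WHERE  ds = '20140801'
GROUP  BY key1;

-- Yearly statistics scan the small intermediate table instead of a year of detail data.
SELECT key1, SUM(pv) AS yearly_pv
FROM   mid_daily_summary
WHERE  ds BETWEEN '20140101' AND '20141231'
GROUP  BY key1;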

Avoid small files

  • Small files generated during reduce computation: insert overwrite the source table (or partition), or write to a new table and then delete the source table.
  • Suggestions for small files generated during Tunnel data collection:

    • When calling the Tunnel SDK, commit when the buffer reaches 64 MB.
    • When using the console, do not upload small files frequently; accumulate a large batch and upload it at once. If a partitioned table is imported, set a lifecycle for the partitions so that expired data is cleaned up automatically.
    • As in the first scenario, insert overwrite the source table (or partition);
    • Use the ALTER merge mode: merge small files with the console command (see the sketch after this list).
  • When using temporary tables, it is recommended to add a lifecycle to the table so that garbage collection reclaims it automatically when it expires.
  • Applying for too many DataHub shards causes small file problems. Policy for deciding the number of DataHub shards:

    • The default throughput of a single shard is 1 MB/s; allocate the actual number of shards based on this (a few extra can be added);
    • In ODPS synchronization, each shard has a separate task that commits every 5 minutes or when 64 MB is reached. The 5-minute default is intended to make data visible in ODPS as soon as possible. If partitions are created by hour, each shard produces 12 files per hour.
    • If the data volume is small but the number of shards is large, there will be many small files in ODPS (number of shards × 12 per hour).
    • Do not allocate more shards than needed.
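A sketch of the console merge command mentioned above, assuming a hypothetical partitioned table; check the MaxCompute documentation for the exact options.

-- Merge the small files of one partition.
ALTER TABLE ods_app_log PARTITION (ds='20140801') MERGE SMALLFILES;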

Converting to a hash clustering table

Advantages of hash clustering tables: optimized bucket pruning, optimized aggregation, and optimized storage. CLUSTERED BY specifies the hash key; MaxCompute hashes the specified columns and distributes the data into buckets by hash value. Principles for selecting the hash key:

  • Select columns with fewer duplicate key values
  • SORTED BY is used to specify how fields are sorted within a bucket.

How to convert to a hash clustering table:
ALTER TABLE table_name [CLUSTERED BY (col_name [, col_name, ...]) [SORTED BY (col_name [ASC | DESC] [, col_name [ASC | DESC] ...])] INTO number_of_buckets BUCKETS]

The ALTER TABLE statement applies to existing tables; after the clustering attributes are added, new partitions are stored with hash clustering. After creating a hash clustering table, use insert overwrite to convert data from another source table (see the sketch at the end of this section). Note that hash clustering tables have the following restrictions:

  • Insert into is not supported and data can only be added by insert overwrite.
  • Tunnel upload to a range cluster table is not supported, because tunnel-uploaded data is unordered.
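A minimal sketch of the conversion, assuming a hypothetical source table t_src with the same schema; the bucket count is illustrative only.

CREATE TABLE t_hash_cluster (key1 STRING, col1 STRING)
CLUSTERED BY (key1) SORTED BY (key1) INTO 1024 BUCKETS;

-- A hash clustering table does not support insert into; load it with insert overwrite.
INSERT OVERWRITE TABLE t_hash_cluster
SELECT key1, col1 FROM t_src;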