Contents:

I. Background of data warehouse architecture upgrade

II. Integrated architecture practice of lake and warehouse based on Iceberg

III. Summary and benefits

IV. Follow-up planning

Flink on GitHub: https://github.com/apache/flink. Stars are welcome.

I. Background of data warehouse architecture upgrade

1. Pain points of the Hive-based data warehouse

The original data warehouse was built entirely on Hive and had three major pain points:

Pain point 1: ACID is not supported

1) Upsert scenarios are not supported;

2) Row-level DELETE is not supported, and data correction costs are high.

Pain point 2: Timeliness is difficult to improve

1) It is difficult to make data visible in quasi-real time;

2) Data cannot be read incrementally, so stream and batch processing cannot be unified at the storage layer;

3) Data analysis scenarios with minute-level latency are not supported.

Pain point 3: Table evolution is poorly supported

1) Schema-on-write, with poor support for schema changes;

2) Partition spec changes are poorly supported.

2. Iceberg key features

Iceberg has four key features: ACID semantics, an incremental snapshot mechanism, an open table format, and stream and batch interfaces.

  • ACID semantics

    • Incomplete commits are never read;

    • Concurrent commits are supported through optimistic locking;

    • Row-level deletes, which make upsert possible.

  • Incremental snapshot mechanism

    • Data is visible after commit (minute level);

    • Historical snapshots can be traced back.

  • Open table format

    • Data formats: Parquet, ORC, Avro

    • Computing engines: Spark, Flink, Hive, Trino/Presto

  • Stream and batch interface support

    • Supports stream and batch writes;

    • Supports stream and batch reads.
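The optimistic-lock commit listed under ACID semantics can be illustrated with a toy model. This is a minimal sketch under stated assumptions, not Iceberg's implementation: `ToyTable`, `commit`, and `append_with_retry` are hypothetical names, and the atomic swap is reduced to an in-memory version comparison.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTable:
    """Hypothetical stand-in for a table whose state is a list of snapshots."""
    snapshots: list = field(default_factory=list)

    def current_version(self) -> int:
        return len(self.snapshots)

    def commit(self, base_version: int, new_files: list) -> bool:
        """Succeed only if nobody committed since base_version was read."""
        if base_version != self.current_version():
            return False  # conflict: another writer committed first
        self.snapshots.append(list(new_files))
        return True


def append_with_retry(table: ToyTable, new_files: list, max_retries: int = 3) -> int:
    """Re-read the table version and retry until the commit is accepted."""
    for _ in range(max_retries):
        base = table.current_version()
        if table.commit(base, new_files):
            return table.current_version()
    raise RuntimeError("commit failed after retries")
```

A writer that holds a stale version is rejected and has to re-read the table before retrying, so readers only ever see fully committed snapshots.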

II. Integrated architecture practice of lake and warehouse based on Iceberg

The point of lake-warehouse integration is that users no longer need to see the lake and the warehouse as separate things. Data has an open metadata format, so it can flow freely and connect to the diverse computing ecosystem above it.

— Jia Yangqing, a senior researcher at Alibaba Cloud Computing Platform

1. Append pipeline into the lake

The figure above shows the pipeline through which log data enters the lake. Log data includes client logs and server logs. It is written to Kafka in real time, then written to Iceberg by a Flink job, and finally stored in HDFS.

2. Building the Flink SQL pipeline into the lake

Our Flink SQL pipeline into the lake is built on "Flink 1.11 + Iceberg 0.11". To connect the Iceberg Catalog, we mainly did the following:

1) Added Iceberg Catalog support to Meta Server;

2) Added Iceberg Catalog support to the SQL SDK.

On this basis, the platform exposes Iceberg table management, so that users can create SQL tables on the platform themselves.

3. Entering the lake – proxy user support

The second step was internal integration: connecting to the existing budget system and permission system.

Previously, real-time jobs on the platform all ran as the default Flink user, and their storage did not involve HDFS, so this caused no problems, including for budget allocation.

Writing to Iceberg raises new issues. For example, if a data warehouse team has its own data mart, the data should be written to that team's directory, the cost should be charged to that team's budget, and permissions should be integrated with the offline team account system.

As shown above, we implemented a proxy user feature on the platform. Users can specify which account is used to write data to Iceberg. The implementation is mainly as follows:

  • Add a table-level configuration: 'iceberg.user.proxy' = 'targetUser'

    1) Enable Superuser

    2) Team account authentication

  • Enable the proxy user when accessing HDFS

  • Specify the proxy user when accessing Hive Metastore

    1) Refer to Spark's implementation:

    org.apache.spark.deploy.security.HiveDelegationTokenProvider

    2) Dynamically proxy HiveMetaStoreClient so that Hive Metastore is accessed as the proxy user

4. Example of entering the lake with Flink SQL

DDL + DML

5. CDC data pipeline into the lake

As shown above, we have the AutoDTS platform, which handles real-time ingestion of data from business databases. It delivers the data from these databases into Kafka, and it also supports configuring distribution tasks on the platform, which distribute the data in Kafka to different storage engines. In this scenario the target is Iceberg.

6. Building the Flink SQL CDC pipeline into the lake

Here are the changes we made on "Flink 1.11 + Iceberg 0.11" to support CDC data entering the lake:

  • Improved Iceberg sink:

    In Flink 1.11 the sink is an AppendStreamTableSink, which cannot handle CDC streams, so we modified and adapted it.

  • Table management

    1) Support primary keys (PR-1978)

    2) Enable the V2 format: 'iceberg.format.version' = '2'

7. CDC data into the lake

1. Bucket support

How can we ensure that records with the same key are written to the same bucket in the upsert scenario?

Flink SQL syntax does not support declaring bucket partitions, so they are declared through table properties:

'partition.bucket.source'='id', // Specify the bucket field

'partition.bucket.num'='10', // Specify the number of buckets
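To illustrate why a declared bucket field keeps every version of a row in one bucket, here is a minimal sketch. It assumes the two properties above are available as a plain dictionary; `bucket_for` and `route` are hypothetical helpers, and `zlib.crc32` is only a stable stand-in hash, not the hash function Iceberg uses.

```python
import zlib


def bucket_for(key, num_buckets: int) -> int:
    """Map a key to a bucket deterministically.

    zlib.crc32 is used only as a stable stand-in hash; it is not the
    hash function Iceberg uses.
    """
    if num_buckets <= 0:
        raise ValueError("num_buckets must be positive")
    digest = zlib.crc32(str(key).encode("utf-8"))
    return digest % num_buckets


# Hypothetical table properties mirroring the configuration above.
props = {"partition.bucket.source": "id", "partition.bucket.num": "10"}


def route(record: dict, properties: dict) -> int:
    """Pick the bucket for a record from the configured bucket field."""
    field_name = properties["partition.bucket.source"]
    return bucket_for(record[field_name], int(properties["partition.bucket.num"]))
```

Because the bucket depends only on the key and the bucket count, an insert and a later update of the same `id` always land in the same bucket, which is what upsert needs.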

2. Copy-on-write sink

We built a copy-on-write sink because the community's merge-on-read did not yet support merging small files, so copy-on-write serves as a temporary solution. The business teams have been testing it, and the results so far are good.

As with merge-on-read, a StreamWriter writes with multiple parallelism and a FileCommitter with a parallelism of one commits sequentially.

With copy-on-write, the number of buckets must be set according to the table's data volume, and no separate small-file merging is needed.

  • StreamWriter writes with multiple parallelism in the snapshotState phase

    1) Add a buffer;

    2) Before writing, check that the previous checkpoint was committed successfully;

    3) Group and merge records by bucket, and write the buckets one by one.

  • FileCommitter commits sequentially with a parallelism of one

    1) table.newOverwrite()

    2) flink.last.committed.checkpoint.id
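The writer and committer steps above can be modelled in memory. This is a rough sketch under stated assumptions, not the authors' sink: `ToyCowSink` is hypothetical, each bucket is represented by a single dictionary standing in for its data file, and the bucket is chosen by `key % num_buckets` for brevity.

```python
from collections import defaultdict


class ToyCowSink:
    """Hypothetical in-memory model of a bucketed copy-on-write sink."""

    def __init__(self, num_buckets: int):
        self.num_buckets = num_buckets
        self.files = {}                  # bucket -> {key: row}, one "file" per bucket
        self.buffer = []                 # buffered (op, key, row) changes
        self.last_committed_checkpoint = 0

    def write(self, op: str, key: int, row=None):
        self.buffer.append((op, key, row))

    def snapshot_state(self, checkpoint_id: int) -> dict:
        # Refuse to rewrite files if the previous checkpoint was never committed.
        if self.last_committed_checkpoint != checkpoint_id - 1:
            raise RuntimeError("previous checkpoint not committed")
        grouped = defaultdict(list)
        for op, key, row in self.buffer:
            grouped[key % self.num_buckets].append((op, key, row))
        rewritten = {}
        for bucket, changes in grouped.items():
            merged = dict(self.files.get(bucket, {}))   # copy the old file
            for op, key, row in changes:                # apply changes in order
                if op == "delete":
                    merged.pop(key, None)
                else:                                   # "upsert"
                    merged[key] = row
            rewritten[bucket] = merged
        self.buffer = []
        return rewritten

    def commit(self, checkpoint_id: int, rewritten: dict):
        # Replace whole bucket files, then record the committed checkpoint.
        self.files.update(rewritten)
        self.last_committed_checkpoint = checkpoint_id
```

Each checkpoint rewrites only the buckets that received changes, which is why the bucket count has to match the table's data volume: too few buckets means every checkpoint rewrites very large files.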

8. Example – configuring CDC data to enter the lake

As shown in the figure above, a business team can create and configure a distribution task on the DTS platform.

Select the Iceberg table as the instance type, then select the target database to indicate which table's data is to be synchronized to Iceberg, and then choose the field mapping between the source table and the target table. Once configured, the distribution task can be started. On startup it submits a real-time Flink job on the real-time computing platform, which then uses the copy-on-write sink to write data to the Iceberg table in real time.

9. Other practices in entering the lake

Practice 1: Reduce Empty Commits

  • Problem description:

    When upstream Kafka has no data for a long time, a new snapshot is still generated at every checkpoint, resulting in a large number of empty files and unnecessary snapshots.

  • Solution (PR-2042):

    Add the configuration flink.max-continuous-empty-commits. When there is no data, a commit is triggered and a snapshot generated only after the specified number of consecutive empty checkpoints.
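The counting logic behind this setting can be sketched as follows. This is an illustrative model, not the code from the PR: `ToyCommitter` and its method names are hypothetical.

```python
class ToyCommitter:
    """Hypothetical committer that skips most empty checkpoints."""

    def __init__(self, max_continuous_empty_commits: int):
        self.max_empty = max_continuous_empty_commits
        self.continuous_empty = 0
        self.snapshots = []

    def on_checkpoint(self, checkpoint_id: int, data_files: list) -> bool:
        """Return True if a snapshot was committed for this checkpoint."""
        if data_files:
            self.continuous_empty = 0
            self.snapshots.append((checkpoint_id, list(data_files)))
            return True
        self.continuous_empty += 1
        if self.continuous_empty % self.max_empty == 0:
            # Commit an empty snapshot occasionally so progress stays visible.
            self.snapshots.append((checkpoint_id, []))
            return True
        return False
```

With a limit of 3, six idle checkpoints produce two snapshots instead of six.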

Practice 2: Record the watermark

  • Problem description:

    At present an Iceberg table cannot directly reflect how far data writing has progressed, so offline scheduling cannot accurately trigger downstream tasks.

  • Solution (PR-2109):

    In the commit stage, Flink's watermark is recorded in the properties of the Iceberg table. This directly reflects end-to-end latency and can also be used to judge whether a partition's data is complete, so that downstream tasks can be scheduled and triggered.
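A scheduler could use the recorded watermark roughly as sketched here. The property key `flink.watermark` and both helper functions are assumptions made for illustration; the source does not state the actual property name.

```python
def record_watermark(table_properties: dict, watermark_ms: int) -> None:
    """Store the latest watermark at commit time; never move it backwards."""
    key = "flink.watermark"  # hypothetical property name
    previous = int(table_properties.get(key, -1))
    table_properties[key] = str(max(previous, watermark_ms))


def partition_is_complete(table_properties: dict, partition_end_ms: int) -> bool:
    """A partition is complete once the watermark has passed its end time."""
    watermark = int(table_properties.get("flink.watermark", -1))
    return watermark >= partition_end_ms
```

For an hourly partition, the downstream job would be triggered the first time `partition_is_complete` returns true for that hour's end timestamp.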

Practice 3: Optimize table deletion

  • Problem description:

    Dropping an Iceberg table can be slow, causing platform API calls to time out. Because Iceberg abstracts its IO layer for object storage, there is no quick way to clear a directory.

  • Solution:

    Extend FileIO with a deleteDir method to quickly delete table data on HDFS.

10. Small file merging and data cleaning

A batch task (Spark 3) runs periodically for each table and consists of the following three steps:

1. Periodically merge the small files of newly added partitions:

rewriteDataFilesAction.execute(); This only merges small files and does not delete the old files.

2. Delete expired snapshots and clean up their metadata and data files:

table.expireSnapshots().expireOlderThan(timestamp).commit();

3. Clean up orphan files. By default, unreachable files older than 3 days are removed:

removeOrphanFilesAction.olderThan(timestamp).execute();
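The selection rules behind steps 2 and 3 can be sketched in plain Python. This is a simplified model with hypothetical helper names, not the Iceberg actions themselves: files and snapshots are plain tuples and dictionaries, and the latest snapshot is assumed to be always retained.

```python
def find_orphan_files(all_files: dict, referenced: set, older_than_ms: int) -> list:
    """Return paths that no snapshot references and that are older than the cutoff.

    all_files maps path -> last-modified time in milliseconds.
    """
    return sorted(
        path
        for path, modified_ms in all_files.items()
        if path not in referenced and modified_ms < older_than_ms
    )


def expire_snapshots(snapshots: list, older_than_ms: int) -> list:
    """Keep snapshots newer than the cutoff, always retaining the latest one.

    snapshots is a list of (snapshot_id, timestamp_ms) in commit order.
    """
    if not snapshots:
        return []
    kept = [s for s in snapshots[:-1] if s[1] >= older_than_ms]
    return kept + [snapshots[-1]]
```

The age cutoff in the orphan check matters because a file written by an in-flight job is not yet referenced by any snapshot; the 3-day default gives such jobs time to commit.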

11. Computing engine – Flink

Flink is the core computing engine of the real-time platform. At present it mainly supports the scenario of data entering the lake, with the following characteristics.

  • Quasi-real-time data into the lake:

    For entering the lake, Flink has the highest degree of integration with Iceberg, and the Flink community actively embraces data lake technology.

  • Platform integration:

    AutoStream introduces IcebergCatalog and supports creating tables and entering the lake through SQL. AutoDTS supports configuring MySQL, SQLServer, and TiDB tables to enter the lake.

  • Stream-batch unification:

    As stream-batch unification takes hold, Flink's advantages will gradually show.

12. Computing engine – Hive

For SQL batch processing, Iceberg is more tightly integrated with Spark 3 than with Hive, and it mainly provides the following functions.

  • Periodic small-file merging and metadata queries:

    SELECT * FROM prod.db.table.history; the snapshots, files, and manifests tables can be queried in the same way.

  • Offline data writing:

    1) INSERT INTO; 2) INSERT OVERWRITE; 3) MERGE INTO

  • Analysis query:

    It mainly supports daily quasi-real-time analysis and query scenarios.

13. Computing engine – Trino/Presto

AutoBI has been integrated with Presto for reporting and analytical query scenarios.

  • Trino

    1) Iceberg is used directly as a report data source

    2) A metadata caching mechanism needs to be added: github.com/trinodb/tri…

  • Presto

    Community integration: github.com/prestodb/pr…

14. Pitfalls we ran into

1. An exception occurs when accessing Hive Metastore

Problem description: The HiveConf constructor was used incorrectly. As a result, the configuration declared in the Hive client was overwritten, causing an exception when accessing Hive Metastore.

Solution (PR-2075): Fix the construction of HiveConf by explicitly calling the addResource method so that the configuration is not overwritten: hiveConf.addResource(conf);

2. The Metastore lock is not released

Problem description: "CommitFailedException: Timed out after 181138 ms waiting for lock xxx." Cause: with the hiveMetastoreClient.lock method, unlock must be called explicitly even when the lock was not obtained; otherwise the exception above occurs.

Solution (PR-2263): Optimize the HiveTableOperations#acquireLock method to explicitly call unlock and release the lock when acquiring it fails.
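The fix pattern, releasing a lock request on the failure path, can be shown with a toy model. Everything here is hypothetical (`ToyLockClient`, `acquire_lock`, the state strings); it only illustrates why a request that was never granted still has to be unlocked.

```python
class ToyLockClient:
    """Hypothetical lock service that records which lock ids are still held."""

    def __init__(self, grant: bool):
        self.grant = grant
        self.open_locks = set()
        self.next_id = 0

    def lock(self) -> tuple:
        # A lock request is registered even when it ends up not being granted.
        self.next_id += 1
        self.open_locks.add(self.next_id)
        return self.next_id, ("ACQUIRED" if self.grant else "WAITING")

    def unlock(self, lock_id: int) -> None:
        self.open_locks.discard(lock_id)


def acquire_lock(client: ToyLockClient) -> int:
    """Return the lock id, or release the pending request and raise."""
    lock_id, state = client.lock()
    if state != "ACQUIRED":
        client.unlock(lock_id)  # explicit unlock on the failure path
        raise TimeoutError("timed out waiting for lock")
    return lock_id
```

Without the explicit `unlock`, the pending request would stay registered and block later writers until it expired.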

3. The metadata file is lost

Problem description: An exception is thrown when accessing the Iceberg table: "NotFoundException Failed to open input stream for file: xxx.metadata.json"

Solution (PR-2328): When the Hive Metastore call that updates the Iceberg table's metadata_location times out, add a check to confirm that the metadata was not saved successfully before deleting the metadata file.
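The check-before-delete idea can be sketched with a toy metastore whose update may take effect even though the call times out. All names are hypothetical and the file system is reduced to a set of paths; this is an illustration of the reasoning, not the code from the PR.

```python
class ToyMetastore:
    """Hypothetical metastore whose update can time out after being applied."""

    def __init__(self, applies_before_timeout: bool):
        self.metadata_location = "v1.metadata.json"
        self.applies_before_timeout = applies_before_timeout

    def update_location(self, new_location: str) -> None:
        if self.applies_before_timeout:
            self.metadata_location = new_location
        raise TimeoutError("metastore call timed out")


def commit_metadata(store: ToyMetastore, new_location: str, files: set) -> bool:
    """Write the new metadata file, then point the metastore at it."""
    files.add(new_location)
    try:
        store.update_location(new_location)
    except TimeoutError:
        # Outcome unknown: only clean up if the metastore did NOT take the update.
        if store.metadata_location != new_location:
            files.discard(new_location)
            return False
    return True
```

A timeout does not prove the update failed, so deleting the new file unconditionally can leave the table pointing at a file that no longer exists.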

III. Summary and benefits

1. Summary

By exploring lake-warehouse integration and stream-batch unification, we have summarized each as follows.

  • Lake-warehouse integration

    1) Iceberg supports Hive Metastore;

    2) It uses the same data formats and computing engines as Hive tables.

  • Stream-batch unification

    In quasi-real-time scenarios, stream and batch share the same source, the same computation, and the same storage.

2. Business benefits

  • Improved data timeliness:

    Warehousing latency dropped from more than 2 hours to less than 10 minutes, and the SLA of the core algorithm tasks was met 2 hours earlier.

  • Quasi-real-time analytical queries:

    Combined with Spark 3 and Trino, quasi-real-time multidimensional analytical queries are supported.

  • Improved feature engineering efficiency:

    Quasi-real-time sample data is provided to improve the timeliness of model training.

  • Quasi-real-time warehousing of CDC data:

    Business tables can be queried and analyzed in the data warehouse in quasi-real time.

3. Architectural benefits – quasi-real-time data warehouse

As mentioned above, we support quasi-real-time warehousing and analysis, which provides basic architectural validation for building a quasi-real-time data warehouse later. The advantages of a quasi-real-time data warehouse are that logic is developed once, metric definitions are unified, and storage is unified, which is true stream-batch unification. The disadvantage is weaker real-time performance: latency that used to be at the second or millisecond level becomes minute-level data visibility.

Architecturally, however, this is significant. It gives us hope that the entire "T + 1" data warehouse can eventually be turned into a quasi-real-time data warehouse, improving the overall timeliness of its data and better supporting upstream and downstream businesses.

IV. Follow-up planning

1. Keep up with Iceberg releases

Fully enable the V2 format and support merge-on-read (MOR) for CDC data entering the lake.

2. Build a quasi-real-time data warehouse

Based on Flink, use a data pipeline model to speed up every layer of the data warehouse.

3. Stream-batch unification

As upsert capabilities gradually mature, continue exploring stream-batch unification at the storage layer.

4. Multidimensional analysis

Deliver quasi-real-time multidimensional analysis based on Presto/Spark 3.