Introduction: Di Xingxing, the person in charge of Autohome real-time computing platform, shared on Meetup of Shanghai website on April 17, the integrated architecture practice of lake and warehouse based on Flink + Iceberg.

Brief content:

First, the background of data warehouse architecture upgrade

II. Practice of integrated architecture of lake and warehouse based on Iceberg

Third, summary and income

IV. Follow-up planning

Making the address https://github.com/apache/flink welcome to Flink thumb up to star ~

First, the background of data warehouse architecture upgrade

1. The pain points of Hive based data warehouse

The original data warehouse was built entirely based on Hive, and there were three major pain points:

Pain point 1: ACID is not supported

1) Upsert scenario is not supported;

2) ROW-LEVEL DELETE is not supported and the cost of data correction is high.

Pain point 2: Timeliness is difficult to improve

1) Data is difficult to be visible in quasi-real-time;

2) Unable to incrementally read, unable to achieve the unified stream batch at the storage level;

3) Data analysis scenarios with minute-level latency cannot be supported.

Pain point 3: Table Evolution

1) Write-type Schema with poor support for Schema changes;

2) Partition Spec change support is not friendly.

2. Key features of Iceberg

Iceberg has four key features: support for ACID semantics, incremental snapshot mechanism, open table format, and stream batch interface support.

  • Support ACID semantics

    • Do not read an incomplete Commit;
    • Concurrent Commit support based on optimistic locking
    • ROE-LEVEL DELETE, which supports UPSERT.
  • Incremental snapshot mechanism

    • Data is visible after COMMIT (minute level);
    • Recall historical snapshots.
  • Open table format

    • Data formats: PARQUET, ORC, AVRO
    • Computing engines: Spark, Flink, Hive, Trino/Presto
  • Stream batch interface support

    • Support stream, batch write;
    • Support stream and batch reading.

II. Practice of integrated architecture of lake and warehouse based on Iceberg

The meaning of the integration of the lake and warehouse means that I do not need to see the lake and warehouse, and the data has a metadata format through which it can flow freely and connect with the diverse computing ecology of the upper layer.

— JIA Yangqing, Senior Researcher of Alibaba Cloud Computing Platform

1. The link of APPEND into the lake

The figure above shows the link of log class data into the lake. The log class data includes client log, client log and server log. These log data will be logged into Kafka in real time, then written to Iceberg via the Flink task, and finally stored in HDFS.

2. Flink SQL inbound link is open

Our Flink SQL access to the lake link is completed based on “Flink 1.11 + Iceberg 0.11”, and we mainly make the following contents for docking Iceberg Catalog:

1) Meta Server adds support for Iceberg Catalog;

2) SQL SDK adds Iceberg Catalog support.

Then on this basis, the platform opens the management function of Iceberg table, so that users can build SQL tables on the platform.

3. Into the lake – support proxy users

The second step is the internal practice, docking the existing budget system, authority system.

Because the platform was run by default for Flink users when the platform was doing real-time jobs, the previous storage did not involve HDFS storage, so there was probably no problem, and there was no question about budget division.

But now when you write about Iceberg, it’s a little bit of a problem. For example, if the data warehouse team has its own bazaar, the data should be written under their catalog, the budget should also be allocated under their budget, and the permissions should be connected with the offline team account system.

As shown above, this is mainly the function of agent user on the platform. Users can specify which account to use to write this data to the Iceberg. The realization process mainly includes the following three steps.

  • Added Table level configuration: ‘Iceberg. User. proxy’ = ‘targetUser’

    1) Enable SuperUser

    2) Team account authentication

  • Enable agent user when accessing HDFS:

  • Specify the agent user when accessing Hive Metastore

    1) Refer to Spark’s relevant implementation:

    org.apache.spark.deploy.security.HiveDelegationTokenProvider

    2) Dynamic proxy Hive Metastore with proxy users accessing Hive Metastore

4. Flink SQL into the lake example

DDL + DML

5. CDC data into the lake link

As shown above, we have an AutoDTS platform that is responsible for real-time access to the business library data. We will access the data from these business libraries to Kafka, and it also supports the configuration of distribution tasks on the platform, which is equivalent to distributing the data from Kafka to different storage engines, in this case to Iceberg.

6. Flink SQL CDC access link

Here are the changes we made to support CDC access to the lake based on “Flink1.11 + Iceberg 0.11” :

  • Improved Iceberg Sink:

    AppendStreamTableInk (version 1.11) cannot handle CDC streams. Modified and adapted.

  • Table management

    1) Support Primary Key (PR1978)

    2) Open V2: ‘Iceberg Format.version’ = ‘2’

7. CDC data into the lake

1. Support the Bucket

In the Upsert scenario, you need to ensure that the same data is written to the same Bucket. How does this work?

Flink SQL syntax does not currently support declaring bucket partitions. You can declare buckets as configured:

‘partition.bucket.source’=’id’, // Specify the bucket field

Num =’10’, // Specify the number of buckets

2. Copy-on-write sink

The reason why we do copy-on-write is that the merge-on-read system in the original community does not support merging small files, so we temporarily implement copy-on-write sink. At present, the service has been tested and used with good results.

The top Copy on-write implementation is similar to the merry-on-read version, with multiple parallel writes to streamWriter and a single parallel submission to Filecommitter.

In copy-on-write, you need to set the number of buckets according to the amount of data in the table, without having to do additional small file merging.

  • StreamWriter multiparallel writes during the snapshotState phase

    1) Increase Buffer;

    The last checkpoint has committed successfully;

    3) Group and merge by bucket, write to each bucket.

  • FileCommitter is submitted in a single parallel order

    1) the table. NewOverwrite ()

    2) Flink.last.com mitted. Checkpoint. Id

8. Example – CDC data configuration into a lake

As shown in the figure above, in practice, the business side can create or configure the distribution task on the DTS platform.

For instance type, select the Iceberg table, and then select the target library, indicating which table’s data should be synchronized to the Iceberg table. Then you can select the mapping relationship between the original table and the target table’s fields. After configuration, the distribution task can be started. After startup, it will submit a real-time task in the real-time computing platform Flink, and then use copy-on-write sink to write the data to the Iceberg table in real time.

9. Other practices of entering the lake

Practice # 1: Reduce the Empty Commit

  • Problem description:

    If there is no data in upstream Kafka for a long time, each Checkpoint will generate a new Snapshot, resulting in a large number of empty files and unnecessary snapshots.

  • Solution (PR-2042) :

    “Commit” is a Commit that commits a “Commit” sequence that commits a “Commit” sequence that commits a “Commit” sequence that commits a “Commit” sequence.

Practice 2: Record the Watermark

  • Problem description:

    At present, Iceberg table itself cannot directly reflect the progress of data writing, and offline scheduling is difficult to accurately trigger downstream tasks.

  • Solution (PR-2109) :

    In the Commit stage, the Watermark of Flink is recorded in the Properties of Iceberg table, which can directly reflect the end-to-end delay, and can also be used to judge the integrity of partition data for scheduling and triggering downstream tasks.

Practice 3: Delete tables and optimize

  • Problem description:

    Removing Iceberg can be slow, causing the platform interface to timeout accordingly. Because Iceberg is an object-oriented store that abstracts the IO layer, there is no quick way to clean up the directory.

  • Solution:

    Expand FileIO, add DeleteDir method, fast delete table data on HDFS.

10. Small file merging and data cleaning

Perform batch tasks (Spark 3) for each table periodically, divided into the following three steps:

1. Merge small files with new partitions regularly:

rewriteDataFilesAction.execute(); Merge only small files and do not delete old files.

2. Delete expired Snapshot, clean metadata and data files:

​ table.expireSnapshots().expireOld erThan(timestamp).commit();

3. Clear orphan files. By default, clear unreachable files that are 3 days old:

​ removeOrphanFilesAction.older Than(timestamp).execute();

11. Computing Engine — Flink

Flink is the core computing engine of the real-time platform. At present, it mainly supports the scene of data entering the lake. It has the following characteristics.

  • Quasi-real-time data entry into the lake:

    Flink and Iceberg Iceberg have the highest data integration in the lake, and the Flink community actively embraces the data lake technology.

  • Platform integration:

    AutoStream introduces IcebergCatalog, which supports the configuration of MySQL, SQLServer, and TIDB tables into the lake by SQL.

  • Batch and flow:

    The advantages of Flink will be realized gradually under the concept of batch integration.

12. Computing Engine — Hive

Hive is more integrated in Iceberg and Spark 3 at the level of SQL batch processing, and mainly provides the following three functions.

  • Regular small file merge and Meta information query:

    SELECT * FROM prod.db.table. History SELECT * FROM prod.db.table. History

  • Offline data writing:

    1) Insert into 2) Insert overwrite 3) Merge into

  • Analyze the query:

    It mainly supports everyday quasi-real-time analysis query scenarios.

13. Computing Engine — Trino/Presto

Autobi has been integrated with Presto for reporting, analytical query scenarios.

  • Trino

    1) Iceberg is directly used as the report data source

    2) need to add metadata caching mechanism: https://github.com/trinodb/trino/issues/7551

  • Presto

    Community integration: https://github.com/prestodb/presto/pull/15836

14. A pothole

1. Access Hive Metastore exceptions

Problem Description: A misuse of the HiveConf constructor caused declared configurations in the Hive client to be overridden, resulting in an exception when accessing Hive Metastore

Solution (PR-2075) : Fix the construction of HiveConf to show that the AddResource method is called to ensure that the configuration is not overridden: HiveConf.addResource (conf);

2.Hive Metastore was not released

CommitFailedException: Timed out after 181138 MS Waiting for lock XXX. “The reason is that the HiveMetastoReclient. lock method also needs to display Unlock even if the lock is not obtained, otherwise it will cause the above exception.

Solution (PR-2263) : Optimize the HivetableOperations# AcquireLock method to show that the unlock is called to release the lock if the lock acquisition fails.

3. Metadata file missing

NotFoundException Failed to open input stream for file: xxx.metadata.json NotFoundException Failed to open input stream for file: xxx.metadata.json

Solution (PR-2328) : When Hive Metastore was called to update the metadata\_location of the Iceberg table, a checking mechanism was added to confirm that metadata was not saved successfully and then the metadata file was deleted.

III. Revenue and summary

1. Summary

Through the exploration of the integration of lake warehouse and flow batch, we have made a summary respectively.

  • One lake storehouse

    1) Iceberg supports Hive Metastore;

    2) The overall usage is similar to Hive tables: same data format, same computing engine.

  • Flow of fusion

    Realize the unification of streams and batches in quasi-real-time scenarios: same origin, same computation and same storage.

2. Business income

  • Improved data timeliness:

    The warehousing delay was reduced from more than 2 hours to less than 10 minutes; The SLA, the core task of the algorithm, was completed 2 hours ahead of schedule.

  • Quasi-real-time analysis queries:

    Combined with Spark 3 and Trino, support for quasi-real-time multidimensional analytical queries.

  • Feature engineering benefits:

    The quasi-real-time sample data is provided to improve the timeliness of model training.

  • CDC data quasi-real-time warehousing:

    Can be in the data warehouse for the business table to do real-time analysis query.

3. Architecture benefits – Quasi-real-time data storehouse

As mentioned above, we support quasi-real-time warehousing and analysis, which is equivalent to providing a basic framework verification for the subsequent construction of quasi-real-time data warehousing. The advantage of the quasi-real-time data storehouse is that it is developed at one time, with unified caliber and unified storage. It is a real batch stream integration. The disadvantage is poor real-time, where once you had second, millisecond latency, now you have minute data visibility.

However, in terms of architecture, this is of great significance. In the future, we can see some hope that we can transform the original “T + 1” database into a quasi-real-time database, improve the overall data timeliness of the database, and then better support the upstream and downstream businesses.

IV. Follow-up planning

1. Follow the Iceberg version

Fully open V2 format, support CDC data MOR into the lake.

2. Construction of quasi-real-time data warehouse

Based on Flink through Data Pipeline mode log warehouse all levels of table speed up.

3. Batch together

As the Upsert functionality continues to improve, we continue to explore the storage-level stream batch integration.

4. Multidimensional analysis

Quasi-real-time multidimensional analysis based on Presto/Spark3 output.

Copyright Notice:The content of this article is contributed by Aliyun real-name registered users, and the copyright belongs to the original author. Aliyun developer community does not own the copyright and does not bear the corresponding legal liability. For specific rules, please refer to User Service Agreement of Alibaba Cloud Developer Community and Guidance on Intellectual Property Protection of Alibaba Cloud Developer Community. If you find any suspected plagiarism in the community, fill in the infringement complaint form to report, once verified, the community will immediately delete the suspected infringing content.