Brief introduction: This article is based on the talk given by Aliyun technical expert Li Shaofeng (Fengze) at the joint Apache Hudi and Apache Pulsar meetup in Hangzhou. It introduces typical CDC ingestion into the data lake and how to build a data lake with Pulsar/Hudi, and also shares Hudi's core design, new vision, and community updates.

PPT download link of this article:

Shaofeng Li (Fengze), Technical Expert at Aliyun: "CDC Data into the Lake Based on Apache Hudi" (PDF)

I. Background of CDC

First of all, what is CDC? CDC stands for Change Data Capture, a common technique in the database world that captures changes to a database and sends the change data downstream. It has a wide range of applications, including data synchronization, data distribution, data collection, and ETL. Today we mainly discuss using CDC to ETL database data into the data lake.

There are two types of CDC in the industry. The first is query-based: the client queries the source table's change data with SQL and sends it downstream. The second is log-based, which is widely used in the industry. It typically relies on the binlog: changes are recorded in the binlog, which is parsed and written into a message system, or consumed directly with Flink CDC.

The two differ: the query-based approach is simpler but intrusive, while the log-based approach is non-intrusive and has no impact on the data source, although binlog parsing is more complex.

Concretely, the query-based approach covers timestamp-based, trigger-based, and snapshot-based implementations, and the fourth implementation of CDC is log-based. Here is a comparison of these implementations.

The comparison shows that the log-based approach is the best overall, although parsing is more complicated. Fortunately, there are many mature open-source binlog parsers in the industry, including the widely used Debezium, Canal, and Maxwell, and ETL pipelines can be built on top of them.

Here is a look at one of the more popular CDC warehousing architectures in the industry.

In the real-time stream, Canal parses the binlog and writes it into Kafka, and the Kafka data is then synchronized to Hive every hour. The offline stream also needs to pull the full data of the source tables synchronized to the Hive ODS layer, because the real-time stream alone is incomplete; a full import into each ODS table has to be done with an offline SQL select. As a result, the freshness of the ODS layer is poor, with delays of hours or even days. By introducing Apache Hudi, the delay of the ODS layer can be reduced to minutes.

II. CDC Data Ingestion into the Lake

The architecture of CDC-based lake ingestion is very simple: all kinds of upstream data sources, such as database change data, event streams, and various external sources, are written into the table as change streams, and the table is then queried and analyzed externally. The whole architecture is very simple.

The architecture is simple, but it still faces many challenges. Taking the Apache Hudi data lake as an example, the lake stores data as files. For CDC data, changes to some of those files must be made reliably and transactionally, so that downstream queries never see partial results. Updates and deletes in the CDC data must be applied efficiently. For every small batch of data written, small files should be handled automatically, sparing users complex small-file management. Finally, query-oriented layout optimization such as Clustering is needed to reorganize the file layout and provide better query performance.

How does Apache Hudi address these challenges? First, it supports transactional writes, including an MVCC mechanism between readers and writers so that writes do not affect reads, and it provides transaction and concurrency control; for concurrent writes it uses an OCC optimistic locking mechanism. For updates and deletes, built-in as well as pluggable custom indexes make them efficient. Hudi also optimizes for queries: it automatically manages small files, growing files toward a user-specified size such as 128 MB, which is a core Hudi feature, and it provides Clustering to optimize the file layout.
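
As a rough illustration of these knobs, here is a minimal Spark (Scala) sketch, not taken from the talk: it upserts a batch into a Hudi table with the standard small-file sizing and inline-clustering options. The table name, input path, and field names are made up for the example; the config keys are standard Hudi options.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object SmallFileAndClusteringExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-small-file-example")
      .getOrCreate()

    val df = spark.read.json("/tmp/cdc_batch.json") // hypothetical input batch

    df.write.format("hudi")
      .option("hoodie.table.name", "demo_table")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.operation", "upsert")
      // Small-file handling: files below the small-file limit keep receiving
      // new records until they grow toward the target base file size.
      .option("hoodie.parquet.small.file.limit", String.valueOf(100 * 1024 * 1024))
      .option("hoodie.parquet.max.file.size", String.valueOf(128 * 1024 * 1024))
      // Inline clustering: periodically rewrite small files / re-sort data
      // for better query performance.
      .option("hoodie.clustering.inline", "true")
      .option("hoodie.clustering.inline.max.commits", "4")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/demo_table")
  }
}
```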

Below are two typical CDC ingestion links. The upper link is the one adopted by most companies: CDC data is first imported into Kafka or Pulsar through a CDC tool, then consumed by a Flink or Spark streaming job and written into Hudi. The second architecture connects directly to the upstream MySQL data source through Flink CDC and writes straight into the downstream Hudi table.
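
As a rough illustration of the first link, here is a minimal Spark Structured Streaming sketch in Scala, not code from the talk. It assumes the CDC tool already publishes flattened JSON change records to a Kafka topic; the topic name, fields, and paths are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

object KafkaToHudiStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-to-hudi").getOrCreate()

    // Simplified change-record schema (a real Canal/Debezium payload is richer).
    val schema = new StructType()
      .add("id", LongType).add("amount", DoubleType)
      .add("ts", LongType).add("dt", StringType)

    val records = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "cdc_orders")
      .load()
      .select(from_json(col("value").cast("string"), schema).as("r"))
      .select("r.*")

    // Continuously upsert the parsed change records into a Hudi table.
    records.writeStream.format("hudi")
      .option("hoodie.table.name", "orders")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.partitionpath.field", "dt")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.operation", "upsert")
      .option("checkpointLocation", "/tmp/checkpoints/orders")
      .outputMode("append")
      .start("/tmp/hudi/orders")
      .awaitTermination()
  }
}
```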

In fact, both links have their advantages and disadvantages. The first link uses a unified data bus, which brings good scalability and fault tolerance. The second link has slightly lower scalability and fault tolerance, but with fewer components its maintenance cost is correspondingly lower.

This is the CDC ingestion link of the Ali Cloud database OLAP team. Since we are the Spark team, we adopt a Spark Streaming ingestion link. The whole link is divided into two parts: a full synchronization job uses Spark to pull the full data; if there is a read replica, it connects to the replica for the full synchronization to avoid impacting the primary database, and writes the data to Hudi. An incremental job is then started, which consumes binlog data from Ali Cloud DTS with Spark and synchronizes the binlog to the Hudi table in near real time. The scheduling of the full and incremental jobs relies on Lakehouse's automatic scheduling to coordinate them, and the handover between full and incremental data uses Hudi's upsert semantics to guarantee eventual consistency, so there is no data loss or duplication.
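
The sketch below illustrates the full-plus-incremental handover idea in Scala; it is not the team's DTS-based implementation. A one-off bulk_insert loads the full snapshot (here read over JDBC from a replica), after which the change data is applied with upsert, whose idempotent semantics per record key make the handover eventually consistent. Connection details, paths, and field names are placeholders.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object FullThenIncremental {
  val basePath = "/tmp/hudi/orders"

  def hudiWrite(df: DataFrame, operation: String, mode: SaveMode): Unit =
    df.write.format("hudi")
      .option("hoodie.table.name", "orders")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.partitionpath.field", "dt")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.operation", operation)
      .mode(mode)
      .save(basePath)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("full-then-incremental").getOrCreate()

    // 1) One-off full load, read from a replica to avoid pressure on the primary.
    val full = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://replica-host:3306/shop")
      .option("dbtable", "orders")
      .option("user", "reader")
      .option("password", "***")
      .load()
    hudiWrite(full, "bulk_insert", SaveMode.Overwrite)

    // 2) Incremental changes (assumed to be parsed from the binlog into a
    //    DataFrame by an upstream step) are applied with upsert; replaying
    //    overlapping changes is safe because upsert is idempotent per key.
    val incremental = spark.read.json("/tmp/binlog_batch.json") // placeholder input
    hudiWrite(incremental, "upsert", SaveMode.Append)
  }
}
```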

Our team also made several optimizations to the CDC ingestion link in Lakehouse.

The first is handling schema changes in the source database, since the customers we serve may add, delete, or modify columns. Before Spark writes to Hudi, it validates the schema: if the schema is valid, the write proceeds normally; if not, the write fails. Deleting a field makes schema validation fail and the job fail, so we catch the schema-validation exception, and if we find that fields have been removed we automatically fill in the missing fields and retry, keeping the link stable.
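
A minimal sketch of that idea (not the production code): when the source drops a column, pad the incoming batch with the missing columns as nulls so it still matches the Hudi table schema and the write can be retried. The helper name is made up for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType

object SchemaPadding {
  /** Add any column present in the table schema but missing from the batch,
    * filled with nulls of the expected type. */
  def alignToTableSchema(batch: DataFrame, tableSchema: StructType): DataFrame =
    tableSchema.fields.foldLeft(batch) { (df, field) =>
      if (df.columns.contains(field.name)) df
      else df.withColumn(field.name, lit(null).cast(field.dataType))
    }
}
```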

Second, some customer tables have no primary key or an unreasonable primary key, such as using the update-time field as the primary key, or partitioning by a field whose value changes. In these cases the data written to Hudi would not match the source table. Therefore, we made some product-level optimizations that let users properly configure the primary key and partition mapping, ensuring that the data synchronized into Hudi is fully aligned with the source database.

Another common requirement is that users add new tables to the upstream database. With table-level synchronization, the link cannot sense the new table, so it cannot be synchronized to Hudi. Lakehouse supports whole-database synchronization, so when a new table is added to the database it is sensed automatically and its data is synchronized to Hudi, giving the link automatic awareness of newly added tables.

Another optimization concerns CDC write performance. A pulled batch may contain Insert, Update, and Delete events; should it always be written with Hudi's Upsert? Upsert is easier to reason about and deduplicates data, but it has to look up the index, which makes it less efficient than Insert, which skips the index lookup entirely. Therefore, for each batch we check whether it contains only Insert events; if so, we write it with Insert directly and avoid the cost of locating the files to update. Our measurements show this improves performance by 30% to 50%. Of course, DTS exceptions also need to be taken into account: when data is re-consumed during recovery, we cannot simply use Insert, otherwise records may be duplicated. For this we introduced a table-level watermark, which guarantees that no duplicates appear even under DTS exceptions.
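
Here is a hedged Scala sketch of that decision, not the Lakehouse code: if a batch contains only insert events and its offsets lie beyond the table-level watermark (so it cannot be a replay), write it with the cheaper "insert" operation, otherwise fall back to "upsert". The event-type column, offset, and watermark representation are assumptions made for the example.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object InsertOrUpsert {
  def writeBatch(batch: DataFrame,
                 eventTypeCol: String,   // e.g. "op" with values I / U / D
                 batchMaxOffset: Long,   // highest binlog offset in this batch
                 tableWatermark: Long,   // highest offset already written to the table
                 basePath: String): Unit = {

    // Any update/delete event, or a possible replay, forces the safe upsert path.
    val hasNonInsert = !batch.filter(batch(eventTypeCol) =!= "I").isEmpty
    val maybeReplay  = batchMaxOffset <= tableWatermark
    val operation    = if (hasNonInsert || maybeReplay) "upsert" else "insert"

    batch.write.format("hudi")
      .option("hoodie.table.name", "orders")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.operation", operation)
      .mode(SaveMode.Append)
      .save(basePath)
  }
}
```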

III. Hudi Core Design

Next, I will introduce Hudi's positioning. According to the community's latest vision, Hudi is defined as a streaming data lake platform. It supports massive data updates, has a built-in table format and transactional storage, provides a series of table services such as Clean, Archive, Compaction, and Clustering, offers out-of-the-box data services, and ships with built-in O&M tools and metrics monitoring.

This picture is from Hudi's official website. Hudi is the lake storage in this ecosystem: the bottom layer can connect to HDFS and the object storage of various cloud vendors, as long as the storage is Hadoop-protocol compatible. Upstream is the stream of change events entering the lake. A variety of query engines are supported, such as Presto, Spark, and cloud products. In addition, Hudi's incremental pull capability can be used with Spark, Hive, and Flink to build derived tables.

The entire Hudi architecture is very complete, and Hudi is positioned as an incremental processing stack. Stream processing is typically row-oriented, handling data record by record, and is very efficient.

However, row-oriented data cannot be scanned and analyzed at scale efficiently, while batch processing may reprocess the full data once a day, which is relatively inefficient. Hudi introduces the concept of incremental processing: it processes only the data after a certain point in time, which is similar to stream processing but far more efficient than batch processing. Moreover, Hudi stores data in the lake in columnar formats, so scans are well optimized.
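
To make the incremental-processing idea concrete, here is a hedged Scala sketch of Hudi's incremental pull: it reads only the records committed after a given instant. The table path and commit time are placeholders.

```scala
import org.apache.spark.sql.SparkSession

object IncrementalQuery {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-incremental").getOrCreate()

    val changes = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      // Only commits after this instant time are returned.
      .option("hoodie.datasource.read.begin.instanttime", "20210901000000")
      .load("/tmp/hudi/orders")

    // Downstream derived tables can be built from just this incremental slice.
    changes.createOrReplaceTempView("orders_changes")
    spark.sql("SELECT count(*) FROM orders_changes").show()
  }
}
```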

Now look at Hudi's history. In 2015, the community chairman published an article on incremental processing; in 2016 it went into production at Uber, supporting all database-critical businesses; in 2017 it supported Uber's 100PB data lake; in 2018, with the spread of cloud computing, it attracted users at home and abroad; in 2019 Uber donated it to Apache for incubation; in 2020 it became a top-level project in roughly a year, with adoption growing more than tenfold. Uber's latest 2021 update shows Hudi supporting a 500PB data lake, along with many enhancements to Hudi such as Spark SQL DML and Flink integration. Recently, ByteDance's recommendation department shared its Hudi-based data lake practice, with data exceeding 400PB, total storage over 1EB, and PB-level daily growth.

After several years of development, many companies at home and abroad have adopted Hudi. Public clouds such as Huawei Cloud, Ali Cloud, Tencent Cloud, and AWS all integrate Hudi, and Ali Cloud also builds Lakehouse on top of Hudi. ByteDance's migration of its entire data warehouse to the lake is also based on Hudi, and there will be an article sharing their practice of a Flink + Hudi data lake with PB-level daily growth. Head internet companies such as Baidu and Kuaishou are using it as well. In banking and finance there are ICBC, Agricultural Bank of China, Baidu Finance, and Baixin Bank; in gaming, 37 Interactive Entertainment, miHoYo, and 4399. Hudi has been widely adopted across industries.

Hudi is positioned as a complete data lake platform. At the top layer, users write various SQL, and Hudi as a platform provides the corresponding capabilities. The next layer down is the SQL and programming APIs. Below that is the Hudi kernel, including indexing, concurrency control, and table services; the community will also build caching based on Lake Cache. The file-format layer uses open formats such as Parquet, ORC, and HFile. At the bottom, the data lake can be built on various clouds.

Next I will introduce Hudi's key designs, which are very helpful for understanding Hudi. The first is the file format. At the bottom of the design is the FileSlice, or file slice; a file slice contains a base file and incremental log files. The base file is a Parquet or ORC file, and the incremental files are log files. Writes to the log files are encoded as blocks inside Hudi: a batch of updates can be encoded as a data block and appended to the file. The base file format is pluggable: it can be Parquet, and the latest 0.9.0 release already supports ORC; HFile is also supported, so HFile can be used for the metadata table.

The log file holds a series of data blocks, somewhat like a database redo log: every version of a record can be recovered through the log. The base file and its log files are merged through Compaction to form a new base file. Hudi supports both synchronous and asynchronous Compaction, which gives users flexibility: Compaction can run inline with the write, or it can run asynchronously so that it does not affect the main write link and adds no latency to writes.
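
As a hedged illustration (not from the talk), the sketch below writes a MERGE_ON_READ table where compaction of base files with their log files is scheduled inline every few delta commits; asynchronous compaction would run the same work off the main write path. Names and paths are illustrative.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object MorCompactionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-mor").getOrCreate()
    val df = spark.read.json("/tmp/cdc_batch.json") // hypothetical input

    df.write.format("hudi")
      .option("hoodie.table.name", "orders_mor")
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.operation", "upsert")
      // Compact after every 5 delta commits: log blocks are merged with the
      // base file to produce a new base file version.
      .option("hoodie.compact.inline", "true")
      .option("hoodie.compact.inline.max.delta.commits", "5")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/orders_mor")
  }
}
```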

Hudi also has the concept of a File Group: a File Group contains different File Slices, and the File Slices constitute its different versions. Hudi provides a retention mechanism that bounds the number of versions kept, ensuring the metadata size stays under control.

For data updates and writes, Hudi tries to use append: for example, if a log file was written before, it keeps trying to append to that log file. HDFS is very friendly to append semantics, but most cloud object stores do not support append, that is, data cannot be changed once written; in that case only new log files can be written. Different FileGroups are isolated from each other, so different logic can be applied to different file groups, and users can even customize the algorithms, which is very flexible.

The FileGroup-based design brings considerable benefits. Suppose there are four base files of 100M each and all of them receive updates. With the FileGroup design, each group's roughly 50M of updates only needs to be merged with that group's own 100M base file, so a round of Compaction costs about 4 x 150M = 600M in total. Without the grouping, the updates could not be mapped to individual files, and each small update batch (say 25M) would have to be merged against all 400M of base files, bringing the total cost to roughly 1200M. In other words, the FileGroup design cuts the merge cost roughly in half.

Next is the table format. The table format describes how files are laid out in Hudi. First, the table has a root path; data is then written under partitions, organized the same way as Hive partition directories. There is also the table's schema definition and the handling of schema changes. The schema can be recorded in metadata files, or stored in an external KV store; each approach has its pros and cons.

Hudi represents schemas in the Avro format, so its Schema Evolution capability is exactly that of Avro schema evolution: fields can be added and upward-compatible changes can be made, for example int to long is compatible, but long to int is not.

At present, the community already has a plan to support Full Schema Evolution, that is, adding a field, deleting a field, and renaming (changing) a field.

Another part of Hudi's design is the index. When each record is written into Hudi, a mapping from the record's primary key to a file group ID is maintained, so that the files affected by an update or delete can be located more quickly.

The diagram on the right shows an order table, which can be partitioned by date so records are written to different partitions. Below it is the user table, which does not need partitioning because its data volume and change frequency are not as large; a non-partitioned table can be used.

For partitioned and frequently changing tables written with Flink, the global index built on Flink State is more efficient. All indexes are pluggable, including the Bloom-filter index and the high-performance HBase index. In ByteDance's scenario, the Bloom-filter index could not keep up with index lookups over PB-scale, ever-growing data, so the high-performance HBase index is used there. Users can therefore flexibly choose different indexes based on their own workloads; late-update and random-update scenarios can be supported at low cost with the appropriate index type.
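
A hedged sketch of switching index types per workload; the index-type values are standard Hudi options, everything else (table, fields, path) is illustrative.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object IndexChoiceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-index").getOrCreate()
    val df = spark.read.json("/tmp/cdc_batch.json") // hypothetical input

    df.write.format("hudi")
      .option("hoodie.table.name", "users")
      .option("hoodie.datasource.write.recordkey.field", "user_id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.operation", "upsert")
      // BLOOM (default), GLOBAL_BLOOM, SIMPLE, HBASE, ... chosen per workload;
      // e.g. an HBase index for very large tables with random updates.
      .option("hoodie.index.type", "GLOBAL_BLOOM")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/users")
  }
}
```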

Another design is concurrency control, which was introduced after 0.8. Hudi provides an optimistic locking mechanism to handle concurrent writes: conflicting changes are detected at commit time, and the write fails if a conflict is found. Table services such as Compaction and Clustering need no locks; Hudi has an internal coordination mechanism to avoid lock contention. For example, a Compaction can first be scheduled as an instant on the timeline and then executed asynchronously, decoupled from the write link, without affecting the main link.

For example, the data ingestion link on the left ingests data every half hour, while the asynchronous delete operation on the right also modifies the table and is very likely to conflict with the writes. The conflict would cause one of the links to fail, wasting the CPU resources the platform has already spent. The community now has an improvement plan for this situation: detect write conflicts early and terminate early, reducing the waste of resources.
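
For reference, here is a hedged sketch of the optimistic concurrency control settings introduced with multi-writer support; the ZooKeeper address is a placeholder and the lock provider can be swapped for other providers depending on the deployment.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object MultiWriterExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-occ").getOrCreate()
    val df = spark.read.json("/tmp/cdc_batch.json") // hypothetical input

    df.write.format("hudi")
      .option("hoodie.table.name", "orders")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      // Enable OCC: conflicting commits are detected at commit time and fail.
      .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
      .option("hoodie.cleaner.policy.failed.writes", "LAZY")
      .option("hoodie.write.lock.provider",
              "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
      .option("hoodie.write.lock.zookeeper.url", "zk-host")
      .option("hoodie.write.lock.zookeeper.port", "2181")
      .option("hoodie.write.lock.zookeeper.base_path", "/hudi/locks")
      .option("hoodie.write.lock.zookeeper.lock_key", "orders")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/orders")
  }
}
```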

Another design is the metadata table. Hudi was originally built on HDFS and did not give much consideration to cloud storage, so file listing on cloud object storage is very slow. Therefore, in version 0.8 the community introduced the Metadata Table. The Metadata Table is itself a Hudi table, so it can reuse Hudi's table services. It stores the file names and file sizes under each partition, supports per-column statistics for query optimization, and can host a global index in which each record key maps to a file ID, reducing the overhead of finding the files to update during upsert. On the cloud this is essentially a must.
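
A minimal hedged sketch of enabling the metadata table on both the write and the read side, so that file listings come from the metadata table instead of slow cloud-storage listing; table name and paths are illustrative.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object MetadataTableExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-metadata").getOrCreate()

    // Maintain the metadata table while writing.
    spark.read.json("/tmp/cdc_batch.json")
      .write.format("hudi")
      .option("hoodie.table.name", "orders")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.metadata.enable", "true")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/orders")

    // Use the metadata table for file listing on read as well.
    spark.read.format("hudi")
      .option("hoodie.metadata.enable", "true")
      .load("/tmp/hudi/orders")
      .show()
  }
}
```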

IV. Future Plans for Hudi

As for future plans, one is building Lakehouse based on Pulsar and Hudi: StreamNative's CEO has proposed building Pulsar's tiered storage on top of Hudi. In the Hudi community we have also done some work to build a Pulsar Source into DeltaStreamer, Hudi's built-in ingestion toolkit; a PR is already open, and we hope the two communities can be more closely connected. The Pulsar tiered-storage kernel part is being worked on by StreamNative engineers.

Version 0.9.0, released in recent days, contains significant optimizations and improvements. The first is the Spark SQL integration, which greatly lowers the barrier for data analysts to use Hudi.
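
A hedged sketch of what the 0.9.0 Spark SQL integration looks like, issued through spark.sql in Scala; the table name, columns, and path are illustrative, and the extension class must be registered on the session.

```scala
import org.apache.spark.sql.SparkSession

object SparkSqlOnHudi {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-spark-sql")
      // Hudi's SQL extensions enable its DDL/DML (MERGE INTO, UPDATE, DELETE).
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .getOrCreate()

    spark.sql(
      """CREATE TABLE IF NOT EXISTS orders (
        |  id BIGINT, amount DOUBLE, ts BIGINT, dt STRING
        |) USING hudi
        |OPTIONS (primaryKey = 'id', preCombineField = 'ts')
        |PARTITIONED BY (dt)
        |LOCATION '/tmp/hudi/orders_sql'""".stripMargin)

    spark.sql("INSERT INTO orders VALUES (1, 9.9, 1000, '2021-09-01')")

    // Apply a change record with MERGE INTO instead of a programmatic upsert.
    spark.sql(
      """MERGE INTO orders AS t
        |USING (SELECT 1 AS id, 19.9 AS amount, 2000 AS ts, '2021-09-01' AS dt) AS s
        |ON t.id = s.id
        |WHEN MATCHED THEN UPDATE SET *
        |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
  }
}
```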

Flink's integration with Hudi was available as early as Hudi 0.7.0, and after several iterations it has become very mature and is used in production by large companies such as ByteDance. The Blink team contributed a CDC Format integration that writes Update and Delete events directly into Hudi. There is also one-off migration of existing (stock) data, adding batch import capability and reducing serialization and deserialization overhead.

Hudi stores metadata fields such as _hoodie_commit_time, which are extracted from the data and incur some storage overhead. Virtual keys are now supported, so the metadata fields are no longer materialized; the trade-off is that incremental queries, i.e. obtaining Hudi's change data after a certain point in time, are no longer possible on such tables.
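
A hedged sketch of the virtual-key option: with meta-field population turned off, the _hoodie_* columns are not stored, at the cost of losing incremental queries on the table. Other settings are as in the earlier sketches; names and paths are illustrative.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object VirtualKeysExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-virtual-keys").getOrCreate()

    spark.read.json("/tmp/cdc_batch.json")
      .write.format("hudi")
      .option("hoodie.table.name", "orders_vk")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      // Virtual keys: do not materialize the _hoodie_* meta fields in storage.
      .option("hoodie.populate.meta.fields", "false")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/orders_vk")
  }
}
```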

In addition, many users hoped that Hudi would support ORC; the latest version supports the ORC format, and this part is pluggable, so more formats can be added flexibly. Metadata Table write and query optimizations are also implemented: when querying with Spark SQL, file listing is avoided by obtaining the file list from the Metadata Table.

Looking further ahead, the community plans to upgrade the Spark integration to DataSource V2; Hudi is currently based on V1 and cannot benefit from V2's performance optimizations. Catalog integration is also planned, so that tables can be created, deleted, and updated, and table metadata managed, through the Spark Catalog.

The Flink module is maintained by dedicated engineers from the Blink team, and later the Watermark of the streaming data will also be written into the Hudi table.

Another item is the integration with Kafka Connect Sink, which allows Java clients to write Kafka data directly into Hudi.

Kernel-side optimizations include a global record-level index based on the Metadata Table. In addition, partners from ByteDance are contributing bucket support: when data is updated, the corresponding bucket can be located by the primary key, and only the Bloom filters of the Parquet files in that bucket need to be read, reducing the overhead of locating updates.

There is also a smarter Clustering strategy, which we have worked on internally: it can dynamically enable Clustering optimization based on historical load, and build secondary indexes based on the Metadata Table. Full Schema Evolution and cross-table transactions are also planned.

At present, the Hudi community is developing rapidly and the amount of code refactoring is very large, but it is all for the better development of the community; the Flink-Hudi integration module has been almost completely rewritten from 0.7.0 to 0.9.0. If you are interested, you are welcome to join the community and build a better data lake platform together.

The original link

This article is the original content of Aliyun and shall not be reproduced without permission.