Introduction: SF Express built its real-time data warehouse on the ideas of Flink, introduced Hudi on Flink to accelerate the production of data warehouse wide tables, and productized the whole practice into a real-time data warehouse platform.

This article was compiled by community volunteer Miao Witing. The content comes from "The Application Practice of Flink in SF Express", shared by Long Yichen, R&D engineer on the SF Tech big data platform, at Flink Forward Asia 2020. It covers SF Express's approach to building a real-time data warehouse on Flink, its introduction of Hudi on Flink to accelerate data warehouse wide tables, and its practice of building a real-time data warehouse platform. The article is divided into the following five parts:

  • Construction background
  • Construction ideas
  • Implementation practice
  • Application cases
  • Future plans

I. Construction background

SF Express is a leading integrated express logistics service provider in China. After years of development, SF Express has used big data technology to support its high-quality logistics services. The following is the flow of a batch of express parcels. As you can see, the whole journey from the customer placing an order to the customer receiving the goods is very long, and some of the processing logic involved is quite complex. To cope with the challenges of this complex business, SF Express began exploring data warehousing.

A traditional data warehouse is mainly divided into two parts: offline and real-time.

  • The offline part completes data extraction, cleaning, computation, and report output through scheduled jobs with fixed computation logic.
  • The real-time part is demand-driven: when a user needs a metric, development for it starts right away.

This architecture works well when data volumes are small and real-time requirements are low. However, as the business developed, data grew in scale and real-time demands kept increasing, and the disadvantages of the traditional data warehouse were amplified.

  • In terms of business metric development efficiency

Real-time metrics are developed in a demand-driven, siloed "chimney" fashion, requiring users to write their own Flink jobs. This mode is inefficient and has a high barrier to entry, and the metrics it produces are hard to manage and reuse in a unified way.

  • From a technical architecture perspective

The offline and real-time architectures are not unified; they differ in development mode, operation and maintenance mode, and metadata. The traditional architecture relies on offline T+1 scheduling to produce reports. These scheduled jobs usually run in the early hours of the morning, causing a surge in cluster load that can make report output unstable; if an important report is delayed, the downstream reports that depend on it are delayed as well. This offline architecture cannot support refined, real-time operations.

  • From a platform management perspective

In a traditional data warehouse, real-time metric development is rather rough-and-ready: there is no schema specification, no metadata management, and no connection between real-time and offline data.

To solve the problems of the traditional data warehouse, SF Express began exploring a real-time data warehouse. A real-time data warehouse and an offline data warehouse actually solve the same business problems; the biggest difference between them is timeliness.

  • An offline data warehouse has hour- or day-level latency;
  • A real-time data warehouse has second- or minute-level latency.

Other aspects, such as data sources, data storage, and development methods, are similar. Therefore, we hope that:

  • Users can migrate smoothly from the traditional data warehouse to the real-time data warehouse while keeping a good experience;
  • Real-time and offline architectures are unified at the same time, to speed up data output and reduce the sense of fragmentation in development;
  • Platform governance is strengthened, to lower the barrier to entry and improve development efficiency.

II. Construction ideas

In summary, we distilled the following three ideas for real-time data warehouse construction. First, unify the data warehouse standards, metadata, and development process, so that users get a unified batch-stream development experience. Second, introduce Hudi to accelerate the data warehouse wide tables, and build the real-time data warehouse on Flink SQL. Finally, strengthen platform governance through data warehouse platform construction, achieving unified data access, unified development, and unified metadata management.

1. A batch-stream unified real-time data warehouse

Building a batch-stream unified real-time data warehouse can be divided into the following three stages:

1.1 Unified data warehouse specifications

First of all, data warehouse construction needs a unified standard. The unified data warehouse specifications include the following parts:

  • Design specifications
  • Naming conventions
  • Model specifications
  • Development specifications
  • Storage specifications
  • Process specifications

With the specifications unified, we then divided the data warehouse into layers, planning the real-time and offline hierarchies uniformly as ODS, DWD, DWS, and ADS layers.

1.2 Unified Metadata

Based on the unified warehouse specifications and layered model above, real-time and offline metadata can be managed in a unified manner, and downstream data governance processes, such as the data dictionary, data lineage, data quality, and permission management, can also be unified. This unification consolidates the results of real-time data warehouse construction and lets the warehouse be implemented and operated better.

1.3 Unified development process based on SQL

Developers know that building Flink jobs with the DataStream API can be complex. With large data volumes, non-standard API usage or insufficient development skill can cause performance and stability problems. If we can unify the real-time development process on SQL, we can reduce users' development cost, learning cost, and operation and maintenance cost.

As mentioned before, we have unified real-time and offline metadata, so the heterogeneous data sources and data stores on the left side of the figure can be abstracted into a unified Table, and SQL can then be used for unified data warehouse development; that is, offline batch processing, real-time stream processing, and OLAP queries all become SQL.
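To make the Table abstraction concrete, here is a minimal, hypothetical sketch using Flink's Table API. The table names, fields, and connector options are illustrative, not SF Express's actual definitions; the point is that once a Kafka topic and a sink are both registered as Tables, the development itself is a single SQL statement:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class UnifiedSqlSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // A Kafka topic abstracted as a Table.
        tEnv.executeSql(
            "CREATE TABLE ods_waybill (" +
            "  waybill_no STRING," +
            "  waybill_status STRING" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'ods_waybill'," +
            "  'properties.bootstrap.servers' = 'kafka:9092'," +
            "  'properties.group.id' = 'dw-demo'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'json'" +
            ")");

        // A sink abstracted the same way ('print' keeps the sketch standalone).
        tEnv.executeSql(
            "CREATE TABLE dwd_waybill (" +
            "  waybill_no STRING," +
            "  waybill_status STRING" +
            ") WITH ('connector' = 'print')");

        // The actual development step: one SQL statement, the same shape for
        // batch, streaming, and OLAP-style queries.
        tEnv.executeSql(
            "INSERT INTO dwd_waybill SELECT waybill_no, waybill_status FROM ods_waybill");
    }
}
```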

1.4 Comparison of real-time data warehouse schemes

Having unified the data warehouse specifications, metadata, and development process, we began exploring a concrete architecture for the warehouse. The current industry mainstream consists of the Lambda architecture and the Kappa architecture.

  • Lambda architecture

The Lambda architecture keeps the original offline data warehouse, strips out the parts with high real-time requirements, and adds a real-time speed layer. Its disadvantage is that two architectures, real-time and offline, and two sets of development logic must be maintained, which brings high maintenance costs and the heavy resource consumption of running two stacks.

  • Kappa architecture

To address the shortcomings of the Lambda architecture, Jay Kreps proposed the Kappa architecture, which removes the offline part and develops everything on a pure streaming engine. The biggest problem of the Kappa architecture is that the throughput of streaming replay cannot reach the level of batch processing, so replay incurs a certain delay.

  • Comparing the real-time data warehouse schemes with actual demand

In real production practice, it is not necessary to strictly follow a canonical Lambda or Kappa architecture; the two can be mixed. For example, most metrics can be developed on the streaming engine, while a few important metrics are also computed in batch, with a data reconciliation step added.

In SF Express's business scenario, not all users need purely real-time tables; many users' reports still rely on the wide tables produced by offline T+1 scheduling. If we can accelerate the production of those wide tables, the timeliness of the reports built on them improves accordingly.

In addition, the wide table produced by offline T+1 scheduling must aggregate 45 days of full data from multiple data sources. Under either the Lambda or the Kappa architecture, this full aggregation is unavoidable. If the wide table could instead be updated directly, full recomputation could be avoided, greatly reducing resource consumption and latency.

2. Introducing Hudi to accelerate wide tables

As mentioned earlier, the complexity of maintaining the Lambda architecture lies in maintaining both a real-time and an offline architecture, a shortcoming we can overcome with batch-stream unification.

On balance, we decided to retrofit the original Lambda architecture and accelerate its offline part to build the data warehouse wide tables. This requires a tool that can update and delete Hive tables in near real time, supports ACID semantics, and supports historical data replay. Based on these requirements, we investigated three open-source projects, Delta Lake, Iceberg, and Hudi, and finally chose Hudi to accelerate the wide tables.

2.1 Hudi Key Features

Hudi's key features include: traceability of historical data; primary-key-based updates and deletes over large datasets; incremental data consumption; and HDFS small-file compaction. These are exactly the features we need.

2.2 How Hudi is introduced

Hudi is introduced in two places to accelerate the data warehouse. First, Hudi is introduced at the ODS layer for real-time data ingestion, turning the layer's T+1 full-data extraction into T+0 real-time ingestion and thereby accelerating the Hive tables from the data source onward.

Second, Flink consumes the ingested data from Kafka, cleans and aggregates it, and incrementally updates the DWD-layer Hive wide table through Hudi, accelerating the wide table from offline to semi-real-time.
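As a hedged illustration of this DWD acceleration, the sketch below declares a Hudi-backed wide table whose primary key drives upserts and feeds it from an ODS-layer stream. Hudi's Flink connector options vary across versions, and the 'datagen' source merely stands in for the real cleaned Kafka changelog, so treat this as a shape rather than the production job:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HudiWideTableSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // Hudi commits ride on checkpoints
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Stand-in for the cleaned ODS-layer stream coming from Kafka.
        tEnv.executeSql(
            "CREATE TABLE ods_waybill_changelog (" +
            "  waybill_no STRING, waybill_status STRING, order_no STRING, ds STRING" +
            ") WITH ('connector' = 'datagen')");

        // DWD wide table on Hudi: the primary key enables incremental upserts
        // instead of full recomputation of the table.
        tEnv.executeSql(
            "CREATE TABLE dwd_waybill_wide (" +
            "  waybill_no STRING, waybill_status STRING, order_no STRING, ds STRING," +
            "  PRIMARY KEY (waybill_no) NOT ENFORCED" +
            ") PARTITIONED BY (ds) WITH (" +
            "  'connector' = 'hudi'," +
            "  'path' = 'hdfs:///warehouse/dwd_waybill_wide'," +
            "  'table.type' = 'MERGE_ON_READ'" +
            ")");

        // Semi-real-time update of the wide table.
        tEnv.executeSql(
            "INSERT INTO dwd_waybill_wide SELECT * FROM ods_waybill_changelog");
    }
}
```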

2.3 An example of building a real-time wide table

The following example shows how a real-time data warehouse wide table is built.

Assume the waybill wide table is composed of a waybill table, an order table, and a user table, which contain fields such as waybill number, waybill status, order number, order status, user ID, and user name.

Waybill table data is inserted into the wide table first, with the waybill number as the wide table's primary key; the mapping between waybill number and order number is stored in a temporary table. When order table data is updated, the user dimension table is joined first to obtain the user name, and the corresponding waybill number is then looked up in the temporary table. Finally, the order table increment is inserted into the wide table by waybill number, updating the wide table's status.
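That mapping step can be sketched as a keyed two-input Flink operator: both streams are keyed by order number, waybill events register the waybill-order mapping (the "temporary table", here kept in keyed state), and order updates, assumed already enriched with the user name from the user dimension table, are re-keyed to the wide table's primary key. The POJOs are hypothetical and the buffering of early-arriving orders is omitted:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Usage: waybills.connect(orders).keyBy(w -> w.orderNo, o -> o.orderNo)
//                .process(new OrderToWaybillMapper())
public class OrderToWaybillMapper
        extends KeyedCoProcessFunction<String, Waybill, Order, WideRowUpdate> {

    private transient ValueState<String> waybillNoForOrder; // the "temporary table"

    @Override
    public void open(Configuration parameters) {
        waybillNoForOrder = getRuntimeContext().getState(
                new ValueStateDescriptor<>("waybill-no", String.class));
    }

    @Override
    public void processElement1(Waybill waybill, Context ctx, Collector<WideRowUpdate> out)
            throws Exception {
        // Record the waybill_no <-> order_no mapping; the waybill row itself
        // is inserted into the wide table elsewhere, keyed by waybill_no.
        waybillNoForOrder.update(waybill.waybillNo);
    }

    @Override
    public void processElement2(Order order, Context ctx, Collector<WideRowUpdate> out)
            throws Exception {
        String waybillNo = waybillNoForOrder.value();
        if (waybillNo != null) {
            // Re-key the order update to waybill_no and emit an incremental
            // upsert against the wide table. A real job would also buffer
            // orders that arrive before their waybill; omitted for brevity.
            out.collect(new WideRowUpdate(waybillNo, order.orderNo, order.orderStatus, order.userName));
        }
    }
}

// Hypothetical POJOs for the sketch.
class Waybill { String orderNo; String waybillNo; }

class Order { String orderNo; String orderStatus; String userName; } // userName joined from the user dim table

class WideRowUpdate {
    String waybillNo, orderNo, orderStatus, userName;
    WideRowUpdate(String w, String o, String s, String u) {
        waybillNo = w; orderNo = o; orderStatus = s; userName = u;
    }
}
```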

3. Final architecture

After introducing Hudi, the final architecture of our real-time data warehouse, customized on top of the Lambda architecture, is shown in the figure below. The real-time speed layer ingests data into Kafka via CDC and processes it with Flink SQL; the cleaned and computed ODS-layer Kafka data updates the DWD-layer wide table semi-real-time through Hudi, accelerating the wide table's output. The offline layer uses Hive for storage and processing. Finally, the ADS layer provides unified data storage and services.

Beyond establishing data warehouse standards and building the architecture, we also need a data warehouse platform to enforce development norms and processes, improve development efficiency, and improve the user experience.

From a data developer's perspective, we not only need to provide fast data access but also need to focus on development efficiency and unified metadata governance. The three main functions, data access, data development, and metadata management, can therefore be platformized on top of the Table and SQL abstractions, giving real-time data warehouse users a unified, convenient, and efficient experience.

III. Implementation practice

1. Hudi on Flink

SF Express was the first company to bring Hudi on Flink into production practice. SF's internal version is based on T3 Travel's internal branch, with many modifications and improvements that greatly enhance the performance and stability of Hudi on Flink.

1.1 Implementation Principles

Here is how Hudi on Flink works. Hudi was originally tightly bound to Spark, and its write operation is essentially a batch operation. To decouple from Spark and unify the APIs, Hudi on Flink buffers batches of data between checkpoints: when a checkpoint triggers, the buffered batch is upserted into Hive, and the write is committed or rolled back according to the upsert result.

The implementation flow of Hudi on Flink can be broken down into the following steps (a sketch of the buffered write step follows the list):

  1. First, Flink consumes binlog-format data from Kafka and converts it into Hudi records.
  2. The Hudi records enter the InstantTime Generator. This operator does no processing; it only forwards data, and on each checkpoint it generates a globally unique, increasing instant on the Hudi timeline and sends it downstream.
  3. The data then enters the Partitioner, which performs secondary partitioning by partition path and primary key. After partitioning, the data reaches the File Indexer, which locates the HDFS file to be updated by primary key; records are then bucketed by file ID and delivered to the downstream WriteProcessOperator.
  4. The WriteProcessOperator buffers the data between checkpoints. When a checkpoint triggers, it upserts the buffered batch to HDFS through the Hudi client and sends the upsert results to the downstream CommitSink.
  5. The CommitSink collects the upsert results of all upstream operators. If the number of successes equals the parallelism of the upstream operator, the commit is considered successful and the instant is marked SUCCESS; otherwise, the commit is considered failed and is rolled back.
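Below is a rough sketch of the buffered write in step 4; none of this is Hudi's actual class structure. The operator holds records between checkpoints and flushes the whole batch when a checkpoint triggers, with HudiClientStub standing in for the real Hudi write client:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingWriteSketch extends ProcessFunction<String, Void>
        implements CheckpointedFunction {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void processElement(String record, Context ctx, Collector<Void> out) {
        buffer.add(record); // hold records until the next checkpoint
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Checkpoint triggered: upsert the buffered batch in one shot.
        boolean success = HudiClientStub.upsert(buffer, context.getCheckpointId());
        buffer.clear();
        // The real job forwards `success` to the CommitSink, which marks the
        // instant SUCCESS only if every parallel writer succeeded, and rolls
        // back otherwise.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // The real implementation also persists and restores un-flushed
        // records here so that failure recovery loses nothing.
    }

    static class HudiClientStub {
        static boolean upsert(List<String> batch, long checkpointId) {
            return true; // placeholder for the actual Hudi upsert call
        }
    }
}
```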

1.2 Optimizations

SF Express has made several optimizations to Hudi on Flink on top of the community code, mainly to enhance performance and improve stability.

  • Secondary partitioning

In incremental-write scenarios, most data is written to the current partition, which can cause data skew. We therefore key the data by partition path plus primary-key ID to achieve secondary partitioning, avoiding an oversized single partition during batch buffering and solving the skew problem, as sketched below.
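A minimal sketch of the keying scheme, with an illustrative record type and bucket count: the key combines the partition path with a primary-key hash bucket, so `stream.keyBy(new SecondaryPartitionKey())` spreads a hot partition across several subtasks:

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

// Illustrative record type carried into the Hudi writer.
class HudiInputRecord {
    String partitionPath;
    String primaryKey;
}

public class SecondaryPartitionKey
        implements KeySelector<HudiInputRecord, Tuple2<String, Integer>> {

    private static final int BUCKETS = 16; // illustrative bucket count

    @Override
    public Tuple2<String, Integer> getKey(HudiInputRecord r) {
        // Partition path alone would send the whole hot partition to one
        // subtask; adding a primary-key hash bucket spreads the load.
        int bucket = Math.floorMod(r.primaryKey.hashCode(), BUCKETS);
        return Tuple2.of(r.partitionPath, bucket);
    }
}
```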

  • File indexing

The bottleneck of the Hudi write path is quickly finding the file in which a record must be updated. For this, Hudi provides an indexing mechanism that maps a record's key plus its partition path to a file ID; the mapping never changes once the record has been written to a file group. Hudi supports HBase, Bloom filter, and in-memory indexes, but in production practice an HBase index depends on an external component, an in-memory index risks OOM, and a Bloom filter has a certain false-positive rate. Our research found that Parquet files written by Hudi contain a hidden column from which the primary keys of all records in the file can be read. We therefore build a file index that yields the file path each record should be written to, and keep it in the state of a Flink operator, avoiding both the external dependency and the OOM problem. A sketch follows.
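Here is a sketch of such a state-backed index under those assumptions (the record type is hypothetical, and bootstrapping the state by scanning the hidden key column of existing Parquet files is elided): the operator keeps a record-key to file-ID map in keyed Flink state and stamps each incoming record with its target file group:

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative record type; fileId is filled in by the index.
class IndexedRecord {
    String recordKey;
    String partitionPath;
    String fileId;
}

public class StateFileIndex
        extends KeyedProcessFunction<String, IndexedRecord, IndexedRecord> {

    // record key -> file ID, scoped to the current (partition, bucket) key.
    private transient MapState<String, String> fileIdByRecordKey;

    @Override
    public void open(Configuration parameters) {
        fileIdByRecordKey = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("file-index", String.class, String.class));
    }

    @Override
    public void processElement(IndexedRecord rec, Context ctx, Collector<IndexedRecord> out)
            throws Exception {
        String fileId = fileIdByRecordKey.get(rec.recordKey);
        if (fileId == null) {
            // First time this key is seen: assign a new file group.
            fileId = rec.partitionPath + "-" + java.util.UUID.randomUUID();
            fileIdByRecordKey.put(rec.recordKey, fileId);
        }
        rec.fileId = fileId; // route the update to its existing file group
        out.collect(rec);
    }
}
```

With a RocksDB state backend the mapping lives off-heap, which is what removes the OOM risk of a purely in-memory index.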

  • Index-write separation

In the original Hudi upsert flow, indexing and writing happen in one operator whose parallelism is determined only by the number of partition paths. We separate the indexing and writing stages, which raises the parallelism of the upsert operator and improves write throughput.

  • Fault recovery

Finally, the state of the whole flow is kept in Flink state, and a state-based fault recovery mechanism is designed on top of it to guarantee end-to-end exactly-once semantics.

2. Productization of real-time data warehouse

We have also done some productization work on the real-time data warehouse, providing a data warehouse development suite that covers data access, metadata management, and data processing.

2.1 Real-time Data Access

Real-time data access uses a form-based workflow that shields users from the complex underlying technology; users bring external data sources into the warehouse with a few simple operations. Take MySQL as an example: the user only needs to select the MySQL data source, and the platform automatically extracts and displays the schema; once the user confirms it, the schema is written into the platform metadata.

The user then selects a cluster they have permission on, sets the primary key and partition field of the Hive table, and submits the application. The platform automatically generates a Flink job that extracts the data into Kafka and lands it in the Hive table. For database-type data sources, sharded databases and tables are also supported, with the sharded business data written into a single ODS-layer table. Master/slave setups are supported too: stock data is queried from the slave while the binlog is fetched from the master, reducing the pressure on the master and the data synchronization delay.
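The sketch below shows what such a generated ingestion job might look like, using the open-source flink-cdc MySQL connector and an upsert-kafka sink as stand-ins for SF Express's internal tooling. Hosts, credentials, and fields are placeholders, and note that the master/slave split described above is internal: the open-source connector reads both the snapshot and the binlog from a single endpoint:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MySqlAccessSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // The business table exposed as a changelog Table via its binlog.
        tEnv.executeSql(
            "CREATE TABLE mysql_orders (" +
            "  order_no STRING," +
            "  order_status STRING," +
            "  PRIMARY KEY (order_no) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = 'mysql-host'," +
            "  'port' = '3306'," +
            "  'username' = 'reader'," +
            "  'password' = '******'," +
            "  'database-name' = 'trade'," +
            "  'table-name' = 'orders_[0-9]+'" + // regex merges sharded tables
            ")");

        // Landing topic for the ODS layer; keyed so updates stay updates.
        tEnv.executeSql(
            "CREATE TABLE ods_orders_kafka (" +
            "  order_no STRING," +
            "  order_status STRING," +
            "  PRIMARY KEY (order_no) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'upsert-kafka'," +
            "  'topic' = 'ods_orders'," +
            "  'properties.bootstrap.servers' = 'kafka:9092'," +
            "  'key.format' = 'json'," +
            "  'value.format' = 'json'" +
            ")");

        tEnv.executeSql("INSERT INTO ods_orders_kafka SELECT * FROM mysql_orders");
    }
}
```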

2.2 Real-time metadata update

The real-time metadata update flow again takes MySQL as an example. The CDC source extracts the binlog from the database and handles DDL and DML statements separately: DDL statements are reported to the metadata center, while DML statements are converted into Avro-format binlog data and sent to Kafka. If the data needs to land in Hive downstream, the consumed Kafka data is written to Hive through the Hudi sink.
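A hedged sketch of the DDL/DML split using a Flink side output; the BinlogEvent type and its isDdl() flag are hypothetical stand-ins for however the CDC source actually represents events:

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Hypothetical event type produced by the CDC source.
class BinlogEvent {
    String sql;
    boolean ddl;
    boolean isDdl() { return ddl; }
}

public class DdlDmlSplitter extends ProcessFunction<BinlogEvent, BinlogEvent> {

    // Side output carrying DDL statements toward the metadata center.
    public static final OutputTag<BinlogEvent> DDL = new OutputTag<BinlogEvent>("ddl") {};

    @Override
    public void processElement(BinlogEvent event, Context ctx, Collector<BinlogEvent> out) {
        if (event.isDdl()) {
            ctx.output(DDL, event);  // -> report to the metadata center
        } else {
            out.collect(event);      // -> Avro-encode -> Kafka -> Hudi sink if needed
        }
    }
}
```

The DDL stream is then obtained with `mainStream.getSideOutput(DdlDmlSplitter.DDL)` and forwarded to the metadata center.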

2.3 Data Asset management system

Building on unified real-time data access and its integration with the existing offline data warehouse, we built a data asset management system. It covers standardizing warehouse specifications, unifying metadata management, improving data quality, ensuring data security, and taking inventory of data assets.

3. Real-time computing platform architecture

With unified data access as the foundation and the data asset management system as the safeguard, we also need a data development suite that integrates the whole development workflow into a real-time computing platform. At the bottom of the platform sits the data access layer, supporting data sources such as Kafka and binlogs. Above it is the data storage layer, providing storage components such as Kafka, ES, HBase, Hive, ClickHouse, and MySQL. The platform supports the JStorm, Spark Streaming, and Flink compute engines, with framework encapsulation and common-component packaging on top.

3.1 Multiple development modes – JAR & drag-and-drop

The real-time computing platform provides multiple development modes for different users. Taking Flink as an example, the Flink JAR mode lets users write their own Flink job code and upload it to the platform as a JAR package, meeting the needs of advanced users. The Flink drag-and-drop mode is graphical: after the platform wraps the common components, users simply drag and drop them to assemble a Flink job and submit it to the cluster for execution.

3.2 Multiple development modes – SQL

The real-time computing platform also provides a SQL development mode, supporting manual table creation, automatic table recognition from metadata, and table attribute configuration, as well as UDF creation, automatic UDF recognition, and DML execution.
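As a sketch of what the SQL mode automates (the UDF, tables, and connectors are all illustrative), registering a function and then executing plain DML looks roughly like this under the hood:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class SqlModeSketch {

    // Illustrative scalar UDF: mask the middle digits of a phone number.
    public static class MaskPhone extends ScalarFunction {
        public String eval(String phone) {
            if (phone == null || phone.length() < 7) {
                return phone;
            }
            return phone.substring(0, 3) + "****" + phone.substring(phone.length() - 4);
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // UDF creation; on the platform the class comes from an uploaded JAR.
        tEnv.createTemporarySystemFunction("mask_phone", MaskPhone.class);

        // On the platform these DDLs are generated from metadata, not typed by hand.
        tEnv.executeSql("CREATE TABLE src (phone STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE dst (phone STRING) WITH ('connector' = 'print')");

        // DML execution using the UDF.
        tEnv.executeSql("INSERT INTO dst SELECT mask_phone(phone) FROM src");
    }
}
```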

3.3 Task Control

For task control, the real-time computing platform simplifies task configuration as much as possible and shields users from complex settings. Once development is complete, you only need to select a cluster and fill in the resources to submit the task. The platform also keeps a version history for every task.

When a user operates a task, the platform automatically resolves the task's configuration and offers options appropriate to each component. For example, with a Kafka data source, a task can be started from the last consumed position, the earliest offset, the latest offset, or a specified position.
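Mapped onto the open-source Kafka consumer, those four choices correspond roughly to the calls below; the topic and properties are placeholders, and a timestamp stands in here for "a specified position":

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaStartPositionSketch {

    public static FlinkKafkaConsumer<String> build(String mode) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "demo");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("ods_orders", new SimpleStringSchema(), props);

        switch (mode) {
            case "last":     consumer.setStartFromGroupOffsets(); break; // last consumed position
            case "earliest": consumer.setStartFromEarliest();     break; // earliest offset
            case "latest":   consumer.setStartFromLatest();       break; // latest offset
            default:         // "specified position", here as a millisecond timestamp
                consumer.setStartFromTimestamp(Long.parseLong(mode));
        }
        return consumer;
    }
}
```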

For task recovery, a stopped Flink task can be restarted from a savepoint for quick recovery.

3.4 Task O&M

Task operation and maintenance is a difficult pain point of real-time tasks. The platform provides log query, collecting historical startup logs and runtime logs for easy comparison and search.

After a task starts, the platform automatically collects and reports its metrics. Users can define custom alert rules on these metrics; when a rule fires, alerts reach users through various channels. The platform also provides real-time monitoring dashboards for the metrics, which users can additionally configure in Grafana.

By combining logs, metrics, monitoring alerts, and accumulated operational experience, we built an intelligent service bot that can self-diagnose some task failures. These measures significantly reduce task operation and maintenance costs and ease the pressure on platform developers.

3.5 Flink task stability guarantee

What real-time operations cares about most is stability, and we have several practices for guaranteeing the stability of Flink tasks. First, the platform provides anomaly detection and alert monitoring to help users discover problems quickly. It also periodically snapshots each task, saving historical savepoints to facilitate rollback and fault recovery. If a task fails for some abnormal reason, the platform pulls up a new instance and resumes consumption from the position where the task last failed.

A Zookeeper-based high-availability mechanism guarantees the availability of the JobManager. Disaster recovery switchover across multiple clusters and data centers is supported, allowing a task to be switched to a DR cluster with one click. Resource isolation is achieved through real-time/offline cluster isolation and queue management.

IV. Application cases

Wide-table computation needs to aggregate 45 days of data from multiple data sources. Using the offline data warehouse, this takes about 3,000 CPU cores and 12,000 GB of memory, the computation runs 120 to 150 minutes, and about 450 TB of data is processed. Using the real-time data warehouse, about 2,500 CPU cores and 1,400 GB of memory are needed, the wide table is updated with a delay of 2 to 5 minutes, and about 18 TB of data is processed.

V. Future plans

SF Express has achieved some results in building its real-time data warehouse, but continuous optimization is still needed.

1. Enhance SQL capabilities

First, we want to support more SQL syntax and features, provide more usable connectors, and enable automatic tuning of SQL tasks.

2. Fine-grained resource management

Second, based on Flink on Kubernetes and automatic elastic scaling of tasks, we aim for task-level fine-grained resource scheduling and management, making Flink tasks fully elastic and cloud native.

3. Stream-batch unification

Finally, we hope to achieve stream-batch unification: a unified, highly compatible SQL layer that, through SQL parsing and engine adaptation, lets Flink serve as the single engine for both stream and batch processing.


This article is original content from Alibaba Cloud (Aliyun) and may not be reproduced without permission.