Data warehouses have four basic characteristics: they are subject-oriented, integrated, relatively stable, and reflect historical change. Among these, data integration is the primary prerequisite for data warehouse construction; it refers to bringing multiple scattered, heterogeneous data sources together for subsequent data analysis. Platformizing the data integration process greatly improves the efficiency of data developers. The main contents of this article are as follows:

  1. Data integration vs. data synchronization
  2. Integration requirements
  3. Data integration V1
  4. Data integration V2
  5. Online effect
  6. Conclusion


A data warehouse is a subject-oriented, integrated, nonvolatile, and time-variant collection of data in support of management’s decisions.

– Bill Inmon

1. Data integration vs. data synchronization

The concept of “data integration” is often confused with that of “data synchronization”, so we distinguish between the two.

  • “Data integration” refers specifically to the data synchronization process feeding the ODS layer of the data warehouse;

  • “Data synchronization” refers to the generalized source-to-sink data transmission process.

The relationship between them is shown in the figure below:

The “data synchronization platform” provides basic capabilities and contains no specific business logic. The “data integration platform” is built on top of it: besides raw data synchronization, it also contains aggregation logic (such as restoring snapshot data from database log data, described in detail below) and data-warehouse-specific concerns (such as naming conventions for ODS-layer databases and tables). At present, the “data synchronization platform” is still in the planning stage, but this does not block the construction of the “data integration platform”: some synchronization requirements can be created on the “real-time computing platform” in advance and decoupled by convention.

It is worth noting that “data integration” should also cover “data collection” (supported by specific tools) and “data cleaning” (determined by collection granularity, logging specifications, and so on), both of which are implemented by individual companies and will not be covered in detail in this article.

2. Integration requirements

At present, the internal data integration requirements of Banyu mainly cover three sources: Stat Log (standardized service logs or statistical logs), TiDB, and MongoDB. There are also some service logs, Nginx logs, and the like, which are not representative and are not covered in this article. In addition, since the real-time data warehouse is still under construction, the data integration platform currently only covers the offline data warehouse (Hive).

  • Stat Log: service logs on disk are collected into Kafka by the Filebeat component. Since these logs are append-only, Stat Log integration is relatively simple: the Kafka data only needs to be synchronized to Hive.
  • DB (TiDB and MongoDB): DB data is more troublesome. The core requirement is that the data warehouse holds a mirror of the business database, that is, a snapshot of its data at a certain moment (day level or hour level). Sometimes there is also a need to analyze the process of data change, so DB data integration has to take both aspects into account.

Since the integration methods for these two types of data differ considerably, they are discussed separately below.

3. Data integration V1

In the early stage, the “data integration platform” already existed in embryonic form, implemented mainly with a series of open-source tools. As time went on, the problems exposed by this version gradually increased. Below, V1 is described mainly from the perspective of data flow; more design details are reflected in the V2 version.

3.1 Stat Log

Log integration was not platformized at this stage, but developed in a siloed, case-by-case fashion. The data integration link is shown in the figure below:

Data in Kafka is synchronized to HDFS through Flume, and a Spark task then imports the data from HDFS into Hive and creates partitions. The overall link is long, and introducing a third-party component (Flume) increases operation and maintenance costs; in addition, redundantly storing Kafka’s raw data in HDFS increases storage costs.

3.2 DB

DB data integration was mainly realized in query mode (batch mode, where snapshot data is obtained by scanning the whole table with Select queries), and its link is shown below:

The user submits an integration task via the platform, and a scheduled fetch task scans the integration platform’s metadata database and generates the corresponding fetch jobs (TiDB data via the Sqoop tool, MongoDB data via the Mongoexport tool). As can be seen, version V1 does not capture the database’s change-log data and therefore cannot meet the requirement of analyzing the data change process.

Since the Sqoop tasks ultimately fetch data from the business databases in the TiDB production environment, large data volumes inevitably affect the business databases to some extent. The Mongoexport tasks act directly on MongoDB hidden nodes (which serve no business requests), so their impact on online business is negligible. For this reason, the DBAs set up a separate TiDB big data cluster to which large business databases are synchronized (based on the TiDB Pump and Drainer components), so that some Sqoop tasks can pull data from this cluster instead and the impact on the business databases is eliminated. From the perspective of data flow, the whole process is shown in the figure below:

Whether a production TiDB business database is synchronized to the TiDB big data cluster depends on the needs of the data warehouse and the DBAs’ evaluation of the data volume. As can be seen, this mode also involves a large amount of data redundancy, and cluster resources hit a bottleneck as the number of synchronization tasks grows. Moreover, as things evolved, the TiDB big data cluster also came to serve data applications for some production business databases, and the cluster’s scope gradually blurred.

4. Data integration V2

In V2, Flink was introduced to simplify the synchronization link, and DB data integration was changed from query-based to log-based (streaming), which greatly reduced redundant storage.

4.1 Stat Log

With the Hive integration supported since Flink 1.11, we can easily write Kafka data into Hive, so Stat Log integration becomes very simple (compared with V1, the Flume dependency is removed and the data redundancy is eliminated), and Flink’s exactly-once semantics ensure data accuracy. From the perspective of data flow, the whole process is shown in the figure below:

Currently log partitions are generated on an hourly granularity, and several Flink task configuration parameters are as follows:

checkpoint interval: 10 min
watermark: 1 min
partition.time-extractor.kind: 'custom'
sink.partition-commit.policy.kind: 'metastore,success-file'
sink.partition-commit.trigger: 'partition-time'
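For reference, the following is a minimal sketch of how such a Kafka-to-Hive streaming job can be wired up with these settings on Flink 1.11+. The topic, field list, catalog parameters, and the custom partition-time extractor class are placeholders for illustration, not the actual production job:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class StatLogToHiveJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10 * 60 * 1000L); // checkpoint every 10 min

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register the Hive catalog (catalog name, database and conf dir are placeholders).
        HiveCatalog hiveCatalog = new HiveCatalog("hive", "ods", "/etc/hive/conf");
        tEnv.registerCatalog("hive", hiveCatalog);
        tEnv.useCatalog("hive");

        // Kafka source with a 1 min watermark on the event time.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
            "CREATE TABLE IF NOT EXISTS stat_log_source (" +
            "  log_content STRING," +
            "  ts TIMESTAMP(3)," +
            "  WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'stat_log'," +
            "  'properties.bootstrap.servers' = 'kafka:9092'," +
            "  'properties.group.id' = 'stat_log_integration'," +
            "  'scan.startup.mode' = 'group-offsets'," +
            "  'format' = 'json')");

        // Hive sink partitioned by day/hour, committed on partition time as configured above.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(
            "CREATE TABLE IF NOT EXISTS stat_log (" +
            "  log_content STRING" +
            ") PARTITIONED BY (dt STRING, hr STRING) STORED AS PARQUET TBLPROPERTIES (" +
            "  'partition.time-extractor.kind' = 'custom'," +
            "  'partition.time-extractor.class' = 'com.example.HourPartitionTimeExtractor'," +
            "  'sink.partition-commit.trigger' = 'partition-time'," +
            "  'sink.partition-commit.policy.kind' = 'metastore,success-file')");

        // Continuously stream Kafka data into hourly Hive partitions.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
            "INSERT INTO stat_log " +
            "SELECT log_content, DATE_FORMAT(ts, 'yyyyMMdd'), DATE_FORMAT(ts, 'HH') FROM stat_log_source");
    }
}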

4.2 DB

The log-based integration of DB data means collecting the database’s change-log data. In our current implementation, TiDB collection is based on the Pump and Drainer components (the production database cluster version does not yet support enabling TiCDC), MongoDB collection is based on the MongoShake component, and the collected data is sent to Kafka. This reduces the query pressure on the business database, captures the change process of the data, and eliminates redundant data storage. However, since the raw data is log data, the snapshot data has to be restored by some means. The new link is shown below:

After a user submits an integration task, three tasks are created synchronously:

  • Incremental task (streaming): synchronizes DB log data from Kafka to Hive. Because the collection components are deployed per cluster and the number of clusters is limited, the corresponding synchronization task is created manually on the real-time computing platform; the integration task assumes by default that the synchronization task is already in place. Once the “data synchronization platform” lands, this step can be further automated and the synchronization can be validated.
  • Stock task (batch): to restore snapshot data, at least one initial snapshot is required, so the stock task fetches the initial snapshot of the integrated table from the business database.
  • Merge task (batch): merges the stock data with the incremental data to restore the snapshot. The restored snapshot can serve as the stock data for the next day, so the stock task only needs to be scheduled once to obtain the initial snapshot.

Stock tasks and Merge tasks are scheduled by the offline scheduling platform DolphinScheduler (DS) and retrieve information from the integration task metadata database during execution. Currently, the Merge task is scheduled hourly, that is, snapshot data is restored every hour.

From the perspective of data flow, the whole process is shown in the figure below:

DB data integration is more complex than Stat Log integration. Below, TiDB data integration is used as an example to describe some key points of the design (the MongoDB process is similar; the differences lie in the stock synchronization tool and the data parsing).

4.2.1 Requirement expression

For users, integration tasks need to provide the following two types of information:

  • TiDB source information: includes the cluster, database, and table.
  • Integration mode: indicates the aggregation granularity of the snapshot data, either full or incremental. Full means today’s incremental log data is aggregated with the stock snapshot; incremental means only today’s incremental log data is aggregated (even though the incremental mode does not aggregate with the stock snapshot, the initial stock data is still required; which mode to use is decided by the data warehouse staff).

4.2.2 Stock task

Although the stock task is executed only once, in order to completely eliminate the impact of data integration on the business database, we chose to implement it with the database backup-and-restore mechanism. Database backup and restore operations have already been platformized internally, and clusters are backed up periodically (at daily granularity). The platform allows querying the latest backup of a cluster and triggering backup and restore operations through its interface, so the stock data can be obtained directly from the restored database.

Since the database backup time and the integration task submission time are not necessarily on the same day, there is a gap between them, which causes the stock snapshot data to fall short of our expectations. The relationship between the time points is shown in the figure below:

According to our setting, the stock snapshot data should contain all the data before T4, while the actual backup snapshot data only contains all the data before T1, and there is a data difference of N days.

**Note:** the difference is not taken directly as the data between T1 and T4, because the incremental binlog data is partitioned by hour, and during the Merge the hourly partitions are aggregated with the stock data with deduplication. Merging the stock data at T1 with the incremental data between T0 and T3 is therefore equivalent to merging the stock data at T0 with the incremental data between T0 and T3, so the data difference from T1 to T4 is equivalent to the difference from T0 to T3, which is the N days of data in the figure.

In fact, the missing data can be completed within the “stock task” itself; on closer analysis, this can be realized through a backfill run of the “Merge task”.

The workflow of the entire “stock task” is shown below:

  • Synchronously trigger the database platform to perform a backup and restore, and obtain a receipt ID.
  • Check the restore status by the receipt ID. If the restore fails, the DBA needs to locate the cause, so the whole workflow goes offline; after the restore succeeds, the “stock task” can be re-run from the platform. If the restore is still in progress, the workflow exits directly and waits to be woken up by the next DS scheduling round. If the restore succeeds, the workflow proceeds to the subsequent logic.
  • If a backfill is needed, perform it through the backfill operation of the “Merge task”. The whole operation is idempotent; if it fails, exit the workflow and wait for the next scheduling round.
  • On success, the whole workflow goes offline and the task is complete.

4.2.3 Merge task

The prerequisite for the Merge task is that both the stock data and the incremental data are ready, which we mark with _SUCCESS files. The workflow of the Merge task is shown below:

  • Check whether the file markers exist. If not, the data is not ready: raise an alarm and exit the workflow to wait for the next scheduling round (a minimal sketch of this readiness check follows the list).
  • Execute the Merge operation. If it fails, raise an alarm and exit the workflow to wait for the next scheduling round.
  • On success, exit the workflow and wait for the next scheduling round.
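For illustration, the readiness check in the first step could look roughly like the sketch below. The directory layout and paths are assumptions; the actual implementation depends on how the warehouse tables are organized on HDFS:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MergeReadinessCheck {

    // Returns true only if both the stock snapshot and the incremental partition
    // carry a _SUCCESS marker, i.e. both inputs of the Merge task are ready.
    public static boolean inputsReady(String stockDir, String incrementDir) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        return fs.exists(new Path(stockDir, "_SUCCESS"))
                && fs.exists(new Path(incrementDir, "_SUCCESS"));
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical layout: stock snapshot and one hourly binlog partition of a table.
        String stockDir = "/warehouse/ods/snapshot/some_db/some_table/dt=20211201";
        String incrementDir = "/warehouse/ods/binlog/some_db/some_table/dt=20211202/hr=10";
        if (!inputsReady(stockDir, incrementDir)) {
            System.err.println("Merge inputs not ready, exit and wait for the next scheduling");
            System.exit(1); // DS will reschedule the workflow later
        }
        // ...otherwise continue with the Merge operation.
    }
}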

The Merge operation is implemented through the Flink DataSet API. The core logic is as follows:

  • Load the stock and incremental data and unify the data format. Core fields: the primary key serves as the aggregation key for identifying the same record; CommitTs identifies the commit time of the binlog and defaults to 0 for stock data so that it sorts before the incremental data; OpType indicates the operation type (Insert, Update, Delete) and defaults to Insert for stock data.
  • Aggregate by primary key.
  • After aggregation, retain the data entry with the largest CommitTs and discard the rest.
  • Filter out data entries whose OpType is Delete.
  • Output the aggregation result.

Core code:

allMergedData.groupBy(x -> x.getKeyCols())
    .reduce(new ReduceFunction<MergeTransform>() {
        @Override
        public MergeTransform reduce(MergeTransform value1, MergeTransform value2) throws Exception {
            // keep the record with the larger CommitTs
            if (value1.getCommitTS() > value2.getCommitTS()) {
                return value1;
            }
            return value2;
        }
    })
    // filter out records whose op type is delete
    .filter(new FilterFunction<MergeTransform>() {
        @Override
        public boolean filter(MergeTransform merge) throws Exception {
            if (merge.getOpType().equals(OpType.DELETE)) {
                return false;
            }
            return true;
        }
    })
    .map(x -> x.getHiveColsText())
    .writeAsText(outPath);

For Insert and Update operations, the latest value overwrites the old one; for Delete operations, the record is discarded. This approach also naturally implements data deduplication.

4.2.4 Fault tolerance and data consistency guarantee

We can verify the fault tolerance of the scheme by looking at how three task failure scenarios are handled.

  • The “stock task” fails abnormally: if the backup-and-restore operation fails, the DS task sends a failure alarm; the database platform does not support retrying the restore, so manual intervention is required. Meanwhile, the Merge task does not detect the _SUCCESS flag of the stock data, so the workflow does not advance.
  • The “incremental task” fails abnormally: Flink’s own fault-tolerance mechanism and the external detection mechanism of the “real-time computing platform” guarantee the fault tolerance of the incremental task. If the incremental task has not recovered by the time the Merge task is scheduled, that hour is treated as having no incremental data and execution is skipped, and the snapshot update is delayed (the Merge aggregates the stock data with the whole day’s incremental data, so if the incremental task recovers before a later scheduling point, the latest snapshot can still be obtained by aggregation). Alternatively, the Merge task can be triggered manually as a backfill after the incremental task recovers.
  • The Merge task fails abnormally: the task is idempotent, so fault tolerance is ensured by configuring a retry mechanism for DS task failures, and a failure alarm is sent.

In summary, the automatic recovery mechanism and the alarm mechanism ensure the correct execution of the whole workflow. Next, let us look at the consistency guarantees of the scheme from the data perspective.

Data consistency is embodied in the Merge operation. The correctness of aggregating the two data sets can be guaranteed at the code level (it is verifiable and testable), so the only place where inconsistency could arise is in the two input data sets, namely the stock and the increment. There are two cases:

  • The stock and incremental data overlap: this appears when the initial stock snapshot is aggregated with the incremental data of some hour. Because the algorithm naturally deduplicates, data consistency is guaranteed.
  • The stock and incremental data have gaps: this manifests as missing incremental data. Incremental data is written to Hive by Flink, and out-of-order data arriving after a partition has already been committed can cause inconsistency: although the partition is committed again at the next checkpoint after the out-of-order data arrives, the first partition commit has already triggered the downstream tasks, which may therefore read incomplete data.

There are two ways to deal with out-of-order data when Flink writes to Hive in streaming mode. The first is to use a single Kafka partition, since multiple partitions are the root cause of out-of-order data. The second is alarm-based compensation: once the task’s watermark advances, out-of-order data cannot be completely avoided (the watermark can be given a tolerance interval, but there is a limit), so it can only be compensated by alarms after the fact. The problem then becomes how to detect out-of-order data. Since out-of-order data triggers a second commit of the previous, already-committed partition, we only need to check at partition-commit time whether the _SUCCESS flag already exists in that partition: if it does, the commit was caused by out-of-order data, and an alarm is triggered.
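The following is a minimal sketch of this detection idea, assuming the check runs at partition-commit time (for example inside a custom partition commit policy) and using a hypothetical alert() hook; it illustrates the mechanism rather than the exact production code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OutOfOrderDetector {

    /**
     * Invoked when a partition is about to be committed. Because out-of-order data
     * causes an already-committed partition to be committed a second time, finding an
     * existing _SUCCESS marker in that partition means the data arrived out of order.
     */
    public static boolean isOutOfOrderCommit(String partitionPath) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path successFile = new Path(partitionPath, "_SUCCESS");
        if (fs.exists(successFile)) {
            alert("Out-of-order data detected, partition re-committed: " + partitionPath);
            return true;
        }
        return false;
    }

    // Hypothetical alerting hook; in practice this would report to the monitoring system
    // so that downstream data can be compensated afterwards.
    private static void alert(String message) {
        System.err.println(message);
    }
}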

5. Online effect

Overview

Stock task

Merge task

6. Conclusion

This article has described the core design ideas behind Banyu’s “data integration platform”. Some details of the overall scheme are not covered here, such as changes in the data schema and the parsing of DB log data; these details are also crucial to the construction of the platform. At present, most of Banyu’s integration tasks have been switched to the new approach and run stably. We are also working on bringing real-time data warehouse integration tasks onto the platform to provide a more unified experience.
