This article was written by Xu Runbai, a big data developer at 37 Mobile Games. It explains why 37 Mobile Games chose Flink as its computing engine and how it built a new lake-warehouse integrated solution based on Flink CDC + Hudi. The main contents include:

  1. Introduction to Flink CDC basics
  2. Introduction to Hudi basics
  3. Business pain points and technology selection at 37 Mobile Games
  4. Introduction to 37 Mobile Games' lake-warehouse integration
  5. Flink CDC + Hudi practice
  6. Summary

I. Flink CDC 2.0

Flink CDC Connectors is a set of source connectors for Apache Flink. Version 2.0 currently supports capturing data from MySQL and PostgreSQL, and version 2.1 is confirmed to support Oracle and MongoDB data sources.

The core features of Flink CDC 2.0 boil down to the following three important capabilities:

  • The whole process is lock-free, so there is no risk of having to lock the database;

  • Parallel reading: the full-snapshot reading phase supports horizontal scaling, so tables with hundreds of millions of rows can be accelerated by increasing the parallelism;

  • Checkpointing is supported in the full-snapshot phase: even if a task exits for some reason, it can be resumed from the saved checkpoint without losing data.

These are the core improvements introduced in Flink CDC 2.0.

II. Hudi

Apache Hudi is currently described in the industry as a Streaming Data Lake Platform built around a database kernel.

Since Hudi has good upsert capability and its 0.10 master branch supports Flink 1.13.x, we chose Flink + Hudi to provide minute-level upsert, analysis, and query capabilities for the business scenarios of 37 Mobile Games.

III. Business pain points and technology selection at 37 Mobile Games

1. Old architecture and business pain points

1.1 Insufficient data timeliness

  • Log data is synchronized to Hive by Sqoop every 30 minutes, covering the previous 60 minutes of data.

  • Database data is synchronized to Hive by Sqoop every 60 minutes.

  • Database data is synchronized to Hive by Sqoop every day, covering the previous 60 days of data.

1.2 Business code logic is complex and difficult to maintain

  • At present, much of the business development at 37 Mobile Games follows the MySQL + PHP development model; the code logic is complex and difficult to maintain;

  • For the same piece of logic, one set of code often has to be developed for stream processing and another for batch processing, and the two cannot be reused.

1.3 Historical data is frequently rerun

  • Historical data is frequently rerun to ensure data consistency.

1.4 Frequent schema changes

  • Table fields are often added due to business requirements.

1.5 The Hive version is old

  • The current Hive version is 1.x, and upgrading it is difficult.

  • Upsert is not supported.

  • Row-level DELETE is not supported.

In the business scenarios of 37 Mobile Games, upserting and deleting data are very common requirements, so an architecture based on Hive storage cannot meet the business needs.

2. Technology selection

Canal and Maxwell were considered for the synchronization tool. However, Canal is only suitable for incremental data synchronization and needs to be deployed separately, so it is relatively heavy to maintain. Maxwell, though lightweight, needs to be used together with a message queue such as Kafka, just like Canal. In contrast, Flink CDC is configured as a Flink connector and used directly from Flink SQL, which is very lightweight and fits perfectly with our Flink SQL based streaming-batch architecture.

In terms of storage engine selection, the most popular data lake products are Apache Hudi, Apache Iceberg, and Delta Lake, each of which has its own advantages and disadvantages in our scenario. In the end, because of Hudi's openness to the upstream and downstream ecosystem, its support for a global index, its support for Flink 1.13, and its compatibility with our Hive version (Iceberg does not support Hive 1.x), Hudi was selected as the storage engine for the lake-warehouse integrated, stream-batch integrated architecture.

Given the business pain points and the selection comparison above, our final plan is as follows: Flink 1.13.2 is used as the computing engine, relying on the unified streaming-batch API provided by Flink to implement stream-batch integration based on Flink SQL; Flink CDC 2.0 is used as the data synchronization tool for the ODS layer; and Hudi 0.10 master is used as the storage engine of the lake-warehouse integration, solving the pain point of maintaining two sets of code.

IV. The new lake-warehouse integrated architecture

The lake-warehouse integration of 37 Mobile Games is part of its stream-batch integrated architecture. Through lake-warehouse integration and stream-batch integration, we can achieve, in quasi-real-time scenarios: the same data source, the same computing engine, the same storage, and the same computing caliber. Data timeliness can reach the minute level, which meets the needs of a quasi-real-time business data warehouse well. The architecture diagram is as follows:

MySQL data enters Kafka via Flink CDC. The reason data is first imported into Kafka rather than directly into Hudi is to allow multiple real-time tasks to reuse the data from MySQL, avoiding multiple tasks connecting to MySQL tables and binlogs through Flink CDC, which could affect the performance of the MySQL database.

In addition to landing a copy of the CDC data from Kafka into the ODS layer of the offline data warehouse, the data also flows along the real-time data warehouse link from ODS -> DWD -> DWS -> the OLAP database, and is finally used by data applications such as reports. The results of each layer of the real-time data warehouse are also landed quasi-real-time into the offline data warehouse. In this way, one program is developed once, with a unified indicator caliber and unified data.

From the architecture diagram, you can see there is a data correction (rerunning historical data) step. The reason for this step is that historical data may need to be rerun because of a caliber adjustment, or because the previous day's real-time task produced incorrect results.

Data stored in Kafka has a retention period, so historical data cannot be kept there for long, and historical data that needs to be rerun long after the fact cannot be retrieved from Kafka. Furthermore, if a large amount of historical data were pushed into Kafka again and corrected through the real-time computing link, it could affect the day's real-time jobs. Therefore, rerun historical data is handled through the data correction step.

In general, the data warehouse of 37 Mobile Games is a hybrid of the Lambda and Kappa architectures. Each data link of the stream-batch integrated data warehouse has a data quality check process. On the next day, the previous day's data is checked; if there is no anomaly in the data computed in real time the previous day, there is no need to correct the data, and the Kappa architecture alone is sufficient.

V. Flink CDC 2.0 + Kafka + Hudi 0.10 practice

1. Environment preparation

  • Flink 1.13.2

  • ./lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar

  • ./lib/hadoop-mapreduce-client-core-2.7.3.jar

  • ./lib/flink-sql-connector-mysql-cdc-2.0.0.jar

  • ./lib/flink-format-changelog-json-2.0.0.jar

  • ./lib/flink-sql-connector-kafka_2.11-1.13.2.jar

Create the MySQL CDC source table:

create table sy_payment_cdc (
  ID BIGINT,
  PRIMARY KEY(ID) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = ' ',
  'port' = ' ',
  'username' = ' ',
  'password' = ' ',
  'database-name' = ' ',
  'table-name' = ' ',
  'connect.timeout' = '60s',
  'scan.incremental.snapshot.chunk.size' = '100000',
  'server-id' = '5401-5416'
);

It is worth noting that the scan.incremental.snapshot.chunk.size parameter needs to be configured according to the actual situation; if the table's data volume is not large, the default value can be used.

Create the Kafka table and the Hudi COW table:

create table sy_payment_cdc2kafka (
  ID BIGINT,
  PRIMARY KEY(ID) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = ' ',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = ' ',
  'properties.group.id' = ' ',
  'key.format' = ' ',
  'key.fields' = ' ',
  'format' = 'changelog-json'
);

create table sy_payment2Hudi (
  ID BIGINT,
  PRIMARY KEY(ID) NOT ENFORCED
)
PARTITIONED BY (YMD)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///data/Hudi/m37_mpay_tj/sy_payment',
  'table.type' = 'COPY_ON_WRITE',
  'partition.default_name' = 'YMD',
  'write.insert.drop.duplicates' = 'true',
  'write.bulk_insert.shuffle_by_partition' = 'false',
  'write.bulk_insert.sort_by_partition' = 'false',
  'write.precombine.field' = 'MTIME',
  'write.tasks' = '16',
  'write.bucket_assign.tasks' = '16',
  'write.task.max.size' = ' ',
  'write.merge.max_memory' = ' '
);
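
The DDL above only declares the CDC source, the Kafka table, and the Hudi sink. Below is a minimal sketch of the two INSERT jobs that wire them together; the column lists are elided and assumed to match the abbreviated schemas above, so this is an illustration rather than the exact production statements:

-- Job 1: MySQL binlog -> Kafka (changelog-json), so that multiple downstream jobs can reuse the data
insert into sy_payment_cdc2kafka
select * from sy_payment_cdc;

-- Job 2: Kafka -> Hudi COW table, upserted by the primary key ID
insert into sy_payment2Hudi
select * from sy_payment_cdc2kafka;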

To import historical data into Hudi, you can first bring the full historical data into the lake offline with bulk_insert, load the index with the index bootstrap function, and then pick up the incremental data. The uniqueness of data imported by bulk_insert depends on the source data, and when picking up the incremental data you must ensure that no data is lost.
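
A hedged sketch of that alternative path using Hudi's Flink options; the table names below are illustrative, and the relevant options are 'write.operation' and 'index.bootstrap.enabled':

-- Step 1: bring the full historical data into the lake with bulk_insert
-- (fast write path without index lookup; uniqueness depends on the source data)
create table sy_payment2Hudi_bulk (
  ID BIGINT,
  PRIMARY KEY(ID) NOT ENFORCED
)
PARTITIONED BY (YMD)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///data/Hudi/m37_mpay_tj/sy_payment',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'MTIME',
  'write.operation' = 'bulk_insert'
);

-- Step 2: switch to upsert for the incremental data and load the existing
-- records into the Flink state index once via index bootstrap
create table sy_payment2Hudi_incr (
  ID BIGINT,
  PRIMARY KEY(ID) NOT ENFORCED
)
PARTITIONED BY (YMD)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///data/Hudi/m37_mpay_tj/sy_payment',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'MTIME',
  'write.operation' = 'upsert',
  'index.bootstrap.enabled' = 'true'
);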

Here we chose the simpler approach of adjusting task resources to bring the historical data into the lake. Thanks to Flink's checkpoint mechanism, whether it is the CDC 2.0 -> Kafka job or the Kafka -> Hudi job, the task can be restarted from a checkpoint without losing data.

We can configure the CDC 2.0 -> Kafka job and the Kafka -> Hudi job with larger memory and multiple parallelism to speed up bringing the historical data into the lake. After all the historical data has entered the lake, we reduce the memory of the lake-ingestion task, set the parallelism of the CDC -> Kafka job to 1 (since the incremental phase of CDC runs with a single parallelism), and restart the tasks from the specified checkpoint.

With the parameter configuration defined in the tables above and 16 parallelism, and with 50 GB of Flink TaskManager memory, it actually took 10 hours to import 1.5 billion rows of historical data into a Hudi COW table, and 6 hours to import 900 million rows from a single table into a Hudi COW table. Of course, a large part of this time is due to the write amplification characteristic of COW, which consumes a lot of time in upsert mode with large data volumes.

At present, our cluster consists of more than 200 machines, and the total number of online stream computing tasks is more than 200, with the total data volume approaching 2PB.

If cluster resources are limited, you can adjust the memory configuration of the Hudi table and the Flink task as needed, and you can also use Hudi's rate-limiting parameter write.rate.limit to let historical data enter the lake slowly.
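
As a sketch of where that option goes, it can be added to the WITH clause of the Hudi table; the table name and value below are illustrative:

create table sy_payment2Hudi_throttled (
  ID BIGINT,
  PRIMARY KEY(ID) NOT ENFORCED
)
PARTITIONED BY (YMD)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///data/Hudi/m37_mpay_tj/sy_payment',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'MTIME',
  -- throttle ingestion to roughly 30,000 records per second (illustrative value)
  'write.rate.limit' = '30000'
);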

In the earlier Flink CDC 1.x versions, the full snapshot phase read with a single parallelism, so tables with hundreds of millions of rows took a long time in that phase, and checkpoints would fail, so resuming from a breakpoint could not be guaranteed.

The approach then was to start a CDC 1.x job to write incremental data to Kafka, start another Sqoop job to pull all the current full data into Hive, read the Hive data with Flink and write it to Hudi, and finally consume the incremental data in Kafka back into Hudi. Because the Kafka and Hive data overlap, no data is lost, and Hudi's upsert capability guarantees data uniqueness.

However, such a link is too long and cumbersome to operate. CDC 2.0's support for multiple parallelism and checkpointing in the full snapshot phase has greatly reduced the complexity of the architecture.

2. Data comparison

  • Because the production environment uses Hive 1.x and Hudi does not support data synchronization for Hive 1.x, we create a Hive external table to query the data. For Hive 2.x and later versions, refer to the Hive synchronization section of the Hudi documentation.

  • Create the Hive external table and pre-create the partitions (an example of adding a partition is shown after the DDL below).

  • Add hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar to the auxlib folder.

CREATE EXTERNAL TABLE m37_mpay_tj.`ods_sy_payment_f_d_b_ext`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `ID` bigint,
  ...
  )
PARTITIONED BY (
  `dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs:///data/Hudi/m37_mpay_tj/sy_payment';
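
Because Hive 1.x cannot use Hudi's Hive synchronization, partitions are pre-created by hand (or by a small scheduled job). A hedged example of adding one day's partition; the partition value and location are illustrative and must point at the corresponding Hudi partition directory:

-- pre-create one day's partition on the external table (illustrative value and path)
ALTER TABLE m37_mpay_tj.ods_sy_payment_f_d_b_ext
  ADD IF NOT EXISTS PARTITION (dt = '20211001')
  LOCATION 'hdfs:///data/Hudi/m37_mpay_tj/sy_payment/20211001';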

Comparing the Hudi data (queried through the Hive external table) with the original Hive data synchronized by Sqoop (see the sketch after this list for example checks):

  1. The total row counts are consistent;
  2. The row counts grouped by day are consistent;
  3. The amounts grouped by day are consistent.
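
As a sketch, checks along the following lines are run against both the Hudi external table and the original Sqoop-synchronized Hive table and the results compared; AMOUNT is an illustrative column name:

-- total row count
SELECT COUNT(*) FROM m37_mpay_tj.ods_sy_payment_f_d_b_ext;

-- row count and amount grouped by day
SELECT dt, COUNT(*) AS cnt, SUM(AMOUNT) AS total_amount
FROM m37_mpay_tj.ods_sy_payment_f_d_b_ext
GROUP BY dt;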

VI. Summary

Compared with the traditional data warehouse architecture, the lake-warehouse integrated, stream-batch integrated architecture mainly has the following advantages:

  • Hudi provides upsert capability, solving the pain point of frequent upserts/deletes;

  • It provides minute-level data, with better timeliness than the traditional data warehouse;

  • Stream-batch integration is implemented based on Flink SQL, so the code maintenance cost is low;

  • The same data source, the same computing engine, the same storage, and the same computing caliber;

  • Flink CDC is used as the data synchronization tool, saving the maintenance cost of Sqoop.

Finally, regarding the pain point of frequently adding table fields and our hope that such fields can be automatically added when synchronizing to downstream systems, there is no perfect solution at present; we hope the Flink CDC community can provide Schema Evolution support in a subsequent version.
