This article was compiled by community volunteer Chen Zhengyu from "Detailed Explanation of Flink CDC", a talk given by Xu Bangjiang, a senior development engineer at Alibaba, at the Flink Meetup in Beijing on July 10. It walks through the core features of the newly released Flink CDC 2.0.0, including major improvements such as concurrent reading of full data, checkpoint support, and lock-free reading.

I. CDC overview

CDC stands for Change Data Capture, a broad term for any technology that captures data changes. The CDC discussed here refers specifically to technology that captures data changes in databases. CDC technology has a wide range of application scenarios:

  • **Data synchronization:** used for backup and disaster recovery;
  • **Data distribution:** one data source distributed to multiple downstream systems;
  • **Data collection:** ETL data integration into data warehouses or data lakes, a very important data source.

There are many technical solutions for CDC. The current mainstream implementation mechanisms in the industry can be divided into two types:

  • Query-based CDC:

    • Query jobs are scheduled offline and processed in batches: a table is synchronized to another system by querying it for the latest data each time.

    • Data consistency cannot be guaranteed, since the data may have changed several times between queries.

    • Real-time processing is not guaranteed; offline scheduling introduces an inherent delay.

  • Log-based CDC:

    • Logs are consumed in real time and processed as streams. For example, the MySQL binlog completely records all changes in the database, so the binlog file can be used as the source of the stream;
    • Data consistency is guaranteed, because the binlog file contains the full history of change details;
    • Real-time processing is guaranteed, because log files such as the binlog can be consumed as a stream, providing real-time data.

Comparing the common open source CDC solutions, we can see the following:

  • In terms of incremental synchronization capability:
    • Log-based solutions can implement incremental synchronization well;
    • Incremental synchronization is difficult to achieve with query-based approaches.
  • In terms of full synchronization capability, both query-based and log-based CDC solutions generally support it, with the exception of Canal.
  • In terms of full + incremental synchronization capability, only Flink CDC, Debezium, and Oracle GoldenGate support it well.
  • From an architecture perspective, the table divides the solutions into stand-alone and distributed. A distributed architecture is reflected not only in the horizontal scalability of data reading, but more importantly in the ability to connect to distributed systems in big data scenarios. For example, when Flink CDC data is written into a lake or warehouse, the downstream is usually a distributed system such as Hive, HDFS, Iceberg, or Hudi, and Flink CDC's architecture connects to such systems very well.
  • In terms of data transformation/data cleaning capability: how easily can data be filtered, cleaned, or even aggregated as it passes through the CDC tool?
    • With Flink CDC this is quite simple: the data can be manipulated directly through Flink SQL;
    • With DataX, Debezium and others, this has to be done through scripts or templates, so the barrier to use is relatively high.
  • Finally, in terms of ecosystem, this refers to the downstream databases or data sources that are supported. Flink CDC provides a rich set of downstream connectors for common systems such as TiDB, MySQL, PostgreSQL, HBase, Kafka, and ClickHouse, and also supports a variety of custom connectors.

II. Flink CDC project

With that said, let’s review the motivation behind the Flink CDC project.

1. Dynamic Table & ChangeLog Stream

It is well known that Flink has two basic concepts: Dynamic Table and Changelog Stream.

  • A Dynamic Table is the dynamic table defined by Flink SQL; the concepts of dynamic table and stream are equivalent. As shown in the figure above, a stream can be converted into a dynamic table, and a dynamic table can be converted into a stream.
  • In Flink SQL, data flows from one operator to another in the form of a Changelog Stream. A Changelog Stream can be converted into a table at any point in time, and a table can likewise be converted into a stream.

If you think about tables and the binlog in MySQL, you will find that all changes to a table in a MySQL database are recorded in the binlog: if the table is continuously updated, the binlog stream keeps appending. A table in the database is the materialized result of the binlog stream at a certain point in time, and the log stream is the result of continuously capturing the table's change data. This shows that Flink SQL's Dynamic Table is a very natural way to represent a constantly changing MySQL table.
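As a small illustration of this duality, even a simple aggregation over a dynamic table produces a changelog stream; the table and column names below are made up:

```sql
-- Hypothetical example: a continuous aggregation over a dynamic table.
-- Each incoming row either inserts a new count or retracts the old count and
-- emits the updated one, so the query result is itself a changelog stream.
SELECT user_name, COUNT(*) AS click_cnt
FROM clicks
GROUP BY user_name;
```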

On this basis, we investigated several CDC technologies and finally chose Debezium as Flink CDC's underlying collection tool. Debezium supports full synchronization, incremental synchronization, and full + incremental synchronization, which is very flexible, and its log-based approach also makes exactly-once processing possible.

Comparing Flink SQL's internal data structure RowData with Debezium's data structure, we find that the two are very similar.

  • Each RowData carries a metadata field, RowKind, with four types: INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE. These four types correspond directly to the change concepts in a database binlog.
  • Debezium's data structure also has a similar metadata field, op, with four values: c, u, d, and r, corresponding to create, update, delete, and read respectively. For u, which represents an update, the data portion contains both the before and after images.

From the analysis of these two data structures, Flink's and Debezium's underlying data can be connected very easily, and it becomes clear that Flink is technically a very good fit for CDC.
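For illustration, when Debezium events are delivered through Kafka, Flink can interpret them as a changelog using the debezium-json format, mapping each event's op field to the corresponding RowKind. The topic, schema, and server settings below are placeholders:

```sql
-- Hypothetical example: interpret Debezium change events in Kafka as a changelog stream.
-- Flink maps op = c/r to INSERT, op = u to UPDATE_BEFORE/UPDATE_AFTER, op = d to DELETE.
CREATE TABLE products_changelog (
  id    BIGINT,
  name  STRING,
  price DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mysql.inventory.products',              -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);
```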

2. Traditional CDC ETL analysis

Let’s take a look at the traditional CDC ETL analysis link, as shown below:

In traditional CDC-based ETL analysis, a data collection tool is required: Debezium is commonly used abroad, while Canal, open-sourced by Alibaba, is popular in China. The collection tool is responsible for collecting the database's incremental data, and some tools also support synchronizing full data. The collected data is generally written to message middleware such as Kafka, and the Flink compute engine then consumes the data and writes it to the destination, which can be various databases, data lakes, real-time storage, or offline storage.

Note that Flink provides the changelog-json format for writing changelog data to offline storage such as Hive/HDFS; for real-time storage, Flink supports writing changelogs directly to Kafka via the upsert-kafka connector.
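As a hedged sketch of the real-time path, a changelog can be written to Kafka with the upsert-kafka connector roughly like this; the table name, schema, and connection settings are made up:

```sql
-- Hypothetical sink: write a changelog stream to Kafka as upsert records keyed by product_id.
CREATE TABLE order_summary (
  product_id BIGINT,
  order_cnt  BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'order_summary',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```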

We have long wondered whether Flink CDC could replace the collection component and the message queue in the dashed box above, simplifying the analysis pipeline and reducing maintenance cost. Fewer components also mean better data freshness. The answer is yes, which brings us to ETL analysis based on Flink CDC.

3. ETL analysis based on Flink CDC

In addition to fewer components and easier maintenance, another advantage of using Flink CDC is that Flink SQL greatly lowers the barrier to entry. Consider the following example:

This example uses Flink CDC to synchronize database data and write it to TiDB. The user directly creates the product and order mysql-cdc tables with Flink SQL, joins and processes the two data streams, and writes the result straight to the downstream database. A single Flink SQL job analyzes, processes, and synchronizes the CDC data.
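A minimal sketch of such a job might look like the following. The table schemas, host names, and credentials are placeholders, and the TiDB sink is written through the JDBC connector, since TiDB is compatible with the MySQL protocol:

```sql
-- Source tables captured directly from MySQL with the mysql-cdc connector (placeholder settings).
CREATE TABLE products (
  id   BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-host',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = '******',
  'database-name' = 'mydb',
  'table-name' = 'products'
);

CREATE TABLE orders (
  order_id   BIGINT,
  product_id BIGINT,
  amount     DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-host',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = '******',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

-- Sink table in TiDB, accessed through the JDBC connector (placeholder URL).
CREATE TABLE enriched_orders (
  order_id     BIGINT,
  product_name STRING,
  amount       DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://tidb-host:4000/mydb',
  'table-name' = 'enriched_orders',
  'username' = 'tidbuser',
  'password' = '******'
);

-- One SQL job: join the two changelog streams and write the result downstream.
INSERT INTO enriched_orders
SELECT o.order_id, p.name, o.amount
FROM orders AS o
JOIN products AS p ON o.product_id = p.id;
```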

You will notice that this is a pure SQL job, which means that anyone who can write SQL, whether BI or business-line colleagues, can complete this kind of work. At the same time, users can take advantage of the rich syntax provided by Flink SQL for data cleaning, analysis, and aggregation.

Such data cleaning, analysis, and aggregation capabilities are difficult to achieve with existing CDC solutions.

In addition, Flink SQL's dual-stream JOIN, dimension-table JOIN, and UDTF syntax make it very easy to widen data and implement all kinds of business logic.
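For example, a dimension-table (lookup) JOIN for data widening can look roughly like this, assuming the hypothetical orders table declares a processing-time column proc_time AS PROCTIME() and product_dim is a lookup-capable table such as one defined with the JDBC connector:

```sql
-- Hypothetical lookup join: widen the order stream with a dimension table at processing time.
SELECT o.order_id, o.amount, d.category
FROM orders AS o
JOIN product_dim FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.product_id = d.id;
```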

4. Flink CDC project development

  • In July 2020, Yun Xie submitted the first commit; the project started as a personal-interest incubation project;

  • Support for mysql-cdc was added in mid-July 2020;

  • Support for postgres-cdc was added in late July 2020;

  • Within a year, the project had gained more than 800 stars on GitHub.

III. Flink CDC 2.0 in detail

1. Flink CDC pain points

MySQL CDC is the most important connector in Flink CDC, and the sections below use the MySQL CDC connector as the example throughout.

With the development of the Flink CDC project, we have received a lot of feedback from users in the community, which can be summarized as follows:

  • The full + incremental read process must guarantee the consistency of all data, which requires locking, and locking is a very high-risk operation at the database level. To guarantee data consistency, the underlying Debezium needs to lock the database or the table being read: a global lock may hang the whole database, and a table-level lock blocks reads on that table, so DBAs are generally unwilling to grant lock permissions.
  • Horizontal scaling is not supported. Because the underlying layer of Flink CDC is based on Debezium, whose original architecture is single-node, Flink CDC only supports single-parallelism reads. In the full read phase, if the table is very large (hundreds of millions of rows) and reading takes hours or even days, users cannot speed up the job by adding resources.
  • Checkpoints are not supported in the full read phase: CDC reading is divided into a full phase and an incremental phase, and checkpoints are currently not supported in the full phase. If a full synchronization takes, say, 5 hours and the job fails after 4 hours of synchronizing, it has to restart and read for another 5 hours.

2. Debezium lock analysis

Flink CDC encapsulates Debezium, and Debezium synchronizes a table in two phases:

  • **Full phase:** query all the data in the table (e.g. SELECT * FROM table);
  • **Incremental phase:** consume change data from the binlog.

Most users need the full + incremental synchronization scenario, and locking happens in the full phase. The purpose of the lock is to determine the starting point of the full phase and to ensure that the incremental plus full data contains not a single record more or less, thereby guaranteeing data consistency. In the figure, the red line on the left is the lifecycle of the lock, and the line on the right is the lifecycle of MySQL's repeatable-read transaction.

In the global-lock case, the connector first acquires the lock and then opens a repeatable-read transaction. While the lock is held, it reads the starting position of the binlog and the current schema of the table. This ensures that the binlog starting position and the schema correspond to each other, because the table's schema may change, for example when columns are deleted or added. After these two pieces of information have been read, the SnapshotReader reads the full data within the repeatable-read transaction; once the full data has been read, the BinlogReader starts reading incremental data from the recorded binlog starting position, ensuring a seamless connection between full and incremental data.
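In MySQL terms, the global-lock flow corresponds roughly to the following sequence of statements. This is a simplified sketch rather than an exact trace of what Debezium executes, and the table name is a placeholder:

```sql
-- Simplified sketch of the global-lock snapshot flow described above.
FLUSH TABLES WITH READ LOCK;                      -- acquire the global read lock
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;  -- prepare a consistent snapshot
START TRANSACTION WITH CONSISTENT SNAPSHOT;       -- open the repeatable-read transaction
SHOW MASTER STATUS;                               -- record the binlog starting position
SHOW CREATE TABLE mydb.orders;                    -- read the current table schema
UNLOCK TABLES;                                    -- release the lock before the long full read
SELECT * FROM mydb.orders;                        -- full read inside the repeatable-read transaction
COMMIT;
-- The incremental phase (BinlogReader) then starts from the recorded binlog position.
```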

Table locks are a degraded version of the global lock: the global lock requires higher permissions, so in some scenarios users only have table-lock permission. Table locks, however, have to be held much longer, because releasing a table lock implicitly commits the open repeatable-read transaction, so the lock cannot be released until all the full data has been read.

With that in mind, here’s how serious these locks can be:

Flink CDC 1.x can be configured to run without locks and works in most scenarios, but at the expense of some data accuracy. By default, Flink CDC 1.x uses the global lock, which guarantees data consistency but carries the risk of hanging the database described above.

3. Flink CDC 2.0 design (using MySQL as an example)

From the analysis above, the core of the 2.0 design is to solve the three problems mentioned earlier, namely supporting lock-free reads, horizontal scaling, and checkpoints.

The lock-free algorithm described in the DBLog paper is shown below:

On the left is a description of the chunk splitting algorithm, which is similar to the sharding principle used by many databases: the data in a table is split into chunks by the table's primary key. Assuming each chunk has a step size of 10, the chunks only need to be consistently left-open/right-closed (or left-closed/right-open) intervals whose union equals the table's full primary-key range.

On the right is a description of the lock-free read algorithm for a single chunk. Its core idea is to merge the full read and the incremental read of each chunk so that the chunk can be read consistently without locking. The following figure shows how the chunks are spliced together:

Since each chunk is only responsible for the data within its own primary-key range, it is not hard to see that as long as each chunk's read is consistent, the read of the entire table is consistent. This is the basic principle of the lock-free algorithm.
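To make the splitting concrete, here is a hedged sketch in plain SQL; the table and column names are placeholders:

```sql
-- Determine the table's overall primary-key range (placeholder names).
SELECT MIN(id), MAX(id) FROM mydb.orders;

-- With a step size of 10 this yields chunks such as k1..k10, k11..k20, ...
-- Each chunk is then read with a plain range query over its own interval:
SELECT * FROM mydb.orders WHERE id >= ? AND id < ?;   -- one chunk's left-closed/right-open interval
```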

In the Netflix DBLog paper, the chunk read algorithm is implemented by maintaining a signal table in the database and using it to write markers into the binlog: a Low Position is recorded before the chunk's data is read and a High Position after it has been read, and the chunk's full data is queried between the two. After the chunk has been read, the incremental binlog data between the two positions is merged into the chunk's full data, yielding the chunk's full data as of the High Position.

Flink CDC adapted the chunk read algorithm to its own situation by removing the signal table: instead of maintaining an extra signal table and writing markers into the binlog, it reads the binlog position directly. The overall chunk read algorithm works as follows:

For example, suppose we are reading chunk-1, whose range is [k1, k10]. First, the data in the chunk's range is selected and stored in a buffer, and the binlog position at the moment the select completes is recorded as the high position. Then the incremental part is processed by consuming the binlog from the low position (recorded before the select) to the high position.

  • The record -(k2,100) +(k2,108) in the figure means the value of k2 was updated from 100 to 108;
  • The second record deletes k3;
  • The third record updates k2 to 119;
  • The fourth record changes the value of k5 from 77 to 100.

Looking at the final output in the bottom-right corner of the figure, we can see that after the chunk's binlog has been consumed up to the high position, the keys that changed are k2, k3, and k5. We then go to the buffer and mark these keys.

  • For k1, k4, k6, and k7, these records have not changed by the high position, so they can be output directly;

  • For the changed keys, the incremental data needs to be merged into the full data, and only the final merged value is kept. For example, the final value of k2 is 119, so only +(k2,119) needs to be output, not the intermediate changes.

In this way, the chunk's final output is exactly the latest data in the chunk as of the high position.

The figure above shows a consistent read of a single chunk. But if there are multiple tables split into many chunks, and these chunks are distributed to different tasks, how are the chunks distributed and how is a globally consistent read guaranteed?

This is implemented with the FLIP-27 source framework: a SourceEnumerator component is responsible for splitting the chunks and handing them to the downstream SourceReaders. By distributing the chunks to different SourceReaders, snapshot chunks are read concurrently, and thanks to FLIP-27 it is also easy to support checkpoints at chunk granularity.

After a snapshot chunk has been read, the SourceReader needs to report the chunk's completion information to the SourceEnumerator, as shown in the following figure.

The main purpose of reporting is to support the subsequent dispatch of the binlog chunk (see the figure below). Since Flink CDC supports full + incremental synchronization, the incremental binlog must be consumed after all snapshot chunks have been read. This is done by dispatching a binlog chunk to any one of the SourceReaders, which reads it with single parallelism.

Most users do not need to pay much attention to the details of the lock-free algorithm and chunk splitting; understanding the overall process is enough.

The whole process can be summarized as follows: first, the table is split into snapshot chunks by primary key, and the snapshot chunks are distributed to multiple SourceReaders. Each snapshot chunk is read consistently and without locks using the algorithm described above, and the SourceReaders support chunk-granularity checkpoints while reading. After all snapshot chunks have been read, a binlog chunk is dispatched to read the incremental binlog. This is the overall process of Flink CDC 2.0, as shown below:

Flink CDC is a fully open source project. All the design and source code of the project have now been contributed to the open source community. Flink CDC 2.0 has been officially released.

  • Provides MySQL CDC 2.0, whose core features include:
    • Concurrent reading, so the read performance of the full phase can be scaled horizontally;
    • A completely lock-free process, so there is no locking risk for the online business;
    • Resumable reading, i.e. checkpoints are supported in the full phase (see the illustrative DDL after this list).
  • A documentation website has been built, with multi-version documentation and keyword search support.
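As a hedged illustration of how these capabilities surface to users, the incremental snapshot reading of the mysql-cdc connector is controlled by options along the lines of the DDL below; the connection settings, schema, and option values are only for illustration:

```sql
-- Illustrative DDL for the incremental snapshot framework of MySQL CDC 2.0 (placeholder settings).
CREATE TABLE customer (
  c_custkey BIGINT,
  c_name    STRING,
  PRIMARY KEY (c_custkey) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-host',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = '******',
  'database-name' = 'tpcds',
  'table-name' = 'customer',
  'scan.incremental.snapshot.enabled' = 'true',    -- lock-free, parallel, checkpointable snapshot reads
  'scan.incremental.snapshot.chunk.size' = '8096'  -- rows per chunk
);
-- The snapshot phase then scales with the source parallelism of the Flink job.
```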

The author ran a test using the customer table of the TPC-DS data set, with Flink 1.13.1, 65 million rows in the customer table, and a source parallelism of 8. In the full read phase:

  • MySQL CDC 2.0 took 13 minutes;

  • MySQL CDC 1.4 took 89 minutes;

  • Overall, read performance improved by a factor of 6.8.

To provide better documentation support, the Flink CDC community has set up a documentation website that supports versioning of documents:

The document website supports keyword search, which is very useful:

IV. Future planning

For the future of the CDC project, we hope to focus on three areas: stability, advanced features, and ecosystem integration.

  • Stability

    • Attract more developers through the community and leverage companies' open source efforts to improve the maturity of Flink CDC;
    • Support lazy splitting of chunks. The idea is to split chunks in batches rather than all at once: for example, if 10,000 chunks are needed, the first 1,000 can be split and handed to the SourceReaders, and splitting continues after those have been read, saving the time spent on chunk splitting up front.
  • Advanced features

    • Support Schema Evolution. In this scenario, while a database is being synchronized, a field is suddenly added to a table, and the expectation is that the field is automatically added downstream as synchronization continues;
    • Support Watermark Pushdown, obtaining heartbeat information from the CDC binlog. The heartbeat information can be used as a watermark, which tells us the progress of the stream currently being consumed;
    • Support META data. In sharded database and table scenarios, metadata may be needed to know which database and which table a record comes from, enabling more flexible handling in downstream lake and warehouse systems;
    • Whole-database synchronization: users can synchronize an entire database with a single line of SQL syntax instead of defining a DDL and a query for every table.
  • Ecosystem integration

    • Integrate more upstream databases, such as Oracle and MS SQL Server. Cloudera is currently actively contributing the oracle-cdc connector;
    • At the lake-ingestion level, there is room to optimize writing to Hudi and Iceberg. For example, when ingesting at high QPS, data distribution has a relatively large impact on performance, and this can be further improved through deeper integration with the ecosystem.

Appendix

[1] Flink CDC project address

[2] Flink CDC documentation website

[3] Percona: MySQL global lock time analysis

[4] DBLog: A Watermark Based Change-Data-Capture Framework (Netflix)

[5] Flink FLIP-27 design document