1. What is CDC

CDC stands for Change Data Capture. The core idea is to monitor and capture changes to a database (inserts, updates, and deletes of rows, as well as changes to tables), record those changes completely and in the order in which they occurred, and write them to messaging middleware so that other services can subscribe to and consume them.

In a broad sense, any technique that captures data changes can be called CDC. In practice, the term usually refers to database changes: a technique for capturing changes to the data in a database.
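
To make the idea of an ordered change record concrete, here is a minimal sketch. It is not the format of any real CDC tool; `ChangeLogSketch`, `ChangeEvent`, `Op`, and the sample rows are hypothetical names chosen for illustration. It shows that a consumer that replays the captured changes in order arrives at the same table state as the source.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangeLogSketch {
    // Hypothetical change types; real CDC tools use their own encodings.
    enum Op { INSERT, UPDATE, DELETE }

    // One captured change (illustrative shape only).
    static final class ChangeEvent {
        final long sequence;  // position in the log; defines the replay order
        final Op op;
        final int key;        // primary key of the changed row
        final String after;   // row value after the change, null for DELETE

        ChangeEvent(long sequence, Op op, int key, String after) {
            this.sequence = sequence;
            this.op = op;
            this.key = key;
            this.after = after;
        }
    }

    // A small made-up log: two inserts, one update, one delete.
    static List<ChangeEvent> sampleLog() {
        List<ChangeEvent> log = new ArrayList<>();
        log.add(new ChangeEvent(1, Op.INSERT, 1, "alice"));
        log.add(new ChangeEvent(2, Op.INSERT, 2, "bob"));
        log.add(new ChangeEvent(3, Op.UPDATE, 1, "alice-renamed"));
        log.add(new ChangeEvent(4, Op.DELETE, 2, null));
        return log;
    }

    // Applies the events in log order to rebuild the table on the consumer side.
    static Map<Integer, String> replay(List<ChangeEvent> log) {
        Map<Integer, String> table = new HashMap<>();
        for (ChangeEvent e : log) {
            if (e.op == Op.DELETE) {
                table.remove(e.key);
            } else {
                table.put(e.key, e.after);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println(replay(sampleLog()));
    }
}
```

Replaying the same events in a different order could give a different result, which is why the capture has to preserve ordering.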

CDC technology has a wide range of application scenarios:

  • Data synchronization for backup and disaster recovery;

  • Data distribution, where one data source is distributed to multiple downstream consumers;

  • Data collection, the extract (E) step of ETL data integration for data warehouses and data lakes.

2. Types of CDC

CDC implementations fall into two main types: query-based and binlog-based. The differences between them are as follows:

                            | Query-based CDC          | Binlog-based CDC
Open source products        | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium
Execution mode              | Batch                    | Streaming
Captures all data changes   | No                       | Yes
Latency                     | High                     | Low
Adds load to the database   | Yes                      | No
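
Why a query-based approach cannot capture every change can be sketched with a toy example. This is a simplification under stated assumptions: `QueryVsLogCdc`, `ToyTable`, and `diff` are hypothetical names, the list `log` merely stands in for a binlog, and comparing two snapshots stands in for periodic polling (real query-based tools typically poll on a timestamp or incrementing column instead).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class QueryVsLogCdc {
    // A toy table that appends every write to a log, standing in for a binlog.
    static final class ToyTable {
        final Map<Integer, String> rows = new HashMap<>();
        final List<String> log = new ArrayList<>();

        void upsert(int id, String value) {
            log.add((rows.containsKey(id) ? "UPDATE " : "INSERT ") + id + "=" + value);
            rows.put(id, value);
        }

        void delete(int id) {
            if (rows.remove(id) != null) {
                log.add("DELETE " + id);
            }
        }
    }

    // Query-based capture: compares two snapshots, so it only sees the net effect.
    static List<String> diff(Map<Integer, String> before, Map<Integer, String> after) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<Integer, String> e : after.entrySet()) {
            String old = before.get(e.getKey());
            if (old == null) {
                changes.add("INSERT " + e.getKey() + "=" + e.getValue());
            } else if (!old.equals(e.getValue())) {
                changes.add("UPDATE " + e.getKey() + "=" + e.getValue());
            }
        }
        for (Integer id : before.keySet()) {
            if (!after.containsKey(id)) {
                changes.add("DELETE " + id);
            }
        }
        return changes;
    }

    // Writes that happen between two polls: two updates, plus an insert that is deleted again.
    static void applyWrites(ToyTable table) {
        table.upsert(1, "b");
        table.upsert(1, "c");
        table.upsert(2, "x");
        table.delete(2);
    }

    public static void main(String[] args) {
        ToyTable table = new ToyTable();
        table.upsert(1, "a");
        Map<Integer, String> firstPoll = new HashMap<>(table.rows);
        int logStart = table.log.size();

        applyWrites(table);

        System.out.println("query-based sees: " + diff(firstPoll, table.rows));
        System.out.println("log-based sees:   " + table.log.subList(logStart, table.log.size()));
    }
}
```

In this sketch the snapshot comparison reports a single update for row 1, while the log contains all four writes, including the short-lived row 2.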

3. Comparison between traditional CDC and Flink CDC

1) Traditional CDC ETL analysis

2) ETL analysis based on Flink CDC

3) Aggregation analysis based on Flink CDC

4) Data widening based on Flink CDC

4. Flink CDC example

The Flink community developed the flink-cdc-connectors component, a source connector that can read both full snapshots and incremental change data directly from databases such as MySQL and PostgreSQL.

Open source: github.com/ververica/f…

Sample code:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Flink CDC stores the binlog position as state in checkpoints (CK). To resume
        //    from where the job left off, start the program from a checkpoint or savepoint.
        // 2.1 Enable checkpointing and take a checkpoint every 5 seconds
        env.enableCheckpointing(5000L);
        // 2.2 Specify the checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 2.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 Restart strategy when recovering from a checkpoint: 3 attempts, 2 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        // 2.5 Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
        // 2.6 Set the user name for accessing HDFS
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // 3. Create the Flink MySQL CDC source. Startup modes:
        //    initial (default): performs an initial snapshot of the monitored tables on first
        //      startup, then continues reading the latest binlog.
        //    latest-offset: never snapshots the monitored tables on first startup; reads from
        //      the end of the binlog, so only changes made after the connector started are seen.
        //    timestamp: never snapshots the monitored tables on first startup; reads the binlog
        //      from the specified timestamp. The consumer traverses the binlog from the beginning
        //      and ignores change events whose timestamp is smaller than the specified one.
        //    specific-offset: never snapshots the monitored tables on first startup; reads the
        //      binlog directly from the specified offset.
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("hadoop01")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-flink")
                // Optional. If omitted, all tables in the databases listed above are read.
                // When specified, each entry must use the "db.table" form.
                .tableList("gmall-flink.z_user_info")
                .startupOptions(StartupOptions.initial())
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        // 4. Read data from MySQL using the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
        // 5. Print the data
        mysqlDS.print();
        // 6. Execute the job
        env.execute();
    }
}

5. Flink SQL example

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQL_CDC {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the Flink MySQL CDC source table
        tableEnv.executeSql("CREATE TABLE user_info (" +
                " id INT," +
                " name STRING," +
                " phone_num STRING" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'hadoop01'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '000000'," +
                " 'database-name' = 'gmall-flink'," +
                " 'table-name' = 'z_user_info'" +
                ")");

        // 3. Query the table and print the results. executeSql() submits the job itself,
        //    so a separate env.execute() call is not needed here.
        tableEnv.executeSql("select * from user_info").print();
    }
}

6. Real-time data warehouse of Lambda architecture

The concepts of the Lambda and Kappa architectures were explained in a previous article on real-time computing for big data; if you are not familiar with them, read that article first.

The following figure shows a concrete implementation of the Lambda architecture based on Flink and Kafka. The upper layer is real-time computing and the lower layer is offline computing. Horizontally the diagram is divided by computing engine, and vertically by data warehouse layer:

Lambda is a classic architecture. In the past, real-time scenarios were rare and most workloads were offline. Once real-time scenarios were added, the two ended up with different technology ecosystems because their timeliness requirements differ. The Lambda architecture in effect adds a real-time production pipeline alongside the offline one and integrates the two at the application layer: two production paths, each independent. This is also a natural approach for business applications to adopt.

Dual-path production brings problems: the processing logic is duplicated, development and operations effort doubles, and resources are split across two pipelines. The Kappa architecture evolved in response to these problems.
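
The duplication in dual-path production can be illustrated with a toy page-view counter. This is only a sketch under assumptions: `LambdaServingSketch`, `PageView`, the cutoff of 10, and the sample events are all made up, and in-memory maps stand in for the offline and real-time result tables. The same counting rule is written twice, once per path, and the application layer merges the two results at query time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LambdaServingSketch {
    // Hypothetical event type used only for this illustration.
    static final class PageView {
        final long timestamp;
        final String page;

        PageView(long timestamp, String page) {
            this.timestamp = timestamp;
            this.page = page;
        }
    }

    // Offline path: recounts all events older than the batch cutoff.
    static Map<String, Long> batchCounts(List<PageView> history, long cutoff) {
        Map<String, Long> counts = new HashMap<>();
        for (PageView v : history) {
            if (v.timestamp < cutoff) {
                counts.merge(v.page, 1L, Long::sum);
            }
        }
        return counts;
    }

    // Real-time path: the same counting rule, implemented a second time as an incremental update.
    static void onEvent(Map<String, Long> realtimeCounts, PageView v, long cutoff) {
        if (v.timestamp >= cutoff) {
            realtimeCounts.merge(v.page, 1L, Long::sum);
        }
    }

    // Application layer: merges the offline and real-time results at query time.
    static long query(Map<String, Long> batch, Map<String, Long> realtime, String page) {
        return batch.getOrDefault(page, 0L) + realtime.getOrDefault(page, 0L);
    }

    static List<PageView> sampleEvents() {
        List<PageView> events = new ArrayList<>();
        events.add(new PageView(1, "home"));
        events.add(new PageView(2, "cart"));
        events.add(new PageView(3, "home"));
        events.add(new PageView(12, "home"));
        events.add(new PageView(15, "cart"));
        return events;
    }

    public static void main(String[] args) {
        long cutoff = 10;
        List<PageView> events = sampleEvents();
        Map<String, Long> batch = batchCounts(events, cutoff);
        Map<String, Long> realtime = new HashMap<>();
        for (PageView v : events) {
            onEvent(realtime, v, cutoff);
        }
        System.out.println("home views: " + query(batch, realtime, "home"));
    }
}
```

Any change to the counting rule has to be made in both `batchCounts` and `onEvent`, which is the maintenance cost described above.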

7. Real-time data warehouse of Kappa architecture

The Kappa architecture can be thought of as the Lambda architecture with the offline computing part removed, as shown in the figure below:

The Kappa architecture is relatively simple in design: production is unified, and a single set of logic serves both offline and real-time production. In practice, however, it has significant limitations. The same table of real-time data may be stored in several different ways, so joins have to span data sources and operations on the data are heavily restricted. As a result, few production deployments in the industry use the Kappa architecture directly, and those that do cover fairly narrow scenarios.

Readers familiar with real-time data warehouse production may have a question about the Kappa architecture. Business requirements change constantly, so much of the business logic has to be iterated. When the definition of a previously produced metric changes, the data has to be recomputed, sometimes including a backfill of historical data. How does a real-time data warehouse handle recomputation?

The Kappa approach is to rely on a message queue, such as Kafka, that retains historical data and lets consumers restart from an earlier position. To recompute, start a new job that consumes the Kafka data from an earlier point in time. Once the new job has caught up with the currently running job, switch downstream consumers over to the new job's output. The old job can then be stopped and its original output table deleted.
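
The replay-and-switch procedure can be sketched as follows. This is a simplified illustration, not Kafka client code: `KappaReplaySketch` and `Job` are hypothetical names, a plain `List<Integer>` stands in for a retained topic, and the log is assumed not to grow while the new job catches up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.IntUnaryOperator;

public class KappaReplaySketch {
    // A toy job that consumes a retained log from a start offset and sums transformed values.
    static final class Job {
        final IntUnaryOperator logic;  // the business logic applied to each record
        int offset;                    // next position to read from the log
        long output;                   // running result, standing in for an output table

        Job(IntUnaryOperator logic, int startOffset) {
            this.logic = logic;
            this.offset = startOffset;
        }

        // Consumes at most maxRecords records from the current offset.
        void poll(List<Integer> log, int maxRecords) {
            int end = Math.min(log.size(), offset + maxRecords);
            while (offset < end) {
                output += logic.applyAsInt(log.get(offset));
                offset++;
            }
        }
    }

    // Recomputes with revised logic by replaying the log, then switches once caught up.
    static long reprocessAndSwitch(List<Integer> log) {
        Job oldJob = new Job(v -> v, 0);      // current logic, already running
        oldJob.poll(log, log.size());

        Job newJob = new Job(v -> v * 2, 0);  // revised logic, started from an earlier offset
        while (newJob.offset < oldJob.offset) {
            newJob.poll(log, 2);              // catch up in small steps
        }
        // The new job has reached the old job's position: downstream reads its output
        // from here on, and the old job and its output can be retired.
        return newJob.output;
    }

    public static void main(String[] args) {
        List<Integer> retainedLog = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println("output after switch: " + reprocessAndSwitch(retainedLog));
    }
}
```

In a real deployment the old job keeps consuming new records during the replay, so the switch condition is that the new job's lag has dropped to roughly that of the old job rather than an exact offset match.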

8. Real-time data warehouse combining stream and batch processing

With the development of real-time OLAP technology, open source OLAP engines such as Doris and Presto have improved greatly in performance and ease of use. Together with the rapid development of data lake technology, this makes combining stream and batch processing much simpler.

The figure below shows a real-time data warehouse that combines stream and batch processing:

Data is collected uniformly, from logs into the message queue and then into the real-time data warehouse, so the construction of the base data flow is unified. From there, log-type real-time features and real-time dashboard applications go through real-time stream computing, while binlog-type business analysis goes through real-time OLAP batch processing.

Compared with the architectures above, the stream-batch combined approach changes the storage layer from Kafka to Iceberg. Iceberg is an intermediate layer between the upper compute engines and the underlying storage format. It can be described as a “data organization format”, with the underlying storage still on HDFS. Why does this intermediate layer suit combined stream and batch processing better? Iceberg’s ACID guarantees can simplify the design of the whole pipeline and reduce its end-to-end latency, and its update and delete capabilities can effectively reduce overhead and improve efficiency. Iceberg can also efficiently support high-throughput batch scans as well as concurrent, partition-granularity real-time processing by stream computing.