Flink1.11 introduced the CDC connector, which makes it easy to capture changing data, greatly simplifying the process of data processing. Flink1.11 CDC connector mainly includes: MySQL CDC and Postgres CDC, and Kafka connector supports ccanal – JSON, Debezium -json and Changelog-json formats. This article mainly shares the following contents:

  • The CDC’s profile
  • Table format provided by Flink
  • Attention points in the process of use
  • Operation practices of mysql-CDC
  • Canal-json operation practices
  • Changelog-json operation practice

Introduction to the

Flink CDC Connector is a set of data source connectors for ApacheFlink that use change data capture (CDC)) ** to extract change data from different databases. The Flink CDC connector integrates Debezium as an engine to capture data changes. Therefore, it can take full advantage of Debezium’s capabilities.

The characteristics of

  • Support for reading database snapshots and the ability to continuously read database change logs, even in the event of a failure, and support for the exactly-once processing semantics

  • With the CDC Connector of the DataStream API, users can use change data on multiple databases and tables in a single job without having to deploy Debezium and Kafka.

  • For the CDC Connector of the Table/SQL API, users can use THE SQL DDL to create a CDC data source to monitor data changes on a single Table.

Usage scenarios

  • Incremental data synchronization between databases
  • The audit log
  • Live materialized view on the database
  • CDC based dimension table JOIN

Table format provided by Flink

Table Formats Flink provides a series of table formats that can be used by table connector.

Formats Supported Connectors
CSV Apache Kafka.Filesystem
JSON Apache Kafka.Filesystem.Elasticsearch
Apache Avro Apache Kafka.Filesystem
Debezium CDC Apache Kafka
Canal CDC Apache Kafka
Apache Parquet Filesystem
Apache ORC Filesystem

Attention points in the process of use

Cautions for using MySQL CDC

If you want to use MySQL CDC Connector, you need to add the following dependencies for your program:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.0.0</version>
</dependency>
Copy the code

To use Flink SQL Client, you need to add the following JAR package: flink-sqL-connector-mysql-cdc-1.0.0. jar. Put the JAR package in the lib folder of the Flink installation directory.

Note the use of canal-json

If you want to use Kafka’s canal-json, you need to add the following dependencies for your program:

<! -- universal -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>Flink - connector - kafka_2. 11</artifactId>
    <version>1.11.0</version>
</dependency>


Copy the code

To use Flink SQL Client, you need to add the following JAR package: flink-sqL-connector-kafka_2.11-1.11.0. jar. Put the JAR package in the lib folder of the Flink installation directory. Flink1.11 installation package does not provide the jar package in the lib directory, so you must manually add the dependency package, otherwise the following error will be reported:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
mysql-cdc
Copy the code

Precautions for using Changelog-json

If you want to use Kafka’s Changelog-json Format, you need to add the following dependencies for your application:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <version>1.0.0</version>
</dependency>
Copy the code

To use Flink SQL Client, add the following JAR package: flink-format-changelog-json-1.0.0.jar. Put the JAR package in the lib folder of the Flink installation directory.

Operation practices of mysql-CDC

Create MySQL data source table

MySQL > create table CDC;

-- MySQL
/*Table structure for table `order_info` */
DROP TABLE IF EXISTS `order_info`;
CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'number',
  `consignee` varchar(100) DEFAULT NULL COMMENT 'Consignee',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT 'Recipient phone',
  `total_amount` decimal(10.2) DEFAULT NULL COMMENT 'Total amount',
  `order_status` varchar(20) DEFAULT NULL COMMENT 'Order status,1 for order, 2 for payment',
  `user_id` bigint(20) DEFAULT NULL COMMENT 'user id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT 'Payment Method',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT 'Shipping address',
  `order_comment` varchar(200) DEFAULT NULL COMMENT 'Order Notes',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT 'Order Transaction Number (for third-party payment)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT 'Order Description (for third-party payment)',
  `create_time` datetime DEFAULT NULL COMMENT 'Creation time',
  `operate_time` datetime DEFAULT NULL COMMENT 'Operation time',
  `expire_time` datetime DEFAULT NULL COMMENT 'Dead time',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT 'Logistics Number',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT 'Parent Order Number',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'Picture path',
  `province_id` int(20) DEFAULT NULL COMMENT 'region'.PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Order sheet';
-- ----------------------------
-- Records of order_info
-- ----------------------------
INSERT INTO `order_info` 
VALUES (476.'lAXjcL'.'13408115089'.433.00.'2'.10.'2'.'OYyAdSdLxedceqovndCD'.'ihjAYsSjrgJMQVdFQnSy'.'8728720206'.' '.'the 2020-06-18 02:21:38'.NULL.NULL.NULL.NULL.NULL.9);
INSERT INTO `order_info`
VALUES (477.'QLiFDb'.'13415139984'.772.00.'1'.90.'2'.'OizYrQbKuWvrvdfpkeSZ'.'wiBhhqhMndCCgXwmWVQq'.'1679381473'.' '.'the 2020-06-18 09:12:25'.NULL.NULL.NULL.NULL.NULL.3);
INSERT INTO `order_info`
VALUES (478.'iwKjQD'.'13320383859'.88.00.'1'.107.'1'.'cbXLKtNHWOcWzJVBWdAs'.'njjsnknHxsxhuCCeNDDi'.'0937074290'.' '.'the 2020-06-18 15:56:34'.NULL.NULL.NULL.NULL.NULL.7);

/*Table structure for table `order_detail` */
CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'number',
  `order_id` bigint(20) DEFAULT NULL COMMENT 'Order Number',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'SKU name (redundant)',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'Image name (redundant)',
  `order_price` decimal(10.2) DEFAULT NULL COMMENT 'Purchase Price (SKU price at time of order)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT 'Number of purchases',
  `create_time` datetime DEFAULT NULL COMMENT 'Creation time'.PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Order List';

-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail` 
VALUES (1329.476.8.'Apple iPhone XS Max (A2104) 256GB.'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz'.8900.00.'3'.'the 2020-06-18 02:21:38');
INSERT INTO `order_detail` 
VALUES (1330.477.9.'Glory 10 GT Game Acceleration AIS Handheld Nightview 6GB+64GB Phantom Blue Full Netcom Mobile Unicom Telecom'.'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne'.2452.00.'4'.'the 2020-06-18 09:12:25');
INSERT INTO `order_detail`
VALUES (1331.478.4.'Xiaomi Play Light Gradient AI Dual Photography 4GB+64GB Dream Blue Full Netcom 4G Dual card dual waiting Small water drop full screen photo game smartphone'.'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv'.1442.00.'1'.'the 2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (1332.478.8.'Apple iPhone XS Max (A2104) 256GB.'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV'.8900.00.'3'.'the 2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (1333.478.8.'Apple iPhone XS Max (A2104) 256GB.'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP'.8900.00.'1'.'the 2020-06-18 15:56:34');
Copy the code

Flink SQL Cli Create CDC data source

Start the Flink cluster, then start the SQL CLI, and run the following command:

Create order information table
CREATE TABLE order_info(
    id BIGINT,
    user_id BIGINT,
    create_time TIMESTAMP(0),
    operate_time TIMESTAMP(0),
    province_id INT,
    order_status STRING,
    total_amount DECIMAL(10.5))WITH (
    'connector' = 'mysql-cdc'.'hostname' = 'kms-1'.'port' = '3306'.'username' = 'root'.'password' = '123qwe'.'database-name' = 'mydw'.'table-name' = 'order_info'
);
Copy the code

Query the data of this table in the Flink SQL Cli: result-mode: tableau, + indicates the insert of data

Create order detail table in SQL CLI:

CREATE TABLE order_detail(
    id BIGINT,
    order_id BIGINT,
    sku_id BIGINT,
    sku_name STRING,
    sku_num BIGINT,
    order_price DECIMAL(10.5),
	create_time TIMESTAMP(0))WITH (
    'connector' = 'mysql-cdc'.'hostname' = 'kms-1'.'port' = '3306'.'username' = 'root'.'password' = '123qwe'.'database-name' = 'mydw'.'table-name' = 'order_detail'
);
Copy the code

The query results are as follows:

Perform JOIN:

SELECT
    od.id,
    oi.id order_id,
    oi.user_id,
    oi.province_id,
    od.sku_id,
    od.sku_name,
    od.sku_num,
    od.order_price,
    oi.create_time,
    oi.operate_time
FROM
   (
    SELECT * 
    FROM order_info
    WHERE 
	     order_status = '2'Paid -
   ) oi
   JOIN
  (
    SELECT *
    FROM order_detail
  ) od 
  ON oi.id = od.order_id;
Copy the code

Canal-json operation practices

As for cannal usage, you can refer to my other article: Real-time incremental Data Synchronization based on Canal and Flink (I). I have synchronized the following tables to Kafka via Canal in the format:

{
    "data":[
        {
            "id":"1"."region_name":"North China"
        },
        {
            "id":"2"."region_name":"East"
        },
        {
            "id":"3"."region_name":"Northeast"
        },
        {
            "id":"4"."region_name":"Central China"
        },
        {
            "id":"5"."region_name":"South China"
        },
        {
            "id":"6"."region_name":"Southwest"
        },
        {
            "id":"Seven"."region_name":"Northwest"}]."database":"mydw"."es": 1597128441000,"id": 102,"isDdl":false."mysqlType": {"id":"varchar(20)"."region_name":"varchar(20)"
    },
    "old":null,
    "pkNames":null,
    "sql":""."sqlType": {"id": 12."region_name":12
    },
    "table":"base_region"."ts": 1597128441424,"type":"INSERT"
}
Copy the code

Create the canal-JSON table in SQL CLI:

CREATE TABLE region (
  id BIGINT,
  region_name STRING
) WITH (
 'connector' = 'kafka'.'topic' = 'mydw.base_region'.'properties.bootstrap.servers' = 'kms-3:9092'.'properties.group.id' = 'testGroup'.'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
);
Copy the code

The query results are as follows:

Changelog-json operation practice

Create MySQL data source

See order_info above

Flink SQL Cli Create a Changelog-JSON table

CREATE TABLE order_gmv2kafka (
  day_str STRING,
  gmv DECIMAL(10.5))WITH (
    'connector' = 'kafka'.'topic' = 'order_gmv_kafka'.'scan.startup.mode' = 'earliest-offset'.'properties.bootstrap.servers' = 'kms-3:9092'.'format' = 'changelog-json'
);

INSERT INTO order_gmv2kafka
SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd') as day_str, SUM(total_amount) as gmv
FROM order_info
WHERE order_status = '2' B: The order has been paid
GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd'); 
Copy the code

Query table to see the result:

Take a look at kafka again:

{"data": {"day_str":"2020-06-18"."gmv": 433}."op":"+I"}
Copy the code

When the status order_status of the other two orders is updated to 2, the total amount =443+772+88=1293 then observe the data:

Take a look at the data in Kafka:

conclusion

Based on THE SQL of Flink1.11, this paper describes the usage of the newly added CDC Connector. It mainly includes the format of MySQL CDC Connector, Canal-JSON and Changelog-json, and points out the attention in the use process. In addition, this article provides a complete use of the sample, if you have a ready-made environment, you can directly test the use.

The public account “Big Data Technology and Data Warehouse”, reply to “information” to receive the big data data package