Abstract: CDL is a simple and efficient real-time data integration service. It captures data change events from various OLTP databases and pushes them to Kafka; a sink connector then consumes the topic and writes the data into big-data ecosystem applications, enabling real-time ingestion into the data lake.

This article is shared from the Huawei Cloud Community post "FusionInsight MRS CDL User Guide" by Jin Hongqing.


The CDL service contains two important roles: CDLConnector, the instance that runs the actual data capture tasks, and CDLService, the instance that manages and creates those tasks.

This practice describes how to use MySQL as the data source for data capture.

Prerequisites

  • The CDL service has been installed on the MRS cluster.

  • The binlog function must be enabled on the MySQL database (it is enabled by default).

Check whether binlog is enabled for MySQL.

Connect to the MySQL database with a tool or the command line (Navicat is used in this example) and run the `show variables like 'log_%'` command to view the variables.

For example, in Navicat, choose File > New Query to create a query, enter the following SQL, and click Run. If log_bin is displayed as ON in the result, binlog is enabled.

```sql
show variables like 'log_%';
```
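If you prefer to script this check rather than read the result by eye, the rows returned by `show variables like 'log_%'` can be inspected programmatically. The sketch below is illustrative only: the `binlog_enabled` helper is a hypothetical name, and the sample rows stand in for what a MySQL client would return.

```python
# Hypothetical helper: decide whether binlog is on from the rows returned by
# `show variables like 'log_%'` (each row is a (variable_name, value) pair).
def binlog_enabled(rows):
    variables = {name.lower(): value for name, value in rows}
    return variables.get("log_bin", "OFF").upper() == "ON"

# Example rows, as a MySQL client library might return them:
rows = [("log_bin", "ON"), ("log_error", "/var/log/mysqld.log")]
print(binlog_enabled(rows))  # prints True
```

The helper defaults to "OFF" when `log_bin` is absent, so a missing variable is treated as binlog disabled.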

Tool preparation

Currently, CDL commands can only be submitted through REST APIs, so you need to install a tool in advance for composing and debugging the requests. This article uses VS Code.

Install the REST Client plug-in:

After installation, create a cdl.http file for editing:

Create a CDL task

The CDL task creation flowchart is as follows:

Note: create a MySQL link first, then a Kafka link, then create the CDL synchronization task and start it.
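The order above can be sketched as a small driver that plans the REST calls in sequence. This is a sketch under assumptions: `plan_cdl_calls` is a hypothetical helper, the endpoints follow the CDL REST API used in this article, and a real client would also need the cluster's authentication before issuing the requests.

```python
# Sketch of the task-creation order: MySQL link -> Kafka link -> job -> start.
# plan_cdl_calls is a hypothetical helper that only builds the call plan;
# it does not perform any network I/O.
def plan_cdl_calls(host, job_name):
    base = f"https://{host}/api/v1/cdl"
    return [
        ("POST", f"{base}/link"),                  # create the MySQL link
        ("POST", f"{base}/link"),                  # create the Kafka link
        ("POST", f"{base}/job"),                   # create the CDL job
        ("PUT",  f"{base}/job/{job_name}/start"),  # start (submit) the job
    ]

for method, url in plan_cdl_calls("172.16.9.113:21495", "mysql_to_kafka"):
    print(method, url)
```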

REST request code for the MySQL link

```http
@hostname = 172.16.9.113
@port = 21495
@host = {{hostname}}:{{port}}
@bootstrap = "172.16.9.113:21007"
@bootstrap_normal
@mysql_host = "172.16.2.118"
@mysql_port = "3306"
@mysql_database = "hudi"
@mysql_user = "root"
@mysql_password = "Huawei@123"

### get links
GET https://{{host}}/api/v1/cdl/link

### mysql link validate
POST https://{{host}}/api/v1/cdl/link?validate=true
content-type: application/json

{
  "name": "MySQL_link",              // link name, globally unique, cannot repeat
  "description": "MySQL connection", // link description
  "link-type": "mysql",              // link type
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      {"name": "host", "value": {{mysql_host}}},              // IP of the database node
      {"name": "port", "value": {{mysql_port}}},              // database listening port
      {"name": "database.name", "value": {{mysql_database}}}, // database to connect to
      {"name": "user", "value": {{mysql_user}}},              // user
      {"name": "password", "value": {{mysql_password}}},      // password
      {"name": "schema", "value": {{mysql_database}}}         // schema, same as the database name
    ]
  }
}

### mysql link create
POST https://{{host}}/api/v1/cdl/link
content-type: application/json

// request body is identical to the validate request above

### mysql link update
PUT https://{{host}}/api/v1/cdl/link/MySQL_link
content-type: application/json

// request body is identical to the validate request above
```
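If you end up managing several links, building the request body programmatically is less error-prone than hand-editing JSON. The sketch below mirrors the link body shown above; `mysql_link_body` is a hypothetical helper name, not part of the CDL API.

```python
import json

# Hypothetical builder for the MySQL link request body used above.
def mysql_link_body(host, port, database, user, password):
    inputs = [
        {"name": "host", "value": host},
        {"name": "port", "value": port},
        {"name": "database.name", "value": database},
        {"name": "user", "value": user},
        {"name": "password", "value": password},
        {"name": "schema", "value": database},  # schema matches the database name
    ]
    return {
        "name": "MySQL_link",
        "description": "MySQL connection",
        "link-type": "mysql",
        "enabled": "true",
        "link-config-values": {"inputs": inputs},
    }

body = mysql_link_body("172.16.2.118", "3306", "hudi", "root", "Huawei@123")
print(json.dumps(body, indent=2))
```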

REST request code for the Kafka link

```http
### get links
GET https://{{host}}/api/v1/cdl/link

### kafka link validate
POST https://{{host}}/api/v1/cdl/link?validate=true
content-type: application/json

{
  "name": "kafka_link",
  "description": "test kafka link",
  "link-type": "kafka",
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      {"name": "bootstrap.servers", "value": "172.16.9.113:21007"},
      {"name": "sasl.kerberos.service.name", "value": "kafka"},
      {"name": "security.protocol", "value": "SASL_PLAINTEXT"} // SASL_PLAINTEXT for a security cluster, PLAINTEXT for a normal cluster
    ]
  }
}

### kafka link create
POST https://{{host}}/api/v1/cdl/link
content-type: application/json

// request body is identical to the validate request above

### kafka link update
PUT https://{{host}}/api/v1/cdl/link/kafka_link
content-type: application/json

// request body is identical to the validate request above
```
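Since `security.protocol` depends on whether the cluster runs in security mode, a small helper can pick the right value rather than editing the JSON per environment. This is an illustrative sketch; `kafka_link_inputs` is a hypothetical name, and only the three inputs shown above are assumed.

```python
# Hypothetical helper: build the Kafka link inputs, choosing security.protocol
# by cluster mode (SASL_PLAINTEXT for a security cluster, PLAINTEXT otherwise).
def kafka_link_inputs(bootstrap, secure):
    protocol = "SASL_PLAINTEXT" if secure else "PLAINTEXT"
    return [
        {"name": "bootstrap.servers", "value": bootstrap},
        {"name": "sasl.kerberos.service.name", "value": "kafka"},
        {"name": "security.protocol", "value": protocol},
    ]

print(kafka_link_inputs("172.16.9.113:21007", secure=True)[-1]["value"])  # prints SASL_PLAINTEXT
```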

REST request code for the CDL task commands

```http
@hostname = 172.16.9.113
@port = 21495
@host = {{hostname}}:{{port}}
@bootstrap = "172.16.9.113:21007"
@bootstrap_normal
@mysql_host = "172.16.2.118"
@mysql_port = "3306"
@mysql_database = "hudi"
@mysql_user = "root"
@mysql_password = "Huawei@123"

### create job
POST https://{{host}}/api/v1/cdl/job
content-type: application/json

{
  "job_type": "CDL_JOB",           // job type; currently only CDL_JOB is supported
  "name": "mysql_to_kafka",        // job name
  "description": "mysql_to_kafka", // job description
  "from-link-name": "MySQL_link",  // source link
  "to-link-name": "kafka_link",    // target link
  "from-config-values": {
    "inputs": [
      {"name": "connector.class", "value": "com.huawei.cdc.connect.mysql.MysqlSourceConnector"},
      {"name": "schema", "value": "hudi"},
      {"name": "db.name.alias", "value": "hudi"},
      {"name": "whitelist", "value": "hudisource"},
      {"name": "tables", "value": "hudisource"},
      {"name": "tasks.max", "value": "10"},
      {"name": "mode", "value": "insert,update,delete"},
      {"name": "parse.dml.data", "value": "true"},
      {"name": "schema.auto.creation", "value": "false"},
      {"name": "errors.tolerance", "value": "all"},
      {"name": "multiple.topic.partitions.enable", "value": "false"},
      {"name": "topic.table.mapping", "value": "[ {\"topicName\":\"huditableout\", \"tableName\":\"hudisource\"} ]"},
      {"name": "producer.override.security.protocol", "value": "SASL_PLAINTEXT"}, // SASL_PLAINTEXT for a security cluster, PLAINTEXT for a normal cluster
      {"name": "consumer.override.security.protocol", "value": "SASL_PLAINTEXT"}
    ]
  },
  "to-config-values": {"inputs": []},
  "job-config-values": {
    "inputs": [
      {"name": "global.topic", "value": "demo"}
    ]
  }
}

### get all jobs
GET https://{{host}}/api/v1/cdl/job

### submit job
PUT https://{{host}}/api/v1/cdl/job/mysql_to_kafka/start

### get job status
GET https://{{host}}/api/v1/cdl/submissions?jobName=mysql_to_kafka

### stop job
PUT https://{{host}}/api/v1/cdl/job/mysql_to_kafka/submissions/13/stop

### delete job
DELETE https://{{host}}/api/v1/cdl/job/mysql_to_kafka
```
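One easy-to-get-wrong field in the job body is `topic.table.mapping`: its value is a JSON array serialized inside a string, so the inner quotes must be escaped. Serializing with `json.dumps` handles the escaping automatically. The `topic_table_mapping` helper below is a hypothetical name used for illustration.

```python
import json

# The "topic.table.mapping" value is a JSON array embedded in a string,
# so it has to be serialized separately from the outer request body.
def topic_table_mapping(pairs):
    return json.dumps([{"topicName": topic, "tableName": table}
                       for topic, table in pairs])

mapping = topic_table_mapping([("huditableout", "hudisource")])
print(mapping)  # prints [{"topicName": "huditableout", "tableName": "hudisource"}]

# The string then goes into the job body as an ordinary input value:
entry = {"name": "topic.table.mapping", "value": mapping}
```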

Validation scenarios

The original data in the production MySQL database is as follows:

After submitting the CDL task

```sql
insert into hudi.hudisource values (11, '中心', 38, '女', '中心', 28732);
```

Corresponding kafka message body:

```sql
UPDATE hudi.hudisource SET uname='AnneMarie333' WHERE uid=11;
```

Corresponding kafka message body:

```sql
delete from hudi.hudisource where uid=11;
```

Corresponding kafka message body:
