Hello, everyone. I’m Chen

Data synchronization has always been a headache. In the case of small business volume, few scenarios and small amount of data, we may choose to directly write some scheduled tasks in the project to process data manually. For example, data is found from multiple tables, then summarized and processed, and then inserted into the corresponding place.

However, with the increase of service volume, the amount of data and the realization of sub-database and sub-table in various complex scenarios, the data synchronization becomes more and more difficult.

Today’s article uses Canal, alibaba’s open source middleware, to address the pain point of incremental data synchronization.

The table of contents is as follows:

What is Canal?

Canal, which translates to a channel, is used to provide incremental data subscriptions and consumption based on incremental log parsing of the MySQL database.

What do we learn from this sentence?

Based on MySQL and with incremental parsing through MySQL logs, this means that the original business code is completely non-invasive.

How it works: Parses MySQL’s binlog to provide incremental data.

Logging based incremental subscription and consumption services include

  • Database mirroring
  • Real-time Database backup
  • Index building and real-time maintenance (split heterogeneous index, inverted index, etc.)
  • Service Cache Refresh
  • Incremental data processing with business logic

Current canal supports source MySQL versions including 5.1.x, 5.5.x, 5.6.x, 5.7.x, 8.0.x.

Official documentation: github.com/alibaba/can…

How is Canal data transmitted?

Here’s the official image:

Canal is divided into server and client, which ali often uses, such as the registry Nacos mentioned earlier:

  • Server: parses MySQL’s binlog and passes incremental data to the client or messaging middleware
  • Client: is responsible for parsing the data sent by the server and customizing its own business processing.

So far the support for messaging middleware is comprehensive, such as Kafka, RocketMQ, RabbitMQ.

Is there any other middleware for data synchronization?

There are, of course, some open source middleware that are also quite good, such as Bifrost.

The differences between common middleware types are as follows:

Of course, if I had to choose, I’d go with Canal, Alibaba’s middleware.

The Canal server is installed

The server needs to download the compressed package from github.com/alibaba/can…

The latest version is V1.1.5, click here to download:

After the download is decompressed, the directory is as follows:

This article uses Canal+RabbitMQ for data synchronization, so the following steps follow exactly that base.

1. Open the MySQL binlog file

MySQL > log file my.cnf = my.cnf

[mysqld]
log-bin=Mysql -bin # enable binlog
binlog-format=ROW # Select ROW mode
server_id=1 # configure MySQL replaction (slaveId
Copy the code

2. Set the MySQL configuration

The MySQL configuration in the server configuration file needs to be set so that Canal knows which library and which table log files need to be listened on.

A Server can be configured with multiple instances of listening. By default, the Canal function has an example example. If you want to add an instance, copy the contents of the Example folder to the same directory and specify the name of the added instance in canal.properties.

Modify the canal.deployer-1.1.5\conf\example\instance.properties configuration file

# url
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
Listen to the database
canal.instance.defaultDatabaseName=test
The table to be listened on can be specified, multiple are separated by commas, where the re is listened on all
canal.instance.filter.regex=. * \ \.. *

Copy the code

3. Set the RabbitMQ configuration

The default transmission mode on the server is TCP. You need to set MQ information in the configuration file.

Two configuration files need to be modified as follows:

1, canal. Deployer – 1.1.5 \ conf \ canal properties

This configuration file mainly sets up MQ related configurations such as URLS, usernames, passwords…

TCP, Kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
######### RabbitMQ #############
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host =/
# exchange
rabbitmq.exchange =canal.exchange
User name and password
rabbitmq.username =guest
rabbitmq.password =guest
Whether to persist
rabbitmq.deliveryMode = 2
Copy the code

2, canal. Deployer – 1.1.5 \ conf \ example \ instance properties

This file sets the routing KEY for MQ so that it can be routed to the specified queue, as follows:

canal.mq.topic=canal.routing.key
Copy the code

4. Create an exchange and Queue for RabbitMQ

Create a new canal.exchange (which must be the same as the configured exchange) and a queue named canal.queue (arbitrary name) in RabbitMQ.

The bound route KEY is canal.routing. KEY (must be the same as configured), as shown in the following figure:

5. Start the server

Click the script in the bin directory and double-click startup.bat in Windows. The startup is successful as follows:

6, test,

Insert a data entry into the local database test oauth_client_details as follows:

INSERT INTO `oauth_client_details` VALUES ('myjszl'.'res1'.'$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W'.'all'.'password,refresh_token,authorization_code,client_credentials,implicit'.'http://www.baidu.com'.NULL.1000.1000.NULL.'false');
Copy the code

Queue = canal.queue = canal.queue = canal.queue = canal.queue

This is a string of JSON data, which looks like this:

{
	"data": [{
		"client_id": "myjszl"."resource_ids": "res1"."client_secret": "$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W"."scope": "all"."authorized_grant_types": "password,refresh_token,authorization_code,client_credentials,implicit"."web_server_redirect_uri": "http://www.baidu.com"."authorities": null."access_token_validity": "1000"."refresh_token_validity": "1000"."additional_information": null."autoapprove": "false"}]."database": "test"."es": 1640337532000."id": 7."isDdl": false."mysqlType": {
		"client_id": "varchar(48)"."resource_ids": "varchar(256)"."client_secret": "varchar(256)"."scope": "varchar(256)"."authorized_grant_types": "varchar(256)"."web_server_redirect_uri": "varchar(256)"."authorities": "varchar(256)"."access_token_validity": "int(11)"."refresh_token_validity": "int(11)"."additional_information": "varchar(4096)"."autoapprove": "varchar(256)"
	},
	"old": null."pkNames": ["client_id"]."sql": ""."sqlType": {
		"client_id": 12."resource_ids": 12."client_secret": 12."scope": 12."authorized_grant_types": 12."web_server_redirect_uri": 12."authorities": 12."access_token_validity": 4."refresh_token_validity": 4."additional_information": 12."autoapprove": 12
	},
	"table": "oauth_client_details"."ts": 1640337532520."type": "INSERT"
}
Copy the code

The meaning of each field is already clear: table name, method, parameter, parameter type, parameter value…..

All the client has to do is listen to MQ for JSON data, parse it out, and handle its own business logic.

The Canal client is established

The client is simple to implement. All it needs to do is consume messages from the Canal server and listen to the canal.queue queue.

1. Create the message entity class

MQ passes JSON data, and of course creates an entity class to receive the data, as follows:

/ * * *@authorCanal message receiving entity class */
@NoArgsConstructor
@Data
public class CanalMessage<T> {
    @JsonProperty("type")
    private String type;

    @JsonProperty("table")
    private String table;

    @JsonProperty("data")
    private List<T> data;

    @JsonProperty("database")
    private String database;

    @JsonProperty("es")
    private Long es;

    @JsonProperty("id")
    private Integer id;

    @JsonProperty("isDdl")
    private Boolean isDdl;

    @JsonProperty("old")
    private List<T> old;

    @JsonProperty("pkNames")
    private List<String> pkNames;

    @JsonProperty("sql")
    private String sql;

    @JsonProperty("ts")
    private Long ts;
}
Copy the code

2. MQ message listening service

The next step is to listen to the queue, which can be consumed in time once there is a push of data from the Canal server.

The code is very simple, just give a receiving case, the specific business logic can be implemented according to the business, as follows:

import cn.hutool.json.JSONUtil;
import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** * listen on MQ to get data messages from Canal increments */
@Component
@Slf4j
@RequiredArgsConstructor
public class CanalRabbitMQListener {

    @RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "canal.queue", durable = "true"), exchange = @Exchange(value = "canal.exchange"), key = "canal.routing.key" ) })
    public void handleDataChange(String message) {
        // Convert message to CanalMessage
        CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
        String tableName = canalMessage.getTable();
        log.info(The Canal listener {} has changed; Details: {}", tableName, message);
        //TODO business logic perfected by itself...............}}Copy the code

3, test,

SQL > insert data into table;

INSERT INTO `oauth_client_details`
VALUES
	( 'myjszl'.'res1'.'$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W'.'all'.'password,refresh_token,authorization_code,client_credentials,implicit'.'http://www.baidu.com'.NULL.1000.1000.NULL.'false' );
Copy the code

The message after client conversion is shown below:

The above figure shows that all the data has been received successfully. You just need to improve your business logic according to the data.

Client case source code has been uploaded to GitHub, concern public number: code ape technology column, reply keywords: 9530 get!

conclusion

Canal is not the only open source tool for incremental data synchronization, choosing the right components for its business needs.