SpringBoot Canal data synchronization solution

I. Requirements

In a microservice architecture with multiple databases, Canal can replace database triggers as the way to propagate data changes. Canal grew out of Alibaba's need to synchronize data across data centers. It parses the database's incremental log (the MySQL binlog) and exposes the changes for incremental subscription and consumption. Whether you are experimenting with Canal or doing incremental backup, master/slave replication, or recovery, the MySQL binlog must be enabled. Putting the data directories on different disk partitions helps reduce I/O wait.

How Canal works

  1. Canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master.
  2. The MySQL master receives the dump request and starts pushing its binary log to the slave (that is, to Canal).
  3. Canal parses the binary log objects (originally a byte stream).
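To make step 3 concrete, here is a minimal sketch of what "parsing a byte stream" means at the lowest level. It is not Canal's parser. It only decodes the fixed 19-byte header that starts every event in the MySQL v4 binlog format (little-endian timestamp, type code, server id, event length, next position, flags). The class name, field names, and sample values are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative only: decodes the fixed 19-byte header that starts every event
// in the MySQL v4 binlog format. Names are hypothetical, not Canal's classes.
class BinlogHeaderSketch {
    static final int HEADER_LENGTH = 19;

    final long timestampSeconds; // when the event was written (epoch seconds)
    final int typeCode;          // event type code
    final long serverId;         // server_id of the originating MySQL instance
    final long eventLength;      // size of the whole event, header included
    final long nextPosition;     // file offset of the following event
    final int flags;

    private BinlogHeaderSketch(long timestampSeconds, int typeCode, long serverId,
                               long eventLength, long nextPosition, int flags) {
        this.timestampSeconds = timestampSeconds;
        this.typeCode = typeCode;
        this.serverId = serverId;
        this.eventLength = eventLength;
        this.nextPosition = nextPosition;
        this.flags = flags;
    }

    // All header fields are unsigned little-endian integers.
    static BinlogHeaderSketch parse(byte[] raw) {
        if (raw.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("need at least 19 bytes, got " + raw.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN);
        long timestamp = buf.getInt() & 0xFFFFFFFFL;
        int type = buf.get() & 0xFF;
        long serverId = buf.getInt() & 0xFFFFFFFFL;
        long length = buf.getInt() & 0xFFFFFFFFL;
        long next = buf.getInt() & 0xFFFFFFFFL;
        int flags = buf.getShort() & 0xFFFF;
        return new BinlogHeaderSketch(timestamp, type, serverId, length, next, flags);
    }

    public static void main(String[] args) {
        // Hand-built header with made-up values; a real one arrives on the dump stream.
        byte[] raw = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.LITTLE_ENDIAN)
                .putInt(1590654201).put((byte) 31).putInt(1).putInt(88).putInt(18593)
                .putShort((short) 0).array();
        BinlogHeaderSketch header = parse(raw);
        System.out.println("type=" + header.typeCode + " serverId=" + header.serverId
                + " length=" + header.eventLength + " next=" + header.nextPosition);
    }
}
```

Canal does this work, plus decoding the event body into row changes, so that a client only ever sees structured entries.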

II. Deployment environment

1. Log in to MySQL and check whether binlog is enabled. By default, log_bin is OFF.

mysql> show variables like 'log_%';
+----------------------------------------+-------------------------------------------------------+
| Variable_name                          | Value                                                 |
+----------------------------------------+-------------------------------------------------------+
| log_bin                                | OFF                                                   |
| log_bin_basename                       |                                                       |
| log_bin_index                          |                                                       |
| log_bin_trust_function_creators        | OFF                                                   |
| log_bin_use_v1_row_events              | OFF                                                   |
| log_builtin_as_identified_by_password  | OFF                                                   |
| log_error                              | F:\tools\mysql-5.7.28-winx64\Data\DESKTOP-C1LU9IQ.err |
| log_error_verbosity                    | 3                                                     |
| log_output                             | FILE                                                  |
| log_queries_not_using_indexes          | OFF                                                   |
| log_slave_updates                      | OFF                                                   |
| log_slow_admin_statements              | OFF                                                   |
| log_slow_slave_statements              | OFF                                                   |
| log_statements_unsafe_for_binlog       | ON                                                    |
| log_syslog                             | ON                                                    |
| log_syslog_tag                         |                                                       |
| log_throttle_queries_not_using_indexes | 0                                                     |
| log_timestamps                         | UTC                                                   |
| log_warnings                           | 2                                                     |
+----------------------------------------+-------------------------------------------------------+
19 rows in set (0.03 sec)
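The same check can be scripted. The sketch below assumes a JDBC connection to the MySQL server (with the MySQL driver on the classpath) and keeps the decision logic in a plain method so it can be exercised without a database. The class and method names are hypothetical, and the binlog_format = ROW expectation comes from Canal's setup guidance rather than from the output above.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: reads two server variables and reports what still
// has to change before Canal can subscribe to the binlog.
class BinlogSettingsCheck {

    // Requires an open JDBC connection to the MySQL server.
    static Map<String, String> readVariables(Connection conn) throws SQLException {
        Map<String, String> vars = new HashMap<>();
        String sql = "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')";
        try (Statement st = conn.createStatement(); ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                vars.put(rs.getString(1), rs.getString(2)); // Variable_name, Value
            }
        }
        return vars;
    }

    // Pure check, independent of how the variables were obtained.
    static List<String> problems(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) {
            problems.add("log_bin is " + vars.get("log_bin") + ", expected ON");
        }
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) {
            problems.add("binlog_format is " + vars.get("binlog_format") + ", expected ROW");
        }
        return problems;
    }
}
```

With a live connection, `BinlogSettingsCheck.problems(BinlogSettingsCheck.readVariables(conn))` returns an empty list once the server is ready.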

2. Edit the MySQL configuration file to enable binlog

[mysqld]
# MySQL installation directory. Single backslashes caused an error for me, although other tutorials use them.
basedir=F:\\tools\\mysql-5.7.28-winx64
# Directory where MySQL stores its data
datadir=F:\\tools\\mysql-5.7.28-winx64\\Data
# Maximum number of connections
max_connections=200
# Number of connection failures allowed
max_connect_errors=10
# Default character set used by the server
character-set-server=utf8
# Default authentication plugin
default_authentication_plugin=mysql_native_password
lower_case_table_names=2
sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
# Enable binlog
log-bin=mysql-bin
# Use ROW format, as Canal's setup guide specifies
binlog-format=ROW
# Must not clash with the slaveId configured for Canal
server_id=1
[mysql]
# Default character set of the mysql command-line client
default-character-set=utf8
[client]
# Default port and character set used by clients
port=3306
default-character-set=utf8

3. Create a canal account with MySQL slave privileges and allow remote connections

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

4. Restart the MySQL service so the configuration change takes effect

Linux:
systemctl restart mysqld
Windows:
net stop mysql
net start mysql

III. Canal quick deployment and configuration

1. Edit conf/example/instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info; change the address to point at your own database
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
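The table filter is easy to get wrong because of the escaping. The sketch below loads the filter line with java.util.Properties to show that the doubled backslash in the file becomes a single one, then tests the resulting regex against "schema.table" names. The matching logic is only an approximation of Canal's filter (comma-separated patterns, full match), and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

// Hypothetical helper for experimenting with canal.instance.filter.regex values.
class FilterRegexSketch {

    // Parses a properties snippet and returns the unescaped filter value.
    static String loadFilter(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("canal.instance.filter.regex");
    }

    // Approximation: a table is selected when any comma-separated pattern
    // matches the whole "schema.table" name.
    static boolean selects(String filter, String schemaDotTable) {
        for (String regex : filter.split(",")) {
            if (Pattern.matches(regex.trim(), schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Four backslashes in Java source are two in the file and one after loading.
        String filter = loadFilter("canal.instance.filter.regex = .*\\\\..*\n");
        System.out.println(filter);
        System.out.println(selects(filter, "demo.tb_ad"));
        System.out.println(selects("demo\\.t_user", "demo.tb_ad"));
    }
}
```

To watch a single table instead of everything, a value such as demo\\.t_user in the properties file narrows the subscription to that table.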

2. Start Canal with the startup script: sh bin/startup.sh

3. View server logs and instance logs

$ tail -f logs/canal/canal.log
2020-05-28 13:52:03.037 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2020-05-28 13:52:03.065 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2020-05-28 13:52:03.072 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-05-28 13:52:03.444 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.36.58.25(172.36.58.25):11111]
2020-05-28 13:52:04.604 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
$ tail -f logs/example/example.log
2020-05-28 13:52:04.238 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2020-05-28 13:52:04.264 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-05-28 13:52:04.265 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-05-28 13:52:04.568 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-05-28 13:52:04.572 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-05-28 13:52:04.573 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2020-05-28 13:52:04.577 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful...
2020-05-28 13:52:04.616 [destination = example, address = /127.0.0.1:3306, EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-05-28 13:52:04.616 [destination = example, address = /127.0.0.1:3306, EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-05-28 13:52:06.556 [destination = example, address = /127.0.0.1:3306, EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=1,gtid=<null>,timestamp=1590644973000] cost : 1935ms , the next step is binlog dump

4. Preliminary monitoring experiment

Add the Canal client dependency:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

Then run a simple client that prints every row change:
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

public class SimpleCanalClientExample {

    public static void main(String[] args) {
        // Create the connection to the Canal server (destination "example", empty username and password)
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // subscribe to every table in every schema
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // Get the specified amount of data
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status and stop polling
                        Thread.currentThread().interrupt();
                        break;
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // Acknowledge the batch
                // connector.rollback(batchId); // On processing failure, roll the batch back instead
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}
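The client above keeps a rollback call commented out next to the ack. One common way to use the pair is to acknowledge a batch only after it has been processed and to roll it back when processing fails, so the server can deliver it again. The sketch below shows that pattern without needing a Canal server: BatchSource is a hypothetical stand-in that mirrors only the three connector calls used above (getWithoutAck, ack, rollback), and Batch and FakeSource are likewise invented for the demonstration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in mirroring the three connector calls used by the client.
interface BatchSource {
    Batch getWithoutAck(int batchSize);
    void ack(long batchId);
    void rollback(long batchId);
}

// Simplified batch: an id plus opaque entries (-1 means "nothing available").
class Batch {
    final long id;
    final List<String> entries;

    Batch(long id, List<String> entries) {
        this.id = id;
        this.entries = entries;
    }
}

class AckOrRollback {
    // Processes at most one batch; returns true only when it was acknowledged.
    static boolean pollOnce(BatchSource source, int batchSize, Consumer<List<String>> handler) {
        Batch batch = source.getWithoutAck(batchSize);
        if (batch.id == -1 || batch.entries.isEmpty()) {
            return false; // nothing to process
        }
        try {
            handler.accept(batch.entries);
            source.ack(batch.id);      // success: the server may advance its cursor
            return true;
        } catch (RuntimeException e) {
            source.rollback(batch.id); // failure: ask for the batch to be delivered again
            return false;
        }
    }
}

// In-memory fake used only to exercise the pattern without a Canal server.
class FakeSource implements BatchSource {
    private final Deque<Batch> pending = new ArrayDeque<>();
    final List<Long> acked = new ArrayList<>();
    final List<Long> rolledBack = new ArrayList<>();

    FakeSource(List<Batch> batches) {
        pending.addAll(batches);
    }

    @Override
    public Batch getWithoutAck(int batchSize) {
        Batch next = pending.peek();
        return next != null ? next : new Batch(-1, Collections.<String>emptyList());
    }

    @Override
    public void ack(long batchId) {
        acked.add(batchId);
        pending.poll();
    }

    @Override
    public void rollback(long batchId) {
        rolledBack.add(batchId); // the batch stays pending, so it is returned again
    }
}
```

With the real connector, the same shape applies: wrap printEntry in the try block, call connector.ack(batchId) on success and connector.rollback(batchId) in the catch.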

Insert a row to trigger an event:

INSERT INTO `demo`.`tb_ad`(`id`, `url`, `status`, `position`, `image`, `start_time`, `end_time`) VALUES (1, 'https://www.baidu.com/', '1', 'web_index_lb', 'https://pics1.baidu.com/feed/c83d70cf3bc79f3d5c30d358deb67a17738b29a6.jpeg?https://kins.oss-cn-shenzhen.aliyuncs.com/yhzb/2020-03-11/ca21b3b17d6f4757b991dd86b8cef3fa-VIP-680.jpeg', '2020-05-22 10:58:08', '2021-06-01 10:58:14');

Console output:

empty count : 66
empty count : 67
empty count : 68
empty count : 69
empty count : 70
================> binlog[mysql-bin.000001:355] , name[demo,tb_ad] , eventType : INSERT
id : 2    update=true
url : https://www.baidu.com/    update=true
status : 1    update=true
position : web_index_lb    update=true
image : https://pics1.baidu.com/feed/c83d70cf3bc79f3d5c30d358deb67a17738b29a6.jpeg?https://kins.oss-cn-shenzhen.aliyuncs.com/yhzb/2020-03-11/ca21b3b17d6f4757b991dd86b8cef3fa-VIP-680.jpeg    update=true
start_time : 2020-05-22 10:58:08    update=true
end_time : 2021-06-01 10:58:14    update=true

5. Data-monitoring microservice

<!-- Third-party starter for quick Canal integration: https://github.com/NormanGyllenhaal/canal-client -->
<!-- https://mvnrepository.com/artifact/top.javatool/canal-spring-boot-starter -->
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

Subscribe to insert, update, and delete operations on a table:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@Component
@CanalTable(value = "t_user")
public class UserHandler implements EntryHandler<User> {

    private Logger logger = LoggerFactory.getLogger(UserHandler.class);

    public void insert(User user) {
        logger.info("insert message {}", user);
    }

    public void update(User before, User after) {
        logger.info("update before {} ", before);
        logger.info("update after {}", after);
    }

    public void delete(User user) {
        logger.info("delete {}", user);
    }
}
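The handler refers to a User class that is not shown. A possible shape for it is sketched below, reconstructed from the field names that appear in the console output of this section (id, userName, gender, countryId, birthday, createTime). The field types are guesses, and how the starter maps columns such as user_name onto fields is not covered here, so treat this as a placeholder rather than the original model.

```java
import java.util.Date;

// Hypothetical model for the t_user table; field names follow the log output,
// field types are assumptions. In a real project this would be a public class
// in its own file.
class User {
    private Integer id;
    private String userName;   // column user_name
    private Integer gender;
    private Integer countryId; // column country_id
    private Date birthday;
    private Date createTime;   // column create_time

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }

    public Integer getGender() { return gender; }
    public void setGender(Integer gender) { this.gender = gender; }

    public Integer getCountryId() { return countryId; }
    public void setCountryId(Integer countryId) { this.countryId = countryId; }

    public Date getBirthday() { return birthday; }
    public void setBirthday(Date birthday) { this.birthday = birthday; }

    public Date getCreateTime() { return createTime; }
    public void setCreateTime(Date createTime) { this.createTime = createTime; }

    // Same layout as the "User{...}" lines printed by the handler's logger.
    @Override
    public String toString() {
        return "User{id=" + id + ", userName='" + userName + '\'' + ", gender=" + gender
                + ", countryId=" + countryId + ", birthday=" + birthday
                + ", createTime=" + createTime + '}';
    }
}
```

Using wrapper types such as Integer rather than int lets a field stay null when a column is absent from an event.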

Start the data-monitoring microservice, modify a row in the t_user table, and watch the console output.

2020-05-28 16:23:22.667  INFO 24284 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient  : received message Message[id=23,entries=[header {
  version: 1
  logfileName: "mysql-bin.000001"
  logfileOffset: 18380
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1590654201000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 68
}
entryType: TRANSACTIONBEGIN
storeValue: " \025"
, header {
  version: 1
  logfileName: "mysql-bin.000001"
  logfileOffset: 18505
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1590654201000
  sourceType: MYSQL
  schemaName: "demo"
  tableName: "t_user"
  eventLength: 88
  eventType: UPDATE
  props {
    key: "rowsCount"
    value: "1"
  }
}
entryType: ROWDATA
storeValue: "\b\210\002\020\002P\000b\370\003\n\033\b\000\020\004\032\002id \001(\0000\000B\00221R\aint(11)\n*\b\001\020\f\032\tuser_name \000(\0000\000B\005ZeldaR\fvarchar(255)\n*\b\002\020\372\377\377\377\377\377\377\377\377\001\032\006gender \000(\0000\000B\0010R\ntinyint(4)\n\"\b\003\020\004\032\ncountry_id \000(\0000\000B\0011R\aint(11)\n&\b\004\020[\032\bbirthday \000(\0000\000B\n1998-04-18R\004date\n7\b\005\020]\032\vcreate_time \000(\0000\000B\0231991-01-10 05:45:50R\ttimestamp\022\033\b\000\020\004\032\002id \001(\0000\000B\00221R\aint(11)\022.\b\001\020\f\032\tuser_name \000(\0010\000B\tZelda1111R\fvarchar(255)\022*\b\002\020\372\377\377\377\377\377\377\377\377\001\032\006gender \000(\0000\000B\0010R\ntinyint(4)\022\"\b\003\020\004\032\ncountry_id \000(\0000\000B\0011R\aint(11)\022&\b\004\020[\032\bbirthday \000(\0000\000B\n1998-04-18R\004date\0227\b\005\020]\032\vcreate_time \000(\0000\000B\0231991-01-10 05:45:50R\ttimestamp"
, header {
  version: 1
  logfileName: "mysql-bin.000001"
  logfileOffset: 18593
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1590654201000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\0041574"
],raw=false,rawEntries=[]]
2020-05-28 16:23:22.668  INFO 24284 --- [xecute-thread-6] t.j.canal.example.handler.UserHandler    : update before User{id=null, userName='Zelda', gender=null, countryId=null, birthday=null, createTime=null} 
2020-05-28 16:23:22.668  INFO 24284 --- [xecute-thread-6] t.j.canal.example.handler.UserHandler    : update after User{id=21, userName='Zelda1111', gender=0, countryId=1, birthday=Sat Apr 18 00:00:00 CST 1998, createTime=Thu Jan 10 05:45:50 CST 1991}
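The executeTime field in each header is a Unix timestamp in milliseconds. A small sketch for turning it into a readable time follows. The class and method names are hypothetical, and the Asia/Shanghai zone is an assumption based on the "CST" shown in the handler output.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical helper: converts a header's executeTime (epoch milliseconds)
// into a zoned date-time for display.
class ExecuteTimeSketch {

    static ZonedDateTime toZoned(long executeTimeMillis, String zoneId) {
        return Instant.ofEpochMilli(executeTimeMillis).atZone(ZoneId.of(zoneId));
    }

    public static void main(String[] args) {
        long executeTime = 1590654201000L; // value taken from the log above
        System.out.println(Instant.ofEpochMilli(executeTime));                       // 2020-05-28T08:23:21Z
        System.out.println(toZoned(executeTime, "Asia/Shanghai").toLocalDateTime()); // 2020-05-28T16:23:21
    }
}
```

The converted time falls one second before the 16:23:22 timestamps on the log lines, which is consistent with the event being consumed right after it was written.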