Preface

Canal’s main purpose is to provide incremental data subscription and consumption by parsing the incremental (binary) logs of a MySQL database. It works as follows:

  • Canal emulates the interaction protocol of a MySQL slave, presents itself as a MySQL slave, and sends a dump request to the MySQL master
  • The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, to Canal)
  • Canal parses the binary log (originally a byte stream) into objects

It can be used in the following service scenarios:

  • Database mirroring
  • Real-time database backup
  • Index building and real-time maintenance (split heterogeneous index, inverted index, etc.)
  • Service cache refresh
  • Incremental data processing with business logic

Canal currently supports the source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.

Tip: The content above comes from the official website. For details, consult the official Wiki documentation first.

Using Canal

Database Configuration

Before installing Canal, we need to modify the MySQL configuration (version 5.7.34 is used in this article). We can also create a dedicated test user and test database for the steps that follow. First, edit the my.cnf configuration file and add the following settings:

[mysqld]
# Enable the binary log
log-bin=mysql-bin
# Select ROW mode
binlog-format=ROW
# Must not be the same as Canal's slaveId
server_id=1
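Before restarting MySQL, it can help to confirm that the three settings really are present in the [mysqld] section. The following is only a minimal sketch and not part of Canal: the class MyCnfCheck and its methods are hypothetical names, and the parser assumes simple key=value lines without trailing inline comments.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper: checks the binlog settings described above in a my.cnf text. */
final class MyCnfCheck {

    /** Collects key=value pairs from the [mysqld] section; '-' in keys is treated like '_'. */
    static Map<String, String> mysqldSection(String cnf) {
        Map<String, String> settings = new LinkedHashMap<>();
        boolean inMysqld = false;
        for (String raw : cnf.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";")) {
                continue; // skip blank lines and comment lines
            }
            if (line.startsWith("[")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            if (!inMysqld) {
                continue;
            }
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim();
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            settings.put(key.replace('-', '_').toLowerCase(Locale.ROOT), value);
        }
        return settings;
    }

    /** True when log-bin is set, binlog-format is ROW, and server_id is a positive number. */
    static boolean readyForCanal(String cnf) {
        Map<String, String> s = mysqldSection(cnf);
        String serverId = s.getOrDefault("server_id", "");
        return s.containsKey("log_bin")
                && "ROW".equalsIgnoreCase(s.get("binlog_format"))
                && serverId.matches("[1-9]\\d*");
    }

    public static void main(String[] args) {
        String cnf = String.join("\n",
                "[mysqld]",
                "log-bin=mysql-bin",
                "binlog-format=ROW",
                "server_id=1");
        System.out.println(readyForCanal(cnf));
    }
}
```

The check only reads the file; whether the running server has picked up the settings still has to be confirmed on the MySQL side after a restart.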

Then, using the root account, create a user named canal and a database named canal, and grant the required privileges to the canal user:

-- Create a canal user and set the password to canal
CREATE USER canal IDENTIFIED BY 'canal';
-- Create a canal database and grant permissions to the canal user
CREATE DATABASE canal CHARACTER SET utf8mb4;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
GRANT ALL PRIVILEGES ON canal.* TO 'canal'@'%';
FLUSH PRIVILEGES;

Then create a user table and add some data for subsequent tests:

CREATE TABLE `user` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'user id',
  `username` varchar(50) DEFAULT NULL COMMENT 'Username',
  `password` varchar(50) DEFAULT NULL COMMENT 'password',
  `email` varchar(45) DEFAULT NULL COMMENT 'email',
  `phone` varchar(15) DEFAULT NULL COMMENT 'Mobile number',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;

INSERT INTO `canal`.`user` (`id`, `username`, `password`, `email`, `phone`) VALUES 
    (1, 'Jiang Lei', 'k0VP$l@ru', '[email protected]', '18145206808'),
    (2, 'Ding Yang', '8pig73*dW', '[email protected]', '19832458514'),
    (3, 'Shirley Yau', '5G)c@7RyV', '[email protected]', '18656022523'),
    (4, 'Kong Yang', 'KjvLG*BP', '[email protected]', '18674498531'),
    (5, 'Dong Xia', '%fqmhybp3', '[email protected]', '18192674843');

PS: The data above is mock data. Any resemblance to real people is purely coincidental.

Installing Canal

Download

Visit the Releases page to download and install version 1.1.5:

mkdir /data/canal
cd /data/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz
rm -rf canal.deployer-1.1.5.tar.gz

Modify the configuration

Edit the following settings in conf/example/instance.properties to match your database connection information:

# Must not be the same as the MySQL server_id
canal.instance.mysql.slaveId=2
# position info
canal.instance.master.address=192.168.3.13:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
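The two easiest mistakes here are reusing the MySQL server_id as the slaveId and mistyping the master address. As a rough sketch (InstanceConfigCheck is a hypothetical helper, not something shipped with Canal), the properties text can be loaded with java.util.Properties and checked before starting the server:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical helper: sanity-checks the instance.properties settings shown above. */
final class InstanceConfigCheck {

    /** Parses properties text held in memory. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return props;
    }

    /** Returns null when the settings look usable, otherwise a short description of the problem. */
    static String problem(Properties props, long mysqlServerId) {
        String slaveId = props.getProperty("canal.instance.mysql.slaveId", "").trim();
        if (!slaveId.matches("\\d{1,18}")) {
            return "canal.instance.mysql.slaveId must be a number";
        }
        if (Long.parseLong(slaveId) == mysqlServerId) {
            return "slaveId must differ from the MySQL server_id";
        }
        String address = props.getProperty("canal.instance.master.address", "").trim();
        if (!address.matches("[^:\\s]+:\\d+")) {
            return "canal.instance.master.address must look like host:port";
        }
        return null;
    }

    public static void main(String[] args) {
        Properties props = parse(String.join("\n",
                "canal.instance.mysql.slaveId=2",
                "canal.instance.master.address=192.168.3.13:3306"));
        String problem = problem(props, 1); // 1 is the server_id configured in my.cnf earlier
        System.out.println(problem == null ? "ok" : problem);
    }
}
```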

Start

sh bin/startup.sh

Firewall Settings

firewall-cmd --zone=public --add-port=11111/tcp --permanent
firewall-cmd --reload
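To confirm from the client machine that the port is actually open, a plain TCP connection attempt is enough. This is a minimal sketch using only the JDK; PortCheck is a hypothetical name, and the host and port are the ones used in this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: tests whether a TCP port accepts connections. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unresolvable host
        }
    }

    public static void main(String[] args) {
        // 11111 is the Canal server port opened in the firewall above
        System.out.println(isReachable("192.168.3.13", 11111, 2000));
    }
}
```

The same helper can be reused for the other ports opened later in this article.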

Test

Write the following test code to test the connection. The complete code has been uploaded to GitHub.

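// CanalConfig and Util are classes from the article's own project (see GitHub).
import com.alibaba.fastjson.JSON; // assumption: JSON below is fastjson's JSON class
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Canal listener
 *
 * @author zjw
 * @date 2022-01-27
 */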
@Slf4j
@Component
@RequiredArgsConstructor
public class CanalListener {

    private final CanalConfig canalConfig;

    @PostConstruct
    private void init() {
        listen();
    }

    /** Listens for incremental data from Canal. */
    private void listen() {
        new Thread(() -> {
            // Connect to Canal
            CanalConnector connector = connect();
            try {
                // Loop to get database change information
                while (true) {
                    Message message = connector.getWithoutAck(canalConfig.getBatchSize());
                    long batchId = message.getId();
                    List<CanalEntry.Entry> entryList = message.getEntries();
                    if (batchId == -1 || entryList.isEmpty()) {
                        Util.sleep(500);
                    } else {
                        // If there is change information, print it
                        entryList.forEach(this::printEntry);
                    }
                    connector.ack(batchId);
                }
            } finally {
                connector.disconnect();
            }
        }).start();
    }

    /**
     * Connects to Canal.
     *
     * @return the connected CanalConnector
     */
    private CanalConnector connect() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalConfig.getHost(), canalConfig.getPort()),
                canalConfig.getTopic(),
                canalConfig.getUsername(),
                canalConfig.getPassword()
        );
        connector.connect();
        connector.subscribe();
        connector.rollback();
        return connector;
    }

    /**
     * Prints the changed entry information.
     *
     * @param entry the changed entry
     */
    @SneakyThrows
    private void printEntry(CanalEntry.Entry entry) {
        CanalEntry.EntryType entryType = entry.getEntryType();
        if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN 
            	|| entryType == CanalEntry.EntryType.TRANSACTIONEND) {
            return;
        }
        String lineSeparator = System.lineSeparator();
        StringBuilder info = new StringBuilder(lineSeparator);
        info.append("========== Data change information ==========").append(lineSeparator);
        CanalEntry.Header header = entry.getHeader();
        info.append(String.format(
            "Database. Table name: %s.%s%n", header.getSchemaName(), header.getTableName()));
        info.append(String.format("Operation type: %s%n", header.getEventType()));
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            CanalEntry.EventType eventType = rowChange.getEventType();
            if (eventType == CanalEntry.EventType.DELETE) {
                info.append(String.format(
                    "delete: %s%n", getDataInfo(rowData.getBeforeColumnsList())));
            } else if (eventType == CanalEntry.EventType.INSERT) {
                info.append(String.format(
                    "insert: %s%n", getDataInfo(rowData.getAfterColumnsList())));
            } else {
                info.append(String.format(
                    "update: %s%n", getDataInfo(rowData.getAfterColumnsList())));
            }
        }
        log.info(info.toString());
    }

    /**
     * Gets the row change information.
     *
     * @param columns the columns of the changed row
     */
    private String getDataInfo(List<CanalEntry.Column> columns) {
        return JSON.toJSONString(
                columns.stream()
                        .collect(Collectors.toMap(
                                CanalEntry.Column::getName,
                                CanalEntry.Column::getValue))
        );
    }
}
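In the listener above, an exception thrown while handling a batch ends the loop and disconnects. One possible alternative is to roll the batch back so that it is delivered again on a later poll. The sketch below shows only that control flow: BatchSource, Batch and BatchProcessor are hypothetical stand-ins written for illustration, mirroring the getWithoutAck, ack and rollback calls of CanalConnector so that the example runs without a Canal server.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the three CanalConnector calls used by the polling loop. */
interface BatchSource {
    Batch getWithoutAck(int batchSize);
    void ack(long batchId);
    void rollback(long batchId);
}

/** Hypothetical stand-in for a Canal Message: a batch id plus its entries. */
final class Batch {
    final long id;
    final List<String> entries;

    Batch(long id, List<String> entries) {
        this.id = id;
        this.entries = entries;
    }
}

final class BatchProcessor {

    /** Handles one batch; returns true only if every entry was handled and the batch was acked. */
    static boolean pollOnce(BatchSource source, int batchSize, Consumer<String> handler) {
        Batch batch = source.getWithoutAck(batchSize);
        if (batch.id == -1 || batch.entries.isEmpty()) {
            return false; // nothing to handle; the caller may sleep before polling again
        }
        try {
            batch.entries.forEach(handler);
            source.ack(batch.id);      // confirm only after all entries were handled
            return true;
        } catch (RuntimeException e) {
            source.rollback(batch.id); // ask for the batch to be delivered again
            return false;
        }
    }
}
```

Whether redelivery is desirable depends on the handler: it has to tolerate seeing the same change twice.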

Then change the password of user 5 from %fqmhybp3 to %fqmhybp4, and the console prints the change information.

We have now successfully obtained the change information for the data. With it, we can synchronize the modification to other stores, refresh caches, and perform similar operations.
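As an illustration of the cache refresh mentioned above, the following sketch keeps an in-memory map in step with row changes. UserCache and ChangeType are hypothetical names; the row is a column-name-to-value map like the one built in getDataInfo, and a real setup would more likely update an external cache.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory cache kept in sync with row changes on the user table. */
final class UserCache {

    enum ChangeType { INSERT, UPDATE, DELETE }

    private final Map<String, Map<String, String>> rowsById = new ConcurrentHashMap<>();

    /** Applies one row change; the row maps column names to values and must contain "id". */
    void apply(ChangeType type, Map<String, String> row) {
        String id = row.get("id");
        if (id == null) {
            return; // cannot locate the cache entry without the primary key
        }
        if (type == ChangeType.DELETE) {
            rowsById.remove(id);
        } else {
            rowsById.put(id, new HashMap<>(row)); // INSERT and UPDATE both overwrite the entry
        }
    }

    /** Returns the cached row, or null if the id is not cached. */
    Map<String, String> get(String id) {
        return rowsById.get(id);
    }
}
```

For a DELETE the row passed in would come from getBeforeColumnsList(), and for INSERT or UPDATE from getAfterColumnsList(), as in printEntry.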

Installing Canal with Docker

Make sure Docker and Docker Compose are installed on your machine, then create docker-compose.yml:

version: '3'
services:
  canal:
    container_name: canal_latest
    image: canal/canal-server
    restart: always
    ports:
      - 11111:11111
    environment:
      - canal.instance.mysql.slaveId=2
      - canal.instance.master.address=192.168.3.13:3306
      - canal.instance.dbUsername=canal
      - canal.instance.dbPassword=canal
    volumes:                                
      - ./conf:/admin/canal-server/conf
      - ./logs:/admin/canal-server/logs

Then run docker-compose up -d in the directory containing docker-compose.yml to start Canal.

RocketMQ configuration

Install RocketMQ

Create a docker-compose.yml file with the following content:

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    networks:
      rmq:
        aliases:
          - rmqnamesrv
  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./conf/broker.conf:/etc/rocketmq/broker.conf
    environment:
      NAMESRV_ADDR: "rmqnamesrv:9876"
      JAVA_OPTS: " -Duser.home=/opt"
      JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker
  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8076:8080
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole
networks:
  rmq:
    name: rmq
    driver: bridge

Then create conf/broker.conf in the same directory as docker-compose.yml, with the following content:

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
# Change to the host IP
brokerIP1=192.168.3.13
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
maxMessageSize=65536
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

Then run docker-compose up -d to start RocketMQ.

Firewall Settings

firewall-cmd --zone=public --add-port=8076/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --reload

Open 192.168.3.13:8076 in a local browser to check that the RocketMQ console is reachable.

Configuring Canal for RocketMQ

Modify the configuration

Modify the following configuration in conf/canal.properties:

# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ

rocketmq.producer.group = canal_group
rocketmq.namesrv.addr = 192.168.3.13:9876

Modify the following configuration in conf/example/instance.properties:

# mq config
canal.mq.topic=canal_topic

PS: These changes build on the configuration from the previous sections.

Then run sh bin/restart.sh to apply the configuration.

Test

Here, change the username of user 5 to Canal, then view the message in the RocketMQ console.

You can see that the change information has been successfully sent to RocketMQ.
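Assuming Canal is sending its flat JSON message format, an UPDATE message carries the row after the change in data and the previous values of the changed columns in old. A small sketch of turning that pair into a readable diff follows; RowDiff is a hypothetical helper, and the maps stand for one element of data and the matching element of old.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: describes which columns changed in an UPDATE message. */
final class RowDiff {

    /** Returns column name mapped to "oldValue -> newValue" for every column listed in old. */
    static Map<String, String> changes(Map<String, String> old, Map<String, String> data) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : old.entrySet()) {
            String column = entry.getKey();
            result.put(column, entry.getValue() + " -> " + data.get(column));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> old = new LinkedHashMap<>();
        old.put("username", "Dong Xia");
        Map<String, String> data = new LinkedHashMap<>();
        data.put("id", "5");
        data.put("username", "Canal");
        System.out.println(changes(old, data)); // prints {username=Dong Xia -> Canal}
    }
}
```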

Coding test

The following consumes the messages in code. The complete code and configuration are available on GitHub:

/**
 * RocketMQ listener for Canal changes to the user table
 *
 * @author zjw
 * @date 2022-01-30
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "canal_topic",
        consumerGroup = "canal_group"
)
public class UserCanalListener implements RocketMQListener<CanalMessage<User>> {

    @Override
    public void onMessage(CanalMessage<User> message) {
        String lineSeparator = System.lineSeparator();
        StringBuilder info = new StringBuilder(lineSeparator);
        info.append("========== Data change information ==========").append(lineSeparator);
        info.append(String.format(
            "Database. Table name: %s.%s%n", message.getDatabase(), message.getTable()));
        info.append(String.format("Operation type: %s%n", message.getType()));
        message.getData().forEach(user -> info.append(user).append(lineSeparator));
        log.info(info.toString());
    }
}

Then modify the row whose id is 5 and check the message printed to the console.
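The CanalMessage type used by the listener is not shown in this article (the real one is in the GitHub project). As a guess at its shape, the sketch below is a hypothetical generic class whose field names follow Canal's flat JSON message (database, table, type, pkNames, data, old); the remaining fields of the message are omitted here.

```java
import java.util.List;

/** Hypothetical shape of the message type; T is the row type, for example User. */
final class CanalMessage<T> {

    private String database;      // schema name, e.g. canal
    private String table;         // table name, e.g. user
    private String type;          // operation type such as INSERT, UPDATE or DELETE
    private List<String> pkNames; // primary key column names
    private List<T> data;         // rows after the change
    private List<T> old;          // previous values of changed columns (UPDATE only)

    public String getDatabase() { return database; }
    public void setDatabase(String database) { this.database = database; }

    public String getTable() { return table; }
    public void setTable(String table) { this.table = table; }

    public String getType() { return type; }
    public void setType(String type) { this.type = type; }

    public List<String> getPkNames() { return pkNames; }
    public void setPkNames(List<String> pkNames) { this.pkNames = pkNames; }

    public List<T> getData() { return data; }
    public void setData(List<T> data) { this.data = data; }

    public List<T> getOld() { return old; }
    public void setOld(List<T> old) { this.old = old; }
}
```

Plain getters and setters are used so that a JSON mapper can populate the object; the actual project may well use Lombok instead.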

Configuring the Canal Console

Download

Finally, here is a brief introduction to Canal Admin. Visit the Releases page to download and install version 1.1.5:

mkdir /data/canal-admin
cd /data/canal-admin
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
tar -zxvf canal.admin-1.1.5.tar.gz
rm -rf canal.admin-1.1.5.tar.gz

Modify the configuration

Run the vi conf/application.yml command to modify the database configuration:

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 192.168.3.13:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

Then import conf/canal_manager.sql into the database (import it as the root user, then grant access to the canal user) and run the following authorization statements:

GRANT ALL PRIVILEGES ON canal_manager.* TO 'canal'@'%';
FLUSH PRIVILEGES;

Start

Run the following command to start the console:

sh bin/startup.sh

Firewall Settings

firewall-cmd --zone=public --add-port=8089/tcp --permanent
firewall-cmd --reload

Modify the Canal Server configuration

Modify the following configuration in conf/canal.properties:

# canal admin config
canal.admin.manager = 192.168.3.13:8089

Then restart Canal:

sh bin/restart.sh

Then open 192.168.3.13:8089 in a local browser (remember to replace it with your own IP) and log in. Note that the login password is not the adminPasswd set in the Canal Admin configuration. The login password is 123456.

Canal Server can then be managed from the console.

Conclusion

This article gave a brief introduction to installing Canal and using it together with RocketMQ. Future articles will cover Canal in practical application cases, such as updating the cache whenever data in a database table changes, so that cache updates no longer need to be scattered throughout the code.