Canal is an open-source project from Alibaba. Its main purpose is to provide incremental data subscription and consumption by parsing the binlog of a MySQL database.

Log-based incremental subscription and consumption services include:

  • Database mirroring
  • Real-time Database backup
  • Index building and real-time maintenance (split heterogeneous index, inverted index, etc.)
  • Service Cache Refresh
  • Incremental data processing with business logic

In my case, Canal is mainly used in two scenarios:

The first is synchronizing changed data to Elasticsearch and Redis in real time.

My current practice has two problems. First, the full data sync runs on a timer; because the data volume is large and the sync takes a long time, the data is not real-time enough. Second, for single-record changes, some of the Elasticsearch and Redis logic is written directly in the business code, so the coupling is severe.

Stripping that logic out gives us real-time incremental updates and decouples the cache maintenance from the business code, which is a big win.
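As a sketch of what the decoupled consumer could look like (the key layout and function name are my own assumptions, not part of Canal), each parsed event can be translated into a cache operation:

```python
# Sketch only: translate a parsed Canal event dict (the shape produced by the
# Python client shown later in this article) into a cache operation.
# The "db:table:id" key scheme is an illustrative assumption.

def to_cache_op(event):
    """Return a (command, key, payload) tuple for a parsed Canal event."""
    etype = event['event_type']
    # for updates the latest row image lives under 'after'
    row = event['data']['after'] if etype == 2 else event['data']
    key = '%s:%s:%s' % (event['db'], event['table'], row.get('id', ''))
    if etype in (1, 2):   # insert or update: write the latest row image
        return ('set', key, row)
    if etype == 3:        # delete: drop the cached entry
        return ('delete', key, None)
    return ('noop', key, None)

op = to_cache_op({'db': 'test', 'table': 'role', 'event_type': 1,
                  'data': {'id': '10', 'role_name': 'admin'}})
print(op)  # ('set', 'test:role:10', {'id': '10', 'role_name': 'admin'})
```

In the real consumer the returned operation would then be applied with a Redis or Elasticsearch client; that part is omitted here.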

The second is preserving the change history of data we care about.

This is currently used in the “Asset Management” module, which implements IP lifecycle management by recording the creation, changes, and deletion of IP assets, making historical information easy to trace.
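A minimal sketch of how such a history record could be derived from a parsed update event (the record fields are my own illustrative choice, not the actual asset-management schema):

```python
import time

def to_history_record(event):
    """Flatten a parsed Canal UPDATE event into a history record.
    The field names here are illustrative, not a real schema."""
    before = event['data'].get('before', {})
    after = event['data'].get('after', {})
    # keep only the fields whose value actually changed, as (old, new) pairs
    changed = {k: (before.get(k), v) for k, v in after.items()
               if before.get(k) != v}
    return {
        'source': '%s.%s' % (event['db'], event['table']),
        'event_type': event['event_type'],
        'changed_fields': changed,
        'recorded_at': int(time.time()),
    }

record = to_history_record({
    'db': 'test', 'table': 'role', 'event_type': 2,
    'data': {'before': {'id': '10', 'role_name': 'admin'},
             'after': {'id': '10', 'role_name': 'hh'}},
})
print(record['changed_fields'])  # {'role_name': ('admin', 'hh')}
```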

MySQL configuration

In MySQL's my.cnf, enable binlog writing and set the format to ROW.

log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # choose ROW mode
server_id=1         # required by MySQL replication; must not clash with Canal's slaveId

Restart the database to check whether the configuration takes effect.

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.19 sec)
mysql>
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)
mysql>
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000003 |     4230 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

Then create the user and authorize it.

mysql> CREATE USER canal IDENTIFIED BY 'canal';
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;
mysql> show grants for 'canal'@'%';
+---------------------------------------------------------------------------+
| Grants for canal@%                                                        |
+---------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `canal`@`%` |
+----------------------------------------------------------------------------+
1 row in set (0.00 sec)

Canal server

Pull the image:

# docker pull canal/canal-server:v1.1.4

Then use the official shell script to launch directly:

# sh run.sh -e canal.auto.scan=false -e canal.destinations=test \
  -e canal.instance.master.address=127.0.0.1:3306 \
  -e canal.instance.dbUsername=canal \
  -e canal.instance.dbPassword=canal \
  -e canal.instance.connectionCharset=UTF-8 \
  -e canal.instance.tsdb.enable=true \
  -e canal.instance.gtidon=false

I prefer to manage containers with docker-compose, so here is the equivalent docker-compose.yml:

version: '3'

services:
  canal-server:
    image: canal/canal-server:v1.1.4
    container_name: canal-server
    restart: unless-stopped
    network_mode: host
    ports: 
      - 11111:11111
    environment:
      - canal.auto.scan=false
      - canal.instance.master.address=127.0.0.1:3306
      - canal.instance.dbUsername=canal
      - canal.instance.dbPassword=canal
      - canal.instance.filter.regex=.*\\.. *
      - canal.destinations=test
      - canal.instance.connectionCharset=UTF-8
      - canal.instance.tsdb.enable=true
    volumes:
      - /root/canal/test/log/:/home/admin/canal-server/logs/
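The canal.instance.filter.regex value above is a comma-separated list of Perl-style regexes matched against schema.table names; `.*\\..*` matches everything. A rough Python approximation of the matching behavior (this only mimics Canal's matcher for illustration, it is not the actual implementation):

```python
import re

def matches_filter(regex_csv, schema, table):
    """Rough approximation of Canal's schema.table filter: the config value is
    a comma-separated list of regexes matched against "schema.table"."""
    name = '%s.%s' % (schema, table)
    return any(re.fullmatch(p, name) for p in regex_csv.split(','))

print(matches_filter(r'.*\..*', 'test', 'role'))      # True: match everything
print(matches_filter(r'test\.role', 'test', 'role'))  # True: one exact table
print(matches_filter(r'test\..*', 'other', 'role'))   # False: wrong schema
```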

Start the service:

# docker-compose up
Recreating canal-server ... done
Attaching to canal-server
canal-server | DOCKER_DEPLOY_TYPE=VM
canal-server | ==> INIT /alidata/init/02init-sshd.sh
canal-server | ==> EXIT CODE: 0
canal-server | ==> INIT /alidata/init/fix-hosts.py
canal-server | ==> EXIT CODE: 0
canal-server | ==> INIT DEFAULT
canal-server | Generating SSH1 RSA host key: [ OK ]
canal-server | Starting sshd: [ OK ]
canal-server | Starting crond: [ OK ]
canal-server | ==> INIT DONE
canal-server | ==> RUN /home/admin/app.sh
canal-server | ==> START ...
canal-server | start canal ...
canal-server | start canal successful
canal-server | ==> START SUCCESSFUL ...

The Canal Python client

Start from the sample client code provided by the official repository:

import time

from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2

client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username=b'', password=b'')
client.subscribe(client_id=b'1001', destination=b'test', filter=b'.*\\..*')

while True:
    message = client.get(100)
    entries = message['entries']
    for entry in entries:
        entry_type = entry.entryType
        if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
            continue
        row_change = EntryProtocol_pb2.RowChange()
        row_change.MergeFromString(entry.storeValue)
        event_type = row_change.eventType
        header = entry.header
        database = header.schemaName
        table = header.tableName
        event_type = header.eventType
        for row in row_change.rowDatas:
            format_data = dict()
            if event_type == EntryProtocol_pb2.EventType.DELETE:
                for column in row.beforeColumns:
                    # as in the official sample, only the last column survives here
                    format_data = {
                        column.name: column.value
                    }
            elif event_type == EntryProtocol_pb2.EventType.INSERT:
                for column in row.afterColumns:
                    # as in the official sample, only the last column survives here
                    format_data = {
                        column.name: column.value
                    }
            else:
                # note: 'before' and 'after' reference the same dict object here,
                # so the printed before/after images come out identical
                format_data['before'] = format_data['after'] = dict()
                for column in row.beforeColumns:
                    format_data['before'][column.name] = column.value
                for column in row.afterColumns:
                    format_data['after'][column.name] = column.value
            data = dict(
                db=database,
                table=table,
                event_type=event_type,
                data=format_data,
            )
            print(data)
    time.sleep(1)

client.disconnect()

Functional verification

Connect to MySQL, create a test database and table, and then insert, update, and delete a record:

mysql> create database test;
mysql> use test;
mysql> CREATE TABLE `role` (
         `id` int unsigned NOT NULL AUTO_INCREMENT,
         `role_name` varchar(255) DEFAULT NULL,
         PRIMARY KEY (`id`)
       ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
mysql> insert into role (id, role_name) values (10, 'admin');
Query OK, 1 row affected (0.01 sec)

mysql> update role set role_name='hh' where id = 10;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> delete from role where id = 10;
Query OK, 1 row affected (0.01 sec)

Client printout:

$ python canal_client.py
connected to 127.0.0.1:11111
Auth succed
Subscribe succed


header {
  version: 1
  logfileName: "mysql-bin.000003"
  logfileOffset: 5497
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1607843285000
  sourceType: MYSQL
  eventLength: 75
}
entryType: TRANSACTIONBEGIN
storeValue: " \217\001"

header {
  version: 1
  logfileName: "mysql-bin.000003"
  logfileOffset: 5630
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1607843285000
  sourceType: MYSQL
  schemaName: "test"
  tableName: "role"
  eventLength: 47
  eventType: INSERT
  props {
    key: "rowsCount"
    value: "1"
  }
}
entryType: ROWDATA
storeValue: "\010\322\001\020\001P\000bN\022 \010\000\020\004\032\002id \001(\0010\000B\00210R\014int unsigned\022*\010\001\020\014\032\trole_name \000(\0010\000B\005adminR\014varchar(255)"

{'db': 'test', 'table': 'role', 'event_type': 1, 'data': {'role_name': 'admin'}}
header {
  version: 1
  logfileName: "mysql-bin.000003"
  logfileOffset: 5677
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1607843285000
  sourceType: MYSQL
  eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\003440"

For each data change, the output consists of TRANSACTIONBEGIN, ROWDATA, and TRANSACTIONEND entries. The part we care about is ROWDATA; parsing it yields everything we need, including the database name, table name, and the changed content.

In the event_type field, 1 means insert, 2 means update, and 3 means delete.
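For readability in downstream code, the numeric values can be mapped to names (a small helper of my own, not part of the client library):

```python
# Map the numeric event_type values from the parsed output to readable names.
EVENT_NAMES = {1: 'INSERT', 2: 'UPDATE', 3: 'DELETE'}

def describe(event):
    """One-line summary of a parsed Canal event dict."""
    return '%s on %s.%s' % (EVENT_NAMES.get(event['event_type'], 'UNKNOWN'),
                            event['db'], event['table'])

print(describe({'db': 'test', 'table': 'role', 'event_type': 2, 'data': {}}))
# UPDATE on test.role
```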

Update corresponding output:

{'db': 'test', 'table': 'role', 'event_type': 2, 'data': {'before': {'id': '10', 'role_name': 'hh'}, 'after': {'id': '10', 'role_name': 'hh'}}}

Delete corresponding output:

{'db': 'test', 'table': 'role', 'event_type': 3, 'data': {'role_name': 'hh'}}

After the Canal server starts, two log files, meta.log and test.log, are generated under /home/admin/canal-server/logs/test; use them to check whether the service is running properly and whether any errors are reported. Here test is the canal.destinations value set when the container was started.

# cat meta.log
2020-12-13 14:55:18.051 - clientId:1001 cursor:[mysql-bin.000003,4805,1607842360000,1,] address[/127.0.0.1:3306]
2020-12-13 14:55:33.051 - clientId:1001 cursor:[mysql-bin.000003,5096,1607842531000,1,] address[127.0.0.1:3306]
2020-12-13 14:57:07.051 - clientId:1001 cursor:[mysql-bin.000003,5387,1607842625000,1,] address[127.0.0.1:3306]

# cat test.log
2020-12-13 14:55:09.067 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-12-13 14:55:09.144 [destination = test , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-12-13 14:55:09.144 [destination = test , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-12-13 14:55:09.693 [destination = test , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000003,position=4699,serverId=1,gtid=,timestamp=1607842360000] cost : 538ms , the next step is binlog dump
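The cursor in meta.log records how far the client has consumed, which is handy for monitoring consumer lag. A small sketch that pulls the client id and binlog position out of a meta.log line (the parser is my own, based on the observed line format):

```python
import re

def parse_meta_line(line):
    """Extract clientId, binlog file, and position from a meta.log line.
    This parser is a sketch based on the observed log format."""
    m = re.search(r'clientId:(\d+) cursor:\[([^,]+),(\d+),', line)
    if m is None:
        return None
    return {'client_id': m.group(1),
            'journal': m.group(2),
            'position': int(m.group(3))}

line = ('2020-12-13 14:55:18.051 - clientId:1001 '
        'cursor:[mysql-bin.000003,4805,1607842360000,1,] address[/127.0.0.1:3306]')
print(parse_meta_line(line))
# {'client_id': '1001', 'journal': 'mysql-bin.000003', 'position': 4805}
```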

Pitfall notes

Everything worked fine in the test environment I built myself, but I still hit a problem when deploying to the project's beta environment:

[fetch failed by table meta:schemeName.tableName]

After some searching, the common suggestion online was to add a configuration option that ignores table metadata parsing errors:

canal.instance.filter.table.error=true

After adding it, the error messages did disappear, but the consumed data contained no ROWDATA, which bothered me for quite a while.

To be honest, when debugging a program, errors are not what I fear; what I fear is no errors while the program still misbehaves.

Later, I removed the ignore-table-error configuration and checked the logs again. There was another error:

Caused by: java.io.IOException: ErrorPacket [errorNumber=1142, fieldCount=-1, message=SHOW command denied to user

It turned out the binlog account in that environment did not have the SELECT privilege; after granting it, the problem was solved.

So it is important to take a moment and actually read the logs.

That's it for now; the next article will cover connecting Canal to MQ.

Reference Documents:

Github.com/alibaba/can…

Github.com/haozi315666…