Canal: Alibaba's incremental subscription and consumption component for the MySQL binlog

MySQL binlog

MySQL primary/secondary replication

Change the MySQL server configuration, restart the server, and create the canal user:

```
$ vi /etc/my.cnf
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

$ sudo service mysqld restart

$ mysql -uroot
CREATE USER canal IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```

Question: Why create a dedicated canal user? Can an existing user, such as root, be used instead?


Answer: Some users lack the REPLICATION SLAVE or REPLICATION CLIENT privileges. If Canal connects to MySQL as such a user, it cannot read the binlog.


The canal user created here has all privileges, so Canal can read the binlog, and the client can in turn receive it from Canal.

Keep two connections apart: the Canal Server connects to MySQL, and the client connects to the Canal Server.

  • "Canal" here means the Canal Server, which reads MySQL's binlog, parses it, and stores the events
  • The client consumes the binlog events from the Canal Server

Connect to MySQL as the canal user and verify that the binlog format is ROW:

```
$ mysql -h192.168.6.52 -ucanal -pcanal
mysql> show variables like '%binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
```

MySQL master-slave replication

  • The master records changes in its binary log.
  • The slave copies the master's binary log events into its relay log.
  • The slave replays the events in its relay log, applying the changes to its own data.
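The three steps above can be modelled as a toy, in-memory pipeline. This is only an illustration, not how MySQL implements replication: ReplicationSketch, its Event record, and the maps standing in for table data are hypothetical names. The master appends each change to its binlog, the slave's I/O step copies new events into its relay log, and the SQL step replays them (Java 16+ for records).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of master-slave replication (hypothetical names, not MySQL internals).
final class ReplicationSketch {
    // A row change "set key to value", standing in for a ROW-format binlog event.
    record Event(String key, String value) {}

    final Map<String, String> masterData = new HashMap<>();
    final List<Event> masterBinlog = new ArrayList<>();   // step 1: master logs changes
    final List<Event> slaveRelayLog = new ArrayList<>();  // step 2: slave copies events here
    final Map<String, String> slaveData = new HashMap<>();
    private int copiedUpTo = 0;   // binlog events already copied to the relay log
    private int appliedUpTo = 0;  // relay-log events already replayed

    // Step 1: the master applies a change and records it in its binary log.
    void masterWrite(String key, String value) {
        masterData.put(key, value);
        masterBinlog.add(new Event(key, value));
    }

    // Step 2: the slave I/O thread copies new binlog events into its relay log.
    void slaveIoThread() {
        while (copiedUpTo < masterBinlog.size()) {
            slaveRelayLog.add(masterBinlog.get(copiedUpTo++));
        }
    }

    // Step 3: the slave SQL thread replays relay-log events against its own data.
    void slaveSqlThread() {
        while (appliedUpTo < slaveRelayLog.size()) {
            Event e = slaveRelayLog.get(appliedUpTo++);
            slaveData.put(e.key(), e.value());
        }
    }

    public static void main(String[] args) {
        ReplicationSketch r = new ReplicationSketch();
        r.masterWrite("1", "10");
        r.slaveIoThread();
        r.slaveSqlThread();
        System.out.println(r.slaveData); // {1=10}
    }
}
```

Keeping the copy step and the replay step separate mirrors why a slave can lag: events may already sit in the relay log without having been applied yet.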

binlog

Before starting Canal, let's look at what MySQL's binlog contains:

```
mysql> show binlog events;
+------------------+-------+-------------+-----------+-------------+------+
| Log_name         | Pos   | Event_type  | Server_id | End_log_pos | Info |
+------------------+-------+-------------+-----------+-------------+------+
| mysql-bin.000001 |     4 | Format_desc |         1 |         106 | Server ver: 5.1.73-log, Binlog ver: 4 |
| mysql-bin.000001 |   106 | Query       |         1 |        1864 | use `mysql`; CREATE TABLE IF NOT EXISTS db (   Host char(60) binary DEFAULT '' NOT NULL, Db char(64) binary DEFAULT '' NOT NULL, User char(16) binary DEFAULT '' NOT NULL, Select_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Insert_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Update_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Delete_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Drop_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Grant_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, References_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Index_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Alter_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_tmp_table_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Lock_tables_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_view_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Show_view_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_routine_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Alter_routine_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Execute_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Event_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Trigger_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, PRIMARY KEY Host (Host,Db,User), KEY User (User) ) engine=MyISAM CHARACTER SET utf8 COLLATE utf8_bin comment='Database privileges' |
| mysql-bin.000001 |  1864 | Query       |         1 |        3518 | use `mysql`; CREATE TABLE IF NOT EXISTS host (  Host char(60) binary DEFAULT '' NOT NULL, Db char(64) binary DEFAULT '' NOT NULL, Select_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Insert_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Update_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Delete_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Drop_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Grant_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, References_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Index_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Alter_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_tmp_table_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Lock_tables_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_view_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Show_view_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_routine_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Alter_routine_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Execute_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Trigger_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, PRIMARY KEY Host (Host,Db) ) engine=MyISAM CHARACTER SET utf8 COLLATE utf8_bin comment='Host privileges;  Merged with database privileges' |
```

The binlog files (mysql-bin.XXXXXX) and their index file are generated in the MySQL data directory:

```
[qihuang.zheng@dp0652 canal]$ ll /var/lib/mysql
total 26228
drwx------ 2 mysql mysql     4096 Oct 11 14:05 canal_test
-rw-rw---- 1 mysql mysql 10485760 Sep 30 22:12 ibdata1
-rw-rw---- 1 mysql mysql  5242880 Oct 11 09:57 ib_logfile0
-rw-rw---- 1 mysql mysql  5242880 Oct 11 09:57 ib_logfile1
drwx------ 2 mysql mysql     4096 Aug  2 11:01 mysql
-rw-rw---- 1 mysql mysql    18451 Aug  2 11:01 mysql-bin.000001
-rw-rw---- 1 mysql mysql   929226 Aug  2 11:01 mysql-bin.000002
-rw-rw---- 1 mysql mysql  4890698 Sep 30 22:12 mysql-bin.000003
-rw-rw---- 1 mysql mysql      897 Oct 11 14:06 mysql-bin.000004
-rw-rw---- 1 mysql mysql       76 Oct 11 09:57 mysql-bin.index
srwxrwxrwx 1 mysql mysql        0 Oct 11 09:57 mysql.sock
```

Every change made to MySQL is recorded as a binary event in the binlog file. Running strings on the current binlog shows the operations performed so far: creating a user, granting privileges, creating a database, creating a table, and inserting a record.

```
[qihuang.zheng@dp0652 canal]$ sudo strings /var/lib/mysql/mysql-bin.000004
5.1.73-log
CREATE USER canal IDENTIFIED BY 'canal'
root
localhost
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%'
FLUSH PRIVILEGES
canal_test
create database canal_test
canal_test
create table test (uid int (4) primary key not null auto_increment, name varchar(10) not null)   <== create a table
canal_test
BEGIN                                                                                              <== inserting a record runs in a transaction
canal_test
test
canal_test
COMMIT
```

Canal QuickStart

canal & config

Deploy the Canal Server on 192.168.6.52, start it, and check Canal's log:

```
[qihuang.zheng@dp0652 canal]$ cat logs/canal/canal.log
2017-10-11 11:31:52.076 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2017-10-11 11:31:52.151 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.6.52:11111]
2017-10-11 11:31:52.644 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
```

View instance logs:

```
[qihuang.zheng@dp0652 canal]$ cat logs/example/example.log
2017-10-11 11:31:52.435 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2017-10-11 11:31:52.444 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2017-10-11 11:31:52.587 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2017-10-11 11:31:52.599 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2017-10-11 11:31:52.679 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status
```

The Canal Server's conf directory contains several configuration files:

```
➜ canal.deployer-1.0.24 tree conf
conf
├── canal.properties
├── example
│   └── instance.properties
├── logback.xml
└── spring
    ├── default-instance.xml
    ├── file-instance.xml
    ├── group-instance.xml
    ├── local-instance.xml
    └── memory-instance.xml
```

Start with the first four common properties in canal.properties:

```properties
canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=
```

canal.id is the ID of this Canal Server; in a cluster, each Canal Server must have a different ID. canal.ip is not specified here, so it defaults to the local host's IP address (192.168.6.52 in this case), and canal.port is 11111. canal.zkServers holds the ZooKeeper addresses used by a Canal cluster.
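To make the defaults concrete, here is a sketch that reads these four keys with java.util.Properties. It assumes, as described above, that an empty canal.ip means "use this host's address"; the class name, the fallback to the loopback address, and the default values passed to getProperty are hypothetical choices for illustration, and Canal's own startup code may resolve the address differently.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;

// Hypothetical sketch: read the common canal.properties keys and apply the defaults described above.
final class CanalServerConfigSketch {
    final long id;
    final String ip;
    final int port;
    final String zkServers; // empty: no ZooKeeper, so no Canal cluster

    private CanalServerConfigSketch(Properties props) {
        id = Long.parseLong(props.getProperty("canal.id", "1").trim());
        String configuredIp = props.getProperty("canal.ip", "").trim();
        // Assumption: an empty canal.ip falls back to this host's own address.
        ip = configuredIp.isEmpty() ? localHostAddress() : configuredIp;
        port = Integer.parseInt(props.getProperty("canal.port", "11111").trim());
        zkServers = props.getProperty("canal.zkServers", "").trim();
    }

    static CanalServerConfigSketch parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new CanalServerConfigSketch(props);
    }

    boolean clustered() {
        return !zkServers.isEmpty();
    }

    private static String localHostAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            return InetAddress.getLoopbackAddress().getHostAddress(); // sketch-only fallback
        }
    }

    public static void main(String[] args) {
        CanalServerConfigSketch c = parse("canal.id= 1\ncanal.ip=\ncanal.port= 11111\ncanal.zkServers=\n");
        System.out.println(c.ip + ":" + c.port + " clustered=" + c.clustered());
    }
}
```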

Next, look at the destination-related settings in canal.properties:

```properties
#################################################
#########       destinations        #############
#################################################
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
```

canal.destinations = example can list more than one instance, for example example1,example2. In that case, create two folders, one per destination, each containing its own instance.properties file.

Canal instances are managed globally through Spring; file-instance.xml ultimately instantiates the instance for every destination:

```xml
<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
    <property name="ignoreResourceNotFound" value="true" />
    <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- allow system properties to override -->
    <property name="locationNames">
        <list>
            <value>classpath:canal.properties</value>
            <value>classpath:${canal.instance.destination:}/instance.properties</value>
        </list>
    </property>
</bean>

<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
    <property name="destination" value="${canal.instance.destination}" />
    <property name="eventParser"><ref local="eventParser" /></property>
    <property name="eventSink"><ref local="eventSink" /></property>
    <property name="eventStore"><ref local="eventStore" /></property>
    <property name="metaManager"><ref local="metaManager" /></property>
    <property name="alarmHandler"><ref local="alarmHandler" /></property>
</bean>
```

For example, if canal.instance.destination is example, Spring loads the example/instance.properties configuration file.
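The lookup can be sketched as string substitution: split canal.destinations on commas and put each name into the classpath:${canal.instance.destination:}/instance.properties location used in file-instance.xml. DestinationResolverSketch is a hypothetical helper written for illustration, not part of Canal; it only performs the substitution that Spring's placeholder configurer would do for this one placeholder.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: map each destination in canal.destinations to the
// instance.properties location that file-instance.xml would load for it.
final class DestinationResolverSketch {
    static final String PATTERN = "classpath:${canal.instance.destination:}/instance.properties";

    static List<String> instancePropertyLocations(String destinations) {
        List<String> locations = new ArrayList<>();
        for (String raw : destinations.split(",")) {
            String destination = raw.trim();
            if (destination.isEmpty()) {
                continue; // ignore stray commas such as "example1,,example2"
            }
            // Literal replacement of the placeholder (its default after ':' is empty).
            locations.add(PATTERN.replace("${canal.instance.destination:}", destination));
        }
        return locations;
    }

    public static void main(String[] args) {
        System.out.println(instancePropertyLocations("example1,example2"));
        // [classpath:example1/instance.properties, classpath:example2/instance.properties]
    }
}
```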

The instance.properties file in example does not need to be modified. A single Canal Server can run multiple Canal instances.

```properties
#################################################
## mysql serverId; this slaveId must differ from every server_id already used in the MySQL cluster
canal.instance.mysql.slaveId = 1234

# position info; this is the address of the MySQL master
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =

# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = .*\\..*
canal.instance.filter.black.regex =
#################################################
```
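canal.instance.filter.regex is matched against schema.table names. Because a .properties file treats a backslash as an escape, the default value .*\\..* is read as the Java regex .*\..*, meaning "any schema, a literal dot, any table". The sketch below shows that reading with java.util.Properties and java.util.regex. FilterRegexSketch is a hypothetical name, and Canal's real filter (which also handles comma-separated patterns and the black-list regex) is more elaborate, so treat this as an illustration only.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

// Illustration only: how the filter.regex value is read and applied to "schema.table" names.
final class FilterRegexSketch {
    static Pattern loadFilter(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText)); // "\\" in the file becomes a single "\"
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Pattern.compile(props.getProperty("canal.instance.filter.regex").trim());
    }

    static boolean accepts(Pattern filter, String schema, String table) {
        return filter.matcher(schema + "." + table).matches();
    }

    public static void main(String[] args) {
        // Same line as in instance.properties; the Java literal needs one more level of escaping.
        Pattern filter = loadFilter("canal.instance.filter.regex = .*\\\\..*\n");
        System.out.println(filter.pattern());                      // .*\..*
        System.out.println(accepts(filter, "canal_test", "test")); // true
    }
}
```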

simple client

Create a database on mysql, create a table, insert a record, and modify the record.

```sql
create database canal_test;
use canal_test;
create table test (   uid int (4) primary key not null auto_increment,   name varchar(10) not null);
insert into test (name) values('10');
```

Modify the connection settings in the client test example; the destination example is the name of the Canal instance.

```java
String destination = "example";
CanalConnector connector = CanalConnectors.newSingleConnector(
        new InetSocketAddress("192.168.6.52", 11111), destination, "canal", "canal");
```

Note: if the connection fails, the client test example exits immediately after printing "## stop the Canal client". When the connection works, the client does not exit; it keeps running.

The SimpleCanalClientTest console prints the following:

```
****************************************************
* Batch Id: [1] ,count : [2] , memsize : [263] , Time : 2017-10-11 14:06:06
* Start : [mysql-bin.000004:396:1507701897000(2017-10-11 14:04:57)] 
* End : [mysql-bin.000004:491:1507701904000(2017-10-11 14:05:04)] 
****************************************************

----------------> binlog[mysql-bin.000004:396] , name[canal_test,] , eventType : QUERY , executeTime : 1507701897000 , delay : 69710ms
 sql ----> create database canal_test

----------------> binlog[mysql-bin.000004:491] , name[canal_test,test] , eventType : CREATE , executeTime : 1507701904000 , delay : 62723ms
 sql ----> create table test (   uid int (4) primary key not null auto_increment,   name varchar(10) not null)
```

Insert a record (both uid and name show update=true):

```
****************************************************
* Batch Id: [2] ,count : [3] , memsize : [186] , Time : 2017-10-11 14:06:32
* Start : [mysql-bin.000004:659:1507701989000(2017-10-11 14:06:29)] 
* End : [mysql-bin.000004:822:1507701989000(2017-10-11 14:06:29)] 
****************************************************

================> binlog[mysql-bin.000004:659] , executeTime : 1507701989000 , delay : 3142ms
 BEGIN ----> Thread id: 11
----------------> binlog[mysql-bin.000004:785] , name[canal_test,test] , eventType : INSERT , executeTime : 1507701989000 , delay : 3154ms
uid : 1    type=int(4)    update=true
name : 10    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:822] , executeTime : 1507701989000 , delay : 3179ms
```

Update the record (only name shows update=true):

```
****************************************************
* Batch Id: [3] ,count : [3] , memsize : [202] , Time : 2017-10-11 14:49:11
* Start : [mysql-bin.000004:897:1507704547000(2017-10-11 14:49:07)] 
* End : [mysql-bin.000004:1076:1507704547000(2017-10-11 14:49:07)] 
****************************************************

================> binlog[mysql-bin.000004:897] , executeTime : 1507704547000 , delay : 4048ms
 BEGIN ----> Thread id: 13
----------------> binlog[mysql-bin.000004:1023] , name[canal_test,test] , eventType : UPDATE , executeTime : 1507704547000 , delay : 4059ms
uid : 1    type=int(4)
name : zqhxuyuan    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:1076] , executeTime : 1507704547000 , delay : 4096ms
```
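The update flags in the two outputs follow a simple rule: for an INSERT every after-image column is new, so all are flagged; for an UPDATE only columns whose value differs from the before image are flagged. The sketch below reproduces that rule on plain maps. It illustrates the observed output only and is not Canal's binlog parser; UpdatedFlagSketch and updatedFlags are hypothetical names, and a null before image is this sketch's way of modelling an INSERT.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration of the update=true flags seen in the client output.
final class UpdatedFlagSketch {
    // before == null models an INSERT (there is no before image).
    static Map<String, Boolean> updatedFlags(Map<String, String> before, Map<String, String> after) {
        Map<String, Boolean> flags = new LinkedHashMap<>();
        for (Map.Entry<String, String> column : after.entrySet()) {
            boolean updated = before == null
                    || !Objects.equals(before.get(column.getKey()), column.getValue());
            flags.put(column.getKey(), updated);
        }
        return flags;
    }

    public static void main(String[] args) {
        Map<String, String> inserted = new LinkedHashMap<>();
        inserted.put("uid", "1");
        inserted.put("name", "10");
        Map<String, String> updatedRow = new LinkedHashMap<>(inserted);
        updatedRow.put("name", "zqhxuyuan");
        System.out.println(updatedFlags(null, inserted));       // {uid=true, name=true}
        System.out.println(updatedFlags(inserted, updatedRow)); // {uid=false, name=true}
    }
}
```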

Besides example.log, the example instance's log directory also contains a meta.log file, which records each client's consumption cursor:

```
[qihuang.zheng@dp0652 canal]$ cat logs/example/meta.log
2017-10-11 14:06:03.728 - clientId:1001 cursor:[mysql-bin.000004,396,1507701897000] address[/127.0.0.1:3306]
2017-10-11 14:06:04.589 - clientId:1001 cursor:[mysql-bin.000004,491,1507701904000] address[localhost/127.0.0.1:3306]
2017-10-11 14:06:29.589 - clientId:1001 cursor:[mysql-bin.000004,822,1507701989000] address[localhost/127.0.0.1:3306]
2017-10-11 14:49:08.589 - clientId:1001 cursor:[mysql-bin.000004,1076,1507704547000] address[localhost/127.0.0.1:3306]
```
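Each cursor identifies a binlog file, an offset in that file, and a timestamp (the same triple the client prints as mysql-bin.000004:396:1507701897000). Ordering two cursors means comparing file names first and offsets second, which works because binlog file names carry a zero-padded sequence number. A minimal sketch with a hypothetical BinlogCursor record; Canal has its own position classes, which this does not reproduce.

```java
import java.util.Comparator;

// Hypothetical cursor type for the file:offset:timestamp triples shown in the client output.
record BinlogCursor(String journalName, long position, long timestamp) implements Comparable<BinlogCursor> {
    // Zero-padded suffixes (mysql-bin.000004) make lexicographic order match numeric order.
    private static final Comparator<BinlogCursor> ORDER =
            Comparator.comparing(BinlogCursor::journalName).thenComparingLong(BinlogCursor::position);

    @Override
    public int compareTo(BinlogCursor other) {
        return ORDER.compare(this, other);
    }

    // Parses "mysql-bin.000004:396:1507701897000".
    static BinlogCursor parse(String text) {
        String[] parts = text.split(":");
        return new BinlogCursor(parts[0].trim(), Long.parseLong(parts[1].trim()), Long.parseLong(parts[2].trim()));
    }

    public static void main(String[] args) {
        BinlogCursor a = parse("mysql-bin.000004:491:1507701904000");
        BinlogCursor b = parse("mysql-bin.000004:1076:1507704547000");
        System.out.println(a.compareTo(b) < 0); // true: same file, smaller offset
    }
}
```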

Canal Internals Overview

canal client & server

The Canal client and the Canal Server communicate in client/server (C/S) mode: the client uses NIO and the server uses Netty. After the Canal Server starts, it does not pull the binlog from MySQL as long as no Canal client is connected.

```java
try {
    connector.connect();
    connector.subscribe();
    while (running) {
        Message message = connector.getWithoutAck(batchSize); // fetch a batch without acknowledging it
        long batchId = message.getId();
        int size = message.getEntries().size();
        printSummary(message, batchId, size);
        printEntry(message.getEntries());
        connector.ack(batchId); // confirm the batch was consumed
        // connector.rollback(batchId); // if processing fails, roll the batch back instead
    }
} finally {
    connector.disconnect();
}
```

The Canal client and the Canal Server stand in an incremental subscription/consumption relationship; the flow chart is as follows (C is the Canal client, S is the Canal Server):

When the Canal client calls connect(), packets of type PacketType.HANDSHAKE and CLIENTAUTHENTICATION are exchanged with the server. The client then calls subscribe(), which sends a packet of type SUBSCRIPTION.
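The order of these packet types on one connection can be captured as a small state machine. This sketch is hypothetical: the PacketType constants reuse the names from the text, but the class, its states, and the rule that GET, CLIENTACK, and CLIENTROLLBACK require a prior SUBSCRIPTION are simplifications for illustration, not a transcription of Canal's handlers.

```java
// Hypothetical model of the order in which packets are accepted on one client connection.
final class SessionProtocolSketch {
    enum PacketType { HANDSHAKE, CLIENTAUTHENTICATION, SUBSCRIPTION, GET, CLIENTACK, CLIENTROLLBACK }

    private enum State { NEW, HANDSHAKEN, AUTHENTICATED, SUBSCRIBED }

    private State state = State.NEW;

    // Returns normally if the packet is allowed in the current state, otherwise throws.
    void onPacket(PacketType type) {
        switch (type) {
            case HANDSHAKE -> require(State.NEW, type, State.HANDSHAKEN);
            case CLIENTAUTHENTICATION -> require(State.HANDSHAKEN, type, State.AUTHENTICATED);
            case SUBSCRIPTION -> {
                if (state != State.AUTHENTICATED && state != State.SUBSCRIBED) {
                    throw new IllegalStateException(type + " before authentication");
                }
                state = State.SUBSCRIBED;
            }
            // GET, CLIENTACK and CLIENTROLLBACK only make sense for a subscribed client.
            case GET, CLIENTACK, CLIENTROLLBACK -> require(State.SUBSCRIBED, type, State.SUBSCRIBED);
        }
    }

    private void require(State expected, PacketType type, State next) {
        if (state != expected) {
            throw new IllegalStateException(type + " not allowed in state " + state);
        }
        state = next;
    }

    public static void main(String[] args) {
        SessionProtocolSketch session = new SessionProtocolSketch();
        // connect() covers HANDSHAKE and CLIENTAUTHENTICATION; subscribe() sends SUBSCRIPTION.
        for (PacketType t : PacketType.values()) {
            session.onPacket(t);
        }
        System.out.println("accepted: " + session.state); // accepted: SUBSCRIBED
    }
}
```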

The server (CanalServerWithNetty) uses Netty to process these RPC requests:

```java
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipelines = Channels.pipeline();
        pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
        // handle the client's HANDSHAKE request
        pipelines.addLast(HandshakeInitializationHandler.class.getName(),
                new HandshakeInitializationHandler(childGroups));
        // handle the client's CLIENTAUTHENTICATION request
        pipelines.addLast(ClientAuthenticationHandler.class.getName(),
                new ClientAuthenticationHandler(embeddedServer));
        // handle client session requests, including SUBSCRIPTION, GET, etc.
        SessionHandler sessionHandler = new SessionHandler(embeddedServer);
        pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
        return pipelines;
    }
});
```

After ClientAuthenticationHandler has authenticated the client, HandshakeInitializationHandler and ClientAuthenticationHandler can be removed from the pipeline.

Taking GET as an example: the client sends a GET request, and the server answers with a MESSAGES packet containing the binlog data it has received from MySQL. The RPC interaction between client and server works as follows.

SimpleCanalConnector sends a GET request and reads the result of the response:

```java
public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
    waitClientRunning();
    int size = (batchSize <= 0) ? 1000 : batchSize;
    long time = (timeout == null || timeout < 0) ? -1 : timeout; // -1: no timeout given
    if (unit == null) {
        unit = TimeUnit.MILLISECONDS;
    }
    try {
        // send a GET packet to the server
        writeWithHeader(Packet.newBuilder()
            .setType(PacketType.GET)
            .setBody(Get.newBuilder()
                .setAutoAck(false)
                .setDestination(clientIdentity.getDestination())
                .setClientId(String.valueOf(clientIdentity.getClientId()))
                .setFetchSize(size)
                .setTimeout(time)
                .setUnit(unit.ordinal())
                .build()
                .toByteString())
            .build()
            .toByteArray());
        // read the server's response
        return receiveMessages();
    } catch (IOException e) {
        throw new CanalClientException(e);
    }
}

private Message receiveMessages() throws IOException {
    // read the packet sent by the server
    Packet p = Packet.parseFrom(readNextPacket());
    switch (p.getType()) {
        case MESSAGES: {
            Messages messages = Messages.parseFrom(p.getBody());
            Message result = new Message(messages.getBatchId());
            for (ByteString byteString : messages.getMessagesList()) {
                result.addEntry(Entry.parseFrom(byteString));
            }
            return result;
        }
        default:
            throw new CanalClientException("unexpected packet type: " + p.getType());
    }
}
```

The server SessionHandler processes the GET request sent by the client:

```java
case GET:
    // parse the packet sent by the client into a Get object
    Get get = CanalPacket.Get.parseFrom(packet.getBody());
    // destination identifies the canal instance
    if (StringUtils.isNotEmpty(get.getDestination()) && StringUtils.isNotEmpty(get.getClientId())) {
        clientIdentity = new ClientIdentity(get.getDestination(), Short.valueOf(get.getClientId()));
        Message message = null;
        if (get.getTimeout() == -1) { // -1 is the initial value: no timeout
            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
        } else {
            TimeUnit unit = convertTimeUnit(get.getUnit());
            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize(), get.getTimeout(), unit);
        }
        // the packet returned to the client has type MESSAGES
        Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
        packetBuilder.setType(PacketType.MESSAGES);
        // build the Messages body
        Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
        messageBuilder.setBatchId(message.getId());
        if (message.getId() != -1 && !CollectionUtils.isEmpty(message.getEntries())) {
            for (Entry entry : message.getEntries()) {
                messageBuilder.addMessages(entry.toByteString());
            }
        }
        packetBuilder.setBody(messageBuilder.build().toByteString());
        // write the response back to the client
        NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);
    }
```

The GET, ACK, and ROLLBACK operations:

  • Message getWithoutAck(int batchSize): batchSize sets how many entries may be fetched in one call. Each call returns a Message, which contains a batchId (its unique identifier) and entries (the data objects, whose format is defined in EntryProtocol.proto)
  • void rollback(long batchId): rolls back the data fetched by the previous get request. The batchId obtained from get is passed back to avoid misoperations
  • void ack(long batchId): confirms that consumption succeeded and notifies the server that it may delete the data. The batchId obtained from get is passed back to avoid misoperations
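The bookkeeping behind these three calls can be sketched in memory. Everything here is a hypothetical stand-in: events are strings in a list, a batch is recorded as the index range it covers, -1 signals an empty batch, acks are assumed to arrive in batch order (my understanding of Canal's server-side check, stated as an assumption), and rollback discards every outstanding batch so the next get resumes at the last acknowledged position. get is shown as getWithoutAck followed by ack.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of getWithoutAck / ack / rollback bookkeeping (hypothetical, not Canal's classes).
final class BatchSketch {
    private final List<String> events;
    private final Map<Long, int[]> outstanding = new LinkedHashMap<>(); // batchId -> [from, to)
    private long nextBatchId = 1;
    private int getCursor = 0; // next event to hand out
    private int ackCursor = 0; // everything before this index is confirmed

    BatchSketch(List<String> events) {
        this.events = events;
    }

    // Returns the batch id, or -1 when there is nothing new to fetch.
    long getWithoutAck(int batchSize, List<String> out) {
        int to = Math.min(events.size(), getCursor + batchSize);
        if (to == getCursor) {
            return -1;
        }
        out.addAll(events.subList(getCursor, to));
        long id = nextBatchId++;
        outstanding.put(id, new int[] {getCursor, to});
        getCursor = to;
        return id;
    }

    // Assumption: only the oldest outstanding batch may be acknowledged.
    void ack(long batchId) {
        Long first = outstanding.keySet().stream().findFirst().orElse(null);
        if (first == null || first != batchId) {
            throw new IllegalStateException("batchId " + batchId + " is not the first outstanding batch");
        }
        ackCursor = outstanding.remove(batchId)[1];
    }

    // Roll back all outstanding batches: the next get resumes at the last acked position.
    void rollback() {
        outstanding.clear();
        getCursor = ackCursor;
    }

    // get = getWithoutAck immediately followed by ack.
    long get(int batchSize, List<String> out) {
        long id = getWithoutAck(batchSize, out);
        if (id != -1) {
            ack(id);
        }
        return id;
    }

    public static void main(String[] args) {
        BatchSketch s = new BatchSketch(List.of("A", "B", "C", "D", "E"));
        List<String> got = new ArrayList<>();
        s.ack(s.getWithoutAck(2, got)); // A, B consumed and confirmed
        s.getWithoutAck(2, got);        // C, D fetched but not confirmed
        s.rollback();                   // C, D will be delivered again
        List<String> again = new ArrayList<>();
        s.getWithoutAck(2, again);
        System.out.println(got + " " + again); // [A, B, C, D] [C, D]
    }
}
```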

EntryProtocol.proto defines the following Canal message structure:

```
Entry
    Header
        logfileName     [binlog file name]
        logfileOffset   [binlog position]
        executeTime     [timestamp of the change in the binlog, precise to seconds]
        schemaName
        tableName
        eventType       [insert/update/delete type]
    entryType           [transaction BEGIN/END, or row data ROWDATA]
    storeValue          [byte data, which can be expanded; the corresponding type is RowChange]

RowChange
    isDdl               [whether this is a DDL change, e.g. create table/drop table]
    sql                 [the specific DDL SQL]
    rowDatas            [the specific insert/update/delete change data; there can be several, because one binlog event can correspond to multiple changes, e.g. a batch]
        beforeColumns   [array of Column]
        afterColumns    [array of Column]

Column
    index
    sqlType             [JDBC type]
    name                [column name]
    isKey               [whether it is part of the primary key]
    updated             [whether the value was changed]
    isNull              [whether the value is null]
    value               [the specific content; note that it is text]
```
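To show how these nested types fit together, here is a plain-Java mirror of the structure using records. These are hypothetical stand-ins for illustration: the real classes are generated by protobuf from EntryProtocol.proto and are used through builders and getters, the eventType enum here is trimmed to insert/update/delete, and storeValue is represented by an already-decoded RowChange rather than bytes. The helper updatedColumns lists the changed columns of a row event.

```java
import java.util.List;

// Plain-Java mirror of the Entry/RowChange/Column structure (illustrative, not the protobuf classes).
final class EntryModelSketch {
    enum EventType { INSERT, UPDATE, DELETE } // simplified; the real enum has more values
    enum EntryType { TRANSACTIONBEGIN, ROWDATA, TRANSACTIONEND }

    record Header(String logfileName, long logfileOffset, long executeTime,
                  String schemaName, String tableName, EventType eventType) {}
    record Column(int index, int sqlType, String name, boolean isKey,
                  boolean updated, boolean isNull, String value) {}
    record RowData(List<Column> beforeColumns, List<Column> afterColumns) {}
    record RowChange(boolean isDdl, String sql, List<RowData> rowDatas) {}
    // In the real protocol storeValue is bytes that decode to a RowChange; here it is already decoded.
    record Entry(Header header, EntryType entryType, RowChange rowChange) {}

    // Names of the after-image columns flagged as updated.
    static List<String> updatedColumns(Entry entry) {
        return entry.rowChange().rowDatas().stream()
                .flatMap(row -> row.afterColumns().stream())
                .filter(Column::updated)
                .map(Column::name)
                .toList();
    }

    public static void main(String[] args) {
        Header h = new Header("mysql-bin.000004", 1023, 1507704547000L, "canal_test", "test", EventType.UPDATE);
        Column uid = new Column(0, java.sql.Types.INTEGER, "uid", true, false, false, "1");
        Column name = new Column(1, java.sql.Types.VARCHAR, "name", false, true, false, "zqhxuyuan");
        RowChange change = new RowChange(false, "", List.of(new RowData(List.of(), List.of(uid, name))));
        Entry entry = new Entry(h, EntryType.ROWDATA, change);
        System.out.println(updatedColumns(entry)); // [name]
    }
}
```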

For the other client request types as well, SessionHandler delegates to CanalServerWithEmbedded:

```java
case SUBSCRIPTION:
    Sub sub = Sub.parseFrom(packet.getBody());
    embeddedServer.subscribe(clientIdentity);
    break;
case GET:
    Get get = CanalPacket.Get.parseFrom(packet.getBody());
    message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
    break;
case CLIENTACK:
    ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());
    embeddedServer.ack(clientIdentity, ack.getBatchId());
    break;
case CLIENTROLLBACK:
    ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
    embeddedServer.rollback(clientIdentity); // roll back all batches
    break;
```

CanalServerWithEmbedded

A CanalServer contains multiple instances; its member variable canalInstances maps each instance name to its instance. Because this is a Map, instance names must be unique within one server: for example, a server cannot run two instances both named example.

```java
public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {
    private Map<String, CanalInstance> canalInstances;
    private CanalInstanceGenerator     canalInstanceGenerator;
}
```

The following figure shows a server with two instances, each client connected to one instance. Each Canal instance emulates a MySQL slave, so every instance must have a different slaveId; in the figure, the two instances use 1234 and 1235.

Note that each Canal client corresponds to one instance: when a client starts, it specifies a destination, which is the instance name. That is why every request-handling method of CanalServerWithEmbedded takes a ClientIdentity parameter: the destination taken from the ClientIdentity selects the corresponding CanalInstance.
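The two constraints above (instance names are unique keys in canalInstances, and each instance needs its own slaveId because it poses as a separate MySQL slave) plus the lookup by destination fit in a small registry. InstanceRegistrySketch and its nested records are hypothetical; Canal creates instances through its CanalInstanceGenerator rather than a register method like this one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry: destination name -> instance, with unique names and unique slaveIds.
final class InstanceRegistrySketch {
    record Instance(String destination, long slaveId) {}
    record ClientIdentity(String destination, short clientId) {}

    private final Map<String, Instance> canalInstances = new HashMap<>();

    void register(Instance instance) {
        for (Instance existing : canalInstances.values()) {
            if (existing.slaveId() == instance.slaveId()) {
                throw new IllegalArgumentException(
                        "slaveId " + instance.slaveId() + " already used by " + existing.destination());
            }
        }
        if (canalInstances.putIfAbsent(instance.destination(), instance) != null) {
            throw new IllegalArgumentException("destination " + instance.destination() + " already exists");
        }
    }

    // Every request carries a ClientIdentity; its destination selects the instance.
    Instance lookup(ClientIdentity client) {
        Instance instance = canalInstances.get(client.destination());
        if (instance == null) {
            throw new IllegalArgumentException("no instance for destination " + client.destination());
        }
        return instance;
    }

    public static void main(String[] args) {
        InstanceRegistrySketch server = new InstanceRegistrySketch();
        server.register(new Instance("example1", 1234));
        server.register(new Instance("example2", 1235));
        System.out.println(server.lookup(new ClientIdentity("example2", (short) 1001)));
        // Instance[destination=example2, slaveId=1235]
    }
}
```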

The following uses the CanalServerWithEmbedded subscription method as an example:

```java
public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
    // ClientIdentity identifies the Canal client. CanalServerWithEmbedded records the instance
    // for each destination, so the client's instance is looked up by its destination.
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    if (!canalInstance.getMetaManager().isStart()) {
        canalInstance.getMetaManager().start(); // start the instance's metadata manager
    }
    canalInstance.getMetaManager().subscribe(clientIdentity); // perform the subscription
    Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
    if (position == null) {
        position = canalInstance.getEventStore().getFirstPosition(); // first position held in the store
        if (position != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, position); // update this subscription's cursor
        }
    }
    // notify the instance that the subscription changed
    canalInstance.subscribeChange(clientIdentity);
}
```
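Stripped of Canal's types, the cursor logic in subscribe is: keep the client's existing cursor if it has one; otherwise start from the first position still held in the event store, if there is one. A hypothetical in-memory sketch where positions are plain longs and clients are identified by strings (MetaSketch is an invented name, not Canal's meta manager):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical in-memory version of the cursor handling in subscribe().
final class MetaSketch {
    private final Set<String> subscribed = new HashSet<>();
    private final Map<String, Long> cursors = new HashMap<>(); // clientId -> cursor

    // firstPositionInStore is empty when the event store holds no data yet.
    void subscribe(String clientId, OptionalLong firstPositionInStore) {
        subscribed.add(clientId);
        // Only a client without a cursor is positioned at the oldest buffered event;
        // an existing cursor is kept, so a re-subscribing client resumes where it left off.
        if (!cursors.containsKey(clientId) && firstPositionInStore.isPresent()) {
            cursors.put(clientId, firstPositionInStore.getAsLong());
        }
    }

    boolean isSubscribed(String clientId) {
        return subscribed.contains(clientId);
    }

    OptionalLong cursor(String clientId) {
        Long c = cursors.get(clientId);
        return c == null ? OptionalLong.empty() : OptionalLong.of(c);
    }

    void updateCursor(String clientId, long position) {
        cursors.put(clientId, position);
    }

    public static void main(String[] args) {
        MetaSketch meta = new MetaSketch();
        meta.subscribe("1001", OptionalLong.of(396));
        meta.updateCursor("1001", 822);               // e.g. after an ack
        meta.subscribe("1001", OptionalLong.of(396)); // re-subscribing keeps 822
        System.out.println(meta.cursor("1001"));      // OptionalLong[822]
    }
}
```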

Each CanalInstance contains four components: EventParser, EventSink, EventStore, and MetaManager.

The server's main request-handling methods are get, ack, and rollback. All three rely on the instance's internal components, mainly EventStore and MetaManager:

EventStore is a ring buffer with three pointers: Put, Get, and Ack.

  • Put: when the Canal Server pulls data from MySQL and stores it in memory, Put advances
  • Get: when the consumer (Canal client) fetches data from memory, Get advances
  • Ack: when the consumer finishes consuming, Ack advances, and the acknowledged data is deleted from the buffer

The relationship between these three operations and the instance's components is as follows:

The client can fetch the MySQL binlog from the Canal Server (through the get and getWithoutAck methods) in several ways, depending on timeout:

  • If timeout is null, tryGet is used
  • If timeout is not null:
    1. If timeout is 0 or less, get is used without a timeout: it blocks until batchSize entries are available
    2. If timeout is greater than 0, get is used with that timeout: if batchSize entries are not available when the timeout expires, it returns as much data as it has

```java
private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout, TimeUnit unit) {
    if (timeout == null) {
        return eventStore.tryGet(start, batchSize);
    } else if (timeout <= 0) {
        return eventStore.get(start, batchSize);
    } else {
        return eventStore.get(start, batchSize, timeout, unit);
    }
}
```
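The three timeout modes can be illustrated with a tiny blocking buffer built on ReentrantLock and Condition. This is a hypothetical stand-in for CanalEventStore: the method names tryGet and get are borrowed from the text, but the class, its string payload, and the fact that it removes the events it returns (instead of reading from a start position) are assumptions made to keep the sketch short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical store illustrating tryGet, blocking get, and get with a timeout.
final class TimeoutModesSketch {
    private final Deque<String> events = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    void put(String event) {
        lock.lock();
        try {
            events.addLast(event);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // timeout == null: return whatever is available right now.
    List<String> tryGet(int batchSize) {
        lock.lock();
        try {
            return drain(batchSize);
        } finally {
            lock.unlock();
        }
    }

    // timeout <= 0: wait, with no time limit, until batchSize events are available.
    List<String> get(int batchSize) throws InterruptedException {
        lock.lock();
        try {
            while (events.size() < batchSize) {
                notEmpty.await();
            }
            return drain(batchSize);
        } finally {
            lock.unlock();
        }
    }

    // timeout > 0: wait up to the timeout, then return as many events as are available.
    List<String> get(int batchSize, long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (events.size() < batchSize && nanos > 0) {
                nanos = notEmpty.awaitNanos(nanos);
            }
            return drain(batchSize);
        } finally {
            lock.unlock();
        }
    }

    private List<String> drain(int batchSize) {
        List<String> out = new ArrayList<>();
        while (out.size() < batchSize && !events.isEmpty()) {
            out.add(events.pollFirst());
        }
        return out;
    }

    // Same dispatch as getEvents above.
    List<String> getEvents(int batchSize, Long timeout, TimeUnit unit) throws InterruptedException {
        if (timeout == null) {
            return tryGet(batchSize);
        } else if (timeout <= 0) {
            return get(batchSize);
        } else {
            return get(batchSize, timeout, unit);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimeoutModesSketch store = new TimeoutModesSketch();
        store.put("A");
        System.out.println(store.getEvents(10, 100L, TimeUnit.MILLISECONDS)); // [A], after waiting about 100 ms
    }
}
```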

Note: the EventStore implementation uses a ring buffer similar to Disruptor's RingBuffer; the implementation class is MemoryEventStoreWithBuffer.

The difference between the get method and the getWithoutAck method is:

  • get calls ack immediately after fetching the data
  • getWithoutAck does not call ack; the client must later call ack (or rollback) itself

EventStore

For example, on the first Put current=-1, so next starts at 0 and end=9; the loop fills slots [0,9] with the list elements (A,B,C,D,E,F,G,H,I,J).

next  entries[next]  next-current-1      list element
0     entries[0]     0 - (-1) - 1 = 0    A
1     entries[1]     1 - (-1) - 1 = 1    B
2     entries[2]     2 - (-1) - 1 = 2    C
3     entries[3]     3 - (-1) - 1 = 3    D
...   ...            ...                 ...
9     entries[9]     9 - (-1) - 1 = 9    J

After the first 10 elements are put, putSequence is set to end=9. Suppose a second batch puts five more elements (K,L,M,N,O):

current=9, so next starts at 9+1=10 and end=9+5=14; after the Put completes, putSequence=end=14.

next  entries[next]  next-current-1    list element
10    entries[10]    10 - 9 - 1 = 0    K
11    entries[11]    11 - 9 - 1 = 1    L
12    entries[12]    12 - 9 - 1 = 2    M
13    entries[13]    13 - 9 - 1 = 3    N
14    entries[14]    14 - 9 - 1 = 4    O
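Both tables can be reproduced with a small sketch of the index arithmetic in doPut. The class and method names are hypothetical. It uses a 16-slot buffer rather than 15, because the mask-based slot lookup (sequence & indexMask, with indexMask = bufferSize - 1) only equals sequence % bufferSize when bufferSize is a power of two.

```java
// Hypothetical sketch of doPut's index arithmetic; not Canal's class.
class RingIndex {
    // Slot in entries[] for a sequence; assumes bufferSize is a power of two.
    static int slotOf(long sequence, int bufferSize) {
        int indexMask = bufferSize - 1;       // e.g. 16 -> 0b1111
        return (int) (sequence & indexMask);
    }

    // Position in the incoming batch for a given sequence (next - current - 1).
    static int listIndexOf(long next, long current) {
        return (int) (next - current - 1);
    }

    // Fill the ring the way doPut does; returns the new put sequence (end).
    static long put(String[] entries, long current, String... data) {
        long end = current + data.length;
        for (long next = current + 1; next <= end; next++) {
            entries[slotOf(next, entries.length)] = data[listIndexOf(next, current)];
        }
        return end;
    }
}
```

Starting from -1, the first put of ten elements returns 9 and the second put of five returns 14; sequence 16 would wrap around to slot 0.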

Assume the ring buffer holds at most 15 elements (the default in the source is 16 * 1024 slots, which amounts to 16MB when sizing by memory in 1KB units). The two batches above put 15 elements in total, exactly filling the buffer. If another Put arrives, there is no free slot, so the Put blocks until earlier data has been consumed.

Below is the code that fills the ring buffer. Each of the Put methods first checks for free slots with checkFreeSlotAt, which is not part of this excerpt.

public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge implements CanalEventStore<Event>, CanalStoreScavenge {
    private static final long INIT_SQEUENCE = -1;
    private int bufferSize = 16 * 1024;
    private int bufferMemUnit = 1024; // memsize unit, default is 1KB
    private int indexMask;
    private Event[] entries;

    // last position written by Put
    private AtomicLong putSequence = new AtomicLong(INIT_SQEUENCE);
    // last position read by Get
    private AtomicLong getSequence = new AtomicLong(INIT_SQEUENCE);
    // last position confirmed by Ack
    private AtomicLong ackSequence = new AtomicLong(INIT_SQEUENCE);

    // When the EventStore starts, it allocates a buffer of the configured size:
    // 16 * 1024 Event slots, or 16MB when sizing by memory
    public void start() throws CanalStoreException {
        super.start();
        indexMask = bufferSize - 1;
        entries = new Event[bufferSize];
    }

    private void doPut(List<Event> data) {
        long current = putSequence.get();  // current put position
        long end = current + data.size();  // end position after this batch
        // e.g. the first Put of 10 elements: current=-1, end=-1+10=9
        // write each element into its slot in the ring buffer
        for (long next = current + 1; next <= end; next++) {
            entries[getIndex(next)] = data.get((int) (next - current - 1));
        }
        putSequence.set(end);
    }
}
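The free-slot check is what makes a Put block when the buffer is full. Below is a minimal sketch of a Disruptor-style test, under the assumption that it compares the wrap point with the slower of getSequence and ackSequence: writing sequence s reuses the slot last held by s - bufferSize, so the write is safe only once that older sequence has been acked. The class and method names are hypothetical.

```java
// Hypothetical Disruptor-style free-slot check; not Canal's code.
class FreeSlotCheck {
    // Can the producer write up to and including sequence `target`?
    static boolean hasFreeSlotsUpTo(long target, int bufferSize, long getSeq, long ackSeq) {
        long wrapPoint = target - bufferSize;      // sequence whose slot would be reused
        long minPoint = Math.min(getSeq, ackSeq);  // slowest reader
        return wrapPoint <= minPoint;
    }
}
```

With 16 slots and nothing acked, the 16th element (sequence 15) still fits, but the 17th (sequence 16) must wait until sequence 0 has been acked.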

Put produces data and Get consumes it, so Get can never pass Put: if 10 items have been Put, Get can fetch at most 10. Put and Get need not advance at the same rate; think of Put as the producer and Get as the consumer. The producer can be fast while the consumer is slow: for example, 1000 items may have been Put while each Get processes only 10.

First Get: current=-1, maxAbleSequence=14, batchSize=10. Initially next=current=-1 and end=-1. Because startPosition is null, next is advanced to 0; end is then min(0+10-1, 14)=9, so this Get covers slots [0,9], 10 elements.

private Events<Event> doGet(Position start, int batchSize) throws CanalStoreException {
    LogPosition startPosition = (LogPosition) start;
    long current = getSequence.get();
    long maxAbleSequence = putSequence.get();
    long next = current;
    long end = current;
    // If startPosition is null (the first fetch) or not included, advance by one
    if (startPosition == null || !startPosition.getPostion().isIncluded()) {
        next = next + 1;
    }
    end = (next + batchSize - 1) < maxAbleSequence ? (next + batchSize - 1) : maxAbleSequence;
    // Extract the data (entrys is the result list; its declaration is omitted in this excerpt)
    for (; next <= end; next++) {
        Event event = entries[getIndex(next)];
        if (ddlIsolation && isDdl(event.getEntry().getHeader().getEventType())) {
            // With DDL isolation, a DDL event is returned in a batch of its own
            if (entrys.size() == 0) {
                entrys.add(event); // no DML events yet: return just this DDL event
                end = next;        // adjust end
            } else {
                // DML events came first: return them now, leave the DDL for the next batch
                end = next - 1;    // the DDL is not included
            }
            break;
        } else {
            entrys.add(event);
        }
    }
    // ... build the PositionRange, then move getSequence to end
    getSequence.compareAndSet(current, end);
    // ... (building and returning the result is omitted)
}
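Ignoring DDL isolation, the range selection in doGet reduces to a clamp. The hypothetical helper below returns {next, end} and reproduces the worked example: the first Get covers [0,9], and a second Get with the same batchSize covers only [10,14] because Put has not advanced past 14.

```java
// Hypothetical helper mirroring doGet's range computation (no DDL isolation).
class GetRange {
    // Returns {next, end}; end < next means nothing is available.
    // advanceStart corresponds to "startPosition is null or not included".
    static long[] range(long current, long maxAbleSequence, int batchSize, boolean advanceStart) {
        long next = advanceStart ? current + 1 : current;
        long end = Math.min(next + batchSize - 1, maxAbleSequence);
        return new long[] {next, end};
    }
}
```

When get has caught up with put, the returned end is smaller than next, which signals an empty batch.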

Ack is bounded by Get. If 15 items have been Put and 10 have been fetched by Get, at most those 10 can be acked. The purpose of Ack is to release from the buffer the data that has already been fetched.

public void ack(Position position) throws CanalStoreException {
    cleanUntil(position);
}

public void cleanUntil(Position position) throws CanalStoreException {
    long sequence = ackSequence.get();
    long maxSequence = getSequence.get();
    boolean hasMatch = false;
    long memsize = 0;
    for (long next = sequence + 1; next <= maxSequence; next++) {
        Event event = entries[getIndex(next)];
        memsize += calculateSize(event);
        boolean match = CanalEventUtils.checkPosition(event, (LogPosition) position);
        if (match) { // found the matching position: update the ack sequence
            hasMatch = true;
            if (batchMode.isMemSize()) {
                ackMemSize.addAndGet(memsize);
                // release the acked slots
                for (long index = sequence + 1; index < next; index++) {
                    entries[getIndex(index)] = null; // set to null
                }
            }
            ackSequence.compareAndSet(sequence, next);
        }
    }
}

The rollback method is simpler to implement, rolling back the getSequence to the ACK position.

public void rollback() throws CanalStoreException {
    getSequence.set(ackSequence.get());
    getMemSize.set(ackMemSize.get());
}

The following figure shows several examples of RingBuffer operations:

EventParser WorkFlow

EventStore stores the parsed binlog events, while the parser is responsible for pulling the binlog. Parsing is the more complex process because it must interact with MetaManager: for example, it records each position it pulls so that the next pull can resume from the last position. MetaManager is therefore stateful.

The flow of EventParser is as follows:

  1. Connection obtains the position of the last successful parse (on first start, the initially configured position or the current binlog position of the database)
  2. Connection establishes a link and sends the BINLOG_DUMP command
  3. MySQL starts pushing binary logs
  4. The received binary logs are parsed by the binlog parser, which fills in some additional details
  5. The parsed events are passed to the EventSink module for storage; this call blocks until storage succeeds
  6. After storage succeeds, the binary log position is recorded periodically
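The six steps amount to a resume-from-last-position loop. The sketch below is a toy model, not MysqlEventParser: the binlog is a list of strings, "parsing" is upper-casing, and the position is recorded after every event for simplicity, whereas step 6 says Canal records it periodically.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the EventParser flow; not Canal's class.
class ResumingParser {
    private long lastPosition = -1;            // step 6 output; -1 = never parsed
    final List<String> sunk = new ArrayList<>(); // what reached the sink

    // binlog stands in for what MySQL pushes after BINLOG_DUMP (steps 2-3).
    // maxEvents simulates the connection dropping after that many events.
    void runOnce(List<String> binlog, int maxEvents) {
        int start = (int) (lastPosition + 1);   // step 1: where to resume
        int stop = Math.min(binlog.size(), start + maxEvents);
        for (int pos = start; pos < stop; pos++) {
            String parsed = binlog.get(pos).toUpperCase(); // step 4: "parse"
            sunk.add(parsed);                              // step 5: store
            lastPosition = pos;                            // step 6: record position
        }
    }
}
```

If the connection drops after two events, the next run resumes at the third event instead of starting over.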

The Connection above is MysqlConnection, which implements the ErosaConnection interface. The EventParser implementation class is MysqlEventParser, a subclass of AbstractEventParser.

EventParser parses the binlog and writes it to EventStore via EventSink; the chain ends in EventStore's put method.

There is also an EventTransactionBuffer: parsed events are first put into this buffer, which is flushed when a transaction ends or the amount of data exceeds a threshold. Flushing takes the buffered data and puts it into the EventStore. This buffer has two offset pointers: putSequence and flushSequence.
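A minimal sketch of a buffer with putSequence and flushSequence, assuming flushes are triggered by a transaction-end entry or by the buffer filling up. TxBuffer and its flush callback are hypothetical names; the callback plays the role of handing entries to EventSink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical transaction buffer with put/flush sequences; not Canal's class.
class TxBuffer {
    private final String[] entries;
    private long putSequence = -1;   // last entry added
    private long flushSequence = -1; // last entry flushed downstream
    private final Consumer<List<String>> flushCallback; // e.g. "write to EventSink"

    TxBuffer(int size, Consumer<List<String>> flushCallback) {
        this.entries = new String[size];
        this.flushCallback = flushCallback;
    }

    void add(String entry, boolean endsTransaction) {
        if (putSequence - flushSequence == entries.length) {
            flush(); // buffer full: flush before overwriting a slot
        }
        putSequence++;
        entries[(int) (putSequence % entries.length)] = entry;
        if (endsTransaction) {
            flush(); // transaction boundary: hand the whole transaction downstream
        }
    }

    void flush() {
        if (putSequence == flushSequence) {
            return; // nothing new to flush
        }
        List<String> batch = new ArrayList<>();
        for (long seq = flushSequence + 1; seq <= putSequence; seq++) {
            batch.add(entries[(int) (seq % entries.length)]);
        }
        flushCallback.accept(batch);
        flushSequence = putSequence;
    }
}
```

Flushing on transaction boundaries keeps a transaction together downstream, while the size-based flush bounds memory for very large transactions.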

Canal HA

To simulate two Canal Servers on a single machine, copy the standalone deployment into two folders and modify the relevant configuration.

canal_m/conf/canal.properties

canal.id= 2
canal.ip=
canal.port= 11112
canal.zkServers=localhost:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

canal_m/conf/example/instance.properties

canal.instance.mysql.slaveId = 1235

canal_s/conf/canal.properties

canal.id= 3
canal.ip=
canal.port= 11113
canal.zkServers=localhost:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

canal_s/conf/example/instance.properties

canal.instance.mysql.slaveId = 1236

Start canal_m:

2017-10-12 14:51:45.202 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2017-10-12 14:51:45.776 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.6.52:11112]
2017-10-12 14:51:46.687 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

Start canal_s:

2017-10-12 14:52:18.999 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2017-10-12 14:52:19.208 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.6.52:11113]
2017-10-12 14:52:19.364 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

The active node, canal_m, writes its instance log to canal_m/logs/example/example.log, while canal_s has no logs/example folder:

[qihuang.zheng@dp0652 ~]$ tail -f canal_m/logs/example/example.log
2017-10-12 14:51:46.453 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2017-10-12 14:51:46.463 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2017-10-12 14:51:46.624 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2017-10-12 14:51:46.644 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2017-10-12 14:51:46.658 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

View the Canal HA records in ZK:

[zk: 192.168.6.52:2181(CONNECTED) 7] ls /otter/canal/destinations/example/cluster
[192.168.6.52:11112, 192.168.6.52:11113]
[zk: 192.168.6.52:2181(CONNECTED) 10] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.6.52:11112","cid":2}

Start ClusterCanalClientTest for the example destination:

CanalConnector connector = CanalConnectors.newClusterConnector("192.168.6.52:2181", destination, "canal", "canal");

Run the SQL update test set name = 'zqh' where uid=1; the console logs the following:

****************************************************
* Batch Id: [1] ,count : [3] , memsize : [203] , Time : 2017-10-12 15:05:20
* Start : [mysql-bin.000004:1151:1507791918000(2017-10-12 15:05:18)] 
* End : [mysql-bin.000004:1331:1507791918000(2017-10-12 15:05:18)] 
****************************************************

================> binlog[mysql-bin.000004:1151] , executeTime : 1507791918000 , delay : 2080ms
 BEGIN ----> Thread id: 763
----------------> binlog[mysql-bin.000004:1277] , name[canal_test,test] , eventType : UPDATE , executeTime : 1507791918000 , delay : 2092ms
uid : 1    type=int(4)
name : zqh    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:1331] , executeTime : 1507791918000 , delay : 2130ms

Check the client information recorded in ZK again:

  • One Instance corresponds to one client. The instance is named example, and the corresponding client ID is 1001
  • To verify that the instance is indeed connected to that client, check port 11112 on the server
[zk: 192.168.6.52:2181(CONNECTED) 18] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"10.57.241.44:53942","clientId":1001}
[zk: 192.168.6.52:2181(CONNECTED) 19] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.000004","position":1331,"serverId":1,"timestamp":1507791918000}}
==> serverId is the MySQL server_id

[qihuang.zheng@dp0652 ~]$ netstat -anpt | grep 11112
tcp        0      0 0.0.0.0:11112           0.0.0.0:*               LISTEN      27816/java   ==> Canal server
tcp        0     19 192.168.6.52:11112      10.57.241.44:53942      ESTABLISHED 27816/java   ==> Canal client

Stop canal_m:

[qihuang.zheng@dp0652 canal_m]$ bin/stop.sh
dp0652: stopping canal 27816 ...
Oook! cost:1

The instance is now started on the standby node, canal_s:

[qihuang.zheng@dp0652 ~]$ tail -f canal_s/logs/example/example.log
2017-10-12 15:17:21.452 [New I/O server worker #1-1] ERROR com.alibaba.otter.canal.server.netty.NettyUtils - ErrotCode:400 , Caused by : something goes wrong with channel:[id: 0x0c182149, /10.57.241.44:54008 => /192.168.6.52:11113], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first
2017-10-12 15:17:21.661 [pool-1-thread-1] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2017-10-12 15:17:21.663 [pool-1-thread-1] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2017-10-12 15:17:21.767 [pool-1-thread-1] WARN  org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!
2017-10-12 15:17:21.968 [pool-1-thread-1] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2017-10-12 15:17:21.998 [pool-1-thread-1] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2017-10-12 15:17:22.071 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.000004","position":1331,"serverId":1,"timestamp":1507791918000}}

After canal_m stops, only canal_s remains, so the Canal cluster has a single node:

[zk: 192.168.6.52:2181(CONNECTED) 14] ls /otter/canal/cluster
[192.168.6.52:11113]
[zk: 192.168.6.52:2181(CONNECTED) 5] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.6.52:11113","cid":3}

Client logs during the switchover:

2017-10-12 15:17:22.524 [Thread-2] WARN  c.alibaba.otter.canal.client.impl.ClusterCanalConnector - failed to connect to:/192.168.6.52:11113 after retry 0 times
2017-10-12 15:17:22.529 [Thread-2] WARN  c.a.otter.canal.client.impl.running.ClientRunningMonitor - canal is not run any in node
2017-10-12 15:17:27.695 [Thread-2] INFO  c.alibaba.otter.canal.client.impl.ClusterCanalConnector - restart the connector for next round retry.

****************************************************
* Batch Id: [1] ,count : [1] , memsize : [75] , Time : 2017-10-12 15:17:27
* Start : [mysql-bin.000004:1331:1507791918000(2017-10-12 15:05:18)] 
* End : [mysql-bin.000004:1331:1507791918000(2017-10-12 15:05:18)] 
****************************************************

----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:1331] , executeTime : 1507791918000 , delay : 729763ms

Execute the SQL statement again

****************************************************
* Batch Id: [2] ,count : [3] , memsize : [198] , Time : 2017-10-12 15:20:56
* Start : [mysql-bin.000004:1406:1507792855000(2017-10-12 15:20:55)] 
* End : [mysql-bin.000004:1581:1507792855000(2017-10-12 15:20:55)] 
****************************************************

================> binlog[mysql-bin.000004:1406] , executeTime : 1507792855000 , delay : 1539ms
 BEGIN ----> Thread id: 763
----------------> binlog[mysql-bin.000004:1532] , name[canal_test,test] , eventType : UPDATE , executeTime : 1507792855000 , delay : 1539ms
uid : 1    type=int(4)
name : zqhx    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:1581] , executeTime : 1507792855000 , delay : 1540ms

After the client stops, query the client information in ZK: the cursor node is still there, but running is gone, because the instance no longer has a connected client.

[zk: 192.168.6.52:2181(CONNECTED) 1] ls /otter/canal/destinations/example
[running, cluster, 1001]
[zk: 192.168.6.52:2181(CONNECTED) 0] ls /otter/canal/destinations/example/1001
[cursor]
[zk: 192.168.6.52:2181(CONNECTED) 6] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.000004","position":1581,"serverId":1,"timestamp":1507792855000}}

The cursor records the position up to which the instance's client has consumed the binlog, and it remains in ZK even after the client stops.

Note: 1001 is the fixed clientId of ClientIdentity; it is set in the SimpleCanalConnector constructor.

Here is a summary of the relevant records in ZK:

/otter/canal/
|- cluster ==> [192.168.6.52:11112, 192.168.6.52:11113]
|- destinations ==> instances
   |- example1/ ==> the instance name
   |  |- cluster ==> [192.168.6.52:11112, 192.168.6.52:11113]
   |  |- running ==> {"active":true,"address":"192.168.6.52:11112","cid":2}
   |  |- 1001
   |     |- running ==> {"active":true,"address":"10.57.241.44:53942","clientId":1001}
   |     |- cursor ==> {localhost:3306,"journalName":"mysql-bin.000004","position":1331,"serverId":1}
   |- example2/
      |- cluster ==> [192.168.6.52:11112, 192.168.6.52:11113]
      |- running ==> {"active":true,"address":"192.168.6.52:11112","cid":2}
      |- 1001
         |- running ==> {"active":true,"address":"10.57.241.44:53942","clientId":1001}
         |- cursor ==> {localhost:3306,"journalName":"mysql-bin.000004","position":1331,"serverId":1}

The following is a flowchart for the Canal Server HA:

  1. When a Canal Server wants to start a canal instance, it first tries to create an EPHEMERAL node in ZooKeeper.
  2. The Canal Server that successfully creates the ZooKeeper node starts the corresponding canal instance; servers that fail to create the node keep the instance in standby.
  3. If the node created by Canal Server A disappears, ZooKeeper immediately notifies the other Canal Servers, which repeat step 1 to elect a new Canal Server to start the instance.
  4. Each time a Canal Client connects, it first asks ZooKeeper which server has started the canal instance and then connects to it; if the connection becomes unavailable, it retries.
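The four steps can be modeled without ZooKeeper at all. The class below is a hypothetical in-memory stand-in for the running EPHEMERAL node (it does not use the ZooKeeper client API): the first server to create the node wins, the others stay on standby, and when the owner's session is lost the node vanishes and the remaining servers race again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the running EPHEMERAL node; not ZooKeeper's API.
class RunningNodeElection {
    private String running;                       // owner of the node, or null
    private final List<String> candidates = new ArrayList<>();

    // Steps 1-2: try to create the node; the winner starts the instance.
    boolean tryStart(String server) {
        if (!candidates.contains(server)) {
            candidates.add(server);
        }
        if (running == null) {
            running = server;
            return true;
        }
        return false;                             // node exists: stay on standby
    }

    // Step 3: the owner's session ends, so its ephemeral node disappears
    // and the remaining servers retry step 1.
    void sessionLost(String server) {
        candidates.remove(server);
        if (server.equals(running)) {
            running = null;
            for (String c : candidates) {
                if (tryStart(c)) {
                    break;
                }
            }
        }
    }

    // Step 4: a client asks who is running before connecting.
    String whoIsRunning() {
        return running;
    }
}
```

This mirrors the canal_m/canal_s experiment above: 11112 wins first, and after it stops, 11113 takes over the instance.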

Canal Client HA

Canal Client HA works like Canal Server HA: control is decided by preempting an EPHEMERAL node in ZooKeeper.

To verify Canal Client HA, see: blog.csdn.net/xiaolinzi00…

  • Start several clients in IDEA at the same time and execute an SQL statement: one client prints logs and the others do not.
  • Stop the client that printed the logs.
  • Execute the SQL statement again; another client now prints the logs.

Client1’s log:

****************************************************
* Batch Id: [3] ,count : [3] , memsize : [198] , Time : 2017-10-12 17:59:59
* Start : [mysql-bin.000004:1656:1507802398000(2017-10-12 17:59:58)] 
* End : [mysql-bin.000004:1831:1507802398000(2017-10-12 17:59:58)] 
****************************************************

================> binlog[mysql-bin.000004:1656] , executeTime : 1507802398000 , delay : 1188ms
 BEGIN ----> Thread id: 768
----------------> binlog[mysql-bin.000004:1782] , name[canal_test,test] , eventType : UPDATE , executeTime : 1507802398000 , delay : 1199ms
uid : 1    type=int(4)
name : zqh    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:1831] , executeTime : 1507802398000 , delay : 1236ms
## stop the canal client
## canal client is down.

After stopping Client1, Client2 logs:

****************************************************
* Batch Id: [4] ,count : [3] , memsize : [198] , Time : 2017-10-12 18:02:15
* Start : [mysql-bin.000004:1906:1507802534000(2017-10-12 18:02:14)] 
* End : [mysql-bin.000004:2081:1507802534000(2017-10-12 18:02:14)] 
****************************************************

================> binlog[mysql-bin.000004:1906] , executeTime : 1507802534000 , delay : 1807ms
 BEGIN ----> Thread id: 768
----------------> binlog[mysql-bin.000004:2032] , name[canal_test,test] , eventType : UPDATE , executeTime : 1507802534000 , delay : 1819ms
uid : 1    type=int(4)
name : zqhx    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:2081] , executeTime : 1507802534000 , delay : 1855ms

Watch the client node of the instance in ZK. During a client switchover, the running client changes; below, it switches from port 56806 to port 56842. After all clients are closed, there is no running node under 1001, and no client of the instance consumes the binlog.

[zk: 192.168.6.52:2181(CONNECTED) 29] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"10.57.241.44:56806","clientId":1001}
[zk: 192.168.6.52:2181(CONNECTED) 30] get /otter/canal/destinations/example/1001/running
Node does not exist: /otter/canal/destinations/example/1001/running
[zk: 192.168.6.52:2181(CONNECTED) 31] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"10.57.241.44:56842","clientId":1001}
[zk: 192.168.6.52:2181(CONNECTED) 32] ls /otter/canal/destinations/example/1001/cursor

The relevant implementation classes are ClientRunningMonitor, ClientRunningListener, and ClientRunningData.

The client running mechanism controls client failover. Several Canal Clients may be started at the same time; the running mechanism ensures that only one of them is working while the others stay on cold standby. When the running client goes down, running switches a standby client into working mode, so the Canal Client is not a single point of failure and the whole system stays highly available.

The client HA implementation is shown on the left and the server HA implementation is shown on the right

Develop Canal Client

First, get familiar with: github.com/alibaba/can…

subscribe change

Look again at the subscribe method of CanalServerWithEmbedded. The client calls subscribe() as soon as it connects to a destination on the server.

When a client connects to a server, it must specify a destination name, because a server may host multiple destinations. For example, suppose the server starts two instances whose destinations are example1 and example2, and there are two clients: A connects to example1 and B connects to example2. The server's canalInstances map is {example1 => Instance1, example2 => Instance2}. So client A's destination is example1 and its server instance is Instance1; client B's destination is example2 and its server instance is Instance2.

/**
 * Client subscription. Repeated subscriptions update the filter information.
 */
public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    if (!canalInstance.getMetaManager().isStart()) {
        canalInstance.getMetaManager().start();
    }
    canalInstance.getMetaManager().subscribe(clientIdentity);
    // Get the last cursor position from MetaManager
    Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
    if (position == null) {
        // No cursor yet: take the first position in the store
        position = canalInstance.getEventStore().getFirstPosition();
        if (position != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, position); // update the cursor
        }
        logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
    } else {
        // Reuse the last cursor
        logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);
    }
    // Notify the instance that the subscription has changed
    canalInstance.subscribeChange(clientIdentity);
}

Two calls in subscribe matter here: CanalInstance itself calls subscribeChange, and its associated MetaManager calls subscribe.

A CanalServer can have multiple CanalInstances, and each Instance has its own MetaManager. Since one Instance corresponds to one Client, a MetaManager should only ever hold one Client. Yet the data structure below looks as if a MetaManager can hold multiple destinations, each with a list of clients.

public class MemoryMetaManager extends AbstractCanalLifeCycle implements CanalMetaManager {
    protected Map<String, List<ClientIdentity>>              destinations;
    protected Map<ClientIdentity, MemoryClientIdentityBatch> batches;
    protected Map<ClientIdentity, Position>                  cursors;

    public synchronized void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
        List<ClientIdentity> clientIdentitys = destinations.get(clientIdentity.getDestination());
        if (clientIdentitys.contains(clientIdentity)) {
            clientIdentitys.remove(clientIdentity);
        }
        clientIdentitys.add(clientIdentity);
    }
}

Guess: multiple clients can connect to the same Instance (although only one of them is active at a time), so a single MetaManager manages multiple clients.

No! Client HA is a different matter from what MetaManager records. HA means that only one Client is active at a time, so MetaManager would not record two clients at once.

The official ClientAPI documentation says: ClientIdentity identifies the interaction between a Canal Client and the server. The clientId is currently hard-coded to 1001, and the clientId design is reserved for a mode in which one instance is consumed by multiple clients.

So an Instance may, by design, have multiple clients connected to it.

See AbstractMetaManagerTest’s doSubscribeTest method to understand why the data structure is designed this way.

Different clients can subscribe to the same destination. In the example below, the subscriptions first end up as [client1, client2] and then, after client2 unsubscribes and client3 subscribes, as [client1, client3].

public void doSubscribeTest(CanalMetaManager metaManager) {
    ClientIdentity client1 = new ClientIdentity(destination, (short) 1);
    metaManager.subscribe(client1);
    metaManager.subscribe(client1); // subscribe client1 a second time
    ClientIdentity client2 = new ClientIdentity(destination, (short) 2);
    metaManager.subscribe(client2);

    List<ClientIdentity> clients = metaManager.listAllSubscribeInfo(destination);
    Assert.assertEquals(Arrays.asList(client1, client2), clients);

    metaManager.unsubscribe(client2);
    ClientIdentity client3 = new ClientIdentity(destination, (short) 3);
    metaManager.subscribe(client3);

    clients = metaManager.listAllSubscribeInfo(destination);
    Assert.assertEquals(Arrays.asList(client1, client3), clients);
}
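To see the list-per-destination behavior without a Canal build, here is a reduced, hypothetical model of the destinations map that replays the same assertions as doSubscribeTest. ClientKey stands in for ClientIdentity, which in Canal also carries the filter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for ClientIdentity: equality by destination + clientId.
final class ClientKey {
    final String destination;
    final short clientId;

    ClientKey(String destination, short clientId) {
        this.destination = destination;
        this.clientId = clientId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ClientKey)) return false;
        ClientKey k = (ClientKey) o;
        return clientId == k.clientId && destination.equals(k.destination);
    }

    @Override
    public int hashCode() {
        return Objects.hash(destination, clientId);
    }
}

// Hypothetical reduced model of MemoryMetaManager's destinations map.
class MiniMetaManager {
    private final Map<String, List<ClientKey>> destinations = new HashMap<>();

    synchronized void subscribe(ClientKey c) {
        List<ClientKey> list = destinations.computeIfAbsent(c.destination, d -> new ArrayList<>());
        list.remove(c); // re-subscribing replaces the old entry instead of duplicating it
        list.add(c);
    }

    synchronized void unsubscribe(ClientKey c) {
        List<ClientKey> list = destinations.get(c.destination);
        if (list != null) {
            list.remove(c);
        }
    }

    synchronized List<ClientKey> listAllSubscribeInfo(String destination) {
        return new ArrayList<>(destinations.getOrDefault(destination, new ArrayList<>()));
    }
}
```

Subscribing client1 twice still yields a single entry, which is why the first assertion sees [client1, client2].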

The subscribe method of CanalServerWithEmbedded ends by calling AbstractCanalInstance's subscribeChange, which applies the table-name filter. Both a filter and a blacklist can be configured in instance.properties:

# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =

The filter specifies which MySQL tables' binlog events the client wants to receive through Canal Server; the configuration above matches all tables.
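As a quick check of what the default pattern matches, the sketch below uses plain java.util.regex rather than Canal's AviaterRegexFilter, and assumes tables are tested in schema.table form; it only shows single-pattern matching. Note the escaping: the properties file unescapes .*\\..* to the regex .*\..*, and in Java source the backslash has to be doubled again.

```java
import java.util.regex.Pattern;

// Sketch with plain java.util.regex; not Canal's AviaterRegexFilter.
class TableFilterDemo {
    // Regex .*\..* : any schema, a literal dot, any table.
    static final Pattern ALL_TABLES = Pattern.compile(".*\\..*");

    static boolean accepts(Pattern filter, String schema, String table) {
        return filter.matcher(schema + "." + table).matches();
    }
}
```

A narrower filter such as canal_test\..* would keep canal_test.test and drop tables from other schemas.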

public class AbstractCanalInstance extends AbstractCanalLifeCycle implements CanalInstance {
    protected Long                                   canalId;      // unique ID used when interacting with the manager
    protected String                                 destination;  // queue name
    protected CanalEventStore<Event>                 eventStore;   // stores the events
    protected CanalEventParser                       eventParser;  // parses the binlog
    protected CanalEventSink<List<CanalEntry.Entry>> eventSink;    // connects the parser to the store
    protected CanalMetaManager                       metaManager;  // consumption metadata manager
    protected CanalAlarmHandler                      alarmHandler; // alarm handler

    @Override
    public boolean subscribeChange(ClientIdentity identity) {
        if (StringUtils.isNotEmpty(identity.getFilter())) {
            logger.info("subscribe filter change to " + identity.getFilter());
            AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());

            boolean isGroup = (eventParser instanceof GroupEventParser);
            if (isGroup) {
                // group mode: set the filter on every inner parser
                List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
                for (CanalEventParser singleEventParser : eventParsers) {
                    ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
                }
            } else {
                ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
            }
        }

        // Filter processing rules:
        // a. the parser filters the data
        // b. the sink routes and distributes the data; one parsed copy can become several copies after the sink
        // (one-to-many distribution in the memory version may come later)
        return true;
    }
}

EventParser holds two filter references, eventFilter and eventBlackFilter. The eventParser.setEventFilter() call above sets AbstractEventParser's eventFilter.

public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle implements CanalEventParser<EVENT> {
    protected CanalLogPositionManager                logPositionManager         = null;
    protected CanalEventSink<List<CanalEntry.Entry>> eventSink                  = null;
    protected CanalEventFilter                       eventFilter                = null;
    protected CanalEventFilter                       eventBlackFilter           = null;
}

EventParser Implementation

AbstractEventParser's start() method is the main entry for parsing the binlog. After starting transactionBuffer and binlogParser, it launches a background worker thread, parseThread, that runs the following steps:

Note: the steps below are nested in a while loop, and each iteration ends with a sleep.

// 1. Build the connection
erosaConnection = buildErosaConnection();
// 2. Start a heartbeat thread
startHeartBeat(erosaConnection);
// 3. Run the pre-dump work
preDump(erosaConnection);
// 4. Connect to the MySQL database
erosaConnection.connect();
// 5. Find the start position
EntryPosition startPosition = findStartPosition(erosaConnection);
logger.info("find start position : {}", startPosition.toString());
// Reconnect, because finding the position may have left state on the connection
erosaConnection.reconnect();
// Define the callback. After an event is parsed successfully, sink() stores the entry in
// transactionBuffer; when the buffer fills or a transaction ends, the entries go to EventSink
final SinkFunction sinkHandler = new SinkFunction<EVENT>() {
    private LogPosition lastPosition;

    public boolean sink(EVENT event) {
        CanalEntry.Entry entry = parseAndProfilingIfNecessary(event);
        if (entry != null) {
            transactionBuffer.add(entry);
            this.lastPosition = buildLastPosition(entry); // record the position
        }
        return running; // keep dumping while the parser is running
    }
};
// 6. Start dumping data
if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
    erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
} else {
    erosaConnection.dump(startPosition.getJournalName(), startPosition.getPosition(), sinkHandler);
}

ErosaConnection here is the connection between Canal Server and MySQL; CanalConnector, by contrast, is the connection between a Canal Client and Canal Server.

Canal Server connects to MySQL to receive binlog dump packets, while a Canal Client sends several kinds of requests to Canal Server (get, ack, and so on).

We won't go through the dump process in detail; instead, look at how MysqlConnection, the MySQL implementation of ErosaConnection, invokes the callback when it receives an event.

public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
    updateSettings();
    sendBinlogDump(binlogfilename, binlogPosition);
    // connector is the connection between Canal Server and the MySQL master;
    // create a fetcher that pulls the MySQL binlog
    DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    fetcher.start(connector.getChannel());
    LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
    LogContext context = new LogContext();
    while (fetcher.fetch()) {
        // The buffer size is fixed, so each fetch pulls one batch of data
        LogEvent event = null;
        event = decoder.decode(fetcher, context);
        if (!func.sink(event)) {
            break; // call the callback; stop when it returns false
        }
    }
}
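The contract between dump() and the callback is simple: keep decoding while sink returns true, and stop as soon as it returns false. Below is a self-contained sketch, with a hypothetical EventSinkFn interface standing in for SinkFunction and an Iterator standing in for the fetcher and decoder.

```java
import java.util.Iterator;

// Hypothetical stand-in for the SinkFunction contract used by dump():
// return true to keep receiving events, false to stop the fetch loop.
interface EventSinkFn<E> {
    boolean sink(E event);
}

class DumpLoopDemo {
    // Mirrors the shape of the while (fetcher.fetch()) loop: decode, then call back.
    static <E> int dump(Iterator<E> fetcher, EventSinkFn<E> func) {
        int delivered = 0;
        while (fetcher.hasNext()) {
            E event = fetcher.next(); // stands in for decoder.decode(...)
            delivered++;
            if (!func.sink(event)) {
                break;
            }
        }
        return delivered;
    }
}
```

A callback that returns false after its second event stops the loop there, even though more events are available.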

The server has a heartbeat thread whose purpose is to consume the transactionBuffer and write it to EventSink.

protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) {
    boolean result = eventSink.sink(entrys, 
        (runningInfo == null) ? null : runningInfo.getAddress(), destination);
    return result;
}

EventSink finally writes the data to the EventStore, that is, Puts it into the RingBuffer.

eunomia

[zk: 192.168.6.55:2181(CONNECTED) 3] ls /otter/canal/destinations
[octopus_demeter, example_bak, namelist_test, xiaopang2, namelist2, xiaopang3, namelist1, example, xiaopang]
[zk: 192.168.6.55:2181(CONNECTED) 4] ls /otter/canal/destinations/xiaopang
[eunomia, cluster, 1001, running]
[zk: 192.168.6.55:2181(CONNECTED) 5] ls /otter/canal/destinations/xiaopang/eunomia
[_c_2a900d4e-75fb-4445-b30c-04e1bdb2e5d9-lock-0001381746, runnning, _c_ea33db37-9193-4c75-9e61-85e59e123109-lock-0001381738]
// Eunomia Server? Canal Client?
[zk: 192.168.6.55:2181(CONNECTED) 7] get /otter/canal/destinations/xiaopang/eunomia/runnning
10.57.17.100
[zk: 192.168.6.55:2181(CONNECTED) 18] get /otter/canal/destinations/xiaopang/1001/running
{"active":true,"address":"10.57.17.100:60661","clientId":1001}