Author: code fixed. Link: www.jianshu.com/p/91aa28f0c… Source: Jianshu

Preface

The product search function in the mall project does not yet do real-time data synchronization. Alibaba's Canal can sync data from MySQL to Elasticsearch in real time. Today let's talk about how to use Canal; I hope it helps you!

Introduction to Canal

Canal is mainly used to parse the incremental binlog of a MySQL database and to subscribe to and consume the incremental data. Simply put, it can synchronize MySQL's incremental data in real time to stores such as MySQL, Elasticsearch, and HBase.

How Canal works

Canal simulates the interaction protocol of MySQL master-slave replication, disguising itself as a MySQL slave, and sends a dump request to the MySQL master. After receiving the dump request, the master pushes its binlog to Canal, and Canal parses the binlog and synchronizes the data to other storage.
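The role described above can be sketched in a few lines (purely illustrative pseudocode in Python, not Canal's actual API): a consumer receives parsed row-change events from the binlog and applies them to a downstream store.

```python
# Illustrative sketch of Canal's role: consume parsed binlog row-change
# events and apply them to a downstream store. Not Canal's real API.
def apply_event(store, event):
    """Apply one parsed row event to a dict-based 'store' (stand-in for ES/HBase/etc.)."""
    table = store.setdefault(event["table"], {})
    if event["type"] in ("INSERT", "UPDATE"):
        for row in event["data"]:
            table[row["id"]] = row
    elif event["type"] == "DELETE":
        for row in event["data"]:
            table.pop(row["id"], None)

# Hypothetical events as they might arrive from the binlog stream
events = [
    {"table": "product", "type": "INSERT", "data": [{"id": 5, "title": "mi8"}]},
    {"table": "product", "type": "UPDATE", "data": [{"id": 5, "title": "xiaomi"}]},
]
store = {}
for e in events:
    apply_event(store, e)
print(store["product"][5]["title"])  # xiaomi
```

The real pipeline differs in the wire protocol and targets, but the insert/update/delete replay logic is the essence of what canal-adapter does downstream.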

Using Canal

To sync data from MySQL to Elasticsearch in real time, three Canal components are needed: canal-server, canal-adapter, and canal-admin.

  • canal-server (canal-deploy): listens directly to the MySQL binlog by disguising itself as a MySQL slave; it is only responsible for receiving data, not processing it.

  • canal-adapter: a canal-server client that fetches data from canal-server and synchronizes it to targets such as MySQL, Elasticsearch, and HBase.

  • canal-admin: provides operation and maintenance (O&M) functions for Canal, such as overall configuration management and node O&M, together with a user-friendly WebUI for fast and safe operations.

  • Note that MySQL, Elasticsearch, and Canal have version compatibility requirements; make sure the versions of the three you deploy are compatible with one another.

Since Canal implements data synchronization by subscribing to MySQL's binlog, we need to enable MySQL's binlog write function and set binlog-format to ROW mode. My configuration file is /mydata/mysql/conf/my.cnf; change it as follows:

[mysqld]
## set a unique server_id
server_id=101
## databases whose changes are not written to the binlog
binlog-ignore-db=mysql
## enable the binary log
log-bin=mall-mysql-bin
## binlog cache size
binlog_cache_size=1M
## binary log format to use (mixed, statement, row)
binlog_format=row
## binlog expiration time in days; the default 0 means no automatic purging
expire_logs_days=7
## skip all errors, or errors of the specified type, during slave replication to avoid interruption
slave_skip_errors=1062
  • After the configuration is complete, restart MySQL. After the restart, run the following command to check whether the binlog is enabled;
show variables like '%log_bin%';

+---------------------------------+-------------------------------------+
| Variable_name                   | Value                               |
+---------------------------------+-------------------------------------+
| log_bin                         | ON                                  |
| log_bin_basename                | /var/lib/mysql/mall-mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mall-mysql-bin.index |
| log_bin_trust_function_creators | OFF                                 |
| log_bin_use_v1_row_events       | OFF                                 |
| sql_log_bin                     | ON                                  |
+---------------------------------+-------------------------------------+

  • Check the binlog mode of MySQL again;
show variables like 'binlog_format%';  

+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+

  • Next, create an account with slave privileges for subscribing to the binlog; the account created here is canal:canal;
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

  • Create a database canal_test for testing, then create a product table in it; the creation statement is as follows.
CREATE TABLE `product`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `sub_title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `price` decimal(10, 2) NULL DEFAULT NULL,
  `pic` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;


canal-server

Upload the downloaded package canal.deployer-1.1.5-SNAPSHOT.tar.gz to the Linux server and decompress it to the specified directory /mydata/canal-server:

tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz
  • After decompression, the directory structure is as follows.
├── bin
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── canal_local.properties
│   ├── canal.properties
│   └── example
│       └── instance.properties
├── lib
├── logs
│   ├── canal
│   │   └── canal.log
│   └── example
│       └── example.log
└── plugin
  • Modify the configuration file conf/example/instance.properties, changing the database-related configuration as follows.
## address of the MySQL server whose data needs to be synchronized
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
## database account used for synchronizing data
canal.instance.dbUsername=canal
## database password used for synchronizing data
canal.instance.dbPassword=canal
## database connection charset
canal.instance.connectionCharset=UTF-8
## regex filtering the tables whose binlog is subscribed to
canal.instance.filter.regex=.*\\..*
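The filter value is a regex matched against `schema.table` names; after properties-file unescaping, `.*\\..*` becomes the pattern `.*\..*`, which matches every table in every database. A quick way to sanity-check a filter pattern (sketched here in Python; Canal itself uses Java regexes, which behave the same for patterns like these — the `canal_test.product` example below assumes the table names used in this article):

```python
import re

# Pattern as Canal sees it after properties-file unescaping: ".*\\..*" -> .*\..*
pattern = re.compile(r"^.*\..*$")
assert pattern.match("canal_test.product")  # matches any schema.table pair

# A more selective filter would look like this (hypothetical example):
only_product = re.compile(r"^canal_test\.product$")
assert only_product.match("canal_test.product")
assert not only_product.match("canal_test.orders")
print("filter patterns behave as expected")
```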
  • Use the startup.sh script to start the canal-server service;
sh bin/startup.sh

  • After the service is successfully started, run the following command to view service logs.
tail -f logs/canal/canal.log

2020-10-26 16:18:13.354 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.0.1(172.17.0.1):11111]
2020-10-26 16:18:19.978 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
  • After the startup succeeds, use the following command to view the instance log;
tail -f logs/example/example.log 

2020-10-26 16:18:16.056 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-10-26 16:18:16.061 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-10-26 16:18:18.259 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-10-26 16:18:18.282 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-10-26 16:18:18.282 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2020-10-26 16:18:19.543 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-10-26 16:18:19.578 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
[destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mall-mysql-bin.000006","position":2271,"serverId":101,"timestamp":1603682664000}}
  • If you want to stop the canal-server service, use the following command.
sh bin/stop.sh


canal-adapter

Upload the downloaded package canal.adapter-1.1.5-SNAPSHOT.tar.gz to the Linux server and decompress it to the specified directory /mydata/canal-adapter. After decompression, the directory structure is as follows.

├── bin
│   ├── adapter.pid
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application.yml
│   ├── es6
│   ├── es7
│   │   ├── biz_order.yml
│   │   ├── customer.yml
│   │   └── product.yml
│   ├── hbase
│   ├── kudu
│   ├── logback.xml
│   ├── META-INF
│   │   └── spring.factories
│   └── rdb
├── lib
├── logs
│   └── adapter
│       └── adapter.log
└── plugin
  • Modify the configuration file conf/application.yml as follows, mainly the canal-server configuration, the data source configuration, and the client adapter configuration;
canal.conf:
  mode: tcp # tcp kafka rocketMQ
  flatMessage: true
  zookeeperHosts: # valid only in kafka/rocketMQ mode
  syncBatchSize: 1000 # number of records per sync batch
  retries: 0 # number of retries (-1 means infinite retry)
  timeout: # sync timeout in milliseconds
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 # address of canal-server
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources: # source data sources
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal_test?useUnicode=true
      username: canal
      password: canal
  canalAdapters: # adapter list
  - instance: example # canal instance name or MQ topic name
    groups: # adapter groups
    - groupId: g1 # group id
      outerAdapters: # adapters in the group
      - name: logger # log printing adapter
      - name: es7 # ES sync adapter
        hosts: 127.0.0.1:9200 # ES connection address
        properties:
          mode: rest # transport or rest
          # security.auth: test:123456 # only used in rest mode
          cluster.name: elasticsearch # ES cluster name
  • Add the configuration file canal-adapter/conf/es7/product.yml, used to configure the mapping between the MySQL table and the Elasticsearch index;
dataSourceKey: defaultDS # source data source key, from srcDataSources in application.yml
destination: example # canal instance name or MQ topic
groupId: g1 # corresponds to the groupId in MQ mode
esMapping:
  _index: canal_product # ES index name
  _id: _id # the ES document _id; if absent, pk must be configured instead
  sql: "SELECT p.id AS _id, p.title, p.sub_title, p.price, p.pic FROM product p" # SQL mapping the table to the index
  etlCondition: "where a.c_time>={}" # condition parameter for the ETL command
  commitBatch: 3000 # commit batch size
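The esMapping SQL decides both the document `_id` and the field names that land in Elasticsearch: each selected column (or alias) becomes an ES field. A rough illustration of the resulting document, using a hypothetical sample row:

```python
# Illustration only: how a product row maps to an ES document under an
# esMapping SQL like "SELECT p.id AS _id, p.title, p.sub_title, p.price, p.pic FROM product p".
row = {"id": 5, "title": "mi8", "sub_title": None, "price": 2019.00, "pic": None}  # sample values

doc_id = row["id"]  # the column aliased to _id becomes the document id
doc = {k: row[k] for k in ("title", "sub_title", "price", "pic")}  # remaining columns become fields
print(doc_id, sorted(doc))  # 5 ['pic', 'price', 'sub_title', 'title']
```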
  • Use the startup.sh script to start the canal-adapter service;
sh bin/startup.sh

  • After the service is successfully started, run the following command to view service logs.
tail -f logs/adapter/adapter.log

2020-10-26 16:52:55.148 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2020-10-26 16:52:57.005 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...
2020-10-26 16:52:57.376 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2020-10-26 16:52:58.615 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2020-10-26 16:52:58.651 [main] INFO  c.alibaba.otter.canal.core.spi.ExtensionLoader - extension classpath dir: /mydata/canal-adapter/plugin
2020-10-26 16:52:59.043 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
2020-10-26 16:52:59.044 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2020-10-26 16:52:59.057 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> start to connect destination: example <=============
2020-10-26 16:52:59.100 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2020-10-26 16:52:59.153 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2020-10-26 16:52:59.590 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2020-10-26 16:52:59.626 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 31.278 seconds (JVM running for 33.99)
2020-10-26 16:52:59.930 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
  • If you need to stop the canal-adapter service, use the following command.
sh bin/stop.sh


Data synchronization Demo

After the above steps, Canal's data synchronization setup is basically complete. Let's demonstrate the data synchronization function.

  • First we need to create an index in Elasticsearch that corresponds directly to the product table in MySQL; in Kibana's Dev Tools, use the following command to create it.
PUT canal_product
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text"
      },
      "sub_title": {
        "type": "text"
      },
      "pic": {
        "type": "text"
      },
      "price": {
        "type": "double"
      }
    }
  }
}

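If you prefer not to use Kibana, the same mapping can be sent with any HTTP client. A minimal sketch assembling the request body in Python (only building the JSON here; you would PUT it to the ES address used in this article, e.g. http://127.0.0.1:9200/canal_product, with Content-Type: application/json):

```python
import json

# Request body matching the Kibana command above.
mapping = {
    "mappings": {
        "properties": {
            "title": {"type": "text"},
            "sub_title": {"type": "text"},
            "pic": {"type": "text"},
            "price": {"type": "double"},
        }
    }
}
body = json.dumps(mapping)
print(sorted(mapping["mappings"]["properties"]))  # ['pic', 'price', 'sub_title', 'title']
```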

  • After the index is created, you can view the index structure.

  • Then create a record in the database using the following SQL statement;

INSERT INTO product (id, title, sub_title, price, pic) VALUES (5, 'mi8', NULL, 2019.00, NULL);
  • After the record is created, search in Elasticsearch and you will find that the data has been synchronized.

  • Then use the following SQL to modify the data;

UPDATE product SET title='xiaomi' WHERE id=5;
  • After the data is modified, search in Elasticsearch and you will find that the data has been modified there as well.

  • Then use the following SQL to delete the data;

DELETE FROM product WHERE id=5;

  • After the data is deleted from MySQL, search in Elasticsearch and you will find that the record has been deleted there as well.

canal-admin

Upload the downloaded package canal.admin-1.1.5-SNAPSHOT.tar.gz to the Linux server and decompress it to the specified directory /mydata/canal-admin. After decompression, the directory structure is as follows.

├── bin
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application.yml
│   ├── canal_manager.sql
│   ├── canal-template.properties
│   ├── instance-template.properties
│   ├── logback.xml
│   └── public
│       ├── avatar.gif
│       ├── index.html
│       ├── logo.png
│       └── static
├── lib
└── logs
  • Create the canal_manager database required by canal-admin, using the SQL script /mydata/canal-admin/conf/canal_manager.sql, which will create the following tables.

  • Next, modify the configuration file conf/application.yml, mainly the data source configuration and the canal-admin management account configuration. Note that you must use a database account with read and write permissions, such as the management account root:root.

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root
  password: root
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
  • Next, edit the canal-server configuration file conf/canal_local.properties, mainly modifying the canal-admin settings. After the modification, restart canal-server with sh bin/startup.sh local:
# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
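Note that canal.admin.passwd is not a plaintext password: it is a MySQL PASSWORD()-style hash (SHA-1 applied twice, upper-case hex, without the leading '*'). The value above is the hash of the default password admin, which you can verify like this:

```python
import hashlib

def canal_admin_passwd(plaintext: str) -> str:
    """MySQL PASSWORD()-style hash: upper-case hex of sha1(sha1(pw)), without the leading '*'."""
    return hashlib.sha1(hashlib.sha1(plaintext.encode()).digest()).hexdigest().upper()

print(canal_admin_passwd("admin"))  # 4ACFE3202A5FF5CF467898FC58AAB1D615029441
```

If you change the admin password, regenerate this value the same way (or with MySQL's SELECT PASSWORD('...') on versions that still support it) and put the result here.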
  • Use the startup.sh script to start the canal-admin service;
sh bin/startup.sh

  • After the service is successfully started, run the following command to view service logs.
tail -f logs/admin.log

2020-10-27 10:15:04.210 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2020-10-27 10:15:04.308 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2020-10-27 10:15:04.534 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2020-10-27 10:15:04.573 [main] INFO  com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 31.203 seconds (JVM running for 34.865)
  • Access the canal-admin web UI and log in with the default account admin:123456; the address is http://192.168.3.101:8089

  • After successful login, you can use the Web UI to operate the Canal-server.

References

Canal official documentation: github.com/alibaba/can…

Conclusion

If you like this article, please give it a like. Finally, I recommend a high-quality technology article every day, mainly sharing Java-related technologies and interview skills, so you won't get lost while learning Java.