WX search [programmer internal point matter], reply [666] wonderful.

What is Canal?

Canal is a framework developed by Ali that provides incremental data subscription and consumption based on database incremental log parsing. The whole framework is purely JAVA developed, and currently only supports Mysql and MariaDB (similar to Mysql).

So what is database incremental logging?

There are many types of MySQL logs, including error logs, query logs, slow query logs, transaction logs, and binary logs. Data changes made by the MySQL database (Data Manipulation Language (DML)) are stored in the form of binary logs.

The canal principle

Before introducing the canal principle, let’s review the principle of MySQL master-slave synchronization, which may give you a better understanding of how Canal works.

MySQL master-slave synchronization

MySQL master-slave synchronization is also called read/write separation. It improves the load and fault tolerance of the database and makes the database highly available

MySQL master-slave synchronization schematic

The above pictures originated from the network, if there is infringement contact delete

Master node operations:

Binary log events are written to the binary log when changes are made to the master node (delete, update, insert, create functions, stored procedures, etc.).

show binlog events 
Copy the code

These events are written sequentially to the bin log. When the slave node connects to the master node, the master node starts the binlog dump thread for the slave node.

When the binlog of the master node changes, the bin logdump thread notifies the slave node of the available binlog and sends the corresponding binlog content to the slave node.

Slave Node operation process:

Two threads are created on the slave node: one I/O thread and one SQL thread. The I/O thread connects to the master node, and the binlog dump thread on the master node sends the contents of the binlog to the I/O thread.

After receiving the binlog content, the I/O thread writes the content to the local relay log. The SQL thread reads the RALay log written by the I/O thread and writes the contents of the relay log to the slave database.


2. Canal principle

Understanding the above MySQL master-slave synchronization principle, canal’s working mechanism is easy to understand. In fact, Canal simulates the interaction protocol between the slave node and the master node in the MySQL database, disguises itself as the MySQL slave node and sends a message to the MySQL master nodeDump agreementMySQL master node receives dump request and starts pushing binary log to slave node (i.ecanal).

The above pictures originated from the network, if there is infringement contact delete

All talk, no practice, do it!

Canal implements “monitoring” MySQL

Before we write the code, we will first modify MySQL, install MySQL will not go into detail, basic operation.

1, check whether MySQL has binary log enabled

show binary logs 
Copy the code

If it is not enabled as shown in the picture, ordinary users do not have this command permission, but I do, TSK TSK!If you do not need to manually open, and inmy.cnfConfiguration in filebinlog-formatRowmodel

log-bin=mysq-bin
binlog-format=Row
Copy the code

Log-bin Indicates the location where the binlog file is stored. Binlog-format Sets the log-bin replication mode for MySQL

There are three types of MySQL replication:

Statement-based Replication (SBR)

  • Advantages: The SQL modified data is saved in the binlog, which does not need to record every SQL and data changes. The binlog volume is small, and the I/O overhead is low, and the performance is good
  • Disadvantages: Data inconsistency in master-slave may occur

Row-based replication (RBR)

  • Advantages: Do not record the context information of each SQL statement, only record which data was modified, and how
  • Disadvantages: Large binlog volume, especially in alter table attributes, will generate a large amount of binlog data

Mixed-mode Replication (MBR)

  • A binlog can be in three formats: STATEMENT, ROW, and MIXED.

Create a user for Canal that has permission to operate MySQL

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@The '%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@The '%' ;
FLUSH PRIVILEGES;
Copy the code

3. Install Canal

Download address: https://github.com/alibaba/canal/releases

Select the version such as canal.deployer-xxx.tar.gz

4. Configure canal

To modify the instance.properties file, you need to add rules to listen on the database and tables. Canal can listen on the entire database or a certain table, which is flexible.

vim conf/example/instance.properties
Copy the code
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # ### mysql serverId
canal.instance.mysql.slaveId = 2020

# position info change the location of the canal databasecanal.instance.master.address = 127.0. 01.:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp =  #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp =  # username/password change to your own database information account (a separate account created in preparation stage)canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF- 8 -  Table regex# canal.instance.filter.regex = blogs\.blog_info canal.instance.filter.regex = .\*\\\\.. A \ *# table black regex canal.instance.filter.black.regex = Copy the code

Start the canal

sh bin/startup.sh
Copy the code

Take a look at the server log to verify that Canal is properly started

vi logs/canal/canal.log
Copy the code

If canal Server is running Now is displayed, the system succeeds

2020- 01- 08 15:25:33.361 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ##    start the canal server.
2020- 01- 08 15:25:33.468 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.16812.245.:11111]
2020- 01- 08 15:25:34.061 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
Copy the code

5. Write Java client code to realize Canal monitoring

Importing dependency packages

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.0</version>
</dependency>
Copy the code

This is just a simple implementation

public class MainApp {

    public static void main(String... args) throws Exception {

        / * ** create and* /  CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),  11111), "example".""."");   int batchSize = 1000;  int emptyCount = 0;  try {  connector.connect();  / * ** Monitor all tables in the database* /  connector.subscribe(". * \ \.. *");  / * ** Specify the table, library name to monitor. The name of the table* /  //connector.subscribe("xin-master.jk_order");  connector.rollback();   // No heartbeat detected after 120  int totalEmptyCount = 120;  while (emptyCount < totalEmptyCount) {  Message message = connector.getWithoutAck(batchSize); // Get the specified amount of data  long batchId = message.getId();  int size = message.getEntries().size();  if (batchId == -1 || size == 0) {  emptyCount++;  System.out.println("empty count : " + emptyCount);  try {  Thread.sleep(1000);  } catch (InterruptedException e) {  }  } else {  emptyCount = 0;  // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);  printEntry(message.getEntries());  }  / * ** Submit confirmation* /  connector.ack(batchId);  / * ** Failed to process, rollback data* /  connector.rollback(batchId);  }   System.out.println("empty too many times, exit");  } finally {  connector.disconnect();  / * ** Manually enable transaction rollback* /  //TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();  }  }   private static void printEntry(List<CanalEntry.Entry> entrys) {   for (CanalEntry.Entry entry : entrys) {   if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry  .EntryType  .TRANSACTIONEND) {  continue;  }   CanalEntry.RowChange rowChage = null;  try {  rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());  } catch (Exception e) {  throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  e);  }   CanalEntry.EventType eventType = rowChage.getEventType();  System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s". entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  eventType));   for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {  if (eventType == CanalEntry.EventType.DELETE) {  printColumn(rowData.getBeforeColumnsList());  } else if (eventType == CanalEntry.EventType.INSERT) {  printColumn(rowData.getAfterColumnsList());  } else {  System.out.println("-------> before");  printColumn(rowData.getBeforeColumnsList());  System.out.println("-------> after");  printColumn(rowData.getAfterColumnsList());  }  }  }  }   private static void printColumn(List<CanalEntry.Column> columns) {  for (CanalEntry.Column column : columns) {  System.out.println(column.getName() + ":" + column.getValue() + " update=" + column.getUpdated());  }  } } Copy the code

So now that we’re done writing the code, let’s start the service and see what happens, because we’re not working on the database, so the listening is empty.Next we execute one in the databaseupdateTry statement

update jk_orderset order_no = '1111'  where id = 40
Copy the code

The console detects database changes and generates a binlog log filemysql-bin.000009:3830 How to use the generated binlog file and parse it into SQl statements?

<! Mysql > select * from binlog;        <dependency>
            <groupId>com.github.shyiko</groupId>
            <artifactId>mysql-binlog-connector-java</artifactId>
 <version>0.13.0</version> </dependency> Copy the code

Download the binlog file and test it locally

 public static void main(String[] args) throws IOException {
        String filePath = "C: \ \ ProgramData \ \ MySQL \ \ MySQL Server 5.7 \ \ Data \ \ MySQL - bin. 000009:3830";
        File binlogFile = new File(filePath);
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setChecksumType(ChecksumType.CRC32);
 BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);  try {  for(Event event; (event = reader.readEvent()) ! =null;) { System.out.println(event.toString());  }  } finally {  reader.close();  }  } Copy the code

The last operation to the database was to add an idx_index index

Event{header=EventHeaderV4{timestamp=1551325542000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=8455, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1551325542000, eventType=QUERY, serverId=1, headerLength=19, dataLength=190, nextPosition=8664, flags=0}, data=QueryEventData{threadId=25, executionTime=0, errorCode=0, database='xin-master', sql='ALTER TABLE `jk_order`
DROP INDEX `idx_index` ,
ADD INDEX `idx_index` (`user_id`, `service_id`, `real_price`) USING BTREE'}}
Event{header=EventHeaderV4{timestamp=1551438586000, eventType=STOP, serverId=1, headerLength=19, dataLength=4, nextPosition=8687, flags=0}, data=null}
Copy the code

So now that we’ve implemented monitoring MySQL,

4. Canal application scenario

The application scenarios of Canal are as follows:

  • Resolve MySQL master/slave synchronization delay
  • Realize real-time database backup
  • Multilevel index (seller and buyer’s index)
  • The service cache is refreshed
  • Price changes and other important business news

How does Canal solve the problem of MySQL master-slave synchronization delay

The maser-slave mode of MySQL is common in the production environment. However, synchronization delays may occur in clusters deployed across equipment rooms. Here’s an example:

An order status is not paid, the master node changes to paid, but due to some reasons there is a delay in the data cannot be timely synchronized to the slave, then the user immediately check the order status (query slave) display is still not paid, which user should not panic when seeing this situation.

Why is there a master-slave synchronization delay?

When the TPS concurrency of the master library is high, the master node concurrently generates the modification operation, while the SQL thread of the slave node is a single thread processing the synchronous data, so the delay naturally occurs.

We use Canal to monitor data updates of maser node in real time (it can be monitored for a certain table). Canal captures the changed SQL and executes it on the slave node immediately, so as to solve the master-slave delay problem.

However, there are more reasons for master-slave synchronization. Because the master-slave server is cross-machine and cross-machine room, in addition to network bandwidth, network stability and synchronization between machines are the main reasons for master-slave synchronization.

conclusion

This paper is just a simple implementation of canal monitor database function, aiming to provide you with a solution to the problem, or repeated that sentence, there are many technical methods to solve the problem, the specific application needs to be combined with the specific business.

Sorted out hundreds of various kinds of technical e-books and video materials, hush ~, free to send, public number reply [666] to receive. I set up a technology exchange group with my friends to discuss technology and share technical information, aiming to learn and progress together. If you are interested, join us!