Canal is versatile and very easy to use, which makes it a good fit for many real business needs.

Here’s an example:

Several developers in the company are building a set of services. To reduce call latency, part of the interface data is cached. Once the underlying data is updated in the database, the cache becomes stale and must be invalidated promptly.

Deleting the cache could naturally be done in the business code that updates the data, but sometimes that code lives in another project you have no right to change, or its owners do not want cache logic mixed into their business code (all sorts of coordination issues arise when people collaborate). Or the deletion simply fails, and the cache stays stale.

As mentioned in the previous article, we can separate the cache-update operation entirely into a standalone system, and Canal is a good helper here. It can help us implement a system like the one below:


The main points of this article are as follows:

  • What is Canal
  • Working principle of Canal
  • Read and write separation of the database
  • Primary/secondary database synchronization
  • Primary/secondary synchronization consistency: asynchronous, full synchronous, and semi-synchronous replication
  • Configuring the Canal service
  • Running the Java client demo against the Canal service

Alibaba's open-source MySQL middleware Canal: quick start

What is Canal

As is well known, Alibaba was an early heavy user of MySQL among Chinese Internet companies, driven by its "de-IOE" initiative (getting rid of IBM minicomputers, Oracle databases, and EMC storage, and instead building systems on open-source software). Starting around 2010, the Alibaba/Taobao businesses began parsing database logs to synchronize incremental changes, which gave rise to many incremental-subscription and consumption use cases.

Canal came into being from this. It reads binlogs from the master by disguising itself as a slave database, fulfilling the business need for incremental database subscription and consumption.

Canal's main uses:

  • Database mirroring
  • Real-time database backup
  • Index building and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.)
  • Service cache refresh
  • Incremental data processing with business logic

Open Source Project Address:

https://github.com/alibaba/canal

Instead of copying the project README, here are a few noteworthy points:

  • Canal uses a client-server model; data is transferred with Protobuf 3.0 (which many RPC frameworks, such as gRPC, also use)
  • Canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x
  • As a MySQL binlog incremental fetch-and-parse tool, Canal can post change records to MQ systems such as Kafka and RocketMQ (a consumption sketch follows below)
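
Since Canal can publish change records to Kafka, here is a minimal consumption sketch using the plain Kafka client. This is an illustration, not part of the original article: it assumes Canal runs in MQ mode posting flat JSON messages to a topic named "example" on a local broker; the broker address, topic name, and message shape are assumptions that must match your canal.properties MQ settings.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CanalKafkaDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // assumed local Kafka broker
        props.put("group.id", "canal-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // "example" is an assumed topic name; match it to your canal.mq settings
            consumer.subscribe(Collections.singletonList("example"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    // each value is one JSON change record (database, table, type, row data)
                    System.out.println(record.value());
                }
            }
        }
    }
}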

Working principle of Canal

Canal is essentially a disguised slave database that reads the binlog. Let's first review the basics of MySQL master/slave replication so that Canal is easier to understand.

Read and write separation of the database

To cope with high concurrency, MySQL supports splitting a single database host into one primary library, which handles write operations, and one or more secondary libraries, across which read pressure is distributed. This is the typical read/write-separation scenario, as sketched below.
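
As an aside (my sketch, not from the original article), application code often implements this split by routing each statement to the appropriate data source: writes to the primary, reads to one of the replicas. A minimal illustration, assuming the DataSource objects are wired up elsewhere:

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class ReadWriteRouter {
    private final DataSource primary;        // the single write library
    private final List<DataSource> replicas; // one or more read libraries

    public ReadWriteRouter(DataSource primary, List<DataSource> replicas) {
        this.primary = primary;
        this.replicas = replicas;
    }

    // all writes go to the primary
    public Connection writeConnection() throws SQLException {
        return primary.getConnection();
    }

    // reads are spread randomly across the replicas
    public Connection readConnection() throws SQLException {
        int i = ThreadLocalRandom.current().nextInt(replicas.size());
        return replicas.get(i).getConnection();
    }
}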

Primary/secondary database synchronization

Read/write separation is realized by synchronizing the master with the slaves: each slave listens to the master's binlog. The general process is as follows:

  • The MySQL master writes data changes to its binary log (binary log events)
  • The MySQL slave copies the master's binary log events to its own relay log
  • The MySQL slave replays the events in the relay log, applying the data changes to its own data

The principle of master-slave synchronization is not discussed in detail here.

As you can see, there is a problem with this architecture: master/slave synchronization has a delay, so there is a short window during which the master and slave data are inconsistent.

Most of the time this discrepancy is very brief, and most of the time we can ignore it.

But when the business cannot tolerate the inconsistency, we have to think about how to solve the problem.

Database primary/secondary synchronization consistency problem

We usually use MySQL master-slave replication to address MySQL's single point of failure. It synchronizes the primary's changes to the secondary via logical replication, so strict consistency between the primary and the secondary cannot be guaranteed.

MySQL's master-slave replication therefore introduces a master-slave data inconsistency problem. MySQL supports three replication modes: asynchronous replication, semi-synchronous replication, and full synchronous replication.

Asynchronous replication

MySQL replication is asynchronous by default: the master returns the result to the client immediately after the client's transaction commits, without caring whether any slave has received and processed the change. The problem is that if the master crashes, transactions it has committed may not yet have reached any slave; if a slave is then forcibly promoted to master, the data on the new master may be incomplete.

The master writes the transaction's binlog events to the binlog file, simply notifies the dump thread to send the new binlog, and continues processing the commit, without ensuring that the binlog has been delivered to any slave node.

Full synchronous replication

A transaction returns to the client only after all secondary libraries have also executed it. The performance of fully synchronous replication is necessarily hit hard, because the transaction must wait for every slave to finish before returning.

After the master commits a transaction, every slave node must receive, apply, and commit it before the master's thread can continue. The drawback is that each transaction takes longer to complete on the master, which degrades performance.

Semi-synchronous replication

The master only needs to wait for at least one slave to receive the binlog and flush it to its relay log file; it does not wait for feedback from all slaves. Note that this acknowledgement only means "received", not fully applied and committed, which is what saves so much time.

Semi-synchronous replication sits between asynchronous and fully synchronous replication: the master does not return to the client immediately after committing the client's transaction, but waits until at least one slave has received the change and written it to its relay log. Compared with asynchronous replication, semi-synchronous replication improves data safety, but it also imposes a delay of at least one TCP/IP round trip, so it is best used on low-latency networks.

In fact, semi-synchronous replication is not strictly semi-synchronous. In MySQL's semi-synchronous architecture, while the master is waiting for a slave's ACK, a timeout makes replication degrade to asynchronous, which can again lead to data inconsistency.

When semi-synchronous replication times out (controlled by rpl_semi_sync_master_timeout, in milliseconds; the default is 10000, i.e. 10s), it is temporarily disabled and asynchronous replication is used instead. After the master's dump thread has sent all the events of a transaction, if a slave's response arrives within rpl_semi_sync_master_timeout, replication returns to semi-synchronous.
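
If you want to inspect these settings yourself, here is a minimal JDBC sketch (an illustration with placeholder credentials, assuming a local MySQL; the query returns no rows unless the semi-sync plugins are installed):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SemiSyncCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/mysql", "root", "xxxx"); // placeholder credentials
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'rpl_semi_sync%'")) {
            while (rs.next()) {
                // e.g. rpl_semi_sync_master_enabled, rpl_semi_sync_master_timeout
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}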

For a detailed analysis of how semi-synchronous replication works, see this article; it won't be expanded on here:

https://www.cnblogs.com/ivictor/p/5735580.html

Back to how Canal works

Having reviewed how a slave synchronizes data, Canal is very simple to understand:

  • Canal emulates the MySQL slave's interaction protocol: it disguises itself as a MySQL slave and sends the dump protocol to the MySQL master
  • The MySQL master receives the dump request and starts pushing the binary log to the slave (i.e., Canal)
  • Canal parses the binary log objects (originally a byte stream)

Canal in practice

Enable the MySQL binlog

This step was already covered in the previous article on using the binlog to recover MySQL data.

First go to the database console and run the command:

mysql> show variables like 'log_bin%';
+---------------------------------+-------+
| Variable_name                   | Value |
+---------------------------------+-------+
| log_bin                         | OFF   |
| log_bin_basename                |       |
| log_bin_index                   |       |
| log_bin_trust_function_creators | OFF   |
| log_bin_use_v1_row_events       | OFF   |
+---------------------------------+-------+
5 rows in set (0.00 sec)

You can see that binlog is OFF across the board. Next we need to modify the MySQL configuration file; open it with the following command:

sudo vi /etc/mysql/mysql.conf.d/mysqld.cnf

At the end of the file add:

log-bin=/var/lib/mysql/mysql-bin
binlog-format=ROW

Save the file and restart the mysql service:

sudo service mysql restart

After the restart is complete, check the mysql status:

systemctl status mysql.service

If you are running MySQL 5.7 or higher, you will see an error:

Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190791Z 0 [Warning] Changed limits: max_open_files: 1024 (requested 5000)
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190839Z 0 [Warning] Changed limits: 431 (requested 2000)
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.359713Z 0 [Warning] TIMESTAMP with implicit DEFAULT value is deprecated. Please use --explicit_defaults_for_timestamp server option (se
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: [Note] /usr/sbin/mysqld (mysqld 5.7.28-0ubuntu0.16.04.2-log)
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363017Z 0 [ERROR] You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: [ERROR] Aborting
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363922Z 0 [Note] Binlog end
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: [Note] /usr/sbin/mysqld: Shutdown complete
Jan 06 15:49:58 VM-0-11-ubuntu systemd[1]: mysql.service: Main process exited, code=exited, status=1/FAILURE

"You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server..."

Our previous configuration works for versions below 5.7. For 5.7 and later, we must also specify server-id.

We set this MySQL instance's server-id to 2 (any value works, as long as it doesn't duplicate another server's ID):

server-id=2

Create a database account for Canal

mysql> select user, host from user;
+------------------+-----------+
| user             | host      |
+------------------+-----------+
| root             | %         |
| debian-sys-maint | localhost |
| mysql.session    | localhost |
| mysql.sys        | localhost |
| root             | localhost |
+------------------+-----------+
5 rows in set
CREATE USER canal IDENTIFIED BY 'xxxx';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

show grants for 'canal';
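
As a quick sanity check (my addition, not one of the original steps), you can verify from Java that the new account connects and carries the replication privileges. A minimal sketch, assuming a local MySQL and the placeholder password used above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CanalAccountCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/", "canal", "xxxx"); // 'xxxx' is the placeholder password
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW GRANTS")) {
            while (rs.next()) {
                // should list SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.*
                System.out.println(rs.getString(1));
            }
        }
    }
}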

Configuring the Canal Service

Go to GitHub and download the latest stable Canal release:

  • https://github.com/alibaba/canal/releases

Extract:

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal

Configuration file settings:

There are two main configuration files: conf/canal.properties and conf/example/instance.properties.

To get the demo running quickly, we only need to change the database account and password in conf/example/instance.properties:

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=xxxxxxx
canal.instance.connectionCharset = UTF-8

Running the Canal service

Make sure the JDK is installed on your machine, then run the Canal startup script:

sh bin/startup.sh

The following figure shows a successful startup:

Java client code

I wrote the following client code in the code repository (miaosha-job) of the seckill system series.

Repository address: https://github.com/qqxx6661/miaosha

package job;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CanalClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);

    public static void main(String[] args) {
        // Step 1: connect to the Canal server
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        connector.connect();
        // Step 2: subscribe
        connector.subscribe();
        // Step 3: poll repeatedly
        while (true) {
            try {
                Message message = connector.getWithoutAck(1000);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    LOGGER.info("Currently no data, sleep for 1 second");
                    Thread.sleep(1000);
                } else {
                    LOGGER.info("----------------------- data arrived -----------------------");
                    printEntry(message.getEntries());
                }
                connector.ack(batchId);
            } catch (Exception e) {
                LOGGER.error("Processing error", e);
            } finally {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Print every received record
     */
    public static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            // Step 1: split the entry into header and body
            Header header = entry.getHeader();
            EntryType entryType = entry.getEntryType();
            // Step 2: only row data entries carry the changes we need
            if (entryType == EntryType.ROWDATA) {
                String tableName = header.getTableName();
                String schemaName = header.getSchemaName();

                RowChange rowChange = null;
                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }

                EventType eventType = rowChange.getEventType();
                LOGGER.info(String.format("Current table: %s.%s, operation = %s", schemaName, tableName, eventType));

                // for a query or DDL, just print the SQL directly
                if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
                    LOGGER.info("Query statement [{}]", rowChange.getSql());
                    return;
                }

                // Step 3: drill down to the column level
                rowChange.getRowDatasList().forEach((rowData) -> {
                    // columns before the update
                    List<Column> beforeColumns = rowData.getBeforeColumnsList();
                    // columns after the update
                    List<Column> afterColumns = rowData.getAfterColumnsList();
                    if (eventType == EventType.DELETE) {
                        // delete operation
                        printColumn(beforeColumns);
                    }
                    if (eventType == EventType.INSERT) {
                        // insert operation
                        printColumn(afterColumns);
                    }
                    if (eventType == EventType.UPDATE) {
                        // update operation
                        printColumn(afterColumns);
                        deleteCache(afterColumns, tableName, schemaName);
                    }
                });
            }
        }
    }

    /**
     * Print each column
     */
    public static void printColumn(List<Column> columns) {
        columns.forEach((column) -> {
            String columnName = column.getName();
            String columnValue = column.getValue();
            String columnType = column.getMysqlType();
            boolean isUpdated = column.getUpdated(); // whether this column was changed
            LOGGER.info(String.format("columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s",
                    columnName, columnValue, columnType, isUpdated));
        });
    }

    /**
     * Simulated cache-deletion service
     */
    public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
        if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
            AtomicInteger id = new AtomicInteger();
            columns.forEach((column) -> {
                String columnName = column.getName();
                String columnValue = column.getValue();
                if ("id".equals(columnName)) {
                    id.set(Integer.parseInt(columnValue));
                }
            });
            // TODO: delete the cache
            LOGGER.info("Canal drop stock cache for id: [{}]", id);
        }
    }
}

There are detailed comments in the code, so I won't explain it line by line.

We run the code, then UPDATE a row in the database, changing a value and then changing it back, as shown below.

Canal successfully received two update operations:

Next, we simulate a cache-deletion service in the code:

public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
    // only react to changes on the m4a_miaosha.stock table
    if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
        AtomicInteger id = new AtomicInteger();
        columns.forEach((column) -> {
            String columnName = column.getName();
            String columnValue = column.getValue();
            if ("id".equals(columnName)) {
                id.set(Integer.parseInt(columnValue));
            }
        });
        // TODO: delete the cache
        LOGGER.info("Canal drop stock cache for id: [{}]", id);
    }
}

In the code above, we flush the inventory cache after receiving an update to the m4a_miaosha.stock table. The effect is as follows:

That's it for a simple introduction to Canal; the rest is left for readers to explore.

Conclusion

This article summarizes the basic principle and simple use of Canal.

Key takeaways:

  • Canal disguises itself as a slave database to read the binlog from the master database.
  • Canal has many uses, such as real-time database backup, index building and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.), and service cache refresh.
  • Canal can push to many data sinks, including message queues, making it easy to consume from multiple languages.
