Writing in the front

In today’s Internet industry, especially in the current distributed and micro-service development environment, in order to improve search efficiency and search accuracy, a large number of NoSQL databases such as Redis and Memcached are used, as well as a large number of full-text search services and search engines such as Solr and Elasticsearch are also used. So, at this time, there will be a problem we need to think about and solve: that is the problem of data synchronization! How to synchronize data from a real-time changing database to Redis/Memcached or Solr/Elasticsearch?

Data synchronization requirements in the context of the Internet

In today’s Internet industry, especially in today’s distributed and micro-service development environment, in order to improve search efficiency and search accuracy, a large number of NoSQL databases such as Redis and Memcached will be used, and a large number of full-text search services such as Solr and Elasticsearch will also be used. So, at this time, there will be a problem we need to think about and solve: that is the problem of data synchronization! How to synchronize data from a real-time changing database to Redis/Memcached or Solr/Elasticsearch?

For example, when we are writing data to a database in a distributed environment, we may need to read data from Redis, Memcached or Elasticsearch, Solr or other services. Then, the real-time synchronization of data between database and various services has become an urgent problem for us to solve.

Imagine we bring in services like Redis or Memcached or Elasticsearch or Solr due to business needs. This makes it possible for our application to read data from different services as shown in the figure below.

In essence, no matter what service or middleware we introduce, the data will ultimately be read from our MySQL database. So, how to synchronize data from MySQL to other services or middleware in real time?

Note: For better illustration, the following sections will take the example of data synchronization from the MySQL database to the Solr index library.

Data synchronization solution

1. Synchronize in business code

After adding, modifying, or deleting, execute the logical code that manipulates the Solr index library. Take the following code snippet for example.

public ResponseResult updateStatus(Long[] ids, String status){ try{ goodsService.updateStatus(ids, status); if("status_success".equals(status)){ List<TbItem> itemList = goodsService.getItemList(ids, status); itemSearchService.importList(itemList); Return new responseResult (true, "state failed ")}catch(Exception E){return new responseResult (false," state failed "); }}

Advantages:

Easy to operate.

Disadvantages:

High coupling degree of business.

Execution becomes less efficient.

2. Timed task synchronization

After the database is added, modified, and deleted, the database data is regularly synchronized to the Solr index database through the timing task.

Timed task technologies include Springtask, Quartz.

Ha ha, and my open source framework mykit – delay, open source address is: https://github.com/sunshinely… .

One trick to keep in mind when performing timed tasks here is: Regular tasks for the first time, from the MySQL database query time fields are arranged in reverse chronological order corresponding to the data, and record the current maximum of querying data of time field, every time after time tasks query data, as long as according to time order to query the data in the table of time field is greater than the last record of the time value of data, And record the maximum value of the time field queried by this task, so that there is no need to query all the data in the data table again.

Note: The time field mentioned here refers to the time field that identifies the data update. That is to say, when using timed tasks to synchronize data, it is best to add a time field to the data table to avoid performing a full table scan every time a task is executed.

Advantages:

The operation of synchronizing the Solr index library is completely decoupled from the business code.

Disadvantages:

The real-time performance of data is not high.

3. Synchronize with MQ

After the database is added, modified, or deleted, a message is sent to MQ. At this time, the synchronizer acts as the consumer in MQ, retrieving the message from the message queue, and then executing the logic to synchronize the Solr index library.

We can use the following diagram to simply illustrate the process of data synchronization through MQ.

We can do this with the following code.

public ResponseResult updateStatus(Long[] ids, String status){ try{ goodsService.updateStatus(ids, status); if("status_success".equals(status)){ List<TbItem> itemList = goodsService.getItemList(ids, status); final String jsonString = JSON.toJSONString(itemList); jmsTemplate.send(queueSolr, new MessageCreator(){ @Override public Message createMessage(Session session) throws JMSException{ return session.createTextMessage(jsonString); }}); } return new responseResult (true, "state changed successfully "); }catch(Exception E){return new responseResult (false, "state failed "); }}

Advantages:

Business code is decoupled and quasi-real-time.

Disadvantages:

You need to add code to send messages to MQ in your business code, and the data invocation interface is coupled.

4. Real-time synchronization via Canal

Canal is an open source database log incremental parsing component from Alibaba. Canal parses database log information to detect changes in table structure and data in the database to update the Solr index library.

Canal enabled complete decoupling of business code, complete decoupling of APIs, and quasi-real-time.

Introduction of Canal

Alibaba MySQL database BinLog incremental subscription and consumption component, based on the database incremental log parsing, provides incremental data subscription and consumption, currently mainly supports MySQL.

Canal open source address: https://github.com/alibaba/canal.

How Canal works

MySQL master slave replication implementation

As can be seen from the figure above, master-slave replication is mainly divided into three steps:

  • The Master node logs changes to the binary log (these records are called binary log events and can be viewed through Show Binlog Events).
  • The Slave node copies the binary log events of the Master node into its relay log.
  • The events in the Slave node’s redo relay log will be reflected in its own database.

Canal Internal Principles

First, let’s look at Canal’s schematic, as shown below.

The principle is roughly described as follows:

  • Canal emulates the MySQL slave interaction protocol, disguises itself as a MySQL slave and sends the dump protocol to the MySQL Master
  • MySQL Master receives dump request and starts pushing binary log to Slave (i.e. Canal)
  • Canal resolves the Binary Log object (originally a byte stream)

Canal Internal Structure

The explanation is as follows:

  • Server: Represents a Canal run instance, corresponding to a JVM process.
  • Instance: For a data queue (one Server corresponds to one or more instances).

Next, let’s look at the submodules under Instance, as shown below.

  • EventParser: Data source access, simulation Slave protocol and Master node interaction, protocol parsing.
  • EventSink: Connector between EventParser and EventStore that filters, processes, merges, and distributes data.
  • EventStore: Data storage.
  • MetaManager: Incremental subscription and consumption information management.

Canal Environmental Preparation

Set up MySQL remote access

grant all privileges on *.* to 'root'@'%' identified by '123456';
flush privileges;

MySQL configuration

Note: MySQL here is based on version 5.7.

Canal’s principle is based on MySQL’s binlog technology, so to use Canal, you need to enable MySQL’s binlog writing function. It is recommended that you configure the binlog mode as ROW.

To see the pattern of binlog, enter the following command on the MySQL command line.

SHOW VARIABLES LIKE 'binlog_format';

The effect is shown below.

As you can see, the default binlog format in MySQL is STATEMENT, so we need to change STATEMENT to ROW. Modify the /etc/my.cnf file.

vim /etc/my.cnf

Add the following three configurations under [mysqld].

Mysql > select ROW server_id from database and select database from database; select database from database and select database from database

After modifying the my. CNF file, you need to restart the MySQL service.

service mysqld restart

Next, let’s look at the binlog mode again.

SHOW VARIABLES LIKE 'binlog_format';

As you can see, MySQL’s binlog mode has already been set to ROW.

MySQL > create user authorization

Canal’s rationale is that the schema itself is MySQL Slave, so be sure to set permissions on MySQL Slave. Here, you need to create a master-slave synchronized account and give that account the relevant permissions.

CREATE USER canal@'localhost' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';
FLUSH PRIVILEGES;

Canal Deployment and Installation

Download the Canal

Here, we take the Canal version 1.1.1, friends can be downloaded to the link https://github.com/alibaba/canal/releases/tag/canal-1.1.1 Canal version 1.1.1.

Upload decompression

Download the Canal installation package, upload it to the server, and perform the following command to unzip it

Mkdir -p /usr/local/ Canal tar-zxvf can.deployer-1.1.1.tar.gz-c /usr/local/ Canal /

The unzipped directory is shown below.

The description of each catalog is as follows:

  • Bin: Stores executable scripts.
  • Conf: Stores configuration files.
  • Lib: holds other dependencies or third party libraries.
  • Logs: Holds log files.

Modifying configuration files

In the conf directory of Canal, there is a Canal. Properties file with configuration related to Canal Server, which contains the following line of configuration.

canal.destinations=example

Example is an example of Canal’s example. You can configure multiple instances by separating them with commas. Example is an example of Canal’s Instance. Also, the example here corresponds to a folder in the Canal conf directory. That is, each Instance in Canal corresponds to a subdirectory under the conf directory.

Next, we need to modify a configuration file in the example directory under the Canal conf directory, instance.properties.

vim instance.properties

Modify the following configuration items.

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # canal slaveId, note: Do not repeat canal with MySQL server_id. Instance. MySQL. SlaveId = 1234 # position info, Need to own the database information canal. Instance. Master. Address = 127.0.0.1:3306 canal. The instance. The master. The journal. The name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = # username/password. Need to own the database information canal. Instance. DbUsername = canal canal. The instance. The dbPassword = canal canal. Instance. DefaultDatabaseName =canaldb canal.instance.connectionCharset = UTF-8 #table regex canal.instance.filter.regex = canaldb\\.. * # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

Meaning of the choice:

  • Canal. The instance. The mysql. SlaveId: serverId concept of mysql cluster configuration, need to make sure that and the current mysql cluster id only.
  • Canal. The instance. The master. Address: mysql main library link address;
  • Canal. Instance. DbUsername: mysql database account;
  • Canal. Instance. DbPassword: mysql database password;
  • Canal. Instance. DefaultDatabaseName: mysql link the default database;
  • Canal. Instance. ConnectionCharset: mysql data parsing code;
  • Canal. The instance. The filter. The regex: mysql data analytical focus on table, Perl regular expressions.

Start the Canal

After configuring Canal, you can start Canal. Go to the bin directory of Canal and start Canal by entering the following command.

./startup.sh

Test the Canal

Import and modify the source code

Here, we used Canal’s source code for testing, downloaded Canal’s source code and imported it into IDEA.

Next, we find the SimpleCanalClientTest class under Example to test. The source code for this class is shown below.

package com.alibaba.otter.canal.example; import java.net.InetSocketAddress; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; /** ** public class SimpleCanalClientTest ** @Author Jianghang 2013-4-15 04:19:20 * @Version 1.0.4 */ public class SimpleCanalClientTest extends AbstractCanalClientTest { public SimpleCanalClientTest(String destination){ super(destination); } public static void main(String args[]) {String destination = "example";} public static void main(String args[]) {String destination = "example"; String ip = AddressUtils.getHostIp(); CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress(ip, 11111), destination, "canal", "canal"); final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination); clientTest.setConnector(connector); clientTest.start(); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { logger.info("## stop the canal client"); clientTest.stop(); } catch (Throwable e) { logger.warn("##something goes wrong when stopping canal:", e); } finally { logger.info("## canal client is down."); }}}); }}

As you can see, Destination used in this class is Example. In this class, we just need to change the IP address to the Canal Server IP.

Specific as: will be the following line of code.

String ip = AddressUtils.getHostIp();

Is amended as:

String ip = "192.168.175.100"

Since we did not specify a user name and password when we configured Canal, we also need to pass the following code.

CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress(ip, 11111),
    destination,
    "canal",
    "canal");

Is amended as:

CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress(ip, 11111),
    destination,
    "",
    "");

When the changes are complete, run the main method to start the program.

Test data change

Next, create a CanalDB database in MySQL.

create database canaldb;

At this point, the associated log information is printed on the IDEA command line.

****************************************************
* Batch Id: [7] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35
* Start : [mysql-bin.000007:6180:1540286735000(2020-08-05 23:25:35)] 
* End : [mysql-bin.000007:6356:1540286735000(2020-08-05 23:25:35)] 
****************************************************

Next, I created a data table in CanalDB database, and added, deleted, modified and searched the data in the data table. The log information output by the program is shown as follows.

Mysql > display bin log for mysql mysql > change data **************************************************** * Batch Id: [7] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35 * Start : [mysql-bin.000007:6180:1540286735000(2020-08-05 23:25:35)] * End : [mysql-bin.000007:6356:1540286735000(2020-08-05 23:25:35)] **************************************************** ================> binlog[mysql-bin.000007:6180] , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 393ms BEGIN ----> Thread id: 43 ----------------> binlog[mysql-bin.000007:6311] , name[canal,canal_table] , eventType : DELETE , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 393 ms id : 8 type=int(10) unsigned name : 512 type=varchar(255) ---------------- END ----> transaction id: 249 ================> binlog[mysql-bin.000007:6356] , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 394ms **************************************************** * Batch Id: [8] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35 * Start : [mysql-bin.000007:6387:1540286869000(2020-08-05 23:25:49)] * End : [mysql-bin.000007:6563:1540286869000(2020-08-05 23:25:49)] **************************************************** ================> binlog[mysql-bin.000007:6387] , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 976ms BEGIN ----> Thread id: 43 ----------------> binlog[mysql-bin.000007:6518] , name[canal,canal_table] , eventType : INSERT , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 976 ms id : 21 type=int(10) unsigned update=true name : aaa type=varchar(255) update=true ---------------- END ----> transaction id: 250 ================> binlog[mysql-bin.000007:6563] , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 977ms **************************************************** * Batch Id: [9] ,count : [3] , memsize : [161] , Time : 2020-08-05 23:26:22 * Start : [mysql-bin.000007:6594:1540286902000(2020-08-05 23:26:22)] * End : [mysql-bin.000007:6782:1540286902000(2020-08-05 23:26:22)] **************************************************** ================> binlog[mysql-bin.000007:6594] , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 712ms BEGIN ----> Thread id: 43 ----------------> binlog[mysql-bin.000007:6725] , name[canal,canal_table] , eventType : UPDATE , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 712 ms id : 21 type=int(10) unsigned name : aaac type=varchar(255) update=true ---------------- END ----> transaction id: 252 ================> binlog[mysql-bin.000007:6782] , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 713ms

Data synchronization implementation

demand

Changes in database data are parsed by Canal to the Binlog log and updated in real time to Solr’s index library.

The specific implementation

Create a project

Create the Maven project mykit-canal-demo and add the following configuration in the pom.xml file.

<dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> < version > 1.0.24 < / version > < / dependency > < the dependency > < groupId > com. Alibaba. Otter < / groupId > < artifactId > canal. Protocol < / artifactId > < version > 1.0.24 < / version > < / dependency > < the dependency > <groupId> common-lang </groupId> </artifactId> common-lang </artifactId> <version>2.6</version> </dependency> <dependency> < the groupId > org. Codehaus. Jackson < / groupId > < artifactId > Jackson - mapper - asl < / artifactId > < version > 1.8.9 < / version > </dependency> <dependency> <groupId>org.apache.solr</groupId> <artifactId>solr-solrj</artifactId> <version>4.10.3</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> The < version > 4.9 < / version > < scope > test < / scope > < / dependency > < / dependencies >

Create the log4j configuration file XML

Create the log4j.properties file in the SRC /main/resources directory of the project, as shown below.

log4j.rootCategory=debug, CONSOLE # CONSOLE is set to be a ConsoleAppender using a PatternLayout. log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout Log4j. Appender. CONSOLE. Layout. ConversionPattern = % d {ISO8601} [% 15.15 t] % % - 6 r - 5 p % 30.30 x % m \ % n c # LOGFILE is set to be  a File appender using a PatternLayout. # log4j.appender.LOGFILE=org.apache.log4j.FileAppender # log4j.appender.LOGFILE.File=d:\axis.log # log4j.appender.LOGFILE.Append=true # log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout # Log4j. Appender. LOGFILE. Layout. ConversionPattern = % d {ISO8601} [% 15.15 t] % % - 6 r c % x - 30.30-5 p % % m \ n

Create Entity Class

Create a Book entity class under the IO. Mykit.Canal. Demo. Bean package to test Canal’s data transfer, as shown below.

package io.mykit.canal.demo.bean;
import org.apache.solr.client.solrj.beans.Field;
import java.util.Date;
public class Book implements Serializable {
    private static final long serialVersionUID = -6350345408771427834L;{

    @Field("id")
    private Integer id;

    @Field("book_name")
    private String name;

    @Field("book_author")
    private String author;

    @Field("book_publishtime")
    private Date publishtime;

    @Field("book_price")
    private Double price;

    @Field("book_publishgroup")
    private String publishgroup;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }

    public Date getPublishtime() {
        return publishtime;
    }

    public void setPublishtime(Date publishtime) {
        this.publishtime = publishtime;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    public String getPublishgroup() {
        return publishgroup;
    }

    public void setPublishgroup(String publishgroup) {
        this.publishgroup = publishgroup;
    }

    @Override
    public String toString() {
        return "Book{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", author='" + author + '\'' +
                ", publishtime=" + publishtime +
                ", price=" + price +
                ", publishgroup='" + publishgroup + '\'' +
                '}';
    }
}

In the Book entity class, we use the Solr annotation @Field to define the relationship between the entity class fields and the Solr Field.

The realization of various tool classes

Next, we’re in IO. Mykit. Canal. Demo. Utils package created under a variety of tools.

  • BinlogValue

The code to store the values for each row and column of the BinLog analysis is shown below.

package io.mykit.canal.demo.utils; import java.io.Serializable; /** ** className: binlogValue <br/> ** binlog value for each row and column; <br> * Added data: beforeValue and value are both existing; <br> * modify data: beforeValue is the value before modification; Value is the modified value; <br> * Delete data: beforeValue and value are both the values before deletion; <br> */ public class BinlogValue implements Serializable {private static final Long {private static final Long; private static final Long {private static final Long; private static final Long; private static final Long; private static final Long; private static final Long serialVersionUID = -6350345408773943086L; private String value; private String beforeValue; /** * BinLog (); /** BinLog (); <br> * new data: value: existing value; <br> * modify data: value is the modified value; <br> * delete data: value is the value before delete; <br> */ public String getValue() {return value; } public void setValue(String value) { this.value = value; } /** * BinLog analyses the beforeValue value of each row and column; <br> * Added data: beforeValue is existing value; <br> * modify data: beforeValue is the value before modification; <br> * Delete data: beforeValue is the value before deletion; <br> */ public String getBeforeValue() { return beforeValue; } public void setBeforeValue(String beforeValue) { this.beforeValue = beforeValue; }}
  • CanalDataParser

To parse the data, the code is shown below.

package io.mykit.canal.demo.utils;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
import com.google.protobuf.InvalidProtocolBufferException;

/**
 * 解析数据
 */
public class CanalDataParser {
    
    protected static final String DATE_FORMAT   = "yyyy-MM-dd HH:mm:ss";
    protected static final String yyyyMMddHHmmss = "yyyyMMddHHmmss";
    protected static final String yyyyMMdd      = "yyyyMMdd";
    protected static final String SEP           = SystemUtils.LINE_SEPARATOR;
    protected static String  context_format     = null;
    protected static String  row_format         = null;
    protected static String  transaction_format = null;
    protected static String row_log = null;
    
    private static Logger logger = LoggerFactory.getLogger(CanalDataParser.class);
    
    static {
        context_format = SEP + "****************************************************" + SEP;
        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
        context_format += "* Start : [{}] " + SEP;
        context_format += "* End : [{}] " + SEP;
        context_format += "****************************************************" + SEP;

        row_format = SEP
                     + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms"
                     + SEP;

        transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {} , delay : {}ms" + SEP;

        row_log = "schema[{}], table[{}]";
    }

    public static List<InnerBinlogEntry> convertToInnerBinlogEntry(Message message) {
        List<InnerBinlogEntry> innerBinlogEntryList = new ArrayList<InnerBinlogEntry>();
        
        if(message == null) {
            logger.info("接收到空的 message; 忽略");
            return innerBinlogEntryList;
        }
        
        long batchId = message.getId();
        int size = message.getEntries().size();
        if (batchId == -1 || size == 0) {
            logger.info("接收到空的message[size=" + size + "]; 忽略");
            return innerBinlogEntryList;
        }

        printLog(message, batchId, size);
        List<Entry> entrys = message.getEntries();

        //输出日志
        for (Entry entry : entrys) {
            long executeTime = entry.getHeader().getExecuteTime();
            long delayTime = new Date().getTime() - executeTime;
            
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                    TransactionBegin begin = null;
                    try {
                        begin = TransactionBegin.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // 打印事务头信息,执行的线程id,事务耗时
                    logger.info("BEGIN ----> Thread id: {}",  begin.getThreadId());
                    logger.info(transaction_format, new Object[] {entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });

                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    TransactionEnd end = null;
                    try {
                        end = TransactionEnd.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // 打印事务提交信息,事务id
                    logger.info("END ----> transaction id: {}", end.getTransactionId());
                    logger.info(transaction_format,
                        new Object[] {entry.getHeader().getLogfileName(),  String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
                }
                continue;
            }

            //解析结果
            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }

                EventType eventType = rowChage.getEventType();

                logger.info(row_format, new Object[] { entry.getHeader().getLogfileName(),
                            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });

                //组装数据结果
                if (eventType == EventType.INSERT || eventType == EventType.DELETE || eventType == EventType.UPDATE) {
                    String schemaName = entry.getHeader().getSchemaName();
                    String tableName = entry.getHeader().getTableName();
                    List<Map<String, BinlogValue>> rows = parseEntry(entry);

                    InnerBinlogEntry innerBinlogEntry = new InnerBinlogEntry();
                    innerBinlogEntry.setEntry(entry);
                    innerBinlogEntry.setEventType(eventType);
                    innerBinlogEntry.setSchemaName(schemaName);
                    innerBinlogEntry.setTableName(tableName.toLowerCase());
                    innerBinlogEntry.setRows(rows);

                    innerBinlogEntryList.add(innerBinlogEntry);
                } else {
                    logger.info(" 存在 INSERT INSERT UPDATE 操作之外的SQL [" + eventType.toString() + "]");
                }
                continue;
            }
        }
        return innerBinlogEntryList;
    }

    private static List<Map<String, BinlogValue>> parseEntry(Entry entry) {
        List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();
        try {
            String schemaName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            RowChange rowChage = RowChange.parseFrom(entry.getStoreValue());
            EventType eventType = rowChage.getEventType();

            // 处理每个Entry中的每行数据
            for (RowData rowData : rowChage.getRowDatasList()) {
                StringBuilder rowlog = new StringBuilder("rowlog schema[" + schemaName + "], table[" + tableName + "], event[" + eventType.toString() + "]");
                
                Map<String, BinlogValue> row = new HashMap<String, BinlogValue>();
                List<Column> beforeColumns = rowData.getBeforeColumnsList();
                List<Column> afterColumns = rowData.getAfterColumnsList();
                beforeColumns = rowData.getBeforeColumnsList();
                if (eventType == EventType.DELETE) {//delete
                    for(Column column : beforeColumns) {
                        BinlogValue binlogValue = new BinlogValue();
                        binlogValue.setValue(column.getValue());
                        binlogValue.setBeforeValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                } else if(eventType == EventType.UPDATE) {//update
                    for(Column column : beforeColumns) {
                        BinlogValue binlogValue = new BinlogValue();
                        binlogValue.setBeforeValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                    for(Column column : afterColumns) {
                        BinlogValue binlogValue = row.get(column.getName());
                        if(binlogValue == null) {
                            binlogValue = new BinlogValue();
                        }
                        binlogValue.setValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                } else { // insert
                    for(Column column : afterColumns) {
                        BinlogValue binlogValue = new BinlogValue();
                        binlogValue.setValue(column.getValue());
                        binlogValue.setBeforeValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                } 
               
                rows.add(row);
                String rowjson = JacksonUtil.obj2str(row);
                
                logger.info("#################################### Data Parse Result ####################################");
                logger.info(rowlog + " , " + rowjson);
                logger.info("#################################### Data Parse Result ####################################");
                logger.info("");
            }
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("parseEntry has an error , data:" + entry.toString(), e);
        }
        return rows;
    }

    private static void printLog(Message message, long batchId, int size) {
        long memsize = 0;
        for (Entry entry : message.getEntries()) {
            memsize += entry.getHeader().getEventLength();
        }

        String startPosition = null;
        String endPosition = null;
        if (!CollectionUtils.isEmpty(message.getEntries())) {
            startPosition = buildPositionForDump(message.getEntries().get(0));
            endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
        }

        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        logger.info(context_format, new Object[] {batchId, size, memsize, format.format(new Date()), startPosition, endPosition });
    }

    private static String buildPositionForDump(Entry entry) {
        long time = entry.getHeader().getExecuteTime();
        Date date = new Date(time);
        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        return entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":" + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
    }
}
  • DateUtils

Time tool class, the code is shown below.

package io.mykit.canal.demo.utils; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; public class DateUtils { private static final String FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss"; private static SimpleDateFormat sdf = new SimpleDateFormat(FORMAT_PATTERN); public static Date parseDate(String datetime) throws ParseException{ if(datetime ! = null && !" ".equals(datetime)){ return sdf.parse(datetime); } return null; } public static String formatDate(Date datetime) throws ParseException{ if(datetime ! = null ){ return sdf.format(datetime); } return null; } public static Long formatStringDateToLong(String datetime) throws ParseException{ if(datetime ! = null && !" ".equals(datetime)){ Date d = sdf.parse(datetime); return d.getTime(); } return null; } public static Long formatDateToLong(Date datetime) throws ParseException{ if(datetime ! = null){ return datetime.getTime(); } return null; }}
  • InnerBinlogEntry

BinLog entity class with the code shown below.

package io.mykit.canal.demo.utils; import java.util.ArrayList; import java.util.List; import java.util.Map; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; Public class InnerBinloGenTry {/** * Canal private Entry */ private Entry; /** * private String TableName; /** * private String TableName; Private String schemaName (); /** * private String schemaName (); /** * Canal Canal (); /** * Canal Canal (); EventType.INSERT; EventType.UPDATE; EventType.DELETE; */ private EventType eventType; private List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>(); public Entry getEntry() { return entry; } public void setEntry(Entry entry) { this.entry = entry; } public String getTableName() { return tableName; } public void setTableName(String tableName) { this.tableName = tableName; } public EventType getEventType() { return eventType; } public void setEventType(EventType eventType) { this.eventType = eventType; } public String getSchemaName() { return schemaName; } public void setSchemaName(String schemaName) { this.schemaName = schemaName; } public List<Map<String, BinlogValue>> getRows() { return rows; } public void setRows(List<Map<String, BinlogValue>> rows) { this.rows = rows; }}
  • JacksonUtil

JSON utility class with the code shown below.

package io.mykit.canal.demo.utils; import java.io.IOException; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; public class JacksonUtil { private static ObjectMapper mapper = new ObjectMapper(); public static String obj2str(Object obj) { String json = null; try { json = mapper.writeValueAsString(obj); } catch (JsonGenerationException e) { e.printStackTrace(); } catch (JsonMappingException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } return json; } public static <T> T str2obj(String content, Class<T> valueType) { try { return mapper.readValue(content, valueType); } catch (JsonParseException e) { e.printStackTrace(); } catch (JsonMappingException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } return null; }}

Synchronization program implementation

After the entity class and tool class are prepared, we can write a synchronization program to realize real-time synchronization of the data in the MySQL database to the Solr index library. The MyKitCanaldemosync class is commonly seen in the package io.mykit.canalcanal. Demo.

package io.mykit.canal.demo.main; import io.mykit.canal.demo.bean.Book; import io.mykit.canal.demo.utils.BinlogValue; import io.mykit.canal.demo.utils.CanalDataParser; import io.mykit.canal.demo.utils.DateUtils; import io.mykit.canal.demo.utils.InnerBinlogEntry; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.text.ParseException; import java.util.List; import java.util.Map; public class SyncDataBootStart { private static Logger logger = LoggerFactory.getLogger(SyncDataBootStart.class); Public static void main(String[] args) throws Exception {String hostname = "192.168.175.100"; Integer port = 11111; String destination = "example"; / / get CanalServer connection CanalConnector CanalConnector = CanalConnectors. NewSingleConnector (new InetSocketAddress (hostname, port), destination, "", ""); // Connect to CanalServer CanalConnector.connect (); // Subscribe to Destination CanalConnector.Subscribe (); // Integer BatchSize = 5*1024; while (true){ Message message = canalConnector.getWithoutAck(batchSize); long messageId = message.getId(); int size = message.getEntries().size(); if(messageId == -1 || size == 0){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }}else{List<InnerBinlogEntry> innerBinlogEntries = CanalDataParser.convertToInnerBinlogEntry(message); //2. Synchronize the parsed data information to the Solr index library. } // Submit confirmation canalConnector.ack(messageId); }} private static void SyncDataSolr (List<InnerBinlogEntry> innerbinlogEntries) throws Exception SolrServer SolrServer = new HttpSolrServer (" http://192.168.175.101:8080/solr "); If (innerbinLogEntries!), add, modify, or delete entries to a set of entries. If (innerBinlogEntries! = null){ for (InnerBinlogEntry innerBinlogEntry : innerBinlogEntries) { CanalEntry.EventType eventType = innerBinlogEntry.getEventType(); // If it is an Insert, update, Requires synchronization data to the library if solr index (eventType = = CanalEntry. EventType. INSERT | | eventType. = = CanalEntry eventType. UPDATE) { List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows(); if(rows ! = null){ for (Map<String, BinlogValue> row : rows) { BinlogValue id = row.get("id"); BinlogValue name = row.get("name"); BinlogValue author = row.get("author"); BinlogValue publishtime = row.get("publishtime"); BinlogValue price = row.get("price"); BinlogValue publishgroup = row.get("publishgroup"); Book book = new Book(); book.setId(Integer.parseInt(id.getValue())); book.setName(name.getValue()); book.setAuthor(author.getValue()); book.setPrice(Double.parseDouble(price.getValue())); book.setPublishgroup(publishgroup.getValue()); book.setPublishtime(DateUtils.parseDate(publishtime.getValue())); SolrServer.addBean (Book); solrServer.commit(); }}} else if (eventType = = CanalEntry. EventType. DELETE) {/ / if the DELETE operation, you need to DELETE the solr index data in a library. A List < Map < String, BinlogValue>> rows = innerBinlogEntry.getRows(); if(rows ! = null){ for (Map<String, BinlogValue> row : rows) { BinlogValue id = row.get("id"); SolrServer.deleteById (ID. GetValue ()); solrServer.commit(); } } } } } } }

Next, start the SyncDatabootStart class’s main method and listen for Canal Server, which in turn listens for MySQL binlog changes. SyncDatabootStart receives the change information immediately and parses it into a Book object to be updated to the Solr library in real time. If you delete data in the MySQL database, the data in the Solr library is also deleted in real time.

Some reference Canal official documentation: https://github.com/alibaba/canal.

Well, that’s it for today, I’m Glacier, see you next time ~~