Writing in the front

In today's Internet industry, a constantly changing database has to be kept in sync with caches such as Redis and Memcached and with full-text search services such as Solr and Elasticsearch. This article walks through the common data synchronization solutions and then implements real-time synchronization based on Alibaba's open-source Canal component.

Data synchronization requirements on the Internet

In today's Internet industry, especially in distributed, microservice development environments, NoSQL databases such as Redis and Memcached, as well as full-text search services and search engines such as Solr and Elasticsearch, are widely used to improve query efficiency and accuracy. This raises a problem we need to think about and solve: data synchronization. How do we synchronize data from a constantly changing database to Redis/Memcached or Solr/Elasticsearch?

For example, in a distributed environment we continuously write data to the database, while applications may need to read that data from Redis, Memcached, Elasticsearch, or Solr. Real-time synchronization between the database and each of these services therefore becomes an urgent problem to solve.

Imagine that, for business reasons, we introduce services such as Redis, Memcached, Elasticsearch, or Solr. Our application can then read data from different services, as shown in the figure below.

Essentially, no matter what services or middleware we introduce, the data ultimately comes from our MySQL database. So, how do we synchronize data from MySQL to the other services or middleware in real time?

Note: To better illustrate the problem, the following sections use the example of synchronizing data from the MySQL database to the Solr index library.

Data synchronization solution

1. Synchronize in the service code

After adding, modifying, or deleting data, execute the logic that updates the Solr index library in the same service code, as in the following snippet.

public ResponseResult updateStatus(Long[] ids, String status){
    try{
        goodsService.updateStatus(ids, status);
        if("status_success".equals(status)){
            List<TbItem> itemList = goodsService.getItemList(ids, status);
            itemSearchService.importList(itemList);
        }
        return new ResponseResult(true, "Modified status succeeded");
    }catch(Exception e){
        return new ResponseResult(false, "Failed to modify status");
    }
}

Advantages:

Easy to implement.

Disadvantages:

The business code is highly coupled with the synchronization logic.

Execution efficiency becomes low, because every write operation also updates the index synchronously.

2. Synchronize with scheduled tasks

After data is added, modified, or deleted in the database, scheduled tasks periodically synchronize it from the database to the Solr index library.

Scheduled-task technologies include Spring Task and Quartz.

There is also my own open-source mykit-delay framework: github.com/sunshinelyz…

One technique worth noting when using scheduled tasks: on the first run, query the data ordered by its time field in descending order and record the maximum value of that time field; on every subsequent run, only query rows whose time field is greater than the maximum recorded last time, and again record the maximum value seen in this run. This way, each run avoids querying all the data in the table again.

Note: the time field here refers to the field that identifies when a row was updated. In other words, when synchronizing data with scheduled tasks, it is best to add such a time field to the table to avoid a full table scan on every run.
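
A minimal sketch of this incremental pull, assuming Spring's @Scheduled and JdbcTemplate; the tb_item table, its update_time column, and the getUpdateTime() accessor on TbItem are all illustrative assumptions, not code from the original solution.

import java.sql.Timestamp;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class IncrementalSyncTask {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private ItemSearchService itemSearchService;

    // Maximum update_time seen so far; starting at the epoch makes the first run pull all rows once
    private Timestamp lastSyncTime = new Timestamp(0L);

    @Scheduled(fixedDelay = 60000) // run one minute after the previous run finishes
    public void syncToSolr() {
        // Only pull rows changed since the last recorded time, oldest first,
        // so the last element carries the new maximum of the time field
        List<TbItem> changed = jdbcTemplate.query(
                "SELECT * FROM tb_item WHERE update_time > ? ORDER BY update_time ASC",
                new BeanPropertyRowMapper<>(TbItem.class), lastSyncTime);
        if (changed.isEmpty()) {
            return;
        }
        itemSearchService.importList(changed);
        // Record the maximum time field of this batch; the next run starts from here
        lastSyncTime = changed.get(changed.size() - 1).getUpdateTime();
    }
}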

Advantages:

Synchronizing the Solr index library is completely decoupled from the business code.

Disadvantages:

Data is not synchronized in real time.

3. Synchronize data using MQ

After add, modify, or delete operations are performed in the database, a message is sent to MQ. The synchronization program, acting as a consumer in MQ, retrieves the message from the message queue and executes the logic that synchronizes the Solr index library.

We can use the following figure to briefly illustrate the process of synchronizing data over MQ.

We can do this using the following code.

public ResponseResult updateStatus(Long[] ids, String status){
    try{
        goodsService.updateStatus(ids, status);
        if("status_success".equals(status)){
            List<TbItem> itemList = goodsService.getItemList(ids, status);
            final String jsonString = JSON.toJSONString(itemList);
            jmsTemplate.send(queueSolr, new MessageCreator(){
                @Override
                public Message createMessage(Session session) throws JMSException{
                    return session.createTextMessage(jsonString);
                }
            });
        }
        return new ResponseResult(true, "Modified status succeeded");
    }catch(Exception e){
        return new ResponseResult(false, "Failed to modify status");
    }
}
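
The snippet above covers only the producer side. For completeness, here is a minimal sketch of the consumer side, assuming Spring JMS with a javax.jms MessageListener and Fastjson for deserialization; the class name is illustrative, and itemSearchService.importList is the same import logic used in the first solution.

import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import com.alibaba.fastjson.JSON;

public class SolrSyncListener implements MessageListener {

    private final ItemSearchService itemSearchService;

    public SolrSyncListener(ItemSearchService itemSearchService) {
        this.itemSearchService = itemSearchService;
    }

    @Override
    public void onMessage(Message message) {
        try {
            // The producer sent the item list as a JSON text message
            String json = ((TextMessage) message).getText();
            List<TbItem> itemList = JSON.parseArray(json, TbItem.class);
            // Reuse the same import logic as the in-service-code solution
            itemSearchService.importList(itemList);
        } catch (JMSException e) {
            throw new RuntimeException("Failed to consume Solr sync message", e);
        }
    }
}

Registering this listener on the queueSolr destination (for example, through a DefaultMessageListenerContainer) completes the loop.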

Advantages:

The business code is decoupled, and synchronization can be quasi-real-time.

Disadvantages:

Message-sending code still has to be embedded in the business code, which couples the business logic with the data-sending interface.

4. Real-time synchronization through Canal

Canal is an open-source component from Alibaba that incrementally parses database logs. By parsing the log, Canal detects changes to table structure and data in the database and updates the Solr index library accordingly.

With Canal, the business code is completely decoupled, the API is completely decoupled, and synchronization can be quasi-real-time.

Introduction to Canal

Canal is Alibaba's component for incremental subscription and consumption of the MySQL binlog. Based on incremental parsing of database logs, it provides incremental data subscription and consumption; currently it mainly supports MySQL.

Canal open source address: github.com/alibaba/can…

Working principle of Canal

MySQL master-slave replication implementation

As can be seen from the figure above, master-slave replication is divided into three main steps:

  • The Master node records data changes in its binary log. These records are called binary log events and can be viewed with SHOW BINLOG EVENTS (see the example after this list).
  • The Slave node copies the Master node's binary log events into its relay log.
  • The Slave node replays the events in the relay log, applying the changes to its own database.
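
For example, the events of a given binary log file can be inspected directly on the Master (the file name here is illustrative):

SHOW BINLOG EVENTS IN 'mysql-bin.000007' LIMIT 10;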

Internal principle of Canal

First, take a look at Canal’s schematic, as shown below.

The principle is roughly described as follows:

  • Canal emulated the interaction protocol of the MySQL slave, disguised itself as the MySQL slave, and sent the dump protocol to the MySQL Master
  • MySQL Master receives dump request and starts pushing binary log to Slave
  • Canal parses binary log objects (originally byte streams)

Internal structure of Canal

The instructions are as follows:

  • Server: represents a Canal running instance, corresponding to a JVM process.
  • Instance: corresponds to a data queue (one Server corresponds to one or more instances).

Next, let’s look at the submodule under Instance, as shown below.

  • EventParser: data source access; simulates the interaction between Slave and Master and parses the protocol.
  • EventSink: connector between EventParser and EventStore; filters, processes, merges, and distributes data.
  • EventStore: data storage.
  • MetaManager: incremental subscription and consumption information management.

Canal Environment Preparation

Set remote access to MySQL

grant all privileges on *.* to 'root'@'%' identified by '123456';
flush privileges;

MySQL configuration

Note: MySQL is described here based on version 5.7.

The principle of Canal is based on MySQL binlog technology. Therefore, if you want to use Canal, you need to enable MySQL binlog writing function. It is recommended to set the binlog mode to ROW.

To view the binlog mode, run the following command on the MySQL command line:

SHOW VARIABLES LIKE 'binlog_format';

The following shows the execution effect.

As you can see, the default binlog format in MySQL is STATEMENT. We need to change STATEMENT to ROW. Modify the /etc/my.cnf file.

vim /etc/my.cnf

Add the following three configurations under [mysqld].

log-bin=mysql-bin  # enable MySQL binary logging
binlog_format=ROW  # set the binary log format to ROW
server_id=1        # server_id must be unique and must not equal the slaveId configured for Canal

After modifying the my.cnf file, restart the MySQL service.

service mysqld restart

Next, let’s look at the binlog mode again.

SHOW VARIABLES LIKE 'binlog_format';

As you can see, MySQL’s binlog mode has been set to ROW.

MySQL creates user authorization

Canal works by posing as a MySQL Slave itself, so it requires the permissions of a MySQL Slave. Here we need to create an account for master/slave synchronization and grant it the relevant permissions.

CREATE USER canal@'localhost' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';
FLUSH PRIVILEGES;
Copy the code

Canal deployment and installation

Download the Canal

Here, we use Canal 1.1.1 as the example. You can download version 1.1.1 from github.com/alibaba/can…

Upload decompression

Upload the downloaded Canal installation package to the server and run the following commands to decompress it.

mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.1.tar.gz -C /usr/local/canal/

The decompressed directory is as follows.

The description of each directory is as follows:

  • Bin: stores executable scripts.
  • Conf: stores configuration files.
  • Lib: Store other dependent or third-party libraries.
  • Logs: Stores log files.

Modifying a Configuration File

In Canal's conf directory there is a canal.properties file, which configures the Canal Server. It contains the following configuration line.

canal.destinations=example

example corresponds to one Canal Instance. Multiple Instances can be configured here, separated by commas. example also corresponds to a folder under Canal's conf directory; that is, each Instance in Canal corresponds to a subdirectory of the conf directory.
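
For example, declaring a second, hypothetical Instance would look like this, and it would then need its own conf/example2/instance.properties:

canal.destinations=example,example2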

Next, we need to modify the instance.properties file in the example directory under Canal's conf directory.

vim instance.properties

Modify the following configuration items.

#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234

# position info, change to your own database information
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =

# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = canaldb
canal.instance.connectionCharset = UTF-8

# table regex
canal.instance.filter.regex = canaldb\\..*
#################################################

Meaning:

  • canal.instance.mysql.slaveId: the serverId concept from MySQL cluster configuration; it must be unique within the current MySQL cluster.
  • canal.instance.master.address: connection address of the MySQL master.
  • canal.instance.dbUsername: MySQL database account.
  • canal.instance.dbPassword: MySQL database password.
  • canal.instance.defaultDatabaseName: default database of the connection.
  • canal.instance.connectionCharset: character set used when parsing MySQL data.
  • canal.instance.filter.regex: Perl regular expression selecting the tables whose data Canal should parse (see the examples after this list).
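
According to Canal's documentation, common patterns for canal.instance.filter.regex look like the following (adapted here to the canaldb schema; multiple rules are separated by commas):

.*\\..*                    # all tables in all databases
canaldb\\..*               # all tables in the canaldb database
canaldb\\.canal.*          # tables in canaldb whose names start with "canal"
canaldb.test1              # one specific table
canaldb\\..*,mysql.test1   # a combination of multiple rules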

Start the Canal

After configuring Canal, you can start Canal. Go to the bin directory on Canal and run the following command to start Canal.

./startup.sh
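
To verify that Canal started successfully, you can watch its logs; based on the directory layout above (exact file names may vary by version):

tail -f /usr/local/canal/logs/canal/canal.log
tail -f /usr/local/canal/logs/example/example.log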

Test the Canal

Import and modify the source code

Here, we use Canal’s source code for testing. After downloading Canal’s source code, we import it into IDEA.

Next, we find the SimpleCanalClientTest class under Example to test. The source code for this class is shown below.

package com.alibaba.otter.canal.example;

import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;

/**
 * Test example of single-machine mode
 *
 * @author jianghang 2013-4-15 14:47:20
 * @version 1.0.4
 */
public class SimpleCanalClientTest extends AbstractCanalClientTest {

    public SimpleCanalClientTest(String destination){
        super(destination);
    }

    public static void main(String args[]) {
        // Create a connection directly by IP, without the HA function
        String destination = "example";
        String ip = AddressUtils.getHostIp();
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(ip, 11111),
            destination,
            "canal", "canal");

        final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
        clientTest.setConnector(connector);
        clientTest.start();
        Runtime.getRuntime().addShutdownHook(new Thread() {

            public void run() {
                try {
                    logger.info("## stop the canal client");
                    clientTest.stop();
                } catch (Throwable e) {
                    logger.warn("##something goes wrong when stopping canal:", e);
                } finally {
                    logger.info("## canal client is down.");
                }
            }
        });
    }
}

As you can see, the destination used in this class is example. In this class, we only need to change the IP address to the Canal Server's IP.

That is, the following line of code:

String ip = AddressUtils.getHostIp();

is changed to:

String ip = "192.168.175.100";

Since we did not specify a username and password when configuring Canal, we also need to change the following code.

CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress(ip, 11111),
    destination,
    "canal", "canal");

to:

CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress(ip, 11111),
    destination,
    "", "");

When the modification is complete, run the main method to start the program.

Test data changes

Next, create a canaldb database in MySQL.

create database canaldb;

At this point, the related log information is displayed on the command line in IDEA.

****************************************************
* Batch Id: [7] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35
* Start : [mysql-bin.000007:6180:1540286735000(2020-08-05 23:25:35)] 
* End : [mysql-bin.000007:6356:1540286735000(2020-08-05 23:25:35)] 
****************************************************
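
The test table and statements behind these logs were roughly as follows (reconstructed from the log output below; the exact DDL and values are assumptions):

create table canal_table (
    id int(10) unsigned not null auto_increment,
    name varchar(255),
    primary key (id)
);
insert into canal_table (name) values ('aaa');
update canal_table set name = 'aaac' where id = 21;
delete from canal_table where id = 8;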

Next, create a data table in the canaldb database and insert, update, and delete data in it, for example with statements like the sketch above. The program outputs log information like the following.
****************************************************
* Batch Id: [7] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35
* Start : [mysql-bin.000007:6180:1540286735000(2020-08-05 23:25:35)] 
* End : [mysql-bin.000007:6356:1540286735000(2020-08-05 23:25:35)] 
****************************************************

================> binlog[mysql-bin.000007:6180] , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 393ms
 BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6311] , name[canal,canal_table] , eventType : DELETE , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 393 ms
id : 8    type=int(10) unsigned
name : 512    type=varchar(255)
----------------
 END ----> transaction id: 249
================> binlog[mysql-bin.000007:6356] , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 394ms

****************************************************
* Batch Id: [8] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35
* Start : [mysql-bin.000007:6387:1540286869000(2020-08-05 23:25:49)] 
* End : [mysql-bin.000007:6563:1540286869000(2020-08-05 23:25:49)] 
****************************************************

================> binlog[mysql-bin.000007:6387] , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 976ms
 BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6518] , name[canal,canal_table] , eventType : INSERT , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 976 ms
id : 21    type=int(10) unsigned    update=true
name : aaa    type=varchar(255)    update=true
----------------
 END ----> transaction id: 250
================> binlog[mysql-bin.000007:6563] , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 977ms

****************************************************
* Batch Id: [9] ,count : [3] , memsize : [161] , Time : 2020-08-05 23:26:22
* Start : [mysql-bin.000007:6594:1540286902000(2020-08-05 23:26:22)] 
* End : [mysql-bin.000007:6782:1540286902000(2020-08-05 23:26:22)] 
****************************************************

================> binlog[mysql-bin.000007:6594] , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 712ms
 BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6725] , name[canal,canal_table] , eventType : UPDATE , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 712 ms
id : 21    type=int(10) unsigned
name : aaac    type=varchar(255)    update=true
----------------
 END ----> transaction id: 252
================> binlog[mysql-bin.000007:6782] , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 713ms

Data synchronization implementation

Requirement

Changes to the database data are parsed from the binlog by Canal and updated to Solr's index library in real time.

The specific implementation

Create a project

Create a Maven project mykit-canal-demo and add the following configuration to the pom.xml file.

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.0.24</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.protocol</artifactId>
        <version>1.0.24</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.8.9</version>
    </dependency>

    <dependency>
        <groupId>org.apache.solr</groupId>
        <artifactId>solr-solrj</artifactId>
        <version>4.10.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.9</version>
        <scope>test</scope>
    </dependency>

</dependencies>

Create the log4j configuration file

Create the log4j.properties file in the project's src/main/resources directory, as shown below.

log4j.rootCategory=debug, CONSOLE

# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-6r [%15.15t] %-5p %30.30c %x - %m\n

# LOGFILE is set to be a File appender using a PatternLayout.
# log4j.appender.LOGFILE=org.apache.log4j.FileAppender
# log4j.appender.LOGFILE.File=d:\axis.log
# log4j.appender.LOGFILE.Append=true
# log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
# log4j.appender.LOGFILE.layout.ConversionPattern=%d{ISO8601} %-6r [%15.15t] %-5p %30.30c %x - %m\n

Creating an entity class

Create a Book entity class under the io.mykit.canal.demo.bean package to test canal’s data transfer, as shown below.

package io.mykit.canal.demo.bean;

import java.io.Serializable;
import java.util.Date;

import org.apache.solr.client.solrj.beans.Field;

public class Book implements Serializable {

    private static final long serialVersionUID = -6350345408771427834L;

    @Field("id")
    private Integer id;

    @Field("book_name")
    private String name;

    @Field("book_author")
    private String author;

    @Field("book_publishtime")
    private Date publishtime;

    @Field("book_price")
    private Double price;

    @Field("book_publishgroup")
    private String publishgroup;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }

    public Date getPublishtime() {
        return publishtime;
    }

    public void setPublishtime(Date publishtime) {
        this.publishtime = publishtime;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    public String getPublishgroup() {
        return publishgroup;
    }

    public void setPublishgroup(String publishgroup) {
        this.publishgroup = publishgroup;
    }

    @Override
    public String toString() {
        return "Book{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", author='" + author + '\'' +
                ", publishtime=" + publishtime +
                ", price=" + price +
                ", publishgroup='" + publishgroup + '\'' +
                '}';
    }
}

In the Book entity class, we use Solr's @Field annotation to define the mapping between the entity-class fields and the Solr fields.
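
For these annotations to work, matching fields must be defined in the Solr core's schema.xml. A sketch of what they might look like (the field types and attributes are assumptions; the id field is normally predefined):

<field name="book_name" type="string" indexed="true" stored="true"/>
<field name="book_author" type="string" indexed="true" stored="true"/>
<field name="book_publishtime" type="date" indexed="true" stored="true"/>
<field name="book_price" type="double" indexed="true" stored="true"/>
<field name="book_publishgroup" type="string" indexed="true" stored="true"/>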

Implementation of various utility classes

Next, we create several utility classes under the io.mykit.canal.demo.utils package.

  • BinlogValue

Stores the value of each row and column parsed from the binlog; the code is shown below.

package io.mykit.canal.demo.utils;

import java.io.Serializable;

/**
 * ClassName: BinlogValue
 *
 * The value of each row and column parsed from the binlog:
 * Insert: beforeValue and value are both the newly inserted value;
 * Update: beforeValue is the value before the change, value is the value after the change;
 * Delete: beforeValue and value are both the value before deletion.
 */
public class BinlogValue implements Serializable {

    private static final long serialVersionUID = -6350345408773943086L;

    private String value;
    private String beforeValue;

    /**
     * The value of each row and column parsed from the binlog:
     * Insert: the newly inserted value;
     * Update: the value after the change;
     * Delete: the value before deletion.
     */
    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    /**
     * The beforeValue of each row and column parsed from the binlog:
     * Insert: the newly inserted value;
     * Update: the value before the change;
     * Delete: the value before deletion.
     */
    public String getBeforeValue() {
        return beforeValue;
    }

    public void setBeforeValue(String beforeValue) {
        this.beforeValue = beforeValue;
    }
}
  • CanalDataParser

To parse the data, the code is shown below.

package io.mykit.canal.demo.utils;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
import com.google.protobuf.InvalidProtocolBufferException;

/** * parse data */
public class CanalDataParser {
    
    protected static final String DATE_FORMAT   = "yyyy-MM-dd HH:mm:ss";
    protected static final String yyyyMMddHHmmss = "yyyyMMddHHmmss";
    protected static final String yyyyMMdd      = "yyyyMMdd";
    protected static final String SEP           = SystemUtils.LINE_SEPARATOR;
    protected static String  context_format     = null;
    protected static String  row_format         = null;
    protected static String  transaction_format = null;
    protected static String row_log = null;
    
    private static Logger logger = LoggerFactory.getLogger(CanalDataParser.class);
    
    static {
        context_format = SEP + "****************************************************" + SEP;
        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
        context_format += "* Start : [{}] " + SEP;
        context_format += "* End : [{}] " + SEP;
        context_format += "****************************************************" + SEP;

        row_format = SEP
                     + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms"
                     + SEP;

        transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {} , delay : {}ms" + SEP;

        row_log = "schema[{}], table[{}]";
    }

    public static List<InnerBinlogEntry> convertToInnerBinlogEntry(Message message) {
        List<InnerBinlogEntry> innerBinlogEntryList = new ArrayList<InnerBinlogEntry>();
        
        if(message == null) {
            logger.info("Received an empty message; Ignore");
            return innerBinlogEntryList;
        }
        
        long batchId = message.getId();
        int size = message.getEntries().size();
        if (batchId == -1 || size == 0) {
            logger.info("Received empty message[size=" + size + "]; Ignore");
            return innerBinlogEntryList;
        }

        printLog(message, batchId, size);
        List<Entry> entrys = message.getEntries();

        // Output logs
        for (Entry entry : entrys) {
            long executeTime = entry.getHeader().getExecuteTime();
            long delayTime = new Date().getTime() - executeTime;
            
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                    TransactionBegin begin = null;
                    try {
                        begin = TransactionBegin.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // Prints the transaction header information, the id of the thread executing, and the transaction time
                    logger.info("BEGIN ----> Thread id: {}",  begin.getThreadId());
                    logger.info(transaction_format, new Object[] {entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });

                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    TransactionEnd end = null;
                    try {
                        end = TransactionEnd.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // Prints transaction commit information, transaction ID
                    logger.info("END ----> transaction id: {}", end.getTransactionId());
                    logger.info(transaction_format,
                        new Object[] {entry.getHeader().getLogfileName(),  String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
                }
                continue;
            }

            // Parse the result
            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }

                EventType eventType = rowChage.getEventType();

                logger.info(row_format, new Object[] { entry.getHeader().getLogfileName(),
                            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });

                // Assemble the data result
                if (eventType == EventType.INSERT || eventType == EventType.DELETE || eventType == EventType.UPDATE) {
                    String schemaName = entry.getHeader().getSchemaName();
                    String tableName = entry.getHeader().getTableName();
                    List<Map<String, BinlogValue>> rows = parseEntry(entry);

                    InnerBinlogEntry innerBinlogEntry = new InnerBinlogEntry();
                    innerBinlogEntry.setEntry(entry);
                    innerBinlogEntry.setEventType(eventType);
                    innerBinlogEntry.setSchemaName(schemaName);
                    innerBinlogEntry.setTableName(tableName.toLowerCase());
                    innerBinlogEntry.setRows(rows);

                    innerBinlogEntryList.add(innerBinlogEntry);
                } else {
                    // Only INSERT, UPDATE, and DELETE events are assembled; other event types are logged and skipped
                    logger.info("Ignore this eventType[" + eventType.toString() + "]");
                }
                continue;
            }
        }
        return innerBinlogEntryList;
    }

    private static List<Map<String, BinlogValue>> parseEntry(Entry entry) {
        List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();
        try {
            String schemaName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            RowChange rowChage = RowChange.parseFrom(entry.getStoreValue());
            EventType eventType = rowChage.getEventType();

            // Process each row of data in each Entry
            for (RowData rowData : rowChage.getRowDatasList()) {
                StringBuilder rowlog = new StringBuilder("rowlog schema[" + schemaName + "], table[" + tableName + "], event[" + eventType.toString() + "]");
                
                Map<String, BinlogValue> row = new HashMap<String, BinlogValue>();
                List<Column> beforeColumns = rowData.getBeforeColumnsList();
                List<Column> afterColumns = rowData.getAfterColumnsList();
                if (eventType == EventType.DELETE) { // delete
                    for (Column column : beforeColumns) {
                        BinlogValue binlogValue = new BinlogValue();
                        binlogValue.setValue(column.getValue());
                        binlogValue.setBeforeValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                } else if (eventType == EventType.UPDATE) { // update
                    for (Column column : beforeColumns) {
                        BinlogValue binlogValue = new BinlogValue();
                        binlogValue.setBeforeValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                    for (Column column : afterColumns) {
                        BinlogValue binlogValue = row.get(column.getName());
                        if (binlogValue == null) {
                            binlogValue = new BinlogValue();
                        }
                        binlogValue.setValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                } else { // insert
                    for(Column column : afterColumns) {
                        BinlogValue binlogValue = new BinlogValue();
                        binlogValue.setValue(column.getValue());
                        binlogValue.setBeforeValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                } 
               
                rows.add(row);
                String rowjson = JacksonUtil.obj2str(row);
                
                logger.info("#################################### Data Parse Result ####################################");
                logger.info(rowlog + "," + rowjson);
                logger.info("#################################### Data Parse Result ####################################");
                logger.info(""); }}catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("parseEntry has an error , data:" + entry.toString(), e);
        }
        return rows;
    }

    private static void printLog(Message message, long batchId, int size) {
        long memsize = 0;
        for (Entry entry : message.getEntries()) {
            memsize += entry.getHeader().getEventLength();
        }

        String startPosition = null;
        String endPosition = null;
        if (!CollectionUtils.isEmpty(message.getEntries())) {
            startPosition = buildPositionForDump(message.getEntries().get(0));
            endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
        }

        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        logger.info(context_format, new Object[] {batchId, size, memsize, format.format(new Date()), startPosition, endPosition });
    }

    private static String buildPositionForDump(Entry entry) {
        long time = entry.getHeader().getExecuteTime();
        Date date = new Date(time);
        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        return entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":" + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
    }
}
  • DateUtils

A time utility class; the code is shown below.

package io.mykit.canal.demo.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtils {
    
    private static final String FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss";
    
    private static SimpleDateFormat sdf = new SimpleDateFormat(FORMAT_PATTERN);
    
    public static Date parseDate(String datetime) throws ParseException {
        if (datetime != null && !"".equals(datetime)) {
            return sdf.parse(datetime);
        }
        return null;
    }
    
    
    public static String formatDate(Date datetime) throws ParseException {
        if (datetime != null) {
            return sdf.format(datetime);
        }
        return null;
    }

    public static Long formatStringDateToLong(String datetime) throws ParseException {
        if (datetime != null && !"".equals(datetime)) {
            Date d = sdf.parse(datetime);
            return d.getTime();
        }
        return null;
    }

    public static Long formatDateToLong(Date datetime) throws ParseException {
        if (datetime != null) {
            return datetime.getTime();
        }
        return null;
    }
}
  • InnerBinlogEntry

Binlog entity class, as shown below.

package io.mykit.canal.demo.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

public class InnerBinlogEntry {
    
    /**
     * Canal's native Entry
     */
    private Entry entry;

    /**
     * Name of the table this Entry belongs to
     */
    private String tableName;

    /**
     * Name of the database this Entry belongs to
     */
    private String schemaName;

    /**
     * Operation type of this Entry, corresponding to Canal's native enumeration:
     * EventType.INSERT, EventType.UPDATE, EventType.DELETE
     */
    private EventType eventType;

    private List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();

    public Entry getEntry() {
        return entry;
    }
    public void setEntry(Entry entry) {
        this.entry = entry;
    }
    public String getTableName() {
        return tableName;
    }
    public void setTableName(String tableName) {
        this.tableName = tableName;
    }
    public EventType getEventType() {
        return eventType;
    }
    public void setEventType(EventType eventType) {
        this.eventType = eventType;
    }
    public String getSchemaName() {
        return schemaName;
    }
    public void setSchemaName(String schemaName) {
        this.schemaName = schemaName;
    }
    public List<Map<String, BinlogValue>> getRows() {
        return rows;
    }
    public void setRows(List<Map<String, BinlogValue>> rows) {
        this.rows = rows;
    }
}
  • JacksonUtil

A JSON utility class; the code is shown below.

package io.mykit.canal.demo.utils;

import java.io.IOException;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;


public class JacksonUtil {
    private static ObjectMapper mapper = new ObjectMapper();

    public static String obj2str(Object obj) {
        String json = null;
        try {
            json = mapper.writeValueAsString(obj);
        } catch (JsonGenerationException e) {
            e.printStackTrace();
        } catch (JsonMappingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return json;
    }

    public static <T> T str2obj(String content, Class<T> valueType) {
        try {
            return mapper.readValue(content, valueType);
        } catch (JsonParseException e) {
            e.printStackTrace();
        } catch (JsonMappingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}

Implementation of the synchronization program

With the entity and utility classes in place, we can write the synchronization program that synchronizes MySQL data to the Solr index library in real time. Create the SyncDataBootStart class under the io.mykit.canal.demo.main package, as shown below.

package io.mykit.canal.demo.main;

import io.mykit.canal.demo.bean.Book;
import io.mykit.canal.demo.utils.BinlogValue;
import io.mykit.canal.demo.utils.CanalDataParser;
import io.mykit.canal.demo.utils.DateUtils;
import io.mykit.canal.demo.utils.InnerBinlogEntry;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.text.ParseException;
import java.util.List;
import java.util.Map;

public class SyncDataBootStart {

    private static Logger logger = LoggerFactory.getLogger(SyncDataBootStart.class);

    public static void main(String[] args) throws Exception {

        String hostname = "192.168.175.100";
        Integer port = 11111;
        String destination = "example";

        // Get the CanalServer connection
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, "", "");

        // Connect to the CanalServer
        canalConnector.connect();

        // Subscribe to the Destination
        canalConnector.subscribe();

        // Poll and pull data
        Integer batchSize = 5 * 1024;
        while (true) {
            Message message = canalConnector.getWithoutAck(batchSize);

            long messageId = message.getId();
            int size = message.getEntries().size();

            if (messageId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // Perform data synchronization
                // 1. Parse the Message object
                List<InnerBinlogEntry> innerBinlogEntries = CanalDataParser.convertToInnerBinlogEntry(message);

                // 2. Synchronize the parsed data to Solr's index library
                syncDataToSolr(innerBinlogEntries);
            }

            // Acknowledge the batch
            canalConnector.ack(messageId);
        }
    }

    private static void syncDataToSolr(List<InnerBinlogEntry> innerBinlogEntries) throws Exception {
        // Get the connection to Solr
        SolrServer solrServer = new HttpSolrServer("http://192.168.175.101:8080/solr");

        // Add, modify, or delete index data according to the entries in the list
        if (innerBinlogEntries != null) {
            for (InnerBinlogEntry innerBinlogEntry : innerBinlogEntries) {

                CanalEntry.EventType eventType = innerBinlogEntry.getEventType();

                // For Insert and Update, the data needs to be synchronized to the Solr index library
                if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE) {
                    List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows();
                    if (rows != null) {
                        for (Map<String, BinlogValue> row : rows) {
                            BinlogValue id = row.get("id");
                            BinlogValue name = row.get("name");
                            BinlogValue author = row.get("author");
                            BinlogValue publishtime = row.get("publishtime");
                            BinlogValue price = row.get("price");
                            BinlogValue publishgroup = row.get("publishgroup");

                            Book book = new Book();
                            book.setId(Integer.parseInt(id.getValue()));
                            book.setName(name.getValue());
                            book.setAuthor(author.getValue());
                            book.setPrice(Double.parseDouble(price.getValue()));
                            book.setPublishgroup(publishgroup.getValue());
                            book.setPublishtime(DateUtils.parseDate(publishtime.getValue()));

                            // Import the data into the Solr index library
                            solrServer.addBean(book);
                            solrServer.commit();
                        }
                    }
                } else if (eventType == CanalEntry.EventType.DELETE) {
                    // For Delete, the data needs to be deleted from the Solr index library
                    List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows();
                    if (rows != null) {
                        for (Map<String, BinlogValue> row : rows) {
                            BinlogValue id = row.get("id");

                            // Delete from the Solr index library by ID
                            solrServer.deleteById(id.getValue());
                            solrServer.commit();
                        }
                    }
                }
            }
        }
    }
}

Next, start the main method of SyncDataBootStart to begin listening to the Canal Server, which in turn listens for MySQL binlog changes. As soon as the binlog changes, SyncDataBootStart receives the change information, parses it into Book objects, and updates them in the Solr library in real time. If data is deleted from the MySQL database, the corresponding data in the Solr library is deleted in real time as well.

Parts of this article reference Canal's official documentation: github.com/alibaba/can…

OK, that's enough for today. I'm Glacier. See you next time!