

Recently, a business scenario required us to watch the data in several tables of one business system's database and, whenever data is added or modified, synchronize it to tables in another business system's database.

Binlog-based master-slave replication is the first thing that comes to mind for database synchronization. In our scenario, however, it has two problems:

  • First, we don't need to replicate every table; only a few tables are involved
  • Second, and more troublesome, the table structures in the two business systems' databases may differ. For example, when synchronizing certain fields of table A in database 1 to table B in database 2, the fields of table A and table B are not exactly the same

So we have to write code that captures the changes to the tables in database 1 and manually maps them onto the tables in database 2. Obtaining the change data still relies on the binlog, though, so the code needs to listen to the binlog.

In the end, we used an open source tool, mysql-binlog-connector-java, to listen for binlog changes and retrieve the data, which we then manually inserted into a table in the other database to keep the tables synchronized. The project's GitHub address:

https://github.com/shyiko/mysql-binlog-connector-java

The binlog is a binary log stored on disk that records changes to table structures and modifications to table data. Besides data replication, it can also be used for data recovery, incremental backups and other purposes.

Before starting the project, make sure the MySQL service has binlog enabled:

show variables like 'log_bin';

If the value is OFF, binlog is not enabled. Enable it first by modifying the MySQL configuration file (for example my.cnf):

[mysqld]
log_bin=mysql-bin
binlog-format=ROW
server-id=1

A brief description of the parameters:

  • Adding the log_bin item to the configuration file enables binlog; its value, mysql-bin, is used as the base name of the binlog files
  • binlog-format sets the binlog format, which can be STATEMENT, ROW or MIXED. We use ROW mode here, which logs the actual row changes that the code below reads
  • server-id identifies the server an event came from. It must be set here, otherwise the code later will not be able to listen to events properly

Restart the MySQL service after changing the configuration file, then check again whether binlog is enabled. If the value is ON, binlog has been enabled successfully.

In the Java project, first add the Maven dependency:

<dependency>
    <groupId>com.github.shyiko</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.21.0</version>
</dependency>

Let's write a simple example to see how it works:

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import java.io.IOException;

public class BinlogDemo {
    public static void main(String[] args) {
        // Connection info: host, port, username, password
        BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "hydra", "123456");
        // Should be unique: different from the MySQL server's server-id (1 above)
        client.setServerId(2);

        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof TableMapEventData) {
                // Maps the table ID used by row events to database and table names
                System.out.println("Table:");
                TableMapEventData tableMapEventData = (TableMapEventData) data;
                System.out.println(tableMapEventData.getTableId() + ": [" + tableMapEventData.getDatabase() + "-" + tableMapEventData.getTable() + "]");
            }
            if (data instanceof UpdateRowsEventData) {
                System.out.println("Update:");
                System.out.println(data.toString());
            } else if (data instanceof WriteRowsEventData) {
                System.out.println("Insert:");
                System.out.println(data.toString());
            } else if (data instanceof DeleteRowsEventData) {
                System.out.println("Delete:");
                System.out.println(data.toString());
            }
        });

        try {
            // Blocks and keeps reading binlog events until disconnected
            client.connect();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

First, we create a BinaryLogClient object, initialized with the MySQL connection information, and register a listener on it to receive and parse binlog events. For now the listener handles only four types of event data: WriteRowsEventData, DeleteRowsEventData and UpdateRowsEventData for insert, delete and update operations, plus TableMapEventData, which carries table mapping information and is explained in the examples below.

Note that the client receives every event in the binlog, including both DML and DDL statements for all tables, so we should process only the event data we care about; otherwise we end up with a lot of redundant data.

Start the program, and the console prints:

com.github.shyiko.mysql.binlog.BinaryLogClient openChannelToBinaryLogStream
INFO: Connected to 127.0.0.1:3306 at mysql-bin.000002/1046 (sid:2, cid:10)

Next, insert a row for the 人力 (human resources) department into the dept table of the tenant database:

insert into dept VALUES(8, "人力", "", "1");

The console then prints the data of the events it received:

Table:
108: [tenant-dept]
Insert:
WriteRowsEventData{tableId=108, includedColumns={0, 1, 2, 3}, rows=[
    [8, 人力, , 1]
]}

We receive two kinds of event data. The first is TableMapEventData, which gives the database name, table name and table ID of the operation. We need this event because the event data for the actual operation contains only the table ID, not the table name. So to know which table an operation applies to, we first have to maintain a mapping between table IDs and tables.

The second event is WriteRowsEventData, which records the table targeted by the INSERT statement, the columns involved in the insert, and the rows actually inserted. In addition, if we only need to handle one or a few specific tables, we can configure the list of tables in advance and filter the data using the mapping between table IDs and table names.
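The ID-to-name mapping and table filtering described above can be sketched as follows. This is a minimal, hypothetical helper: TableFilter and its methods are not part of mysql-binlog-connector-java. It works on plain values, so in the listener you would pass it tableMapEventData.getTableId(), getDatabase() and getTable(), and check a row event's getTableId() before processing it. Identifying tables as "database.table" is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: remembers which table each table ID refers to and
// decides whether a row event belongs to one of the watched tables.
final class TableFilter {
    // Watched tables, written as "database.table"
    private final Set<String> watchedTables;
    // Latest known mapping from table ID to "database.table"
    private final Map<Long, String> tableNames = new HashMap<>();

    TableFilter(Set<String> watchedTables) {
        this.watchedTables = Set.copyOf(watchedTables);
    }

    // Call with the values of a TableMapEventData; overwrites any older mapping
    void onTableMap(long tableId, String database, String table) {
        tableNames.put(tableId, database + "." + table);
    }

    // Returns "database.table" for a table ID, or null if it was never announced
    String tableName(long tableId) {
        return tableNames.get(tableId);
    }

    // Call with the table ID of a row event (insert, update or delete)
    boolean isWatched(long tableId) {
        String name = tableNames.get(tableId);
        return name != null && watchedTables.contains(name);
    }

    public static void main(String[] args) {
        TableFilter filter = new TableFilter(Set.of("tenant.dept"));
        filter.onTableMap(108L, "tenant", "dept");  // values from the output above
        System.out.println(filter.isWatched(108L)); // true
        System.out.println(filter.isWatched(109L)); // false: never announced
    }
}
```

The mapping is refreshed on every TableMapEventData instead of being cached once, because MySQL writes a table map event ahead of the row events it describes and a table's ID is not guaranteed to stay the same over time (for example after DDL).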

Next, we execute an update statement:

update dept set tenant_id=3 where id=8 or id=9

Console output:

Table:
108: [tenant-dept]
Update:
UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1, 2, 3}, includedColumns={0, 1, 2, 3}, rows=[
    {before=[8, 人力, , 1], after=[8, 人力, , 3]},
    {before=[9, 人力, , 1], after=[9, 人力, , 3]}
]}

An UPDATE statement can affect more than one row, so the event may contain multiple rows. Here, rows contains two entries, for the rows with ids 8 and 9, each holding the column values before and after the update.
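Because each updated row carries both a before and an after image, a sync job can compare them to see which columns actually changed. The following is a minimal sketch. It assumes both arrays contain all columns in table order, as in this example where includedColumns is {0, 1, 2, 3}. The RowDiff class is hypothetical. In the listener, before and after would be the key and value of an entry from UpdateRowsEventData.getRows().

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical helper: compares the before/after images of one updated row
// and returns the positions of the columns whose values changed.
final class RowDiff {
    static List<Integer> changedColumns(Serializable[] before, Serializable[] after) {
        List<Integer> changed = new ArrayList<>();
        int n = Math.min(before.length, after.length);
        for (int i = 0; i < n; i++) {
            // deepEquals compares array-valued columns by content, not reference
            if (!Objects.deepEquals(before[i], after[i])) {
                changed.add(i);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // The row with id=8 from the update output above
        Serializable[] before = {8, "人力", "", 1};
        Serializable[] after = {8, "人力", "", 3};
        System.out.println(changedColumns(before, after)); // [3]
    }
}
```

Position 3 is tenant_id, the only column the UPDATE statement set for this row.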

Finally, we execute a delete statement:

delete from dept where tenant_id=3

The console prints the following, and rows again contains both deleted rows:

Table:
108: [tenant-dept]
Delete:
DeleteRowsEventData{tableId=108, includedColumns={0, 1, 2, 3}, rows=[
    [8, 人力, , 3],
    [9, 人力, , 3]
]}

Now that we've covered the basic usage, let's return to the original requirement: synchronizing new or modified data in one table to another table. One problem remains: how do we match the returned values to their columns? Taking the update operation as an example, we process the extracted data further by changing the update branch of the example above:

if (data instanceof UpdateRowsEventData) {
    System.out.println("Update:");
    UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;
    // Each entry pairs the row before the update (key) with the row after it (value)
    for (Map.Entry<Serializable[], Serializable[]> row : updateRowsEventData.getRows()) {
        List<Serializable> entries = Arrays.asList(row.getValue());
        System.out.println(entries);
        JSONObject dataObject = getDataObject(entries);
        System.out.println(dataObject);
    }
}

After casting the data to UpdateRowsEventData, the getRows method returns the updated rows. The key of each entry holds the column values before the update and the value holds the values after it, so row.getValue() gives the new value of every column.

Next, we call getDataObject, a method we implemented ourselves, to bind the values to their columns:

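// JSON and JSONObject are assumed to be fastjson's (com.alibaba.fastjson)
private static JSONObject getDataObject(List<Serializable> message) {
    JSONObject resultObject = new JSONObject();
    // Field name -> position of that field in the table's column order
    String format = "{\"id\":\"0\",\"dept_name\":\"1\",\"comment\":\"2\",\"tenant_id\":\"3\"}";
    JSONObject json = JSON.parseObject(format);
    for (String key : json.keySet()) {
        // Bind the value at that position to the field name
        resultObject.put(key, message.get(json.getInteger(key)));
    }
    return resultObject;
}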

The format string records, in advance, the position of each field in the table's column order. getDataObject uses it to bind each value to its column. Let's check the result with an update statement:

update dept set tenant_id=3,comment="1" where id=8

The output of the console is as follows:

Table:
108: [tenant-dept]
Update:
[8, 人力, 1, 3]
{"tenant_id":3,"dept_name":"人力","comment":"1","id":8}

As you can see, each value of the modified record is now bound to its column name. From here, depending on the specific business logic, we can read the data by field name and synchronize it to the other table.
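Since the two systems' tables don't share a structure, the last step maps the bound fields onto the target table's columns. The following is a minimal sketch under several assumptions. The RowSync class, the target table department, and its column names dept_id and name are hypothetical. Columns are bound from a plain list in table order instead of the JSON format string. The target database is assumed to be MySQL, so a single INSERT ... ON DUPLICATE KEY UPDATE statement covers both added and modified rows; this requires a primary or unique key on the target table.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: binds an after-image to column names, renames the
// columns for the target table and builds a parameterized upsert.
final class RowSync {
    // Binds each value to the column at the same position (table column order)
    static Map<String, Serializable> bind(List<String> columns, Serializable[] values) {
        if (columns.size() != values.length) {
            throw new IllegalArgumentException("column count does not match row width");
        }
        Map<String, Serializable> row = new LinkedHashMap<>();
        for (int i = 0; i < values.length; i++) {
            row.put(columns.get(i), values[i]);
        }
        return row;
    }

    // Keeps only the mapped source columns, renamed to target column names
    static Map<String, Serializable> toTarget(Map<String, Serializable> row,
                                              Map<String, String> sourceToTarget) {
        Map<String, Serializable> target = new LinkedHashMap<>();
        for (Map.Entry<String, String> m : sourceToTarget.entrySet()) {
            if (row.containsKey(m.getKey())) {
                target.put(m.getValue(), row.get(m.getKey()));
            }
        }
        return target;
    }

    // Inserts a new row, or updates the existing row with the same primary/unique key
    static String upsertSql(String table, Map<String, Serializable> target) {
        List<String> cols = new ArrayList<>(target.keySet());
        List<String> updates = new ArrayList<>();
        for (String c : cols) {
            updates.add(c + " = VALUES(" + c + ")");
        }
        return "INSERT INTO " + table + " (" + String.join(", ", cols) + ") VALUES ("
                + String.join(", ", Collections.nCopies(cols.size(), "?"))
                + ") ON DUPLICATE KEY UPDATE " + String.join(", ", updates);
    }

    public static void main(String[] args) {
        // Column order of tenant.dept, as in the format string above
        List<String> deptColumns = List.of("id", "dept_name", "comment", "tenant_id");
        Serializable[] after = {8, "人力", "1", 3};
        Map<String, Serializable> row = bind(deptColumns, after);

        // Hypothetical target columns; comment is intentionally not synchronized
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("id", "dept_id");
        mapping.put("dept_name", "name");
        mapping.put("tenant_id", "tenant_id");

        Map<String, Serializable> target = toTarget(row, mapping);
        System.out.println(target);
        System.out.println(upsertSql("department", target));
    }
}
```

The values of the returned map, in iteration order, would then be bound to the ? placeholders of a PreparedStatement executed against the second database. The VALUES() function inside ON DUPLICATE KEY UPDATE is deprecated as of MySQL 8.0.20 in favor of row aliases, but it is still accepted.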
