The first step of big data analysis in our existing architecture is moving data from a relational database into a non-relational database. There are many solutions on the Internet; we explored and tested three of them in practice and finally settled on Canal. This article was written by Zhang Tongrui, a colleague in our big data department, and I would like to share it with you. If you are interested, we can cover it in more depth later.

Demand background

In recent years the concept of microservices has stayed hot, and there are more and more discussions online comparing microservices with monolithic architectures. Facing growing business needs, many companies choose microservices when upgrading their technical architecture. My company also chose this direction, upgrading its architecture to support more traffic and more convenient business expansion.

Found the problem

There are two ways to split into microservices: split the business systems but keep a single database, or split both the business systems and the databases. If the data volume is small, there is no need to split the database, because splitting the data brings problems such as multi-dimensional data queries and cross-process transactions. However, as the business grew, a single database instance could no longer meet my company's needs, so we chose to split the business systems and the databases at the same time, and therefore faced the problems above. This article mainly introduces our real-time multi-dimensional data query solution. The current system architecture and storage structure are as follows:

Solution approach

  • To query data across multiple databases, the data must first be synchronized into one place to make querying easier

  • To handle the large data volume, a NoSQL database is preferred as the synchronization target

  • NoSQL databases cannot perform join queries, so the relational data needs to be stitched together and converted into non-relational documents

  • Multi-dimensional business queries need to be real-time, so MongoDB, a NoSQL database with good real-time performance, was chosen

According to the above ideas, the data integration architecture is summarized as follows:

The solution

There are two common approaches to data synchronization: MQ message synchronization and binlog-based synchronization.

Let’s start with MQ message synchronization. My company tried it for a while and ran into the following problems:

  • Synchronization revolves around the business: MQ messages are sent for key business data operations, so it is tightly coupled to the business systems

  • The stock data already in the database has to be processed separately

  • Utility tables also require separately maintained synchronization

  • MQ logic has to be added again every time a new table is introduced

Considering the above problems, synchronizing data through MQ is not the optimal solution.

There are some mature binlog-based solutions, such as Tungsten Replicator, but these tools only achieve 1:1 data replication. It is difficult to add custom logic during replication, and aggregating data from different databases and tables is not supported. In summary, the best option is to read the binlog ourselves and handle the subsequent data processing logic in our own code. At present, the most mature binlog-reading tool is Alibaba’s open-source Canal.

Canal

Canal is Alibaba’s incremental subscription & consumption component for MySQL binlog. Alibaba Cloud DRDS, Alibaba’s TDDL secondary indexes, and small-table replication are all based on Canal, and it is widely used. Canal’s principle is relatively simple:

  • Canal emulates the interaction protocol of a MySQL slave, disguises itself as a MySQL slave, and sends the dump protocol to the MySQL master

  • The MySQL master receives the dump request and starts pushing the binary log to the slave (Canal)

  • Canal parses the binary log objects (raw byte streams)

Canal introduction: https://github.com/alibaba/canal/wiki

I use Canal’s HA mode, with ZooKeeper electing the available instance. One instance is configured per database, and the server configuration is as follows:

Directory:

conf
    database1
        -instance.properties
    database2
        -instance.properties
    canal.properties

instance.properties

canal.instance.mysql.slaveId = 1001
canal.instance.master.address = X.X.X.X:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = .*\\..*
canal.instance.filter.black.regex =

canal.properties

canal.id= 1
canal.ip=X.X.X.X
canal.port= 11111
canal.zkServers=X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181
canal.zookeeper.flush.period = 1000
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024 ...
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

The deployment data flow is as follows:

Tip: Although Canal supports both mixed and row binlog formats, the table name cannot be resolved when row data is obtained from a mixed-format log, so this solution only supports the row binlog format.
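For reference, row-format binary logging is enabled on the MySQL master with the standard MySQL options (the values below are only illustrative):

# my.cnf on the MySQL master
server-id     = 1
log-bin       = mysql-bin
binlog-format = ROW

The current setting can be verified with SHOW VARIABLES LIKE 'binlog_format';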

Data synchronization

Create a Canal Client application to subscribe to the binlog data read by Canal

1. Start the Canal client and subscribe to multiple instances

public void initCanalStart() {
    List<String> destinations = canalProperties.getDestination();
    final List<CanalClient> canalClientList = new ArrayList<>();
    if (destinations != null && destinations.size() > 0) {
        for (String destination : destinations) {
            // Fetch the canal server address dynamically from ZooKeeper and set up the connection;
            // if one of the servers crashes, failover is supported
            CanalConnector connector = CanalConnectors.newClusterConnector(canalProperties.getZkServers(), destination, "", "");
            CanalClient client = new CanalClient(destination, connector);
            canalClientList.add(client);
            client.start();
        }
    }
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            try {
                logger.info("## stop the canal client");
                for (CanalClient canalClient : canalClientList) {
                    canalClient.stop();
                }
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping canal:", e);
            } finally {
                logger.info("## canal client is down.");
            }
        }
    });
}

2. Subscription message processing

private void process() {
    int batchSize = 5 * 1024;
    while (running) {
        try {
            MDC.put("destination", destination);
            connector.connect();
            connector.subscribe();
            while (running) {
                Message message = connector.getWithoutAck(batchSize); // fetch a batch without auto-ack
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId != -1 && size > 0) {
                    saveEntry(message.getEntries());
                }
                connector.ack(batchId); // confirm the batch
                // connector.rollback(batchId); // roll back on failure
            }
        } catch (Exception e) {
            logger.error("process error!", e);
        } finally {
            connector.disconnect();
            MDC.remove("destination");
        }
    }
}

Process messages according to the database event type: filter the message list and handle data changes using the following information:

  • insert: schemaName, tableName, afterColumnsList

  • update: schemaName, tableName, afterColumnsList

  • delete: schemaName, tableName, beforeColumnsList

RowChange rowChage = null;
    try {
        rowChage = RowChange.parseFrom(entry.getStoreValue());
    } catch (Exception e) {
        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
    }
    EventType eventType = rowChage.getEventType();
    logger.info(row_format,
            entry.getHeader().getLogfileName(),
            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
            entry.getHeader().getTableName(), eventType,
            String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime));
    if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
        logger.info(" sql ----> " + rowChage.getSql());
        continue;
    }
    DataService dataService = SpringUtil.getBean(DataService.class);
    for (RowData rowData : rowChage.getRowDatasList()) {
        if (eventType == EventType.DELETE) {
            dataService.delete(rowData.getBeforeColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
        } else if (eventType == EventType.INSERT) {
            dataService.insert(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
        } else if (eventType == EventType.UPDATE) {
            dataService.update(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
        } else {
            logger.info("unknown event type: {}", eventType);
        }
    }
}

Convert the ColumnsList into the data class used by MongoTemplate, DBObject, performing data type conversion along the way:

public static DBObject columnToJson(List<CanalEntry.Column> columns) {
    DBObject obj = new BasicDBObject();
    try {
        for (CanalEntry.Column column : columns) {
            String mysqlType = column.getMysqlType();
            if (mysqlType.startsWith("int")) {
                // int(n): if the display width is greater than 10, store the value as a Long
                int lenBegin = mysqlType.indexOf('(');
                int lenEnd = mysqlType.indexOf(')');
                if (lenBegin > 0 && lenEnd > 0) {
                    int length = Integer.parseInt(mysqlType.substring(lenBegin + 1, lenEnd));
                    if (length > 10) {
                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
                        continue;
                    }
                }
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Integer.parseInt(column.getValue()));
            } else if (mysqlType.startsWith("bigint")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
            } else if (mysqlType.startsWith("decimal")) {
                // decimal(m,n): if the scale is 0, store as Long, otherwise fall through to Double
                int lenBegin = mysqlType.indexOf('(');
                int lenCenter = mysqlType.indexOf(',');
                int lenEnd = mysqlType.indexOf(')');
                if (lenBegin > 0 && lenEnd > 0 && lenCenter > 0) {
                    int length = Integer.parseInt(mysqlType.substring(lenCenter + 1, lenEnd));
                    if (length == 0) {
                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
                        continue;
                    }
                }
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Double.parseDouble(column.getValue()));
            } else if (mysqlType.equals("datetime") || mysqlType.equals("timestamp")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_TIME_FORMAT.parse(column.getValue()));
            } else if (mysqlType.equals("date")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_FORMAT.parse(column.getValue()));
            } else if (mysqlType.equals("time")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : TIME_FORMAT.parse(column.getValue()));
            } else {
                obj.put(column.getName(), column.getValue());
            }
        }
    } catch (ParseException e) {
        e.printStackTrace();
    }
    return obj;
}

Tip: If a DBObject is used to hold the raw data and is also reused for composite or other data, make a deep copy of the object first and work with the copy.
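A minimal sketch of such a deep copy (this helper is hypothetical, not part of the original project) recursively copies nested DBObjects so the composite document does not share state with the raw one:

public static DBObject deepCopy(DBObject source) {
    DBObject target = new BasicDBObject();
    for (String key : source.keySet()) {
        Object value = source.get(key);
        if (value instanceof DBObject) {
            // nested documents are copied recursively
            target.put(key, deepCopy((DBObject) value));
        } else {
            // simple values (String, Number, Date) are reused as-is
            target.put(key, value);
        }
    }
    return target;
}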

Data splicing

Now we splice the database data together. Take two user tables as an example:

user_info:{id,user_no,user_name,user_password}
user_other_info:{id,user_no,idcard,realname}

The Mongo document after splicing is as follows:

user:{_id,user_no,userInfo:{id,user_no,user_name,user_password},userOtherInfo:{id,user_no,idcard,realname}}
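For example (a hypothetical query, not taken from the original project), once the data is spliced, a multi-dimensional lookup such as finding a user by ID-card number becomes a single MongoDB query on the user collection:

// query the spliced "user" collection by a field that originally lived in user_other_info
DBCollection collection = completeMongoTemplate.getCollection("user");
DBObject query = new BasicDBObject("userOtherInfo.idcard", "110101199001011234");
DBObject user = collection.findOne(query);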

Given all this incoming data, how can we trigger the splicing operation in a simple way?

Look at the information we can obtain: schemaName, tableName, DBObject, Event (insert, update, delete)

Put the information together and you get /schemaName/tableName/Event (with the DBObject as the payload). That’s right, it is a standard RESTful path. If we implement a simple Spring MVC-style dispatcher, we can automatically route the data to the code that performs the splicing.

First implement an @Controller-like annotation, name it @Schema, with value corresponding to the schemaName:

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface Schema {
    String value() default "";
}

Then implement an @RequestMapping-like annotation, name it @Table, and use Canal’s EventType directly in place of RequestMethod:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public  @interface Table {
    String value() default "";
    CanalEntry.EventType[] event() default {};
}

Then create a SpringUtil that implements the ApplicationContextAware interface. When the application starts, it initializes two maps: instanceMap and handlerMap.

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
    if (SpringUtil.applicationContext == null) {
        SpringUtil.applicationContext = applicationContext;
        // initialize instanceMap data
        instanceMap();
        // initialize handlerMap data
        handlerMap();
    }
}

private void instanceMap() {
    Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Schema.class);
    for (Object bean : beans.values()) {
        Class<?> clazz = bean.getClass();
        Object instance = applicationContext.getBean(clazz);
        Schema schema = clazz.getAnnotation(Schema.class);
        String key = schema.value();
        instanceMap.put(key, instance);
        logger.info("instanceMap [{}:{}]", key, bean == null ? "null" : clazz.getName());
    }
}

private void handlerMap() {
    ...
}
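The handlerMap() body is elided above. As a rough sketch only, assuming it registers every @Table-annotated method under a key of the form /schemaName/tableName/EVENT (which matches the lookup in doEvent below, and assumes the usual java.lang.reflect.Method import), it could look something like this:

private void handlerMap() {
    Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Schema.class);
    for (Object bean : beans.values()) {
        Class<?> clazz = bean.getClass();
        Schema schema = clazz.getAnnotation(Schema.class);
        // scan all public methods for the @Table annotation
        for (Method method : clazz.getMethods()) {
            Table table = method.getAnnotation(Table.class);
            if (table == null) {
                continue;
            }
            // register one handler entry per declared event type
            for (CanalEntry.EventType event : table.event()) {
                String key = "/" + schema.value() + "/" + table.value() + "/" + event;
                handlerMap.put(key, method);
                logger.info("handlerMap [{}:{}]", key, method.getName());
            }
        }
    }
}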

Call method:

public static void doEvent(String path, DBObject obj) throws Exception {
    String[] pathArray = path.split("/");
    if (pathArray.length != 4) {
        logger.info("path format incorrect: {}", path);
        return;
    }
    Method method = handlerMap.get(path);
    Object schema = instanceMap.get(pathArray[1]);
    // if no mapped bean and method are found, do nothing
    if (method == null || schema == null) {
        return;
    }
    try {
        long begin = System.currentTimeMillis();
        logger.info("integrate data: {}, {}", path, obj);
        method.invoke(schema, new Object[]{obj});
        logger.info("integrate data consume: {}ms", System.currentTimeMillis() - begin);
    } catch (Exception e) {
        logger.error("call combination logic exception", e);
        throw new Exception(e.getCause());
    }
}

Data stitching message processing:

@Schema("demo_user")public class UserService { @Table(value = "user_info", event = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE}) public void saveUser_UserInfo(DBObject userInfo) { String userNo = userInfo.get("user_no")  == null ? null : userInfo.get("user_no").toString(); DBCollection collection = completeMongoTemplate.getCollection("user"); DBObject queryObject = new BasicDBObject("user_no", userNo); DBObject user = collection.findOne(queryObject); if (user == null) { user = new BasicDBObject(); user.put("user_no", userNo); user.put("userInfo", userInfo); collection.insert(user); } else { DBObject updateObj = new BasicDBObject("userInfo", userInfo); DBObject update = new BasicDBObject("$set", updateObj); collection.update(queryObject, update); }}}Copy the code

The sample source code

https://github.com/zhangtr/canal-mongo

The original reference: http://www.torry.top/2017/10/22/canal-mongodb/

Recommended reading

  • That Microservice thing

  • The theoretical basis of microservices architecture – Conway’s Law

  • What is Spring Cloud doing from an architectural evolution perspective?