Wang Peng joined the service platform team of Qunar's Air Ticket Business Division in 2017. He is mainly responsible for post-sale air ticket services, as well as the refactoring and upgrade of the platform's data synchronization service and basic index service.

Background

In the Internet age, vast amounts of data are produced every day. Since its founding, Qunar has accumulated massive user travel data: every flight ticket order, every hotel booking, every attraction ticket, and so on. This data is stored in databases, most commonly MySQL, where a single order table is generally best kept below roughly 10 million rows. If all historical records were kept in one DB, each table would grow past the billion-row level, and querying it along different dimensions (by order number, by user, by status, and so on) would be a nightmare: the database would be overwhelmed, and slow queries could render the whole instance unserviceable. A common solution is to copy this data into an alternate store or a heterogeneous data structure to improve query efficiency. A data synchronization system does exactly that: it imports data from one source into another, provides homogeneous or heterogeneous data synchronization with low latency, offers a highly available synchronization service, and guarantees eventual consistency of the data.

Open source solutions for data synchronization

1.Databus

Databus is a low-latency, reliable, consistent data synchronization system that supports transactions. It mines database logs to extract changes from the source database reliably and in real time. Businesses can obtain changes in real time and run other business logic through a customized client.

Several features:

  • Source independent: Databus supports change fetching from multiple data sources, including Oracle and MySQL.
  • Scalable and highly available: Databus can scale to support thousands of consumers and transactional data sources while remaining highly available.
  • Transaction commit sequence: Databus maintains transactional integrity in the source database and delivers change events according to the transaction group and source commit sequence.
  • Low latency, support for multiple subscription mechanisms: Databus can commit transactions to consumers in milliseconds after data source changes have completed. At the same time, consumers can use server-side filtering in Databus to retrieve only the specific data they need.
  • Unlimited backtracking: consumers can rewind arbitrarily far, for example when a consumer needs to rebuild a full copy of the data, without placing any additional burden on the source database. This is also useful when a consumer's data falls significantly behind the source database.

2.Canal

Canal is an Alibaba open source project implemented in Java. It obtains data by simulating a MySQL slave and listening to MySQL's binlog, and it can capture both the before and after images of a modified row. Thanks to this mechanism, Canal can capture MySQL data changes with high performance (a minimal client sketch follows the feature list below).

Several features:

  • The only supported source is MySQL.
  • Extensibility: Canal can send data to Kafka or an MQ, so developers can customize how the data is distributed.
  • Low latency: millisecond-level synchronization delay, meeting the needs of near-real-time systems.
  • Transactional: binlogs can be parsed and subscribed to in different ways, for example through GTID.
  • Backtracking: depending on how long binlogs are retained, the consumption point can be rolled back for resynchronization.
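
To make the mechanism concrete, a minimal Canal client loop looks roughly like the sketch below. It follows the standard example shipped with the open source Canal client; the server address, destination name, credentials, and the process step are placeholders, and error handling (rollback on failure) is omitted.

    import java.net.InetSocketAddress;

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.Message;

    public class SimpleCanalClientSketch {
        public static void main(String[] args) {
            // connect to a Canal server; address, destination and credentials are placeholders
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
            connector.connect();
            // subscribe to all schemas and tables
            connector.subscribe(".*\\..*");
            while (true) {
                // fetch a batch of binlog entries without acknowledging them yet
                Message message = connector.getWithoutAck(1024);
                long batchId = message.getId();
                if (batchId != -1 && !message.getEntries().isEmpty()) {
                    process(message); // placeholder: convert entries to row changes for the business side
                }
                // acknowledge the batch so the consumption position can advance
                connector.ack(batchId);
            }
        }

        private static void process(Message message) {
            // downstream handling goes here
        }
    }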

| Comparison | Canal | Databus |
| --- | --- | --- |
| Supported sources | MySQL | MySQL, Oracle |
| Scalability | Generic messaging interface | Multi-consumer support |
| Latency | Low | Low |
| Transactional | Supported | Supported |
| Backtracking | Supported (adjustable consumption point) | Supported |
| Complexity | Relatively simple | Complex |

The comparison shows no major functional difference between Canal and Databus. Since there is no Oracle database to synchronize in our environment, we chose Canal as the technical solution for data synchronization; compared with Databus, Canal is also simpler to configure and deploy.

Historical problems with the old system

The old system has many problems; how should it be refactored?

Core process:

  1. The binlog generated by the DB is transmitted through Canal to Databus (an internal, self-developed system that happens to share its name with the open source project). Databus is divided into two large modules, producer and consumer. The recipient of the binlog is the Databus producer, whose main purpose is to parse the binlog data and convert it into a generic data structure, DataRow (a hypothetical sketch of such a record follows this list). Databus can also accept other types of data sources, such as the company's QMQ, Kafka, and so on.

  2. The converted DataRow is sent to the consumer via Kafka. The consumer module aggregates the data and writes it asynchronously to a storage system such as ES or Hive.

  3. ES is currently chosen as the final storage medium. ES itself is distributed and highly available, can store massive amounts of data, and provides fast queries. In addition, active and standby clusters support high availability and load balancing.

  4. Overall configuration and scheduling are handled by ZK: the active/standby switching of Canal, the bringing up and down of Databus producers and consumers, and the metadata management of the indexes are all configured in ZK.

  5. Query service: the Qgalaxy system mainly provides APIs for business-line queries so that they can be managed in a unified way. Every business line or application accessing the ES cluster goes through this service for unified authentication, traffic switching, and traffic limiting.
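
The article does not show the definition of DataRow, so the following is only an illustrative guess at what such a generic change record might carry; every field name here is an assumption, not the actual internal class.

    import java.util.Map;

    // Hypothetical sketch of a generic change record in the spirit of DataRow.
    // All field names are assumptions for illustration only.
    public class DataRowSketch {
        private String schemaName;                  // source database (schema) name
        private String tableName;                   // source table name
        private String eventType;                   // INSERT / UPDATE / DELETE
        private long executeTime;                   // binlog execution timestamp
        private Map<String, String> beforeColumns;  // column values before the change
        private Map<String, String> afterColumns;   // column values after the change
    }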

Looking at the simplified architecture diagram, the old system has the following problems:

  1. The Databus producer is a single point and not highly available. This is a very serious problem: if a machine goes down, synchronization for some indexes stops, causing failures. On the consumer side, the Kafka topics and ES indexes to write to are hardcoded rather than configurable, so development work is needed every time a new index is added, and switching between different test environments is also cumbersome.

  2. The Canal version is too old. The old system uses 1.0.19 while the open source version has been updated to 1.0.25, so there are known bugs. In addition, the configuration is only recorded in a document: to operate on an index you must know which library it corresponds to in order to find the right machine.

  3. All configuration is stored in ZK, such as the initialization data of Databus producers and consumers, which Kafka topics to connect to, which messages to receive, and which indexes to write to. If ZK is unavailable, a producer or consumer cannot start.

  4. Full synchronization is a problem: Databus's full synchronization has to be hardcoded and cannot be flexibly configured per library.

  5. The query system is a problem: the Qgalaxy code is coupled with a lot of business-layer logic, resulting in bloated code. In addition, some code connects to ES directly through a Jar package, which bypasses the query layer and is bad for cluster-wide management and traffic limiting.

  6. The storage tier is deployed chaotically. Important indexes are deployed together with other indexes, so a fault in an unimportant index can greatly affect core indexes. The core indexes hold huge amounts of data, and unreasonable sharding makes queries slow. The old ES version also manages memory poorly, resulting in frequent GC.

In view of the above problems, the refactored overall architecture is proposed as follows:

  1. For Canal’s problem, Otter was introduced into the new architecture.

Otter is an open source product built on Canal for obtaining incremental log data from the database. Otter abstracts the whole data transmission process into four stages: Select, Extract, Transform, and Load (SETL).

Data stays on the fly and is landed as little as possible, for faster synchronization. (If the node load-balancing algorithm is enabled and the S stage and the ETL stages fall on different nodes, there will be a network transfer step.)

  • The introduction of Otter solves the single-point problem of the Databus producer module: multiple nodes provide load balancing and failover. It also solves the problem that Canal configuration was scattered across different files and hard to manage centrally. Otter's console integrates the management of all data sources, of synchronization relationships (Canal configuration and database configuration), of master/slave setups (Canal active/standby and DB master/slave), of mapping tables, and so on.
  • For problem 3, introducing Otter allows part of the Databus configuration to move from the producer side into the Otter configuration. This requires some modifications to Otter, such as generating the unique ID (Rowkey) and configuring which message topic and group to send to.
  • To solve the query problem in item 4, a universal ES Gateway was written. The gateway is independent of the ES version: for different versions of ES it only forwards the corresponding DSL statements and converts types, without doing any version-specific parsing; that compatibility logic is placed in the client Jar.
  • For problem 5, full import, the open source DataX framework was introduced as a general import solution.
  • To solve the chaotic deployment of the ES clusters, indexes are grouped into core and non-core clusters. The grouping is specified when the metadata of the corresponding cluster is configured, and during deployment the machines of core and non-core clusters are isolated.

The transformation of Otter

Otter's overall directory structure is divided into three parts: Node is the project that actually carries out data synchronization; Manager manages the Node nodes, data statistics, coordination between nodes, synchronization of basic information, and so on; Share is the common part, containing interface abstractions and the like.

Changes to the new data synchronization system:

  1. The Load phase sends messages to Kafka. This change involves two points: a unique ID generation policy and sending the Kafka messages. A partitionKey column is added as the last field of the DataMediaPair class, and the column configured there is used as the Kafka partition key. For example, if the orderID column is configured, its value is used as the partition key when sending a message. The purpose is to ensure that all changes to rows of the same table with the same ID go to the same partition, so ordered consumption can be guaranteed. Typically each table has both its own primary key ID and a unique business-level ID; the business-level ID is chosen as the partition key to guarantee ordering per business entity.
    public class DataMediaPair {
        private Long id;
        private Long pipelineId;                 // id of the synchronization task
        private DataMedia source;                // source table
        private DataMedia target;                // target table
        private Long pullWeight;
        private Long pushWeight;
        private ExtensionData resolverData;
        private ExtensionData filterData;
        private ColumnPairMode columnPairMode = ColumnPairMode.INCLUDE;
        private List<ColumnPair> columnPairs = new ArrayList<ColumnPair>();
        private List<ColumnGroup> columnGroups = new ArrayList<ColumnGroup>();
        private Date gmtCreate;
        private Date gmtModified;
        private String subEventType;
        private Long kafkaDataMediaId;           // used when synchronizing data to another data medium
        private Boolean primaryTable;
        private String groovy;                   // Groovy script
        private String esIndex;                  // ES index
        private String partitionKey;             // Kafka partition key (the newly added column)
    }
  2. The message is sent in the Load phase

In the Load phase, the original function is to distribute the data to be processed according to weight and to merge changes with the same primary key to reduce the amount of processing. The method Otter supports out of the box mainly writes to another database, so in the code below the doLoad method uses multiple threads to write to the target database. Our modification changes this method so that, instead of writing to another database, it simply sends a message (a sketch of this message-sending step appears after the partitionKey lookup code below).

    public class DbLoadAction implements InitializingBean, DisposableBean {

        /**
         * Loads a RowBatch; the returned context records the successfully processed rows.
         */
        public DbLoadContext load(RowBatch rowBatch, WeightController controller) {
            // ... context and datas are prepared earlier in the original method
            WeightBuckets<EventData> buckets = buildWeightBuckets(context, datas);
            List<Long> weights = buckets.weights();
            controller.start(weights);
            if (CollectionUtils.isEmpty(datas)) {
                logger.info("## no eventdata for load");
            }
            for (int i = 0; i < weights.size(); i++) {
                Long weight = weights.get(i);
                controller.await(weight.intValue());
                List<EventData> items = buckets.getItems(weight);
                logger.debug("## start load for weight:" + weight);
                // merge changes with the same primary key to reduce the load
                items = DbLoadMerger.merge(items);
                DbLoadData loadData = new DbLoadData();
                doBefore(items, context, loadData);
                // perform the load operation (the part we modify to send messages instead)
                doLoad(context, loadData);
                controller.single(weight.intValue());
                logger.debug("## end load for weight:" + weight);
            }
        }
    }

The important thing here is RowBatch. A RowBatch is a collection of binlogs for an entire database instance, not for a single table or library. If this is not handled, it can lead to ordering problems: in a sharded scenario, one instance may contain multiple libraries, so the DataMedia to synchronize differs per row and must be looked up from each row's content rather than assumed to be a single one.

    List<EventData> datas = rowBatch.getDatas();
    // Note: the batch covers the whole instance, so the partitionKey must not be resolved as if
    // all binlogs came from a single schema (database).
    DataMediaSource source = ConfigHelper.findDataMedia(context.getPipeline(),
            datas.get(0).getTableId()).getSource();
    DataMediaPair dataMediaPair = ConfigHelper.findDataMediaPair(context.getPipeline(),
            datas.get(0).getPairId());
    String partitionKey = dataMediaPair.getPartitionKey();
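
Putting the two pieces together, the modified load step conceptually keys each Kafka record with the configured partitionKey value, so that all changes for the same business ID land in the same partition. The sketch below shows the idea only; the topic name and the resolveColumnValue/toJson helpers are assumptions, not the actual Databus/Otter code.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import com.alibaba.otter.shared.etl.model.EventData; // Otter's change-row model

    // Sketch of sending one changed row to Kafka, keyed by the configured partition-key column.
    // The helper methods below are illustrative placeholders.
    public class KafkaLoadSketch {

        private final KafkaProducer<String, String> producer;

        public KafkaLoadSketch(KafkaProducer<String, String> producer) {
            this.producer = producer;
        }

        public void send(String topic, String partitionKeyColumn, EventData eventData) {
            // value of the configured partition-key column (e.g. the business-level order ID)
            String key = resolveColumnValue(eventData, partitionKeyColumn);
            // records with the same key always go to the same partition, preserving per-order ordering
            producer.send(new ProducerRecord<>(topic, key, toJson(eventData)));
        }

        private String resolveColumnValue(EventData eventData, String column) {
            // placeholder: look the column up among the row's changed columns
            return "";
        }

        private String toJson(EventData eventData) {
            // placeholder: serialize the change row
            return "";
        }
    }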

DTS aggregation layer

The aggregation layer design is divided into two parts:

  1. Following Otter's Manager and Node model, the DTS system is also divided into the four SETL stages, which all execute on Node nodes. The Manager is mainly used to start and stop Node nodes and query their status.

  2. Business aggregation: a flight ticket order is composed of several child tables, such as the sub-order table, the payment information table, the segment information table, and the passenger information table. The complete order data is the aggregation of these child tables, and a child table relationship may be one-to-many or one-to-one. The aggregation layer uses XML configuration to describe these relationships, defined as follows:

    <CommonConfig>
        <!-- parent table definition -->
        <parentTable name="order">
            <!-- rowKey of the parent table: database column name and data type -->
            <rowKeyConfig column="order_no" columnType="string"/>
            <!-- first-level child table 1; isLeaf marks whether it is a leaf table (default true),
                 rowType marks one-to-many (array) or one-to-one (map) -->
            <childTable name="sub_order" isLeaf="false" rowType="array">
                <!-- rowKey of the child table, i.e. the foreign key back to the parent -->
                <rowKeyConfig column="order_no" columnType="string"/>
                <!-- second-level child table -->
                <childTable name="product" rowType="map" tinyInt1="male,is_valid">
                    <rowKeyConfig column="product_no" parentColumn="sub_order_no"
                                  columnType="string" dependent="true"/>
                </childTable>
            </childTable>
            <!-- first-level child table 2 -->
            <childTable name="ext_info" rowType="map">
                <rowKeyConfig column="order_no" columnType="string"/>
            </childTable>
        </parentTable>
    </CommonConfig>

The core of the XML configuration is mapping a parent table to multiple child tables by configuring the associated foreign keys. Here the parent table is order, the child table is sub_order, and the child table's rowKey, order_no, is the field that links the child table to the parent table; the mapping between the two tables is found through this field. How do you design a program structure that stores and uses these mappings?

Table relationship abstract interface

    public interface TableDataRow {
        // name of the table
        String getTableName();
        // one-to-one or one-to-many
        RowType getType();
        // SQL used to query by the associated key
        String getSql();
        List<ChildTableDataRow> getChildren();
        RowKey getRowKey();
    }

The implementations for the main table and the child tables:

    public class RootTableDataRow implements TableDataRow {
        private String tableName;
        private RowType type;
        private String sql;
        private List<ChildTableDataRow> children;
        private RowKey rowKey;                  // the associated foreign key
        private Set<String> dbNames;
    }

    public class ChildTableDataRow implements TableDataRow {
        private String tableName;
        private RowType type;
        private String sql;                     // e.g. select ... where id = 10
        private WhereEntry whereEntry;
        private List<ChildTableDataRow> children;
        private TableDataRow parent;
        private RowKey rowKey;
    }
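
As an illustration of how the order / sub_order / product configuration above maps onto these classes, a hand-built version of the tree could look roughly like this; the setter names, the RowKey constructor, and the RowType constants are assumptions made for the sketch, not the project's actual API.

    // Illustrative wiring of the order -> sub_order -> product mapping into the structures above.
    RootTableDataRow order = new RootTableDataRow();
    order.setTableName("order");
    order.setRowKey(new RowKey("order_no"));        // rowKey column of the parent table

    ChildTableDataRow subOrder = new ChildTableDataRow();
    subOrder.setTableName("sub_order");
    subOrder.setType(RowType.ARRAY);                // one-to-many child
    subOrder.setRowKey(new RowKey("order_no"));     // foreign key back to the parent
    subOrder.setParent(order);

    ChildTableDataRow product = new ChildTableDataRow();
    product.setTableName("product");
    product.setType(RowType.MAP);                   // one-to-one child
    product.setRowKey(new RowKey("product_no"));    // depends on sub_order via sub_order_no
    product.setParent(subOrder);

    subOrder.setChildren(java.util.Collections.singletonList(product));
    order.setChildren(java.util.Collections.singletonList(subOrder));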

The core code is the Rowkey generation logic. The Rowkey of every child table in a structure must resolve to the same value as the main table's, so a recursive algorithm is needed to handle this generation of the Rowkey.

    /**
     * Resolves the root (parent) rowKey value for a table recursively.
     *
     * @param tableDataRow          the current table definition
     * @param thisTableRowKeyValue  the rowKey value of the current table
     * @param schemaName            the schema the data belongs to
     * @return the rowKey value of the root table
     */
    public String getRootRowKeyValue(TableDataRow tableDataRow, String thisTableRowKeyValue,
                                     String schemaName, CollectConfig collectConfig) {
        // if the rowKey does not depend on a parent, return it directly
        if (!tableDataRow.getRowKey().isDepend()) {
            return thisTableRowKeyValue;
        }
        // the root table's rowKey is already the final value
        if (tableDataRow instanceof RootTableDataRow) {
            return thisTableRowKeyValue;
        }
        ChildTableDataRow child = (ChildTableDataRow) tableDataRow;
        // look up the parent's rowKey value for this child row
        String parentRowKeyValue = getParentRowKeyValue(schemaName, thisTableRowKeyValue, child, collectConfig);
        TableDataRow parent = child.getParent();
        // recurse upward until the root table is reached
        return getRootRowKeyValue(parent, parentRowKeyValue, schemaName, collectConfig);
    }
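
A hypothetical walkthrough of the recursion, with made-up key values: when a change arrives on the product table, the first call resolves the owning sub_order via sub_order_no, and the second call stops because sub_order's rowKey (order_no) no longer depends on a parent.

    // Hypothetical usage; the table definition and key values are made up for illustration.
    String rootKey = getRootRowKeyValue(productTableRow, "P1001", "ticket_order", collectConfig);
    // rootKey now holds the order_no of the top-level order,
    // so the change to product "P1001" is aggregated into that order's ES document.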

With the above, the relational mapping and its storage are in place; later the program calls the related functions and builds the corresponding structure.

Architecture of the universal ES gateway

The ES gateway encapsulates the read/write APIs for accessing ES, and performs access authentication and flow control.

It mainly includes the following modules:

  • Work-order application and authentication module: an app applies for access to the ES gateway; after approval the user receives an authorization code, ensuring access and data security.
  • Configuration module: mainly configures ES cluster metadata, such as which indexes a cluster contains, the ES version, groupings, permissions, basic index information, types, mapping relationships, and so on.
  • Write module: provides synchronous writes over Dubbo and asynchronous writes via Kafka messages. Synchronous writes satisfy real-time requirements, while asynchronous writes cover other business needs and backfilling.
  • Read module: provides common query APIs, such as count queries, general queries, deep paging queries, scroll queries, and other basic query functions.
  • Traffic switchover module: switches traffic for O&M upgrades of the active and standby clusters; when no upgrade is in progress, it load-balances between them.
  • Flow-limiting module: mainly uses the Hystrix framework to restrict the QPS of writes, to avoid overwhelming the ES cluster with heavy traffic. Both the number of requests and the size of requests are limited. (A hypothetical interface sketch follows this list.)
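
As an illustration of the surface a business line sees, a hypothetical client-facing interface covering the modules above might look like the sketch below; the interface and method names are assumptions used to summarize the design, not the actual gateway API.

    // Hypothetical client-facing interface summarizing the gateway modules above.
    // All names are illustrative assumptions, not the real API.
    public interface EsGatewayClient {

        // write module: synchronous write (e.g. over Dubbo) for near-real-time needs
        boolean write(String appCode, String authCode, String index, String docId, String jsonDoc);

        // write module: asynchronous write (e.g. via a Kafka topic) for bulk or backfill traffic
        void writeAsync(String appCode, String authCode, String index, String docId, String jsonDoc);

        // read module: pass-through DSL query; the gateway only forwards the DSL and converts types,
        // so it stays independent of the ES version
        String search(String appCode, String authCode, String index, String dslQuery);

        // read module: count and scroll-style (deep paging) queries
        long count(String appCode, String authCode, String index, String dslQuery);
        String scroll(String appCode, String authCode, String scrollId);
    }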

Conclusion

The overall data synchronization system is divided into three parts. The Otter part mainly synchronizes database binlog data to Kafka. The DTS part mainly aggregates business data, merging the child-table data and the main-table data into a single document. The gateway part wraps queries and writes in a unified way: it provides write and query APIs, improves performance through pooling and asynchrony, and provides traffic switching and traffic limiting. Hopefully this article gives you some insight into how to do data synchronization.