1. Requirements

In the early days of our big data platform, we used Sqoop as the data synchronization tool to meet the day-to-day need for synchronizing data between MySQL and Hive.

As the business grew, data synchronization scenarios multiplied, mainly among MySQL, Hive, and text files, and Sqoop could no longer fully meet our needs. By early 2017 we had grown tired of patching Sqoop and decided to overhaul our data synchronization tooling. The most painful requirements were:

  • Frequent synchronization failures caused by MySQL changes. The tool must support read/write splitting and sharding (split databases and tables), and tolerate database migrations, node outages, and master/slave switchovers
  • Frequent failures caused by table structure changes. The table structure of MySQL or Hive may change, and most structural inconsistencies must be tolerated
  • MySQL reads and writes must not affect online services or trigger MySQL O&M alarms; we did not want to be scolded by the DBAs every day
  • Support for more data sources, such as HBase, ES, and text files

As the data platform administrator, I also hoped to collect more runtime details to support daily maintenance:

  • Collect statistics, such as run time, data volume, and consumed resources
  • Dirty data verification and reporting
  • Run logs are expected to be connected to the company’s log platform for easy monitoring

2. Technology selection

Based on the above data synchronization requirements, we planned to build on an open-source tool. The candidates we examined were mainly DataX and Sqoop; their features compare as follows:

| Feature | DataX | Sqoop |
| --- | --- | --- |
| Run mode | Single process, multi-threaded | MapReduce |
| MySQL read/write | Heavy load on a single machine; read/write granularity is easy to control | MapReduce mode is heavy; handling write errors is cumbersome |
| Hive read/write | Heavy load on a single machine | Good scalability |
| File formats | ORC supported | ORC not supported, but could be added |
| Distributed | Not supported; can be worked around via the scheduling system | Supported |
| Flow control | Built in | Needs customization |
| Statistics | Some statistics exist; reporting needs customization | None; collecting data from distributed jobs is inconvenient |
| Data validation | Validation exists in the core module | None; collecting data from distributed jobs is inconvenient |
| Monitoring | Needs customization | Needs customization |
| Community | Recently open-sourced, not very active | Long active, but the core changes very little |

The main disadvantage of DataX is that it runs on a single machine, which can be worked around by the scheduling system; in other respects its functionality is superior to Sqoop's. We therefore chose to build on DataX.

3. Preliminary design

3.1 Operating Mode

The most important issue to solve with DataX is distributed deployment and execution. DataX itself runs as a single-process client, so we had to decide how DataX runs are triggered.

We decided to reuse the existing offline task scheduling system, which is responsible for triggering tasks, while DataX is responsible only for data synchronization. This reuses existing system capabilities and avoids duplicate development. For the scheduling system, see our earlier article on best practices for the big data development platform.

A DataX client is deployed on every worker server of the data platform; multiple DataX processes can run concurrently at runtime, all controlled by the scheduling system.

3.2 Executor design

To interact with the existing data platform, some customization is required:

  • Status reporting that complies with platform conventions, for example start, running, and end; progress must be reported while running, and success or failure when the task ends
  • Run logs in the platform format, reported in real time for display
  • Parameters for sub-modules such as statistics, validation, and flow control can be passed in from the platform, and the results need to be persisted
  • Tolerance for abnormal input, such as MySQL master/slave switchovers and table structure changes

3.3 Development Strategy

The overall run flow is: pre-processing (configuration file conversion, table structure verification) -> (input -> DataX core + business-independent validation -> output) -> post-processing (statistics/persistence).
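
As a rough illustration of this flow, the sketch below shows how a wrapper around the DataX client could be structured; the helper names (convert_to_datax_config, verify_table_structures, parse_run_statistics, persist_statistics) and the launcher path are hypothetical stand-ins for the platform-specific pieces.

```python
import json
import subprocess

def run_sync_task(task_conf: dict) -> None:
    """Hypothetical wrapper illustrating pre-processing -> DataX core -> post-processing."""
    # Pre-processing: convert the platform task config into a native DataX
    # job file and verify that source/target table structures are compatible.
    verify_table_structures(task_conf)               # hypothetical helper
    datax_job = convert_to_datax_config(task_conf)   # hypothetical helper

    job_file = f"/tmp/datax_job_{task_conf['task_id']}.json"
    with open(job_file, "w") as f:
        json.dump(datax_job, f)

    # DataX core: run the unmodified single-process client.
    result = subprocess.run(
        ["python", "datax/bin/datax.py", job_file],   # path to the DataX launcher
        capture_output=True, text=True)

    # Post-processing: pull statistics from stdout and persist them.
    stats = parse_run_statistics(result.stdout)       # hypothetical helper
    persist_statistics(task_conf["task_id"], stats)   # hypothetical helper
```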

We try to keep DataX focused on data synchronization and free of business logic: company-specific business logic lives outside DataX, and we only modify the source code when the synchronization process itself cannot meet a requirement.

Pre-run validation of table structures, table naming rules, and address translation, as well as persistence of run results, is placed in the metadata system (see our earlier article on the data warehouse metadata system), and monitoring of running tasks is placed in the scheduling system.

4. Source code transformation

4.1 Hive Read and write

DataX has no built-in Hive reader or writer, only HDFS ones. We chose to wrap this outside DataX: Hive read/write configurations are converted into HDFS read/write configurations, supplemented with Hive DDL operations. Specifically, we made the following changes:

4.1.1 Hive Read Operations

  • The HDFS path is spliced from the table name. One of our data warehouse conventions forbids external tables, which makes path splicing straightforward (see the sketch below); for an external table the actual path must be fetched from the metadata system
  • The Hive table structure is obtained from the metadata system; it also needs to be validated, as described later
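
A minimal sketch of the read-side conversion, assuming a standard managed-table warehouse layout and a metadata lookup helper (get_hive_table_meta is hypothetical); the parameter names follow DataX's hdfsreader:

```python
def hive_read_to_hdfs_read(db: str, table: str, partition: str | None = None) -> dict:
    """Turn a Hive-read request into a DataX hdfsreader configuration."""
    meta = get_hive_table_meta(db, table)          # hypothetical metadata lookup
    # Managed tables follow a fixed warehouse layout, so the HDFS path can be
    # spliced from the database and table names; external tables would need
    # their real path from the metadata system instead.
    path = f"/user/hive/warehouse/{db}.db/{table}"
    if partition:
        path += f"/{partition}"                    # e.g. "dt=20190101"

    return {
        "name": "hdfsreader",
        "parameter": {
            "path": path,
            "defaultFS": meta["default_fs"],
            "fileType": meta["file_type"],         # e.g. "orc" or "text"
            "fieldDelimiter": meta["field_delimiter"],
            "column": [{"index": i, "type": c["type"]}
                       for i, c in enumerate(meta["columns"])],
        },
    }
```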

4.1.2 Hive Write Operations

  • The Hive file format and field delimiter are not specified in the user-facing Hive write configuration; they are read from metadata and filled into the HDFS write configuration file
  • Hive tables or partitions that do not yet exist are created automatically, building tables that comply with the data warehouse specification (see the sketch below)
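
The write side works the same way in reverse. The sketch below fills the hdfswriter configuration from metadata and issues the idempotent DDL that creates a missing partition (get_hive_table_meta and run_hive_ddl are hypothetical helpers):

```python
def prepare_hive_write(db: str, table: str, partition: str) -> dict:
    """Fill an hdfswriter config from Hive metadata and ensure the target
    partition exists. `partition` is a spec such as "dt='20190101'"."""
    meta = get_hive_table_meta(db, table)                 # hypothetical helper
    part_path = partition.replace("'", "")                # "dt=20190101"

    # ADD PARTITION IF NOT EXISTS is idempotent, so it is safe to run before
    # every import; creating a missing table would work the same way.
    run_hive_ddl(f"ALTER TABLE {db}.{table} "
                 f"ADD IF NOT EXISTS PARTITION ({partition})")   # hypothetical helper

    return {
        "name": "hdfswriter",
        "parameter": {
            "path": f"/user/hive/warehouse/{db}.db/{table}/{part_path}",
            "defaultFS": meta["default_fs"],
            # File format and delimiter come from metadata, not user input.
            "fileType": meta["file_type"],
            "fieldDelimiter": meta["field_delimiter"],
            "fileName": table,
            "writeMode": "truncate",
            "column": [{"name": c["name"], "type": c["type"]}
                       for c in meta["columns"]],
        },
    }
```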

4.2 MySQL -> Hive Compatibility

DataX readers and writers are designed to be independent of each other, but in practice they often need to be coordinated to avoid errors. The table structures of MySQL and Hive can drift apart, for example when MySQL adds or removes fields or a field type changes, and such inconsistencies must not cause run failures.

4.2.1 MySQL -> Hive non-partitioned table

Non-partitioned tables are always imported in full, with the field list in the mysqlreader configuration as the source of truth. If the configured MySQL fields are inconsistent with the Hive structure, the Hive table is dropped and rebuilt. Rebuilding carries the risk of breaking downstream Hive SQL, which we mitigate with periodic SQL checks.

During a Hive table rebuild, MySQL field types must be converted to Hive types, for example MySQL varchar to Hive string. Unsigned types need to be widened: MySQL int unsigned becomes Hive bigint, and MySQL bigint unsigned likewise needs a wider type; based on our actual business data we boldly map it to Hive bigint as well. Hive string is a universal type, so when you are unsure how to convert a type, string is the safest choice.
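
A sketch of such a type mapping; the exact table below is illustrative and should be tuned to your own data:

```python
# Illustrative MySQL -> Hive type mapping in the spirit described above.
MYSQL_TO_HIVE = {
    "varchar": "string",
    "char": "string",
    "text": "string",
    "datetime": "string",
    "int": "int",
    "int unsigned": "bigint",       # widen unsigned types
    "bigint": "bigint",
    "bigint unsigned": "bigint",    # "bold" choice based on actual value ranges
    "float": "double",
    "double": "double",
}

def to_hive_type(mysql_type: str) -> str:
    # When in doubt, fall back to string, the universal Hive type.
    return MYSQL_TO_HIVE.get(mysql_type.lower(), "string")
```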

4.2.2 MySQL -> Hive partitioned table

We do not change the table structure of Hive partitioned tables, because doing so can break reads of data in old partitions. When writing to a Hive partitioned table, the Hive table structure is therefore authoritative; if the structures cannot be reconciled, an error is reported. We adopted the following strategy:

| MySQL fields | Actual Hive fields | Handling |
| --- | --- | --- |
| a,b | a,b | Normal |
| a,b,c | a,b | Extra MySQL fields are ignored; Hive prevails |
| b,a | a,b | Wrong order; reordered to match Hive |
| a | a,b | MySQL is missing a field; error |
| a,c | a,b | Fields do not match; error |
| No fields specified | a,b | Hive prevails |

Field types that differ between MySQL and Hive are converted in the same way as in the previous section, for example widening a MySQL int to a Hive bigint column.
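
The alignment rules in the table above can be expressed compactly; a sketch (field names are illustrative):

```python
def align_fields(mysql_cols: list[str], hive_cols: list[str]) -> list[str]:
    """Return the MySQL column list to read, aligned to the Hive structure:
    Hive prevails, extra MySQL columns are ignored, order follows Hive, and
    a missing or unmatched column raises an error."""
    if not mysql_cols:                      # no fields specified: use Hive's
        return list(hive_cols)

    missing = [c for c in hive_cols if c not in mysql_cols]
    if missing:
        raise ValueError(f"MySQL side is missing Hive columns: {missing}")

    # Reorder to Hive order and drop MySQL columns Hive does not have.
    return [c for c in hive_cols if c in mysql_cols]
```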

4.3 Adapting to a MySQL Cluster

Our MySQL clusters are managed by RDS middleware and come in two modes: read/write splitting and sharding (split databases and tables). There are two options for reading and writing MySQL: go through the RDS middleware, or read and write the MySQL instances directly.

| Option | Advantages | Disadvantages |
| --- | --- | --- |
| Connect to instances directly | Good performance; no impact on online services | Configuration must be updated when the standby database is maintained or IPs change; developers do not know the standby library addresses |
| Connect via RDS | Same as regular applications; back-end maintenance is shielded | Extra pressure on RDS, with risk of affecting online business; SQL must fully comply with company specifications |

For writing to MySQL, the data volume is generally small, so DataX writes through RDS and no extra handling of master/slave replication is needed. For reading MySQL, DataX connects directly to the MySQL instances, because there are many full-table synchronization tasks, especially during the early-morning peak of offline jobs, and we want to keep that heavy traffic off the RDS middleware. To contain the risk of address changes caused by MySQL maintenance, we do a few extra things:

  • The metadata system maintains the canonical RDS middleware addresses
  • Master, slave, and RDS middleware addresses are linked and can be translated into one another
  • The latest master and slave addresses are fetched every time a DataX task starts
  • MySQL connectivity is checked periodically
  • We established a working relationship with the DBAs so that changes are announced in advance

When reading MySQL with read/write splitting, each run obtains one of the slave addresses and connects to it. For sharded tables, every shard must be resolved: with 1024 shards, 1024 slave addresses are expanded into the DataX configuration file (see the sketch below).
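
A sketch of how the connection list could be expanded at task start, assuming the metadata system exposes shard and slave lookups (get_shards and pick_slave_instance are hypothetical); the connection layout follows DataX's mysqlreader:

```python
def build_mysql_connections(logical_table: str) -> list[dict]:
    """Expand a logical (possibly sharded) table into the mysqlreader
    "connection" list, resolving fresh slave addresses at task start."""
    connections = []
    for shard in get_shards(logical_table):        # 1 entry, or 1024 for 1024 shards
        # Resolve the RDS middleware address to the current slave instance so
        # master/slave switchovers since the last run are picked up.
        slave = pick_slave_instance(shard["rds_address"])   # hypothetical helper
        connections.append({
            "jdbcUrl": [f"jdbc:mysql://{slave['host']}:{slave['port']}/{shard['db']}"],
            "table": [shard["table"]],
        })
    return connections
```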

4.4 Compatibility with MySQL O&M Specifications

4.4.1 Avoiding slow SQL

Our MySQL O&M specification forbids slow SQL; another rule is that any SQL statement running for more than 2 seconds is forcibly killed.

DataX's mysqlreader reads a table with a single full-table select, which on a large table becomes exactly such a slow SQL. We therefore changed the read to paginate on the primary key, issuing one short query per page: select ... from table_name where id > ? order by id asc limit ? (see the sketch below).
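
A sketch of the keyset-pagination pattern in Python (the production change lives inside DataX's mysqlreader; this stand-alone version just illustrates the query shape and assumes the first column is the auto-increment primary key):

```python
import pymysql

def paged_read(conn, table: str, columns: list[str], page_size: int = 2000):
    """Yield rows page by page, paginating on the primary key so that no
    single query scans the whole table or runs long enough to be killed."""
    key = columns[0]                       # assume columns[0] is the primary key
    cols = ", ".join(columns)
    last_id = 0
    while True:
        with conn.cursor() as cur:
            cur.execute(
                f"SELECT {cols} FROM {table} WHERE {key} > %s "
                f"ORDER BY {key} ASC LIMIT %s",
                (last_id, page_size),
            )
            rows = cur.fetchall()
        if not rows:
            break
        last_id = rows[-1][0]              # advance past the last key read
        yield rows

# Usage: conn = pymysql.connect(host=..., user=..., password=..., database=...)
# for page in paged_read(conn, "orders", ["id", "user_id", "amount"]): ...
```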

4.4.2 Preventing fast reads and writes from affecting other services

After each SQL statement we force a short sleep so the database never gets too busy. The batchSize for inserts and the page size for selects are generated dynamically based on how long the previous batch took: if it ran too fast we sleep longer, if it ran too slowly we sleep less, and the size of the next batch is adjusted accordingly.
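
A sketch of that feedback loop; the target time and size bounds below are illustrative assumptions, not the values we run in production:

```python
import time

def throttle(batch_size: int, elapsed: float, target: float = 1.0,
             min_size: int = 256, max_size: int = 8192) -> int:
    """Sleep after a batch and return the size to use for the next one:
    sleep more and grow the batch if the last one was too fast, shrink the
    batch and skip the sleep if it was too slow."""
    if elapsed < target / 2:               # ran too fast: back off
        time.sleep(target - elapsed)
        return min(batch_size * 2, max_size)
    if elapsed > target * 2:               # ran too slow: no extra wait
        return max(batch_size // 2, min_size)
    time.sleep(max(target - elapsed, 0.0))
    return batch_size
```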

There is still room for improvement, for example dynamically adjusting the rate based on system-level monitoring metrics such as disk usage, CPU usage, and binlog latency. In practice, deleting data easily causes binlog delay, and the run time of the delete statement alone cannot tell whether deletion is going too fast; we have not investigated the exact cause further.

4.5 More plug-ins

In addition to the most commonly used MySQL and Hive plugins and the logically simple text-file plugins, we made small business-driven changes to HBase reading and writing, and developed esWriter as well as a kvWriter for KVDS, our in-house KV store; these plugins are developed and maintained by the storage team.

4.6 Interaction with big data Systems

4.6.1 Reporting operation Statistics

DataX prints run-result statistics when a job finishes, and we want to report them to the metadata system and store them as ETL process metadata.

Following our development strategy, we did not embed the metadata system's API into the DataX source code; instead, we capture DataX's stdout outside the process, intercept the printed statistics, and report them.
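
A sketch of this interception, assuming the job summary contains lines such as "读出记录总数: 12345" (total records read); the exact patterns and the reporting call (report_to_metadata_system) would need to match the real output and API:

```python
import re
import subprocess

def run_and_report(job_file: str, task_id: str) -> dict:
    """Run DataX as a child process, capture stdout, and extract the summary."""
    proc = subprocess.run(["python", "datax/bin/datax.py", job_file],
                          capture_output=True, text=True)
    patterns = {
        "read_records": r"读出记录总数\s*:\s*(\d+)",     # total records read
        "failed_records": r"读写失败总数\s*:\s*(\d+)",    # total failed records
        "cost_seconds": r"任务总计耗时\s*:\s*(\d+)s",     # total job time
    }
    stats = {}
    for key, pattern in patterns.items():
        m = re.search(pattern, proc.stdout)
        if m:
            stats[key] = int(m.group(1))
    report_to_metadata_system(task_id, stats)      # hypothetical reporting call
    return stats
```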

4.6.2 Interaction with the data platform

The data platform provides an edit page for DataX tasks, where each task's DataX configuration file and scheduling period are stored. The scheduling system starts DataX tasks according to the schedule and configuration file; each task runs as an independent process, and the task ends when the process exits. While running, DataX logs are streamed in real time and displayed on the page.

4.7 Consider more exceptions

Most of the DataX code simply catches Exception, without per-scenario handling or retries, so network or I/O problems during a large job easily cause the whole task to fail. The most common case is SQLException, which needs to be classified: for some SQL errors we retry, falling back from batch processing to row-by-row processing; for network errors we rebuild the database connection. HDFS itself is more tolerant of failures, so DataX does less exception handling there.
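
The DataX change itself is in Java, but the strategy is easy to show in a small sketch: retry on connection-level errors, and fall back from batch writes to row-by-row writes so one bad record does not sink the whole batch (record_dirty_data is a hypothetical hook; the error classes are pymysql's):

```python
import pymysql

def write_batch_with_fallback(conn, sql: str, rows: list[tuple],
                              max_retries: int = 3) -> None:
    """Write a batch with retries, then degrade to row-by-row on failure."""
    for _ in range(max_retries):
        try:
            with conn.cursor() as cur:
                cur.executemany(sql, rows)
            conn.commit()
            return
        except pymysql.err.OperationalError:
            conn.ping(reconnect=True)       # network-ish error: rebuild connection
        except pymysql.err.IntegrityError:
            break                           # data problem: retrying the batch is useless
    # Fall back to single-row writes; keep failures as dirty data.
    for row in rows:
        try:
            with conn.cursor() as cur:
                cur.execute(sql, row)
            conn.commit()
        except pymysql.err.MySQLError as exc:
            record_dirty_data(row, exc)     # hypothetical dirty-data hook
```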

4.8 Test Scenario Transformation

4.8.1 Continuous integration

To catch low-level problems, such as a table having been migrated while its task still runs, or a regular table having been converted to a partitioned table, every night after 8 p.m. we "replay" all the important DataX tasks that ran during the day.

This is not an as-is replay: a test flag is added to the configuration file, so that after DataX starts, the reader reads only one row of data and the writer redirects its target to a test space (see the sketch below). Such a test verifies that basic DataX functionality and the entire runtime environment are working.
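
A sketch of the config rewrite, assuming mysqlreader/mysqlwriter-style jobs; the "read only one row" trick via the where clause and the test-space URL are illustrative:

```python
import copy

def to_replay_job(job: dict, test_jdbc_url: str) -> dict:
    """Rewrite a production DataX job into a nightly replay job: the reader
    reads a single row and the writer points at the test space."""
    replay = copy.deepcopy(job)
    content = replay["job"]["content"][0]

    # Limit the reader to one row (other reader types need their own trick).
    content["reader"]["parameter"]["where"] = "1 = 1 limit 1"

    # Redirect the writer so production data is never touched.
    for conn in content["writer"]["parameter"].get("connection", []):
        conn["jdbcUrl"] = test_jdbc_url
    return replay
```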

4.8.2 Full-link pressure test Scenario

Our full-link stress-testing system generates data in Hive and imports it into shadow databases through DataX. A shadow database is built inside the production MySQL instance but is invisible to normal applications and can only be accessed by SQL carrying special hints.

Running a full-link stress test in production is a high-risk operation: a wrong configuration file could damage real production data. DataX's MySQL read/write parameters therefore carry a stress-test marker and may only read and write specific MySQL and Hive databases, and the data platform is configured to show a prominent reminder.
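
A sketch of such a guard; the stress-test flag name and the shadow naming convention (a shadow_ table prefix) are assumptions for illustration:

```python
def assert_shadow_only(job: dict) -> None:
    """Refuse to run a stress-test-flagged job unless every MySQL table it
    touches looks like a shadow table."""
    if not job.get("stress_test"):          # hypothetical flag set by the platform
        return
    for content in job["job"]["content"]:
        for side in ("reader", "writer"):
            param = content[side]["parameter"]
            for conn in param.get("connection", []):
                for table in conn.get("table", []):
                    if not table.startswith("shadow_"):
                        raise PermissionError(
                            f"stress-test job may not touch non-shadow table: {table}")
```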

5. Online operation

DataX went live in the second quarter of 2017, most of the features above were completed by Q3 2017, and only minor fixes followed. As of Q1 2019 it had been running stably for more than 20 months. It currently runs more than 6,000 DataX tasks and transfers more than 10 billion rows of data per day, and it has been a relatively stable component of the data platform.

There were a few hiccups along the way; one left a deep impression. The native hdfsreader has a bug when reading large ORC files: the ORC read API splits files above a threshold (256 MB by default) into multiple slices, but DataX reads only the first slice. Because 256 MB is fairly large, the problem appeared rarely and was quite stealthy. Beyond that, no major bugs were found; most issues we hit were environment problems or user misunderstandings, or small problems that were easy to work around.

6. Next step

We have no major development plans for DataX. More than a dozen improvements have accumulated in the requirements list, such as improving the readability of dirty-data output and supporting HDFS HA, but online operation would not suffer even if they were left untouched for a year. They are neither important nor urgent, so they will not be acted on for now.

DataX mainly handles batch synchronization and cannot meet most incremental and real-time synchronization needs. We also have a mature solution for incremental synchronization; our home-grown incremental synchronization product will be covered in another article.