1. Foreword

Canal is an open source project from Alibaba, written entirely in Java. It provides incremental data subscription and consumption by parsing a database's incremental log. It currently targets mainly MySQL (MariaDB is also supported).

Canal supports real-time incremental synchronization based on the binlog, as well as full synchronization. This article shows how to use Canal for full synchronization from MySQL to Elasticsearch.

Full synchronization is done by manually triggering an ETL task through the Adapter's REST interface.

During full synchronization, incremental synchronization tasks for the same destination are blocked. Once full synchronization completes, the blocked incremental synchronization tasks are woken up again.

PS: See the article “Canal High Availability Architecture Deployment” for details on Canal deployment and real-time synchronization.

 

2. ETL interface

The ETL interface of the Adapter is: /etl/{type}/{task}

  • The default web port is 8081
  • type: the adapter type (hbase / es7 / rdb)
  • task: the task name, which is the name of the corresponding configuration file, for example sys_user.yml

 

Example:

curl -X POST http://127.0.0.1:8081/etl/es7/sys_user.yml

Successful execution output:

{"succeeded":true,"resultMessage":"Importing ES data: 17 items"}
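
The same call can be made from a script. The following is a minimal Python sketch, not part of Canal itself: the function names build_etl_request and trigger_etl are hypothetical, and the host, port, and adapter type defaults simply mirror the curl example above. It assumes the adapter answers with the JSON shape shown in the successful output.

```python
import json
import urllib.request


def build_etl_request(task, adapter_type="es7", host="127.0.0.1", port=8081):
    """Build (but do not send) a POST request for /etl/{type}/{task}."""
    url = f"http://{host}:{port}/etl/{adapter_type}/{task}"
    return urllib.request.Request(url, method="POST")


def trigger_etl(task, **kwargs):
    """Send the request and return the adapter's JSON reply as a dict."""
    with urllib.request.urlopen(build_etl_request(task, **kwargs)) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Against a running adapter, trigger_etl("sys_user.yml") should return a dict whose "succeeded" and "resultMessage" keys match the output shown above.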

 

3. Pitfalls encountered in practice

3.1. The connection pool is insufficient

When a large amount of data is synchronized, an exception is thrown after the task has been running for a while.

3.1.1. Cause analysis

The Canal source code shows that when more than 10,000 rows are synchronized, the work is split into batches of 10,000 records each, and the batches are executed in parallel by multiple threads. The Adapter's default connection pool size is 3, and this exception is thrown when a thread waits more than 1 minute to acquire a database connection.

The number of threads equals the number of threads (logical processors) available on the current server's CPU.
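
The contention can be illustrated with a toy model. The Python sketch below is not Canal's code: peak_concurrency is a hypothetical helper in which a semaphore stands in for the connection pool and a short sleep stands in for reading one batch. It only shows that, whatever the number of worker threads, no more than pool_size batches can hold a connection at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(workers, pool_size, batches, work_seconds=0.01):
    """Run fake batch jobs on `workers` threads sharing `pool_size` connections.

    Returns the highest number of jobs that held a connection at the same time.
    """
    connections = threading.BoundedSemaphore(pool_size)  # stands in for the DB pool
    lock = threading.Lock()
    state = {"active": 0, "peak": 0}

    def run_batch(_):
        with connections:  # blocks while every connection is in use
            with lock:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            time.sleep(work_seconds)  # pretend to read one batch of rows
            with lock:
                state["active"] -= 1

    with ThreadPoolExecutor(max_workers=workers) as executor:
        list(executor.map(run_batch, range(batches)))
    return state["peak"]
```

With 8 workers and a pool of 3, the peak never exceeds 3, so the remaining workers spend their time waiting for a connection. In the Adapter, a wait of more than 1 minute ends in the exception described above.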

 

3.1.2. Solution

In the Adapter's conf/application.yml file, edit the srcDataSources configuration item and add a maxActive setting so that the maximum number of database connections equals the number of CPU threads available on the current server.

You can run the following command to view the number of CPU threads

grep 'processor' /proc/cpuinfo | sort -u | wc -l

 

3.2. The ES connection timed out

When the synchronized table has a large number of fields, a connection timeout error from ES may occur.

3.2.1. Cause analysis

The commitBatch value (the batch commit size) in the Adapter's table mapping configuration file is set too large (6000).

 

3.2.2. Solution

Change the commitBatch configuration item in the Adapter's conf/es7/xxx.yml mapping file to 3000.

 

3.3. Synchronization is slow

Synchronizing 30 million rows takes about 3.5 hours.

3.3.1. Cause analysis

When there are more than 10,000 rows, Canal synchronizes the data in batches of 10,000 rows each, and every batch is fetched with a paging query. With a large amount of data this leads to deep paging, and the queries become very slow.
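
The cost of deep paging can be estimated with a little arithmetic. The Python sketch below assumes that each batch is fetched with an offset-based query of the form LIMIT offset, size (an assumption about the generated SQL, not a quote from the Canal source). With that form of paging, MySQL has to read and discard the first `offset` rows before returning a page. The helper page_offsets is hypothetical.

```python
def page_offsets(total_rows, batch_size=10_000):
    """Yield (offset, size) pairs for offset-based paging over total_rows rows."""
    for offset in range(0, total_rows, batch_size):
        yield offset, min(batch_size, total_rows - offset)


pages = list(page_offsets(30_000_000))

# Number of paging queries needed for 30 million rows.
page_count = len(pages)            # 3000

# The last query skips almost the whole table before returning 10,000 rows.
last_page = pages[-1]              # (29990000, 10000)

# Rows read and thrown away across all queries, under the LIMIT assumption.
rows_skipped = sum(offset for offset, _ in pages)   # 44985000000

print(page_count, last_page, rows_skipped)
```

Under this assumption, the 3,000 queries together skip roughly 45 billion rows to deliver 30 million, which is why narrowing each run to a small ID range helps.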

 

3.3.2. Solution

Split the data into batches by ID, by a time field, or by a business field, so that each synchronization run handles less data.

 

3.3.3. Case

Batching by ID suits IDs that increase over time, such as auto-increment IDs and snowflake IDs.

  1. Query the minimum ID, the maximum ID, and the total number of rows
  2. Calculate the ID range of each batch from the number of rows per batch

 

Calculation process:

  • The minimum ID is 1333224842416979257
  • The maximum ID is 1341698897306914816
  • Total data = 30 million rows
  • Rows per synchronization run = 3 million

 

(1) Calculate the number of synchronization runs

Total data / Rows per run = 30,000,000 / 3,000,000 = 10

 

(2) Calculate the ID increment of each batch

(Maximum ID - Minimum ID) / Number of runs = 847405488993555.9

 

(3) Calculate the boundary IDs of each batch

Minimum ID + Increment = ID2
ID2 + Increment = ID3
...
ID10 + Increment = Maximum ID
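
The boundary calculation can be sketched in Python as follows. The helper id_boundaries is hypothetical. It departs from the hand calculation in two labeled ways: it uses integer arithmetic with a rounded-up increment, because IDs of this size exceed the precision of floating-point numbers, and it ends the last range at maximum ID + 1, an assumption made so that a condition of the form id >= lower and id < upper still includes the row holding the maximum ID.

```python
def id_boundaries(min_id, max_id, batches):
    """Return batches + 1 boundary IDs; batch i covers [bounds[i], bounds[i + 1])."""
    end = max_id + 1                    # exclusive upper bound that still covers max_id
    span = end - min_id
    step = -(-span // batches)          # ceiling division on integers, no floats
    # Clamp so boundaries never pass the end when the ID span is very small.
    bounds = [min(min_id + i * step, end) for i in range(batches)]
    bounds.append(end)
    return bounds


bounds = id_boundaries(1333224842416979257, 1341698897306914816, 10)
for lower, upper in zip(bounds, bounds[1:]):
    print(lower, upper)
```

Each printed pair is one half-open ID range. Because snowflake-style IDs are not evenly distributed, the ranges have equal width but will not necessarily contain exactly 3 million rows each.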

 

(4) Use batch ID values for synchronization

Modify the etlCondition parameter of the SQL mapping configuration:

etlCondition: "where id >= {} and id < {}"

 

Call the ETL interface with an added params parameter. Multiple values are separated by a semicolon (;). The URL is quoted so that the shell does not interpret the ? and ; characters.

curl -X POST "http://127.0.0.1:8081/etl/es7/sys_user.yml?params=<minimum ID>;<ID2>"
curl -X POST "http://127.0.0.1:8081/etl/es7/sys_user.yml?params=<ID2>;<ID3>"
...
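
Writing one call per batch by hand is error-prone, so the URLs can be generated. The Python sketch below uses a hypothetical helper, etl_urls, that takes a list of boundary IDs (however they were computed) and builds one URL per adjacent pair, in the same form as the calls above. The base URL and task name are taken from the example and would need adjusting for another setup.

```python
def etl_urls(bounds, task="sys_user.yml", base="http://127.0.0.1:8081/etl/es7"):
    """Build one ETL URL per adjacent pair of boundary IDs."""
    urls = []
    for lower, upper in zip(bounds, bounds[1:]):
        # The two values fill the two {} placeholders of etlCondition, in order.
        urls.append(f"{base}/{task}?params={lower};{upper}")
    return urls


# Placeholder boundaries for illustration only, not real IDs.
for url in etl_urls([100, 200, 300]):
    print(url)
```

Each URL can then be sent as a POST request, for example with curl as shown above.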

 
