Quasi-real-time index building with Canal

Canal is a middleware from Alibaba. Its source is MySQL and its target is some other storage system. It relies on MySQL's master/slave replication mechanism: Canal disguises itself as a MySQL slave in order to observe changes in MySQL's binary log (binlog). It then delivers the changes as structured data to the target consumer, which converts them to its own information model, so that data changed in MySQL can be piped into other storage.
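To make the flow above concrete, here is a minimal Python sketch of the idea, not of Canal's actual API. A consumer receives structured row-change events (roughly what a replication client would decode from the binlog) and applies them to a target store. The `RowChange` class, the `apply_change` function and the nested dict standing in for the target storage are hypothetical names chosen only for illustration.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class RowChange:
    """A simplified, hypothetical row-change event decoded from a binlog."""
    table: str
    event_type: str                          # "INSERT", "UPDATE" or "DELETE"
    primary_key: Any
    after: Optional[Dict[str, Any]] = None   # column values after the change


def apply_change(target: Dict[str, Dict[Any, Dict[str, Any]]], change: RowChange) -> None:
    """Apply one row change to the target store (a dict keyed by table, then by primary key)."""
    rows = target.setdefault(change.table, {})
    if change.event_type == "DELETE":
        rows.pop(change.primary_key, None)
    elif change.event_type in ("INSERT", "UPDATE"):
        # Merge the new column values into whatever is already stored for this key
        rows.setdefault(change.primary_key, {}).update(change.after or {})
    else:
        raise ValueError(f"unsupported event type: {change.event_type}")
```

Feeding an INSERT, then an UPDATE, then a DELETE for the same primary key through `apply_change` leaves the target in the same state as the source table, which is the essence of the synchronization described above.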
Download and decompress

Download Alibaba's Canal components (download address: canal), upload them to the /opt/software directory of the cluster node, and decompress them into the /opt/apps directory:

# For convenience I downloaded all four packages here; if you only want to use Canal, the adapter and deployer packages are enough
# Before decompressing, create the directories /opt/apps/adapter-1.1.4, /opt/apps/admin-1.1.4, /opt/apps/deployer-1.1.4 and /opt/apps/example-1.1.4
[yangqi@yankee software]$ tar -zvxf canal.adapter-1.1.4.tar.gz -C ../apps/adapter-1.1.4
[yangqi@yankee software]$ tar -zvxf canal.admin-1.1.4.tar.gz -C ../apps/admin-1.1.4
[yangqi@yankee software]$ tar -zvxf canal.deployer-1.1.4.tar.gz -C ../apps/deployer-1.1.4
[yangqi@yankee software]$ tar -zvxf canal.example-1.1.4.tar.gz -C ../apps/example-1.1.4
Configure MySQL

Enable MySQL master/slave replication

# MySQL is already installed on this node (the master); edit /etc/my.cnf
[yangqi@yankee software]$ sudo vi /etc/my.cnf
# Add the following
=====================================================================
server-id=1
binlog_format=ROW
log_bin=mysql_bin
=====================================================================
# After the configuration is complete, restart MySQL
[yangqi@yankee software]$ sudo systemctl restart mysqld
[yangqi@yankee software]$ mysql -u root -pxiaoer
mysql> show variables like 'log_bin';
# Output like the following means the binlog is enabled on this node
=====================================================================
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.07 sec)
=====================================================================
# The root account is generally not used for master/slave replication, so a separate account is needed.
# I have already created one, so I only need to grant it privileges.
mysql> grant select,replication slave,replication client on *.* to 'yangqi'@'%' identified by 'xiaoer';
# Output like the following means success
=====================================================================
Query OK, 0 rows affected, 1 warning (0.05 sec)
=====================================================================
# The yangqi account also needs privileges for connections from localhost
mysql> grant select,replication slave,replication client on *.* to 'yangqi'@'localhost' identified by 'xiaoer';
# Output like the following means success
=====================================================================
Query OK, 0 rows affected, 1 warning (0.05 sec)
=====================================================================
# Refresh privileges
mysql> flush privileges;
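Before restarting MySQL it can help to confirm that the option file really contains the three settings above. The following is a minimal sketch using Python's standard `configparser`. It assumes the settings live under a `[mysqld]` section and that the file is simple enough to parse as INI (no `!includedir` lines before the first section). The function name `missing_binlog_settings` is hypothetical.

```python
import configparser

# Option name -> required value (None means "any value, but must be present")
REQUIRED = {"server-id": None, "binlog_format": "ROW", "log_bin": None}


def missing_binlog_settings(my_cnf_text: str) -> list:
    """Return the required [mysqld] options that are absent or have the wrong value."""
    parser = configparser.ConfigParser(allow_no_value=True, strict=False, interpolation=None)
    parser.read_string(my_cnf_text)
    if not parser.has_section("mysqld"):
        return sorted(REQUIRED)
    # MySQL accepts '-' and '_' interchangeably in option names, so normalize them
    options = {key.replace("-", "_"): value for key, value in parser.items("mysqld")}
    problems = []
    for name, expected in REQUIRED.items():
        key = name.replace("-", "_")
        if key not in options:
            problems.append(name)
        elif expected is not None and (options[key] or "").upper() != expected:
            problems.append(name)
    return sorted(problems)
```

An empty list means all three settings are in place; otherwise the returned names show which lines still need to be added or corrected in /etc/my.cnf.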
Configure the Canal pipe
[yangqi@yankee apps]$ cd deployer-1.1.4/conf/example
# Edit the instance.properties file
[yangqi@yankee example]$ vi instance.properties
# Modify the following
=====================================================================
## mysql serverId, v1.0.26+ will autoGen
canal.instance.mysql.slaveId=2

# username/password
canal.instance.dbUsername=yangqi
canal.instance.dbPassword=xiaoer
=====================================================================
# Start the deployer
[yangqi@yankee example]$ cd ../../
[yangqi@yankee deployer-1.1.4]$ bin/startup.sh
# Check whether the deployer started
[yangqi@yankee deployer-1.1.4]$ ps -ef | grep canal
# Or check whether port 11111 is occupied
[yangqi@yankee deployer-1.1.4]$ netstat -ntulp | grep 11111
# The startup is successful if the following information is displayed
=====================================================================
tcp        0      0 0.0.0.0:11111           0.0.0.0:*               LISTEN      45531/java
=====================================================================
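The netstat check above can also be done from a script, for example as a health check before the adapter is started. Here is a minimal sketch using only the Python standard library. The function name `is_port_open` is hypothetical; port 11111 is the one used in this walkthrough.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False
```

Calling `is_port_open("127.0.0.1", 11111)` on the deployer node should return True once the deployer is listening. Note that this only shows that something accepts connections on the port, not that it is Canal.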
Startup errors

Check the log file logs/canal/canal_stdout.log. If an error similar to the following is reported:

# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /opt/apps/deployer-1.1.4/bin/hs_err_pid45386.log
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=96m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000700000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)

The error may be caused by insufficient memory. You can modify the following parameters in startup.sh:

# Adjust the -Xms, -Xmx and -Xmn parameters as appropriate for your machine
if [ -n "$str" ]; then
        JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn256m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
else
        JAVA_OPTS="-server -Xms256m -Xmx256m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
fi
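To see why lowering the heap settings helps, it is worth converting the byte count in the error message into a more readable unit. The sketch below extracts the size of the failed allocation from the JVM message quoted above; the function name `failed_allocation_mib` is hypothetical.

```python
import re
from typing import Optional


def failed_allocation_mib(log_text: str) -> Optional[float]:
    """Return the size in MiB of the failed mmap reported in a JVM error log, or None if absent."""
    match = re.search(r"failed to map (\d+) bytes", log_text)
    if match is None:
        return None
    return int(match.group(1)) / (1024 * 1024)
```

For the message above, 1073741824 bytes is 1024 MiB, so the JVM tried to commit 1 GiB in one step. Settings of 256m ask for far less memory up front, which is why the adjusted JAVA_OPTS can start on a small machine.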
Configure the Canal adapter
Modify the canal-adapter module source code
# The adapter is incompatible with Elasticsearch 7.3.0, so download the source package to your local machine and recompile it. Make sure you select the matching version.
# In the pom.xml file of the canal-adapter module, change the four Elasticsearch dependencies to version 7.3.0
# On the command line, go to the source root directory (mine is canal-canal-1.1.4) and run
mvn clean package -DskipTests

# The following error may be reported during the first execution
=====================================================================
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.7.0:compile (default-compile) on project client-adapter.elasticsearch: Compilation failure
[ERROR] /E:/code/JavaEE/lot/canal-canal-1.1.4/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java:[223,56] org.apache.lucene.search.TotalHits cannot be converted to long
[ERROR]
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <args> -rf :client-adapter.elasticsearch
=====================================================================
# Modify line 223 in the ESAdapter class to read as follows
=====================================================================
long rowCount = response.getHits().getTotalHits().value;
=====================================================================
# The second execution may result in the following error
=====================================================================
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.7.0:compile (default-compile) on project client-adapter.elasticsearch: Compilation failure
[ERROR] /E:/code/JavaEE/lot/canal-canal-1.1.4/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESConnection.java:[420,47] method bulk in class org.elasticsearch.client.RestHighLevelClient cannot be applied to given types;
[ERROR]   required: org.elasticsearch.action.bulk.BulkRequest,org.elasticsearch.client.RequestOptions
[ERROR]   found: org.elasticsearch.action.bulk.BulkRequest
[ERROR]   reason:
[ERROR]
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <args> -rf :client-adapter.elasticsearch
=====================================================================
# Modify line 420 in the ESConnection class to read as follows
=====================================================================
return restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
=====================================================================
# After packaging completes, go to the client-adapter/launcher/target/ directory, upload the newly compiled canal-adapter to the apps directory of the cluster node, and delete the previously decompressed adapter-1.1.4
Configure adapter-1.1.4
# Rename the canal-adapter directory to adapter-1.1.4
[yangqi@yankee apps]$ mv canal-adapter adapter-1.1.4
# Modify the adapter-1.1.4 configuration
[yangqi@yankee adapter-1.1.4]$ vi ./conf/application.yml
# Change the content to the following
=====================================================================
srcDataSources:
  defaultDS:
    url: jdbc:mysql://127.0.0.1:3306/recommendedsystem?useUnicode=true
    username: yangqi
    password: xiaoer

- name: es
  hosts: 192.168.21.89:9300
  properties:
    mode: transport
    # security.auth: test:123456 # only used for rest mode
    cluster.name: yankee
=====================================================================
# Modify the es configuration: create shop.yml in adapter-1.1.4/conf/es
[yangqi@yankee adapter-1.1.4]$ vi ./conf/es/shop.yml
# Write the following
=====================================================================
dataSourceKey: defaultDS
destination: example
groupId:
esMapping:
  _index: shop
  _type: _doc
  _id: id
  upsert: true
  sql: "select a.id, a.name, a.tags, concat(a.latitude, ',', a.longitude) as location, a.remark_score, a.price_per_man, a.category_id, b.name as category_name, a.seller_id, c.remark_score as seller_remark_score, c.disabled_flag as seller_disabled_flag from shop a inner join category b on a.category_id = b.id inner join seller c on c.id = a.seller_id"
  commitBatch: 3000
=====================================================================
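The SQL in shop.yml determines which fields each indexed shop document carries. As an illustration of what one joined row looks like, the following Python sketch reproduces the select list for a single shop, category and seller. It models only the SQL's output row, not the adapter's internals. The function name `joined_row` and the dict-based rows are hypothetical.

```python
from typing import Any, Dict, Optional


def joined_row(shop: Dict[str, Any], category: Dict[str, Any],
               seller: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build the row that the mapping SQL would return for one shop, or None if the inner joins do not match."""
    # The inner joins drop the shop when its category or seller does not match
    if shop["category_id"] != category["id"] or shop["seller_id"] != seller["id"]:
        return None
    return {
        "id": shop["id"],                      # used as the document _id (_id: id)
        "name": shop["name"],
        "tags": shop["tags"],
        # concat(a.latitude, ',', a.longitude) as location
        "location": f"{shop['latitude']},{shop['longitude']}",
        "remark_score": shop["remark_score"],
        "price_per_man": shop["price_per_man"],
        "category_id": shop["category_id"],
        "category_name": category["name"],     # b.name as category_name
        "seller_id": shop["seller_id"],
        "seller_remark_score": seller["remark_score"],
        "seller_disabled_flag": seller["disabled_flag"],
    }
```

Note that the row contains two name-like fields, `name` from the shop table and `category_name` from the category table, each coming from a different source table.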
Start the adapter
# Grant execute permission to bin/startup.sh and bin/stop.sh, because adapter-1.1.4 is newly compiled
[yangqi@yankee adapter-1.1.4]$ chmod 764 bin/startup.sh
[yangqi@yankee adapter-1.1.4]$ chmod 764 bin/stop.sh
# Start the adapter
[yangqi@yankee adapter-1.1.4]$ bin/startup.sh
Startup errors

You can check the bin/hs_err_pid48030.log file. If an error similar to the following is reported:

Memory: 4k page, physical 1863104k(71356k free), swap 4001788k(578132k free)

The error may be caused by insufficient memory. You can modify the following parameters in startup.sh:

# Adjust the -Xms, -Xmx and -Xmn parameters as appropriate for your machine
if [ -n "$str" ]; then
        JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn256m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
else
        JAVA_OPTS="-server -Xms256m -Xmx256m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
fi
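The Memory line from the hs_err file quoted earlier in this section can be turned into numbers to compare against the heap size requested in JAVA_OPTS. Here is a minimal sketch that parses that line. It assumes the `k` suffix means KiB, and the function name `parse_memory_line` is hypothetical.

```python
import re
from typing import Dict, Optional

_MEMORY_RE = re.compile(r"physical (\d+)k\((\d+)k free\), swap (\d+)k\((\d+)k free\)")


def parse_memory_line(line: str) -> Optional[Dict[str, float]]:
    """Parse the 'Memory:' line of an hs_err file into MiB values; return None if it does not match."""
    match = _MEMORY_RE.search(line)
    if match is None:
        return None
    keys = ("physical_total", "physical_free", "swap_total", "swap_free")
    # The values in the log are in KiB; divide by 1024 to get MiB
    return {key: int(value) / 1024 for key, value in zip(keys, match.groups())}
```

For the line shown above this gives roughly 1819 MiB of physical memory with only about 70 MiB free, which is less than even a 256m heap. On a machine this small it may also be necessary to stop other processes before starting the adapter.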
Start
# Start the adapter
[yangqi@yankee adapter-1.1.4]$ bin/startup.sh
# Check whether the adapter started
[yangqi@yankee adapter-1.1.4]$ ps -ef | grep canal
# Or check the connections on port 11111
[yangqi@yankee adapter-1.1.4]$ netstat -en | grep 11111
# The startup is successful if the following information is displayed
=====================================================================
tcp        0      0 127.0.0.1:44766         127.0.0.1:11111         ESTABLISHED 1000       5460274
tcp        0      0 127.0.0.1:11111         127.0.0.1:44766         ESTABLISHED 1000       5459446
=====================================================================
Test Canal
# Keep monitoring adapter-1.1.4/logs/adapter/adapter.log
[yangqi@yankee adapter-1.1.4]$ tail -f logs/adapter/adapter.log
# If you modify data in the MySQL database, adapter.log prints the changes almost immediately

If adapter.log shows an error at this point, modify the application.yml file and delete the following content:

# Delete the following line from the es adapter configuration
mode: transport

Restart the adapter, test again, and observe the contents of the adapter.log file.

Build approach
After Canal detects that data in MySQL has changed, it performs a quasi-real-time update. During the update, Canal determines which ID has changed and updates the content for that ID. However, this update is not very intelligent. If, say, the name field of the row with ID 1 is changed, it only modifies the name value for ID 1. But if two different name fields exist at the same time, Canal modifies both of them and sets both to the content that was just changed in the database. So building the index directly with the adapter is clearly not enough for more complex situations.