Introduction

I previously used a MyBatis interceptor to synchronize data to ES, but that approach has two problems: it handles synchronization across databases poorly, and it cannot capture changes made by operating on the database by hand. So I switched to Maxwell + Kafka: Maxwell streams the MySQL binlog into Kafka, and the data is written to ES by parsing the binlog messages.
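For context, Maxwell turns every row change it reads from the binlog into a JSON message on Kafka, roughly like the following (the shape documented by Maxwell; the database, table, and column values here are illustrative):

{
  "database": "test",
  "table": "member_info",
  "type": "insert",
  "ts": 1449786310,
  "xid": 940752,
  "commit": true,
  "data": { "id": 1, "name": "foo" }
}

A downstream consumer only has to parse this JSON and index the data field into ES.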

Environment

  • kafka_2.12-2.6.0.tgz
  • maxwell-1.23.5.tar.gz
  • elasticsearch-7.7.1-linux-x86_64.tar.gz
  • MySQL 5.7.26
  • Linux 4.15.6-1.el7.elrepo.x86_64 (Red Hat 4.8.5-16)

MySQL

Enable binlog

#Modify my.cnf and add the following configuration.
[root@Server1d220 mysql]$ sudo vim /etc/my.cnf
#Specify a unique value that differs from the server_id of every other machine in the cluster. With only one machine, any value works
server_id=1 
log-bin=master 
#Select Row mode
binlog_format=row 

sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

#Restarting the Database
systemctl  restart mysqld.service  

#After MySQL starts, set the parameters
mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;

Other commands

#Check whether binlog is enabled
show global variables like '%log_bin%';

#View the binlogs
show binlog events;
#Check the name of the latest binlog file
show master status;
#Find the binlog files on disk
find / -name 'mysql-bin*' -type f
#Display detailed log configuration information
SHOW GLOBAL VARIABLES LIKE '%log%';
#Show the MySQL data storage directories
show variables like '%dir%';
#View the contents of a binlog file
show binlog events in 'mysql-bin.000003';
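Maxwell also needs a MySQL account it can replicate with, plus a maxwell schema where it stores its own state. A minimal sketch following Maxwell's quickstart (the credentials match the Maxwell configuration used later in this post; adjust to your environment):

#Create the account Maxwell will connect as
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'Maxwell@2020';
#Maxwell keeps its positions and schema metadata in its own database
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
#Replication privileges so Maxwell can read the binlog and table schemas
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';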

Kafka

Kafka installation

  • Kafka’s official website
  • There is no need to download ZooKeeper separately because the Kafka distribution bundles it
  • The following is a standalone installation
#Download Kafka
[root@Server1d220]# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.12-2.6.0.tgz
#Unpack the installation
[root@Server1d220]# tar -zxvf kafka_2.12-2.6.0.tgz

Modify Kafka and ZooKeeper configurations

#Modify the Kafka configuration
[root@Server1d220 kafka_2.12-2.6.0]# vim config/server.properties
#Change to the internal IP address
listeners=PLAINTEXT://192.168.1.220:9092
#Address returned to clients accessing from outside
advertised.listeners=PLAINTEXT://192.168.1.220:9092
#Change the log location
log.dirs=/home/lib/kafka_2.12-2.6.0/data
#Set the ZooKeeper address
zookeeper.connect=localhost:2181
#Set the message expiration time
log.retention.hours=24
#Set the cleanup policy
log.cleanup.policy=delete
#Delete old data once it exceeds 10 GB
log.retention.bytes=10737418240

[root@Server1d220 kafka_2.12-2.6.0]# vim config/consumer.properties
#Set the cluster address
bootstrap.servers=192.168.1.220:9092
#Set the consumer group
group.id=dev-consumer-group

#Modify the ZooKeeper configuration
[root@Server1d220 kafka_2.12-2.6.0]# vim config/zookeeper.properties
#Modify the data directory
dataDir=/home/lib/kafka_2.12-2.6.0/zookeeper/data
#Modify the port
clientPort=2181

Start Kafka and Zookeeper

#Start ZooKeeper in the background
[root@Server1d220 kafka_2.12-2.6.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties >logs/zookeeper.log 2>&1 &
#Start Kafka in the background
[root@Server1d220 kafka_2.12-2.6.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties >logs/kafka.log 2>&1 &
#Create a Kafka topic named maxwell
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/kafka-topics.sh --create --zookeeper 192.168.1.220:2181 --replication-factor 1 --partitions 1 --topic maxwell

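Before wiring up Maxwell, you can sanity-check the broker by pushing a test message through the maxwell topic by hand (a quick sketch using the console tools shipped with Kafka; Ctrl+C exits either side):

#Type a few lines; each line becomes a message
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/kafka-console-producer.sh --broker-list 192.168.1.220:9092 --topic maxwell
#Read the messages back from the beginning
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.220:9092 --topic maxwell --from-beginning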

Unified startup script

#!/bin/sh

#Start ZooKeeper
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties >/dev/null 2>&1 &
#Wait 3 seconds for ZooKeeper to come up
sleep 3
#Start the kafka
nohup ./bin/kafka-server-start.sh ./config/server.properties >/dev/null 2>&1 &

Cluster configuration

Note the following about the cluster configuration:

  • Modify zookeeper.properties

#Configure the following parameters
[root@Server1d238 kafka_2.12-2.6.0]# vim config/zookeeper.properties
dataDir=/usr/local/kafka/my_dir/zookeeper/data
initLimit=10
syncLimit=5
maxClientCnxns=0
server.1=192.168.1.238:2888:3888
server.2=192.168.110.70:2888:3888
server.3=192.168.110.71:2888:3888
#Create the myid file in the dataDir directory; its contents are 1, 2, and 3 respectively
#238 host: echo 1 > myid
#70 host: echo 2 > myid
#71 host: echo 3 > myid
  • Modify server.properties (a quick verification sketch follows this list)

[root@Server1d238 kafka_2.12-2.6.0]# vim config/server.properties
#The following settings must be changed on each node in the cluster
#Unique broker ID (use a different value on each node, e.g. 0, 1, 2)
broker.id=0
#Listen addresses; change to the local IP and port
listeners=PLAINTEXT://192.168.1.238:9092
advertised.listeners=PLAINTEXT://192.168.1.238:9092
#Set the addresses of the ZooKeeper cluster
zookeeper.connect=192.168.1.238:2181,192.168.110.70:2181,192.168.110.71:2181
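After all three nodes are up, you can confirm that every broker registered with ZooKeeper (a quick check; expect one id per broker, e.g. [0, 1, 2] if you numbered them 0 through 2):

[root@Server1d238 kafka_2.12-2.6.0]# ./bin/zookeeper-shell.sh 192.168.1.238:2181 <<< "ls /brokers/ids"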

Other commands

#Query the list of topics on the server
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
#Delete a topic
#delete.topic.enable=true needs to be set in server.properties
[10080@Server1d220 kafka_2.12-2.6.0]$ ./bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic contradictionmediation__member_info

#Query topic details
[10080@Server1d220 kafka_2.12-2.6.0]$ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic contradictionmediation__member_info --describe
#Consume messages from the beginning
[10080@Server1d220 kafka_2.12-2.6.0]$ ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic contradictionmediation__member_info --from-beginning
#Query the consumer group list on the Kafka node
[10080@Server1d220 kafka_2.12-2.6.0]$ ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
#Delete a consumer group
[10080@Server1d220 kafka_2.12-2.6.0]$ ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --delete --group tablePartiesInfoConsumer
#Stop Kafka
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/kafka-server-stop.sh
#Stop ZooKeeper
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/zookeeper-server-stop.sh
#Query Kafka broker information in ZooKeeper
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/zookeeper-shell.sh 127.0.0.1:2181
ls /brokers/ids
#Clear all data
#Execute on each server
[root@Server1d238 kafka_2.12-2.6.0]# rm -rf logs/*
[root@Server1d238 kafka_2.12-2.6.0]# rm -rf data/*
[root@Server1d238 kafka_2.12-2.6.0]# rm -rf zookeeper/data/version-2
#Query a broker's registration info in ZooKeeper
#Query broker 0
[root@Server1d238 kafka_2.12-2.6.0]# bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0"

Common exceptions

  • Exception: The Cluster ID 4EneH1qLR3KH69gubQF8kg doesn’t match stored clusterId Some(WslSvgQJQMiTl4rT18Eugg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.

Solution: the cluster id stored in meta.properties no longer matches; delete the meta.properties file under the log directory (here log.dirs=/home/lib/kafka_2.12-2.6.0/logs) and restart Kafka, or fix zookeeper.connect.

  • Replication factor: 3 LARGER than available brokers: 1

Solution: the Kafka cluster currently has fewer available brokers than the requested replication factor. Recreate the topic with a replication-factor no larger than the number of brokers, or add brokers.
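For example, on the single-broker setup above the topic has to be created with a replication factor of 1 (a sketch; the topic name is illustrative):

[root@Server1d220 kafka_2.12-2.6.0]# ./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic some_topic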

Maxwell

Maxwell’s installation

Maxwell is available on GitHub; the steps below are a standalone installation

#Download
[root@Server1d220 lib]# wget https://github.com/zendesk/maxwell/releases/download/v1.23.5/maxwell-1.23.5.tar.gz
#Unpack the installation
[root@Server1d220 lib]# tar -zxvf maxwell-1.23.5.tar.gz

Modify the Maxwell configuration

#Rename the example configuration file
[root@Server1d220 maxwell-1.23.5]# mv config.properties.example config.properties
#Modify the configuration file
[root@Server1d220 maxwell-1.23.5]# vim config.properties
#Client id
client_id=maxwell-dev
#Log configuration
#debug is recommended for the first startup so you can see the MySQL data and the Kafka requests; change it back to info once things are stable
log_level=info
#Kafka configuration
producer=kafka
kafka.bootstrap.servers=192.168.1.220:9092
#MySQL configuration
host=192.168.110.232
user=maxwell
password=Maxwell@2020
#Filter rule configuration (example: match only the tables under the contradictionmediation database)
filter=exclude: *.*, include: contradictionmediation.*

Starting Maxwell

#Start directly
[root@Server1d220 maxwell-1.23.5]# nohup ./bin/maxwell >logs/maxwell.log 2>&1 &
#Start with parameters
[root@Server1d220 maxwell-1.23.5]# nohup ./bin/maxwell --user='maxwell' --password='Maxwell@2020' --host='192.168.110.232' --producer=kafka --kafka.bootstrap.servers=192.168.1.220:9092 --kafka_topic=maxwell >logs/maxwell.log 2>&1 &
#Finally run jps -l to check the process, then look at the logs; on success Maxwell prints: BinlogConnectorLifecycleListener - Binlog connected.


#Filter rules
#Match only the tbl table of the foodb database and all foodb tables named table_<number>
--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
#Exclude all tables in all databases; match only the db1 database
--filter='exclude: *.*, include: db1.*'
#Exclude any update where db.tbl.col equals 'reject'
--filter='exclude: db.tbl.col=reject'
#Exclude any update that contains a col_a column
--filter='exclude: *.*.col_a=*'
#Blacklist the bad_db database entirely; to undo this, you must delete the Maxwell schema database
--filter='blacklist: bad_db.*'
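Once "Binlog connected" appears, a simple end-to-end check (a sketch assuming the topic and filter configured above) is to watch the maxwell topic while changing a row in a matched table; the corresponding JSON message should show up immediately:

#Watch Maxwell's output topic
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.220:9092 --topic maxwell
#In another session, modify any row of a matched table (the column name here is hypothetical)
mysql> update contradictionmediation.member_info set name = 'test' where id = 1;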

Maxwell parameter list

Maxwell multi-instance

Multiple instances can be deployed and started separately; give each instance its own client_id so each tracks its binlog position independently

Other Maxwell commands

#Fully initialize (bootstrap) the data of a table
[root@Server1d220 maxwell-1.23.5]# ./bin/maxwell-bootstrap --user maxwell --password Maxwell@2020 --host 192.168.110.232  --database contradictionmediation --table member_info --client_id maxwell-dev

#Reset Maxwell's metadata
#Execute in the maxwell database
delete from maxwell.`databases` where id is not null;
delete from maxwell.`tables` where id is not null;
delete from maxwell.`schemas` where id is not null;
delete from maxwell.bootstrap where id is not null;
delete from maxwell.columns where id is not null;
delete from maxwell.heartbeats where true;
delete from maxwell.positions where true;

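For reference, when the bootstrap above runs, Maxwell publishes the table's rows to the same topic with bootstrap-specific types: a bootstrap-start message, one bootstrap-insert per row, then a bootstrap-complete message. Roughly (shape per Maxwell's bootstrap docs; values illustrative):

{"database":"contradictionmediation","table":"member_info","type":"bootstrap-start","ts":1450557744,"data":{}}
{"database":"contradictionmediation","table":"member_info","type":"bootstrap-insert","ts":1450557744,"data":{"id":1,"name":"foo"}}
{"database":"contradictionmediation","table":"member_info","type":"bootstrap-complete","ts":1450557744,"data":{}}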

Conclusion

Overall it is not difficult; just take it one step at a time. I'll cover consuming from Kafka and the implementation of synchronizing data to ES in a later post.

Reference documentation

  • Kafka topic creation fails with "Replication factor larger than available brokers"
  • Binlog-based MySQL data is synchronized to MRS cluster
  • Maxwell synchronizes binlogs from mysql in real time
  • MySQL Binlog parsing tool Maxwell explains