Abstract

The project needs to use Kafka Streams to load data from a MySQL database and then perform an ETL-like filtering step: the MySQL tables are imported into a Kafka topic via Kafka Connect, and the imported records are then filtered and deduplicated against the data in the database.

Content

1. Kafka installation

  • Kafka Connect has shipped with Kafka since version 0.9.0. First check that your Kafka version supports Connect; an intuitive check is that the bin directory contains the connect scripts and the config directory contains the connect properties files (see the listing after this list).


  • We use kafka_2.11-1.0.0, where 2.11 is the Scala version and 1.0.0 is the Kafka version.
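For example, run the following from the Kafka installation directory; the files shown are the Connect scripts and sample configs that ship with Kafka 1.0:

# Connect scripts in bin:
ls bin | grep connect
# connect-distributed.sh
# connect-standalone.sh

# Connect sample configs in config:
ls config | grep connect
# connect-distributed.properties
# connect-standalone.properties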

2. Download the kafka-connect-jdbc plugin

Go to the website: https://www.confluent.io/hub/…

Download the plugin, selecting the version appropriate for your Kafka installation.

Unzip the archive; it contains a lib directory with the connector jars and an etc directory with sample configuration files.

Copy the jar files from the plugin's lib directory into the libs directory of Kafka:
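A minimal sketch, assuming the plugin was unzipped next to the Kafka installation (directory names vary by plugin version):

# Copy the connector jars onto Kafka's classpath
cp confluentinc-kafka-connect-jdbc-*/lib/*.jar kafka_2.11-1.0.0/libs/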

3. Copy the MySQL driver to the Kafka libs directory
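The JDBC connector needs the MySQL driver (Connector/J) on Kafka's classpath as well. A sketch, assuming the driver jar has been downloaded alongside (the exact jar name depends on the driver version):

# mysql-connector-java-*.jar is the MySQL Connector/J driver jar
cp mysql-connector-java-*.jar kafka_2.11-1.0.0/libs/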

4. connect-mysql-source.properties

Copy the sample source properties file from the etc directory of kafka-connect-jdbc into the config directory of Kafka and rename it connect-mysql-source.properties, for example:
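A sketch, assuming the sample file name used by the JDBC plugin (it may differ across plugin versions):

# The plugin ships a SQLite quickstart config that we repurpose for MySQL
cp confluentinc-kafka-connect-jdbc-*/etc/source-quickstart-sqlite.properties kafka_2.11-1.0.0/config/connect-mysql-source.properties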

Modify the configuration according to the local data source:

# A simple example that copies one table from a MySQL database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number
# of tasks to create:
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

# The remaining configs are specific to the JDBC source connector.
# Connection URL format (user and password are passed as URL parameters):
# connection.url=jdbc:mysql://192.168.101.3:3306/databasename?user=xxx&password=xxx
connection.url=jdbc:mysql://127.0.0.1:3306/us_app?user=root&password=root
table.whitelist=ocm_blacklist_number

# bulk mode; there are also incrementing and timestamp modes
mode=bulk
#timestamp.column.name=time
#incrementing.column.name=

# Output topics are prefixed with this value: the table ocm_blacklist_number will be
# written to the topic connect-mysql-ocm_blacklist_number.
topic.prefix=connect-mysql-

Configuration instructions reference: https://www.jianshu.com/p/9b1…

5. Modify config/connect-standalone.properties in the Kafka directory.

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
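With schemas.enable=true, the JsonConverter wraps each record in a schema/payload envelope, so the messages written to the topic look roughly like this (the field name and value here are illustrative, not taken from the actual table):

{
  "schema": {
    "type": "struct",
    "name": "ocm_blacklist_number",
    "optional": false,
    "fields": [
      {"type": "string", "optional": false, "field": "number"}
    ]
  },
  "payload": {
    "number": "13800000000"
  }
}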

6. Start Kafka Connect

bin/connect-standalone.sh config/connect-standalone.properties config/connect-mysql-source.properties

Note: connect-standalone.sh runs Connect in single-node (standalone) mode. There is also a distributed mode, started with connect-distributed.sh; to use it, you modify connect-distributed.properties instead, as sketched below.
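A minimal sketch of distributed mode. In this mode the worker is started with only its own properties file; connector configs are then submitted through Connect's REST API (port 8083 by default) rather than passed on the command line. The JSON file name here is hypothetical:

bin/connect-distributed.sh config/connect-distributed.properties
# Submit the connector config (a JSON body with "name" and "config" fields) via REST:
curl -X POST -H "Content-Type: application/json" \
  --data @connect-mysql-source.json http://localhost:8083/connectors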

7. Consume Kafka and check whether the import is successful

You can start a console consumer and read the connect-mysql-ocm_blacklist_number topic from the beginning. If you see output, the connector configuration is successful.

./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic connect-mysql-ocm_blacklist_number --from-beginning
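Note that --zookeeper uses the old-style consumer, which has been removed from newer Kafka releases; there you point the console consumer at the brokers instead:

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic connect-mysql-ocm_blacklist_number --from-beginning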
