Objective: Monitor a record table in Oracle, pick up new rows as soon as they appear, and push them to enterprise WeChat.

Process: Kafka (through kafka-connect-oracle) watches the specified Oracle table in real time and captures the table's operation log. Spark Structured Streaming consumes the Kafka topic, cleans the data and stores it in a specified directory. A Python script watches that directory in real time, extracts the data from the text files and pushes it to WeChat. (Oracle runs on one server; Kafka and Spark run on another.)

Architecture: Oracle + Kafka + Spark Structured Streaming + Python

Environment:

  • CentOS 7
  • Oracle 11g
  • apache-maven-3.6.3-bin.tar.gz
  • kafka-connect-oracle-master.zip
  • hadoop-2.1.1.tar.gz
  • kafka_2.11-2.4.1.tgz (the Scala version must match that of the connector jar and Spark)
  • spark-2.4.0-bin-without-hadoop.tgz
  • spark-streaming-kafka-0-8_2.11-2.4.0.jar
  • Java 1.8
  • Python 3.6

1. Oracle side

The setup here is fairly simple: with the SYS or SYSTEM account, enable archive logging and supplemental logging. In production these are usually already enabled for data safety, so I won't go into detail. If you run into problems setting them up, feel free to send me a private message.

2. Kafka side

① Configure Maven and add environment variables

    # Download: http://maven.apache.org/download.cgi
    # Extract to /usr/local; the default configuration is kept as-is
    tar xvf apache-maven-3.6.3-bin.tar.gz -C /usr/local/

    # Add the environment variables to /etc/profile
    # (the archive extracts to a directory named apache-maven-3.6.3)
    export MAVEN_HOME=/usr/local/apache-maven-3.6.3
    export PATH=$PATH:${MAVEN_HOME}/bin

    # Reload the profile
    source /etc/profile

② Download kafka-connect-oracle-master, edit its configuration file and build it with Maven

    # Download the zip package: https://github.com/erdemcer/kafka-connect-oracle
    # Extract
    unzip kafka-connect-oracle-master.zip
    # Edit the configuration file under config
    vi kafka-connect-oracle-master/config/OracleSourceConnector.properties

    # Change the content as follows (comments go on their own lines):
    # Oracle instance name: select instance_name from v$instance;
    db.name.alias=dbserver
    tasks.max=1
    # Kafka topic name
    topic=cdczztar
    # Oracle database name: select name from v$database;
    db.name=DBSERVER
    # Oracle server address
    db.hostname=192.168.81.159
    # Oracle port
    db.port=1521
    db.fetch.size=1
    # Table to monitor
    table.whitelist=linhl.lhl_test
    # Tables not to monitor; left empty here
    table.blacklist=
    parse.dml.data=true
    reset.offset=true
    start.scn=
    multitenant=false

    # Build the package; this creates the target directory
    cd /usr/local/kafka-connect-oracle-master
    mvn clean package
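One detail worth checking in any .properties file: the Java properties format treats # as a comment marker only at the start of a line, so a note written after a value on the same line becomes part of that value. The sketch below is a deliberately simplified reader (it is not the full java.util.Properties grammar and ignores line continuations and ":" separators) that flags such values. The function names and the sample text are made up for illustration.

```python
def parse_properties(text):
    """Parse simple key=value lines; '#' or '!' starts a comment only at line start."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def values_with_inline_hash(props):
    """Return the keys whose value contains '#', a likely misplaced comment."""
    return sorted(key for key, value in props.items() if "#" in value)


# Hypothetical sample: the db.port line carries a trailing note.
sample = """\
# Kafka topic name
topic=cdczztar
db.port=1521 # Oracle port
tasks.max=1
"""

props = parse_properties(sample)
print(props["db.port"])                # the note is part of the value
print(values_with_inline_hash(props))  # keys to fix before starting the connector
```

Running a check like this against OracleSourceConnector.properties before starting the connector can catch a port or host value that silently picked up a trailing note.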

③ Extract Kafka and copy the connector jars and configuration file into it

    # Download: http://kafka.apache.org/downloads
    # Extract
    tar xvf kafka_2.11-2.4.1.tgz -C /usr/local/
    # Rename
    mv /usr/local/kafka_2.11-2.4.1 /usr/local/kafka

    # Copy the jars and the configuration file
    cp /usr/local/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.71.jar /usr/local/kafka/libs/
    cp /usr/local/kafka-connect-oracle-master/lib/ojdbc7.jar /usr/local/kafka/libs/
    cp /usr/local/kafka-connect-oracle-master/config/OracleSourceConnector.properties /usr/local/kafka/config/

④ Start Kafka

    # All scripts are in /usr/local/kafka/bin.
    # Run each long-running command in its own terminal window.
    cd /usr/local/kafka/bin

    # Start ZooKeeper
    ./zookeeper-server-start.sh ../config/zookeeper.properties

    # Start the Kafka service
    ./kafka-server-start.sh ../config/server.properties

    # Create the topic cdczztar
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cdczztar

    # List all topics
    ./kafka-topics.sh --zookeeper localhost:2181 --list

    # Start the Oracle connector
    ./connect-standalone.sh ../config/connect-standalone.properties ../config/OracleSourceConnector.properties

    # Start a console consumer (only to display the messages; not required)
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic cdczztar

3. Spark side

Structured Streaming needs HDFS to be available. Everything here runs in a local test environment, so for installing Java and Hadoop you can refer to this pseudo-distributed setup guide: dblab.xmu.edu.cn/blog/install-hadoop

① Configuration

    # Spark can be downloaded from the official website.
    # If you cannot find the package, send me a private message.
    tar -zxf spark-2.4.0-bin-without-hadoop.tgz -C /usr/local/
    # Rename
    cd /usr/local
    mv ./spark-2.4.0-bin-without-hadoop ./spark

    # Create spark-env.sh from the template
    cd /usr/local/spark
    cp ./conf/spark-env.sh.template ./conf/spark-env.sh
    # Edit it (vi ./conf/spark-env.sh) and add:
    export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):/usr/local/spark/examples/jars/*:/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*

    # Edit the system environment variables (vi /etc/profile) and add:
    export HADOOP_HOME=/usr/local/hadoop
    export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
    export JAVA_HOME=/opt/java/jdk1.8.0_261
    export JRE_HOME=${JAVA_HOME}/jre
    export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
    export PATH=$PATH:${JAVA_HOME}/bin:/usr/local/hbase/bin
    export SPARK_HOME=/usr/local/spark
    export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.7-src.zip:/usr/local/python3/lib/python3.6/site-packages/:$PYTHONPATH
    export PYSPARK_PYTHON=python3
    export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PATH

    # Reload the configuration
    source /etc/profile

    # Create a kafka folder under jars and put all the Kafka jars in it
    mkdir -p /usr/local/spark/jars/kafka
    cp /usr/local/spark-streaming-kafka-0-8_2.11-2.4.0.jar /usr/local/spark/jars/kafka
    cp /usr/local/kafka/libs/* /usr/local/spark/jars/kafka

② Write the Structured Streaming script

    #!/usr/bin/env python3

    from functools import partial

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import regexp_extract

    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("StructuredKafkaWordCount") \
            .getOrCreate()

        # Only show messages at WARN level and above
        spark.sparkContext.setLogLevel("WARN")

        # Subscribe to the topic. (With the older Spark Streaming API you would
        # use createDirectStream from the KafkaUtils package instead.)
        lines = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "cdczztar") \
            .load() \
            .selectExpr("CAST(value AS STRING)")

        # lines.printSchema()

        # Kafka delivers the Oracle log record as a string;
        # pull out what follows the "data" key with a regular expression.
        pattern = '"data":(.+)}'
        fields = partial(regexp_extract, str="value", pattern=pattern)
        words = lines.select(fields(idx=1).alias("values"))

        # Write the result as CSV text files.
        # path: the directory on the server where the files are saved.
        # checkpointLocation: substitute a writable local directory.
        query = words \
            .writeStream \
            .outputMode("append") \
            .format("csv") \
            .option("path", "file:///tmp/filesink") \
            .option("checkpointLocation", "file:///<your-checkpoint-dir>") \
            .trigger(processingTime="10 seconds") \
            .start()

        query.awaitTermination()

    # Open a new terminal window, change to the directory containing the script and submit it:
    /usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 spark.py
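To see what the regular expression in the script keeps, it can be tried outside Spark with Python's re module. This is only a sketch: the sample message below is hypothetical (a made-up Kafka Connect style JSON envelope, not a captured record), and Python's regex engine stands in for the Java one that regexp_extract uses.

```python
import re

# Hypothetical message value; field names and values are invented for illustration.
sample_value = (
    '{"schema":{"type":"struct"},'
    '"payload":{"TABLE_NAME":"LHL_TEST","OPERATION":"INSERT",'
    '"data":{"ID":1,"NAME":"a"},"before":null}}'
)

# Same pattern as in the streaming script.
PATTERN = r'"data":(.+)}'


def extract_data(value):
    """Return capture group 1, or '' when nothing matches (as regexp_extract does)."""
    match = re.search(PATTERN, value)
    return match.group(1) if match else ""


extracted = extract_data(sample_value)
print(extracted)
```

Because `.+` is greedy, the capture runs up to the last closing brace of the message, so with a message shaped like this sample the text after the data object (here `,"before":null}`) is still attached and has to be stripped later.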

③ Run a Python script that watches for newly written files in real time, extracts the information and pushes it to WeChat

    import datetime
    import json
    import time

    import pandas as pd
    import pyinotify  # this package only supports Linux
    import requests

    CORPID = "******"    # enterprise WeChat corp ID
    SECRET = "*******"   # enterprise WeChat secret key
    AGENTID = 1000041    # enterprise WeChat application (agent) ID

    multi_event = pyinotify.IN_CREATE  # monitor only the create action
    wm = pyinotify.WatchManager()


    class MyHandler(pyinotify.ProcessEvent):
        """Overrides process_IN_CREATE to handle newly created files."""

        def send_msg_to_wechat(self, content):
            record = '{}\n'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
            s = requests.session()

            # Step 1: get an access token
            url1 = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={0}&corpsecret={1}".format(CORPID, SECRET)
            rep = s.get(url1)
            record += "{}\n".format(json.loads(rep.content))
            if rep.status_code == 200:
                token = json.loads(rep.content)['access_token']
                record += "token obtained successfully\n"
            else:
                record += "failed to obtain token\n"
                token = None

            # Step 2: send the text message
            url2 = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}".format(token)
            header = {"Content-Type": "application/json"}
            form_data = {
                "touser": "@all",
                "toparty": " PartyID1 | PartyID2 ",
                "totag": " TagID1 | TagID2 ",
                "msgtype": "text",
                "agentid": AGENTID,
                "text": {"content": content},
                "safe": 0
            }
            rep = s.post(url2, data=json.dumps(form_data).encode('utf-8'), headers=header)
            if rep.status_code == 200:
                res = json.loads(rep.content)
                record += "message sent successfully\n"
            else:
                record += "failed to send message\n"
                res = None
            return res

        def process_IN_CREATE(self, event):
            try:
                # Skip Spark's metadata directory and checksum files
                if '_spark_metadata' in event.pathname or '.crc' in event.pathname:
                    pass
                else:
                    print(event.pathname)
                    f_path = event.pathname
                    # Wait for Spark to finish writing the file
                    time.sleep(5)
                    # sep='/' keeps the whole line in a single column
                    df = pd.read_csv(f_path, names=['value'], encoding='utf8', sep='/')
                    send_str = df.iloc[0, 0].replace('\\', '').replace(',"before":null}', '').replace('"', '')
                    print(send_str)
                    self.send_msg_to_wechat(send_str)
            except Exception as exc:
                # Keep the watcher running, but report what went wrong
                print('failed to process {}: {}'.format(event.pathname, exc))


    handler = MyHandler()
    notifier = pyinotify.Notifier(wm, handler)
    wm.add_watch('/tmp/filesink/', multi_event)
    notifier.loop()
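The script above treats HTTP 200 as success. As far as I know, the enterprise WeChat API reports failures inside the JSON body (errcode and errmsg) while still answering with HTTP 200, so it may be worth checking the body as well. A minimal sketch, assuming that response shape; extract_token and both sample bodies are made up for illustration:

```python
import json


def extract_token(body):
    """Return access_token from a gettoken response body, or None on failure."""
    try:
        payload = json.loads(body)
    except ValueError:
        return None  # not JSON at all
    if not isinstance(payload, dict):
        return None
    # Assumption: errcode == 0 (or absent) means success; anything else is an error.
    if payload.get("errcode", 0) != 0:
        return None
    return payload.get("access_token")


# Illustrative bodies, not captured from a real call.
ok_body = '{"errcode": 0, "errmsg": "ok", "access_token": "TOKEN", "expires_in": 7200}'
bad_body = '{"errcode": 40013, "errmsg": "invalid corpid"}'

print(extract_token(ok_body))
print(extract_token(bad_body))
```

In send_msg_to_wechat this could replace the direct ['access_token'] lookup, so that a wrong corp ID or secret yields None instead of a KeyError.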

The WeChat message looks like this:

4. Open questions

The following points are still unsolved. If you have ideas, please leave a comment or send me a private message. Thank you.

  • After Structured Streaming consumes the Kafka messages, can it push them directly to WeChat, without going through files?
  • The Python watcher gets the path of a new file immediately, but the content is only there once the data has been written. Waiting with sleep is unreliable. Is there a way to read the file only after the data has been written?
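For the second question, one option that avoids a fixed sleep is to poll the file's size and read it only once the size has stopped changing. This is a sketch of that idea, not something I have run in the setup above; the function name, the intervals and the timeout are arbitrary choices. (pyinotify also defines an IN_CLOSE_WRITE event, which fires when a file opened for writing is closed, and may be a better fit than IN_CREATE.)

```python
import os
import time


def wait_until_stable(path, interval=0.5, stable_checks=3, timeout=30.0):
    """Return True once the file is non-empty and its size stayed the same for
    `stable_checks` consecutive polls; return False if `timeout` elapses first."""
    deadline = time.monotonic() + timeout
    last_size = -1
    unchanged = 0
    while time.monotonic() < deadline:
        try:
            size = os.path.getsize(path)
        except OSError:
            size = -1  # file is not there (yet)
        if size > 0 and size == last_size:
            unchanged += 1
            if unchanged >= stable_checks:
                return True
        else:
            unchanged = 0  # size changed or file is empty: start counting again
        last_size = size
        time.sleep(interval)
    return False
```

In process_IN_CREATE, the time.sleep(5) call could then become `if wait_until_stable(f_path):` followed by the read. An empty file never counts as stable, so it simply times out.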

Author: Rango_lhl

Link: www.cnblogs.com/rango-lhl/p…