# Split log data to different Kafka topics
# com.elaiza.gmall.realtime.app.dwd.BaseLogApp
1. Start HDFS
[root@master bin]# start-all.sh
[root@master bin]# hadoop dfsadmin -safemode leave
[root@master bin]# mr-jobhistory-daemon.sh start historyserver
2. Start zk and kafka
[root@master bin]# /usr/local/src/sh/zk.sh start
[root@master bin]# /usr/local/src/sh/kf.sh start
3. Submit the Flink job (a sketch of the splitting logic follows this list)
[root@master bin]# flink run -d -c com.elaiza.gmall.realtime.app.dwd.BaseLogApp /usr/local/src/sh/gmall-flink/sub-jar/gmall2021-realtime-1.0.jar
4. Start the log simulator and the log receiver
[root@master bin]# java -jar /usr/local/src/sh/gmall-flink/mock-log/gmall2021-log-sink-kafka.jar
[root@master bin]# java -jar /usr/local/src/sh/gmall-flink/mock-log/gmall2021-mock-log.jar
4. Alternatively, keep the log receiver running and produce data by hand with a Kafka console producer
[root@master bin]# java -jar /usr/local/src/sh/gmall-flink/mock-log/gmall2021-log-sink-kafka.jar
[root@master bin]# kafka-console-producer.sh --broker-list master:9092 --topic ods_base_log
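The splitting that BaseLogApp performs can be pictured with Flink side outputs: records carrying a `start` block go to `dwd_start_log`, the rest go to `dwd_page_log`. Below is a minimal sketch under those assumptions; the broker address, consumer group, connector constructors, and class name are illustrative, and the project's real job likely also repairs the `is_new` flag and splits the `displays` array into its own topic, which is omitted here.

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Properties;

public class BaseLogAppSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "master:9092");   // placeholder broker
        props.setProperty("group.id", "base_log_app");           // placeholder group id

        // Raw logs produced by gmall2021-log-sink-kafka.jar land in ods_base_log
        DataStream<String> odsStream = env.addSource(
                new FlinkKafkaConsumer<>("ods_base_log", new SimpleStringSchema(), props));

        // Start-up logs leave through a side output; page logs stay on the main stream
        final OutputTag<String> startTag = new OutputTag<String>("start") {};

        SingleOutputStreamOperator<String> pageStream = odsStream.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        JSONObject json = JSON.parseObject(value);
                        if (json.containsKey("start")) {
                            ctx.output(startTag, value);   // records with a "start" block
                        } else {
                            out.collect(value);            // everything else is a page log
                        }
                    }
                });

        // Write each branch to its DWD topic
        pageStream.addSink(
                new FlinkKafkaProducer<>("master:9092", "dwd_page_log", new SimpleStringSchema()));
        pageStream.getSideOutput(startTag).addSink(
                new FlinkKafkaProducer<>("master:9092", "dwd_start_log", new SimpleStringSchema()));

        env.execute("BaseLogApp");
    }
}
```

A malformed record would simply throw in this sketch; a production job would send it to a dirty-data side output instead.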

# Test data
# dwd_page_log topic
{"common": {"ar":"110000"."uid":"4"."os":"Android 11.0"."ch":"web"."is_new":"0"."md":"Xiaomi 10 Pro "."mid":"mid_18"."vc":"v2.1.134"."ba":"Xiaomi"},"page": {"page_id":"good_detail"."item":"6"."during_time": 6802,"item_type":"sku_id"."last_page_id":"home"."source_type":"query"},"displays": [{"display_type":"query"."item":"Seven"."item_type":"sku_id"."pos_id": 4."order": 1}, {"display_type":"query"."item":"6"."item_type":"sku_id"."pos_id": 4."order": 2}, {"display_type":"query"."item":"1"."item_type":"sku_id"."pos_id": 2."order": 3}, {"display_type":"query"."item":"6"."item_type":"sku_id"."pos_id": 5,"order": 4}],"actions": [{"item":"1"."action_id":"get_coupon"."item_type":"coupon_id"."ts": 1608272790401}]."ts": 1608272787000}# dwd_start_log topic
{"common": {"ar":"110000"."uid":"49"."os":"IOS 13.2.3." "."ch":"Appstore"."is_new":"0"."md":"iPhone Xs"."mid":"mid_3"."vc":"v2.1.134"."ba":"iPhone"},"start": {"entry":"icon"."open_ad_skip_ms": 3347,"open_ad_ms": 4737,"loading_time": 9640,"open_ad_id": 8},"ts": 1608272783000}# Dynamic shunt fact table to Kafka Topic and dimension table to Phoenix
# com.elaiza.gmall.realtime.app.dwd.BaseDBApp
1. Create the mysql database gmall2021_realtime and the routing configuration table
CREATE TABLE `table_process` (
  `source_table` varchar(200) NOT NULL COMMENT 'Source table',
  `operate_type` varchar(200) NOT NULL COMMENT 'Operation type: insert, update, delete',
  `sink_type` varchar(200) DEFAULT NULL COMMENT 'Output type: hbase or kafka',
  `sink_table` varchar(200) DEFAULT NULL COMMENT 'Output table (or topic)',
  `sink_columns` varchar(2000) DEFAULT NULL COMMENT 'Output fields',
  `sink_pk` varchar(200) DEFAULT NULL COMMENT 'Primary key field',
  `sink_extend` varchar(200) DEFAULT NULL COMMENT 'Create-table extension clause',
  PRIMARY KEY (`source_table`, `operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
2. Add gmall2021_realtime to the mysql configuration file and enable binlog
[root@master bin]# vim /etc/my.cnf
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2021
binlog-do-db=gmall2021_realtime
3. Restart mysql
[root@master bin]# sudo systemctl restart mysqld
4. Start HDFS
[root@master bin]# start-all.sh
[root@master bin]# hadoop dfsadmin -safemode leave
[root@master bin]# mr-jobhistory-daemon.sh start historyserver
5. Start zk and kafka
[root@master bin]# /usr/local/src/sh/zk.sh start
[root@master bin]# /usr/local/src/sh/kf.sh start
6. Start hbase
[root@master bin]# /usr/local/src/hbase-1.3.1/bin/start-hbase.sh
7. Open sqlline and create the Phoenix schema
[root@master bin]# source activate dev
[root@master bin]# source deactivate
[root@master bin]# source activate env
[root@master bin]# /usr/local/src/apache-phoenix-4.14.3-HBase-1.3-bin/bin/sqlline.py master:2181

create schema IF NOT EXISTS "GMALL2021_REALTIME";

8. Submit com.elaiza.gmall.realtime.app.ods.FlinkCDC to capture database changes (a sketch of this kind of CDC source follows this list)
[root@master bin]# flink run -d -c com.elaiza.gmall.realtime.app.ods.FlinkCDC /usr/local/src/sh/gmall-flink/sub-jar/gmall2021-realtime-1.0.jar
9. Submit com.elaiza.gmall.realtime.app.dwd.BaseDBApp (a sketch of the broadcast routing it performs follows after that)
[root@master bin]# flink run -d -c com.elaiza.gmall.realtime.app.dwd.BaseDBApp /usr/local/src/sh/gmall-flink/sub-jar/gmall2021-realtime-1.0.jar
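Step 8's FlinkCDC job reads MySQL binlog changes with the flink-cdc-connectors source. The sketch below assumes the 1.x (com.alibaba.ververica) API with placeholder host and credentials; newer releases move these classes to the com.ververica packages, and which database the project's job actually watches is not stated here, so the databaseList value is illustrative.

```java
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class FlinkCDCSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Binlog must be enabled for the captured database in /etc/my.cnf (step 2).
        // Host, port, credentials, and database are placeholders.
        SourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("master")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall2021")
                .startupOptions(StartupOptions.initial())   // full snapshot first, then binlog
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        // The real ods.FlinkCDC job would serialize the change records to JSON and
        // write them to a Kafka ODS topic; print() keeps this sketch self-contained.
        env.addSource(mysqlSource).print();

        env.execute("FlinkCDC");
    }
}
```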


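BaseDBApp (step 9) is where the dynamic routing happens: the rows of table_process arrive as a broadcast stream (typically from a second CDC source on gmall2021_realtime.table_process), and every business-table change record is looked up by (source_table, operate_type) to decide whether it continues to a Kafka fact topic or to a Phoenix dimension table. A rough skeleton under those assumptions follows; the change-record field names ("table", "type"), the POJO, and the omitted sinks are illustrative, not the project's exact code.

```java
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Mirrors the columns of gmall2021_realtime.table_process (see the DDL in step 1)
class TableProcess {
    public String sourceTable;
    public String operateType;
    public String sinkType;     // "kafka" or "hbase"
    public String sinkTable;    // Kafka topic or Phoenix table name
    public String sinkColumns;
    public String sinkPk;
    public String sinkExtend;
}

class TableProcessFunction extends BroadcastProcessFunction<JSONObject, TableProcess, JSONObject> {

    // The same descriptor must be used when broadcasting the config stream
    static final MapStateDescriptor<String, TableProcess> STATE_DESC =
            new MapStateDescriptor<>("table-process", Types.STRING, Types.POJO(TableProcess.class));

    @Override
    public void processBroadcastElement(TableProcess config, Context ctx, Collector<JSONObject> out) throws Exception {
        // Each new or changed table_process row refreshes the broadcast state
        BroadcastState<String, TableProcess> state = ctx.getBroadcastState(STATE_DESC);
        state.put(config.sourceTable + ":" + config.operateType, config);
    }

    @Override
    public void processElement(JSONObject record, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // Assumed change-record shape: {"table": "...", "type": "insert", "data": {...}}
        ReadOnlyBroadcastState<String, TableProcess> state = ctx.getBroadcastState(STATE_DESC);
        TableProcess config = state.get(record.getString("table") + ":" + record.getString("type"));
        if (config == null) {
            return;                         // no routing rule for this table/operation
        }
        record.put("sink_table", config.sinkTable);
        if ("kafka".equals(config.sinkType)) {
            out.collect(record);            // main output feeds the Kafka fact topics
        } else if ("hbase".equals(config.sinkType)) {
            // in the full job this goes to a side output backed by a Phoenix/HBase sink
        }
    }
}
```

It would be wired in roughly as `businessStream.connect(configStream.broadcast(TableProcessFunction.STATE_DESC)).process(new TableProcessFunction())`, with the main output going to a Kafka sink and the side output to Phoenix.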



# Kafka operations
1. List topics:
[root@master bin]# kafka-topics.sh --list --zookeeper master:2181
2. Create a topic named first with 1 partition and replication factor 1:
[root@master bin]# kafka-topics.sh --create --zookeeper master:2181 --topic first --partitions 1 --replication-factor 1
3. Describe a topic:
[root@master bin]# kafka-topics.sh --describe --topic first --zookeeper master:2181
4. Delete a topic:
[root@master bin]# kafka-topics.sh --delete --zookeeper master:2181 --topic order_info
5. Start a producer (test):
[root@master bin]# kafka-console-producer.sh --topic dwd_page_log --broker-list master:9092
6. Start a consumer (test):
[root@master bin]# kafka-console-consumer.sh --bootstrap-server master:9092 --topic first
7. Start a consumer (test) that reads from the beginning (Kafka retains data for 7 days by default); a programmatic equivalent is sketched after this list:
[root@master bin]# kafka-console-consumer.sh --bootstrap-server master:9092 --topic dwd_order_info --from-beginning
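The console tools above are fine for spot checks; from application code the same check can be done with the plain kafka-clients consumer API. A minimal sketch; the topic, group id, and offset-reset policy are illustrative.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class DwdPageLogConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dwd_page_log_test");   // illustrative group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // For a new group this has the same effect as --from-beginning
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("dwd_page_log"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}
```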


# Save, restore, and delete savepoints and checkpoints
1. Trigger a savepoint for a running job (the checkpoint setup that makes this possible is sketched after this list)
[root@master bin]# flink savepoint b99afaa23462ae4732747f95bdf43b8b hdfs://master:9000/gmall-flink/sv
2. Restore a job from a savepoint
[root@master bin]# flink run -m master:8081 -s hdfs://master:9000/gmall-flink/sv/savepoint-b99afa-61b6e07bbfa1 -c com.elaiza.gmall.realtime.app.ods.FlinkCDC /usr/local/src/sh/gmall-flink/sub-jar/gmall2021-realtime-1.0.jar
3. Delete savepoint/checkpoint data from HDFS
[root@master bin]# hadoop fs -rm -r /gmall2021
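The savepoint and restore commands above only work if the jobs persist state to a durable backend. Below is a minimal sketch of enabling HDFS-backed checkpoints with the Flink 1.12-era API; the interval, timeout, checkpoint directory, and the dummy pipeline are assumptions rather than the project's actual settings.

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Exactly-once checkpoints every 5 seconds (illustrative interval)
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60_000L);

        // Keep state on HDFS so jobs can be restored with `flink run -s ...`;
        // the directory is a placeholder, not necessarily the project's /gmall2021 path
        env.setStateBackend(new FsStateBackend("hdfs://master:9000/gmall2021/checkpoint"));

        // Retain externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Dummy pipeline so the sketch runs on its own
        env.fromElements("a", "b", "c").print();

        env.execute("checkpointed job");
    }
}
```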

