I. Experiment description

Real-time advertisement click statistics, implemented as a Storm-style streaming job with Apache Flink.

Experimental data are generated by Java code, and the format of each data is as follows:

timestamp, province, city, user ID, advertisement ID

The experiment mainly fulfilled three requirements, namely

(1) Real-time statistics of the number of clicks on advertisements in various provinces and cities every day, and store them in Mysql;

(2) Realize the real-time dynamic blacklist mechanism, that is, block the users who click on an advertisement more than 60 times a day (the blacklisted user ID is stored in Mysql);

(3) The total clicks of advertisements in the last one minute are calculated every 10 seconds and displayed in HTML;

II. Experiment code

  • Data simulation generation
package cn.edu.neu.experiment;

import lombok.*;

/ * * *@author32098 * /
@Data
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class AdvertiseClickBean {
    private String advertiseId;
    private Long clickTime;
    private String clickUserId;
    private String clickUserProvince;
    private String clickUserCity;
}

Copy the code
package cn.edu.neu.experiment;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

/ * * *@author32098 * /
public class AdvertiseClickMockDataSource extends RichParallelSourceFunction<AdvertiseClickBean> {
    private boolean keepMock;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        keepMock = true;
    }

    @Override
    public void run(SourceContext<AdvertiseClickBean> sourceContext) throws Exception {
        List<String> provinceList = Arrays.asList("Jiangxi"."Liaoning"."Zhejiang"."Guangdong"."Hunan"."Hubei"."Jilin"."Heilongjiang"."Fujian");
        List<String> cityList = Arrays.asList("Nanchang"."Shenyang"."Hangzhou"."Guangzhou"."Changsha"."Wuhan"."Changchun"."Harbin"."Xiamen");

        int len = provinceList.size();
        Random r = new Random();
        while (keepMock) {
            for(int i=0; i<r.nextInt(150); i++){
                int idx = r.nextInt(len);
                String aid = "Ad_" + r.nextInt(20);
                // Simulate data delay and out of order
                Long clickTime = System.currentTimeMillis() - r.nextInt(3) *1000;
                String clickUserId = "U" + r.nextInt(10);
                String clickUserProvince = provinceList.get(idx);
                String clickUserCity = cityList.get(idx);
                sourceContext.collect(new AdvertiseClickBean(aid, clickTime, clickUserId, clickUserProvince, clickUserCity));
            }
            Thread.sleep(6000); }}@Override
    public void cancel(a) {
        keepMock = false; }}Copy the code
package cn.edu.neu.experiment;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/ * * *@author32098 * /
public class KafkaAdvertiseDataProducer {
    private static org.apache.flink.streaming.api.datastream.DataStreamSource<AdvertiseClickBean> DataStreamSource;

    public static void main(String[] args) throws Exception {
        Env: create a streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        // 2. source: Add custom source to generate AD click simulation data
        DataStreamSource<AdvertiseClickBean> advertiseClickDataStream = env.addSource(new AdvertiseClickMockDataSource());

        // 3. transformation
        SingleOutputStreamOperator<String> advertiseClickDataJsonStream = advertiseClickDataStream.map(new MapFunction<AdvertiseClickBean, String>() {
            @Override
            public String map(AdvertiseClickBean advertiseClickBean) throws Exception {
                returnJSON.toJSONString(advertiseClickBean); }});// 4. sink to kafka
        Properties props = new Properties();
        props.setProperty("bootstrap.servers"."master:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka".new SimpleStringSchema(), props);

        advertiseClickDataJsonStream.addSink(kafkaSink);

        // 5. executeenv.execute(); }}Copy the code
  • Requirement (1) realization
package cn.edu.neu.experiment.province_city_ad_click_count;

import lombok.*;

/ * * *@author32098 * /
@Data
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class AdvertiseClickData {
    private String clickTime;
    private String clickUserProvince;
    private String clickUserCity;
    private String advertiseId;
    private int clickCount;
}

Copy the code
package cn.edu.neu.experiment.province_city_ad_click_count;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/ * * *@author32098 * /
public class MysqlSink extends RichSinkFunction<AdvertiseClickData>{
    private Connection conn = null;
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:mysql://master:3306/user_advertise"."root"."Hive@2020");
        String sql = "insert into province_city_advertise(day,province,city,aid,count) values (? ,? ,? ,? ,?) on duplicate key update count=?";
        ps = conn.prepareStatement(sql);
    }

    @Override
    public void invoke(AdvertiseClickData value, Context context) throws Exception {
        ps.setString(1, value.getClickTime());
        ps.setString(2, value.getClickUserProvince());
        ps.setString(3, value.getClickUserCity());
        ps.setString(4, value.getAdvertiseId());
        ps.setInt(5, value.getClickCount());
        ps.setInt(6, value.getClickCount());
        ps.executeUpdate();
    }

    @Override
    public void close(a) throws Exception {
        if(conn ! =null) {
            conn.close();
        }
        if(ps ! =null) { ps.close(); }}}Copy the code
package cn.edu.neu.experiment.province_city_ad_click_count;

import cn.edu.neu.experiment.AdvertiseClickBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Properties;

/ * * *@author32098 * /
public class KafkaAdvertiseDataConsumerA {
    public static void main(String[] args) throws Exception {
        Properties pros = new Properties();
        pros.setProperty("bootstrap.servers"."master:9092");
        pros.setProperty("group.id"."flink");
        pros.setProperty("auto.offset.reset"."latest");
        pros.setProperty("flink.partition-discovery.interval-millis"."5000");
        pros.setProperty("enable.auto.commit"."true");
        pros.setProperty("auto.commit.interval.ms"."2000");

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>(
                "flink_kafka".new SimpleStringSchema(),
                pros
        );
        kafkaSource.setStartFromLatest();

        Env: create a streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. source
        DataStreamSource<String> kafkaDataStream = env.addSource(kafkaSource);

        // 3. transformation
        // 3.1 to java object
        SingleOutputStreamOperator<AdvertiseClickBean> advertiseClickDataStream = kafkaDataStream.map(new MapFunction<String, AdvertiseClickBean>() {
            @Override
            public AdvertiseClickBean map(String s) throws Exception {
                returnJSON.parseObject(s, AdvertiseClickBean.class); }});// 3.2 Add water level
        DataStream<AdvertiseClickBean> adClickDataStream  =  advertiseClickDataStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<AdvertiseClickBean>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((adClickData, timestamp) -> adClickData.getClickTime())
        );

        // 3.3 MAP: Processing time Process the time and select the required data
        SingleOutputStreamOperator<AdvertiseClickData> dealtAdClickDs = adClickDataStream.map(new MapFunction<AdvertiseClickBean, AdvertiseClickData>() {
            @Override
            public AdvertiseClickData map(AdvertiseClickBean advertiseClickBean) throws Exception {
                String ymd = new SimpleDateFormat("yyyy-MM-dd").format(new Date(advertiseClickBean.getClickTime()));
                return new AdvertiseClickData(ymd, advertiseClickBean.getClickUserProvince(), advertiseClickBean.getClickUserCity(), advertiseClickBean.getAdvertiseId(),1); }});3.4 Creating a View
        tEnv.createTemporaryView("advertise_click_data",
                dealtAdClickDs,
                $("clickTime") and $("clickUserProvince") and $("clickUserCity") and $("advertiseId") and $("clickCount"));// 3.5 Group Aggregation
        Table resultTable = tEnv.sqlQuery(
                "SELECT clickTime, clickUserProvince, clickUserCity, advertiseId, SUM(clickCount) as clickCount FROM advertise_click_data GROUP BY clickTime, clickUserProvince, clickUserCity, advertiseId"
        );

        / / 3.6
        DataStream<Tuple2<Boolean, AdvertiseClickData>> toConsoleDs = tEnv.toRetractStream(resultTable, AdvertiseClickData.class);
        DataStream<AdvertiseClickData> resultDs = tEnv.toRetractStream(resultTable, AdvertiseClickData.class).filter(record->record.f0).map(record->record.f1);

        // 4. sink
        resultDs.addSink(new MysqlSink());
        toConsoleDs.print();

        // 5. executeenv.execute(); }}Copy the code
  • Requirement (2) realization
package cn.edu.neu.experiment.advertise_blacklist;

import lombok.*;

/ * * *@author32098 * /
@Data
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class AdvertiseClickData {
    private String clickTime;
    private String clickUserId;
    private String advertiseId;
    private long clickCount;
}
Copy the code
package cn.edu.neu.experiment.advertise_blacklist;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/ * * *@author32098 * /
public class MysqlSink extends RichSinkFunction<AdvertiseClickData> {
    private Connection conn = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:mysql://master:3306/user_advertise"."root"."Hive@2020");
    }

    @Override
    public void invoke(AdvertiseClickData value, Context context) throws Exception {
        PreparedStatement ps = conn.prepareStatement("select uid from black_list where uid=?");
        ps.setString(1, value.getClickUserId());
        ResultSet rs = ps.executeQuery();
        if(! rs.next()){ String day = value.getClickTime(); ps = conn.prepareStatement("select * from user_advertise where day=? and uid=? and aid=?"
            );
            ps.setString(1, day);
            ps.setString(2, value.getClickUserId());
            ps.setString(3, value.getAdvertiseId());
            rs = ps.executeQuery();
            if(rs.next()){
                PreparedStatement psA = conn.prepareStatement(
                        "update user_advertise set count = ? where day=? and uid=? and aid=?"
                );
                psA.setLong(1, value.getClickCount());
                psA.setString(2, day);
                psA.setString(3, value.getClickUserId());
                psA.setString(4, value.getAdvertiseId());
                psA.executeUpdate();
                psA.close();
            }else{
                PreparedStatement psB = conn.prepareStatement("insert into user_advertise(day,uid,aid,count) values (? ,? ,? ,?) ");
                psB.setString(1, day);
                psB.setString(2, value.getClickUserId());
                psB.setString(3, value.getAdvertiseId());
                psB.setLong(4, value.getClickCount());
                psB.executeUpdate();
                psB.close();
            }
            ps = conn.prepareStatement(
                    "select * from user_advertise where day=? and uid=? and aid=? and count>60"
            );
            ps.setString(1, day);
            ps.setString(2, value.getClickUserId());
            ps.setString(3, value.getAdvertiseId());
            rs = ps.executeQuery();
            if(rs.next()){
                PreparedStatement psC = conn.prepareStatement("insert into black_list(uid) value(?) on duplicate key update uid=?");
                psC.setString(1, value.getClickUserId());
                psC.setString(2, value.getClickUserId()); psC.executeUpdate(); psC.close(); } ps.close(); }}@Override
    public void close(a) throws Exception {
        if(conn ! =null) { conn.close(); }}}Copy the code
package cn.edu.neu.experiment.advertise_blacklist;

import cn.edu.neu.experiment.AdvertiseClickBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Properties;

import static org.apache.flink.table.api.Expressions.$;

/ * * *@author32098 * /
public class KafkaAdvertiseDataConsumerB {
    public static void main(String[] args) throws Exception {
        Properties pros = new Properties();
        pros.setProperty("bootstrap.servers"."master:9092");
        pros.setProperty("group.id"."flink");
        pros.setProperty("auto.offset.reset"."latest");
        pros.setProperty("flink.partition-discovery.interval-millis"."5000");
        pros.setProperty("enable.auto.commit"."true");
        pros.setProperty("auto.commit.interval.ms"."2000");

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>(
                "flink_kafka".new SimpleStringSchema(),
                pros
        );
        kafkaSource.setStartFromLatest();

        Env: create a streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. source
        DataStreamSource<String> kafkaDataStream = env.addSource(kafkaSource);

        // 3. transformation
        // 3.1 to java object
        SingleOutputStreamOperator<AdvertiseClickBean> advertiseClickDataStream = kafkaDataStream.map(new MapFunction<String, AdvertiseClickBean>() {
            @Override
            public AdvertiseClickBean map(String s) throws Exception {
                returnJSON.parseObject(s, AdvertiseClickBean.class); }});// 3.2 Add water level
        DataStream<AdvertiseClickBean> adClickDataStream  =  advertiseClickDataStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<AdvertiseClickBean>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((adClickData, timestamp) -> adClickData.getClickTime())
        );

        // 3.3 map: process time and select required data
        SingleOutputStreamOperator<AdvertiseClickData> dealtAdClickDs = adClickDataStream.map(new MapFunction<AdvertiseClickBean, AdvertiseClickData>() {
            @Override
            public AdvertiseClickData map(AdvertiseClickBean advertiseClickBean) throws Exception {
                String ymd = new SimpleDateFormat("yyyy-MM-dd").format(new Date(advertiseClickBean.getClickTime()));
                return new AdvertiseClickData(ymd, advertiseClickBean.getClickUserId(), advertiseClickBean.getAdvertiseId(), 1); }});3.4 Creating a View
        tEnv.createTemporaryView("advertise_click_data",
                dealtAdClickDs,
                $("clickTime") and $("clickUserId") and $("advertiseId") and $("clickCount"));// 3.5 Group Aggregation
        Table resultTable = tEnv.sqlQuery(
                "SELECT clickTime, clickUserId, advertiseId, SUM(clickCount) as clickCount FROM advertise_click_data GROUP BY clickTime, clickUserId, advertiseId"
        );

        / / 3.6
        DataStream<AdvertiseClickData> resultDs = tEnv.toRetractStream(resultTable, AdvertiseClickData.class).filter(record->record.f0).map(record->record.f1);

        // 4. sink
        resultDs.addSink(new MysqlSink());

        // 5. executeenv.execute(); }}Copy the code
  • Requirement (3) realization
package cn.edu.neu.experiment.advertise_click_count_nearly_minute;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

/ * * *@author32098 * /
public class ClickTimeAggregate implements AggregateFunction<Tuple2<String.Long>, Long.Long> {
    /** * create accumulator *@returnReturns the accumulator's initial value 0 */
    @Override
    public Long createAccumulator(a) {
        return 0L;
    }

    /** * Add * to the accumulator@paramIn the input *@paramAcc current accumulator value *@returnUpdated accumulator value */
    @Override
    public Long add(Tuple2<String, Long> in, Long acc) {
        return in.f1 + acc;
    }

    /** * gets the final value of the accumulator *@paramFinal value of acc accumulator *@returnThe final value of the accumulator */
    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    /** * Merge the results of each subTask */
    @Override
    public Long merge(Long accA, Long accB) {
        returnaccA + accB; }}Copy the code
package cn.edu.neu.experiment.advertise_click_count_nearly_minute;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Long: click time
 * Tuple2<String, Long>: Tuple2.of(advertiseId, click time)
 * String: key => advertiseId
 * @author32098 * /
public class AggregateDataCollect implements WindowFunction<Long.Tuple2<String.Long>, String.TimeWindow> {

    / * * * *@param s key => advertiseId
     * @param timeWindow timeWindow
     * @param input click time
     * @param collector collector
     * @throws Exception Exception
     */
    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<Long> input, Collector<Tuple2<String, Long>> collector) throws Exception {
        longclickTime = input.iterator().next(); collector.collect(Tuple2.of(s, clickTime)); }}Copy the code
package cn.edu.neu.experiment.advertise_click_count_nearly_minute;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;
import java.util.TreeMap;

/ * * *@author32098 * /
public class WindowDataProcess extends ProcessWindowFunction<Tuple2<String.Long>, Tuple2<String.Long>, String.TimeWindow> {

    @Override
    public void process(String s, Context context, Iterable<Tuple2<String, Long>> inputs, Collector<Tuple2<String, Long>> collector) throws Exception {
        Map<String, Long> adAndClickTime = new TreeMap<>();

        for (Tuple2<String, Long> input : inputs) {
            String key = input.f0;
            if(adAndClickTime.containsKey(key)){
                adAndClickTime.put(key, adAndClickTime.get(key)+input.f1);
            } else{
                adAndClickTime.put(key, input.f1);
            }
        }

        adAndClickTime.forEach(
                (xtime, yclick) -> {
                    String jsonElem = "{\"xtime\":\""+xtime+"\",\"yclick\":\""+yclick+"\}," "; System.out.println(jsonElem); }); }}Copy the code
package cn.edu.neu.experiment.advertise_click_count_nearly_minute;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.*;

/ * * *@author32098 * /
public class JsonSink extends RichSinkFunction<Tuple2<String.Long>> {
    private TreeMap<String, Long> timeClick = null;
    private long lastInvokeTime = 0;
    private SimpleDateFormat dateFormat = null;


    @Override
    public void open(Configuration parameters) throws Exception {
        timeClick = new TreeMap<String, Long>();
        dateFormat = new SimpleDateFormat("ss");
        lastInvokeTime = System.currentTimeMillis();
    }

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        long invokeTime = System.currentTimeMillis();
        if(Integer.parseInt(dateFormat.format(invokeTime)) - Integer.parseInt(dateFormat.format(lastInvokeTime))>1){
            writeToJson();
        }
        timeClick.put(value.f0, value.f1);
        lastInvokeTime = System.currentTimeMillis();
// if(timeClick.containsKey(value.f0)){
// return;
/ /}
// if(timeClick.size() == 6){
// timeClick.pollFirstEntry();
/ /}
// timeClick.put(value.f0, value.f1);
// writeToJson();
    }

    @Override
    public void close(a) throws Exception {
        // adAndClickTime.clear();
    }

    public void writeToJson(a){
        String projectRoot = System.getProperty("user.dir");
        String file = projectRoot + "/src/main/java/cn/edu/neu/experiment/advertise_click_count_nearly_minute/advertise_click_count_nearly_minute.json";
        try {
            PrintWriter out = new PrintWriter(new FileWriter(new File(file), false));
            StringBuffer jsonStr = new StringBuffer("[");
            // System.out.println(timeClick.size());
            timeClick.forEach(
                    (time, click) -> {
                        String json = "{\"xtime\":\""+time+"\",\"yclick\":\""+click+"\}," ";
                        jsonStr.append(json);
                        // System.out.println(json);}); jsonStr.deleteCharAt(jsonStr.length()-1);
            jsonStr.append("]");
            out.println(jsonStr.toString());
            out.flush();
            out.close();
            timeClick.clear();
        } catch(IOException e) { e.printStackTrace(); }}}Copy the code
package cn.edu.neu.experiment.advertise_click_count_nearly_minute;

import cn.edu.neu.experiment.AdvertiseClickBean;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Properties;

import static org.apache.flink.table.api.Expressions.$;

/ * * *@author32098 * /
public class KafkaAdvertiseDataConsumerC {
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class TimeClickData{
        private Long clickTime;
        private String dealtTime;
        private Long click;
    }

    public static void main(String[] args) throws Exception {
        Properties pros = new Properties();
        pros.setProperty("bootstrap.servers"."master:9092");
        pros.setProperty("group.id"."flink");
        pros.setProperty("auto.offset.reset"."latest");
        pros.setProperty("flink.partition-discovery.interval-millis"."5000");
        pros.setProperty("enable.auto.commit"."true");
        pros.setProperty("auto.commit.interval.ms"."2000");

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>(
                "flink_kafka".new SimpleStringSchema(),
                pros
        );
        kafkaSource.setStartFromLatest();

        Env: create a streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. source
        DataStreamSource<String> kafkaDataStream = env.addSource(kafkaSource);

        // 3. transformation
        // 3.1 to java object
        SingleOutputStreamOperator<AdvertiseClickBean> advertiseClickDataStream = kafkaDataStream.map(new MapFunction<String, AdvertiseClickBean>() {
            @Override
            public AdvertiseClickBean map(String s) throws Exception {
                returnJSON.parseObject(s, AdvertiseClickBean.class); }});// 3.2 Add water level
        DataStream<AdvertiseClickBean> adClickDataStream  =  advertiseClickDataStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<AdvertiseClickBean>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((adClickData, timestamp) -> adClickData.getClickTime())
        );

        // 3.3 Event processing time: The event processing is as follows
        /* 9s(1-10) => 10s 13s(11-20) => 20s 24s(21-30) => 30s 32s(31-40) => 40s 48s(41-50) => 50s 56s(51-60) => 60s(0) (s / 10 (Divisible) +1) *10: (56/10+1)=60 */
        KeyedStream<Tuple2<String, Long>, String> adClickTimeKeyedDs = adClickDataStream.map(new MapFunction<AdvertiseClickBean, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(AdvertiseClickBean advertiseClickBean) throws Exception {
                long ts = advertiseClickBean.getClickTime();
                String time = new SimpleDateFormat("HH:mm:ss").format(new Date(ts));
                String[] hms = time.split(":");
                int s = (Integer.parseInt(hms[2) /10+1) *10;
                int m = Integer.parseInt(hms[1]);
                int h = Integer.parseInt(hms[0]);
                if(s == 60){
                    m = m + 1;
                    s = 0;
                    if(m == 60){
                        h = h + 1;
                        if(h == 24){
                            h = 0;
                        }
                    }
                }
                String hStr, mStr, sStr;
                if(h < 10){
                    hStr = "0" + h;
                }else{
                    hStr = String.valueOf(h);
                }
                if(m < 10){
                    mStr = "0" + m;
                }else{
                    mStr = String.valueOf(m);
                }
                if(s == 0){
                    sStr = "00";
                }else{
                    sStr = String.valueOf(s);
                }
                String hmsNew = hStr+":"+mStr+":"+sStr;
                return Tuple2.of(hmsNew, 1L);
            }
        }).keyBy(e -> e.f0);

        SingleOutputStreamOperator<Tuple2<String, Long>> resultA = adClickTimeKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(10))).sum(1);
        SingleOutputStreamOperator<Tuple2<String, Long>> resultB = adClickTimeKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(10))).reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> valueA, Tuple2<String, Long> valueB) throws Exception {
                returnTuple2.of(valueA.f0, valueA.f1+valueB.f1); }}); SingleOutputStreamOperator<Tuple2<String, Long>> resultC = adClickTimeKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(10)))
                .aggregate(new ClickTimeAggregate(), new AggregateDataCollect());
        SingleOutputStreamOperator<Tuple2<String, Long>> resultD = adClickTimeKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(10)))
                .process(new WindowDataProcess());

        // 4. sink
        resultC.addSink(new JsonSink());

        resultA.print();
        resultB.print();
        // resultC.print();
        resultD.print();

// // 3~ transformation
// SingleOutputStreamOperator
      
        adClickTimeDs = adClickDataStream.map(new MapFunction
       
        () {
       ,>
      
// @Override
// public TimeClickData map(AdvertiseClickBean advertiseClickBean) throws Exception {
// long ts = advertiseClickBean.getClickTime();
// String time = new SimpleDateFormat("HH:mm:ss").format(new Date(ts));
// String[] hms = time.split(":");
// int s = (Integer.parseInt(hms[2])/10+1)*10;
// int m = Integer.parseInt(hms[1]);
// int h = Integer.parseInt(hms[0]);
// if(s == 60){
// m = m + 1;
// s = 0;
// if(m == 60){
// h = h + 1;
// if(h == 24){
// h = 0;
/ /}
/ /}
/ /}
// String hStr, mStr, sStr;
// if(h < 10){
// hStr = "0" + h;
// }else{
// hStr = String.valueOf(h);
/ /}
// if(m < 10){
// mStr = "0" + m;
// }else{
// mStr = String.valueOf(m);
/ /}
// if(s == 0){
// sStr = "00";
// }else{
// sStr = String.valueOf(s);
/ /}
// String hmsNew = hStr+":"+mStr+":"+sStr;
// return new TimeClickData(ts, hmsNew, 1L);
/ /}
/ /});
//
// tEnv.createTemporaryView("t_time_click", adClickTimeDs, $("clickTime").rowtime(), $("dealtTime"), $("click"));
// Table tempTable = tEnv.sqlQuery("SELECT dealtTime, count(click) as total_click FROM t_time_click GROUP BY dealtTime, HOP(clickTime, interval '10' SECOND, interval '60' SECOND) ORDER BY dealtTime DESC LIMIT 24");
// SingleOutputStreamOperator
      
        resultStream = tEnv.toRetractStream(tempTable, Row.class).filter(f -> f.f0).map(f -> f.f1);
      
//
// // 4~ sink
// resultStream.print();

        // 5. executeenv.execute(); }}Copy the code
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Total clicks in the last 1 minute</title>
    <!-- CDN copies of ECharts and jQuery; swap for local echarts.js / jquery.min.js if offline. -->
    <script src="https://cdn.staticfile.org/echarts/4.3.0/echarts.min.js"></script>
    <script type="text/javascript" src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js"></script>
</head>
<body>

<div id="display" style="height: 450px; width: 800px; position: relative; left: 24%"></div>
<script>
    var myChart = echarts.init(document.getElementById("display"));
    // Re-read the JSON file written by JsonSink every 5 seconds and redraw.
    setInterval(function () {
        $.getJSON("advertise_click_count_nearly_minute.json", function (data) {
            var x = [];
            var y = [];
            $.each(data, function (i, obj) {
                x.push(obj.xtime);
                y.push(obj.yclick);
            });
            var option = {
                xAxis: {
                    type: "category",
                    data: x
                },
                yAxis: {
                    type: "value"
                },
                series: [{
                    data: y,
                    type: "line",
                    smooth: false,
                    color: "steelblue"
                }, {
                    data: y,
                    type: "bar",
                    barWidth: 50,
                    color: "lightblue"
                }]
            };
            myChart.setOption(option);
        });
    }, 5000);
</script>

</body>
</html>

Copy the code

III. Experimental results

  • Requirements (1)

  • (2)

  • (3)

(Demo video hosted on Bilibili: player.bilibili.com/player.html…)