1. Table API & SQL in practice

  1. Case description

    • Functional specifications

      Read data from a socket source and perform a word-count aggregation.

    • Implementation process

      • Initialize the Table runtime environment

      • Transformation steps:

        1) Split each line on spaces

        2) Map each word to a count of 1

        3) Group by word

        4) Sum the counts

        5) Print the results

      • Execute the job
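    As a plain-Java analogy of the transformation steps above (no Flink dependencies; the class and method names are illustrative, not part of the Flink job):

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WordCountSketch {
    // Mirrors the pipeline the Flink job applies to each socket line:
    // split on spaces -> group by word -> count occurrences.
    static Map<String, Long> wordCount(Stream<String> lines) {
        return lines
                .flatMap(line -> Stream.of(line.split(" ")))   // 1) split on spaces
                .filter(w -> !w.isEmpty())
                .collect(Collectors.groupingBy(                // 3) group by word
                        Function.identity(),
                        Collectors.counting()));               // 2) + 4) sum the counts
    }

    public static void main(String[] args) {
        // 5) print the results
        System.out.println(wordCount(Stream.of("hello flink", "hello table")));
    }
}
```

    Unlike the streaming job below, this batch analogy sees all input at once; Flink instead emits updated counts continuously as lines arrive.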

  2. Flink Table API implementation

    StreamTableApiApplication, code:

    // Get the running environment for the stream processing
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
    
    // Get the operating environment of the Table
    StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
    // Access the data source
    DataStreamSource<String> lines = env.socketTextStream("10.10.20.15", 9922);
    
    // Split each line into words
    SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            Arrays.stream(line.split(" ")).forEach(out::collect);
        }
    });
    
    // Convert the DataStream to a Table. The default field name is f0; here it is renamed to word
    Table table = tabEnv.fromDataStream(words, "word");
    
    // Group and aggregate by word
    Table resultTable = table.groupBy("word").select("word, count(1) as counts");
    
    // toRetractStream must be used here: a grouped aggregation continuously
    // updates its results as data arrives, so the query is not append-only
    // and toAppendStream would fail:
    // DataStream<WordCount> wordCountDataStream = tabEnv.toAppendStream(resultTable, WordCount.class);
    // wordCountDataStream.printToErr("toAppendStream>>>");
    
    DataStream<Tuple2<Boolean, WordCount>> wordCountDataStream = tabEnv.toRetractStream(resultTable, WordCount.class);
    wordCountDataStream.printToErr("toRetractStream>>>");
    
    env.execute();

    Test verification:

    Start the socket server and type input strings:

    [root@flink1 flink-1.11.2]# nc -lk 9922
  3. Flink Table SQL implementation

    Code implementation:

    StreamTableSqlApplication implementation class:

    // Get the running environment for the stream processing
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
    
    // Get the operating environment of the Table
    StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
    // Access the data source
    DataStreamSource<String> lines = env.socketTextStream("10.10.20.15", 9922);
    
    // Split each line into words
    SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            Arrays.stream(line.split(" ")).forEach(out::collect);
        }
    });
    
    // Register the DataStream as a table. The default field name is f0; here it is renamed to word
    tabEnv.registerDataStream("t_wordcount", words, "word");
    
    // Group and aggregate by word
    Table resultTable = tabEnv.sqlQuery("select word,count(1) as counts from t_wordcount group by word");
    
    DataStream<Tuple2<Boolean, WordCount>> wordCountDataStream = tabEnv.toRetractStream(resultTable, WordCount.class);
    wordCountDataStream.printToErr("toRetractStream>>>");
    env.execute();

2. Flink SQL tumbling window in practice

  1. Flink SQL window description

    Flink SQL supports two main types of window aggregation: group window aggregation and over aggregation. This section focuses on group window aggregation. Windows can be defined on either of two time attributes, event time and processing time, and each time attribute supports three window types: TUMBLE, HOP, and SESSION.

  2. Case description

    Count how many times each user clicked on a web page in the past 1 minute: define a window that collects the last 1 minute of data, then compute the aggregate over that window.

    Test data:

    Username     Access URL        Access time
    Zhang San    taobao.com/xxx    2021-05-10 10:00:00
    Zhang San    taobao.com/xxx    2021-05-10 10:00:10
    Zhang San    taobao.com/xxx    2021-05-10 10:00:49
    Zhang San    taobao.com/xxx    2021-05-10 10:01:05
    Zhang San    taobao.com/xxx    2021-05-10 10:01:58
    Li Si        taobao.com/xxx    2021-05-10 10:02:10
  3. Tumbling window application

    Tumbling windows are defined with the Tumble class, which provides three methods:

    • over: defines the window length

    • on: the time field to group on (for time-interval windows) or sort on (for row-count windows)

    • as: assigns an alias to the window; the alias must be used in the groupBy clause

    Implementation steps:

    • Initialize the flow runtime environment

    • Use blink Planner in flow mode

    • Create user click event data

    • Writes the source data to a temporary file and gets the absolute path

    • Create a table to load user click event data

    • Run an SQL query against the table and retrieve the result as a new table

    • Table is converted to DataStream

    • Perform a task

    TumbleUserClickApplication, implementation code:

    // Get the running environment for stream processing
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    
    StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
    // Write the source data to a temporary file and get the absolute path
    String contents =
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
            "Li Si,http://taobao.com/xxx,2021-05-10 10:02:10\n";
    String path = FileUtil.createTempFile(contents);
    
    String ddl = "CREATE TABLE user_clicks (\n" +
            " username varchar,\n" +
            " click_url varchar,\n" +
            " ts TIMESTAMP(3),\n" +
            " WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
            ") WITH (\n" +
            " 'connector.type' = 'filesystem',\n" +
            " 'connector.path' = '" + path + "',\n" +
            " 'format.type' = 'csv'\n" +
            ")";
    
    tabEnv.sqlUpdate(ddl);
    
    // Perform an SQL query on the table data and query the result as a new table
    String query = "SELECT\n" +
            " TUMBLE_START(ts, INTERVAL '1' MINUTE),\n" +
            " TUMBLE_END(ts, INTERVAL '1' MINUTE),\n" +
            " username,\n" +
            " COUNT(click_url)\n" +
            "FROM user_clicks\n" +
            "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username";
    
    Table result = tabEnv.sqlQuery(query);
    
    tabEnv.toAppendStream(result, Row.class).print();
    
    env.execute();

    The tumbling window size is 1 minute and the watermark delay is 2 seconds.

    Output result:

    4> 2021-05-10T10:00,2021-05-10T10:01,Zhang San,3
    4> 2021-05-10T10:01,2021-05-10T10:02,Zhang San,2
    4> 2021-05-10T10:02,2021-05-10T10:03,Li Si,1
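    The windows in this output follow the standard tumbling assignment: each event belongs to exactly one window whose start is its timestamp rounded down to the window size. A plain-Java sketch of that rule (illustrative, not Flink code):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TumbleSketch {
    // Start of the tumbling window containing epochMillis:
    // round the timestamp down to a multiple of the window size.
    static long tumbleStart(long epochMillis, long sizeMillis) {
        return epochMillis - (epochMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long ts = LocalDateTime.of(2021, 5, 10, 10, 0, 49)
                .toInstant(ZoneOffset.UTC).toEpochMilli();
        // 10:00:49 falls into the 1-minute window [10:00:00, 10:01:00)
        long start = tumbleStart(ts, 60_000L);
        System.out.println(LocalDateTime.ofEpochSecond(start / 1000, 0, ZoneOffset.UTC));
    }
}
```

    This is why the click at 10:02:10 lands in the 10:02-10:03 window on its own.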

3. Flink SQL sliding window in practice

  1. Implementation steps

    • Initialize the flow runtime environment

    • Use blink Planner in flow mode

    • Create user click event data

    • Writes the source data to a temporary file and gets the absolute path

    • Create a table to load user click event data

    • Run an SQL query against the table and retrieve the result as a new table

    • Table is converted to DataStream

    • Perform a task

  2. The implementation code

    Code HopUserClickApplication:

    // Get the running environment for stream processing
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    
    StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
    // Write the source data to a temporary file and get the absolute path
    String contents =
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:02:10\n";
    String path = FileUtil.createTempFile(contents);
    
    String ddl = "CREATE TABLE user_clicks (\n" +
            " username varchar,\n" +
            " click_url varchar,\n" +
            " ts TIMESTAMP(3),\n" +
            " WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
            ") WITH (\n" +
            " 'connector.type' = 'filesystem',\n" +
            " 'connector.path' = '" + path + "',\n" +
            " 'format.type' = 'csv'\n" +
            ")";
    
    tabEnv.sqlUpdate(ddl);
    
    // Query the table: every 30 seconds, aggregate the data of the past 1 minute
    String query = "SELECT\n" +
            " HOP_START(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),\n" +
            " HOP_END(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),\n" +
            " username,\n" +
            " COUNT(click_url)\n" +
            "FROM user_clicks\n" +
            "GROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username";
    
    Table result = tabEnv.sqlQuery(query);
    
    tabEnv.toAppendStream(result, Row.class).print();
    
    env.execute();

    The number of user clicks in the past 1 minute is counted every 30 seconds.

    Output result:

    4> 2021-05-10T09:59:30,2021-05-10T10:00:30,Zhang San,2
    4> 2021-05-10T10:00,2021-05-10T10:01,Zhang San,3
    4> 2021-05-10T10:00:30,2021-05-10T10:01:30,Zhang San,2
    4> 2021-05-10T10:01,2021-05-10T10:02,Zhang San,2
    4> 2021-05-10T10:01:30,2021-05-10T10:02:30,Zhang San,2
    4> 2021-05-10T10:02,2021-05-10T10:03,Zhang San,1
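    With a 60-second window sliding every 30 seconds, each event falls into size/slide = 2 overlapping windows, which is why each click is counted twice across the output above. A plain-Java sketch of the hop-window starts covering a given timestamp (illustrative, not Flink code):

```java
import java.util.ArrayList;
import java.util.List;

public class HopSketch {
    // Starts of all hop windows [start, start + size) that contain ts,
    // for windows aligned to multiples of the slide.
    static List<Long> hopStarts(long ts, long slide, long size) {
        List<Long> starts = new ArrayList<>();
        long last = ts - (ts % slide);           // latest window start at or before ts
        for (long s = last; s > ts - size; s -= slide) {
            starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An event 70s into the hour, slide 30s, size 60s:
        // the windows starting at 60s and at 30s both contain it
        System.out.println(hopStarts(70_000L, 30_000L, 60_000L));
    }
}
```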

4. Flink SQL session window

  1. Implementation steps

    • Initialize the flow runtime environment

    • Use blink Planner in flow mode

    • Create user click event data

    • Writes the source data to a temporary file and gets the absolute path

    • Create a table to load user click event data

    • Run an SQL query against the table and retrieve the result as a new table

    • Table is converted to DataStream

    • Perform a task

  2. Code implementation:

    Code: SessionUserClickApplication

    // Get the running environment for stream processing
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    
    StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
    // Write the source data to a temporary file and get the absolute path
    String contents =
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
            "Zhang San,http://taobao.com/xxx,2021-05-10 10:02:10\n";
    String path = FileUtil.createTempFile(contents);
    
    String ddl = "CREATE TABLE user_clicks (\n" +
            " username varchar,\n" +
            " click_url varchar,\n" +
            " ts TIMESTAMP(3),\n" +
            " WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
            ") WITH (\n" +
            " 'connector.type' = 'filesystem',\n" +
            " 'connector.path' = '" + path + "',\n" +
            " 'format.type' = 'csv'\n" +
            ")";
    
    tabEnv.sqlUpdate(ddl);
    
    // Query the table using a session window with a 30-second gap: a pause of more than 30 seconds closes the session
    String query = "SELECT\n" +
            " SESSION_START(ts, INTERVAL '30' SECOND),\n" +
            " SESSION_END(ts, INTERVAL '30' SECOND),\n" +
            " username,\n" +
            " COUNT(click_url)\n" +
            "FROM user_clicks\n" +
            "GROUP BY SESSION (ts, INTERVAL '30' SECOND), username";
    
    Table result = tabEnv.sqlQuery(query);
    
    tabEnv.toAppendStream(result, Row.class).print();
    
    env.execute();

    Clicks are grouped into sessions: a gap of more than 30 seconds between consecutive clicks starts a new session.

    Output result:

    4> 2021-05-10T10:00,2021-05-10T10:00:40,Zhang San,2
    4> 2021-05-10T10:00:49,2021-05-10T10:01:35,Zhang San,2
    4> 2021-05-10T10:01:58,2021-05-10T10:02:40,Zhang San,2
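    The three sessions above follow from the 30-second gap rule: a pause longer than the gap between consecutive events closes the current session, and the session end is the last event plus the gap. A plain-Java sketch of that grouping (illustrative, not Flink code; assumes timestamps sorted ascending):

```java
import java.util.ArrayList;
import java.util.List;

public class SessionSketch {
    // Split ascending timestamps into sessions: a gap greater than
    // gapSeconds between consecutive events closes the current session.
    static List<List<Long>> sessions(List<Long> sortedTs, long gapSeconds) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : sortedTs) {
            if (!current.isEmpty() && t - current.get(current.size() - 1) > gapSeconds) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
        }
        if (!current.isEmpty()) result.add(current);
        return result;
    }

    public static void main(String[] args) {
        // Event times from the example, as seconds after 10:00:00:
        // gaps of 39s (10 -> 49) and 53s (65 -> 118) split them into 3 sessions
        List<Long> ts = List.of(0L, 10L, 49L, 65L, 118L, 130L);
        System.out.println(sessions(ts, 30).size());
    }
}
```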

This article was created and shared by Mirson. For further discussion, join QQ group 19310171 or visit www.softart.cn