First, window concept

In most scenarios, the data flow we need to count is unbounded, so we cannot wait for the entire data flow to terminate before counting. Usually, we only need to perform statistical analysis on a certain time range or quantity range of data: for example, every five minutes count the clicks on all items in the past hour; Or for every 1,000 clicks, count the percentage of clicks per item. In Flink, we use Windows for this kind of functionality. According to different statistical dimensions, Windows in Flink can be divided into Time Windows and Count Windows.

Second, the Time Windows

Time Windows is used for data aggregation based on Time, which can be divided into the following four categories:

2.1 Tumbling Windows

Tumbling Windows refers to Windows that do not overlap with each other. For example, if the click volume of goods in the past hour is counted every hour, a day can only be divided into 24 Windows, and each window does not overlap with each other, as follows:

Here we take word frequency statistics as an example to give a specific use case, the code is as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Receives data input on the socket
DataStreamSource<String> streamSource = env.socketTextStream("hadoop001".9999."\n".3);
streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        String[] words = value.split("\t");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1L));
        }
    }
}).keyBy(0).timeWindow(Time.seconds(3)).sum(1).print(); // Count the number of occurrences of each word every 3 seconds
env.execute("Flink Streaming");
Copy the code

The test results are as follows:

2.2 Sliding Windows

A sliding window is used to scroll for aggregation analysis. For example, if the click volume of all commodities in the past hour is counted every 6 minutes, then the statistical Windows overlap with each other, that is, they can be divided into 240 Windows in a day. The illustration is as follows:

It can be seen that the four Windows 1-4 have equal time overlap with each other. To implement a sliding window, simply pass a second parameter as the scroll time when using the timeWindow method, as follows:

// Collect data in the past 1 minute every 3 seconds
timeWindow(Time.minutes(1),Time.seconds(3))
Copy the code

2.3 the Session Windows

When the user is browsing continuously, there may be click data every moment. For example, in the active period, the user may frequently add and remove certain items to the cart, and you only want to know the final cart status of the user during this browsing. In this case, you can conduct statistics after the end of the session held by the user. To implement such statistics, you can use Session Windows.

The specific implementation code is as follows:

If no data is entered within 10 seconds, the session is considered closed and statistics are triggered
window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// Use event time as a metric
window(EventTimeSessionWindows.withGap(Time.seconds(10)))
Copy the code

2.4 Global Windows

The last window is the global window, which allocates all elements with the same key to the same window. This window is usually used in conjunction with triggers. If there is no corresponding trigger, the calculation will not be performed.

Here we continue to take the above word frequency statistics case as an example, and the sample code is as follows:

// When the total number of occurrences of the word reaches 10 times, the calculation is triggered to calculate the total number of occurrences of the word in the whole window
window(GlobalWindows.create()).trigger(CountTrigger.of(10)).sum(1).print();
Copy the code

Third, the Count of Windows

Count Windows is used for data aggregation in the dimension of quantity, which is also divided into scroll window and sliding window. The implementation method is exactly the same as time window, but the API is different, as follows:

// Scroll the counting window, counting every 1000 clicks
countWindow(1000)
// Slide the counting window to calculate the past 1000 clicks after every 10 clicks
countWindow(1000.10)
Copy the code

In fact, the internal count window is to call the global window we introduced in the last part to achieve, its source code is as follows:

public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
    return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}


public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
    return window(GlobalWindows.create())
        .evictor(CountEvictor.of(size))
        .trigger(CountTrigger.of(slide));
}
Copy the code

The resources

Flink Windows: ci.apache.org/projects/fl…