When a time window is defined, Flink computes, for each element in the data stream, the start of the window that element belongs to, as follows:

The source code

org.apache.flink.streaming.api.windowing.windows.TimeWindow.getWindowStartWithOffset()

/**
 * Method to get the window start for a timestamp.
 *
 * @param timestamp epoch millisecond to get the window start.
 * @param offset The offset which window start would be shifted by.
 * @param windowSize The size of the generated windows.
 * @return window start
 */
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}

Explanation

  • timestamp: the element's timestamp, in milliseconds
  • offset: the window offset, 0 by default, in milliseconds
  • windowSize: the window size, in milliseconds
  • windowStartWithOffset: the computed window start time, in milliseconds
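To see what offset does, here is a minimal standalone sketch that copies the formula from getWindowStartWithOffset() into a hypothetical class WindowOffsetDemo (not part of Flink) and compares a 1-hour window with and without a 15-minute offset. The timestamp is an arbitrary illustrative value.

```java
public class WindowOffsetDemo {

    // Copy of the formula in TimeWindow.getWindowStartWithOffset() quoted above.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long hour = 60 * minute;
        // Illustrative timestamp: 10:20:00.000 UTC on 1970-01-01.
        long ts = 10 * hour + 20 * minute;

        // Offset 0: one-hour windows start on the hour, so the start is 10:00.
        long noOffset = getWindowStartWithOffset(ts, 0L, hour);
        // Offset 15 min: windows run from xx:15 to (xx+1):15, so the start is 10:15.
        long withOffset = getWindowStartWithOffset(ts, 15 * minute, hour);

        System.out.println("start without offset (minutes since epoch): " + noOffset / minute);
        System.out.println("start with 15-min offset (minutes since epoch): " + withOffset / minute);
    }
}
```

With offset 0 every 1-hour window starts on the hour; a 15-minute offset shifts every window boundary to a quarter past.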
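With the default offset of 0 and a non-negative timestamp, the formula reduces to:

windowStartWithOffset = timestamp - (timestamp - offset + windowSize) % windowSize
                      = timestamp - (timestamp - 0 + windowSize) % windowSize
                      = timestamp - (timestamp + windowSize) % windowSize
                      = timestamp - (timestamp % windowSize + windowSize % windowSize) % windowSize
                      = timestamp - (timestamp % windowSize + 0) % windowSize
                      = timestamp - timestamp % windowSize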
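The simplification can be checked numerically. This small sketch (the class OffsetZeroCheck and its method names are hypothetical) compares the original formula with offset 0 against timestamp - timestamp % windowSize over a range of non-negative timestamps.

```java
public class OffsetZeroCheck {

    // Original formula from getWindowStartWithOffset().
    static long flinkFormula(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Simplified form derived above for offset = 0.
    static long simplified(long timestamp, long windowSize) {
        return timestamp - timestamp % windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 60_000L; // 1 minute
        // Compare both forms over a range of non-negative timestamps.
        for (long ts = 0; ts < 5 * windowSize; ts += 7) {
            if (flinkFormula(ts, 0L, windowSize) != simplified(ts, windowSize)) {
                throw new AssertionError("Mismatch at " + ts);
            }
        }
        System.out.println("Both forms agree for all tested timestamps");
    }
}
```

The non-negative restriction matters: Java's % can return a negative remainder, so for a timestamp of -1 ms the original formula yields -60000 while the simplified form yields 0. Epoch-millisecond event times are normally non-negative, so the simplification holds in practice.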

Example

Suppose I want to compute the online status of all devices of a certain type in each 1-minute interval. The approach: key the stream by device type with keyBy(), then apply a 1-minute event-time window.
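The grouping that keyBy() plus a 1-minute tumbling window performs can be simulated in plain Java, without Flink. This is only a sketch under stated assumptions: the Heartbeat class and its fields are hypothetical, and "online" is taken to mean "sent at least one heartbeat in the window", counted as distinct device IDs per (device type, window start).

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class HeartbeatWindowSketch {

    // Hypothetical heartbeat event; field names are illustrative only.
    static final class Heartbeat {
        final String deviceType;
        final String deviceId;
        final long timestamp; // event time, epoch milliseconds

        Heartbeat(String deviceType, String deviceId, long timestamp) {
            this.deviceType = deviceType;
            this.deviceId = deviceId;
            this.timestamp = timestamp;
        }
    }

    // Window start with offset = 0, same formula as getWindowStartWithOffset().
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp + windowSize) % windowSize;
    }

    // Simulates keyBy(deviceType) + tumbling windows:
    // device type -> window start -> distinct device IDs seen in that window.
    static Map<String, Map<Long, Set<String>>> onlineDevices(Heartbeat[] beats, long windowSize) {
        Map<String, Map<Long, Set<String>>> result = new TreeMap<>();
        for (Heartbeat hb : beats) {
            long start = windowStart(hb.timestamp, windowSize);
            result.computeIfAbsent(hb.deviceType, k -> new TreeMap<>())
                  .computeIfAbsent(start, k -> new HashSet<>())
                  .add(hb.deviceId);
        }
        return result;
    }

    public static void main(String[] args) {
        Heartbeat[] beats = {
            new Heartbeat("sensor", "d1", 5_000L),
            new Heartbeat("sensor", "d2", 59_999L),
            new Heartbeat("sensor", "d1", 60_000L),
            new Heartbeat("camera", "c1", 30_000L),
        };
        Map<String, Map<Long, Set<String>>> online = onlineDevices(beats, 60_000L);
        online.forEach((type, windows) -> windows.forEach((start, devices) ->
            System.out.println(type + " window@" + start + " -> " + devices.size() + " online")));
    }
}
```

Note that a heartbeat at exactly 60000 ms lands in the next window: window starts are inclusive and window ends exclusive.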

If heartbeats arrive with the following timestamps, the formula assigns each one to the window start shown:

Heartbeat time            Window start time
2020-10-09 09:53:08.655   2020-10-09 09:53:00.000
2020-10-09 09:53:11.655   2020-10-09 09:53:00.000
2020-10-09 09:53:31.655   2020-10-09 09:53:00.000
2020-10-09 09:53:47.655   2020-10-09 09:53:00.000
2020-10-09 09:53:59.655   2020-10-09 09:53:00.000
2020-10-09 09:54:00.655   2020-10-09 09:54:00.000
2020-10-09 09:54:01.655   2020-10-09 09:54:00.000
2020-10-09 09:54:07.655   2020-10-09 09:54:00.000
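The 1-minute table can be reproduced with java.time. This sketch (the class MinuteWindowTable is hypothetical) assumes the wall-clock strings are read as UTC; for 1-minute windows the result is the same in any time zone whose UTC offset is a whole number of minutes, such as UTC+8.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class MinuteWindowTable {

    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Same formula as TimeWindow.getWindowStartWithOffset().
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Assumption: the heartbeat string is interpreted as UTC wall-clock time.
    static String windowStartFor(String heartbeat, long windowSize) {
        long ts = LocalDateTime.parse(heartbeat, FMT).toInstant(ZoneOffset.UTC).toEpochMilli();
        long start = getWindowStartWithOffset(ts, 0L, windowSize);
        return FMT.format(Instant.ofEpochMilli(start).atOffset(ZoneOffset.UTC).toLocalDateTime());
    }

    public static void main(String[] args) {
        String[] heartbeats = {
            "2020-10-09 09:53:08.655", "2020-10-09 09:53:11.655",
            "2020-10-09 09:53:31.655", "2020-10-09 09:53:47.655",
            "2020-10-09 09:53:59.655", "2020-10-09 09:54:00.655",
            "2020-10-09 09:54:01.655", "2020-10-09 09:54:07.655",
        };
        for (String hb : heartbeats) {
            System.out.println(hb + "  ->  " + windowStartFor(hb, 60_000L));
        }
    }
}
```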

If the window size is changed to 15 seconds, the window start times change accordingly:

Heartbeat time            Window start time
2020-10-09 09:53:08.655   2020-10-09 09:53:00.000
2020-10-09 09:53:11.655   2020-10-09 09:53:00.000
2020-10-09 09:53:31.655   2020-10-09 09:53:30.000
2020-10-09 09:53:47.655   2020-10-09 09:53:45.000
2020-10-09 09:53:59.655   2020-10-09 09:53:45.000
2020-10-09 09:54:00.655   2020-10-09 09:54:00.000
2020-10-09 09:54:01.655   2020-10-09 09:54:00.000
2020-10-09 09:54:07.655   2020-10-09 09:54:00.000
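Because a day (86,400,000 ms) is an exact multiple of 15,000 ms, the 15-second alignment can also be computed on milliseconds since midnight. The sketch below (the class FifteenSecondWindows is hypothetical) does that and prints each window as a half-open interval [start, end). It assumes the times are in a zone whose UTC offset is a whole number of minutes and that no window crosses midnight.

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

public class FifteenSecondWindows {

    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    // Same formula as TimeWindow.getWindowStartWithOffset().
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Returns "[start, end)" for the window containing the given time of day.
    // Assumes the window does not cross midnight.
    static String windowFor(String timeOfDay, long windowSize) {
        long msOfDay = LocalTime.parse(timeOfDay).toNanoOfDay() / 1_000_000L;
        long start = getWindowStartWithOffset(msOfDay, 0L, windowSize);
        long end = start + windowSize; // the window end is exclusive
        return "[" + FMT.format(LocalTime.ofNanoOfDay(start * 1_000_000L))
             + ", " + FMT.format(LocalTime.ofNanoOfDay(end * 1_000_000L)) + ")";
    }

    public static void main(String[] args) {
        String[] heartbeats = {
            "09:53:08.655", "09:53:11.655", "09:53:31.655", "09:53:47.655",
            "09:53:59.655", "09:54:00.655", "09:54:01.655", "09:54:07.655",
        };
        for (String hb : heartbeats) {
            System.out.println(hb + "  ->  " + windowFor(hb, 15_000L));
        }
    }
}
```

With 15-second windows the boundaries fall on :00, :15, :30 and :45 of every minute, which is why 09:53:47.655 and 09:53:59.655 share the window starting at 09:53:45.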

Reference

For more on Flink window alignment, see the section “Time Windows are Aligned to the Epoch” in the official Flink documentation.