When you use a time window, Flink computes, for each element in the data stream, the start of the window that the element belongs to. The calculation is as follows.
Source code
org.apache.flink.streaming.api.windowing.windows.TimeWindow.getWindowStartWithOffset()

    /**
     * Method to get the window start for a timestamp.
     *
     * @param timestamp epoch millisecond to get the window start.
     * @param offset The offset which window start would be shifted by.
     * @param windowSize The size of the generated windows.
     * @return window start
     */
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
Explanation
- timestamp: the element's timestamp, in milliseconds
- offset: the window offset, 0 by default, in milliseconds
- windowSize: the window size, in milliseconds
- windowStartWithOffset: the window start time, in milliseconds
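With the default offset of 0 and a non-negative timestamp, the formula simplifies as follows:

    windowStartWithOffset = timestamp - (timestamp - offset + windowSize) % windowSize
                          = timestamp - (timestamp - 0 + windowSize) % windowSize
                          = timestamp - (timestamp + windowSize) % windowSize
                          = timestamp - (timestamp % windowSize + windowSize % windowSize) % windowSize
                          = timestamp - timestamp % windowSize

In other words, the window start is the timestamp rounded down to the nearest multiple of windowSize.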
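The simplification above can be checked with a small standalone program. This is only a sketch: the class name `WindowStartCheck` and the sample timestamp are chosen for illustration, and the method body is copied from the source quoted earlier rather than called from Flink itself.

```java
public class WindowStartCheck {

    // Same arithmetic as the Flink method quoted above (copied here so the
    // example runs without a Flink dependency).
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 60_000L;           // 1 minute, in milliseconds
        long timestamp = 1_602_208_388_655L; // arbitrary sample epoch millisecond value

        // With offset 0, the full formula and the simplified form agree.
        long full = getWindowStartWithOffset(timestamp, 0L, windowSize);
        long simplified = timestamp - timestamp % windowSize;
        System.out.println(full == simplified);

        // With a non-zero offset, the window boundaries shift by that offset.
        long shifted = getWindowStartWithOffset(timestamp, 5_000L, windowSize);
        System.out.println(shifted - full);
    }
}
```

For this sample value the remainder `timestamp % windowSize` is 8655 ms, so a 5-second offset moves the window start forward by exactly 5000 ms while the element still falls inside the shifted window.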
For example, suppose I want to compute the online status of all devices of a certain type over 1-minute intervals. The main idea is to key the stream by device type with keyBy() and then open a 1-minute event-time window.
If heartbeats occur at the following times, the algorithm assigns each heartbeat to the window with the following start time:
Heartbeat time | Window start time |
---|---|
2020-10-09 09:53:08.655 | 2020-10-09 09:53:00.000 |
2020-10-09 09:53:11.655 | 2020-10-09 09:53:00.000 |
2020-10-09 09:53:31.655 | 2020-10-09 09:53:00.000 |
2020-10-09 09:53:47.655 | 2020-10-09 09:53:00.000 |
2020-10-09 09:53:59.655 | 2020-10-09 09:53:00.000 |
2020-10-09 09:54:00.655 | 2020-10-09 09:54:00.000 |
2020-10-09 09:54:01.655 | 2020-10-09 09:54:00.000 |
2020-10-09 09:54:07.655 | 2020-10-09 09:54:00.000 |
If the window size is changed to 15 seconds, the window start times change accordingly:
Heartbeat time | Window start time |
---|---|
2020-10-09 09:53:08.655 | 2020-10-09 09:53:00.000 |
2020-10-09 09:53:11.655 | 2020-10-09 09:53:00.000 |
2020-10-09 09:53:31.655 | 2020-10-09 09:53:30.000 |
2020-10-09 09:53:47.655 | 2020-10-09 09:53:45.000 |
2020-10-09 09:53:59.655 | 2020-10-09 09:53:45.000 |
2020-10-09 09:54:00.655 | 2020-10-09 09:54:00.000 |
2020-10-09 09:54:01.655 | 2020-10-09 09:54:00.000 |
2020-10-09 09:54:07.655 | 2020-10-09 09:54:00.000 |
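The window start times for both window sizes can be recomputed with a short program. This is a minimal sketch under a few assumptions: the class name `HeartbeatWindows` and the helper `windowStart` are made up for illustration, the heartbeat times are treated as local times at UTC+8, and the formula is copied from the source quoted earlier instead of being called through Flink.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class HeartbeatWindows {

    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Assumption: the heartbeat times are local times at UTC+8.
    static final ZoneOffset ZONE = ZoneOffset.ofHours(8);

    // Same arithmetic as TimeWindow.getWindowStartWithOffset(), copied here
    // so the example runs without a Flink dependency.
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Hypothetical helper: parses a heartbeat time, computes the window start
    // with offset 0, and formats the result in the same zone.
    public static String windowStart(String heartbeat, long windowSizeMillis) {
        long ts = LocalDateTime.parse(heartbeat, FMT).toInstant(ZONE).toEpochMilli();
        long start = getWindowStartWithOffset(ts, 0L, windowSizeMillis);
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(start), ZONE).format(FMT);
    }

    public static void main(String[] args) {
        String[] heartbeats = {
            "2020-10-09 09:53:08.655", "2020-10-09 09:53:11.655",
            "2020-10-09 09:53:31.655", "2020-10-09 09:53:47.655",
            "2020-10-09 09:53:59.655", "2020-10-09 09:54:00.655",
            "2020-10-09 09:54:01.655", "2020-10-09 09:54:07.655"
        };
        // Columns: heartbeat time | 1-minute window start | 15-second window start
        for (String hb : heartbeats) {
            System.out.println(hb + " | " + windowStart(hb, 60_000L) + " | " + windowStart(hb, 15_000L));
        }
    }
}
```

Because windows are aligned to the epoch and both window sizes divide evenly into an hour, the choice of a whole-hour zone offset does not change which window a heartbeat lands in; it only affects how the times are displayed.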
References
For more information about Flink window alignment, see the section "Time Windows Are Aligned to the Epoch"
Official documentation