Follow the public account: Java big data and data warehouse, reply “information”, get big data data, learn big data technology.

window

In stream processing applications, the data is continuous, so we cannot wait for all the data to arrive before processing. Of course we can process every message that comes in, but sometimes we need to do some aggregate processing, such as how many users have clicked on our page in the last minute. In this case, we must define a window to collect data within the last minute and perform calculations on the data within that window.

Flink considers Batch to be a special case of Streaming, so the Flink underlying engine is a Streaming engine on which Streaming and Batch processing are implemented. The window is a bridge from Streaming to Batch.

  • A Window represents a finite collection of objects. A window has a maximum timestamp, which marks the point in time by which all elements that belong to the window should have arrived.
  • A Window is a mechanism for carving a finite set of data out of an infinite stream, so that operations run on a bounded data set. Windows can be divided into time-based Windows and count-based Windows.
  • The Flink DataStream API provides Windows based on Time and Count, and adds session-based Windows. It also lets users define custom Window operations.

Window composition

Window assigner

  • assignWindows assigns an element with a timestamp to one or more Windows and returns the collection of Windows

  • getDefaultTrigger returns the default trigger associated with the WindowAssigner

  • getWindowSerializer returns the serializer for the Windows assigned by the WindowAssigner

  • The window assigner defines how data elements are assigned to Windows. You specify the WindowAssigner of your choice in the window(...) call (for keyed streams) or the windowAll() call (for non-keyed streams).

  • The WindowAssigner is responsible for assigning each incoming data element to one or more Windows. Flink comes with predefined window assigners for the most common use cases, namely tumbling Windows, sliding Windows, session Windows, and global Windows.

  • You can also implement a custom window assigner by extending the WindowAssigner class.

  • All built-in window assigners (except for global Windows) assign data elements to Windows based on time, which can be either processing time or event time.

State

  • State, which stores the elements within the window and, if there is an AggregateFunction, the intermediate results of incremental aggregation.

The window function

Choosing the appropriate window function reduces the amount of code you have to write and improves system performance.

Incremental aggregation functions (the window maintains only the aggregated state)

  • ReduceFunction
  • AggregateFunction
  • FoldFunction

Full aggregation functions (the window maintains all of its elements)

  • ProcessWindowFunction
    • Computes over the full contents of the window
    • More flexible in what the function can do
    • Supports state operations
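
The difference between the two families can be sketched in plain Java, without any Flink dependency. The names below (WindowAggregationSketch, AvgAccumulator, incrementalAverage, fullAverage) are hypothetical and exist only for this illustration. The accumulator mirrors the idea behind an incremental function such as AggregateFunction, while the buffered variant mirrors what a full-window function sees.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: contrasts incremental and full window aggregation. */
class WindowAggregationSketch {

    /** Incremental style: the window state is just this small accumulator. */
    static final class AvgAccumulator {
        long sum;
        long count;

        void add(long value) {          // called once per incoming element
            sum += value;
            count++;
        }

        double result() {               // called once when the window fires
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }

    /** Incremental aggregation: the state size stays constant. */
    static double incrementalAverage(long[] elements) {
        AvgAccumulator acc = new AvgAccumulator();
        for (long e : elements) {
            acc.add(e);
        }
        return acc.result();
    }

    /** Full aggregation: every element is buffered until the window fires. */
    static double fullAverage(long[] elements) {
        List<Long> buffer = new ArrayList<>();
        for (long e : elements) {
            buffer.add(e);              // state grows with the window contents
        }
        long sum = 0;
        for (long e : buffer) {
            sum += e;
        }
        return buffer.isEmpty() ? 0.0 : (double) sum / buffer.size();
    }
}
```

Both methods return the same average; what differs is how much state has to be held while the window is open.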

The trigger

  • EventTimeTrigger: fires based on event time, handled in onEventTime

  • ProcessingTimeTrigger: fires based on the current system time, handled in onProcessingTime. Processing time has the best performance and the lowest latency. However, processing time is not deterministic in a distributed environment, so running the same data stream several times may produce different results.

  • ContinuousEventTimeTrigger

  • ContinuousProcessingTimeTrigger

  • CountTrigger

    • A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not meet your needs, you can specify a custom one with trigger(...).
    • The Trigger interface has five methods that let a Trigger react to different events:
      • onElement() is called for each data element added to the window.
      • onEventTime() is called when a registered event-time timer fires.
      • onProcessingTime() is called when a registered processing-time timer fires.
      • onMerge() is relevant for stateful triggers and merges the state of two triggers when their corresponding Windows merge, for example when using session windows.
      • Finally, clear() performs whatever action is required when the corresponding window is removed.
    • Default trigger
      • The default trigger of GlobalWindows is NeverTrigger, which never fires. Therefore, you must always define a custom trigger when using GlobalWindows.
      • Specifying a trigger with trigger() overrides the default trigger of the WindowAssigner. For example, if a CountTrigger is specified for TumblingEventTimeWindows, the window no longer fires based on the progress of time, but by count only. At present, if you want to react to both time and count, you have to write your own custom trigger.
      • Every event-time window assigner has an EventTimeTrigger as its default trigger. The trigger fires once the Watermark passes the end of the window.

Trigger types

CountTrigger

Fires once the number of data elements in the window reaches a given limit, so the firing logic is implemented in onElement.
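
A minimal sketch of why a count-based decision can live entirely in onElement. This is plain Java and not Flink's CountTrigger class: the class name CountTriggerSketch and its local TriggerResult enum are hypothetical, and the sketch assumes the trigger fires when the counter reaches the limit and then starts counting again.

```java
/** Illustration only: a count-based trigger modelled outside Flink. */
class CountTriggerSketch {

    /** Local stand-in for the trigger results discussed in this article. */
    enum TriggerResult { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    private final long maxCount;
    private long count;                 // per-window counter (kept as state in a real job)

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Called for every element added to the window. */
    TriggerResult onElement() {
        count++;
        if (count >= maxCount) {
            count = 0;                  // start counting again for the next firing
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
}
```

With a limit of 3, the first two calls return CONTINUE and the third returns FIRE. No timer is involved, which is why the time-based callbacks are not needed for this kind of trigger.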

ProcessingTimeTrigger

Trigger based on processing time.

EventTimeTrigger

Fires based on the progress of event time as measured by Watermarks.

PurgingTrigger
  • Takes another trigger as a parameter and turns it into a purging trigger.

  • Its purpose is to clear the data in the window's State after the Trigger has fired the window computation.

  • Example: the first two elements enter the window at 20:01 and 20:02. The value in State is updated to 3, the Trigger time is reached, and the result 3 is output.

  • Because of PurgingTrigger, the data in State is then cleared.
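
The effect of purging can be sketched with a running sum as the window state. The class PurgingSketch and the element values 1, 2 and 4 are assumptions made for this illustration (the article only states that the state reaches 3); the code is plain Java and does not use Flink's PurgingTrigger.

```java
/** Illustration only: window state with and without purging on fire. */
class PurgingSketch {

    private long state;                 // running sum kept as window state
    private final boolean purgeOnFire;  // true models FIRE_AND_PURGE, false models FIRE

    PurgingSketch(boolean purgeOnFire) {
        this.purgeOnFire = purgeOnFire;
    }

    /** An element enters the window and updates the state. */
    void add(long value) {
        state += value;
    }

    /** The trigger fires: emit the current result, then clear the state if purging. */
    long fire() {
        long result = state;
        if (purgeOnFire) {
            state = 0;
        }
        return result;
    }
}
```

With purging, adding 1 and 2 and firing emits 3, and a later element 4 yields 4 on the next firing. Without purging, the second firing would emit 7, because the earlier elements are still part of the state.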

DeltaTrigger
An application of DeltaTrigger
  • Consider a vehicle interval-speed requirement: the vehicle reports its current position and speed every minute, and the maximum speed is computed for every 10 kilometers travelled.

Trigger methods

  • onElement
  • onProcessingTime
  • onEventTime
  • onMerge
  • clear

Notes

  • TriggerResult can be one of the following
    • CONTINUE: do nothing
    • FIRE: trigger the computation and keep the elements in the window
    • FIRE_AND_PURGE: trigger the computation and then clear the elements in the window
    • PURGE: clear the elements in the window
    • By default, the built-in triggers only return FIRE and do not clear the window state.
  • All event-time window assigners have an EventTimeTrigger as the default trigger. Once the watermark reaches the end of the window, this trigger fires.
  • The default trigger for GlobalWindows is NeverTrigger, which never fires. Therefore, when using a global window, you must define a custom trigger.
  • Specifying a trigger with the trigger() method overrides the window assigner's default trigger. For example, if you specify CountTrigger for TumblingEventTimeWindows, the window no longer fires based on time progress, only by count. At present, if you want to fire based on both time and count, you have to write your own custom trigger.

Classification of Windows

  • Depending on whether keyBy is called before the window, Windows are divided into keyed Windows and non-keyed Windows.

  • Depending on what drives the Window, they are divided into Time Windows and Count Windows.
  • By assignment strategy: Tumbling Windows, Sliding Windows, Session Windows, and Global Windows

Keyed Windows

The data stream is partitioned by a key taken from the original stream. Elements with the same key value form a parallel logical stream, and each logical stream is windowed independently.

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Non-keyed Windows

  • The stream is not partitioned: every element is assigned to windows of the whole stream, and all of the window logic is executed by a single task.

  • windowAll does not support parallelism. Its parallelism is always 1, so there are performance concerns when using this operator.

    stream
           .windowAll(...)           <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"

The difference

  • For keyed data streams, any attribute of the incoming events can be used as the Key.
  • A keyed data stream allows the window computation to be executed in parallel by multiple tasks, because each logical keyed stream can be processed independently of the rest. All data elements with the same Key are sent to the same parallel task.

Time-Based window

When a record arrives, the window assigner assigns it to one or more Windows according to its time attribute. Time-based windows are divided into tumbling windows (Tumbling Windows) and sliding windows (Sliding Windows). The time attribute can be one of the following:

  • EventTime: the time carried by the data itself;

  • ProcessingTime: the system time of the machine that processes the data;

  • IngestionTime: the time when the data enters the Flink program;

Tumbling Windows

Tumbling windows have a fixed length and do not overlap. We can use TumblingEventTimeWindows and TumblingProcessingTimeWindows to create tumbling time windows based on Event Time or Processing Time.

The following takes a tumbling event-time window (TumblingEventTimeWindows) as an example. In the Flink version shown here, the default time characteristic is TimeCharacteristic.ProcessingTime:

/** The time characteristic that is used if none other is set. */
private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;

So if you want to use Event Time, that is, the time carried by the data itself, you need to specify it with senv.setStreamTimeCharacteristic:

// Use event time, i.e. the time carried by the data itself
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<T> input = ...;

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// daily tumbling event-time windows offset by -8 hours (for a UTC+8 time zone)
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
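
How a timestamp maps to a tumbling window, including the effect of an offset such as Time.hours(-8), can be sketched with a little arithmetic. The class TumblingWindowSketch and its methods are hypothetical helpers written for this illustration; they are plain Java and are not presented as Flink's internal implementation.

```java
/** Illustration only: which tumbling window does a timestamp fall into? */
class TumblingWindowSketch {

    /**
     * Start of the tumbling window that contains the timestamp.
     * All values are in milliseconds. Math.floorMod keeps the result
     * correct for negative offsets and negative timestamps.
     */
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    /** Exclusive end of the window that begins at the given start. */
    static long windowEnd(long start, long size) {
        return start + size;
    }
}
```

With a size of 5 seconds and no offset, timestamp 7000 falls into the window [5000, 10000). With a size of one day and an offset of -8 hours, the window containing 1970-01-01T00:00Z starts 8 hours earlier, at 16:00 UTC of the previous day, which is midnight in a UTC+8 time zone.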

Sliding Windows

A sliding window has a fixed length and moves forward by a fixed step. To use it, we set a Size and a Slide. The Slide determines how often Flink creates a new window, so a small Slide produces a large number of Windows. When the Slide is smaller than the Size, adjacent Windows overlap and an event is assigned to multiple Windows. When the Slide is larger than the Size, there are gaps between Windows and some events may be dropped.
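
The relationship between Size, Slide and the number of windows an event lands in can be sketched as follows. SlidingWindowSketch and assignWindows are hypothetical names for this illustration; the code is plain Java, assumes millisecond timestamps and no offset, and is only a sketch of the assignment rule.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: which sliding windows contain a given timestamp? */
class SlidingWindowSketch {

    /** Returns the [start, end) pair of every sliding window containing the timestamp. */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // start of the latest window that begins at or before the timestamp
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // walk backwards while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

With a size of 10 s and a slide of 5 s, timestamp 12000 belongs to [10000, 20000) and [5000, 15000). With a size of 5 s and a slide of 10 s, timestamp 7000 falls into the gap between windows and the returned list is empty.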

A sliding time window is declared in much the same way as a tumbling one:

// window size is 10s, evaluated every 5s
.timeWindow(Time.seconds(10), Time.seconds(5))

timeWindow is used here, while window is what we have used so far, so what is the difference?

timeWindow is a shortcut: depending on the time characteristic configured on the environment, it uses SlidingProcessingTimeWindows or SlidingEventTimeWindows. With the window method you have to choose the assigner yourself, so the former is a little easier to write.

public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
	if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
		return window(SlidingProcessingTimeWindows.of(size, slide));
	} else {
		return window(SlidingEventTimeWindows.of(size, slide));
	}
}

Count-Based window

A Count Window groups the elements of a stream by their number. It also comes in tumbling and sliding variants.

Tumbling count window: suppose we want statistics over every 100 purchase events of a user. Each time the window has been filled with 100 elements, it is evaluated. This is what we call a tumbling count window (Tumbling Count Window). The window size shown in the figure above is 3. Using the DataStream API, we can implement this:

// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = buyCnts
  // key stream by userId
  .keyBy(0)
  // tumbling count window of 100 elements size
  .countWindow(100)
  // compute the buyCnt sum 
  .sum(1)

Sliding count windows are also supported. They are not shown in the figure above, but the meaning is similar to that of a sliding time window, for example computing the sum of the last 100 elements every 10 elements. An example is shown below.

val slidingCnts: DataStream[(Int, Int)] = buyCnts
  .keyBy(0)
  // sliding count window of 100 elements size and 10 elements trigger interval
  .countWindow(100, 10)
  .sum(1)

Session window

  • The Gap is a very important concept for session windows. It is the interval of inactivity that separates two sessions.
  • If the interval between two elements is greater than the specified gap, they are placed in different sessions. For example, with a gap of 5 seconds, elements that follow each other within 5 seconds share a session, while an element that arrives more than 5 seconds after the previous one starts a new session.
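
The gap rule can be sketched as a simple grouping over sorted timestamps. SessionWindowSketch and sessions are hypothetical names for this illustration; the code is plain Java, and the choice that a distance exactly equal to the gap still belongs to the same session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustration only: grouping event timestamps into sessions by a gap. */
class SessionWindowSketch {

    /**
     * A new session starts whenever the distance to the previous timestamp
     * is greater than the gap. Returns one [first, last] pair per session.
     */
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            if (!result.isEmpty() && ts - result.get(result.size() - 1)[1] <= gap) {
                result.get(result.size() - 1)[1] = ts;   // extend the current session
            } else {
                result.add(new long[] {ts, ts});          // open a new session
            }
        }
        return result;
    }
}
```

With a gap of 5 seconds, the timestamps 1000, 3000, 4000, 12000 and 13000 form two sessions: one from 1000 to 4000 and one from 12000 to 13000.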

DataStream<T> input = ...;

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

Global Windows

Summary

  • Time-based sliding windows

    • SlidingEventTimeWindows
    • SlidingProcessingTimeWindows
  • Time-based tumbling windows

    • TumblingEventTimeWindows
    • TumblingProcessingTimeWindows
  • Count-based sliding windows

    • countWindow(100, 10)
  • Count-based tumbling windows

    • countWindow(100)
  • Session windows: one window per session of activity

    • ProcessingTimeSessionWindows
    • EventTimeSessionWindows
  • Global windows (GlobalWindows)

    • GlobalWindow is a single window covering all elements, implemented as a singleton. Its maxTimestamp is set to Long.MAX_VALUE.
    • The class contains a static nested class that defines the GlobalWindow's Serializer.

Allowed lateness

By default, late data elements are dropped once the watermark has passed the end of the window. However, Flink allows you to specify a maximum allowed lateness for window operators. Allowed lateness specifies how late a data element may be before it is dropped, and its default value is 0. Data elements that arrive after the watermark has passed the end of the window, but before it passes the end of the window plus the allowed lateness, are still added to the window. Depending on the trigger used, a late but not dropped data element may cause the window to fire again. This is the case for EventTimeTrigger.

When the allowed lateness is greater than 0, the window and its contents are kept after the watermark passes the end of the window. In these cases, a late but not dropped data element may trigger another firing of the window. These firings are called late firings because they are triggered by late events, as opposed to the main firing, which is the first firing of the window. With session Windows, late firings can also lead to window merging, because a late element can “bridge” the gap between two existing, unmerged Windows. The results emitted by a late firing should be treated as updated results of a previous computation, that is, your data stream will contain multiple results for the same computation. Depending on your application, you need to take these duplicate results into account or deduplicate them.
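
The three possible outcomes for an element can be sketched as a small decision function. LatenessSketch, Fate and classify are hypothetical names for this illustration; the code is plain Java, and the exact boundary handling (the window's last timestamp is taken as its end minus one millisecond) is an assumption of this sketch.

```java
/** Illustration only: what happens to an element given the current watermark? */
class LatenessSketch {

    enum Fate { ON_TIME, LATE_BUT_ACCEPTED, DROPPED }

    /**
     * Classifies an element belonging to the window that ends at windowEnd
     * (exclusive), given the operator's current watermark. Times in ms.
     */
    static Fate classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;            // last timestamp inside the window
        if (watermark < maxTimestamp) {
            return Fate.ON_TIME;                      // the window has not fired yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Fate.LATE_BUT_ACCEPTED;            // window is kept and may fire again
        }
        return Fate.DROPPED;                          // window state is already gone
    }
}
```

For a window ending at 10000 with an allowed lateness of 2000, an element arriving when the watermark is at 10500 is late but still accepted. With an allowed lateness of 0, the same element would be dropped.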

Use of Windows

  • Flink creates one copy of each data element for every window it belongs to. Consequently, a tumbling window keeps one copy of each data element (an element belongs to exactly one window unless it is dropped as late), while a sliding window creates several copies of each data element, as described in the window assigner section. A sliding window with a size of 1 day and a slide of 1 second is therefore probably not a good idea.
  • ReduceFunction, AggregateFunction, and FoldFunction can significantly reduce storage requirements because they eagerly aggregate data elements and store only one value per window. In contrast, using a ProcessWindowFunction alone requires accumulating all data elements.

Evictor

  • It removes elements after the trigger has fired and before the window is processed (before the window function is applied)
  • Flink's window model allows an optional evictor (Evictor) to be specified in addition to the window assigner and the trigger. This is done with the evictor(...) method. The evictor can remove elements from a window after the trigger has fired, either before or after the window function is applied
  • By default, all built-in evictors apply their logic before the window function
  • Specifying an evictor prevents pre-aggregation, because all elements in the window must be passed to the evictor before the computation is applied.
  • Flink does not guarantee the order of elements within a window. This means that although an evictor may remove elements from the beginning of the window, these are not necessarily the elements that arrived first or last.

Built-in evictors

  • TimeEvictor
    • Takes an interval in milliseconds as an argument. For a given window, it finds the maximum timestamp max_ts among its elements and removes all elements with a timestamp smaller than max_ts - interval.
    • In effect, it keeps only the most recent elements
  • CountEvictor
    • Keeps up to the specified number of elements in the window. If there are more, the surplus elements are discarded from the beginning of the window buffer.
  • DeltaEvictor
    • Takes a DeltaFunction and a threshold, computes the delta between the last element in the window buffer and each of the remaining elements, and removes those whose delta is greater than or equal to the threshold.
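
The count-based and time-based eviction rules can be sketched on plain lists. EvictorSketch, evictByCount and evictByTime are hypothetical names for this illustration; the code is plain Java rather than Flink's evictor classes, and it follows the rule as worded in this article (elements with a timestamp smaller than max_ts - interval are removed).

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: count-based and time-based eviction on a window buffer. */
class EvictorSketch {

    /** Keeps only the last maxCount elements, dropping from the start of the buffer. */
    static <T> List<T> evictByCount(List<T> buffer, int maxCount) {
        int from = Math.max(0, buffer.size() - maxCount);
        return new ArrayList<>(buffer.subList(from, buffer.size()));
    }

    /** Drops every timestamp smaller than (largest timestamp - interval). */
    static List<Long> evictByTime(List<Long> timestamps, long interval) {
        long maxTs = Long.MIN_VALUE;
        for (long ts : timestamps) {
            maxTs = Math.max(maxTs, ts);
        }
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts >= maxTs - interval) {
                kept.add(ts);
            }
        }
        return kept;
    }
}
```

For the buffer 1, 2, 3, 4, 5 and a count of 2, only 4 and 5 remain. For the timestamps 1000, 5000, 9000 and 10000 with an interval of 2000, the cutoff is 8000 and only 9000 and 10000 remain.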

watermark

  • A Watermark is a mechanism for measuring the progress of Event Time. It is a hidden attribute of the data itself.
  • The Watermark is the mechanism Apache Flink introduced for handling EventTime window computations. It is essentially a timestamp: a system event generated by an Apache Flink source or a user-defined Watermark generator, either Punctuated or Periodic as required, that flows to the downstream operators like any ordinary stream event. An operator that receives a Watermark event advances the EventTime clock it manages. When an operator receives a Watermark, the framework assumes that no more data elements with a timestamp smaller than that Watermark will arrive, so a Watermark can be seen as a way of telling the Apache Flink framework how far the data stream has progressed in the time dimension.
  • Data based on Event Time generally carries a timestamp. Watermarks are used to handle out-of-order events, and correct handling of out-of-order events is usually achieved by combining the watermark mechanism with windows.
  • When a Watermark fires a window (watermark >= window_end_time)
    • After the first firing, every element that subsequently arrives for the window (late data) fires the window again
    • If allowed lateness is defined, late data keeps firing the window while watermark < window_end_time + allowedLateness; once watermark >= window_end_time + allowedLateness, the window is closed and data is discarded
    • For out-of-order data, Flink can handle disorder within a certain range by combining the watermark mechanism with windows. If an element arrives later than earlier elements but the window it belongs to has not fired yet, the element is still valid according to its EventTime
    • Out-of-order data with too much delay: note that when an allowed lateness is specified and a lot of data is late, the window fires once for every late element whose Event Time is smaller than the watermark

Generation modes

  • Punctuated
    • A Watermark is produced for each increase of EventTime in the data stream (subject to some computed condition).
    • In production, the Punctuated mode generates a large number of Watermarks in high-TPS scenarios, which puts some pressure on downstream operators. It is therefore chosen only when the real-time requirements are very high.
    • Each event carries an event time, and a watermark can be generated from that time or from another marker carried by the event, such as a business end flag
  • Periodic: a Watermark is generated periodically (at a certain time interval or after a certain number of records).
In production, periodic Watermarks should be generated along both dimensions, time and accumulated record count. Otherwise there can be a large delay in extreme cases.
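
A periodic generator can be sketched as two callbacks: one that observes events and one that is invoked on a timer. PeriodicWatermarkSketch and its method names are hypothetical, and the rule used here (watermark = largest event time seen minus a fixed tolerated disorder) is one common strategy assumed for this illustration, not something prescribed by the article. The code is plain Java.

```java
/** Illustration only: a periodic watermark generator with bounded disorder. */
class PeriodicWatermarkSketch {

    private final long maxOutOfOrderness;            // tolerated disorder in ms
    private long maxTimestampSeen = Long.MIN_VALUE;  // largest event time so far

    PeriodicWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Called for every event: only remembers the largest event time. */
    void onEvent(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
    }

    /** Called on a timer (the periodic part): derives the watermark to emit. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE;                   // nothing seen yet
        }
        return maxTimestampSeen - maxOutOfOrderness;
    }
}
```

With a tolerance of 3000 ms, seeing an event at 10000 gives a watermark of 7000. A later event at 8000 is out of order but does not move the watermark backwards, and an event at 12000 advances it to 9000. A punctuated generator would instead decide inside onEvent whether to emit.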

Background

  • In stream processing there is a path, and a delay, from the generation of an event through the source to the operator. In most cases the data reaches the operator in the order in which the events were generated
  • However, out-of-order data (late elements) cannot be ruled out, because of network delays, back pressure, and other causes.
  • With late elements we cannot wait indefinitely. There must be a mechanism that guarantees that after a certain time the Window is fired and computed
  • The watermark is that mechanism: once a watermark is reached, all data with earlier timestamps is considered to have arrived (even if delayed data still follows)

Problem solved

  • The timestamp of a Watermark can be identical to the EventTime in an event, or you can define any reasonable logic that makes it differ from the EventTime in the event. The EventTime of an event cannot be changed after the event is created and is not controlled by the Apache Flink framework, whereas the Watermark is computed in the Apache Flink source node or in a Watermark generator implementation (such as the Watermark implementations built into Apache Flink). Internally, Apache Flink handles Watermarks uniformly for single-stream and multi-stream scenarios.
  • By default, an event that arrives after the watermark has passed the end of its window is treated as late and discarded

Watermarks with multiple streams

  • In real stream computations, one job often processes data from several sources. After the data is grouped by GroupBy, records with the same key from different sources are shuffled to the same processing node, each source carrying its own Watermark. Apache Flink must ensure internally that the Watermark increases monotonically, but the Watermarks of several sources are not necessarily monotonic once they are merged
  • Internally, Apache Flink keeps only one monotonically increasing Watermark per input edge. When several streams are brought together (GroupBy or Union), Apache Flink selects the smallest of the incoming Watermarks and sends it downstream. This guarantees both the monotonic increase of the watermark and the completeness of the data
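
The minimum rule can be sketched with one watermark slot per input. MultiInputWatermarkSketch and onWatermark are hypothetical names for this illustration; the code is plain Java and only models the bookkeeping described above, not Flink's actual operator code.

```java
import java.util.Arrays;

/** Illustration only: combining the watermarks of several inputs. */
class MultiInputWatermarkSketch {

    private final long[] channelWatermarks;      // latest watermark per input
    private long emitted = Long.MIN_VALUE;       // last watermark sent downstream

    MultiInputWatermarkSketch(int inputs) {
        channelWatermarks = new long[inputs];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark from one input and returns the operator watermark:
     * the minimum over all inputs, never moving backwards.
     */
    long onWatermark(int input, long watermark) {
        channelWatermarks[input] = Math.max(channelWatermarks[input], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        emitted = Math.max(emitted, min);
        return emitted;
    }
}
```

With two inputs, a watermark of 10 on the first input alone does not advance the operator, because nothing is known about the second input yet. Once the second input reports 5, the operator watermark becomes 5, and when it later reports 20, the operator watermark becomes 10, the smaller of the two.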

Understanding

  • By default, once the watermark has passed the end of a window, the window is not computed again when further data arrives for it. Window n fires when both of the following hold:
watermark >= window_n_end_time && window_n_start_time <= event_time < window_n_end_time
  • Watermark >= WINDOW_n_end_time && WATERMARK

Window aggregation

  • Incremental aggregation
    • The result is updated each time an element enters the window
  • Full aggregation
    • All elements of the window are evaluated at once (this allows sorting, or writing to external systems one batch at a time)
    • Usage
      • apply is called after window, and the function receives the elements of the window as an iterator

Some common methods

  • window
  • timeWindow and countWindow
  • process and apply

Watermarks are assigned by implementing the AssignerWithPeriodicWatermarks or the AssignerWithPunctuatedWatermarks interface. In short, the former emits Watermarks periodically, while the latter emits Watermarks based on some attribute of the arriving data, such as when a particular element is encountered in the stream.

Custom window

  • Window Assigner: assigns elements to different Windows.
  • Trigger: defines when, or under what conditions, a window fires.
    • For a CountWindow we can use the predefined CountTrigger directly: trigger(CountTrigger.of(2))
  • Evictor (optional): decides which elements of the window are kept and which are removed.
  • In the simplest case, when the business logic is not particularly complex and depends only on Time and Count, the predefined WindowAssigner, Trigger, and Evictor implementations can be combined to build different windows.

Data skew in windows

  • Data skew in windows means that the amount of data accumulated in different Windows differs too much. Essentially, it happens because the data sources send different amounts of data at different speeds. There are generally two ways to deal with it:
  • Pre-aggregate the data before it enters the window;
  • Redesign the key used for the window aggregation;
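
One way to combine both ideas is to salt the key, pre-aggregate per salted key, and then merge the partial results per original key. This technique is not spelled out in the article and is shown here only as an assumed example; SaltedKeySketch, the "#" separator and the round-robin salt are all hypothetical choices. The code is plain Java and counts occurrences per key.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: two-phase counting with salted keys. */
class SaltedKeySketch {

    /** Phase 1: spread each key over several sub-keys and pre-aggregate the counts. */
    static Map<String, Long> partialCounts(String[] keys, int buckets) {
        Map<String, Long> partial = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            String salted = keys[i] + "#" + (i % buckets);   // round-robin salt
            partial.merge(salted, 1L, Long::sum);
        }
        return partial;
    }

    /** Phase 2: strip the salt and combine the partial results per original key. */
    static Map<String, Long> finalCounts(Map<String, Long> partial) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            String key = e.getKey().substring(0, e.getKey().lastIndexOf('#'));
            result.merge(key, e.getValue(), Long::sum);
        }
        return result;
    }
}
```

For the keys hot, hot, hot, hot, cold and 2 buckets, the hot key is split into hot#0 and hot#1 with a count of 2 each, so no single sub-key carries the whole load. The second phase restores the totals hot = 4 and cold = 1.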
