preface

This article examines Flink's window operations.

window

DataStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

	public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return windowAll(TumblingProcessingTimeWindows.of(size));
		} else {
			return windowAll(TumblingEventTimeWindows.of(size));
		}
	}

	public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return windowAll(SlidingProcessingTimeWindows.of(size, slide));
		} else {
			return windowAll(SlidingEventTimeWindows.of(size, slide));
		}
	}

	public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
		return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
	}

	public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
		return windowAll(GlobalWindows.create())
				.evictor(CountEvictor.of(size))
				.trigger(CountTrigger.of(slide));
	}

	@PublicEvolving
	public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
		return new AllWindowedStream<>(this, assigner);
	}
  • For non-keyed streams, DataStream offers the timeWindowAll, countWindowAll, and windowAll operations. windowAll is the central one: timeWindowAll and countWindowAll both delegate to it. It runs with a parallelism of 1, takes a WindowAssigner parameter, and returns an AllWindowedStream
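
To make the tumbling/sliding distinction behind timeWindowAll(size) and timeWindowAll(size, slide) concrete, here is a minimal sketch in plain Java. It only models how a timestamp maps to window start times; it assumes windows are aligned to the epoch with no offset and that timestamps are non-negative milliseconds. The class WindowAssignSketch and its methods are hypothetical names for illustration and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: models window assignment with plain longs, not Flink classes. */
final class WindowAssignSketch {

    /** Start of the tumbling window of length size (ms) that contains timestamp (ms). */
    static long tumblingStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Starts of all sliding windows (length size, step slide) that contain timestamp. */
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Newest window containing the timestamp starts on a multiple of slide.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards while the window [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(12_345L, 5_000L));
        System.out.println(slidingStarts(12_345L, 10_000L, 5_000L));
    }
}
```

In this model a tumbling window places each element in exactly one window, while a sliding window with size 10 s and slide 5 s places each element in two overlapping windows.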

KeyedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

	public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(TumblingProcessingTimeWindows.of(size));
		} else {
			return window(TumblingEventTimeWindows.of(size));
		}
	}

	public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(SlidingProcessingTimeWindows.of(size, slide));
		} else {
			return window(SlidingEventTimeWindows.of(size, slide));
		}
	}

	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
		return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
	}

	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
		return window(GlobalWindows.create())
				.evictor(CountEvictor.of(size))
				.trigger(CountTrigger.of(slide));
	}

	@PublicEvolving
	public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
		return new WindowedStream<>(this, assigner);
	}
  • In addition to the window operations inherited from DataStream, KeyedStream provides timeWindow, countWindow, and window. window is the central one: timeWindow and countWindow both delegate to it. It also takes a WindowAssigner parameter, and it returns a WindowedStream
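
The countWindow(size) method above combines GlobalWindows with a PurgingTrigger wrapping a CountTrigger. A rough plain-Java model of the resulting behaviour on a keyed stream is sketched below: each key collects its own elements, and once a key has `size` elements the window fires and its contents are purged. The class KeyedCountWindowSketch is a hypothetical illustration, not a Flink class, and it ignores state backends and checkpointing entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of a keyed tumbling count window; not a Flink class. */
final class KeyedCountWindowSketch<K, T> {
    private final long size;
    private final Map<K, List<T>> buffers = new HashMap<>();

    KeyedCountWindowSketch(long size) {
        this.size = size;
    }

    /**
     * Adds one element under its key. Returns the window contents when that key
     * has collected `size` elements, otherwise an empty Optional.
     */
    Optional<List<T>> add(K key, T value) {
        List<T> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() >= size) {
            buffers.remove(key);        // purge: the next window for this key starts empty
            return Optional.of(buffer); // fire: hand the collected elements downstream
        }
        return Optional.empty();
    }
}
```

Because the buffers are kept per key, elements of different keys never share a window, which is the main difference from the countWindowAll variant on a non-keyed stream.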

WindowedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.java

@Public
public class WindowedStream<T, K, W extends Window> {

	/** The keyed data stream that is windowed by this stream. */
	private final KeyedStream<T, K> input;

	/** The window assigner. */
	private final WindowAssigner<? super T, W> windowAssigner;

	/** The trigger that is used for window evaluation/emission. */
	private Trigger<? super T, ? super W> trigger;

	/** The evictor that is used for evicting elements before window evaluation. */
	private Evictor<? super T, ? super W> evictor;

	/** The user-specified allowed lateness. */
	private long allowedLateness = 0L;

	/**
	 * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
	 * dropped.
 	 */
	private OutputTag<T> lateDataOutputTag;

	@PublicEvolving
	public WindowedStream(KeyedStream<T, K> input,
			WindowAssigner<? super T, W> windowAssigner) {
		this.input = input;
		this.windowAssigner = windowAssigner;
		this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
	}

	@PublicEvolving
	public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
		if (windowAssigner instanceof MergingWindowAssigner && !trigger.canMerge()) {
			throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging.");
		}

		if (windowAssigner instanceof BaseAlignedWindowAssigner) {
			throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with a custom trigger.");
		}

		this.trigger = trigger;
		return this;
	}

	@PublicEvolving
	public WindowedStream<T, K, W> allowedLateness(Time lateness) {
		final long millis = lateness.toMilliseconds();
		checkArgument(millis >= 0, "The allowed lateness cannot be negative.");

		this.allowedLateness = millis;
		return this;
	}

	@PublicEvolving
	public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
		Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
		this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
		return this;
	}

	@PublicEvolving
	public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
		if (windowAssigner instanceof BaseAlignedWindowAssigner) {
			throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with an Evictor.");
		}
		this.evictor = evictor;
		return this;
	}

	// ------------------------------------------------------------------------
	//  Operations on the keyed windows
	// ------------------------------------------------------------------------

	//...
}
  • WindowedStream has several properties. The constructor requires input and windowAssigner, and initializes the trigger to the assigner's default trigger; trigger, evictor, allowedLateness, and the late-data OutputTag can then be set through the corresponding methods. A window function must also be supplied, mainly ReduceFunction, AggregateFunction, FoldFunction (deprecated), or ProcessWindowFunction
  • WindowAssigner determines how elements are assigned to windows. The main implementations are TumblingEventTimeWindows/TumblingProcessingTimeWindows, SlidingEventTimeWindows/SlidingProcessingTimeWindows, EventTimeSessionWindows/ProcessingTimeSessionWindows, and GlobalWindows
  • Trigger decides when a window fires, that is, when its result is emitted. Evictor removes elements from a window when it fires. allowedLateness specifies the maximum time by which elements may lag behind the watermark; elements that arrive later than that are discarded (this parameter only applies to event-time windows). OutputTag routes late data to a side output, which can be retrieved with SingleOutputStreamOperator.getSideOutput(OutputTag)
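
The interplay of allowedLateness and the late-data OutputTag can be sketched as a small decision rule. The model below assumes, as we understand the 1.7 window operator, that a window [start, end) is considered late once the watermark reaches the window's last timestamp (end - 1) plus the allowed lateness. LatenessSketch, Route, and the method names are hypothetical and do not exist in Flink.

```java
/** Hypothetical model of the event-time lateness rule; not a Flink class. */
final class LatenessSketch {

    /** Where an element ends up in this simplified model. */
    enum Route { WINDOW, SIDE_OUTPUT, DROPPED }

    /** True once the watermark has reached (windowEnd - 1) + allowedLateness. */
    static boolean isWindowLate(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;               // last timestamp inside [start, end)
        long cleanupTime = maxTimestamp + allowedLateness;
        if (cleanupTime < maxTimestamp) {
            cleanupTime = Long.MAX_VALUE;                // guard against long overflow
        }
        return cleanupTime <= watermark;
    }

    /** Routes an element whose target window ends at windowEnd. */
    static Route route(long windowEnd, long allowedLateness, long watermark, boolean hasLateTag) {
        if (!isWindowLate(windowEnd, allowedLateness, watermark)) {
            return Route.WINDOW;                         // still accepted by the window
        }
        return hasLateTag ? Route.SIDE_OUTPUT : Route.DROPPED;
    }
}
```

With a window ending at 10 000 ms and an allowed lateness of 2 000 ms, elements for that window are still accepted while the watermark is below 11 999 ms; after that they go to the side output if a tag was set, and are dropped otherwise.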

The properties and operations of AllWindowedStream are largely the same as those of WindowedStream, so we won’t go into detail here

summary

  • Window operations are at the heart of processing infinite data streams: they split the stream into buckets of finite size, on which computations can be performed. Flink's window operations fall into two categories: the window operation for KeyedStream and the windowAll operation for non-keyed streams
  • A window operation takes several parameters. A WindowAssigner is required; the main implementations are TumblingEventTimeWindows/TumblingProcessingTimeWindows, SlidingEventTimeWindows/SlidingProcessingTimeWindows, EventTimeSessionWindows/ProcessingTimeSessionWindows, and GlobalWindows. A window function must also be supplied, mainly ReduceFunction, AggregateFunction, FoldFunction (deprecated), or ProcessWindowFunction
  • Trigger, Evictor, allowedLateness, and OutputTag are optional parameters. Trigger decides when a window fires, and Evictor removes elements from a window when it fires. allowedLateness specifies the maximum time by which elements may lag behind the watermark before they are discarded (this parameter only applies to event-time windows). OutputTag routes late data to a side output, which can be retrieved with SingleOutputStreamOperator.getSideOutput(OutputTag)
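
As an illustration of how a Trigger and an Evictor cooperate, the sliding count window from countWindow(size, slide) pairs CountTrigger.of(slide) with CountEvictor.of(size). The plain-Java sketch below models that pairing for a single key: the window fires every `slide` elements, and before each firing only the newest `size` elements are kept. SlidingCountWindowSketch is a hypothetical name and a simplified model, not Flink's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/** Hypothetical single-key model of a sliding count window; not a Flink class. */
final class SlidingCountWindowSketch<T> {
    private final int size;
    private final int slide;
    private final Deque<T> buffer = new ArrayDeque<>();
    private int sinceLastFire = 0;

    SlidingCountWindowSketch(int size, int slide) {
        this.size = size;
        this.slide = slide;
    }

    /** Adds one element; returns the window contents whenever the trigger fires. */
    Optional<List<T>> add(T value) {
        buffer.addLast(value);
        sinceLastFire++;
        if (sinceLastFire < slide) {
            return Optional.empty();                 // trigger: keep collecting
        }
        sinceLastFire = 0;                           // trigger fires and resets its count
        while (buffer.size() > size) {
            buffer.removeFirst();                    // evictor: drop the oldest elements
        }
        return Optional.of(new ArrayList<>(buffer)); // fire without purging the buffer
    }
}
```

With size 3 and slide 2, feeding the elements 1, 2, 3, 4 fires after the second element with [1, 2] and after the fourth with [2, 3, 4]. Unlike the tumbling count window, the buffer is not purged on firing, so consecutive windows overlap.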

doc

  • Windows