sequence

This article focuses on Flink’s Triggers

Trigger

Flink – streaming – java_2. 11-1.7.0 – sources. The jar! /org/apache/flink/streaming/api/windowing/triggers/Trigger.java

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

	private static final long serialVersionUID = -4104633972991191369L;

	public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

	public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

	public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

	public boolean canMerge() {
		return false;
	}

	public void onMerge(W window, OnMergeContext ctx) throws Exception {
		throw new UnsupportedOperationException("This trigger does not support merging."); } public abstract void clear(W window, TriggerContext ctx) throws Exception; // ------------------------------------------------------------------------ public interface TriggerContext { long getCurrentProcessingTime(); MetricGroup getMetricGroup(); long getCurrentWatermark(); void registerProcessingTimeTimer(long time); void registerEventTimeTimer(long time); void deleteProcessingTimeTimer(long time); void deleteEventTimeTimer(long time); <S extends State> S getPartitionedState(StateDescriptor<S, ? > stateDescriptor); @Deprecated <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState); @Deprecated <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState); } public interface OnMergeContext extends TriggerContext { <S extends MergingState<? ,? >> void mergePartitionedState(StateDescriptor<S, ? > stateDescriptor); }}Copy the code
  • Trigger accepts two generics, one of the Element type and one of the window type. It defines methods onElement, onProcessingTime, onEventTime, canMerge, onMerge, and clear. OnElement, onProcessingTime, and onEventTime all return TriggerResult
  • OnElement is called back when each element is added to the window; OnProcessingTime is called back when the registered Event-Time timer is triggered; OnEventTime is called back when the registered Processing-time timer is triggered
  • CanMerge is used to indicate whether the merge of trigger states is supported. By default, false is returned. OnMerge is triggered when multiple Windows merge. Clear clears the associated state stored in TriggerContext
  • Trigger also defines TriggerContext and OnMergeContext; TriggerContext defines the registration and deletion of the EventTimeTimer, ProcessingTimeTimer methods, GetCurrentProcessingTime, getMetricGroup, getCurrentWatermark, getPartitionedState, getKeyValueState, and getKeyValueState are also defined method
  • OnMergeContext inherits TriggerContext, which defines the mergePartitionedState method

TriggerResult

Flink – streaming – java_2. 11-1.7.0 – sources. The jar! /org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java

public enum TriggerResult {

	CONTINUE(false.false),

	FIRE_AND_PURGE(true.true),

	FIRE(true.false),

	PURGE(false.true);

	// ------------------------------------------------------------------------

	private final boolean fire;
	private final boolean purge;

	TriggerResult(boolean fire, boolean purge) {
		this.purge = purge;
		this.fire = fire;
	}

	public boolean isFire() {
		return fire;
	}

	public boolean isPurge() {
		returnpurge; }}Copy the code
  • TriggerResult TriggerResult is the action enumeration that trigger returns when onElement, onProcessingTime, and onEventTime are called back. CONTINUE, FIRE_AND_PURGE, FIRE, and PURGE are five enumerations
  • Fire is whether to trigger Windows computation; Purge indicates whether to purge window data
  • CONTINUE does nothing for window; FIRE_AND_PURGE means that you need to trigger The Windows computation operation and then clean up the window data; FIRE just triggers Window computation but doesn’t clean up window data; PURGE means that it doesn’t trigger Window computation but it does clean up window data

EventTimeTrigger

Flink – streaming – java_2. 11-1.7.0 – sources. The jar! /org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;

	private EventTimeTrigger() {}

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
			// if the watermark is already past the window fire immediately
			return TriggerResult.FIRE;
		} else {
			ctx.registerEventTimeTimer(window.maxTimestamp());
			return TriggerResult.CONTINUE;
		}
	}

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
		return time == window.maxTimestamp() ?
			TriggerResult.FIRE :
			TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.deleteEventTimeTimer(window.maxTimestamp());
	}

	@Override
	public boolean canMerge() {
		return true;
	}

	@Override
	public void onMerge(TimeWindow window,
			OnMergeContext ctx) {
		// only register a timer if the watermark is not yet past the end of the merged window
		// this is in line with the logic inonElement(). If the watermark is past the end of // the window onElement() will fire and setting a timer here would fire  the window twice. long windowMaxTimestamp = window.maxTimestamp();if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
			ctx.registerEventTimeTimer(windowMaxTimestamp);
		}
	}

	@Override
	public String toString() {
		return "EventTimeTrigger()";
	}

	public static EventTimeTrigger create() {
		returnnew EventTimeTrigger(); }}Copy the code
  • EventTimeTrigger inherits Trigger, element type Object, window type TimeWindow; SlidingEventTimeWindows, TumblingEventTimeWindows, EventTimeSessionWindows, DynamicEventTimeSessionWindows EventTimeTri is used by default gger
  • OnElement returns triggerResult.fire if window.maxtimestamp () is less than or equal to ctx.getCurrentwaterMark (), Otherwise do CTX. RegisterEventTimeTimer (window. MaxTimestamp ()), and then return TriggerResult. CONTINUE; OnEventTime returns triggerResult.fire if time is equal to window.maxtimestamp (), triggerResult.continue otherwise; OnProcessingTime returns triggerResult.continue
  • CanMerge returns true; OnMerge in window. MaxTimestamp () is greater than the CTX. GetCurrentWatermark () will perform CTX. RegisterEventTimeTimer (windowMaxTimestamp); Clear: ctx.deleteEventTimeTimer(window.maxtimestamp ())

ProcessingTimeTrigger

Flink – streaming – java_2. 11-1.7.0 – sources. The jar! /org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;

	private ProcessingTimeTrigger() {}

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
		ctx.registerProcessingTimeTimer(window.maxTimestamp());
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
		return TriggerResult.FIRE;
	}

	@Override
	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.deleteProcessingTimeTimer(window.maxTimestamp());
	}

	@Override
	public boolean canMerge() {
		return true;
	}

	@Override
	public void onMerge(TimeWindow window,
			OnMergeContext ctx) {
		// only register a timer if the time is not yet past the end of the merged window
		// this is in line with the logic in onElement(). If the time is past the end of
		// the window onElement() will fire and setting a timer here would fire the window twice.
		long windowMaxTimestamp = window.maxTimestamp();
		if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
			ctx.registerProcessingTimeTimer(windowMaxTimestamp);
		}
	}

	@Override
	public String toString() {
		return "ProcessingTimeTrigger()";
	}

	public static ProcessingTimeTrigger create() {
		returnnew ProcessingTimeTrigger(); }}Copy the code
  • ProcessingTimeTrigger inherits Trigger, element type Object, window type TimeWindow; SlidingProcessingTimeWindows, TumblingProcessingTimeWindows, ProcessingTimeSessionWindows, DynamicProcessingTimeSessionWind Ows uses ProcessingTimeTrigger by default
  • OnElement execution CTX. RegisterProcessingTimeTimer (window. MaxTimestamp ()), and then return TriggerResult. CONTINUE; OnEventTime returns triggerResult.continue; OnProcessingTime returns triggerResult.fire
  • CanMerge returns true; OnMerge in window. MaxTimestamp () is greater than the CTX. GetCurrentWatermark () will perform CTX. RegisterProcessingTimeTimer (windowMaxTimestamp); The clear is executed CTX. DeleteProcessingTimeTimer (window. MaxTimestamp ())

NeverTrigger

Flink – streaming – java_2. 11-1.7.0 – sources. The jar! /org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java

	@Internal
	public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
		private static final long serialVersionUID = 1L;

		@Override
		public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

		@Override
		public void onMerge(GlobalWindow window, OnMergeContext ctx) {
		}
	}
Copy the code
  • NeverTrigger’s onElement, onEventTime, and onProcessingTime all return triggerResult.continue; GlobalWindows uses NeverTrigger by default

CountTrigger

Flink – streaming – java_2. 11-1.7.0 – sources. The jar! /org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java

@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
	private static final long serialVersionUID = 1L;

	private final long maxCount;

	private final ReducingStateDescriptor<Long> stateDesc =
			new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

	private CountTrigger(long maxCount) {
		this.maxCount = maxCount;
	}

	@Override
	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
		ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
		count.add(1L);
		if (count.get() >= maxCount) {
			count.clear();
			return TriggerResult.FIRE;
		}
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(W window, TriggerContext ctx) throws Exception {
		ctx.getPartitionedState(stateDesc).clear();
	}

	@Override
	public boolean canMerge() {
		return true;
	}

	@Override
	public void onMerge(W window, OnMergeContext ctx) throws Exception {
		ctx.mergePartitionedState(stateDesc);
	}

	@Override
	public String toString() {
		return "CountTrigger(" +  maxCount + ")";
	}

	public static <W extends Window> CountTrigger<W> of(long maxCount) {
		return new CountTrigger<>(maxCount);
	}

	private static class Sum implements ReduceFunction<Long> {
		private static final long serialVersionUID = 1L;

		@Override
		public Long reduce(Long value1, Long value2) throws Exception {
			returnvalue1 + value2; }}}Copy the code
  • CountTrigger inherits from Trigger, specifying element type as Object; It defines maxCount and ReducingStateDescriptor; Where ReducingStateDescriptor is used to count Windows (It uses its own Sum functionIn the onElement method, if the count is greater than or equal to maxCount, the count is cleared and triggerResult. FIRE is returned, otherwise triggerResult. CONTINUE is returned; OnEventTime and onProcessingTime both return triggerResult.continue; CanMerge returns true; OnMerge execution is CTX mergePartitionedState (stateDesc); Ctx.getpartitionedstate (stateDesc).clear()

PurgingTrigger

Flink – streaming – java_2. 11-1.7.0 – sources. The jar! /org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java

@PublicEvolving
public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
	private static final long serialVersionUID = 1L;

	private Trigger<T, W> nestedTrigger;

	private  PurgingTrigger(Trigger<T, W> nestedTrigger) {
		this.nestedTrigger = nestedTrigger;
	}

	@Override
	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
		TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
	}

	@Override
	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
		TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
	}

	@Override
	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
		TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
	}

	@Override
	public void clear(W window, TriggerContext ctx) throws Exception {
		nestedTrigger.clear(window, ctx);
	}

	@Override
	public boolean canMerge() {
		return nestedTrigger.canMerge();
	}

	@Override
	public void onMerge(W window, OnMergeContext ctx) throws Exception {
		nestedTrigger.onMerge(window, ctx);
	}

	@Override
	public String toString() {
		return "PurgingTrigger(" + nestedTrigger.toString() + ")";
	}

	public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
		return new PurgingTrigger<>(nestedTrigger);
	}

	@VisibleForTesting
	public Trigger<T, W> getNestedTrigger() {
		returnnestedTrigger; }}Copy the code
  • PurgingTrigger is a wrapped Trigger that wraps nestedTrigger, whose onElement, onEventTime, and onProcessingTime are based on the result returned by nestedTrigger. When triggerresult.isfire () is true, the wrapper returns triggerresult.fire_and_purge; Methods such as canMerge, onMerge and clear are all delegated to nestedTrigger

summary

  • Trigger accepts two generics, one of the Element type and one of the window type. It defines methods onElement, onProcessingTime, onEventTime, canMerge, onMerge, and clear. OnElement, onProcessingTime, and onEventTime all return TriggerResult. TriggerResult TriggerResult is the action enumeration that trigger returns when onElement, onProcessingTime, and onEventTime are called back. It has fire and Purge properties.Fire is whether to trigger Windows computation; Purge indicates whether to purge window data), CONTINUE, FIRE_AND_PURGE, FIRE, and PURGE
  • SlidingEventTimeWindows, TumblingEventTimeWindows, EventTimeSessionWindows, DynamicEventTimeSessionWindows EventTimeTri is used by default Gger; SlidingProcessingTimeWindows, TumblingProcessingTimeWindows, ProcessingTimeSessionWindows, DynamicProcessingTimeSessionWind Ows uses ProcessingTimeTrigger by default; GlobalWindows uses NeverTrigger by default
  • CountTrigger is the main window type used for counting. It uses ReducingStateDescriptor to count Windows. In the onElement method, when the count is greater than or equal to maxCount, the count is emptied and triggerResult. FIRE is returned. Otherwise, return triggerResult.continue; PurgingTrigger is a wrapped Trigger that wraps nestedTrigger, whose onElement, onEventTime, and onProcessingTime are based on the result returned by nestedTrigger. When triggerresult.isfire () is true, the wrapper returns triggerresult.fire_and_purge; Methods such as canMerge, onMerge and clear are all delegated to nestedTrigger

doc

  • Triggers