sequence

This paper mainly studies The Allowed Lateness of Flink

WindowedStream

Flink – streaming – java_2. 11-1.7.0 – sources. The jar! /org/apache/flink/streaming/api/datastream/WindowedStream.java

@Public public class WindowedStream<T, K, W extends Window> { /** The keyed data stream that is windowed by this stream. */ private final KeyedStream<T, K> input;  /** The window assigner. */ private final WindowAssigner<? super T, W> windowAssigner; /** The trigger that is usedfor window evaluation/emission. */
	private Trigger<? super T, ? super W> trigger;

	/** The evictor that is used for evicting elements before window evaluation. */
	private Evictor<? super T, ? super W> evictor;

	/** The user-specified allowed lateness. */
	private long allowedLateness = 0L;

	/**
	 * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
	 * dropped.
 	 */
	private OutputTag<T> lateDataOutputTag;

	@PublicEvolving
	public WindowedStream<T, K, W> allowedLateness(Time lateness) {
		final long millis = lateness.toMilliseconds();
		checkArgument(millis >= 0, "The allowed lateness cannot be negative.");

		this.allowedLateness = millis;
		return this;
	}

	@PublicEvolving
	public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
		Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
		this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
		returnthis; } / /... public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W>function,
			TypeInformation<R> resultType) {

		if (reduceFunction instanceof RichFunction) {
			throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
		}

		//clean the closures
		function = input.getExecutionEnvironment().clean(function);
		reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);

		final String opName = generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function);
		KeySelector<T, K> keySel = input.getKeySelector();

		OneInputStreamOperator<T, R> operator;

		if(evictor ! = null) { @SuppressWarnings({"unchecked"."rawtypes"})
			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
				(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

			ListStateDescriptor<StreamRecord<T>> stateDesc =
				new ListStateDescriptor<>("window-contents", streamRecordSerializer);

			operator =
				new EvictingWindowOperator<>(windowAssigner,
					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
					keySel,
					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
					stateDesc,
					new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
					trigger,
					evictor,
					allowedLateness,
					lateDataOutputTag);

		} else {
			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
				reduceFunction,
				input.getType().createSerializer(getExecutionEnvironment().getConfig()));

			operator =
				new WindowOperator<>(windowAssigner,
					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
					keySel,
					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
					stateDesc,
					new InternalSingleValueWindowFunction<>(function),
					trigger,
					allowedLateness,
					lateDataOutputTag);
		}

		returninput.transform(opName, resultType, operator); } / /... }Copy the code
  • WindowedStream has two parameters associated with Allowed Lateness: allowedLateness, which specifies how long elements are Allowed to be late, and lateDataOutputTag, which configures the output of late elements
  • WindowedStream’s reduce, Aggregate, fold, and Process operations create different WindowOperators depending on whether or not EVictor is null.Evictor creates EvictingWindowOperator instead of NULL, and evictor creates WindowOperator for null)
  • EvictingWindowOperator inherits from WindowOperator, and its constructor has more Evictor arguments than WindowOperator. But their constructors all require Trigger, allowedLateness, lateDataOutputTag parameters

OutputTag

Flink – core – 1.7.0 – sources jar! /org/apache/flink/util/OutputTag.java

@PublicEvolving
public class OutputTag<T> implements Serializable {

	private static final long serialVersionUID = 2L;

	private final String id;

	private final TypeInformation<T> typeInfo;

	public OutputTag(String id) {
		Preconditions.checkNotNull(id, "OutputTag id cannot be null."); Preconditions.checkArgument(! id.isEmpty(),"OutputTag id must not be empty.");
		this.id = id;

		try {
			this.typeInfo = TypeExtractor.createTypeInfo(this, OutputTag.class, getClass(), 0);
		}
		catch (InvalidTypesException e) {
			throw new InvalidTypesException("Could not determine TypeInformation for the OutputTag type. " +
					"The most common reason is forgetting to make the OutputTag an anonymous inner class. " +
					"It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.", e);
		}
	}

	public OutputTag(String id, TypeInformation<T> typeInfo) {
		Preconditions.checkNotNull(id, "OutputTag id cannot be null."); Preconditions.checkArgument(! id.isEmpty(),"OutputTag id must not be empty.");
		this.id = id;
		this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
	}

	// ------------------------------------------------------------------------

	public String getId() {
		return id;
	}

	public TypeInformation<T> getTypeInfo() {
		return typeInfo;
	}

	// ------------------------------------------------------------------------

	@Override
	public boolean equals(Object obj) {
		return obj instanceof OutputTag
			&& ((OutputTag) obj).id.equals(this.id);
	}

	@Override
	public int hashCode() {
		return id.hashCode();
	}

	@Override
	public String toString() {
		return "OutputTag(" + getTypeInfo() + "," + id + ")"; }}Copy the code
  • OutputTag is a side output identifier with a name and type information. Flink allows ProcessFunction, CoProcessFunction, ProcessWindowFunction, ProcessAllWindowFunction to output side Output, The Context of these functions has an output(OutputTag

    OutputTag, X value) method that outputs elements to side Output

SingleOutputStreamOperator

Flink – streaming – java_2. 11-1.7.0 – sources. The jar! /org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {

	protected boolean nonParallel = false; private Map<OutputTag<? >, TypeInformation> requestedSideOutputs = new HashMap<>(); private boolean wasSplitApplied =false; / /... public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {if (wasSplitApplied) {
			throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
				"As a work-around, please add a no-op map function before the split() call."); } sideOutputTag = clean(requireNonNull(sideOutputTag)); // make a defensive copy sideOutputTag = new OutputTag<X>(sideOutputTag.getId(), sideOutputTag.getTypeInfo()); TypeInformation<? >type = requestedSideOutputs.get(sideOutputTag);
		if (type! = null && ! type.equals(sideOutputTag.getTypeInfo())) { throw new UnsupportedOperationException("A side output with a matching id was " +
					"already requested with a different type. This is not allowed, side output " +
					"ids need to be unique.");
		}

		requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());

		SideOutputTransformation<X> sideOutputTransformation = new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
		returnnew DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation); }}Copy the code
  • SingleOutputStreamOperator provides getSideOutput method, could be obtained according to the OutputTag function before the site of the output in the output

WindowOperator

Flink – streaming – java_2. 11-1.7.0 – sources. The jar! /org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
	extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
	implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

	//......

	public void processElement(StreamRecord<IN> element) throws Exception {
		final Collection<W> elementWindows = windowAssigner.assignWindows(
			element.getValue(), element.getTimestamp(), windowAssignerContext);

		//if element is handled by none of assigned elementWindows
		boolean isSkippedElement = true;

		final K key = this.<K>getKeyedStateBackend().getCurrentKey();

		if (windowAssigner instanceof MergingWindowAssigner) {
			MergingWindowSet<W> mergingWindows = getMergingWindowSet();

			for (W window: elementWindows) {

				// adding the new window might result in a merge, in that case the actualWindow
				// is the merged window and we work with that. If we don't merge then // actualWindow == window W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction
      
       () { @Override public void merge(W mergeResult, Collection
       
         mergedWindows, W stateWindowResult, Collection
        
          mergedStateWindows) throws Exception { if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) { throw new UnsupportedOperationException("The end timestamp of an " + "event-time window cannot become earlier than the current watermark " + "by merging. Current watermark: " + internalTimerService.currentWatermark() + " window: " + mergeResult); } else if (! windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) { throw new UnsupportedOperationException("The end timestamp of a " + "processing-time window cannot become earlier than the current  processing time " + "by merging. Current processing time: " + internalTimerService.currentProcessingTime() + " window: " + mergeResult); } triggerContext.key = key; triggerContext.window = mergeResult; triggerContext.onMerge(mergedWindows); for (W m: mergedWindows) { triggerContext.window = m; triggerContext.clear(); deleteCleanupTimer(m); } // merge the merged state windows into the newly resulting state window windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows); }}); // drop if the window is already late if (isWindowLate(actualWindow)) { mergingWindows.retireWindow(actualWindow); continue; } isSkippedElement = false; W stateWindow = mergingWindows.getStateWindow(actualWindow); if (stateWindow == null) { throw new IllegalStateException("Window " + window + " is not in in-flight window set."); } windowState.setCurrentNamespace(stateWindow); windowState.add(element.getValue()); triggerContext.key = key; triggerContext.window = actualWindow; TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { ACC contents = windowState.get(); if (contents == null) { continue; } emitWindowContents(actualWindow, contents); } if (triggerResult.isPurge()) { windowState.clear(); } registerCleanupTimer(actualWindow); } // need to make sure to update the merging state in state mergingWindows.persist(); } else { for (W window: elementWindows) { // drop if the window is already late if (isWindowLate(window)) { continue; } isSkippedElement = false; windowState.setCurrentNamespace(window); windowState.add(element.getValue()); triggerContext.key = key; triggerContext.window = window; TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { ACC contents = windowState.get(); if (contents == null) { continue; } emitWindowContents(window, contents); } if (triggerResult.isPurge()) { windowState.clear(); } registerCleanupTimer(window); } } // side output input event if // element not handled by any window // late arriving tag has been set // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp if (isSkippedElement && isElementLate(element)) { if (lateDataOutputTag ! = null){ sideOutput(element); } else { this.numLateRecordsDropped.inc(); } } } protected boolean isElementLate(StreamRecord
         
           element){ return (windowAssigner.isEventTime()) && (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark()); } private long cleanupTime(W window) { if (windowAssigner.isEventTime()) { long cleanupTime = window.maxTimestamp() + allowedLateness; return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE; } else { return window.maxTimestamp(); }} / /... }
         
        
       
      Copy the code
  • WindowOperator has an isElementLate method that determines whether an element is late based on allowedLateness. Its processElement method finally executes the following logic if isSkippedElement is true and isElementLate is true: LateDataOutputTag lateDataOutputTag lateDataOutputTag lateDataOutputTag lateDataOutputTag lateDataOutputTag lateDataOutputTag lateDataOutputTag Execute numLateRecordsDropped. Increasing numLateRecordsDropped inc () to count

summary

  • When using an event-Time window, Flink provides the allowedLateness method that configures the amount of lateness an element is allowed to exceed before it is discarded (Elements that arrive within the window end time + allowed late time will still be added to the window), the default value is 0. For Windows assigners such as GlobalWindows, element does not have late because its end timestamp is long. MAX_VALUE
  • OutputTag is a side output identifier with a name and type information. Flink allows ProcessFunction, CoProcessFunction, ProcessWindowFunction, ProcessAllWindowFunction to output side Output, The Context of these functions has an output(OutputTag

    OutputTag, X value) method that outputs elements to side Output
  • SingleOutputStreamOperator provides getSideOutput method, could be obtained according to the OutputTag function before the site of the output in the output; The processElement method of the WindowOperator will say at the end, if isSkippedElement is true and isElementLate is true, If lateDataOutputTag is not null, the element of late is printed to side Output

doc

  • Allowed Lateness
  • Side Outputs
  • Window Lifecycle