Preface

This article looks at the intervalJoin operation of Flink's KeyedStream.

Example

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            // emit one string per matched (orange, green) pair
            out.collect(left + "," + right);
        }
    });
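With between(Time.milliseconds(-2), Time.milliseconds(1)), an orange element with timestamp t is joined with every green element of the same key whose timestamp lies in [t - 2, t + 1]. The following plain-Java sketch (no Flink; the class name, method name and sample timestamps are made up for illustration) enumerates those pairs for a single key, treating each element's value as its timestamp:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of the example above: which (orange, green)
// timestamp pairs satisfy between(-2 ms, 1 ms) for a single key.
class IntervalPairs {

    // Returns "left,right" strings for every pair whose right timestamp lies
    // in [left + lowerBound, left + upperBound] (both bounds inclusive).
    static List<String> pairs(long[] left, long[] right, long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (long l : left) {
            for (long r : right) {
                if (r >= l + lowerBound && r <= l + upperBound) {
                    out.add(l + "," + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] orange = {0, 5};
        long[] green = {-3, -2, 1, 2, 4, 6, 7};
        System.out.println(pairs(orange, green, -2, 1));
    }
}
```

With this sample data, the orange element at 0 pairs with the green elements at -2 and 1, and the one at 5 pairs with 4 and 6.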

KeyedStream.intervalJoin

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

@Public
public class KeyedStream<T, KEY> extends DataStream<T> {
	//......

	@PublicEvolving
	public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
		return new IntervalJoin<>(this, otherStream);
	}

	//......
}
  • KeyedStream's intervalJoin method creates and returns an IntervalJoin that wraps this stream and the other KeyedStream.

IntervalJoin

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

	@PublicEvolving
	public static class IntervalJoin<T1, T2, KEY> {

		private final KeyedStream<T1, KEY> streamOne;
		private final KeyedStream<T2, KEY> streamTwo;

		IntervalJoin(
				KeyedStream<T1, KEY> streamOne,
				KeyedStream<T2, KEY> streamTwo
		) {
			this.streamOne = checkNotNull(streamOne);
			this.streamTwo = checkNotNull(streamTwo);
		}

		@PublicEvolving
		public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {

			TimeCharacteristic timeCharacteristic =
				streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();

			if (timeCharacteristic != TimeCharacteristic.EventTime) {
				throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
			}

			checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
			checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");

			return new IntervalJoined<>(
				streamOne,
				streamTwo,
				lowerBound.toMilliseconds(),
				upperBound.toMilliseconds(),
				true, true);
		}
	}
  • IntervalJoin provides the between operation, which sets the lowerBound and upperBound of the interval. between first checks the TimeCharacteristic and throws UnsupportedTimeCharacteristicException directly if it is not EventTime; it then creates and returns an IntervalJoined, passing both bounds in milliseconds and marking both as inclusive.
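The checks in between can be mirrored in a small Flink-free sketch. The enum and the Bounds class below are hypothetical stand-ins for TimeCharacteristic and IntervalJoined, and a JDK exception replaces UnsupportedTimeCharacteristicException:

```java
import java.util.Objects;

// Hypothetical model of IntervalJoin.between (not the Flink API). The enum and
// Bounds are illustrative stand-ins for TimeCharacteristic and IntervalJoined.
class BetweenSketch {

    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    static final class Bounds {
        final long lowerMs;
        final long upperMs;
        final boolean lowerInclusive;
        final boolean upperInclusive;

        Bounds(long lowerMs, long upperMs, boolean lowerInclusive, boolean upperInclusive) {
            this.lowerMs = lowerMs;
            this.upperMs = upperMs;
            this.lowerInclusive = lowerInclusive;
            this.upperInclusive = upperInclusive;
        }
    }

    static Bounds between(TimeCharacteristic tc, Long lowerMs, Long upperMs) {
        // Flink throws UnsupportedTimeCharacteristicException here; a JDK exception stands in.
        if (tc != TimeCharacteristic.EventTime) {
            throw new UnsupportedOperationException(
                    "Time-bounded stream joins are only supported in event time");
        }
        Objects.requireNonNull(lowerMs, "A lower bound needs to be provided for a time-bounded join");
        Objects.requireNonNull(upperMs, "An upper bound needs to be provided for a time-bounded join");
        // Both bounds start out inclusive, like the (true, true) arguments above.
        return new Bounds(lowerMs, upperMs, true, true);
    }

    public static void main(String[] args) {
        Bounds b = between(TimeCharacteristic.EventTime, -2L, 1L);
        System.out.println(b.lowerMs + " " + b.upperMs + " " + b.lowerInclusive + " " + b.upperInclusive);
        try {
            between(TimeCharacteristic.ProcessingTime, -2L, 1L);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```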

IntervalJoined

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

	@PublicEvolving
	public static class IntervalJoined<IN1, IN2, KEY> {

		private final KeyedStream<IN1, KEY> left;
		private final KeyedStream<IN2, KEY> right;

		private final long lowerBound;
		private final long upperBound;

		private final KeySelector<IN1, KEY> keySelector1;
		private final KeySelector<IN2, KEY> keySelector2;

		private boolean lowerBoundInclusive;
		private boolean upperBoundInclusive;

		public IntervalJoined(
				KeyedStream<IN1, KEY> left,
				KeyedStream<IN2, KEY> right,
				long lowerBound,
				long upperBound,
				boolean lowerBoundInclusive,
				boolean upperBoundInclusive) {

			this.left = checkNotNull(left);
			this.right = checkNotNull(right);

			this.lowerBound = lowerBound;
			this.upperBound = upperBound;

			this.lowerBoundInclusive = lowerBoundInclusive;
			this.upperBoundInclusive = upperBoundInclusive;

			this.keySelector1 = left.getKeySelector();
			this.keySelector2 = right.getKeySelector();
		}

		@PublicEvolving
		public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
			this.upperBoundInclusive = false;
			return this;
		}

		@PublicEvolving
		public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
			this.lowerBoundInclusive = false;
			return this;
		}

		@PublicEvolving
		public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
			Preconditions.checkNotNull(processJoinFunction);

			final TypeInformation<OUT> outputType = TypeExtractor.getBinaryOperatorReturnType(
				processJoinFunction,
				ProcessJoinFunction.class,
				0,
				1,
				2,
				TypeExtractor.NO_INDEX,
				left.getType(),
				right.getType(),
				Utils.getCallLocationName(),
				true
			);

			return process(processJoinFunction, outputType);
		}

		@PublicEvolving
		public <OUT> SingleOutputStreamOperator<OUT> process(
				ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
				TypeInformation<OUT> outputType) {
			Preconditions.checkNotNull(processJoinFunction);
			Preconditions.checkNotNull(outputType);

			final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);

			final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
				new IntervalJoinOperator<>(
					lowerBound,
					upperBound,
					lowerBoundInclusive,
					upperBoundInclusive,
					left.getType().createSerializer(left.getExecutionConfig()),
					right.getType().createSerializer(right.getExecutionConfig()),
					cleanedUdf
				);

			return left
				.connect(right)
				.keyBy(keySelector1, keySelector2)
				.transform("Interval Join", outputType, operator);
		}
	}
  • By default IntervalJoined treats both lowerBound and upperBound as inclusive; lowerBoundExclusive and upperBoundExclusive switch the corresponding bound to exclusive. IntervalJoined provides the process operation, which takes a ProcessJoinFunction; process creates an IntervalJoinOperator and returns the SingleOutputStreamOperator produced by left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator). (In this example, orangeStream is left and greenStream is right.)
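Because timestamps are whole milliseconds, an exclusive bound is equivalent to an inclusive bound shifted by one; the IntervalJoinOperator constructor further down relies on this, shifting lowerBound by +1 and upperBound by -1 when they are exclusive. A minimal sketch of that normalization (the class name is hypothetical):

```java
import java.util.Arrays;

// Hypothetical sketch: collapse (bound, inclusive?) pairs into one closed
// interval of millisecond offsets, mirroring the IntervalJoinOperator constructor.
class BoundNormalizer {

    // Returns {lower, upper}, both inclusive.
    static long[] normalize(long lowerBound, long upperBound,
                            boolean lowerInclusive, boolean upperInclusive) {
        // The operator checks the raw bounds before shifting them.
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound <= upperBound must be fulfilled");
        }
        long lower = lowerInclusive ? lowerBound : lowerBound + 1L;
        long upper = upperInclusive ? upperBound : upperBound - 1L;
        return new long[] {lower, upper};
    }

    public static void main(String[] args) {
        // between(-2, 1) with the defaults keeps [-2, 1].
        System.out.println(Arrays.toString(normalize(-2, 1, true, true)));
        // With lowerBoundExclusive() and upperBoundExclusive(): (-2, 1) becomes [-1, 0].
        System.out.println(Arrays.toString(normalize(-2, 1, false, false)));
    }
}
```

Normalizing once in the constructor means the per-record matching code only ever has to compare against a closed range.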

ProcessJoinFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java

@PublicEvolving
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {

	private static final long serialVersionUID = -2444626938039012398L;

	public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;

	public abstract class Context {

		public abstract long getLeftTimestamp();

		public abstract long getRightTimestamp();

		public abstract long getTimestamp();

		public abstract <X> void output(OutputTag<X> outputTag, X value);
	}
}
  • ProcessJoinFunction extends AbstractRichFunction. It declares the abstract processElement method and its own Context class, which defines four abstract methods: getLeftTimestamp, getRightTimestamp, getTimestamp, and output.
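How the Context timestamps get filled in only becomes visible in the operator's collect method further down: the result timestamp is the maximum of the left and right timestamps. A Flink-free sketch of that relationship, in which Context, JoinFunction and collect are simplified hypothetical stand-ins (in Flink, Context is an abstract inner class implemented by the operator's ContextImpl, and collect is private to IntervalJoinOperator):

```java
// Hypothetical, Flink-free model of ProcessJoinFunction.Context and of the
// operator's collect step; none of these types are the real Flink classes.
class JoinContextSketch {

    // Simplified Context: holds the timestamps that collect() sets before each call.
    static final class Context {
        private long leftTimestamp;
        private long rightTimestamp;
        private long timestamp;

        void updateTimestamps(long left, long right, long result) {
            this.leftTimestamp = left;
            this.rightTimestamp = right;
            this.timestamp = result;
        }

        long getLeftTimestamp() { return leftTimestamp; }
        long getRightTimestamp() { return rightTimestamp; }
        long getTimestamp() { return timestamp; }
    }

    // Stand-in for the user's processElement; returns a value instead of using a Collector.
    interface JoinFunction<A, B, O> {
        O processElement(A left, B right, Context ctx);
    }

    static <A, B, O> O collect(A left, B right, long leftTs, long rightTs,
                               Context ctx, JoinFunction<A, B, O> fn) {
        long resultTs = Math.max(leftTs, rightTs);  // same rule as IntervalJoinOperator.collect
        ctx.updateTimestamps(leftTs, rightTs, resultTs);
        return fn.processElement(left, right, ctx);
    }

    public static void main(String[] args) {
        Context ctx = new Context();
        String out = collect(3, 7, 10L, 9L, ctx,
                (l, r, c) -> l + "," + r + "@" + c.getTimestamp());
        System.out.println(out);
    }
}
```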

IntervalJoinOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java

@Internal
public class IntervalJoinOperator<K, T1, T2, OUT>
		extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
		implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {

	private static final long serialVersionUID = -5380774605111543454L;

	private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class);

	private static final String LEFT_BUFFER = "LEFT_BUFFER";
	private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
	private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
	private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
	private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";

	private final long lowerBound;
	private final long upperBound;

	private final TypeSerializer<T1> leftTypeSerializer;
	private final TypeSerializer<T2> rightTypeSerializer;

	private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
	private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

	private transient TimestampedCollector<OUT> collector;
	private transient ContextImpl context;

	private transient InternalTimerService<String> internalTimerService;

	public IntervalJoinOperator(
			long lowerBound,
			long upperBound,
			boolean lowerBoundInclusive,
			boolean upperBoundInclusive,
			TypeSerializer<T1> leftTypeSerializer,
			TypeSerializer<T2> rightTypeSerializer,
			ProcessJoinFunction<T1, T2, OUT> udf) {

		super(Preconditions.checkNotNull(udf));

		Preconditions.checkArgument(lowerBound <= upperBound,
			"lowerBound <= upperBound must be fulfilled");

		// Move buffer by +1 / -1 depending on inclusiveness in order not needing
		// to check for inclusiveness later on
		this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
		this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;

		this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
		this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
	}

	@Override
	public void open() throws Exception {
		super.open();

		collector = new TimestampedCollector<>(output);
		context = new ContextImpl(userFunction);
		internalTimerService =
			getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
	}

	@Override
	public void initializeState(StateInitializationContext context) throws Exception {
		super.initializeState(context);

		this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
			LEFT_BUFFER,
			LongSerializer.INSTANCE,
			new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
		));

		this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
			RIGHT_BUFFER,
			LongSerializer.INSTANCE,
			new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
		));
	}

	@Override
	public void processElement1(StreamRecord<T1> record) throws Exception {
		processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
	}

	@Override
	public void processElement2(StreamRecord<T2> record) throws Exception {
		processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
	}

	@SuppressWarnings("unchecked")
	private <THIS, OTHER> void processElement(
			final StreamRecord<THIS> record,
			final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
			final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
			final long relativeLowerBound,
			final long relativeUpperBound,
			final boolean isLeft) throws Exception {

		final THIS ourValue = record.getValue();
		final long ourTimestamp = record.getTimestamp();

		if (ourTimestamp == Long.MIN_VALUE) {
			throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
					"interval stream joins need to have timestamps meaningful timestamps.");
		}

		if (isLate(ourTimestamp)) {
			return;
		}

		addToBuffer(ourBuffer, ourValue, ourTimestamp);

		for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
			final long timestamp  = bucket.getKey();

			if (timestamp < ourTimestamp + relativeLowerBound ||
					timestamp > ourTimestamp + relativeUpperBound) {
				continue;
			}

			for (BufferEntry<OTHER> entry: bucket.getValue()) {
				if (isLeft) {
					collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
				} else {
					collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
				}
			}
		}

		long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
		if (isLeft) {
			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
		} else {
			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
		}
	}

	private boolean isLate(long timestamp) {
		long currentWatermark = internalTimerService.currentWatermark();
		return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
	}

	private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
		final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

		collector.setAbsoluteTimestamp(resultTimestamp);
		context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

		userFunction.processElement(left, right, context, collector);
	}

	@Override
	public void onEventTime(InternalTimer<K, String> timer) throws Exception {

		long timerTimestamp = timer.getTimestamp();
		String namespace = timer.getNamespace();

		logger.trace("onEventTime @ {}", timerTimestamp);

		switch (namespace) {
			case CLEANUP_NAMESPACE_LEFT: {
				long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
				logger.trace("Removing from left buffer @ {}", timestamp);
				leftBuffer.remove(timestamp);
				break;
			}
			case CLEANUP_NAMESPACE_RIGHT: {
				long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
				logger.trace("Removing from right buffer @ {}", timestamp);
				rightBuffer.remove(timestamp);
				break;
			}
			default:
				throw new RuntimeException("Invalid namespace " + namespace);
		}
	}

	@Override
	public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
		// do nothing.
	}

	//......
}
  • IntervalJoinOperator extends the AbstractUdfStreamOperator abstract class and implements the TwoInputStreamOperator and Triggerable interfaces.
  • IntervalJoinOperator overrides AbstractUdfStreamOperator's open and initializeState methods. open creates the InternalTimerService, passing this as the Triggerable argument (the operator itself implements Triggerable); initializeState creates the leftBuffer and rightBuffer MapStates.
  • IntervalJoinOperator implements the processElement1 and processElement2 methods declared by the TwoInputStreamOperator interface (the interface's other methods are implemented by AbstractStreamOperator, the superclass of AbstractUdfStreamOperator). Both delegate to processElement and differ only in the relativeLowerBound, relativeUpperBound, and isLeft arguments and in the order in which leftBuffer and rightBuffer are passed: processElement1 passes (lowerBound, upperBound), while processElement2 passes (-upperBound, -lowerBound).
  • The processElement method implements the time-matching logic of the interval join. It rejects records whose timestamp is Long.MIN_VALUE, then calls isLate, which reads the current watermark from internalTimerService; a late element is dropped by returning immediately. Otherwise the element's value is added to ourBuffer (leftBuffer for processElement1, rightBuffer for processElement2). It then iterates over every timestamp bucket in otherBuffer and checks whether ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound; buckets outside this range are skipped, and for each element of a matching bucket the collect method is called. collect sets the output timestamp to the larger of the two timestamps and calls userFunction.processElement, i.e. the processElement method of the user-defined ProcessJoinFunction. Finally, processElement computes cleanupTime and calls internalTimerService.registerEventTimeTimer to register a timer that will clean up this element.
  • IntervalJoinOperator implements the onEventTime and onProcessingTime methods of the Triggerable interface. onProcessingTime does nothing; onEventTime uses the timer's namespace and timestamp to remove the corresponding timestamp bucket from leftBuffer or rightBuffer.
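The steps above can be traced end to end in a single-key, Flink-free model. Everything here is a simplification and all names except the namespace strings are hypothetical: the buffers are HashMaps rather than keyed MapState, the watermark is assigned by hand, and cleanup timers are only recorded, so onEventTime has to be called explicitly. The cleanup time is the later of the element's own timestamp and the last timestamp a partner from the other side could carry, and onEventTime maps the timer time back to the timestamp the element was buffered under:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-key model of IntervalJoinOperator, written without Flink.
// Bounds are assumed already normalized to an inclusive range, the watermark is
// set by hand, and timers are only recorded, never fired automatically.
class MiniIntervalJoin {
    final long lowerBound;
    final long upperBound;
    long watermark = Long.MIN_VALUE;
    // Stand-ins for the leftBuffer/rightBuffer MapState: timestamp -> elements.
    final Map<Long, List<String>> leftBuffer = new HashMap<>();
    final Map<Long, List<String>> rightBuffer = new HashMap<>();
    final List<String> output = new ArrayList<>();  // "left,right@resultTimestamp"
    final List<String> timers = new ArrayList<>();  // "NAMESPACE@cleanupTime"

    MiniIntervalJoin(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processLeft(String value, long ts) {
        process(value, ts, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    void processRight(String value, long ts) {
        // Same routine with the buffers swapped and the bounds negated and swapped.
        process(value, ts, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(String value, long ts,
                         Map<Long, List<String>> ourBuffer, Map<Long, List<String>> otherBuffer,
                         long relLower, long relUpper, boolean isLeft) {
        // isLate: drop elements whose timestamp is behind the current watermark.
        if (watermark != Long.MIN_VALUE && ts < watermark) {
            return;
        }
        ourBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);

        for (Map.Entry<Long, List<String>> bucket : otherBuffer.entrySet()) {
            long otherTs = bucket.getKey();
            if (otherTs < ts + relLower || otherTs > ts + relUpper) {
                continue;  // bucket outside [ts + relLower, ts + relUpper]
            }
            for (String other : bucket.getValue()) {
                long resultTs = Math.max(ts, otherTs);  // as in collect()
                output.add(isLeft ? value + "," + other + "@" + resultTs
                                  : other + "," + value + "@" + resultTs);
            }
        }

        long cleanupTime = (relUpper > 0L) ? ts + relUpper : ts;
        timers.add((isLeft ? "CLEANUP_LEFT" : "CLEANUP_RIGHT") + "@" + cleanupTime);
    }

    // onEventTime: map the timer time back to the buffered timestamp and drop that bucket.
    void onEventTime(String namespace, long timerTs) {
        if (namespace.equals("CLEANUP_LEFT")) {
            leftBuffer.remove((upperBound <= 0L) ? timerTs : timerTs - upperBound);
        } else if (namespace.equals("CLEANUP_RIGHT")) {
            rightBuffer.remove((lowerBound <= 0L) ? timerTs + lowerBound : timerTs);
        } else {
            throw new IllegalArgumentException("Invalid namespace " + namespace);
        }
    }

    public static void main(String[] args) {
        MiniIntervalJoin join = new MiniIntervalJoin(-2, 1);
        join.processLeft("L5", 5);
        join.processRight("R4", 4);  // 4 is in L5's window [3, 6] -> joined
        join.processRight("R7", 7);  // 7 is outside L5's window [3, 6] -> no match
        join.watermark = 6;
        join.processLeft("L3", 3);   // 3 < watermark 6 -> dropped as late
        System.out.println(join.output);
        System.out.println(join.timers);

        // Fire the two timers registered at 6.
        join.onEventTime("CLEANUP_LEFT", 6);
        join.onEventTime("CLEANUP_RIGHT", 6);
        System.out.println(join.leftBuffer.keySet() + " " + join.rightBuffer.keySet());
    }
}
```

With between(-2, 1), R4 joins L5 with result timestamp 5, R7 finds no partner, and L3 is dropped as late. Firing the two timers registered at 6 removes the left bucket at 5 and the right bucket at 4, leaving only the right bucket at 7.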

summary

  • Flink's intervalJoin operation works on KeyedStreams and requires the TimeCharacteristic to be EventTime. KeyedStream's intervalJoin creates and returns an IntervalJoin; IntervalJoin provides between to set the lowerBound and upperBound of the interval, which creates and returns an IntervalJoined.
  • IntervalJoined provides the process operation, which takes a ProcessJoinFunction; process creates an IntervalJoinOperator and returns the SingleOutputStreamOperator produced by left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator).
  • IntervalJoinOperator extends the AbstractUdfStreamOperator abstract class and implements the TwoInputStreamOperator and Triggerable interfaces. It overrides AbstractUdfStreamOperator's open and initializeState methods: open creates the InternalTimerService, passing this as the Triggerable, and initializeState creates the leftBuffer and rightBuffer MapStates. It implements processElement1 and processElement2 from TwoInputStreamOperator; both delegate to processElement, differing only in the relativeLowerBound, relativeUpperBound, and isLeft arguments and in the order of leftBuffer and rightBuffer.
  • IntervalJoinOperator's processElement implements the time-matching logic of the interval join. It returns early for late elements, otherwise adds the element to ourBuffer, then iterates over otherBuffer and checks whether ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound; non-matching buckets are skipped, and for matches it calls collect, which in turn calls the processElement method of the user-defined ProcessJoinFunction. It then computes cleanupTime and calls internalTimerService.registerEventTimeTimer to register the element's cleanup timer.
  • IntervalJoinOperator implements the Triggerable interface's onEventTime and onProcessingTime methods. onProcessingTime does nothing, while onEventTime removes the expired timestamp bucket from leftBuffer or rightBuffer.

doc

  • Interval Join