Window Join

A Window Join joins the elements of two streams that share a common key and lie in the same window. These windows are defined with a window assigner and are evaluated on the elements of both streams.

The elements from both sides are then passed to a user-defined JoinFunction or FlatJoinFunction, where the user can emit results that meet the join criteria.

Common usage can be summarized as follows:

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

Some semantic notes:

  • The creation of pairwise combinations of elements from the two streams behaves like an inner join: an element from one stream is not emitted if it has no corresponding element from the other stream to be joined with.
  • Elements that do get joined are given, as their timestamp, the largest timestamp that still lies in the respective window. For example, a window with [5, 10) as its boundaries (start inclusive, end exclusive) results in joined elements with a timestamp of 9.
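The two notes above can be modelled in a few lines of plain Scala. This is a minimal sketch, not Flink's implementation: it assumes a window is the half-open interval [start, end) in milliseconds, and the names WindowJoinSemantics, Elem, Joined and joinWindow are hypothetical.

```scala
// Plain-Scala model of the two window-join semantics (not Flink code).
// Assumption: a window is the half-open interval [start, end) in milliseconds.
object WindowJoinSemantics {
  final case class Elem(key: String, ts: Long, value: Int)
  final case class Joined(key: String, ts: Long, left: Int, right: Int)

  // Largest timestamp that still belongs to [start, end), i.e. end - 1.
  def maxTimestamp(start: Long, end: Long): Long = end - 1

  // Inner join of the elements of one window: every left element is paired
  // with every right element of the same key; unmatched elements yield nothing.
  def joinWindow(start: Long, end: Long, left: Seq[Elem], right: Seq[Elem]): Seq[Joined] = {
    def inWindow(e: Elem): Boolean = e.ts >= start && e.ts < end
    for {
      l <- left.filter(inWindow)
      r <- right.filter(inWindow)
      if l.key == r.key
    } yield Joined(l.key, maxTimestamp(start, end), l.value, r.value)
  }

  def main(args: Array[String]): Unit = {
    val left = Seq(Elem("a", 5, 1), Elem("b", 6, 2))
    val right = Seq(Elem("a", 8, 10))
    // Only key "a" has partners on both sides; "b" is dropped (inner join).
    joinWindow(5, 10, left, right).foreach(println)
  }
}
```

Running main prints one joined record for key "a" carrying timestamp 9, the largest timestamp in [5, 10); the "b" element has no partner and produces no output.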

In the following sections, we outline how the different types of Window Join behave, using some example scenarios.

Tumbling Window Join

When performing a Tumbling Window Join, all elements with a common key and a common tumbling window are joined in pairwise combinations and passed to a JoinFunction or FlatJoinFunction. Because this behaves like an inner join, elements of one stream that have no element from the other stream in their tumbling window are not emitted.

Here the windows have a size of 2 milliseconds, so they take the form [0,1], [2,3], …. The figure shows the pairwise combinations of all elements in each window that are passed to the JoinFunction. Notice that nothing is emitted for the tumbling window [6,7], because there is no element in the green stream to join with the orange elements ⑥ and ⑦.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
// join elements with equal keys that fall into the same 2 ms tumbling window
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply { (e1, e2) => e1 + "," + e2 }
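To make the pairing in the tumbling case concrete, the following plain-Scala sketch models a tumbling-window inner join for a single key. It is not Flink code: the timestamps are hypothetical (the figure's data is not reproduced here), no window offset is assumed, and TumblingJoinModel is a made-up name.

```scala
// Plain-Scala model of a tumbling-window inner join (not Flink's implementation).
// Hypothetical input: one key, timestamps in milliseconds, no window offset.
object TumblingJoinModel {
  // Start of the tumbling window of the given size that contains ts.
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // Returns (windowStart, orangeTs, greenTs) for every pair sharing a window.
  def join(orange: Seq[Long], green: Seq[Long], size: Long): Seq[(Long, Long, Long)] =
    for {
      o <- orange
      g <- green
      if windowStart(o, size) == windowStart(g, size)
    } yield (windowStart(o, size), o, g)

  def main(args: Array[String]): Unit = {
    val orange = Seq(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L) // hypothetical
    val green = Seq(0L, 1L, 3L, 4L)                 // hypothetical
    // Group the pairs by window; window [6,7] never appears because
    // no green element falls into it.
    join(orange, green, 2).groupBy(_._1).toSeq.sortBy(_._1).foreach(println)
  }
}
```

With this input the window starting at 6 contributes no pairs at all, which is the inner-join behaviour described for [6,7].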

Sliding Window Join

When performing a Sliding Window Join, all elements with a common key and a common sliding window are joined in pairwise combinations and passed to the JoinFunction or FlatJoinFunction. Elements of one stream that have no element from the other stream in the current sliding window are not emitted!

Note that some elements may be joined in one sliding window but not in another!

In this example, we use sliding windows with a size of 2 milliseconds that slide by 1 millisecond, resulting in the sliding windows [-1, 0], [0,1], [1,2], [2,3], …. The joined elements below the x-axis are the ones passed to the JoinFunction for each sliding window. Here you can also see, for example, how orange ② is joined with green ③ in the window [2,3], but not in the window [1,2].

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
// join elements with equal keys that share a 2 ms window sliding by 1 ms
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply { (e1, e2) => e1 + "," + e2 }
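The ② and ③ example can be checked with a small plain-Scala model of sliding-window assignment. This is a sketch under an assumption, not Flink code: windows are [start, start + size) with starts on multiples of slide, so the window written [2,3] in the text is the one starting at 2. SlidingJoinModel and its functions are hypothetical names.

```scala
// Plain-Scala model of sliding-window assignment and join (not Flink's code).
// Assumption: windows are [start, start + size) with starts on multiples of slide.
object SlidingJoinModel {
  // Starts of all windows that contain ts, latest first.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val last = ts - Math.floorMod(ts, slide)
    Iterator.iterate(last)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // Starts of the windows in which orange element o and green element g are joined.
  def sharedWindows(o: Long, g: Long, size: Long, slide: Long): List[Long] = {
    val greenWindows = windowStarts(g, size, slide).toSet
    windowStarts(o, size, slide).filter(greenWindows.contains)
  }

  def main(args: Array[String]): Unit = {
    // Orange 2 lies in the windows starting at 2 and 1; green 3 in those starting at 3 and 2.
    println(sharedWindows(2, 3, size = 2, slide = 1)) // only the window starting at 2
  }
}
```

The same function also reproduces the window list in the text: an element at timestamp 0 falls into the windows starting at 0 and -1, that is [0,1] and [-1, 0].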

Session Window Join

When performing a Session Window Join, all elements with the same key that, when “combined”, fulfill the session criteria are joined in pairwise combinations and passed to the JoinFunction or FlatJoinFunction. Again, this performs an inner join, so if a session window contains elements from only one stream, no output is emitted!

Here we define a Session Window Join in which sessions are separated by a gap of at least 1 ms. There are three sessions; in the first two, the joined elements from both streams are passed to the JoinFunction. In the third session there are no elements from the green stream, so ⑧ and ⑨ are not joined!

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
// join elements with equal keys that end up in the same session (1 ms gap)
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply { (e1, e2) => e1 + "," + e2 }
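A simplified plain-Scala model of session grouping shows why the third session produces nothing. It is a sketch, not Flink code. Assumption: two consecutive timestamps belong to the same session when they are at most gap apart; the exact boundary behaviour of Flink's session windows is not claimed here. The input is hypothetical, arranged like the scenario above (three sessions, the last containing only orange 8 and 9), and SessionJoinModel is a made-up name.

```scala
// Simplified plain-Scala model of a session-window join (not Flink's code).
// Assumption: consecutive timestamps at most `gap` apart share a session.
object SessionJoinModel {
  // Split the sorted timestamps of both streams into sessions.
  def sessions(ts: Seq[Long], gap: Long): List[List[Long]] =
    ts.sorted.foldLeft(List.empty[List[Long]]) {
      // cur holds the current session in reverse, so cur.head is its latest timestamp
      case (cur :: done, t) if t - cur.head <= gap => (t :: cur) :: done
      case (acc, t) => List(t) :: acc
    }.map(_.reverse).reverse

  // (orange, green) pairs that fall into the same session: inner-join semantics.
  def join(orange: Seq[Long], green: Seq[Long], gap: Long): Seq[(Long, Long)] =
    sessions(orange ++ green, gap).flatMap { s =>
      for (o <- orange if s.contains(o); g <- green if s.contains(g)) yield (o, g)
    }

  def main(args: Array[String]): Unit = {
    val orange = Seq(0L, 1L, 5L, 6L, 8L, 9L) // hypothetical
    val green = Seq(1L, 2L, 6L)              // hypothetical
    println(sessions(orange ++ green, 1)) // three sessions; the last holds only 8 and 9
    println(join(orange, green, 1))       // no pair contains 8 or 9
  }
}
```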

Interval Join

The Interval Join joins elements of two streams (we will call them A and B for now) that share a common key, where the elements of stream B have timestamps lying in a time interval relative to the timestamps of the elements of stream A.

This can also be expressed more formally as b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound], or equivalently a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound.

Here, a and b are elements of A and B that share a common key. The lower and upper bounds can each be negative or positive, as long as the lower bound is always less than or equal to the upper bound. The Interval Join currently performs only inner joins. When a pair of elements is passed to the ProcessJoinFunction, it is assigned the larger of the two elements' timestamps, which can be accessed via ProcessJoinFunction.Context.
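The join condition and the timestamp rule above can be written as two small functions. This is a plain-Scala sketch of the stated semantics, not Flink's implementation; IntervalJoinModel, Elem, matches and resultTimestamp are hypothetical names, and both bounds are treated as inclusive.

```scala
// Plain-Scala model of the interval-join condition (not Flink's implementation).
object IntervalJoinModel {
  final case class Elem(key: String, ts: Long)

  // a.ts + lowerBound <= b.ts <= a.ts + upperBound, both bounds inclusive,
  // and a and b must share a key.
  def matches(a: Elem, b: Elem, lowerBound: Long, upperBound: Long): Boolean = {
    require(lowerBound <= upperBound, "lowerBound must not exceed upperBound")
    a.key == b.key && a.ts + lowerBound <= b.ts && b.ts <= a.ts + upperBound
  }

  // A joined pair carries the larger of the two timestamps.
  def resultTimestamp(a: Elem, b: Elem): Long = math.max(a.ts, b.ts)

  def main(args: Array[String]): Unit = {
    val a = Elem("k", 10)
    val b = Elem("k", 8) // 10 - 2 <= 8 <= 10 + 1, so the pair joins
    println(matches(a, b, -2, 1))   // true
    println(resultTimestamp(a, b))  // 10
  }
}
```

Negative bounds simply mean that B elements may be older than the A element they join with, which is why the only constraint on the bounds is lowerBound <= upperBound.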

Note: the Interval Join currently supports only event time.

In the example above, we join two streams, “orange” and “green”, with a lower bound of -2 ms and an upper bound of +1 ms. By default these bounds are inclusive, but .lowerBoundExclusive() and .upperBoundExclusive() can be applied to change this behavior.

Again using the more formal notation, this translates to orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound, as shown by the triangle.

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
// join each orange element with green elements in [orange.ts - 2 ms, orange.ts + 1 ms]
orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] {
        override def processElement(left: Integer, right: Integer,
                                    ctx: ProcessJoinFunction[Integer, Integer, String]#Context,
                                    out: Collector[String]): Unit = {
            // emit one string per joined pair
            out.collect(left + "," + right)
        }
    })
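The effect of making the bounds exclusive can be seen by listing which green timestamps join one orange element under the [-2 ms, +1 ms] interval used above. This plain-Scala sketch only mirrors the intent of .lowerBoundExclusive() and .upperBoundExclusive(); it does not call Flink, the green timestamps are hypothetical, and IntervalBoundsModel, inInterval and partners are made-up names.

```scala
// Plain-Scala model of inclusive vs. exclusive interval bounds (not Flink code).
object IntervalBoundsModel {
  def inInterval(orangeTs: Long, greenTs: Long, lower: Long, upper: Long,
                 lowerExclusive: Boolean = false, upperExclusive: Boolean = false): Boolean = {
    val lo = orangeTs + lower
    val hi = orangeTs + upper
    val aboveLo = if (lowerExclusive) greenTs > lo else greenTs >= lo
    val belowHi = if (upperExclusive) greenTs < hi else greenTs <= hi
    aboveLo && belowHi
  }

  // Green timestamps that join with one orange element, using the bounds -2 and +1.
  def partners(orangeTs: Long, green: Seq[Long],
               lowerExclusive: Boolean, upperExclusive: Boolean): Seq[Long] =
    green.filter(g => inInterval(orangeTs, g, -2, 1, lowerExclusive, upperExclusive))

  def main(args: Array[String]): Unit = {
    val green = (0L to 10L).toList // hypothetical: one green element per millisecond
    println(partners(5, green, lowerExclusive = false, upperExclusive = false)) // 3 to 6
    println(partners(5, green, lowerExclusive = true, upperExclusive = true))   // 4 to 5
  }
}
```

Each exclusive flag removes exactly one boundary timestamp from the matching range, leaving the rest of the interval unchanged.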
