A common task in batch processing is joining two data sources on a shared key. For example, many mobile apps have a User data source, and the app records each user's actions in a Behavior data source; the two tables are joined on userId. Flink also supports joins in stream processing scenarios, except that Flink joins the two streams within a time window.

Currently, Flink supports two types of joins: Window Join and Interval Join.

Window Join

As the name suggests, Window Join operates on Flink's windows: elements from the two streams that fall into the same window are joined according to a key. The general skeleton of a Window Join is:

input1.join(input2)
    .where(<KeySelector>)       <- which field of input1 to use as the key
    .equalTo(<KeySelector>)     <- which field of input2 to use as the key
    .window(<WindowAssigner>)   <- the WindowAssigner to use
    [.trigger(<Trigger>)]       <- the Trigger to use (optional)
    [.evictor(<Evictor>)]       <- the Evictor to use (optional)
    .apply(<JoinFunction>)      <- the JoinFunction to apply

The following figure shows the general Join process. The two input streams are first grouped by key, and the elements are then assigned to windows. The windowing is defined with a WindowAssigner; the default WindowAssigners provided by Flink, such as tumbling windows, sliding windows, or session windows, can be used. Elements from both streams are assigned to each window, so a window contains elements from both data streams. Data in the same window is paired up with INNER JOIN semantics. When the window time ends, Flink calls the JoinFunction to process the data pairs in the window. Of course, we can also do some custom optimization with a Trigger or an Evictor, which are used the same way as with normal windows.
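To make the optional Trigger and Evictor part of the skeleton more concrete, here is a minimal sketch (not part of the original example): it fires the window after every 100 elements and keeps only the last 10 elements before the join function runs. The CountTrigger/CountEvictor choice and the numbers are assumptions for illustration, and input1 and input2 are assumed to be DataStream[(String, Int)] as in the example further below.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

val customizedJoin = input1.join(input2)
  .where(i1 => i1._1)
  .equalTo(i2 => i2._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
  .trigger(CountTrigger.of[TimeWindow](100))  // fire the window once 100 elements have arrived
  .evictor(CountEvictor.of[TimeWindow](10))   // keep only the last 10 elements before the join runs
  .apply { (e1: (String, Int), e2: (String, Int)) =>
    "input 1 :" + e1._2 + ", input 2 :" + e2._2
  }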

Let's take a closer look at how the two data streams are INNER JOINed:

In general, an INNER JOIN only joins elements that appear in both data sources: each element of input1 is paired one by one with every element of input2 in the same window. When one source has no data in a window, such as the third window in the figure, the join result for that window is empty.

class MyJoinFunction extends JoinFunction[(String, Int), (String, Int), String] {

  override def join(input1: (String, Int), input2: (String, Int)): String = {
    "input 1 :" + input1._2 + ", input 2 :" + input2._2
  }

}

val input1: DataStream[(String, Int)] = ...
val input2: DataStream[(String, Int)] = ...

val joinResult = input1.join(input2)
      .where(i1 => i1._1)
      .equalTo(i2 => i2._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .apply(new MyJoinFunction)

The above code uses a custom JoinFunction and prints the join result. The principle is the same whether we use the tumbling window shown in the code, a sliding window, or a session window. Besides JoinFunction, Flink also provides FlatJoinFunction, which can output zero or more results per pair.
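As a minimal sketch (not part of the original example), a FlatJoinFunction receives the same pair of elements but emits results through a Collector, so it can output zero or more records. The filtering condition below is only an illustrative assumption.

import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.util.Collector

class MyFlatJoinFunction extends FlatJoinFunction[(String, Int), (String, Int), String] {

  override def join(input1: (String, Int), input2: (String, Int), out: Collector[String]): Unit = {
    // emit a result only when both values are positive; otherwise emit nothing
    if (input1._2 > 0 && input2._2 > 0) {
      out.collect("input 1 :" + input1._2 + ", input 2 :" + input2._2)
    }
  }

}

val flatJoinResult = input1.join(input2)
      .where(i1 => i1._1)
      .equalTo(i2 => i2._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .apply(new MyFlatJoinFunction)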

If INNER JOIN doesn't meet our needs, CoGroupFunction provides more customizable behavior. Note that the call chain is input1.coGroup(input2).where(<KeySelector>).equalTo(<KeySelector>).

class MyCoGroupFunction extends CoGroupFunction[(String, Int), (String, Int), String] {

  // input1 and input2 are Java Iterables; import scala.collection.JavaConverters._
  // to convert them into Scala collections
  override def coGroup(input1: java.lang.Iterable[(String, Int)], input2: java.lang.Iterable[(String, Int)], out: Collector[String]): Unit = {
    input1.asScala.foreach(element => out.collect("input1 :" + element.toString()))
    input2.asScala.foreach(element => out.collect("input2 :" + element.toString()))
  }

}

val input1: DataStream[(String, Int)] = ...
val input2: DataStream[(String, Int)] = ...

val coGroupResult = input1.coGroup(input2)
      .where(i1 => i1._1)
      .equalTo(i2 => i2._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .apply(new MyCoGroupFunction)

Interval Join

Unlike Window Join, Interval Join does not rely on Flink's WindowAssigner; instead it defines the join time range with an interval. The interval needs a lower bound and an upper bound. If we Interval Join input1 and input2, an element input1.element1 defines the time range [input1.element1.ts + lowerBound, input1.element1.ts + upperBound], and the elements of input2 whose timestamps fall into this range form data pairs with input1.element1. Expressed as a formula, elements satisfying input1.element1.ts + lowerBound <= input2.elementX.ts <= input1.element1.ts + upperBound are paired up. The upper and lower bounds can be positive or negative.

Note that Interval Join currently (Flink 1.9) supports only Event Time semantics.
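Because only Event Time is supported, both input streams need timestamps and watermarks assigned before calling intervalJoin. Here is a minimal sketch of that preliminary step, assuming (as in the example below) tuples of (key, timestamp, value) with the timestamp in milliseconds, and an assumed 5-second out-of-orderness bound.

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// assume the second tuple field holds the event timestamp in milliseconds
val input1WithTimestamps = input1.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(5)) {
    override def extractTimestamp(element: (String, Long, Int)): Long = element._2
  })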

The following code shows how to do Interval Join on two data streams:

class MyProcessFunction extends ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String] {
  override def processElement(input1: (String, Long, Int),
                              input2: (String, Long, Int),
                              context: ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String]#Context,
                              out: Collector[String]): Unit = {

    out.collect("input 1: " + input1.toString() + ", input 2: " + input2.toString)

  }
}

// the data stream has three fields: (key, timestamp, value)
val input1: DataStream[(String, Long, Int)] = ...
val input2: DataStream[(String, Long, Int)] = ...

val intervalJoinResult = input1.keyBy(_._1)
      .intervalJoin(input2.keyBy(_._1))
      .between(Time.milliseconds(-5), Time.milliseconds(10))
      .process(new MyProcessFunction)

By default the interval includes both the upper and the lower bound; use .lowerBoundExclusive() and .upperBoundExclusive() to exclude them.

val intervalJoinResult = input1.keyBy(_._1)
      .intervalJoin(input2.keyBy(_._1))
      .between(Time.milliseconds(-5), Time.milliseconds(10))
      .upperBoundExclusive()
      .lowerBoundExclusive()
      .process(new MyProcessFunction)

Interval Join caches all of the incoming data, so it is important to make sure the cached data does not grow too large, to avoid excessive memory pressure.
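One common way to relieve heap pressure (an assumption here, not something configured in the original example) is to put the job's state on the RocksDB state backend, which keeps most of the cached data off the JVM heap. A minimal sketch, assuming the flink-statebackend-rocksdb dependency is on the classpath; the checkpoint path is a placeholder.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// hypothetical checkpoint location; replace with a real durable path
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))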