Process Function

The transformation operators we have seen so far cannot access an event's timestamp or watermark information. A map-style operator such as MapFunction, for example, cannot read the timestamp of the current element or the current event time, yet this is extremely important in some application scenarios. For this reason, the DataStream API provides a family of low-level transformations that can access timestamps and watermarks and register timers, and that can also emit specific records, such as timeout events.

Process functions are used to build event-driven applications and to implement custom business logic. Flink provides eight process functions:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessJoinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

1. ProcessFunction

ProcessFunction is a low-level stream-processing operation with access to the basic building blocks of a streaming application: events, state, and timers.

  1. Each call to processElement receives a Context object that gives access to the element's timestamp and to the TimerService.
  2. Keyed state is accessed through the RuntimeContext.
  3. The TimerService can be used to register callbacks for future event- or processing-time instants. When a timer's time is reached, the onTimer method is called; during that call, all state is again scoped to the key for which the timer was created, so timers can manipulate keyed state.

PS: The pattern of this article is to follow each introduction of a process function with an introductory example of its use.

package processfunction

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._

/**
 * @Author Natasha
 * @Description
 * @Date 2020/11/4 16:40
 **/
object ProcessFunctionDemo {
  val WORDS = "To be, or not to be,--that is the question:--"
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.fromElements(WORDS)
      .flatMap(new RichFlatMapFunction1)
      .process(new ProcessFunction1)
      .print()
    env.execute()
  }

  class ProcessFunction1 extends ProcessFunction[(String, Integer), (String, Integer)] {
    override def processElement(value: (String, Integer),
                                ctx: ProcessFunction[(String, Integer), (String, Integer)]#Context,
                                out: Collector[(String, Integer)]): Unit = {
      out.collect((value._1, value._2 + 1))
    }
  }

  class RichFlatMapFunction1 extends RichFlatMapFunction[String, (String, Integer)] {
    override def flatMap(value: String, collector: Collector[(String, Integer)]): Unit = {
      val splits = value.toLowerCase.split("\\W+") // \W+ matches one or more non-word characters
      for (v <- splits) {
        if (v.length > 0) {
          collector.collect((v, 1))
        }
      }
    }
  }
}
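
The demo above simply re-emits each word with an incremented count. As a further illustration of what the Context offers, here is a hypothetical sketch (the tag name and the routing rule are invented for illustration, and it assumes the same imports as the demo) that uses ctx.output to send some records to a side output, one of the capabilities mentioned at the start of this section:

class RoutingProcessFunction extends ProcessFunction[(String, Integer), (String, Integer)] {
  // side output for long words; OutputTags compare by id and type, so the
  // side stream can later be fetched with mainStream.getSideOutput(longWordsTag)
  val longWordsTag = new OutputTag[(String, Integer)]("long-words")

  override def processElement(value: (String, Integer),
                              ctx: ProcessFunction[(String, Integer), (String, Integer)]#Context,
                              out: Collector[(String, Integer)]): Unit = {
    // ctx.timestamp() (the element's timestamp, null if none was assigned)
    // and ctx.timerService() are also available here
    if (value._1.length > 6) {
      ctx.output(longWordsTag, value) // route long words to the side output
    } else {
      out.collect(value)              // everything else goes to the main output
    }
  }
}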

2. KeyedProcessFunction

KeyedProcessFunction, an extension of ProcessFunction, additionally provides access to the key of the timer that fired in its onTimer method. KeyedProcessFunction[KEY, IN, OUT] provides two methods for handling a KeyedStream:

  • processElement(value: IN, ctx: Context, out: Collector[OUT]) is called for every element in the stream, and results are emitted through the Collector. The Context gives access to the element's timestamp, the element's key, and the TimerService. The Context can also emit results to other streams (side outputs).
  • onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]) is a callback that is invoked when a previously registered timer fires. timestamp is the timestamp the timer was set for, the Collector emits results, and the OnTimerContext provides contextual information such as the time domain in which the timer fired (event time or processing time). Since the demo below does not register timers, a separate timer sketch follows it.
package processfunction

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * @Author Natasha
 * @Description
 * @Date 2020/11/4 15:45
 **/
object KeyedProcessFunctionDemo {
  val WORDS = "To be, or not to be,--that is the question:--"
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.fromElements(WORDS)
      .flatMap(new RichFlatMapFunction1)
      .keyBy(_._1)
      .process(new KeyedProcessFunction1)
      .print()
    env.execute()
  }

  class KeyedProcessFunction1 extends KeyedProcessFunction[String, (String, Integer), (String, Integer)] {
    override def processElement(value: (String, Integer),
                                ctx: KeyedProcessFunction[String, (String, Integer), (String, Integer)]#Context,
                                out: Collector[(String, Integer)]): Unit = {
      // ctx.getCurrentKey is available here because process() runs on a KeyedStream (after keyBy)
      out.collect((ctx.getCurrentKey + ")" + value._1, value._2 + 1))
    }
  }

  class RichFlatMapFunction1 extends RichFlatMapFunction[String, (String, Integer)] {
    override def flatMap(value: String, collector: Collector[(String, Integer)]): Unit = {
      val splits = value.toLowerCase.split("\\W+")
      for (v <- splits) {
        if (v.length > 0) {
          collector.collect((v, 1))
        }
      }
    }
  }
}
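
The demo above never registers a timer, so here is a hypothetical sketch of the onTimer mechanism described earlier (the state name and the one-second delay are illustrative, and timestamps are assumed to have been assigned): every element increments a per-key count and registers an event-time timer one second after the element's timestamp; when a timer fires, the current count for that key is emitted.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types

class CountWithTimeoutFunction extends KeyedProcessFunction[String, (String, Integer), (String, Long)] {
  // running count per key, kept in keyed state
  lazy val countState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", Types.of[Long]))

  override def processElement(value: (String, Integer),
                              ctx: KeyedProcessFunction[String, (String, Integer), (String, Long)]#Context,
                              out: Collector[(String, Long)]): Unit = {
    countState.update(countState.value() + 1)
    // fire one second (event time) after this element's timestamp
    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 1000)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, (String, Integer), (String, Long)]#OnTimerContext,
                       out: Collector[(String, Long)]): Unit = {
    // state accessed here is again scoped to the key that registered the timer
    out.collect((ctx.getCurrentKey, countState.value()))
  }
}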

3. ProcessWindowFunction

ProcessWindowFunction caches all the data in a window. When the window fires, Flink hands all elements of the window for a given key to the process method as an Iterable[IN], and we emit results through a Collector[OUT]. Through the Context we can obtain more information about the window, including its time, its state, where late data is sent, and so on.

The following code is a simple application of ProcessWindowFunction: it counts how often each stock price occurs and outputs the price that occurs most often.

package windowfunction

import model.StockPrice
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * @Author Natasha
 * @Description
 * @Date 2020/11/18 15:57
 **/
object ProcessWindowFunctionDemo {
  def main(args: Array[String]): Unit = {
    val aenv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    aenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    aenv.setParallelism(1)

    val resource = getClass.getResource("/AggregateFunctionLog.csv").getPath
    val socketStream = aenv.readTextFile(resource)
    val input = socketStream
      .map(data => {
        val arr = data.split(",")
        StockPrice(arr(0), arr(1).toDouble, arr(2).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)
    val frequency = input
      .keyBy(s => s.symbol)
      .timeWindow(Time.seconds(10))
      .process(new ProcessWindowFunction1)
      .print()

    aenv.execute()
  }
  
  class ProcessWindowFunction1 extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {
    override def process(key: String,
                         context: Context,
                         elements: Iterable[StockPrice],
                         out: Collector[(String, Double)]): Unit = {
      // Map from stock price to the number of times that price appears
      val countMap = scala.collection.mutable.Map[Double, Int]()
      for (element <- elements) {
        val count = countMap.getOrElse(element.price, 0)
        countMap(element.price) = count + 1
      }
      // Sort by the number of occurrences
      val sortedMap = countMap.toSeq.sortWith(_._2 > _._2)
      // Emit the most frequent price to the Collector
      if (sortedMap.nonEmpty) {
        out.collect((key, sortedMap(0)._1))
      }
    }
  }
}
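
Because ProcessWindowFunction keeps every element of the window in state, Flink also allows combining it with an incremental pre-aggregation, so that only a running aggregate is stored and the window function receives a single pre-aggregated element. A minimal sketch under that assumption, reusing the demo's StockPrice stream and the reduce(preAggregator, processWindowFunction) overload (the lambda keeps the record with the higher price):

input
  .keyBy(s => s.symbol)
  .timeWindow(Time.seconds(10))
  .reduce(
    // incremental part: only the current maximum is kept per window
    (r1: StockPrice, r2: StockPrice) => if (r1.price > r2.price) r1 else r2,
    // window part: receives exactly one pre-aggregated element per key and window
    new ProcessWindowFunction[StockPrice, (String, Long, Double), String, TimeWindow] {
      override def process(key: String,
                           context: Context,
                           elements: Iterable[StockPrice],
                           out: Collector[(String, Long, Double)]): Unit = {
        val max = elements.iterator.next()
        out.collect((key, context.window.getEnd, max.price))
      }
    })
  .print()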

Joining two DataStreams

4. JoinFunction

A window-based join relies on Flink's windowing mechanism: elements from the two input streams are allocated to a common window and joined (or coGrouped) when the window fires.

input1.join(input2)
    .where(<KeySelector>)       <- which field of input1 is used as the key
    .equalTo(<KeySelector>)     <- which field of input2 is used as the key
    .window(<WindowAssigner>)   <- specify the WindowAssigner
    [.trigger(<Trigger>)]       <- specify the Trigger (optional)
    [.evictor(<Evictor>)]       <- specify the Evictor (optional)
    .apply(<JoinFunction>)      <- specify the JoinFunction

The general process of a window join is as follows: the two input streams, input1 and input2, are each grouped by key, and the elements are divided into windows by a WindowAssigner, which can be one of Flink's built-in assigners, such as tumbling, sliding, or session windows. Elements from the two data streams are then allocated to each window, meaning that one window contains elements from both data streams.

Data in the same window is associated with inner-join semantics to form pairs: an element from input1 is paired one by one with every element from input2 in the same window. When the window's time ends, Flink calls the JoinFunction on each pair in the window.

package processfunction

import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

/**
 * @Author Natasha
 * @Description
 * @Date 2020/11/19 14:20
 **/
object JoinFunctionDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val orangeStream = env
      .fromElements(
        (1, 1999L),
        (1, 2001L))
      .assignAscendingTimestamps(_._2)
    val greenStream = env
      .fromElements(
        (1, 1001L),
        (1, 1002L),
        (1, 3999L))
      .assignAscendingTimestamps(_._2)
    orangeStream.join(greenStream)
      .where(r => r._1)
      .equalTo(r => r._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(2)))
//    .apply { (e1, e2) => e1 + " *** " + e2 } // equivalent lambda form
      .apply(new MyJoinFunction)
      .print()

    env.execute()
  }

  class MyJoinFunction extends JoinFunction[(Int, Long), (Int, Long), String] {
    override def join(input1: (Int, Long), input2: (Int, Long)): String = {
      input1 + " *** " + input2
    }
  }
}
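
With these inputs, the tumbling two-second event-time windows are [0, 2000) and [2000, 4000): orange (1, 1999) shares the first window with green (1, 1001) and (1, 1002), while orange (1, 2001) shares the second with green (1, 3999). The job should therefore print three joined pairs:

(1,1999) *** (1,1001)
(1,1999) *** (1,1002)
(1,2001) *** (1,3999)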

5. CoGroupFunction

If an inner join does not meet our needs, CoGroupFunction provides more customizable functionality: it is invoked once per key and window with all elements from both inputs, which makes outer-join behavior possible. The call chain is the same as for a window join, with coGroup in place of join:

input1.coGroup(input2)
    .where(<KeySelector>)       <- which field of input1 is used as the key
    .equalTo(<KeySelector>)     <- which field of input2 is used as the key
    .window(<WindowAssigner>)   <- specify the WindowAssigner
    [.trigger(<Trigger>)]       <- specify the Trigger (optional)
    [.evictor(<Evictor>)]       <- specify the Evictor (optional)
    .apply(<CoGroupFunction>)   <- specify the CoGroupFunction
package processfunction

import java.text.SimpleDateFormat

import model.{StockSnapshot, StockTransaction}
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
 * @Author Natasha
 * @Description
 * @Date 2020/11/19 15:01
 * @data
 * Transaction:
 * 2016-07-28 13:00:01.820,000001,10.2
 * 2016-07-28 13:00:01.260,000001,10.2
 * 2016-07-28 13:00:02.980,000001,10.1
 * 2016-07-28 13:00:03.120,000001,10.1
 * 2016-07-28 13:00:04.330,000001,10.0
 * 2016-07-28 13:00:05.570,000001,10.0
 * 2016-07-28 13:00:05.990,000001,10.0
 * 2016-07-28 13:00:14.000,000001,10.1
 * 2016-07-28 13:00:20.000,000001,10.2
 * Snapshot:
 * 2016-07-28 13:00:01.000,000001,10.2
 * 2016-07-28 13:00:04.000,000001,10.1
 * 2016-07-28 13:00:07.000,000001,10.0
 * 2016-07-28 13:00:16.000,000001,10.1
 **/
object CoGroupFunctionDemo {
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream1 = env.socketTextStream("localhost", 8998)
    val dataStream2 = env.socketTextStream("localhost", 8999)
    /**
     * Input lines look like -- TX: 2016-07-28 13:00:01.000,000002,10.2  MD: same format
     * Because this is a test, the watermark simply rises in ascending order
     * (that is, the event time of the incoming data is itself ascending)
     */
    val dataStreamMap1 = dataStream1
      .map(f => {
        val tokens1 = f.split(",")
        StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)
      })
      .assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)
    val dataStreamMap2 = dataStream2
      .map(f => {
        val tokens2 = f.split(",")
        StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)
      })
      .assignAscendingTimestamps(f => format.parse(f.md_time).getTime)
    val joinedStream = dataStreamMap1
      .coGroup(dataStreamMap2)
      .where(_.tx_code)
      .equalTo(_.md_code)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    val innerJoinedStream = joinedStream.apply(new InnerJoinFunction)
    val leftJoinedStream = joinedStream.apply(new LeftJoinFunction)
    val rightJoinedStream = joinedStream.apply(new RightJoinFunction)

    innerJoinedStream.name("InnerJoinedStream").print()
    leftJoinedStream.name("LeftJoinedStream").print()
    rightJoinedStream.name("RightJoinedStream").print()

    env.execute("3 Type of Double Stream Join")}class InnerJoinFunction extends CoGroupFunction[StockTransaction.StockSnapshot, (String.String.String.Double.Double.String)]{
    override def coGroup(T1: java.lang.可迭代[StockTransaction].T2: java.lang.可迭代[StockSnapshot], out: Collector[(String.String.String.Double.Double.String)]) :Unit = {
      /** * Convert Java Iterable objects to Scala Iterable * Scala's collection operations are efficient and concise */
      import scala.collection.JavaConverters. _val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList
      /** * Inner Join to compare the same key, the same time window data */
      if(scalaT1.nonEmpty && scalaT2.nonEmpty){
        for(transaction <- scalaT1){
          for(snapshot <- scalaT2){
            out.collect(transaction.tx_code,transaction.tx_time, snapshot.md_time,transaction.tx_value,snapshot.md_value,"Inner Join Test")}}}}}class LeftJoinFunction extends CoGroupFunction[StockTransaction.StockSnapshot, (String.String.String.Double.Double.String)] {
    override def coGroup(T1: java.lang.可迭代[StockTransaction].T2: java.lang.可迭代[StockSnapshot], out: Collector[(String.String.String.Double.Double.String)]) :Unit = {
      /** * Convert Java Iterable objects to Scala Iterable * Scala's collection operations are efficient and concise */
      import scala.collection.JavaConverters. _val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList
      /** * Left Join compares data in the same window */
      if(scalaT1.nonEmpty && scalaT2.isEmpty){
        for(transaction <- scalaT1){
          out.collect(transaction.tx_code,transaction.tx_time, "",transaction.tx_value,0."Left Join Test")}}}}class RightJoinFunction extends CoGroupFunction[StockTransaction.StockSnapshot, (String.String.String.Double.Double.String)] {
    override def coGroup(T1: java.lang.可迭代[StockTransaction].T2: java.lang.可迭代[StockSnapshot], out: Collector[(String.String.String.Double.Double.String)]) :Unit = {
      /** * Convert Java Iterable objects to Scala Iterable * Scala's collection operations are efficient and concise */
      import scala.collection.JavaConverters. _val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList
      /** * Right Join compares data in the same window */
      if(scalaT1.isEmpty && scalaT2.nonEmpty){
        for(snapshot <- scalaT2){
          out.collect(snapshot.md_code, "",snapshot.md_time,0,snapshot.md_value,"Right Join Test")}}}}}Copy the code

6. ProcessJoinFunction

Time-based dual-stream join: unlike the window join, an interval join does not rely on Flink's WindowAssigner; instead, it defines the join range by a time interval.

input1.intervalJoin(input2)               <- both streams must already be keyed (keyBy) by the join key
    .between(<lowerBound>, <upperBound>)  <- the time interval, relative to the timestamps of input1
    .process(<ProcessJoinFunction>)       <- specify the ProcessJoinFunction

An interval needs a lower bound and an upper bound. If we interval-join input1 with input2, an element input1.element1 forms a pair with every element of input2 whose timestamp falls within [input1.element1.ts + lower bound, input1.element1.ts + upper bound].

Expressed as a formula: input1.element1.ts + lowerBound <= input2.elementX.ts <= input1.element1.ts + upperBound, where the upper and lower bounds can be positive or negative.
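
For instance, the demo below calls between(Time.minutes(-10), Time.seconds(0)) with event times given in seconds: a click event at time 1500 pairs with browse events whose timestamps fall in [1500 - 600, 1500] = [900, 1500], and a click at time 2000 pairs with browse timestamps in [1400, 2000], exactly the intervals noted in the comments of the demo code.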

package processfunction

import model.{UserBrowseLog, UserClickLog}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
 * @Author Natasha
 * @Description
 * @Date 2020/11/19 13:51
 **/
object ProcessJoinFunctionDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
  
    val clickStream = env
      .fromElements(
        UserClickLog("user_2", "1500", "click", "page_1"), // matches browse timestamps in (900, 1500)
        UserClickLog("user_2", "2000", "click", "page_1")  // matches browse timestamps in (1400, 2000)
      )
      .assignAscendingTimestamps(_.eventTime.toLong * 1000L)
      .keyBy(_.userID)
    val browseStream = env
      .fromElements(
        UserBrowseLog("user_2", "1000", "browse", "product_1", "10"), // matches click timestamps in (1000, 1600)
        UserBrowseLog("user_2", "1500", "browse", "product_1", "10"), // matches click timestamps in (1500, 2100)
        UserBrowseLog("user_2", "1501", "browse", "product_1", "10"), // matches click timestamps in (1501, 2101)
        UserBrowseLog("user_2", "1502", "browse", "product_1", "10")  // matches click timestamps in (1502, 2102)
      )
      .assignAscendingTimestamps(_.eventTime.toLong * 1000L)
      .keyBy(_.userID)
    // Perform the interval join on the two keyed streams
    clickStream.intervalJoin(browseStream)
      .between(Time.minutes(-10), Time.seconds(0)) // lower bound -10 minutes, upper bound 0 seconds
      .process(new MyIntervalJoin)
      .print()
    env.execute()
  }

  class MyIntervalJoin extends ProcessJoinFunction[UserClickLog, UserBrowseLog, String] {
    override def processElement(left: UserClickLog,
                                right: UserBrowseLog,
                                ctx: ProcessJoinFunction[UserClickLog, UserBrowseLog, String]#Context,
                                out: Collector[String]): Unit = {
      out.collect(left + " ==> " + right)
    }
  }
}

Conclusion

  1. A window-based join uses the window mechanism: data is first cached in window state, and the join is performed when the window fires.
  2. An interval join also uses state to store data for later processing; the difference lies in the expiration mechanism that determines when data in state is cleared.

7. CoProcessFunction

Implementing a dual-stream join with connect and CoProcessFunction: CoProcessFunction implements a low-level operation on two inputs. It is bound to two different input streams and calls processElement1 and processElement2, respectively, to process the data from the two input streams.

input1.connect(input2)
    .keyBy(<KeySelector1>, <KeySelector2>)  <- key both inputs by the join field
    .process(<CoProcessFunction>)           <- specify the CoProcessFunction

Implementing a low-level join generally follows this recipe (a minimal sketch follows the list):

  1. Create a state object for one (or both) inputs.
  2. Update the state when an element is received from one input.
  3. When an element arrives from the other input, retrieve the state and produce the joined result.
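
A minimal sketch of this recipe, assuming two hypothetical (String, Int) inputs keyed by their first field: each side buffers its latest element in keyed state and emits a joined pair as soon as both sides have been seen. (The KeyedCoProcessFunction demo in section 8 applies the same pattern to order/payment reconciliation; the demo below, by contrast, uses CoProcessFunction as a filter switch rather than a join.)

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

class LatestPairJoin extends CoProcessFunction[(String, Int), (String, Int), String] {
  // step 1: one state object per input, scoped to the current key
  lazy val left: ValueState[(String, Int)] =
    getRuntimeContext.getState(new ValueStateDescriptor[(String, Int)]("left", Types.of[(String, Int)]))
  lazy val right: ValueState[(String, Int)] =
    getRuntimeContext.getState(new ValueStateDescriptor[(String, Int)]("right", Types.of[(String, Int)]))

  override def processElement1(in: (String, Int),
                               ctx: CoProcessFunction[(String, Int), (String, Int), String]#Context,
                               out: Collector[String]): Unit = {
    left.update(in) // step 2: remember the latest left element
    // step 3: if the other side has already been seen, emit the joined result
    if (right.value() != null) out.collect(in + " <-> " + right.value())
  }

  override def processElement2(in: (String, Int),
                               ctx: CoProcessFunction[(String, Int), (String, Int), String]#Context,
                               out: Collector[String]): Unit = {
    right.update(in)
    if (left.value() != null) out.collect(left.value() + " <-> " + in)
  }
}
// usage: stream1.connect(stream2).keyBy(_._1, _._1).process(new LatestPairJoin)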

package processfunction

import model.SensorReading
import org.apache.flink.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
import source.SensorSource

/**
 * @Author Natasha
 * @Description
 * @Date 2020/11/19 16:13
 **/
object CoProcessFunctionDemo {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.setParallelism(1)
      
    val filterSwitches: DataStream[(String, Long)] = env
      .fromCollection(Seq(
        ("sensor_2", 10 * 1000L), // forward readings of sensor_2 for 10 seconds
        ("sensor_7", 60 * 1000L)  // forward readings of sensor_7 for 1 minute
      ))
    val readings: DataStream[SensorReading] = env
      .addSource(new SensorSource)
    val forwardedReadings = readings
      .connect(filterSwitches)
      .keyBy(_.id, _._1)
      .process(new ReadingFilter)
      .print()

    env.execute("Monitor sensor temperatures.")}class ReadingFilter extends CoProcessFunction[SensorReading, (String.Long), SensorReading] {
  // switch to enable forwarding
  lazy val forwardingEnabled: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean] ("filterSwitch".Types.of[Boolean]))
  // hold timestamp of currently active disable timer
  lazy val disableTimer: ValueState[Long] =  getRuntimeContext.getState(new ValueStateDescriptor[Long] ("timer".Types.of[Long]))
  override def processElement1(
                                reading: SensorReading,
                                ctx: CoProcessFunction[SensorReading, (String.Long), SensorReading] #Context,
                                out: Collector[SensorReading) :Unit = {

    // check if we may forward the reading
    if (forwardingEnabled.value()) {
      out.collect(reading)
    }
  }
    override def processElement2(switch: (String, Long),
                                 ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context,
                                 out: Collector[SensorReading]): Unit = {
    // enable reading forwarding
    forwardingEnabled.update(true)
    // set disable forward timer
    val timerTimestamp = ctx.timerService().currentProcessingTime() + switch._2
    val curTimerTimestamp = disableTimer.value()
    if (timerTimestamp > curTimerTimestamp) {
      // remove current timer and register new timer
      ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)
      ctx.timerService().registerProcessingTimeTimer(timerTimestamp)
      disableTimer.update(timerTimestamp)
    }
  }
    override def onTimer(ts: Long,
                         ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#OnTimerContext,
                         out: Collector[SensorReading]): Unit = {

    // remove all state. Forward switch will be false by default.
    forwardingEnabled.clear()
    disableTimer.clear()
  }
 }
}

8. KeyedCoProcessFunction

Reconcile order events with their payment events within five minutes, and emit a warning for those that fail to match:

package processfunction

import model.{OrderEvent, PayEvent}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * @Author Natasha
 * @Description
 * @Date 2020/11/19 15:09
 **/
object KeyedCoProcessFunctionDemo {
  // Side output for order events that found no matching payment
  val unmatchedOrders = new OutputTag[String]("unmatched-orders")
  // Side output for payment events that found no matching order
  val unmatchedPays = new OutputTag[String]("unmatched-pays")
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      
    val orders = env
      .fromElements(
        OrderEvent("order_1", "pay", 2000L),
        OrderEvent("order_2", "pay", 5000L),
        OrderEvent("order_3", "pay", 6000L)
      )
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)
    val pays = env
      .fromElements(
        PayEvent("order_1", "weixin", 7000L),
        PayEvent("order_2", "weixin", 8000L),
        PayEvent("order_4", "weixin", 9000L)
      )
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)
      
    val processed = orders
      .connect(pays)
      .process(new MatchFunction)

    processed.getSideOutput(unmatchedOrders).print()
    processed.getSideOutput(unmatchedPays).print()
    processed.print()
    env.execute()
  }
  // Elements that meet in this function share the same key, i.e. the orderId
  class MatchFunction extends KeyedCoProcessFunction[String, OrderEvent, PayEvent, String] {
    lazy private val orderState: ValueState[OrderEvent] =
      getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("orderState", Types.of[OrderEvent]))
    lazy private val payState: ValueState[PayEvent] =
      getRuntimeContext.getState(new ValueStateDescriptor[PayEvent]("payState", Types.of[PayEvent]))

    override def processElement1(value: OrderEvent,
                                 ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context,
                                 out: Collector[String]): Unit = {
      // Look for the matching pay event in payState; if it is there, the match succeeds
      val pay = payState.value()
      if (pay != null) {
        payState.clear()
        out.collect("Order ID " + pay.orderId + ": reconciliation of the two streams was successful!")
      } else {
        // The corresponding pay event may not have arrived yet, so save this event in state and wait.
        // Register a timer (5000 ms here, standing in for the five-minute deadline); if no match is
        // found by then, a warning is emitted indicating that the match failed.
        orderState.update(value)
        ctx.timerService().registerEventTimeTimer(value.eventTime + 5000)
      }
    }

    override def processElement2(value: PayEvent,
                                 ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context,
                                 out: Collector[String]): Unit = {
      val order = orderState.value()
      if (order != null) {
        orderState.clear()
        out.collect("Order ID " + order.orderId + ": reconciliation of the two streams was successful!")
      } else {
        payState.update(value)
        ctx.timerService().registerEventTimeTimer(value.eventTime + 5000)
      }
    }

    override def onTimer(timestamp: Long,
                         ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      if (orderState.value() != null) {
        // Send the warning to the side output stream
        ctx.output(unmatchedOrders, s"Order ID ${orderState.value().orderId}: the two streams did not reconcile successfully!")
        orderState.clear()
      }
      if (payState.value() != null) {
        ctx.output(unmatchedPays, s"Order ID ${payState.value().orderId}: the two streams did not reconcile successfully!")
        payState.clear()
      }
    }
  }
}

Github

The code samples for this article have been uploaded to GitHub: github.com/ShawnVanorG…