In Apache Flink, the Window operation is a core abstraction in stream processing: it divides an unbounded stream into bounded Windows (or buckets), on which various computations can then be conveniently defined. This article, based on Apache Flink 1.7.2, first explains the basic concepts of Keyed Window and Non-Keyed Window, then analyzes the related class design of WindowFunction and AllWindowFunction respectively, and finally applies them in programming practice.

Basic concepts

Flink divides Windows into two categories: Keyed Window and Non-Keyed Window. To illustrate the difference between the two, let's look at the code structure for each, as given on the Flink website.

For Keyed Window programming, the basic structure of the user code is as follows:

stream
       .keyBy(...)                      <-  keyed versus non-keyed windows
       .window(...)                     <-  required: "assigner"
      [.trigger(...)]                   <-  optional: "trigger" (else default trigger)
      [.evictor(...)]                   <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]           <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)]        <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()   <-  required: "function"
      [.getSideOutput(...)]             <-  optional: "output tag"

For Non-Keyed Window programming, the basic structure of the user code is as follows:

stream
       .windowAll(...)                  <-  required: "assigner"
      [.trigger(...)]                   <-  optional: "trigger" (else default trigger)
      [.evictor(...)]                   <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]           <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)]        <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()   <-  required: "function"
      [.getSideOutput(...)]             <-  optional: "output tag"

The differences between the two programming structures above are:

  • In terms of the programming API, the Keyed Window structure can apply a keyBy operation directly on the input stream. A Key is identified in the input stream, i.e., some part of each data element is designated as the Key associated with that element, so that subsequent computations on the stream can operate per Key; keyBy, for example, groups elements by Key (elements with the same Key always end up in the same group). If the input stream has no Key, such as a stream of raw log messages, the keyBy operation cannot be applied. The Non-Keyed Window structure, by contrast, treats the input stream as unstructured (regardless of whether it actually contains a Key) and performs no keyBy. When a Non-Keyed Window function is used, the chosen WindowAssigner (which is responsible for placing each data element of the stream into a Window) assigns each element to one or more Windows, and all subsequent computations on the stream operate on the data elements in those Windows.
  • Logically, the Keyed Window structure converts the input stream into a Keyed stream, which corresponds to multiple logical Keyed streams, each computed independently. This allows the Windowing operations to be processed in parallel by multiple Tasks, with all data elements that have the same Key sent to the same Task. With the Non-Keyed Window structure, the Non-Keyed stream is not logically split into multiple streams, so all Windowing logic is processed in a single Task, i.e., with a parallelism of 1, as the sketch below illustrates.
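
The following minimal Scala sketch contrasts the two structures, assuming a hypothetical input stream of (channel, count) tuples (the names here are illustrative only, not taken from the example later in this article):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[(String, Long)] = env.fromElements(("a", 1L), ("b", 2L), ("a", 3L))

// Keyed Window: the stream is logically split per key; windows are computed
// in parallel, and elements with the same key always go to the same task.
val perChannel = stream
  .keyBy(_._1)
  .timeWindow(Time.seconds(5))
  .reduce((a, b) => (a._1, a._2 + b._2))

// Non-Keyed Window: one global window over the whole stream,
// processed by a single task (parallelism 1).
val overall = stream
  .timeWindowAll(Time.seconds(5))
  .reduce((a, b) => (a._1, a._2 + b._2))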

In actual programming, we can see that the DataStream API also has the corresponding methods timeWindow() and timeWindowAll() (which are actually just an extra layer of encapsulation by Flink); they correspond to Keyed Window and Non-Keyed Window respectively.
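
As a sketch of what that encapsulation resolves to (assuming the environment's time characteristic is ProcessingTime; with EventTime, the corresponding event-time assigners are chosen instead):

import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

// Reusing `stream` from the sketch above:
val keyed = stream.keyBy(_._1)

// keyed.timeWindow(Time.seconds(5), Time.seconds(1)) expands to:
keyed.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))

// stream.timeWindowAll(Time.seconds(5)) expands to:
stream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))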

WindowFunction and AllWindowFunction

In Flink, Windowing the input stream assigns arriving data elements to the specified Windows, grouping elements based on EventTime/ProcessingTime, on Count, or on a mixture of EventTime/ProcessingTime/Count. Operating on the assigned Windows requires a function provided by Flink: WindowFunction and AllWindowFunction are provided for Keyed Window and Non-Keyed Window respectively. By implementing a specific Window function, we can access Window-related metadata to meet the needs of practical applications. Now, let's look at the corresponding inheritance hierarchies from the perspective of class design:

Keyed Window corresponds to WindowFunction

[Class diagram of the WindowFunction hierarchy for Keyed Window omitted.]

In general, if we want to customize the logic that processes the data elements in a Window, or access the metadata of the corresponding Window, we can do so by inheriting from the ProcessWindowFunction class. Let's look at the class declarations of ProcessAllWindowFunction and ProcessWindowFunction:

ProcessAllWindowFunction

/**
 * Base abstract class for functions that are evaluated over non-keyed windows using a context
 * for retrieving extra information.
 *
 * @param <IN> The type of the input value.
 * @param <OUT> The type of the output value.
 * @param <W> The type of {@code Window} that this window function can be applied on.
 */
@PublicEvolving
public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction {

ProcessWindowFunction

/**
 * Base abstract class for functions that are evaluated over keyed (grouped) windows using a context
 * for retrieving extra information.
 *
 * @param <IN> The type of the input value.
 * @param <OUT> The type of the output value.
 * @param <KEY> The type of the key.
 * @param <W> The type of {@code Window} that this window function can be applied on.
 */
@PublicEvolving
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {

In contrast to ProcessWindowFunction, the generic parameters of ProcessAllWindowFunction do not include a KEY, because a Non-Keyed Window is processed in only one Task. The other parameters IN, OUT, and W have the same meaning as in the ProcessWindowFunction class above and need no further explanation.

When inheriting from ProcessAllWindowFunction, the following method needs to be implemented:

	/**
	 * Evaluates the window and outputs none or several elements.
	 *
	 * @param context The context in which the window is being evaluated.
	 * @param elements The elements in the window being evaluated.
	 * @param out A collector for emitting elements.
	 *
	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
	 */
	public abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;

All data elements of the original input stream that fall into a Window after Windowing can be handled in this process() method; the specific processing logic in this method is similar to that of ProcessWindowFunction.process().
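
As a minimal sketch, a hypothetical subclass (CountAllWindowFunction below is illustrative, not part of the example later in this article, and uses the Scala API variant of ProcessAllWindowFunction) might simply emit the element count of each Window together with the Window's start and end timestamps taken from the context:

import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class CountAllWindowFunction extends ProcessAllWindowFunction[String, (Long, Long, Int), TimeWindow] {
  override def process(context: Context,
                       elements: Iterable[String],
                       out: Collector[(Long, Long, Int)]): Unit = {
    // window metadata is available through the context
    out.collect((context.window.getStart, context.window.getEnd, elements.size))
  }
}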

Programming practice

Now, let's simulate a scenario: the developers of an App promote it through multiple channels (Channel) and need to analyze the corresponding user behaviors (install, open, browse, click, purchase, close, uninstall) from the logs. We assume (near) real-time statistical analysis of user behaviors from the different channels for each time period (e.g., every 5 seconds). First, create a SourceFunction that simulates the generated data, as shown below:

class SimulatedEventSource extends RichParallelSourceFunction[(String, String)] {
 
  val LOG = LoggerFactory.getLogger(classOf[SimulatedEventSource])
  @volatile private var running = true
  val channelSet = Seq("a", "b", "c", "d")
  val behaviorTypes = Seq(
    "INSTALL", "OPEN", "BROWSE", "CLICK", "PURCHASE", "CLOSE", "UNINSTALL")
  val rand = Random
 
  override def run(ctx: SourceContext[(String, String)]): Unit = {
    val numElements = Long.MaxValue
    var count = 0L
 
    while (running && count < numElements) {
      val channel = channelSet(rand.nextInt(channelSet.size))
      val event = generateEvent()
      LOG.info("Event: " + event)
      val ts = event(0).toLong
      ctx.collectWithTimestamp((channel, event.mkString("\t")), ts)
      count += 1
      TimeUnit.MILLISECONDS.sleep(5L)
    }
  }
 
  private def generateEvent(): Seq[String] = {
    val dt = readableDate
    val id = UUID.randomUUID().toString
    val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
    // (ts, readableDT, id, behaviorType)
    Seq(dt._1.toString, dt._2, id, behaviorType)
  }
 
  private def readableDate = {
    val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    // java.util.Date expects epoch milliseconds, so use currentTimeMillis
    // (System.nanoTime is not wall-clock time and would yield bogus dates)
    val ts = System.currentTimeMillis
    val dt = new Date(ts)
    (ts, df.format(dt))
  }
 
  override def cancel(): Unit = running = false
}

With this data source, we can build a Flink Streaming application based on SimulatedEventSource. Below, we carry out programming practice for Keyed Window and Non-Keyed Window respectively, and compare their differences.

Keyed Windows programming

A Sliding Window (the WindowAssigner) is used to generate Windows on the stream. The Window size is 5s and the slide is 1s; that is, each Window computes over the data elements within 5s, and a new Window starts every 1s (both values come from the parameters specified on the command line that submits the Flink program). Using the custom SimulatedEventSource implemented above as the input data source, we create the Flink stream and can then perform various operations on it.

When processing the stream data, we want to obtain the start time and end time of each Window, then output grouped statistics based on Window (start time + end time), Channel, and behavior type, and finally write the result data to the specified Kafka topic in real time.

The Flink program class we implemented is SlidingWindowAnalytics, and the code is as follows:

def main(args: Array[String]): Unit = {
  val params = ParameterTool.fromArgs(args)
  checkParams(params)
  val windowSizeMillis = params.getRequired("window-size-millis").toLong
  val windowSlideMillis = params.getRequired("window-slide-millis").toLong
 
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  val stream: DataStream[(String, String)] = env.addSource(new SimulatedEventSource)
 
  // create a Kafka producer for the result output
  val kafkaProducer = new FlinkKafkaProducer09(
    params.getRequired("window-result-topic"),
    new SimpleStringSchema, params.getProperties
  )
 
  stream
    .map(t => {
      val channel = t._1
      val eventFields = t._2.split("\t")
      val behaviorType = eventFields(3)
      ((channel, behaviorType), 1L)
    })
    .keyBy(0)
    .timeWindow(Time.of(windowSizeMillis, MILLISECONDS), Time.of(windowSlideMillis, MILLISECONDS))
    .process(new MyReduceWindowFunction)
    .map(t => {
      val key = t._1
      val count = t._2
      val windowStartTime = key._1
      val windowEndTime = key._2
      val channel = key._3
      val behaviorType = key._4
      Seq(windowStartTime, windowEndTime, channel, behaviorType, count).mkString("\t")
    })
    .addSink(kafkaProducer)
 
  env.execute(getClass.getSimpleName)
}

First, a map operation is performed on the input stream to emit ((channel, behavior type), count). Second, a keyBy operation is performed on that result with the Key specified as (channel, behavior type), yielding multiple Keyed streams. Next, the Sliding Window operation is applied to each Keyed stream, setting the Sliding Window size and slide values. Then, because we want the start time and end time of each Window, we apply a ProcessWindowFunction to the windowed stream; this is our custom implementation, which obtains the Window's start and end times, performs grouped statistics on the windowed data, and outputs records containing the Window start time and end time, channel, behavior type, and count. The corresponding implementation class is MyReduceWindowFunction; the code is shown below:

class MyReduceWindowFunction
  extends ProcessWindowFunction[((String, String), Long), ((String, String, String, String), Long), Tuple, TimeWindow] {
 
  override def process(key: Tuple, context: Context,
                        elements: Iterable[((String, String), Long)],
                        collector: Collector[((String, String, String, String), Long)]): Unit = {
    val startTs = context.window.getStart
    val endTs = context.window.getEnd
 
    for(group <- elements.groupBy(_._1)) {
      val myKey = group._1
      val myValue = group._2
      var count = 0L
      for(elem <- myValue) {
        count += elem._2
      }
      val channel = myKey._1
      val behaviorType = myKey._2
      val outputKey = (formatTs(startTs), formatTs(endTs), channel, behaviorType)
      collector.collect((outputKey, count))
    }
  }
 
  private def formatTs(ts: Long) = {
    val df = new SimpleDateFormat("yyyyMMddHHmmss")
    df.format(new Date(ts))
  }
}

The values of the generic parameters of the ProcessWindowFunction above are: IN=((String, String), Long), OUT=((String, String, String, String), Long), KEY=Tuple, W=TimeWindow; this can be understood by referring to the types of the individual parameters of the process() method. In the code above, elements may contain multiple values for the same Key, but data elements with the same Key are guaranteed to be in the same Window. The data in each group are then summed, and the output is ((Window start time, Window end time, channel, behavior type), count). We can then call process() on the windowed stream and pass in an instance of the MyReduceWindowFunction implementation as the parameter value. Finally, the result is formatted with a map operation and the output is written to Kafka. To run the Flink program implemented above, execute the following command:

bin/flink run --parallelism 2 \
  --class org.shirdrn.flink.windowing.SlidingWindowAnalytics <jar-file> \
  --window-result-topic windowed-result-topic \
  --zookeeper.connect 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 \
  --bootstrap.servers ali-bj01-tst-cluster-002.xiweiai.cn:9092,ali-bj01-tst-cluster-003.xiweiai.cn:9092,ali-bj01-tst-cluster-004.xiweiai.cn:9092 \
  --window-size-millis 5000 \
  --window-slide-millis 1000

After the Job is submitted, you can view its running status on the Flink Web Dashboard, and you can view the resulting data in Kafka.
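
For example, assuming a standard Kafka installation, the result topic can be tailed with the console consumer (broker address as in the submit command above; exact flags may vary by Kafka version):

bin/kafka-console-consumer.sh \
  --bootstrap-server ali-bj01-tst-cluster-002.xiweiai.cn:9092 \
  --topic windowed-result-topic

The resulting data looks like the following example: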

20180106174726 20180106174731 b CLOSE 69
20180106174726 20180106174731 b UNINSTALL 86
20180106174726 20180106174731 a CLICK 64
20180106174726 20180106174731 a PURCHASE 72
20180106174727 20180106174732 b BROWSE 61
20180106174727 20180106174732 d INSTALL 67
20180106174727 20180106174732 c CLICK 74
20180106174727 20180106174732 c INSTALL 61
20180106174727 20180106174732 c PURCHASE 66
20180106174728 20180106174733 c CLICK 79
20180106174728 20180106174733 a BROWSE 58
20180106174728 20180106174733 a UNINSTALL 73
20180106174728 20180106174733 c OPEN 68
20180106174728 20180106174733 d INSTALL 55
20180106174728 20180106174733 b INSTALL 60
20180106174728 20180106174733 c PURCHASE 64
20180106174728 20180106174733 b PURCHASE 78
20180106174728 20180106174733 d UNINSTALL 58
20180106174728 20180106174733 d BROWSE 69

As can be seen from the results, because a Sliding Window is used to assign Windows, successive Windows overlap as time goes by, which is exactly the result we originally wanted.
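
To see the overlap concretely, the following sketch computes which sliding Windows a single element falls into, mirroring how a sliding-window assigner determines window start times (the timestamp is an illustrative value, not taken from the run above):

// Which sliding windows contain an element with timestamp t (in ms)?
val (size, slide, t) = (5000L, 1000L, 12345L)
val lastStart = t - (t % slide)  // start of the latest window containing t
val windowStarts = Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > t - size).toList
// windowStarts == List(12000, 11000, 10000, 9000, 8000):
// the element is counted in size/slide = 5 overlapping windows [start, start + size)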

Non-Keyed Windows programming

A Tumbling Window (the WindowAssigner) is used to create a Non-Keyed Window on the stream. Tumbling Windows are also referred to as Fixed Time Windows: each Window has the same time length, and the Windows do not overlap, so an element with timestamp t falls into exactly one Window, the one starting at t - (t % size).

We also want to obtain the start time and end time of each Window, so we need to implement a ProcessAllWindowFunction. Since this is a Non-Keyed Window, only one Task is responsible for assigning Windows to all the data elements of the input stream, but this makes little difference to the programming. The implementation is the Flink program TumblingWindowAllAnalytics; the code is shown below:

object TumblingWindowAllAnalytics {
  var MAX_LAGGED_TIME = 5000L
 
  def checkParams(params: ParameterTool) = {
    if (params.getNumberOfParameters < 5) {
      println("Missing parameters! \n"
        + "Usage: Windowing "
        + "--window-result-topic <windowed_result_topic> "
        + "--bootstrap.servers <kafka_brokers> "
        + "--zookeeper.connect <zk_quorum> "
        + "--window-all-lagged-millis <window_all_lagged_millis> "
        + "--window-all-size-millis <window_all_size_millis>")
      System.exit(-1)
    }
  }
 
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    checkParams(params)
    MAX_LAGGED_TIME = params.getLong("window-all-lagged-millis", MAX_LAGGED_TIME)
    val windowAllSizeMillis = params.getRequired("window-all-size-millis").toLong
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // use event time so that the timestamps and watermarks assigned below take effect
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream: DataStream[(String, String)] = env.addSource(new SimulatedEventSource)
 
    // create a Kafka producer for the result output
    val kafkaProducer = new FlinkKafkaProducer09(
      params.getRequired("window-result-topic"),
      new SimpleStringSchema, params.getProperties
    )
 
    stream
      .map(t => {
        val channel = t._1
        val eventFields = t._2.split("\t")
        val ts = eventFields(0).toLong
        val behaviorType = eventFields(3)
        (ts, channel, behaviorType)
      })
      .assignTimestampsAndWatermarks(new TimestampExtractor(MAX_LAGGED_TIME))
      .map(t => (t._2, t._3))
      .timeWindowAll(Time.milliseconds(windowAllSizeMillis))
      .process(new MyReduceWindowAllFunction())
      .map(t => {
        val key = t._1
        val count = t._2
        val windowStartTime = key._1
        val windowEndTime = key._2
        val channel = key._3
        val behaviorType = key._4
        Seq(windowStartTime, windowEndTime,
          channel, behaviorType, count).mkString("\t")
      })
      .addSink(kafkaProducer)
 
    env.execute(getClass.getSimpleName)
  }
 
  class TimestampExtractor(val maxLaggedTime: Long)
    extends AssignerWithPeriodicWatermarks[(Long, String, String)] with Serializable {
 
    var currentWatermarkTs = 0L
 
    override def getCurrentWatermark: Watermark = {
      if(currentWatermarkTs <= 0) {
        new Watermark(Long.MinValue)
      } else {
        new Watermark(currentWatermarkTs - maxLaggedTime)
      }
    }
 
    override def extractTimestamp(element: (Long, String, String),
                                  previousElementTimestamp: Long): Long = {
      val ts = element._1
      // track the largest timestamp seen so far; getCurrentWatermark lags it by maxLaggedTime
      currentWatermarkTs = Math.max(ts, currentWatermarkTs)
      ts
    }
  }
}

In the code above, before the main stream processing starts, we call DataStream's assignTimestampsAndWatermarks method to assign a timestamp to each data element in the stream, and to control the stream's progress by periodically generating Watermarks; both the timestamp extraction and the Watermark generation are implemented in the TimestampExtractor class. For more information about Watermarks, see the reference link below.
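
As a quick sanity check of that logic, here is a usage sketch with illustrative values:

val extractor = new TumblingWindowAllAnalytics.TimestampExtractor(3000L)
extractor.extractTimestamp((10000L, "a", "OPEN"), -1L)  // largest timestamp seen: 10000
println(extractor.getCurrentWatermark.getTimestamp)     // 7000 = 10000 - 3000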

In addition, we extended Flink's ProcessAllWindowFunction abstract class with the implementation class MyReduceWindowAllFunction, used to process the data of each Window and obtain the corresponding Window start time and end time. The implementation code is as follows:

class MyReduceWindowAllFunction
  extends ProcessAllWindowFunction[(String, String), ((String, String, String, String), Long), TimeWindow] {

  override def process(context: Context,
                       elements: Iterable[(String, String)],
                       collector: Collector[((String, String, String, String), Long)]): Unit = {
    val startTs = context.window.getStart
    val endTs = context.window.getEnd
    val elems = elements.map(t => ((t._1, t._2), 1L))
    for(group <- elems.groupBy(_._1)) {
      val myKey = group._1
      val myValue = group._2
      var count = 0L
      for(elem <- myValue) {
        count += elem._2
      }
      val channel = myKey._1
      val behaviorType = myKey._2
      val outputKey = (formatTs(startTs), formatTs(endTs), channel, behaviorType)
      collector.collect((outputKey, count))
    }
  }
 
  private def formatTs(ts: Long) = {
    val df = new SimpleDateFormat("yyyyMMddHHmmss")
    df.format(new Date(ts))
  }
}

In contrast to the ProcessWindowFunction in the Keyed Window implementation, there is no generic parameter KEY, because only one Task processes all the data elements of the input stream. The implementation class of ProcessAllWindowFunction processes all data elements in the Window, which have not been grouped by Key (and cannot be, because the Key of the data elements is unknown), but the processing logic is basically the same. Submit the TumblingWindowAllAnalytics Flink program by executing the following command line:

bin/flink run --parallelism 2 \
  --class org.shirdrn.flink.windowing.TumblingWindowAllAnalytics <jar-file> \
  --window-result-topic windowed-result-topic \
  --zookeeper.connect 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 \
  --bootstrap.servers ali-bj01-tst-cluster-002.xiweiai.cn:9092,ali-bj01-tst-cluster-003.xiweiai.cn:9092,ali-bj01-tst-cluster-004.xiweiai.cn:9092 \
  --window-all-lagged-millis 3000 \
  --window-all-size-millis 10000

After the job runs successfully, you should see output similar to the previous example.

Reference: shiyanjun.cn