This is the 8th day of my participation in the August More Text Challenge. For details, see: August More Text Challenge

1. The role of watermark

Watermarks are used to handle out-of-order events, and correct handling of out-of-order events is usually implemented by combining the watermark mechanism with windows. In stream processing, it takes time for an event to travel from where it is generated, through the source, to an operator. In most cases the data reaching an operator arrives in event-time order, but network delays, backpressure, and similar causes can still produce out-of-order (late) elements. We cannot wait for late elements indefinitely, so there must be a mechanism that guarantees a window is triggered for calculation after a specific time. That mechanism is the watermark.

2. Watermark solves late data

In a real-time system, various delays mean that some messages reach Flink noticeably later than the time at which the event was generated. If windows are built on event time, we cannot wait indefinitely for late elements; there must be a mechanism that guarantees the window is triggered for calculation after a certain time. That mechanism is the watermark.

Watermarks are the mechanism for dealing with this problem:

  1. The design follows Google’s Dataflow model.
  2. A watermark is an indicator of event-time progress.
  3. A watermark indicates that all events earlier (older) than it are assumed to have arrived (no more data with a lower timestamp is expected).
  4. Window calculations are triggered based on the watermark.

Ordered data stream watermark:

In some cases, the data arrives in event-time order. In an ordered stream, the watermark is simply a periodic marker.

Unordered data stream watermark:

In most scenarios, the data does not arrive in event-time order. In an unordered stream, the watermark is critical: it tells the operator that events earlier (older/smaller) than the watermark have already arrived. The operator can then advance its internal event time to the watermark’s timestamp, which can trigger window calculations.
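To make the idea concrete, here is a minimal sketch in plain Scala (no Flink dependency) of how a watermark can trail an unordered stream. The object and method names are hypothetical, and the rule "largest event time seen so far minus a fixed delay" is the one used by the periodic assigner later in this article, not the only possible strategy.

```scala
object BoundedLatenessSketch {
  // Hypothetical helper: returns the watermark after each event, computed as
  // (largest event time seen so far) - maxOutOfOrdernessMs.
  def watermarks(eventTimes: Seq[Long], maxOutOfOrdernessMs: Long): Seq[Long] =
    eventTimes
      .scanLeft(Long.MinValue)((maxSeen, ts) => math.max(maxSeen, ts)) // running maximum
      .tail                                                            // drop the initial seed
      .map(_ - maxOutOfOrdernessMs)
}
```

For the event times 28000, 25000, 27000, 30000 (milliseconds) and a delay of 10000, the watermark stays at 18000 for the first three events and only moves to 20000 with the fourth: out-of-order events never pull the watermark backwards.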

Watermark in a parallel stream:

Generally, watermarks are generated in the source function, but they can also be generated at any stage after the source. If watermarks are assigned more than once, the later assignment overwrites the earlier one. Each subtask of the source generates watermarks independently. When a watermark passes through an operator, it advances that operator's current event time, and the operator emits a new watermark downstream. The current event time of a multi-input operator (union, keyBy, partition) is the minimum of the event times of its input streams. Note: with a parallelism greater than one, watermark alignment takes the smallest watermark across all input channels.
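The minimum rule can be sketched in a few lines of plain Scala. This is only an illustration under simple assumptions: `InputWatermarks` is a hypothetical class, not a Flink API, and a channel that has not delivered any watermark yet is modelled as `Long.MinValue`.

```scala
object MinWatermarkSketch {
  // Hypothetical tracker for an operator with `channels` input channels.
  final class InputWatermarks(channels: Int) {
    private val perChannel = Array.fill(channels)(Long.MinValue)

    // Records a watermark from one channel and returns the operator's
    // current event time: the minimum watermark over all channels.
    def update(channel: Int, watermark: Long): Long = {
      perChannel(channel) = math.max(perChannel(channel), watermark)
      perChannel.min
    }
  }
}
```

As long as one channel stays silent, the returned value remains `Long.MinValue`, which is why a single slow or idle input can hold back every window downstream.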

3. How to generate watermark

Typically, a watermark should be generated immediately after receiving data from the source. However, you can also apply a simple map or filter operation after the source and then generate the watermark.

There are two main ways to generate watermark:

  1. With Periodic Watermarks
  2. With Punctuated Watermarks

The first lets you define a maximum allowed out-of-orderness and is the more commonly used, so we focus on periodic watermarks. The next section shows how to generate a periodic watermark.
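The difference between the two styles can be sketched without Flink. Everything below is hypothetical (the `Event` type, its `marker` flag, and both classes); it only illustrates that a periodic generator is sampled on a timer, while a punctuated one reacts to individual events.

```scala
object GeneratorStylesSketch {
  // Hypothetical event type: `marker` flags records that carry watermark information.
  final case class Event(timestamp: Long, marker: Boolean = false)

  // Periodic style: every event updates the maximum timestamp; the runtime
  // would call onTimer() at a fixed interval to read the current watermark.
  final class Periodic(maxOutOfOrdernessMs: Long) {
    // Offset the seed so the subtraction in onTimer() cannot underflow.
    private var maxSeen = Long.MinValue + maxOutOfOrdernessMs
    def onEvent(e: Event): Unit = {
      maxSeen = math.max(maxSeen, e.timestamp)
    }
    def onTimer(): Long = maxSeen - maxOutOfOrdernessMs
  }

  // Punctuated style: a watermark is produced only when an event asks for one.
  final class Punctuated {
    def onEvent(e: Event): Option[Long] =
      if (e.marker) Some(e.timestamp) else None
  }
}
```

The periodic style tolerates a bounded amount of disorder through `maxOutOfOrdernessMs`, which is why it is the one used in the rest of this article.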

4. Watermark processing of ordered data

Requirements: define a 10s window and tolerate 10s of delay by combining the event time of the data with the watermark, so that the data is still counted correctly. The watermark is obtained by subtracting 10s from the largest event time seen so far.

package com.shockang.study.bigdata.flink.watermark

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.text.SimpleDateFormat
import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

object FlinkWaterMark2 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Set flink data processing time to eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val tupleStream: DataStream[(String, Long)] = env.socketTextStream("node01", 9000).map(x => {
      // Each input line looks like "0001 1569895882000": key and event time separated by a space
      val strings: Array[String] = x.split(" ")
      (strings(0), strings(1).toLong)
    })

    // Register our watermark
    val waterMarkStream: DataStream[(String, Long)] = tupleStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
      var currentTimemillis: Long = 0L
      var timeDiff: Long = 10000L
      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

      // Called periodically by Flink: watermark = largest event time seen so far minus timeDiff
      override def getCurrentWatermark: Watermark = {
        val watermark = new Watermark(currentTimemillis - timeDiff)
        watermark
      }

      // Extract data eventTime
      override def extractTimestamp(element: (String, Long), l: Long): Long = {
        val eventTime = element._2
        currentTimemillis = Math.max(eventTime, currentTimemillis)
        val id = Thread.currentThread().getId
        println("currentThreadId:" + id + ",key:" + element._1 + ",eventtime:[" + element._2 + "|" + sdf.format(element._2) + "],currentMaxTimestamp:[" + currentTimemillis + "|" + sdf.format(currentTimemillis) + "],watermark:[" + this.getCurrentWatermark.getTimestamp + "|" + sdf.format(this.getCurrentWatermark.getTimestamp) + "]")
        eventTime
      }
    })
    waterMarkStream.keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .apply(new MyWindowFunction2).print()
    env.execute()
  }
}


class MyWindowFunction2 extends WindowFunction[(String, Long), String, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)],
                     out: Collector[String]): Unit = {
    val keyStr = key.toString
    val arrBuf = ArrayBuffer[Long]()
    val ite = input.iterator
    while (ite.hasNext) {
      val tup2 = ite.next()
      arrBuf.append(tup2._2)
    }
    val arr = arrBuf.toArray
    Sorting.quickSort(arr) // Sort the data by eventTime
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    val result = "The key of aggregated data is:" + keyStr + "," + "The number of pieces of data in the window is:" + arr.length + "," + "The first data in the window is:" + sdf.format(arr.head) + "," + "The last piece of data in the window is:" + sdf.format(arr.last) + "," + "Window start time:" + sdf.format(window.getStart) + "," + "Window end time:" + sdf.format(window.getEnd) + "!!!!! When you see this, you know that the window is running."
    out.collect(result)
  }
}

Input test data

Note: two conditions must be met for a Flink window to fire: 1. the watermark has reached the end of the window, so it is later than the event time of every record in that window; 2. there is data in the window.

Data entry test:

With a 10-second window, our program divides time into the following intervals:

2019-10-01 10:11:00 to 2019-10-01 10:11:10
2019-10-01 10:11:10 to 2019-10-01 10:11:20
2019-10-01 10:11:20 to 2019-10-01 10:11:30
2019-10-01 10:11:30 to 2019-10-01 10:11:40
2019-10-01 10:11:40 to 2019-10-01 10:11:50
2019-10-01 10:11:50 to 2019-10-01 10:12:00

A calculation is triggered when two conditions hold: first, the watermark has reached the end of the window, so it is later than the eventTime of the data in it; second, there is data in the window. Our waterMark is simply the maximum eventTime minus 10 seconds.

0001 1569895882000   eventTime: 2019-10-01 10:11:22   waterMark: 2019-10-01 10:11:12
0001 1569895885000   eventTime: 2019-10-01 10:11:25   waterMark: 2019-10-01 10:11:15
0001 1569895888000   eventTime: 2019-10-01 10:11:28   waterMark: 2019-10-01 10:11:18
0001 1569895890000   eventTime: 2019-10-01 10:11:30   waterMark: 2019-10-01 10:11:20
0001 1569895891000   eventTime: 2019-10-01 10:11:31   waterMark: 2019-10-01 10:11:21
0001 1569895895000   eventTime: 2019-10-01 10:11:35   waterMark: 2019-10-01 10:11:25
0001 1569895898000   eventTime: 2019-10-01 10:11:38   waterMark: 2019-10-01 10:11:28
0001 1569895900000   eventTime: 2019-10-01 10:11:40   waterMark: 2019-10-01 10:11:30   triggers the calculation of the first three records
0001 1569895911000   eventTime: 2019-10-01 10:11:51   waterMark: 2019-10-01 10:11:41   triggers the calculation of the records from 2019-10-01 10:11:30 to 2019-10-01 10:11:38, but not of the 2019-10-01 10:11:40 record
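The trace above can be replayed with a small plain-Scala simulation. This is a sketch under stated assumptions, not Flink's implementation: `TumblingWindowTrace` and its members are hypothetical names, the watermark is updated after every record rather than on a timer, and late records are not handled.

```scala
object TumblingWindowTrace {
  val WindowSizeMs = 10000L        // 10-second tumbling windows
  val MaxOutOfOrdernessMs = 10000L // watermark = max event time - 10 seconds

  // Start of the tumbling window that contains `ts` (windows aligned to the epoch).
  def windowStart(ts: Long): Long = ts - (ts % WindowSizeMs)

  // Replays the events in arrival order and returns, in firing order,
  // (window start, sorted event times in that window) for every fired window.
  def firedWindows(eventTimes: Seq[Long]): Seq[(Long, Seq[Long])] = {
    var maxSeen = Long.MinValue
    var open = Map.empty[Long, Vector[Long]]
    val fired = Vector.newBuilder[(Long, Seq[Long])]
    for (ts <- eventTimes) {
      val start = windowStart(ts)
      open = open.updated(start, open.getOrElse(start, Vector.empty[Long]) :+ ts)
      maxSeen = math.max(maxSeen, ts)
      val watermark = maxSeen - MaxOutOfOrdernessMs
      // A window [start, start + size) fires once the watermark reaches its last millisecond.
      val ready = open.keys.filter(s => s + WindowSizeMs - 1 <= watermark).toVector.sorted
      for (s <- ready) {
        fired += ((s, open(s).sorted))
        open -= s
      }
    }
    fired.result()
  }
}
```

Feeding it the nine timestamps listed above yields two fired windows: the one starting at 1569895880000 (10:11:20) with three records, and the one starting at 1569895890000 (10:11:30) with four records. The records at 10:11:40 and 10:11:51 stay in windows that are still open.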

5. Watermark processing of out-of-order data

After entering the test data above, continue with the following out-of-order data to verify that Flink can handle out-of-order input.

Out-of-order data:

0001 1569895948000   eventTime: 2019-10-01 10:12:28   waterMark: 2019-10-01 10:12:18
0001 1569895945000   eventTime: 2019-10-01 10:12:25   waterMark: 2019-10-01 10:12:18
0001 1569895947000   eventTime: 2019-10-01 10:12:27   waterMark: 2019-10-01 10:12:18
0001 1569895950000   eventTime: 2019-10-01 10:12:30   waterMark: 2019-10-01 10:12:20
0001 1569895960000   eventTime: 2019-10-01 10:12:40   waterMark: 2019-10-01 10:12:30   a calculation is triggered: waterMark > eventTime and there is data in the window. It covers the three records from 2019-10-01 10:12:25 to 2019-10-01 10:12:28, but not the 2019-10-01 10:12:30 record
0001 1569895949000   eventTime: 2019-10-01 10:12:29   waterMark: 2019-10-01 10:12:30   this record is too late and Flink discards it directly

You can configure Flink to keep records that arrive too late, which makes troubleshooting easier.

6. How to handle data that is later than the watermark

Suppose the watermark is tied to the event time of the data by a fixed offset. For example, the event time of a record is 2019-08-20 15:30:30, the window of the program is 10s, and the watermark is set to 2019-08-20 15:30:40. What happens to a record whose event time is 2019-08-20 15:30:32 but that reaches Flink at 2019-08-20 15:30:45, that is, 5s later than the watermark of its window? For data that is later than the watermark, Flink offers three ways to handle it.

1. Discard directly

Enter records whose window has already fired:

0001 1569895948000   eventTime: 2019-10-01 10:12:28   discarded directly
0001 1569895945000   eventTime: 2019-10-01 10:12:25   discarded directly

Note: the window is not triggered this time. Because the window these records belong to has already been executed, Flink’s default handling of late data is to discard it.

2. allowedLateness: specify how late data is allowed to be

In some cases we want to be more forgiving with late data. Flink provides the allowedLateness method to set a lateness period; late data that arrives within that period can still trigger window execution.
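The decision Flink makes for each record can be sketched as a three-way classification. The names below are hypothetical and the code is plain Scala; the comparison it uses (last millisecond of the window, plus the allowed lateness, against the current watermark) mirrors how Flink's window operator decides that a window has expired, but treat it as a simplified model rather than the actual implementation.

```scala
object AllowedLatenessSketch {
  sealed trait Outcome
  case object OnTime extends Outcome          // the window has not fired yet
  case object LateButAccepted extends Outcome // the window fired, but is kept and fires again
  case object Dropped extends Outcome         // the window state is already gone

  // Classifies a record with event time `ts` for tumbling windows of `sizeMs`.
  def classify(ts: Long, watermark: Long, sizeMs: Long, allowedLatenessMs: Long): Outcome = {
    val maxTimestamp = ts - (ts % sizeMs) + sizeMs - 1 // last millisecond of the window
    if (watermark < maxTimestamp) OnTime
    else if (watermark < maxTimestamp + allowedLatenessMs) LateButAccepted
    else Dropped
  }
}
```

With a 10-second window and 2 seconds of allowed lateness, a record with event time 28000 is on time while the watermark is below 29999, is accepted late while the watermark is below 31999, and is dropped afterwards.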

Modify the code:

waterMarkStream
  .keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .allowedLateness(Time.seconds(2)) // Allow data to be late for 2 seconds
  // function: (K, W, Iterable[T], Collector[R]) => Unit
  .apply(new MyWindowFunction)
  .print()

Verify data lateness. Input data:

Restart the program after changing the code and re-enter the previous data:

0001 1569895882000
0001 1569895885000
0001 1569895888000
0001 1569895890000
0001 1569895891000
0001 1569895895000
0001 1569895898000
0001 1569895900000
0001 1569895911000
0001 1569895948000
0001 1569895945000
0001 1569895947000
0001 1569895950000
0001 1569895960000
0001 1569895949000

Verify lateness handling: data may be at most 2 seconds late; within that limit it is still accepted and the window is recalculated.

0001 1569895948000   eventTime: 2019-10-01 10:12:28   waterMark: 2019-10-01 10:12:30   triggers a calculation
0001 1569895945000   eventTime: 2019-10-01 10:12:25   waterMark: 2019-10-01 10:12:30   triggers a calculation

0001 1569895958000   eventTime: 2019-10-01 10:12:38   waterMark: 2019-10-01 10:12:30   does not trigger a calculation, because waterMarkTime < eventTime

Advance the waterMark to 10:12:41 to trigger the calculation for the record above:

0001 1569895971000   eventTime: 2019-10-01 10:12:51   waterMark: 2019-10-01 10:12:41

The calculation for 0001 1569895958000 is now triggered.

3. sideOutputLateData: collect late data

With sideOutputLateData, late data can be collected and stored in one place, which makes troubleshooting easier.
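Conceptually, a side output for late data just redirects records that would otherwise be dropped. The following plain-Scala sketch illustrates that routing; `route` and its parameters are hypothetical names, the watermark is recomputed after every record, and the expiry check is the simplified one described for allowedLateness above.

```scala
object SideOutputSketch {
  // Splits events (in arrival order) into those still accepted by their window
  // and those routed to a hypothetical "late" side channel.
  def route(eventTimes: Seq[Long], sizeMs: Long, maxOutOfOrdernessMs: Long,
            allowedLatenessMs: Long): (Seq[Long], Seq[Long]) = {
    // Offset the seed so the subtraction below cannot underflow.
    var maxSeen = Long.MinValue + maxOutOfOrdernessMs
    val accepted = Vector.newBuilder[Long]
    val late = Vector.newBuilder[Long]
    for (ts <- eventTimes) {
      val watermark = maxSeen - maxOutOfOrdernessMs // watermark before this record
      // Time after which the record's window is considered expired.
      val cleanupTime = ts - (ts % sizeMs) + sizeMs - 1 + allowedLatenessMs
      if (cleanupTime <= watermark) late += ts else accepted += ts
      maxSeen = math.max(maxSeen, ts)
    }
    (accepted.result(), late.result())
  }
}
```

With 3-second windows, a 10-second watermark delay, and 2 seconds of allowed lateness, the arrival order 28000, 40000, 29000, 25000 keeps the first three records and routes 25000 to the late channel, because its window expired before it arrived.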

You need to adjust the code first:

package com.shockang.study.bigdata.flink.watermark

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.text.SimpleDateFormat
import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting


object FlinkWaterMark {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Set the time type to eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // For the time being, define parallelism to be 1
    env.setParallelism(1)
    val text = env.socketTextStream("node01", 9000)
    val inputMap: DataStream[(String, Long)] = text.map(line => {
      // Each input line looks like "0001 1569895882000": key and event time separated by a space
      val arr = line.split(" ")
      (arr(0), arr(1).toLong)
    })

    // Register our data to waterMark
    val waterMarkStream: DataStream[(String, Long)] = inputMap
      .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[(String, Long)] {
        var currentMaxTimestamp = 0L

        // Watermark is delayed by 10 seconds based on eventTime, allowing a maximum out-of-order time of 10s
        val waterMarkDiff: Long = 10000L

        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        // Get the next watermark
        override def checkAndGetNextWatermark(t: (String, Long), l: Long): Watermark = {
          val watermark = new Watermark(currentMaxTimestamp - waterMarkDiff)
          watermark
        }

        // Extract the current data time as eventTime
        override def extractTimestamp(element: (String, Long), l: Long): Long = {
          val eventTime = element._2
          currentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp)
          val id = Thread.currentThread().getId
          println("currentThreadId:" + id + ",key:" + element._1 + ",eventtime:[" + element._2 + "|" + sdf.format(element._2) + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp) + "],watermark:[" + this.checkAndGetNextWatermark(element, l).getTimestamp + "|" + sdf.format(this.checkAndGetNextWatermark(element, l).getTimestamp) + "]")
          eventTime
        }
      })


    val outputTag: OutputTag[(String, Long)] = new OutputTag[(String, Long)]("late_data")
    val outputWindow: DataStream[String] = waterMarkStream
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .allowedLateness(Time.seconds(2)) // Allow data to be late for 2 seconds
      .sideOutputLateData(outputTag)
      //function: (K, W, Iterable[T], Collector[R]) => Unit
      .apply(new MyWindowFunction)


    // Late records that missed their window end up in this side output
    val sideOutput: DataStream[(String, Long)] = outputWindow.getSideOutput(outputTag)

    sideOutput.print()
    outputWindow.print()

    // Execute the program
    env.execute()

  }
}

class MyWindowFunction extends WindowFunction[(String, Long), String, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    val keyStr = key.toString
    val arrBuf = ArrayBuffer[Long]()
    val ite = input.iterator
    while (ite.hasNext) {
      val tup2 = ite.next()
      arrBuf.append(tup2._2)
    }
    val arr = arrBuf.toArray
    Sorting.quickSort(arr)
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last) + "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)
    out.collect(result)
  }
}

Let’s enter some data to verify:

0001 1569895882000
0001 1569895885000
0001 1569895888000
0001 1569895890000
0001 1569895891000
0001 1569895895000
0001 1569895898000
0001 1569895900000
0001 1569895911000
0001 1569895948000
0001 1569895945000
0001 1569895947000
0001 1569895950000
0001 1569895960000
0001 1569895949000

Two late records will be collected.

In this case, the late data is emitted through sideOutputLateData to the side output identified by outputTag.

7. Watermark mechanism with multiple degrees of parallelism

In the previous code, the parallelism was set to 1

env.setParallelism(1);

If parallelism is not set, the program defaults to the number of CPU cores of the local machine when it runs. Comment out the parallelism setting:

//env.setParallelism(1)

Then prefix each output line with the thread ID.

After entering 7 records, the output shows that the window is not triggered.

This is because the 7 records are processed by different threads, and each thread has its own watermark.

With a parallelism greater than one, watermark alignment takes the smallest watermark across all channels. The default parallelism here is 8 and the 7 records were all handled by different threads, so at least one channel has not produced a watermark yet, the minimum cannot advance, and the window cannot be triggered. Let’s verify this by setting the parallelism to 2 in the code:

env.setParallelism(2)

Enter the following:

0001 1569895890000
0001 1569895903000
0001 1569895908000

Output: when the third record is entered, the window [10:11:30, 10:11:33) is triggered.

After the first two records, the minimum watermark was 10:11:20, and no window ending at or before that time contained data.

After the third record, the minimum watermark was 10:11:33. The window [10:11:30, 10:11:33) ends exactly there and contains data, so it was triggered.
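The three inputs can be traced with a short plain-Scala sketch. It rests on assumptions that are not stated in the text above: records are dealt round-robin to the subtasks, each subtask computes its watermark as its own maximum event time minus 10 seconds, and the downstream watermark is the minimum over all subtasks. `ParallelWatermarkTrace` is a hypothetical name.

```scala
object ParallelWatermarkTrace {
  // Returns the downstream (minimum) watermark after each record, or None
  // while at least one subtask has not seen any record yet.
  def minWatermarks(eventTimes: Seq[Long], parallelism: Int,
                    maxOutOfOrdernessMs: Long): Seq[Option[Long]] = {
    val maxSeen = Array.fill(parallelism)(Long.MinValue) // per-subtask maximum event time
    val result = Vector.newBuilder[Option[Long]]
    for ((ts, i) <- eventTimes.zipWithIndex) {
      val subtask = i % parallelism // assumed round-robin distribution
      maxSeen(subtask) = math.max(maxSeen(subtask), ts)
      result += (
        if (maxSeen.forall(_ != Long.MinValue)) Some(maxSeen.min - maxOutOfOrdernessMs)
        else None
      )
    }
    result.result()
  }
}
```

For the inputs 1569895890000, 1569895903000, 1569895908000 with a parallelism of 2, the sketch gives no downstream watermark after the first record, 1569895880000 (10:11:20) after the second, and 1569895893000 (10:11:33) after the third, which matches the observed trigger of the window [10:11:30, 10:11:33).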