1. Window Overview

  • Real-world data streams are generally unbounded. To process them, we divide the infinite stream into finite data sets and run the business logic on each set.
  • A window is a way of splitting an infinite stream into finite chunks: stream data is distributed into finite buckets, and each bucket is analyzed separately.

1. Window types

  1. Time window
  • Tumbling time window
  • Sliding time window
  • Session window
  2. Count window
  • Tumbling count window
  • Sliding count window

2. Tumbling windows

Tumbling window diagram

  1. Data is split into windows of a fixed length.
  2. Windows are aligned in time, have a fixed length, and do not overlap, so each element belongs to exactly one window.
  3. Application scenario: BI-style statistics (an aggregate computed for each time period).
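
As a rough illustration of the alignment described above, the following plain-Java sketch (no Flink dependency) computes which tumbling window a timestamp falls into. The class and method names are hypothetical, and the sketch assumes non-negative millisecond timestamps and windows aligned to time 0 with no offset.

```java
// Hypothetical helper, not part of Flink: maps a timestamp to its tumbling window.
final class TumblingWindowSketch {

    // Start (inclusive) of the window that contains timestampMs.
    // Assumes timestampMs >= 0 and sizeMs > 0, with windows aligned to time 0.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // End (exclusive) of that window: each window covers [start, start + size).
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size)
                    + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Because every timestamp maps to exactly one start value, no element can land in two windows, which is the non-overlap property listed above.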

3. Sliding windows

Sliding window diagram

  1. A sliding window is a generalization of the fixed (tumbling) window. It is defined by a fixed window length and a slide interval.
  2. The window length is fixed, and windows can overlap, so an element can belong to more than one window.
  3. Application scenario: statistics over the most recent period (for example, computing an interface's failure rate over the last 5 minutes to decide whether to raise an alarm).
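
To make the overlap concrete, here is a minimal plain-Java sketch (no Flink dependency) that lists every sliding window containing a given timestamp. The class and method names are hypothetical, and the sketch assumes non-negative millisecond timestamps, no window offset, and a slide no larger than the window length.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: finds the sliding windows for a timestamp.
final class SlidingWindowSketch {

    // Returns the start time of every window [start, start + sizeMs) that
    // contains timestampMs, newest window first.
    // Assumes timestampMs >= 0 and 0 < slideMs <= sizeMs.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-second windows sliding every 5 seconds.
        System.out.println(windowStarts(12_000L, 10_000L, 5_000L));
    }
}
```

When the slide equals the window length, the loop yields a single window, which matches the statement that the tumbling window is a special case of the sliding window.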

4. Session windows

Session window diagram

  1. A session window groups a series of events and is closed by a specified timeout (gap): if no new data arrives for that period, the current window ends and the next element starts a new window.
  2. Feature: windows are not aligned in time; their start and length depend on the data.
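
The gap rule can be sketched in plain Java as follows. This is a simplified model for a single key with timestamps already sorted in ascending order; the class and method names are hypothetical. How an engine treats a distance exactly equal to the gap is an implementation detail: this sketch starts a new session only when the distance exceeds the gap.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: groups timestamps into sessions.
final class SessionWindowSketch {

    // Splits ascending timestamps into sessions. A new session starts
    // whenever the distance to the previous event is larger than gapMs.
    static List<List<Long>> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);          // timeout elapsed: close the session
                current = new ArrayList<>();  // and open a new one
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] events = {1_000L, 3_000L, 4_000L, 20_000L, 22_000L};
        System.out.println(sessions(events, 10_000L));
    }
}
```

The session boundaries fall wherever the data happens to pause, which is why session windows are not aligned in time.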

5. Window API

  1. Tumbling time window

.timeWindow(Time size)

// Tumbling window source code definition
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
    if (environment.getStreamTimeCharacteristic() ==
            TimeCharacteristic.ProcessingTime) {
        // Processing-time semantics: assign elements by processing time
        return window(TumblingProcessingTimeWindows.of(size));
    } else {
        // Otherwise: assign elements by event time
        return window(TumblingEventTimeWindows.of(size));
    }
}

  2. Sliding time window

.timeWindow(Time size, Time slide)

public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
    if (environment.getStreamTimeCharacteristic() ==
            TimeCharacteristic.ProcessingTime) {
        return window(SlidingProcessingTimeWindows.of(size, slide));
    } else {
        return window(SlidingEventTimeWindows.of(size, slide));
    }
}

  3. Session window

.window(EventTimeSessionWindows.withGap(Time.seconds(10)))

  4. Tumbling count window

.countWindow(size: Long)

def countWindow(size: Long): WindowedStream[T, K, GlobalWindow] = {
    new WindowedStream(javaStream.countWindow(size))
}

  5. Sliding count window

.countWindow(size: Long, slide: Long)

def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow] = {
    new WindowedStream(javaStream.countWindow(size, slide))
}
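
The two count windows can be modeled with a small plain-Java sketch. This is a simplified, single-key model and not Flink's implementation; the class and method names are hypothetical. It fires after every `slide` elements and sums at most the newest `size` elements, so the first results of a sliding count window may cover fewer than `size` elements.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper, not part of Flink: a single-key count window model.
final class CountWindowSketch {

    // After every `slide` values, emits the sum of the newest (at most) `size` values.
    // With slide == size this behaves like a tumbling count window.
    // Values left over after the last firing are not emitted.
    static List<Long> windowSums(long[] values, int size, int slide) {
        Deque<Long> buffer = new ArrayDeque<>();
        List<Long> sums = new ArrayList<>();
        int sinceLastFire = 0;
        for (long v : values) {
            buffer.addLast(v);
            if (buffer.size() > size) {
                buffer.removeFirst(); // keep only the newest `size` values
            }
            sinceLastFire++;
            if (sinceLastFire == slide) {
                long sum = 0;
                for (long x : buffer) {
                    sum += x;
                }
                sums.add(sum);
                sinceLastFire = 0;
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        long[] values = {1, 2, 3, 4, 5, 6};
        System.out.println(windowSums(values, 3, 3)); // tumbling: size == slide
        System.out.println(windowSums(values, 4, 2)); // sliding: size 4, slide 2
    }
}
```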

6. Window functions

A window function defines the computation performed on the data collected in a window. Window functions fall into two categories:

1. Incremental aggregate functions

Each incoming element is folded into a small aggregate state as soon as it arrives, so the window does not need to store the elements themselves (this saves memory).

Commonly used incremental aggregate functions:

  • ReduceFunction
  • AggregateFunction
// AggregateFunction definition
// * @param <IN>  The type of the values that are aggregated (input values)
// * @param <ACC> The type of the accumulator (intermediate aggregate state).
// * @param <OUT> The type of the aggregated result
AggregateFunction<IN, ACC, OUT>
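
To show why incremental aggregation needs so little state, here is a minimal plain-Java sketch of an averaging accumulator. It mirrors the create / add / merge / result steps of an incremental aggregate function but does not implement Flink's interface; the class and all of its members are hypothetical names chosen for illustration.

```java
// Hypothetical sketch, not Flink's AggregateFunction: an averaging accumulator.
final class AverageAccumulatorSketch {

    // The whole window state: a count and a running sum,
    // regardless of how many values the window receives.
    static final class Acc {
        long count;
        double sum;
    }

    // Creates an empty accumulator.
    static Acc createAccumulator() {
        return new Acc();
    }

    // Folds one incoming value into the accumulator.
    static Acc add(double value, Acc acc) {
        acc.count += 1;
        acc.sum += value;
        return acc;
    }

    // Combines two partial accumulators into a new one.
    static Acc merge(Acc a, Acc b) {
        Acc merged = new Acc();
        merged.count = a.count + b.count;
        merged.sum = a.sum + b.sum;
        return merged;
    }

    // Produces the final result; yields NaN if no value was added.
    static double getResult(Acc acc) {
        return acc.sum / acc.count;
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator();
        for (double t : new double[] {20.0, 30.0, 40.0}) {
            add(t, acc);
        }
        System.out.println(getResult(acc));
    }
}
```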

Example

import com.atguigu.day2.{SensorReading, SensorSource}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object AvgTempByAggregateFunction {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource)

    stream
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .aggregate(new AvgTempAgg)
      .print()

    env.execute()
  }

  // Each (key, window) pair has its own accumulator
  // First type parameter: type of the elements in the stream
  // Second type parameter: accumulator type, a tuple (sensor ID, number of readings so far, sum of readings so far)
  // Third type parameter: output type of the incremental aggregate function, a tuple (sensor ID, average temperature of the window)
  class AvgTempAgg extends AggregateFunction[SensorReading, (String, Long, Double), (String, Double)] {
    // Create an empty accumulator
    override def createAccumulator(): (String, Long, Double) = ("", 0L, 0.0)

    // Aggregation logic: fold one reading into the accumulator
    override def add(value: SensorReading, accumulator: (String, Long, Double)): (String, Long, Double) = {
      (value.id, accumulator._2 + 1, accumulator._3 + value.temperature)
    }

    // Result emitted when the window is closed
    override def getResult(accumulator: (String, Long, Double)): (String, Double) = {
      (accumulator._1, accumulator._3 / accumulator._2)
    }

    // Logic for merging two accumulators
    override def merge(a: (String, Long, Double), b: (String, Long, Double)): (String, Long, Double) = {
      (a._1, a._2 + b._2, a._3 + b._3)
    }
  }
}

2. Full window functions

  • ProcessWindowFunction

A full window function first collects all the data of the window, then iterates over all of it when the window fires.

Source definition:

  /**
   * @tparam IN  The type of the input value.
   * @tparam OUT The type of the output value.
   * @tparam KEY The type of the key.
   * @tparam W   The type of the window.
   */
  ProcessWindowFunction[IN, OUT, KEY, W <: Window]
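
For contrast with incremental aggregation, the following plain-Java sketch models the full window approach: every value is buffered, and the result is computed only when the window closes, at which point the window bounds are also available. It is a simplified single-key model, not Flink's ProcessWindowFunction; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink's ProcessWindowFunction: buffer first, compute at close.
final class FullWindowSketch {

    // Result that carries window metadata alongside the aggregate.
    static final class Result {
        final double average;
        final long windowStart;
        final long windowEnd;

        Result(double average, long windowStart, long windowEnd) {
            this.average = average;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
        }
    }

    // Every element of the window is kept until the window closes.
    private final List<Double> buffer = new ArrayList<>();

    void add(double value) {
        buffer.add(value);
    }

    int bufferedCount() {
        return buffer.size();
    }

    // Called once when the window closes: iterate over all buffered values,
    // attach the window bounds (in ms), then clear the buffer.
    Result close(long windowStart, long windowEnd) {
        double sum = 0.0;
        for (double v : buffer) {
            sum += v;
        }
        Result result = new Result(sum / buffer.size(), windowStart, windowEnd);
        buffer.clear();
        return result;
    }

    public static void main(String[] args) {
        FullWindowSketch window = new FullWindowSketch();
        for (double t : new double[] {20.0, 30.0, 40.0}) {
            window.add(t);
        }
        Result r = window.close(0L, 5_000L);
        System.out.println(r.average + " in [" + r.windowStart + ", " + r.windowEnd + ")");
    }
}
```

The buffer grows with the number of elements in the window, which is the memory cost of this approach; the benefit is access to the complete data set and the window metadata at computation time.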

Example

import com.atguigu.day2.{SensorReading, SensorSource}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AvgTempByProcessWindowFunction {

  case class AvgInfo(id: String, avgTemp: Double, windowStart: Long, windowEnd: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource)

    stream
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .process(new AvgTempFunc)
      .print()

    env.execute()
  }

  // Drawback compared to incremental aggregate functions: all elements of the window must be stored,
  // whereas an incremental aggregate function only stores one accumulator.
  // Advantage: a full window function can access window metadata.
  class AvgTempFunc extends ProcessWindowFunction[SensorReading, AvgInfo, String, TimeWindow] {
    // Called when the window is closed
    override def process(key: String, context: Context, elements: Iterable[SensorReading], out: Collector[AvgInfo]): Unit = {
      val count = elements.size // Number of temperature readings in the window when it closes
      var sum = 0.0 // Sum of the temperature readings
      for (r <- elements) {
        sum += r.temperature
      }
      // Window bounds, in milliseconds
      val windowStart = context.window.getStart
      val windowEnd = context.window.getEnd
      out.collect(AvgInfo(key, sum / count, windowStart, windowEnd))
    }
  }
}

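3. Combining incremental aggregation with a full window function

The two kinds of function can be combined: the incremental aggregate function reduces the elements as they arrive, and the full window function receives only the final aggregate, to which it can add window metadata. Example: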

import com.atguigu.day2.{SensorReading, SensorSource}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AvgTempByAggAndProcWindow {

  case class AvgInfo(id: String, avgTemp: Double, windowStart: Long, windowEnd: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource)

    stream
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .aggregate(new AvgTempAgg, new WindowResult)
      .print()

    env.execute()
  }

  // Each (key, window) pair has its own accumulator
  // First type parameter: type of the elements in the stream
  // Second type parameter: accumulator type, a tuple (sensor ID, number of readings so far, sum of readings so far)
  // Third type parameter: output type of the incremental aggregate function, a tuple (sensor ID, average temperature of the window)
  class AvgTempAgg extends AggregateFunction[SensorReading, (String, Long, Double), (String, Double)] {
    // Create an empty accumulator
    override def createAccumulator(): (String, Long, Double) = ("", 0L, 0.0)

    // Aggregation logic: fold one reading into the accumulator
    override def add(value: SensorReading, accumulator: (String, Long, Double)): (String, Long, Double) = {
      (value.id, accumulator._2 + 1, accumulator._3 + value.temperature)
    }

    // Result emitted when the window is closed
    override def getResult(accumulator: (String, Long, Double)): (String, Double) = {
      (accumulator._1, accumulator._3 / accumulator._2)
    }

    // Logic for merging two accumulators
    override def merge(a: (String, Long, Double), b: (String, Long, Double)): (String, Long, Double) = {
      (a._1, a._2 + b._2, a._3 + b._3)
    }
  }

  // Note: the input type here is the output type of the incremental aggregate function;
  // this function adds the window information to it
  class WindowResult extends ProcessWindowFunction[(String, Double), AvgInfo, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[(String, Double)], out: Collector[AvgInfo]): Unit = {
      // The iterable holds exactly one value: the result sent by the incremental aggregate function
      out.collect(AvgInfo(key, elements.head._2, context.window.getStart, context.window.getEnd))
    }
  }
}

7. Other window APIs