1. Window Overview
- Real-world streams are generally unbounded. To process unbounded data, we split the infinite stream into finite data sets and run the business logic on those.
- A window is a way of cutting an infinite stream into finite pieces: stream elements are distributed into finite buckets, and the analysis runs on each bucket.
1. Window types
- Time window
  - Tumbling time window
  - Sliding time window
  - Session window
- Count window
  - Tumbling count window
  - Sliding count window
2. Tumbling windows
Tumbling window diagram
- The stream is cut into segments of a fixed window length.
- Windows are aligned in time, the window length is fixed, and windows do not overlap, so each element belongs to exactly one window.
- Application scenario: BI statistics and similar reports (an aggregate computed for each time period).
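The alignment rule above can be sketched in plain Java without any Flink dependency. This is a minimal illustration, assuming windows are aligned to multiples of the window length with no offset; the class and method names (`TumblingWindowSketch`, `windowStart`, `assign`) are made up for this example and are not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not a Flink class: shows how a timestamp maps to one tumbling window.
class TumblingWindowSketch {

    // Start (ms) of the tumbling window containing the timestamp.
    // Assumption: windows are aligned to multiples of sizeMs, so the start is
    // the timestamp rounded down to the nearest multiple.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Groups timestamps into buckets keyed by window start.
    static Map<Long, List<Long>> assign(long[] timestampsMs, long sizeMs) {
        Map<Long, List<Long>> buckets = new TreeMap<>();
        for (long ts : timestampsMs) {
            buckets.computeIfAbsent(windowStart(ts, sizeMs), k -> new ArrayList<>()).add(ts);
        }
        return buckets;
    }

    public static void main(String[] args) {
        long[] events = {1_000, 4_999, 5_000, 7_200, 12_300};
        // 5-second windows: every event lands in exactly one bucket.
        System.out.println(assign(events, 5_000));
        // prints {0=[1000, 4999], 5000=[5000, 7200], 10000=[12300]}
    }
}
```

Because the window start depends only on the timestamp and the window length, every key sees the same window boundaries, which is what "aligned in time" means here.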
3. Sliding windows
Sliding window diagram
- A sliding window is a generalization of the tumbling window. It is defined by a fixed window length and a slide interval.
- The window length is fixed, and windows overlap whenever the slide interval is shorter than the window length, so one element can belong to several windows.
- Application scenario: statistics over the most recent period (for example, computing an interface's failure rate over the last 5 minutes to decide whether to raise an alarm).
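To see how overlap plays out, the sketch below lists every sliding window that contains a given timestamp. It is plain Java with no Flink dependency and assumes windows start at multiples of the slide interval; `SlidingWindowSketch` and `windowStarts` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Flink class: enumerates the sliding windows covering a timestamp.
class SlidingWindowSketch {

    // Returns the start times (ms) of all windows [start, start + sizeMs)
    // that contain timestampMs, newest window first.
    // Assumption: a new window starts at every multiple of slideMs.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 5 * 60_000L;  // 5-minute window
        long slide = 60_000L;     // evaluated every minute
        // An event at 7 min 30 s falls into five overlapping windows.
        System.out.println(windowStarts(450_000L, size, slide));
        // prints [420000, 360000, 300000, 240000, 180000]
    }
}
```

With the slide equal to the window length the loop produces exactly one start, which is the tumbling case.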
4. Session windows
Session window diagram
- A session window groups a run of events and is bounded by a specified timeout gap: if no new data arrives within the gap, the current window ends, and the next element opens a new window.
- Features: windows are not aligned in time.
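The gap rule can be illustrated with a small, Flink-free Java sketch that splits a list of timestamps into sessions. It assumes a new session starts when the distance to the previous event is strictly greater than the gap; how the exact boundary is treated is a choice of this sketch, and `SessionWindowSketch` and `sessions` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not a Flink class: groups timestamps into gap-separated sessions.
class SessionWindowSketch {

    // Splits timestamps (ms) into sessions. A new session begins when the
    // distance to the previous event exceeds gapMs (boundary handling is an
    // assumption of this sketch).
    static List<List<Long>> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);          // gap exceeded: close the session
                current = new ArrayList<>();  // and open a new one
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] events = {1_000, 3_000, 4_000, 20_000, 21_000, 40_000};
        System.out.println(sessions(events, 10_000));
        // prints [[1000, 3000, 4000], [20000, 21000], [40000]]
    }
}
```

The session boundaries depend entirely on when data arrives, which is why session windows have neither a fixed length nor a fixed alignment.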
5. Window API
- Tumbling time window
.timeWindow(Time size)
// Tumbling window, source definition
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        // Processing-time semantics
        return window(TumblingProcessingTimeWindows.of(size));
    } else {
        // Any other time characteristic: event-time windows
        return window(TumblingEventTimeWindows.of(size));
    }
}
- Sliding time window
.timeWindow(Time size, Time slide)
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return window(SlidingProcessingTimeWindows.of(size, slide));
    } else {
        return window(SlidingEventTimeWindows.of(size, slide));
    }
}
- Session window
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
- Tumbling count window
.countWindow(size: Long)
def countWindow(size: Long): WindowedStream[T, K, GlobalWindow] = {
  new WindowedStream(javaStream.countWindow(size))
}
- Sliding count window
.countWindow(size: Long, slide: Long)
def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow] = {
  new WindowedStream(javaStream.countWindow(size, slide))
}
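The two count-window variants differ only in how often they fire. The Flink-free Java sketch below emits the sum of the most recent `size` elements after every `slide` elements. It assumes that a sliding count window fires every `slide` elements even before `size` elements have arrived, which is my understanding of the behaviour rather than something stated above; `CountWindowSketch` and `windowSums` are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper, not a Flink class: simulates count windows over one key.
class CountWindowSketch {

    // Emits the sum of the last `size` elements after every `slide` elements.
    // With slide == size this behaves like a tumbling count window.
    static List<Long> windowSums(long[] values, int size, int slide) {
        Deque<Long> buffer = new ArrayDeque<>();
        List<Long> sums = new ArrayList<>();
        int sinceLastFire = 0;
        for (long v : values) {
            buffer.addLast(v);
            if (buffer.size() > size) {
                buffer.removeFirst();  // keep only the most recent `size` elements
            }
            sinceLastFire++;
            if (sinceLastFire == slide) {
                long sum = 0;
                for (long x : buffer) {
                    sum += x;
                }
                sums.add(sum);
                sinceLastFire = 0;
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        long[] values = {1, 2, 3, 4, 5, 6};
        System.out.println(windowSums(values, 3, 3)); // tumbling: prints [6, 15]
        System.out.println(windowSums(values, 4, 2)); // sliding:  prints [3, 10, 18]
    }
}
```

Note that count windows are driven purely by the number of elements per key, so a key that rarely receives data may wait a long time before its window fires.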
6. Window functions
A window function defines the computation performed on the data collected in a window. Window functions fall into two categories:
1. Incremental aggregate functions
Each element is folded into a small piece of state as soon as it arrives, so the window does not have to buffer its elements (this saves memory).
Commonly used incremental aggregate functions:
- ReduceFunction
- AggregateFunction
// AggregateFunction definition
// @param <IN>  The type of the values that are aggregated (input values)
// @param <ACC> The type of the accumulator (intermediate aggregate state)
// @param <OUT> The type of the aggregated result
AggregateFunction<IN, ACC, OUT>
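To make the role of the accumulator concrete, here is a Flink-free sketch in plain Java of the four operations an `AggregateFunction` provides (`createAccumulator`, `add`, `getResult`, `merge`), applied to a running average. The class `AverageAccumulatorSketch` and its nested `Acc` type are hypothetical and only mirror the method names; they do not implement the Flink interface.

```java
// Hypothetical, Flink-free illustration of the accumulator lifecycle.
class AverageAccumulatorSketch {

    // The accumulator is the only state kept per (key, window):
    // two numbers, regardless of how many elements arrive.
    static final class Acc {
        long count;
        double sum;
    }

    // Called once when the first element of a window arrives.
    static Acc createAccumulator() {
        return new Acc();
    }

    // Called for every element: fold it into the accumulator.
    static Acc add(double value, Acc acc) {
        acc.count += 1;
        acc.sum += value;
        return acc;
    }

    // Combines two partial accumulators, for example when two windows are merged.
    static Acc merge(Acc a, Acc b) {
        Acc merged = new Acc();
        merged.count = a.count + b.count;
        merged.sum = a.sum + b.sum;
        return merged;
    }

    // Called when the window fires: turn the accumulator into the output.
    static double getResult(Acc acc) {
        return acc.sum / acc.count;
    }

    public static void main(String[] args) {
        Acc left = createAccumulator();
        add(20.0, left);
        add(22.0, left);
        Acc right = createAccumulator();
        add(24.0, right);
        add(26.0, right);
        // Merging the partial results gives the average over all four readings.
        System.out.println(getResult(merge(left, right))); // prints 23.0
    }
}
```

Keeping count and sum separately, instead of a running average, is what makes `merge` exact: two averages cannot be combined without knowing how many elements each one covers.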
Example
import com.atguigu.day2.{SensorReading, SensorSource}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object AvgTempByAggregateFunction {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val stream = env.addSource(new SensorSource)
    stream
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .aggregate(new AvgTempAgg)
      .print()
    env.execute()
  }

  // Each (key, window) pair has its own accumulator
  // First type parameter: the type of the elements in the stream
  // Second type parameter: the accumulator type, a tuple (sensor ID, number of readings, sum of readings)
  // Third type parameter: the output type, a tuple (sensor ID, average temperature of the window)
  class AvgTempAgg extends AggregateFunction[SensorReading, (String, Long, Double), (String, Double)] {
    // Create an empty accumulator
    override def createAccumulator(): (String, Long, Double) = ("", 0L, 0.0)

    // Aggregation logic: fold one reading into the accumulator
    override def add(value: SensorReading, accumulator: (String, Long, Double)): (String, Long, Double) = {
      (value.id, accumulator._2 + 1, accumulator._3 + value.temperature)
    }

    // The value emitted when the window closes
    override def getResult(accumulator: (String, Long, Double)): (String, Double) = {
      (accumulator._1, accumulator._3 / accumulator._2)
    }

    // How two accumulators are merged
    override def merge(a: (String, Long, Double), b: (String, Long, Double)): (String, Long, Double) = {
      (a._1, a._2 + b._2, a._3 + b._3)
    }
  }
}
2. Full window functions
- ProcessWindowFunction
All of the window's data is collected first; when the window fires, the function iterates over the complete contents.
Source definition:
/* @tparam IN  The type of the input value.
 * @tparam OUT The type of the output value.
 * @tparam KEY The type of the key.
 * @tparam W   The type of the window.
 */
ProcessWindowFunction[IN, OUT, KEY, W <: Window]
Example
import com.atguigu.day2.{SensorReading, SensorSource}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AvgTempByProcessWindowFunction {
  case class AvgInfo(id: String, avgTemp: Double, windowStart: Long, windowEnd: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val stream = env.addSource(new SensorSource)
    stream
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .process(new AvgTempFunc)
      .print()
    env.execute()
  }

  // Disadvantage compared with incremental aggregate functions: every element of the window is stored,
  // whereas an incremental aggregate function only stores one accumulator.
  // Advantage: a full window function can access window information.
  class AvgTempFunc extends ProcessWindowFunction[SensorReading, AvgInfo, String, TimeWindow] {
    // Called when the window closes
    override def process(key: String, context: Context, elements: Iterable[SensorReading], out: Collector[AvgInfo]): Unit = {
      val count = elements.size // Number of temperature readings in the window
      var sum = 0.0 // Sum of the temperature readings
      for (r <- elements) {
        sum += r.temperature
      }
      // Window start and end, in milliseconds
      val windowStart = context.window.getStart
      val windowEnd = context.window.getEnd
      out.collect(AvgInfo(key, sum / count, windowStart, windowEnd))
    }
  }
}
3. Combining incremental aggregation with a full window function
Straight to the example: the incremental function does the aggregation, and the full window function only wraps its single result with window information.
import com.atguigu.day2.{SensorReading, SensorSource}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AvgTempByAggAndProcWindow {
  case class AvgInfo(id: String, avgTemp: Double, windowStart: Long, windowEnd: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val stream = env.addSource(new SensorSource)
    stream
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .aggregate(new AvgTempAgg, new WindowResult)
      .print()
    env.execute()
  }

  // Each (key, window) pair has its own accumulator
  // First type parameter: the type of the elements in the stream
  // Second type parameter: the accumulator type, a tuple (sensor ID, number of readings, sum of readings)
  // Third type parameter: the output type, a tuple (sensor ID, average temperature of the window)
  class AvgTempAgg extends AggregateFunction[SensorReading, (String, Long, Double), (String, Double)] {
    // Create an empty accumulator
    override def createAccumulator(): (String, Long, Double) = ("", 0L, 0.0)

    // Aggregation logic: fold one reading into the accumulator
    override def add(value: SensorReading, accumulator: (String, Long, Double)): (String, Long, Double) = {
      (value.id, accumulator._2 + 1, accumulator._3 + value.temperature)
    }

    // The value emitted when the window closes
    override def getResult(accumulator: (String, Long, Double)): (String, Double) = {
      (accumulator._1, accumulator._3 / accumulator._2)
    }

    // How two accumulators are merged
    override def merge(a: (String, Long, Double), b: (String, Long, Double)): (String, Long, Double) = {
      (a._1, a._2 + b._2, a._3 + b._3)
    }
  }

  // Note: the input type here is the output type of the incremental aggregate function;
  // this function adds the window information.
  class WindowResult extends ProcessWindowFunction[(String, Double), AvgInfo, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[(String, Double)], out: Collector[AvgInfo]): Unit = {
      // The iterable holds exactly one value: the result emitted by the incremental aggregate function
      out.collect(AvgInfo(key, elements.head._2, context.window.getStart, context.window.getEnd))
    }
  }
}