0x00 Summary

Watermark is one of the harder concepts in Flink to get your head around. This article sorts out the concept of Watermark from the big picture, relying on intuition rather than formal definitions.

0x01 Questions

Here are a few simple questions about Watermark:

  • What are the common processing requirements and solutions for Flink stream processing applications?
  • Should Watermark be understood as a watermark (like the mark on paper) or as a water mark (a water level)?
  • What is the nature of Watermark?
  • How does Watermark solve the problem?

Let’s answer these questions briefly to give you an idea, and we’ll go into more detail later in this article.

Question 1. What are the common requirements/solutions for Flink stream processing applications?

Flink can process every incoming message individually, but sometimes we need aggregate processing, such as counting how many users clicked on our page in the last minute. That is why Flink introduced the concept of windows.

Window: a window collects data over a period of time. That is, the incoming raw data stream is divided into buckets, and each computation is performed within a single bucket. Windows are the bridge from streaming to batch.

Problem: aggregation introduces new problems, such as out-of-order and late data. The solutions are Watermark, allowedLateness, and side output.

Watermark addresses out-of-order data, i.e. the risk that not all data for a given time range has arrived by the time we want to compute it.

allowedLateness delays the time at which a window is finally closed.

Side output is the last-resort fallback: after a window has been completely closed, all data that arrives too late for it is put into the side output stream, and the user decides what to do with it.

In summary:

Windows -----> Watermark -----> allowedLateness -----> side output

Windows cut the streaming data into chunks; Watermark decides when to stop waiting for earlier data and trigger the window computation; allowedLateness keeps the window open a little longer; and side output, as the last resort, sends any remaining late data elsewhere.

Question 2. Watermark: a watermark or a water level?

The first article I read translated Watermark with the word for an image watermark, and it left me dizzy. A name is supposed to tell you something, yet I could never picture what a watermark had to do with the concept.

As I kept reading, I became more and more convinced that it should be understood as a "water mark", i.e. a water level. A "high-water mark" is the highest level reached by sea or flood water.

Later I saw that other articles also translated it as a water level, and I was relieved: at least we would not get another bewildering translation like the Chinese term for "socket".

Question 3. What is the nature of Watermark?

A watermark estimates, based on the messages collected so far, whether any messages are still to arrive. In essence it is a timestamp, and that timestamp reflects when events occurred, not when they were processed.

This can be seen in the Flink source: the only meaningful member variable of Watermark is timestamp.

public final class Watermark extends StreamElement {
  /*The watermark that signifies end-of-event-time. */
  public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
  /* The timestamp of the watermark in milliseconds. */
  private final long timestamp;
  /* Creates a new watermark with the given timestamp in milliseconds.*/
  public Watermark(long timestamp) {
    this.timestamp = timestamp;
  }
  /* Returns the timestamp associated with this {@link Watermark} in milliseconds. */
  public long getTimestamp() {
    return timestamp;
  }
}

Question 4. How does Watermark solve the problem?

A watermark is a way of telling Flink how late a message may be: it defines when to stop waiting for earlier data.

A watermark can be pictured as a water level that keeps rising. It travels through the pipeline as part of the data stream itself.

When a Flink operator receives a watermark, it takes it to mean that all messages with timestamps up to that time have arrived; in other words, it assumes that no more events with timestamps at or below the watermark will arrive.

This assumption is the basis for triggering the window calculation. The window will not be closed until the water level crosses the corresponding end time of the window.
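A minimal plain-Java sketch of the two ideas above, with no Flink dependency and hypothetical names (WatermarkTracker, advance, isComplete): the watermark only ever rises, and a window counts as complete once the watermark reaches its end. One detail is an assumption flagged in the code: Flink treats the last timestamp of a window [start, end) as end - 1 ms and fires once the watermark reaches it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: tracks a monotonically rising event-time watermark. */
class WatermarkTracker {
    private long current = Long.MIN_VALUE; // no watermark seen yet

    /** Offers a candidate watermark; returns true only if it raised the level. */
    boolean advance(long candidate) {
        if (candidate > current) {
            current = candidate;
            return true;
        }
        return false; // a lower or equal candidate never moves the water level back
    }

    long current() {
        return current;
    }

    /** Assumption (Flink's convention): window [start, end) is complete once watermark >= end - 1. */
    static boolean isComplete(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker();
        List<Long> emitted = new ArrayList<>();
        for (long candidate : new long[] {4_500, 9_000, 8_000, 10_000}) {
            if (tracker.advance(candidate)) {
                emitted.add(candidate);
            }
        }
        System.out.println("emitted = " + emitted);               // emitted = [4500, 9000, 10000]
        System.out.println(isComplete(10_000, tracker.current())); // true
    }
}
```

The candidate 8000 is swallowed because it would lower the water level; this is the same "only emit if larger" rule that Flink's periodic watermark operator applies.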

0x02 Background concepts

Stream processing

The essence of stream processing is that data is processed record by record, as it arrives.

Batch processing, by contrast, accumulates data up to a certain amount and then processes it together. This is the essential difference between the two.

In its design, Flink treats all data as streams, and batch processing is just a special case of streaming. Data is further divided into bounded and unbounded data.

  • Bounded data corresponds to batch processing, and the API is DataSet.
  • Unbounded data corresponds to stream processing, and the API is DataStream.

Out-of-order data

What is out of order? It means the order in which data arrives differs from the order in which it was actually generated. There are many causes, such as network latency, message backlogs, retries, and so on.

An event takes some time to travel from where it is generated, through the source, to an operator. Although in most cases data reaches the operator in the order the events were generated, out-of-order (or late) elements cannot be ruled out because of the network and back pressure.

For example:

Some data in a data source is delayed by 5 seconds for some reason (for example, network problems or the external storage itself), so data generated in the 1st second of real time may arrive (say, at the Window node) after data generated in the 5th second. Suppose there are events 1 to 10, and the out-of-order arrival sequence is: 2, 3, 4, 5, 1, 6, 3, 8, 9, 10, 7
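To make "out of order" concrete, the sketch below flags every arrival whose timestamp is lower than the largest one already seen, using the arrival sequence above as input. It is plain Java with a hypothetical class name; a real job would read timestamps from the records rather than use event numbers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: finds events that arrive after a later-generated event. */
class OutOfOrderDetector {
    /** Returns every arrival whose timestamp is lower than the maximum seen before it. */
    static List<Integer> outOfOrder(int[] arrivals) {
        List<Integer> result = new ArrayList<>();
        int maxSoFar = Integer.MIN_VALUE;
        for (int t : arrivals) {
            if (t < maxSoFar) {
                result.add(t); // generated earlier than something that already arrived
            } else {
                maxSoFar = t;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[] arrivals = {2, 3, 4, 5, 1, 6, 3, 8, 9, 10, 7}; // the arrival order from the text
        System.out.println(outOfOrder(arrivals)); // [1, 3, 7]
    }
}
```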

0x03 Window concept in Flink

Window

In Flink it is fine to compute on each message as it arrives, but computing that often is resource-intensive, and some statistics cannot be computed one message at a time. That is why both Spark and Flink provide window computations.

For example, to see access data for the last minute or the last half hour, we need windows.

Window: windows are the key to processing unbounded streams. A window splits the stream into buckets of finite size, and computation is performed on each bucket.

start_time, end_time: each time window has a start time and an end time. They are expressed in the window's notion of time (event time or processing time), not necessarily the machine's system clock.

Window life cycle

In short, a window is created as soon as the first element belonging to it arrives, and it is completely removed when time (event time or processing time) passes its end timestamp plus the user-specified allowed lateness.

For example:

With an event-time-based windowing strategy that creates non-overlapping (tumbling) windows every 5 minutes and allows a lateness of 1 minute: when the first element whose timestamp falls into the interval 12:00 to 12:05 arrives, Flink creates a new window for that interval, and it deletes the window when the watermark passes the 12:06 timestamp.
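The deletion rule in this example can be checked with a few lines of plain Java. The sketch assumes, as Flink's WindowOperator does for event-time windows, that the cleanup point is the window's last timestamp (end - 1 ms) plus the allowed lateness; the class and method names are hypothetical, and times are written as milliseconds of the day for readability.

```java
import java.time.LocalTime;

/** Hypothetical model of when an event-time tumbling window's state can be deleted. */
class WindowLifecycle {
    static final long MINUTE = 60_000L;

    /** Millisecond-of-day for a wall-clock time such as "12:05", to keep the arithmetic readable. */
    static long ms(String time) {
        return LocalTime.parse(time).toNanoOfDay() / 1_000_000L;
    }

    /** Assumption: last timestamp of [start, start + size) plus the allowed lateness. */
    static long cleanupTime(long start, long size, long allowedLateness) {
        return start + size - 1 + allowedLateness;
    }

    /** The window is deleted once the watermark reaches its cleanup time. */
    static boolean isDeleted(long watermark, long start, long size, long allowedLateness) {
        return watermark >= cleanupTime(start, size, allowedLateness);
    }

    public static void main(String[] args) {
        long start = ms("12:00"), size = 5 * MINUTE, lateness = MINUTE;
        long cleanup = cleanupTime(start, size, lateness);
        System.out.println(LocalTime.ofNanoOfDay(cleanup * 1_000_000L));    // 12:05:59.999
        System.out.println(isDeleted(ms("12:05:30"), start, size, lateness)); // false: still accepts late data
        System.out.println(isDeleted(ms("12:06"), start, size, lateness));    // true
    }
}
```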

The window has the following components:

  • Window Assigner: determines which window(s) each element is assigned to.
  • Trigger: determines when a window is evaluated or purged. A trigger policy might be something like "when the number of elements in the window is greater than 4" or "when the watermark passes the end of the window".
  • Evictor: removes elements from the window after the trigger fires, before and/or after the window function is applied.

A window also has a window function, such as ProcessWindowFunction, ReduceFunction, AggregateFunction, or FoldFunction. The function contains the computation applied to the window's contents, and the trigger specifies when the window is considered ready for the function to be applied.
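The split between trigger and window function can be illustrated with a toy model. The sketch below is a hypothetical plain-Java class, not Flink's Trigger or ReduceFunction API, and it leaves out the assigner and evictor: the trigger is the "more than 4 elements" policy mentioned above, the window function is a sum, and the window is purged after each firing, so every five elements yield one result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;

/** Hypothetical miniature window: a trigger decides when, a reduce function decides what. */
class MiniWindow<T> {
    private final List<T> contents = new ArrayList<>();
    private final Predicate<List<T>> trigger;      // e.g. "more than 4 elements"
    private final BinaryOperator<T> reduceFunction; // the window function

    MiniWindow(Predicate<List<T>> trigger, BinaryOperator<T> reduceFunction) {
        this.trigger = trigger;
        this.reduceFunction = reduceFunction;
    }

    /** Adds an element; if the trigger fires, returns the result and purges the window. */
    List<T> add(T element) {
        contents.add(element);
        List<T> out = new ArrayList<>();
        if (trigger.test(contents)) {
            out.add(contents.stream().reduce(reduceFunction).get()); // non-empty, so get() is safe
            contents.clear(); // fire-and-purge: start collecting again from empty
        }
        return out;
    }

    public static void main(String[] args) {
        MiniWindow<Integer> window = new MiniWindow<>(c -> c.size() > 4, Integer::sum);
        List<Integer> results = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            results.addAll(window.add(i));
        }
        System.out.println(results); // [15, 40]
    }
}
```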

Keyed vs Non-Keyed Windows

Before defining a window, the first thing to decide is whether the stream should be keyed. keyBy(…) splits an unbounded stream into logical keyed streams; if keyBy(…) is not called, the stream is not keyed.

  • For keyed streams, any attribute of the incoming events can be used as the key. A keyed stream allows window computations to be executed in parallel by multiple tasks, since each logical keyed stream can be processed independently of the others. All elements with the same key are sent to the same task.

  • For non-keyed streams, the original stream is not split into multiple logical streams, and all window logic is executed by a single task, i.e. with a parallelism of 1.
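A simplified sketch of how keyBy routes records: every element with the same key lands on the same parallel task, and with parallelism 1 (the non-keyed case) everything lands on one task. The class is hypothetical, and plain hashCode modulo parallelism is a stand-in; Flink itself first hashes keys into key groups with a murmur hash and then maps key groups to tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified keyBy routing: same key, same task. */
class KeyRouting {
    static int taskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism); // stand-in for Flink's key-group hashing
    }

    /** Groups (key, value) pairs by the task index they would be routed to. */
    static Map<Integer, List<String>> route(String[][] events, int parallelism) {
        Map<Integer, List<String>> byTask = new TreeMap<>();
        for (String[] e : events) {
            byTask.computeIfAbsent(taskFor(e[0], parallelism), t -> new ArrayList<>())
                  .add(e[0] + ":" + e[1]);
        }
        return byTask;
    }

    public static void main(String[] args) {
        String[][] clicks = {{"alice", "p1"}, {"bob", "p2"}, {"alice", "p3"}, {"carol", "p1"}};
        System.out.println(route(clicks, 3)); // both alice events share one task
        System.out.println(route(clicks, 1)); // non-keyed style: everything on task 0
    }
}
```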

Window types

Windows fall into three types: tumbling windows (no overlap), sliding windows (overlapping), and session windows (separated by gaps of inactivity).

Tumbling window: the tumbling window assigner assigns each element to a window of a fixed size. Tumbling windows have a fixed size and do not overlap. For example, with a 5-minute tumbling window, the current window is evaluated and a new window is started every 5 minutes.
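Assigning an element to its tumbling window amounts to aligning its timestamp down to a multiple of the window size. A minimal sketch in plain Java (hypothetical class name, no offset support), in the same spirit as Flink's TimeWindow.getWindowStartWithOffset:

```java
/** Hypothetical helper: which tumbling window [start, start + size) contains a timestamp? */
class TumblingWindows {
    static long windowStart(long timestamp, long size) {
        // Align down to a multiple of the window size; floorMod also handles negative timestamps.
        return timestamp - Math.floorMod(timestamp, size);
    }

    public static void main(String[] args) {
        long size = 5 * 60_000L; // 5-minute windows, in milliseconds
        for (long ts : new long[] {0L, 299_999L, 300_000L, 421_000L}) {
            long start = windowStart(ts, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Because windows are aligned to the epoch rather than to the first element, two elements 1 ms apart can still fall into different windows (299999 and 300000 above).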

Sliding window

The difference between a sliding window and a tumbling window is that sliding windows overlap, so part of the data is computed more than once.

The sliding window assigner assigns each element to windows of a fixed size. As with the tumbling window assigner, the size is configured by the window size parameter; an additional window slide parameter controls how frequently a new sliding window is started. Therefore, if the slide is smaller than the window size, sliding windows overlap, and an element is assigned to multiple windows.

For example, you can use a window size of 10 minutes and a slide size of 5 minutes. Thus, a window is generated every 5 minutes, containing events that arrived in the last 10 minutes.
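The sliding case can be sketched the same way: start from the latest window start at or before the timestamp and step back by the slide while the window still covers the timestamp. This plain-Java helper is hypothetical and follows the same idea as Flink's SlidingEventTimeWindows assigner; with a 10-minute size and a 5-minute slide, every element falls into two windows.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: every sliding window [start, start + size) that contains a timestamp. */
class SlidingWindows {
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide); // latest start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // 10-minute windows sliding every 5 minutes: an event at minute 12 is in two windows.
        for (long[] w : assign(12 * minute, 10 * minute, 5 * minute)) {
            System.out.println("[" + w[0] / minute + "min, " + w[1] / minute + "min)");
        }
        // prints [10min, 20min) then [5min, 15min)
    }
}
```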

Session window: the session window assigner groups elements by sessions of activity. In contrast to tumbling and sliding windows, session windows do not overlap and do not have fixed start and end times. Instead, a session window closes when it has not received any element for a certain period of time.

The session window assigner is configured with a session gap, which defines how long the required period of inactivity is. When this period expires, the current session closes and subsequent elements are assigned to a new session window.
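Session windows can be modeled as merging: each element opens its own window [ts, ts + gap), and overlapping windows are merged into one session, so a session ends once no element arrives within the gap. The sketch below is plain Java with hypothetical names; it sorts by event time first, since merging depends on timestamps rather than arrival order. Flink's session assigners follow the same idea, giving each element a window of length gap and merging overlapping ones.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: session windows as merged [ts, ts + gap) intervals. */
class SessionWindows {
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted); // merge by event time, regardless of arrival order
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts < last[1]) {
                last[1] = Math.max(last[1], ts + gap); // overlaps the current session: extend it
            } else {
                result.add(new long[] {ts, ts + gap}); // gap exceeded: start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] clicks = {1, 2, 4, 15, 16, 30};
        for (long[] s : sessions(clicks, 5)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints [1, 9), [15, 21), [30, 35)
    }
}
```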

0x04 Time concept in Flink

Flink supports different notions of time in streaming programs: Event Time, Ingestion Time, and Processing Time.

In chronological order, they occur as follows:

Event Time ----> Ingestion Time ----> Processing Time
  • Event Time is the time when an event occurred in the real world, usually described by a timestamp carried in the event.
  • Ingestion Time is the time when data enters the Apache Flink stream processing system, i.e. when Flink reads it from the data source.
  • Processing Time is the system time when data flows into a specific operator (where the message is computed and processed), i.e. the current system time when the Flink program processes the event.

Below we explain them in reverse order, saving the most important one, Event Time, for last.

Processing time

Processing time is the system time at the moment data flows into a specific operator.

This system time is the clock of the machine executing the operation. When a streaming program runs on processing time, all time-based operations (e.g., time windows) use the system clock of the physical machine that runs the respective operator.

Processing time offers the best performance and the lowest latency. However, in a distributed or asynchronous environment it is nondeterministic: running the same data several times may produce different results, because the results depend on the speed at which records arrive in the system (for example, from a message queue) and the speed at which records flow between operators inside the system (outages, scheduling, and so on).

Ingestion time

Ingestion time is the time at which data enters Apache Flink, assigned in the source operator. Each record takes the source's current time as its timestamp, and subsequent time-based operations, such as time windows, refer to that timestamp.

Ingestion time sits conceptually between event time and processing time, and is slightly earlier than processing time. It gives more predictable results than processing time: because its timestamp is stable (assigned only once, at the source), the same record carries the same timestamp through different window operations, whereas with processing time the same record may get a different timestamp in each window operator.

Compared with event time, an ingestion-time program cannot handle out-of-order events or late data, but it does not have to specify how to generate watermarks.

Internally, ingestion time is treated much like event time, but with automatic timestamp assignment and automatic watermark generation.

Event time

Event time is when an event occurs in the real world, that is, when each event occurs on the device that generated it (local time). For example, the time of a click event is the time when the user clicks on the mobile phone or computer.

Event time is usually embedded in the record before it enters Apache Flink, and can be extracted from the record. In real business scenarios such as online shopping orders, event time is what is mostly used for computation.

The strength of event-time processing is that it yields correct results even with out-of-order events, late events, or replays of historical data from backups or persistent logs. With event time, the progress of time depends on the data, not on any wall clock.

An event-time program must specify how to generate event-time watermarks, which are the mechanism that signals progress in event time.

Now suppose we are creating a sorted data stream. This means that the application processes out-of-order events arriving in the stream and generates a new data stream with the same events but sorted by timestamp (event time).

For example:

There are events 1 to 10. The out-of-order arrival sequence is 1, 2, 4, 5, 6, 3, 8, 9, 10, 7, and the sequence after processing by event time is 1, 2, 3, 4, 5, 6, 7, 8, 9, 10.
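The sorting example can be reproduced with a min-heap and a watermark. In this plain-Java sketch (hypothetical names), events are buffered until the watermark, assumed here to be max timestamp seen - bound - 1, guarantees nothing earlier can still arrive; the "- 1" means an event exactly `bound` behind the maximum still counts as on time. With a bound of 3 the arrival order above comes out fully sorted; with a bound of 0, events 3 and 7 would be reported as late.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical event-time sorter driven by a bounded-out-of-orderness watermark. */
class EventTimeSorter {
    final List<Long> sorted = new ArrayList<>();
    final List<Long> late = new ArrayList<>();
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private final long bound;
    private long maxTimestamp = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    EventTimeSorter(long bound) {
        this.bound = bound;
    }

    void onEvent(long ts) {
        if (ts <= watermark) {
            late.add(ts); // output up to the watermark has already been emitted
            return;
        }
        buffer.add(ts);
        maxTimestamp = Math.max(maxTimestamp, ts);
        watermark = Math.max(watermark, maxTimestamp - bound - 1); // assumed watermark rule
        // Everything at or below the watermark is complete and can be released in order.
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            sorted.add(buffer.poll());
        }
    }

    /** End of input acts like a final, maximal watermark: flush whatever is buffered. */
    void onEndOfInput() {
        while (!buffer.isEmpty()) {
            sorted.add(buffer.poll());
        }
    }

    public static void main(String[] args) {
        EventTimeSorter sorter = new EventTimeSorter(3);
        for (long ts : new long[] {1, 2, 4, 5, 6, 3, 8, 9, 10, 7}) {
            sorter.onEvent(ts);
        }
        sorter.onEndOfInput();
        System.out.println(sorter.sorted); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        System.out.println(sorter.late);   // []
    }
}
```

The bound trades latency for completeness: a larger bound holds events longer in the heap but reports fewer of them as late.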

To process event time, Flink needs to know each event's timestamp, which means every record in the stream must be assigned its event timestamp. This is usually done by extracting a fixed field from each record.

Setting the time characteristic

The first part of a Flink DataStream program usually sets the basic time characteristic. This setting defines how data stream sources behave (for example, whether they assign timestamps) and which notion of time window operations such as keyedStream.timeWindow(Time.seconds(30)) should use.

For example:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

0x05 Watermark

Earlier we discussed event time, the time our business actually cares about in real-time processing. Ideally, event-time processing produces fully consistent, deterministic results regardless of when events arrive or in what order. In reality, however, messages do not arrive in order. How do we deal with that?

Watermark is the mechanism Apache Flink provides for event-time window computations, and it is essentially a timestamp. Watermarks are used to handle out-of-order or late data and are usually combined with windows: it is the watermark that triggers window computations.

We cannot wait indefinitely for late elements. There has to be a mechanism guaranteeing that, after a certain point, the window is triggered to compute, and that mechanism is the watermark. Think of a watermark as a way of telling Flink how late a message may be: it defines when to stop waiting for earlier data.

1. Window trigger conditions

Watermark + Window together form the mechanism for handling out-of-order data. So when is a window triggered?

For event-time processing, Flink's default window trigger conditions are:

For out-of-order and normal data:

  • watermark timestamp >= window_end_time
  • there is data in [window_start_time, window_end_time)

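For late elements (event time already at or below the watermark):

  • the window fires again only if allowed lateness is set and watermark < window_end_time + allowed lateness; otherwise the element is dropped or sent to the side output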

The watermark acts as a finish line: once the watermark reaches a window's end_time, the data for that window is considered complete and the window is computed.

In other words, we compute watermarks according to some rule and build in a delay to give late data a chance. Late data is waited for only during that delay; after that, it has missed its chance.

The watermark time can be based either on the Flink system's wall-clock time or on the event time carried by the data being processed.

With system time there are fewer issues to watch out for under parallelism and multithreading, because the wall clock serves as the common standard.

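Note the following points if the event time carried by the data is used as the watermark time:

  • Because data does not arrive in order, keep the maximum timestamp seen so far as the basis of the watermark time.
  • Synchronization across parallel instances: an operator that receives watermarks from several inputs takes the minimum of them as its own watermark.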
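The parallel synchronization point can be sketched as follows: an operator remembers the latest watermark from each input channel, and its own event-time clock is the minimum across channels, so one slow channel holds back the whole operator. This is hypothetical plain Java; Flink additionally skips input channels marked idle (via StreamStatus), which is not modeled here.

```java
import java.util.Arrays;

/** Hypothetical sketch: combining watermarks from parallel input channels. */
class InputWatermarks {
    private final long[] perChannel;
    private long combined = Long.MIN_VALUE;

    InputWatermarks(int channels) {
        perChannel = new long[channels];
        Arrays.fill(perChannel, Long.MIN_VALUE); // nothing received yet on any channel
    }

    /** Records a watermark from one channel; returns the operator's (possibly unchanged) watermark. */
    long onWatermark(int channel, long watermark) {
        perChannel[channel] = Math.max(perChannel[channel], watermark);
        long min = Arrays.stream(perChannel).min().getAsLong();
        combined = Math.max(combined, min); // the operator's watermark never moves backwards
        return combined;
    }

    public static void main(String[] args) {
        InputWatermarks op = new InputWatermarks(2);
        System.out.println(op.onWatermark(0, 10_000)); // channel 1 silent so far: Long.MIN_VALUE
        System.out.println(op.onWatermark(1, 7_000));  // 7000: limited by the slower channel
        System.out.println(op.onWatermark(1, 12_000)); // 10000: now channel 0 is the slowest
    }
}
```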

2. How watermarks are generated

Punctuated Watermark

A punctuated watermark is generated when special marker events appear in the data stream. In this mode, windows fire not according to elapsed time but according to when marker events are received.

In production, the punctuated approach can produce a huge number of watermarks under high TPS, which puts some pressure on downstream operators. It is therefore chosen only when real-time requirements are very strict.
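A punctuated generator inspects each event and derives a watermark only when it sees a marker. The sketch below is plain Java; the Event class and its marker flag are hypothetical, and the method name merely echoes Flink's AssignerWithPunctuatedWatermarks.checkAndGetNextWatermark. A marker that would move the watermark backwards is ignored.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical punctuated generator: watermarks come only from marker events, never from a timer. */
class PunctuatedGenerator {
    /** Illustrative event type: a timestamp plus a flag saying whether it is a marker. */
    static final class Event {
        final long timestamp;
        final boolean marker;

        Event(long timestamp, boolean marker) {
            this.timestamp = timestamp;
            this.marker = marker;
        }
    }

    private long current = Long.MIN_VALUE;

    /** Returns the new watermark if this event is a marker that advances time, otherwise null. */
    Long checkAndGetNextWatermark(Event e) {
        if (e.marker && e.timestamp > current) {
            current = e.timestamp;
            return current;
        }
        return null;
    }

    public static void main(String[] args) {
        PunctuatedGenerator gen = new PunctuatedGenerator();
        List<Long> emitted = new ArrayList<>();
        Event[] stream = {
            new Event(1_000, false), new Event(2_000, true),
            new Event(5_000, false), new Event(1_500, true), new Event(6_000, true)
        };
        for (Event e : stream) {
            Long wm = gen.checkAndGetNextWatermark(e);
            if (wm != null) {
                emitted.add(wm);
            }
        }
        System.out.println(emitted); // [2000, 6000]
    }
}
```

Every marker can produce a watermark, which is exactly why this mode gets expensive when markers are frequent.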

Periodic Watermark

A watermark is generated periodically (after a certain amount of time or a certain number of records). The interval between watermark advances is set by the user; the messages that flow in between two advances can be used to compute the new watermark.

In production, periodic watermarks must be generated based on both dimensions, elapsed time and accumulated record count; otherwise latency can become large in extreme cases.

For example, the simplest watermark algorithm takes the largest event time seen so far. This approach is crude and has little tolerance for out-of-order events, so it tends to produce many late events.
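The advice to bound periodic watermarks by both time and record count can be sketched as follows. This is hypothetical plain Java, not Flink's generator: the watermark is the maximum event time seen minus a fixed bound, and it is emitted when either the interval has elapsed or enough records have arrived. For brevity the check only runs when a record arrives; a real implementation would also check from a timer, so an advance computed from earlier records is not held back until the next record shows up.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical periodic generator: emit on "interval elapsed" OR "enough records". */
class PeriodicGenerator {
    private final long intervalMs, bound;
    private final int maxRecords;
    private long maxEventTime = Long.MIN_VALUE;
    private long lastEmitProcessingTime;
    private int recordsSinceEmit;
    private long lastWatermark = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    PeriodicGenerator(long intervalMs, int maxRecords, long bound, long startProcessingTime) {
        this.intervalMs = intervalMs;
        this.maxRecords = maxRecords;
        this.bound = bound;
        this.lastEmitProcessingTime = startProcessingTime;
    }

    /** Called per record with its event time and the current processing time. */
    void onRecord(long eventTime, long processingTime) {
        maxEventTime = Math.max(maxEventTime, eventTime);
        recordsSinceEmit++;
        boolean timeDue = processingTime - lastEmitProcessingTime >= intervalMs;
        boolean countDue = recordsSinceEmit >= maxRecords;
        if (timeDue || countDue) {
            long candidate = maxEventTime - bound;
            if (candidate > lastWatermark) { // only ever move the water level up
                lastWatermark = candidate;
                emitted.add(candidate);
            }
            lastEmitProcessingTime = processingTime;
            recordsSinceEmit = 0;
        }
    }

    public static void main(String[] args) {
        // emit every 200 ms of processing time or every 3 records; tolerate 1000 ms of disorder
        PeriodicGenerator gen = new PeriodicGenerator(200, 3, 1_000, 0);
        long[][] records = {{5_000, 10}, {6_000, 20}, {7_000, 30}, {7_500, 300}, {9_000, 310}}; // {eventTime, processingTime}
        for (long[] r : records) {
            gen.onRecord(r[0], r[1]);
        }
        System.out.println(gen.emitted); // [6000, 6500]
    }
}
```

The first watermark is emitted by the count rule (third record) and the second by the time rule (300 ms after the previous emission).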

3. Lateness

Although a watermark indicates that no events earlier than it should arrive any more, as mentioned above, receiving messages earlier than the watermark is unavoidable; these are called late events. Late events are in fact a special case of out-of-order events: unlike ordinary out-of-order events, their degree of disorder exceeds what the watermark anticipates, so their window has already closed by the time they arrive.

When a late event shows up, its window has already closed and produced a result. There are three ways to deal with it:

  • Reactivate the closed window and recalculate to correct the result.
  • Collect late events and deal with them separately.
  • Treat late events as error messages and discard them.

Flink's default is to discard late events. The other two approaches are provided by Allowed Lateness (recompute the window) and Side Output (collect late events separately).

The Side Output mechanism puts late events into a separate branch of the data stream, a by-product of the window results that the user can retrieve and handle specially.

Allowed Lateness: this mechanism lets the user set a maximum allowed lateness. After a window fires, Flink keeps its state until the allowed lateness has passed. Late events arriving during this period are not discarded; by default, each one triggers a recomputation of the window. Because keeping window state costs extra memory, and because with the ProcessWindowFunction API each late event may trigger a computation over the whole window, the allowed lateness should not be too long and there should not be too many late events. Otherwise, consider slowing down how fast the watermark rises or adjusting the watermark algorithm.

To summarize the mechanisms:

  • Windows collect data over periods of time.

  • Watermark handles out-of-order data, i.e. the case where not all data for a given event-time range has arrived in time.

  • allowedLateness delays the final closing of a window.

  • Side output is the last-resort fallback: once a window has been completely closed, all data that still arrives late for it is put into the side output stream.
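The four mechanisms can be put together in one small simulation. This is hypothetical plain Java, not Flink's API: it counts events in 10-second tumbling windows, derives the watermark as max event time - 3.5 s after every element, keeps fired windows for 2 s of allowed lateness so late events trigger a recomputation, and routes anything later than that to a side-output list. Following Flink's WindowOperator, a window is treated as gone once the watermark reaches its last timestamp plus the allowed lateness.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical end-to-end sketch: windows + watermark + allowed lateness + side output. */
class LatenessPipeline {
    final long size, bound, allowedLateness;
    final Map<Long, Integer> counts = new TreeMap<>(); // window start -> element count (window state)
    final List<String> results = new ArrayList<>();    // main output: one line per window firing
    final List<Long> sideOutput = new ArrayList<>();   // events too late even for allowed lateness
    private long watermark = Long.MIN_VALUE;
    private long maxTimestamp = Long.MIN_VALUE;

    LatenessPipeline(long size, long bound, long allowedLateness) {
        this.size = size;
        this.bound = bound;
        this.allowedLateness = allowedLateness;
    }

    void onEvent(long ts) {
        long start = ts - Math.floorMod(ts, size);  // tumbling window assignment
        long lastTs = start + size - 1;              // last timestamp inside [start, start + size)
        if (lastTs + allowedLateness <= watermark) { // window state already cleaned up
            sideOutput.add(ts);
            return;
        }
        counts.merge(start, 1, Integer::sum);
        if (lastTs <= watermark) {                   // late, but within allowed lateness: fire again
            results.add(describe(start));
        }
        maxTimestamp = Math.max(maxTimestamp, ts);
        advanceWatermark(maxTimestamp - bound);      // bounded out-of-orderness watermark
    }

    private void advanceWatermark(long candidate) {
        if (candidate <= watermark) {
            return;                                  // watermarks never move backwards
        }
        long previous = watermark;
        watermark = candidate;
        for (long start : new ArrayList<>(counts.keySet())) {
            long lastTs = start + size - 1;
            if (lastTs > previous && lastTs <= watermark) {
                results.add(describe(start));        // first, on-time firing
            }
            if (lastTs + allowedLateness <= watermark) {
                counts.remove(start);                // allowed lateness is over: drop the state
            }
        }
    }

    private String describe(long start) {
        return "[" + start + ", " + (start + size) + ") = " + counts.get(start);
    }

    public static void main(String[] args) {
        LatenessPipeline p = new LatenessPipeline(10_000, 3_500, 2_000);
        for (long ts : new long[] {8_000, 12_500, 9_000, 13_500, 6_000, 16_000, 7_000}) {
            p.onEvent(ts);
        }
        System.out.println(p.results);    // [[0, 10000) = 2, [0, 10000) = 3]
        System.out.println(p.sideOutput); // [7000]
    }
}
```

Here the event at 6000 arrives after the window [0, 10000) has fired and updates its result, while the event at 7000 arrives after the allowed lateness is over and ends up in the side output.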

4. Examples

Using system time to create the watermark

Here the watermark is set to the current system time minus 5 seconds.

override def getCurrentWatermark(): Watermark = {
  new Watermark(System.currentTimeMillis - 5000)
}

It is usually better to keep the maximum timestamp received and create a watermark with the maximum expected delay, rather than subtracting it from the current system time.

Using event time for the watermark

For example, suppose the event-time data itself contains a timestamp field named rowtime, with the value 1543903383 seconds, i.e. 1543903383000 ms (2018-12-04 14:03:03, UTC+8). If a watermark is defined on the rowtime column with a 3 s offset, the watermark timestamp for this record is:

1543903383000 - 3000 = 1543903380000 (2018-12-04 14:03:00)

Meaning of this watermark: all data with timestamps earlier than 1543903380000 (2018-12-04 14:03:00) has arrived.

import scala.math.max
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
    val maxOutOfOrderness = 3000L // 3 seconds
    // start low, but not so low that currentMaxTimestamp - maxOutOfOrderness underflows
    var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrderness

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }
}

How the window is triggered

We have already seen the window trigger mechanism; now let's see what happens once watermarks are added.

Suppose we use 10 s time windows, so 0~10s and 10~20s are each a window. Take 0~10s as an example: 0 is the start time and 10 is the end time. Four records arrive with event times 8 (A), 12.5 (B), 9 (C), and 13.5 (D), and the watermark is set to the maximum event time of all data received so far minus a delay of 3.5 seconds.

When A arrives, the watermark is max{8} - 3.5 = 8 - 3.5 = 4.5 < 10, so no computation is triggered.
When B arrives, the watermark is max(12.5, 8) - 3.5 = 12.5 - 3.5 = 9 < 10, so no computation is triggered.
When C arrives, the watermark is max(12.5, 8, 9) - 3.5 = 12.5 - 3.5 = 9 < 10, so no computation is triggered.
When D arrives, the watermark is max(13.5, 12.5, 8, 9) - 3.5 = 13.5 - 3.5 = 10 = 10, so the computation of window 0~10s is triggered.

The max is the key: it is the maximum event time among all events received so far.

The 3.5 s delay means we assume that when a record arrives, all data more than 3.5 s older than it has already arrived; this value has to be chosen from experience. Suppose E, with event-time = 6, arrives after D. Since window 0~10s has already been computed, E is lost.
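The walkthrough above can be replayed in a few lines of plain Java (hypothetical class), using milliseconds so the arithmetic stays in integers. Two simplifications are assumptions: the watermark is recomputed after every element instead of periodically, and, following Flink's convention, the window [0, 10000) fires once the watermark reaches its last timestamp, 9999 ms, which gives the same outcome as the ">= 10" rule used in the text.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical replay of the A-E example with a 3.5 s bounded-out-of-orderness watermark. */
class WatermarkWalkthrough {
    static List<String> run(String[] names, long[] eventTimes, long windowSize, long delay) {
        List<String> log = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        int inFirstWindow = 0; // elements buffered for the window [0, windowSize)
        boolean fired = false;
        for (int i = 0; i < names.length; i++) {
            long t = eventTimes[i];
            if (t < windowSize) {
                if (fired) { // result already emitted and no lateness allowed
                    log.add(names[i] + ": dropped, window [0, " + windowSize + ") already fired");
                    continue;
                }
                inFirstWindow++;
            }
            maxSeen = Math.max(maxSeen, t);
            watermark = Math.max(watermark, maxSeen - delay);
            if (!fired && watermark >= windowSize - 1) { // assumed: last timestamp = end - 1
                fired = true;
                log.add(names[i] + ": watermark = " + watermark + ", window [0, " + windowSize
                        + ") fires with " + inFirstWindow + " elements");
            } else {
                log.add(names[i] + ": watermark = " + watermark + ", no trigger");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        String[] names = {"A", "B", "C", "D", "E"};
        long[] eventTimes = {8_000, 12_500, 9_000, 13_500, 6_000};
        run(names, eventTimes, 10_000, 3_500).forEach(System.out::println);
    }
}
```

The window fires with only A and C inside: B and D belong to the next window [10000, 20000), even though it is D that pushes the watermark past 10 s.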

The loss of E shows that watermarks are not a panacea. Still, by combining production experience with side output and similar mechanisms, we can avoid losing data.

0x06 Flink source code

Data structure definition

The elements flowing through a Flink DataStream are collectively called StreamElement; each one is a StreamRecord, a Watermark, a StreamStatus, or a LatencyMarker.

StreamElement

StreamElement is an abstract class (the base class of everything Flink carries through a stream), and the four types above extend it.

public abstract class StreamElement {
  // Check whether it is Watermark
  public final boolean isWatermark() {
    return getClass() == Watermark.class;
  }
  // Check whether it is StreamStatus
  public final boolean isStreamStatus() {
    return getClass() == StreamStatus.class;
  }
  // Check whether it is a StreamRecord
  public final boolean isRecord() {
    return getClass() == StreamRecord.class;
  }
  // Check whether it is LatencyMarker
  public final boolean isLatencyMarker() {
    return getClass() == LatencyMarker.class;
  }
  // Convert to StreamRecord
  public final <E> StreamRecord<E> asRecord() {
    return (StreamRecord<E>) this;
  }
  // Convert to Watermark
  public final Watermark asWatermark() {
    return (Watermark) this;
  }
  // Convert to StreamStatus
  public final StreamStatus asStreamStatus() {
    return (StreamStatus) this;
  }
  // Convert to LatencyMarker
  public final LatencyMarker asLatencyMarker() {
    return (LatencyMarker) this;
  }
}

Watermark

Watermark extends StreamElement. It is an abstraction at the same level as the events themselves, with a single member variable, timestamp, that marks how far the data has progressed. Watermarks travel through the pipeline as part of the data stream.

@PublicEvolving
public final class Watermark extends StreamElement {
  /*The watermark that signifies end-of-event-time. */
  public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
  /* The timestamp of the watermark in milliseconds. */
  private final long timestamp;
  /* Creates a new watermark with the given timestamp in milliseconds.*/
  public Watermark(long timestamp) {
	this.timestamp = timestamp;
  }
  /* Returns the timestamp associated with this {@link Watermark} in milliseconds. */
  public long getTimestamp() {
    return timestamp;
  }
}

How Flink generates and processes watermarks

In practice, most jobs generate watermarks periodically, i.e. with AssignerWithPeriodicWatermarks.

// Use event-time semantics
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Interval (ms) at which watermarks are generated
env.getConfig.setAutoWatermarkInterval(watermarkInterval)
// Specify how timestamps and watermarks are assigned
dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDelay)) {
  override def extractTimestamp(element: Element): Long = element.dT
})

BoundedOutOfOrdernessTimestampExtractor is Flink's built-in watermark generator for a bounded maximum out-of-orderness; you only need to override the extractTimestamp method.

assignTimestampsAndWatermarks can be understood as a transformation, just like map or window: as its name suggests, it is itself an operator, and its parallelism can be set.

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
		AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

	// match parallelism to input, otherwise dop=1 sources could lead to some strange
	// behaviour: the watermark will creep along very slowly because the elements
	// from the source go to each extraction operator round robin.
	final int inputParallelism = getTransformation().getParallelism();
	final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

	TimestampsAndPeriodicWatermarksOperator<T> operator =
			new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

	return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
			.setParallelism(inputParallelism);
}

The StreamOperator created here is TimestampsAndPeriodicWatermarksOperator, which extends AbstractUdfStreamOperator and implements the OneInputStreamOperator and ProcessingTimeCallback interfaces.

TimestampsAndPeriodicWatermarksOperator.

/**
 * A stream operator that extracts timestamps from stream elements and
 * generates periodic watermarks.
 *
 * @param <T> The type of the input elements
 */
public class TimestampsAndPeriodicWatermarksOperator<T>
		extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
		implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

	private static final long serialVersionUID = 1L;
	private transient long watermarkInterval;
	private transient long currentWatermark;

	public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
		super(assigner);
		this.chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void open() throws Exception {
		super.open();
        // Initialize the current watermark
		currentWatermark = Long.MIN_VALUE;
        // Set the watermark interval
		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
        // Register its configuration
		if (watermarkInterval > 0) {
			long now = getProcessingTimeService().getCurrentProcessingTime();
            // Register a timer that will be triggered after the watermarkInterval, passing in the callback argument this, that is, calling the onProcessingTime method of the current object
			getProcessingTimeService().registerTimer(now + watermarkInterval, this);
		}
	}

	@Override
	public void processElement(StreamRecord<T> element) throws Exception {
        // Extract the event time of the current element
		final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
				element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
        // Re-stamp the record with the extracted timestamp and forward it
        // (the assigner itself keeps track of the maximum timestamp seen)
		output.collect(element.replace(element.getValue(), newTimestamp));
	}

	@Override
	public void onProcessingTime(long timestamp) throws Exception {
        // Timer callback: emit a new watermark if it has advanced, then register the next timer.
		// register next timer
		Watermark newWatermark = userFunction.getCurrentWatermark();
        // Only emit when the new watermark is larger than the current one
		if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
			currentWatermark = newWatermark.getTimestamp();
            // Send the appropriate watermark
			// emit watermark
			output.emitWatermark(newWatermark);
		}
        // Register the next trigger time
		long now = getProcessingTimeService().getCurrentProcessingTime();
		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
	}

	/**
	 * Override the base implementation to completely ignore watermarks propagated from
	 * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
	 * watermarks from here).
	 */
	@Override
	public void processWatermark(Watermark mark) throws Exception {
        // Upstream watermarks are ignored: downstream watermarks come only from this operator's assigner.
		// if we receive a Long.MAX_VALUE watermark we forward it since it is used
		// to signal the end of input and to not block watermark progress downstream
		if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
			currentWatermark = Long.MAX_VALUE;
			output.emitWatermark(mark);
		}
	}

	@Override
	public void close() throws Exception {
		super.close();

		// emit a final watermark
		Watermark newWatermark = userFunction.getCurrentWatermark();
		if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
			currentWatermark = newWatermark.getTimestamp();
			// emit watermark
			output.emitWatermark(newWatermark);
		}
	}
}
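To see the periodic logic end to end without a Flink cluster, here is a minimal plain-Java sketch. It is a model under stated assumptions, not Flink code: `BoundedOutOfOrdernessAssigner` and `PeriodicWatermarkSimulator` are hypothetical names, the assigner reports "maximum event time seen minus a fixed bound" as its watermark (similar in spirit to Flink's BoundedOutOfOrdernessTimestampExtractor), and the processing-time timer is replaced by an explicit `tick()` call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical assigner: watermark = max event time seen so far minus a fixed out-of-orderness bound. */
class BoundedOutOfOrdernessAssigner {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessAssigner(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low enough that the initial watermark is Long.MIN_VALUE instead of underflowing.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Called per element (like extractTimestamp): remembers the largest event time seen. */
    long extractTimestamp(long eventTime) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
        return eventTime;
    }

    /** Called on every timer tick (like getCurrentWatermark). */
    long getCurrentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }
}

/** Hypothetical stand-in for the periodic operator: elements update the assigner, ticks emit watermarks. */
class PeriodicWatermarkSimulator {
    private final BoundedOutOfOrdernessAssigner assigner;
    private long currentWatermark = Long.MIN_VALUE;
    final List<Long> emittedWatermarks = new ArrayList<>();

    PeriodicWatermarkSimulator(BoundedOutOfOrdernessAssigner assigner) {
        this.assigner = assigner;
    }

    /** Mirrors processElement: the timestamp is extracted, but no watermark is emitted here. */
    void processElement(long eventTime) {
        assigner.extractTimestamp(eventTime);
    }

    /** Mirrors onProcessingTime: emit only if the watermark has advanced. */
    void tick() {
        long newWatermark = assigner.getCurrentWatermark();
        if (newWatermark > currentWatermark) {
            currentWatermark = newWatermark;
            emittedWatermarks.add(newWatermark);
        }
    }
}
```

With a 1000 ms bound, feeding 1000 and 3000 and then ticking emits 2000. A late event at 2000 followed by a tick emits nothing, because the maximum event time (and so the watermark) did not advance. Feeding 5000 and ticking emits 4000.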

How Flink handles late data

Here we use the Side Output mechanism as the illustration. Side Output places late events into a separate branch of the data stream, a by-product of the window's results that the user can retrieve and process separately.

Create a new Watermark

Flink first replaces the timestamp in the StreamRecord with the event time returned by the user's assigner. Then, if the watermark derived from the current event is larger than the current watermark, a new watermark is emitted. With a punctuated assigner this check runs once per element.

The code is in TimestampsAndPunctuatedWatermarksOperator.processElement:

@Override
public void processElement(StreamRecord<T> element) throws Exception {
	final T value = element.getValue();
	// Call the user-implemented extractTimestamp to get the new timestamp
	final long newTimestamp = userFunction.extractTimestamp(value,
			element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
	// Replace the old timestamp in the StreamRecord with the new one and forward the record
	output.collect(element.replace(element.getValue(), newTimestamp));
	// Call the user-implemented checkAndGetNextWatermark to get the next watermark (may be null)
	final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
	// If the next watermark is larger than the current one, emit it
	if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
		currentWatermark = nextWatermark.getTimestamp();
		output.emitWatermark(nextWatermark);
	}
}
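The punctuated path can be modelled the same way. In this hypothetical sketch, which is not Flink code, each `Event` carries an `isMarker` flag, and the hypothetical `checkAndGetNextWatermark` helper returns the marker's timestamp as a watermark candidate, or null otherwise (the same null-means-no-watermark contract as the user function above). Every event is forwarded first; a watermark is emitted only when it is non-null and larger than the current one.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical event: an event time plus a flag saying whether it carries watermark information. */
final class Event {
    final long timestamp;
    final boolean isMarker;

    Event(long timestamp, boolean isMarker) {
        this.timestamp = timestamp;
        this.isMarker = isMarker;
    }
}

/** Hypothetical punctuated operator: checks for a new watermark after every element. */
class PunctuatedWatermarkSimulator {
    private long currentWatermark = Long.MIN_VALUE;
    final List<Long> forwardedTimestamps = new ArrayList<>();
    final List<Long> emittedWatermarks = new ArrayList<>();

    /** Stand-in for the user's checkAndGetNextWatermark: only marker events yield a candidate. */
    private static Long checkAndGetNextWatermark(Event event, long extractedTimestamp) {
        return event.isMarker ? Long.valueOf(extractedTimestamp) : null;
    }

    void processElement(Event event) {
        // Stand-in for extractTimestamp: here the event time is stored on the event itself.
        long newTimestamp = event.timestamp;
        // Forward the element first, as the operator does.
        forwardedTimestamps.add(newTimestamp);
        Long next = checkAndGetNextWatermark(event, newTimestamp);
        // Emit only a non-null watermark that moves time forward.
        if (next != null && next > currentWatermark) {
            currentWatermark = next;
            emittedWatermarks.add(next);
        }
    }
}
```

Feeding markers at 2000, 1500 and 4000 emits only 2000 and 4000: the out-of-order marker at 1500 is forwarded as data but cannot move the watermark backwards.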

Processing late data

First, Flink determines whether each window the element is assigned to is already late.

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
	for (W window : elementWindows) {
		// drop if the window is already late:
		// skip this window and move on to the next one
		if (isWindowLate(window)) {
			continue;
		}
	}
	...
}

/**
 * Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness of the given window.
 */
protected boolean isWindowLate(W window) {
	// Late only in event time, once the cleanup time (window.maxTimestamp() + allowedLateness) <= current watermark
	return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}

/**
 * Returns the cleanup time for a window, which is
 * {@code window.maxTimestamp + allowedLateness}. In
 * case this leads to a value greater than {@link Long#MAX_VALUE}
 * then a cleanup time of {@link Long#MAX_VALUE} is
 * returned.
 *
 * @param window the window whose cleanup time we are computing.
 */
private long cleanupTime(W window) {
	if (windowAssigner.isEventTime()) {
		long cleanupTime = window.maxTimestamp() + allowedLateness;
		// Cleanup time = window.maxTimestamp() + allowedLateness; if the addition overflowed, use Long.MAX_VALUE
		return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
	} else {
		return window.maxTimestamp();
	}
}
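The window-lateness check is plain arithmetic, so it can be reproduced outside Flink. In the sketch below, `WindowLateness` is a hypothetical helper covering only the event-time branch; windows are assumed to be `[start, end)` as in Flink's TimeWindow, whose maxTimestamp() is `end - 1`, and the overflow guard mirrors cleanupTime above.

```java
/** Hypothetical helper reproducing the event-time branch of cleanupTime and isWindowLate. */
final class WindowLateness {
    private WindowLateness() {}

    /** maxTimestamp of a [start, end) window: the last millisecond it covers. */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** maxTimestamp + allowedLateness, saturating at Long.MAX_VALUE if the sum overflows. */
    static long cleanupTime(long windowEnd, long allowedLateness) {
        long max = maxTimestamp(windowEnd);
        long cleanup = max + allowedLateness;
        return cleanup >= max ? cleanup : Long.MAX_VALUE;
    }

    /** A window is late once the watermark has reached its cleanup time. */
    static boolean isWindowLate(long windowEnd, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowEnd, allowedLateness) <= currentWatermark;
    }
}
```

For a window `[0, 10000)` with 2000 ms allowed lateness, the cleanup time is 9999 + 2000 = 11999: at watermark 11998 the window still accepts data, and at 11999 it is late. A window ending at Long.MAX_VALUE would overflow the sum, so its cleanup time saturates at Long.MAX_VALUE.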

Second, the late element itself is handled in the last part of WindowOperator.processElement, which sends it to the side output:

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
	... // other operations

	// side output input event if
	// element not handled by any window
	// late arriving tag has been set
	// isSkippedElement stays true only if every window this element belongs to was already late,
	// i.e. no window processed it. isElementLate checks: event time && element timestamp + allowedLateness <= current watermark
	if (isSkippedElement && isElementLate(element)) {
		if (lateDataOutputTag != null) {
			// Side output: as mentioned earlier, Flink puts late events into a separate stream branch,
			// a by-product of the window's results that the user can retrieve and process separately.
			sideOutput(element);
		} else {
			this.numLateRecordsDropped.inc();
		}
	}
}

/**
 * Decide if a record is currently late, based on current watermark and allowed lateness.
 * It is late in event time when (element timestamp + allowedLateness) <= current watermark.
 *
 * @param element The element to check
 * @return The element for which should be considered when sideoutputs
 */
protected boolean isElementLate(StreamRecord<IN> element) {
	return (windowAssigner.isEventTime()) &&
		(element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
}

/**
 * Write skipped late arriving element to SideOutput.
 *
 * @param element skipped late arriving element to side output
 */
protected void sideOutput(StreamRecord<IN> element) {
	output.collect(lateDataOutputTag, element);
}
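Putting the two checks together, the routing decision for a single element can be modelled in plain Java. This is a hypothetical sketch, not WindowOperator: `LateDataRouter` and its `Route` values are illustrative names, it assumes tumbling event-time windows of a fixed `size` aligned to 0 with non-negative timestamps (so each element belongs to exactly one window), and DROPPED stands for the case Flink counts in numLateRecordsDropped.

```java
/** Hypothetical model of the late-data decision at the end of WindowOperator.processElement. */
final class LateDataRouter {
    enum Route { WINDOW, SIDE_OUTPUT, DROPPED }

    private final long size;             // tumbling window size in ms (assumption)
    private final long allowedLateness;  // allowed lateness in ms
    private final boolean hasLateTag;    // stands in for lateDataOutputTag != null

    LateDataRouter(long size, long allowedLateness, boolean hasLateTag) {
        this.size = size;
        this.allowedLateness = allowedLateness;
        this.hasLateTag = hasLateTag;
    }

    Route route(long timestamp, long currentWatermark) {
        // End of the tumbling window [start, start + size) that contains the element.
        long windowEnd = timestamp - (timestamp % size) + size;
        // Like isWindowLate: maxTimestamp (end - 1) + allowedLateness <= watermark.
        boolean windowLate = windowEnd - 1 + allowedLateness <= currentWatermark;
        // With one window per element, the element is skipped exactly when that window is late.
        boolean isSkippedElement = windowLate;
        // Like isElementLate: element timestamp + allowedLateness <= watermark.
        boolean elementLate = timestamp + allowedLateness <= currentWatermark;
        if (isSkippedElement && elementLate) {
            return hasLateTag ? Route.SIDE_OUTPUT : Route.DROPPED;
        }
        // For tumbling windows a late window implies a late element (timestamp <= maxTimestamp),
        // so this line is reached only when the window accepted the element.
        return Route.WINDOW;
    }
}
```

With 10 s windows and 2 s allowed lateness, an element at 9000 still enters its window at watermark 11500 even though it already satisfies isElementLate (9000 + 2000 <= 11500), because its window stays open until 11999. At watermark 12000 the window is late, so the same element goes to the side output, or is dropped if no late-data tag is set. This is why the operator requires both isSkippedElement and isElementLate.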

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

WeChat official account: Rosie’s Thoughts

If you want timely notifications of new articles, or want to see the technical material I recommend, please follow the account.
