Preface

Flink’s API can be roughly divided into three layers: ProcessFunction at the bottom, DataStream in the middle, and SQL/Table at the top, and each of them depends heavily on time attributes. Time is one of the most important concepts in stream processing and one of the cornerstones of any stream processing system, spanning all three API layers. In the DataStream API layer there are, for encapsulation reasons, not many places where we can touch time directly, so this article focuses on the ProcessFunction at the bottom and the SQL/Table API at the top.




Flink time semantics

Different application scenarios call for different time semantics. As an advanced distributed stream processing engine, Flink supports several of them; the core ones are Processing Time and Event Time (also called Row Time). The main differences between the two are described below.





Processing Time is the time used to simulate the real world; in fact, even the local time of the node processing the data is not necessarily the real-world time, which is why we say it only simulates it. Event Time is the time of the data world, that is, the time carried by the data stream we need to process. In terms of how they are obtained, Processing Time comes from directly reading the local machine’s clock, while Event Time is determined by the timestamp carried by each record being processed.

The two also differ in how easy they are to handle inside Flink and how easy they are for users to work with. Comparatively speaking, Processing Time is easier to handle, while Event Time is more troublesome. However, when Processing Time is used, the processing result (or the internal state of the streaming application) is nondeterministic. Because Flink provides various guarantees for Event Time, when Event Time is used you can obtain a relatively deterministic, reproducible result no matter how many times the data is replayed.

Therefore, when deciding between Processing Time and Event Time, you can follow one principle: when your application runs into a problem and replays from a previous checkpoint or savepoint, do you want exactly the same results? If you want the exact same result, you can only use Event Time; if you can accept different results, you can use Processing Time. A common use of Processing Time is measuring the throughput of the whole system against wall-clock time. For example, counting how many records were processed in an hour can only be done with Processing Time.





Property of time

An important property of time is that it only moves forward and never goes back, and we should take full advantage of this when working with time. Suppose we have the records shown above; let’s look at their Processing Time and Event Time.

  • For Processing Time, since we use the local node’s clock (assuming clock synchronization on this node is not a problem), the processing time we observe is always increasing, and increasing means ordered, so what we get is an ordered data stream.
  • When Event Time is used, however, because the time is bound to each record, the data may be out of order to some degree due to network delays, the program’s internal logic, or other distributed-system reasons, as in the example in the figure above. In the Event Time scenario we call the time carried by each record its Record Timestamp. If the time series formed by these record timestamps is out of order, we need to handle that situation.




If individual records are out of order, we can consider discretizing the whole sequence at a coarser granularity. Simply put, the data is grouped into small batches, but these batches are not formed by accumulating a certain number of records; they are divided by time. After this coarser discretization, we find that every time in the rightmost box is necessarily smaller than every time in the middle box, and every time in the middle box is necessarily smaller than every time in the leftmost box.





At this point we insert some special control records, called watermarks, into the time sequence, somewhat like flag bits. A watermark essentially carries a timestamp value and indicates that no subsequent record will have a timestamp less than or equal to that value.

An overview of Timestamp and Watermark behavior





Next, let’s focus on some basics of Record Timestamps and watermarks under Event Time. The vast majority of distributed stream computing engines abstract a job as a DAG: it has data sources, processing operators, and data sinks, and data flows between the logical operators. Watermarks and timestamps have their own life cycle; below we look at how they are created, how they propagate between nodes, and how they are processed on each node.

Timestamp allocation and Watermark generation

Flink supports two ways to assign timestamps and generate watermarks. The first is to generate them in the SourceFunction, which puts the whole timestamp assignment and watermark generation logic at the very source of the streaming application. Inside a SourceFunction we can use these two methods:

  • The collectWithTimestamp method emits a record, where the first parameter is the record itself and the second parameter is its timestamp; you can also call the emitWatermark method to emit a watermark, indicating that no more records with a timestamp less than or equal to that value will follow.
  • In addition, sometimes we do not want to generate timestamps or watermarks in the SourceFunction, or the SourceFunction we use does not support it. In that case we can also specify them via the DataStream API by calling DataStream.assignTimestampsAndWatermarks, which accepts different kinds of timestamp extractors and watermark generators (a sketch of both approaches follows below).
In general, the generators can be divided into two types: the first generates watermarks periodically; the second generates them based on special records encountered in the data stream.
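As a concrete illustration of the first option, here is a minimal sketch of a SourceFunction that emits records with collectWithTimestamp and watermarks with emitWatermark. The event type, the synthetic data, and the 5-second out-of-orderness bound are assumptions made for this example only:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * A toy source that assigns timestamps and emits watermarks itself.
 * Events are (key, event-time millis) pairs generated synthetically.
 */
public class TimestampedSource implements SourceFunction<Tuple2<String, Long>> {

    private static final long MAX_OUT_OF_ORDERNESS = 5_000L; // assumed bound
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
        while (running) {
            long eventTime = System.currentTimeMillis();
            Tuple2<String, Long> event = Tuple2.of("sensor-1", eventTime);
            // Emit the record together with its event-time timestamp.
            ctx.collectWithTimestamp(event, eventTime);
            // Emit a watermark: no later record should carry a timestamp <= this value.
            ctx.emitWatermark(new Watermark(eventTime - MAX_OUT_OF_ORDERNESS));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```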





The difference between the two lies mainly in how they are driven. Periodic generation is driven by real (processing) time; “periodic” here refers mainly to the watermark (every record needs a timestamp anyway), that is, the generation logic is invoked at regular intervals to produce a watermark. Generating watermarks from special records, by contrast, is data-driven: whether a watermark is generated is determined not by wall-clock time but by encountering certain special records, which indicate that no data with an earlier timestamp will follow. In this case the user-implemented watermark generation method is called after every timestamp assignment, and the user implements the watermark generation logic in that method.
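Below is a minimal sketch of the two generator types, using the older AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks interfaces (newer Flink versions fold both into WatermarkStrategy). The 5-second lateness bound and the “marker” record convention are assumptions for illustration:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

/** Periodic: the watermark method is invoked at a fixed interval (time-driven). */
class BoundedLatenessAssigner implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
    private final long maxLateness = 5_000L; // assumed out-of-orderness bound
    private long currentMaxTimestamp = Long.MIN_VALUE + 5_000L;

    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, element.f1);
        return element.f1;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Called periodically; emits the highest timestamp seen minus the lateness bound.
        return new Watermark(currentMaxTimestamp - maxLateness);
    }
}

/** Punctuated: a watermark is generated only when a "special" record is seen (data-driven). */
class MarkerDrivenAssigner implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {
    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousTimestamp) {
        return element.f1;
    }

    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {
        // Treat records whose key is "marker" as watermark carriers (an assumed convention).
        return "marker".equals(lastElement.f0) ? new Watermark(extractedTimestamp) : null;
    }
}
```

Either assigner is attached with stream.assignTimestampsAndWatermarks(...), the DataStream-level entry point mentioned above.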

Note that timestamp assignment and watermark generation can be specified either in the SourceFunction or on the DataStream, but it is recommended to do this work as close to the data source as possible. This makes it easier for more operators in the program logic to judge whether data is out of order. Flink provides a solid internal mechanism to ensure that these timestamps and watermarks are correctly delivered to downstream nodes.

Watermark propagation





The propagation strategy basically follows these three rules.

  • First, a watermark propagates between operators as a broadcast. For example, an upstream operator connected to three downstream tasks broadcasts its currently received watermark to all of them.
  • Second, if an application receives a watermark with the value Long.MAX_VALUE, it means that a certain part of the stream will never send data again; it acts as a termination watermark.
  • Third, for a single input stream this strategy is easy to understand, but for an operator with multiple inputs the watermark needs to be computed more carefully. The principle is: take the maximum within a single input and the minimum across different inputs (a small sketch of this computation follows below).
For example, suppose the blue block here represents one task of an operator, and it has three inputs, W1, W2, and W3. These inputs can be arbitrary: they may belong to the same stream or to different streams. When computing the watermark, the task takes the maximum value seen on each individual input, because we know a watermark must be monotonically increasing. Then, to compute the watermark for the whole operator task, it takes the minimum of the per-input values. In other words, a multi-input task’s watermark is bounded by its slowest input stream, analogous to the barrel effect, where the water a barrel can hold is limited by its shortest plank.
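This “maximum within an input, minimum across inputs” rule can be written down in a few lines. The following is only a conceptual model of the bookkeeping, not Flink’s actual internal code:

```java
import java.util.HashMap;
import java.util.Map;

/** A simplified model of how a multi-input task could track its watermark. */
class WatermarkTracker {
    private final Map<Integer, Long> perInputWatermark = new HashMap<>();

    WatermarkTracker(int numberOfInputs) {
        for (int i = 0; i < numberOfInputs; i++) {
            perInputWatermark.put(i, Long.MIN_VALUE);
        }
    }

    /** Returns the new task-wide watermark after receiving a watermark on one input. */
    long onWatermark(int inputIndex, long watermark) {
        // Per input: keep the maximum seen so far (watermarks are monotonically increasing).
        perInputWatermark.merge(inputIndex, watermark, Math::max);
        // Across inputs: the task's watermark is the minimum of the per-input values.
        return perInputWatermark.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```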

Another feature of the watermark computation is that it is idempotent. Receiving the same watermark multiple times, or even an earlier watermark, has no effect on the final value, because the task always keeps the maximum for each single input and the minimum across inputs for the task as a whole.

We should also note a limitation of this design: it does not distinguish whether the inputs are multiple partitions of the same stream or a JOIN of different logical streams. For different partitions of the same stream, forcing the clocks to synchronize in this way is fine, because we originally split one stream into parts that all share the same clock. But if the operator’s task is something like a JOIN, there is actually no reason to force the clocks of the two inputs to synchronize: it is entirely possible that you are joining a stream whose data is close to the current time with one whose data is far from it. The faster stream then has to wait for the slower one, which may force it to cache a lot of data in state, and that is a significant performance cost for the whole cluster.

ProcessFunction

Before diving into how watermarks are processed, let’s take a quick look at ProcessFunction, because the handling of a watermark inside a task is split into internal and external logic. The external logic is expressed through a ProcessFunction: if you need to use the time-related APIs that Flink provides, you can only do so inside a ProcessFunction.

The time-related functionality of ProcessFunction comes down to three points:

  • First, depending on the time semantics your system currently uses, you can obtain either the Record Timestamp of the record being processed or the current Processing Time.
  • Second, it can obtain the current time of the operator, which can be understood as the current watermark.
  • Third, to implement relatively complex functionality in a ProcessFunction, you can register timers. For example, a timer is triggered when the watermark reaches a certain point in time, and all the callback logic is supplied by the user. Three methods are involved: registerEventTimeTimer, registerProcessingTimeTimer, and onTimer. In the onTimer method you implement your own callback logic, which is invoked when the condition is met.
A simple application: when doing time-related processing, we may need to cache some data, but this data should not stay in the cache forever, so we need an expiration mechanism. We can use a timer to set such a time, declaring that a particular piece of data may expire at some point in the future, and then delete it from state (a sketch follows below). All of this time-related logic is handled inside Flink by its own Time Service.
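As a sketch of this pattern, the following KeyedProcessFunction caches the latest value for each key and registers an event-time timer that clears it after a TTL. The event type, the 10-minute TTL, and the assumption that records already carry event-time timestamps are illustrative choices:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Caches the latest value per key and expires it with an event-time timer. */
public class ExpiringCacheFunction
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final long TTL_MS = 10 * 60 * 1000L; // assumed 10-minute TTL
    private transient ValueState<Long> cachedValue;

    @Override
    public void open(Configuration parameters) {
        cachedValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cachedValue", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        cachedValue.update(value.f1);
        out.collect(value);
        // Register an event-time timer: fires once the watermark passes timestamp + TTL.
        // Assumes the incoming records carry event-time timestamps.
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TTL_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) {
        // The user callback: drop the cached data once it has expired.
        cachedValue.clear();
    }
}
```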

Watermark processing





When an operator instance receives a watermark, it first updates the current operator time, so that when methods in the ProcessFunction query the operator time they get the latest value. Second, it goes through the timer queue mentioned above: you can register many timers at once, and Flink keeps them in a priority queue ordered by their trigger time. Third, with the updated time in hand, Flink walks through the timer queue and triggers the user callbacks one by one. Finally, the task emits the current watermark to the downstream task instances, completing the propagation of the watermark and closing the loop.
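These steps can be condensed into a small conceptual model. This is not Flink’s internal implementation, just an illustration of how operator time, the timer priority queue, and downstream propagation interact:

```java
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/** A simplified model of what happens inside a task when a watermark arrives. */
class OperatorTimeService {
    private long currentOperatorTime = Long.MIN_VALUE;
    // Registered timers, ordered by their trigger time.
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();

    void registerEventTimeTimer(long triggerTime) {
        eventTimeTimers.add(triggerTime);
    }

    void onWatermark(long watermark, LongConsumer onTimerCallback) {
        // Step 1: advance the operator time.
        currentOperatorTime = Math.max(currentOperatorTime, watermark);
        // Steps 2-3: walk the timer queue and fire every due timer via the user callback.
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.peek() <= currentOperatorTime) {
            onTimerCallback.accept(eventTimeTimers.poll());
        }
        // Final step: forward the watermark to downstream task instances.
        emitWatermarkDownstream(currentOperatorTime);
    }

    private void emitWatermarkDownstream(long watermark) {
        // In a real job this would broadcast the watermark to downstream tasks.
    }
}
```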

Time in the Table API

Now let’s look at time in the Table/SQL API. For time to participate in Table/SQL operations, the time attribute has to be placed in the Table schema in advance, so that we can use it in SQL statements or Table API expressions to express our requirements.

Table specifies the time column

There was actually some discussion in the community about how to use time in Table/SQL: whether to expose the current Processing Time through a special UDF, or to materialize the time as a column in the schema; the latter was eventually adopted. Below we discuss how to specify Processing Time and Event Time columns in a Table.





For Processing Time, we know that there are two ways to obtain a Table object (or to register a Table):

(1) A DataStream can be converted into a Table;

(2) Generate such a Table directly through TableSource;

For the first method, you can register an additional Processing Time column by appending “columnName.proctime” after your existing columns (f1 and f2 are the two existing columns in our example); this column can then be used directly in subsequent queries. If the Table is generated through a TableSource, you can implement the DefinedProctimeAttribute interface, and the Processing Time column is generated automatically according to the logic you provide.

In contrast, there are some constraints when using Event Time, because Event Time cannot be obtained on demand the way Processing Time can. If you want to convert a DataStream into a Table, you must ensure that Record Timestamps and watermarks already exist in the original DataStream. If you want to generate the Table through a TableSource, you must make sure the data you access contains a time field of type LONG or TIMESTAMP.

Specifically, if you want to register a table from a DataStream, then similarly to proctime, you just add “columnName.rowtime”. Note that for Processing Time you must make sure the added field is the last field in the whole schema, whereas for Event Time you can actually replace an existing column, and Flink will automatically convert that column to the required rowtime type (an example follows below). If the Table is generated through a TableSource, you just implement the DefinedRowtimeAttributes interface. Note that the DataStream API does not support multiple Event Time (rowtime) columns at the same time, but at the Table layer it is theoretically possible to have more than one, because the return value of DefinedRowtimeAttributes is a list of rowtime descriptors, meaning multiple rowtime columns can exist simultaneously; further improvements or optimizations may be made here in the future.
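A minimal sketch of both declarations, written against the older string-based field expressions of StreamTableEnvironment.fromDataStream (newer versions use the Expression DSL or the Schema API instead). The table names, column names, and the Tuple2 stream type are assumptions:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class TimeColumnExample {

    // `stream` is assumed to be a DataStream<Tuple2<String, Long>> of (key, value) records
    // that already carry Record Timestamps and watermarks.
    static void registerTables(StreamTableEnvironment tEnv, DataStream<Tuple2<String, Long>> stream) {
        // Processing Time: append a new column at the end of the schema.
        Table procTable = tEnv.fromDataStream(stream, "f0, f1, pt.proctime");

        // Event Time: append a rowtime column derived from the stream's timestamps.
        // (Alternatively, an existing Long/Timestamp column could be replaced with ".rowtime".)
        Table rowTable = tEnv.fromDataStream(stream, "f0, f1, rt.rowtime");

        tEnv.registerTable("ProcTable", procTable);
        tEnv.registerTable("RowTable", rowTable);
    }
}
```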

Time column and Table operations

Once the time column is specified, some concrete operations come into play when we actually query the table. The operations listed here are all bound to the time column, or must be performed on it. For example, window aggregations such as the “Over window aggregation” and the “Group by window aggregation” only allow the time column to be used when you supply the window parameters in SQL. The third is the time-windowed join, whose conditions may only refer to the corresponding time columns. Finally there is sorting: we know that sorting an unbounded data stream is almost impossible, but because the arriving data is itself already ordered by the time attribute, if we want to sort a DataStream that has been converted into a Table, we can only sort by the time column. You can of course specify additional columns, but the time column is mandatory and must come first (an example query follows below).
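For instance, a group-by window aggregation over the Event Time column declared above might look like the following. It assumes the RowTable registered in the previous sketch and uses the classic TUMBLE group-window syntax:

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class WindowAggregationExample {

    // Assumes a table "RowTable" with columns (f0, f1, rt) has been registered,
    // where rt is the Event Time (rowtime) column declared earlier.
    static Table hourlyCounts(StreamTableEnvironment tEnv) {
        return tEnv.sqlQuery(
            "SELECT f0, TUMBLE_END(rt, INTERVAL '1' HOUR) AS window_end, COUNT(*) AS cnt " +
            "FROM RowTable " +
            "GROUP BY f0, TUMBLE(rt, INTERVAL '1' HOUR)");
    }
}
```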

Why do these operations only work on the time column? Because we can, to some extent, regard the incoming data stream as a table ordered by time, and any operation on this table must be completed under the premise of scanning it sequentially. As we all know, one property of a data stream is that it is transient: once a record has been processed, it is not easy to access again later. Of course, Flink’s internal state mechanism lets us weaken this restriction to some degree, but ultimately the amount of state kept should not grow too large. Restricting these operations to the time column is precisely what guarantees that the state we create internally will not grow without bound, and that is the fundamental premise.

Author: Cui Xingcan


This article is original content of the Alibaba Cloud Yunqi Community and may not be reproduced without permission.