Organized by Dai Jiguo (Flink Community Volunteer)

Proofreading: Miao Wenting (Flink Community volunteer)

Abstract: This article is shared by Du Li, a senior engineer at Tencent. It introduces the optimizations that Tencent's real-time computing platform has made on top of Flink SQL, including:

  1. The current state of Flink SQL
  2. Window function extensions
  3. Retraction stream optimization
  4. Future plans

I. Background and current situation

1. Analysis of the three modes

Flink jobs are currently created in three ways: Jar mode, Canvas mode, and SQL mode. The different ways of submitting jobs target different groups of users.

■ Jar mode

In Jar mode, jobs are developed directly against the DataStream/DataSet API; it is mainly aimed at low-level developers.

  • Advantages:

· Flexibility: the underlying DataStream/DataSet API is Flink's native API, so you can implement any operator logic or DAG shape you want.
· Easy performance tuning: the performance of each operator can be optimized in a very targeted way.

  • Disadvantages:

· Cumbersome dependency updates: whether you extend the job logic or upgrade the Flink version, the job code and dependency versions have to be updated.
· High learning curve.

■ Canvas mode

Canvas mode provides a visual drag-and-drop interface in which users assemble Flink jobs interactively. It is aimed at novice users.

  • Advantages:

· Easy to operate: the operators a Flink job needs can be defined on the canvas with simple drag-and-drop.
· Complete functionality: it is built on the Table API and covers features fairly completely.
· Easy to understand: the DAG diagram is intuitive, so users can easily follow the processing flow of the whole job.

  • Disadvantages:

· Complex configuration: every operator has to be configured one by one, so a very complex DAG means a correspondingly large amount of configuration work.
· Hard to reuse logic: when there are many jobs, it is difficult for different jobs to share DAG logic.

■ SQL mode

SQL has been around for a long time and has its own standards; it is aimed primarily at data analysts, who can switch between platforms and computing engines as long as they follow the SQL standard.

  • Advantages:

· Clear and concise, easy to read and understand.
· Decoupled from the computing engine: SQL is decoupled from the computing engine and its version, so migrating business logic between engines requires little or no change to the SQL, and upgrading the Flink version does not require changing the SQL either.
· Easy logic reuse: SQL logic can be reused by creating views.

  • Disadvantages:

· Syntax differences: for example, the stream/dimension-table join used the Lateral Table Join syntax before Flink 1.9 but switched to the PERIOD FOR SYSTEM_TIME syntax afterwards, which follows the ANSI SQL 2011 standard. Such syntax changes impose a learning cost on users (a rough illustration of the two forms follows this list).
· Incomplete feature coverage: the Flink SQL module is still relatively young, so its feature coverage is not yet complete.
· Hard to tune performance: the execution efficiency of a piece of SQL is determined by several parts: the business logic expressed by the SQL itself, the optimization of the execution plan generated from the SQL, and the translation of the optimized logical plan into native code. Users can usually only optimize the business logic expressed in the SQL.
· Hard to locate problems: SQL describes a complete end-to-end process, so if some output data is wrong, it is hard to find out which operator went wrong. In general, locating a Flink SQL problem means repeatedly simplifying the SQL and checking the output, which is very costly. Tencent's real-time computing platform will later add trace logs and metrics and expose them on the product side to help users locate problems when using Flink SQL.
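
For reference, the two generations of stream/dimension-table join syntax mentioned above look roughly as follows (table and field names are hypothetical; the exact form may vary by Flink version):

-- Before Flink 1.9: temporal table function (Lateral Table Join) style
SELECT o.order_id, o.amount, r.rate
FROM orders AS o,
     LATERAL TABLE (rates(o.proctime)) AS r
WHERE o.currency = r.currency;

-- From Flink 1.9 on: FOR SYSTEM_TIME AS OF style, following the SQL:2011 temporal syntax
SELECT o.order_id, o.amount, r.rate
FROM orders AS o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.proctime AS r
ON o.currency = r.currency;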

2. Current work of Tencent's real-time computing platform

■ Extended syntax

A windowing table-valued function syntax is defined to help users implement window-based joins and set operations on streams. In addition, the platform implements its own stream/dimension-table join syntax.

■ New features

New features include two new window types, Incremental Window and Enhanced Tumble Window, as well as decoupling the event-time field from the Table Source. In many cases the event-time field cannot be defined on a Table Source field: for example, when the Table Source is a sub-query or the time field is derived from a function transformation, the generated time field currently cannot be used as the event-time field. Our current solution is to let users select any time field in the physical table to define the window's time attribute and generate the watermark.

■ Performance tuning

  • Retraction stream optimization;
  • Inline UDF deduplication: if the same UDF appears in both the LogicalProject and the WHERE condition, it is invoked multiple times. The repeatedly invoked UDF is extracted in the logical execution plan and its result is cached to avoid repeated invocation (a minimal example of this pattern is sketched below).
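
A minimal sketch of the pattern this optimization targets (my_udf, events, and payload are hypothetical names): without the rewrite, the same UDF call would be evaluated once for the projection and once for the filter on every row.

SELECT id, my_udf(payload) AS parsed
FROM events
WHERE my_udf(payload) IS NOT NULL;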

■ Bucket Join

There is a data cold-start problem in stream/dimension-table joins: if a Flink task loads a large amount of external data at startup, it can easily cause backpressure. One option is to preload all dimension data into memory at startup, for example with the State Processor API, but loading the full dimension table into every subtask consumes a lot of memory. Our solution is therefore to specify bucket information in the dimension table definition. When the stream joins the dimension table, only the dimension-table shards corresponding to the bucket information are loaded. At the same time, the bucket information is propagated to the stream side when the execution plan is translated, ensuring that stream and dimension-table data are joined on the same bucket. This greatly reduces the memory consumption caused by preloading the full dimension table.
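
The talk does not show the exact DDL; the following is only a hypothetical sketch of declaring bucket information on a dimension table (the 'bucket.num' and 'bucket.key' options are illustrative names, not confirmed Tencent or open-source Flink options):

CREATE TABLE dim_user (
  user_id BIGINT,
  city    STRING
) WITH (
  'connector'  = '...',       -- any external store holding the dimension data
  'bucket.num' = '16',        -- hypothetical: number of shards the dimension data is split into
  'bucket.key' = 'user_id'    -- hypothetical: bucketing key, matching the stream's join key
);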

II. Window function extensions

Tencent's real-time computing platform has made some extensions to the existing Flink SQL syntax and has also defined two new window types.

1. New window operations

The requirement is as follows: perform a join, or a set operation such as intersection or difference, on two streams within the same time window.

To do a window-based dual-stream join in Flink SQL, there are two existing options: the first is to join first and then Group By a window; the second is the Interval Join. Let's first analyze whether the first option meets the requirement.

■ 1.1 Join first, then window

The logic of joining first and then opening a window is shown in the figure above. In the logical execution plan, the Join node sits below the Window Aggregate node, so the two streams are joined first and the window aggregation is performed on the joined result.

As the flow chart on the right of the figure shows, the two streams are first connected and then keyed by the join key, which ensures that records with the same join key in both streams are shuffled to the same task. The left stream stores its data in its own state and probes the right stream's state for matches; matched results are sent downstream. This scheme has the following two problems:

  1. The state cannot be cleaned: before the window is opened there is no window information in the Join, so even after the downstream window fires and finishes its calculation, the join state of the two upstream streams cannot be cleaned; at best it can only be cleaned by a TTL.
  2. The semantics do not match: the original requirement is to slice the data of both streams into the same time window and then join, but this scheme joins first and only then windows the joined data, so it cannot guarantee that the joined records belong to the same window.

■ 1.2 Interval Join

Compared with the previous method, the Interval Join does not have the state-cleaning problem, because the data of the left and right streams is only kept within a bounded interval and the state can be cleaned once that interval has passed.

However, the accuracy of this scheme may be worse, because the window is not a fixed, aligned window but is driven by the data: the range of records on the other stream that the current record can join is determined by the event time carried by the current record. This window semantics is not what we need.

Imagine two streams with different rates, where the lower and upper bounds limit the range of right-stream data that a left-stream record can join. Under such rigid range constraints there will always be some valid right-stream data falling outside the interval [left + lower, left + upper], so the calculation is not accurate enough. It is therefore better to use aligned windows, so that data with the same event time in both streams falls into the same time window.
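
For comparison, an Interval Join in Flink SQL looks roughly like the following (table and field names are hypothetical). The join range for each left-stream record is [l.rowtime - 5s, l.rowtime + 5s], i.e. it is driven by the record's own event time rather than by an aligned window:

SELECT l.id, l.v AS left_v, r.v AS right_v
FROM left_stream AS l, right_stream AS r
WHERE l.id = r.id
  AND r.rowtime BETWEEN l.rowtime - INTERVAL '5' SECOND
                    AND l.rowtime + INTERVAL '5' SECOND;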

■ 1.3 Windowing Table-valued Function

Tencent extended the windowing table-valued function syntax, which allows us to join, or perform set operations on, the data of two streams within a specific time window. This syntax is described in the SQL 2016 standard and already exists in Calcite 1.23.

The windowing table-valued function's FROM clause contains all the information required to define the window: the table source, the event-time field, the window size, and so on.

As the logical plan above shows, the syntax adds a LogicalTableFunctionScan node on top of the LogicalTableScan. In addition, the LogicalProject (output) node gains two extra fields, WindowStart and WindowEnd, based on which records can be assigned to a specific window. With these building blocks, the windowing table-valued syntax can do the following:

  • On a single stream, a time window can be defined just like the existing Group Window syntax. As shown above, all the window information is put in the FROM clause and the query then groups by it; this is arguably closer to the common understanding of a time window and more intuitive than the current Flink SQL Group Window approach (see the combined sketch after this list). The implementation does not introduce new physical operators; instead, the logical execution plan is rewritten into the equivalent Group Window logical plan, so the underlying physical execution code is shared.

In addition, you can sort data or output a TopN within a window, because the windowing table-valued syntax has already assigned the data to windows. As shown in the figure above, the window is defined in the FROM clause, followed by ORDER BY and LIMIT, which directly express the sort and TopN semantics.

  • On two streams, it meets the original requirement of joining, or performing set operations on, the data of both streams within the same time window. The syntax is as shown in the figure above: first construct the windowed tables of the two streams, then use the JOIN keyword to join them. Set operations work the same way and are similar to those in traditional database SQL.
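
The following sketch illustrates the three usages above, using the TUMBLE table-valued function form shown in the implementation details below; table and field names are hypothetical, and the WindowStart/WindowEnd column names follow the logical plan described above:

-- 1) Single-stream window aggregation: the window is defined in the FROM clause
SELECT WindowStart, WindowEnd, COUNT(*) AS cnt
FROM TABLE(TUMBLE(TABLE clicks, DESCRIPTOR(click_time), INTERVAL '1' MINUTE))
GROUP BY WindowStart, WindowEnd;

-- 2) Sort / TopN within each window
SELECT *
FROM TABLE(TUMBLE(TABLE bids, DESCRIPTOR(bid_time), INTERVAL '10' MINUTE))
ORDER BY price DESC
LIMIT 3;

-- 3) Dual-stream window join: both streams are windowed first, then joined on the
--    join key plus the window boundaries
SELECT a.id, a.v AS left_v, b.v AS right_v
FROM TABLE(TUMBLE(TABLE stream_a, DESCRIPTOR(rowtime), INTERVAL '1' MINUTE)) AS a
JOIN TABLE(TUMBLE(TABLE stream_b, DESCRIPTOR(rowtime), INTERVAL '1' MINUTE)) AS b
ON a.id = b.id
  AND a.WindowStart = b.WindowStart
  AND a.WindowEnd   = b.WindowEnd;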

■ 1.4 Implementation details

Now let’s briefly introduce some details of our implementation of Windowing table-valued Function syntax.

1.4.1 Window propagation

With the original plan-translation approach, the LogicalTableScan is translated first, then the windowing table-valued function, and finally the ORDER BY / LIMIT clause. The whole process stores state multiple times, which is costly for performance. We therefore made the following optimization: multiple logical RelNodes are merged and translated together, which reduces the intermediate code generated and improves performance.

1.4.2 Time Attribute Field

The windowing table-valued function has the following general form:

SELECT * FROM TABLE(TUMBLE(TABLE <data>, DESCRIPTOR(<timecol>), <size> [, <offset>]))

The TABLE argument can be more than just a table; it can also be a sub-query. If the time attribute were bound to the Table Source when the event-time field is defined, a sub-query source would not meet our requirements. So when implementing the syntax we decoupled the time-attribute field from the Table Source: users can use any time field of the physical table as the time attribute and generate the watermark from it.
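
A sketch of what this decoupling enables (table, field, and function names are hypothetical): the time attribute is declared on a field produced by a sub-query rather than on the original Table Source.

SELECT WindowStart, WindowEnd, COUNT(*) AS cnt
FROM TABLE(
  TUMBLE(
    TABLE (SELECT id, TO_TIMESTAMP(log_time) AS ts FROM raw_events),  -- derived time field
    DESCRIPTOR(ts),
    INTERVAL '1' MINUTE))
GROUP BY WindowStart, WindowEnd;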

1.4.3 Time Watermarking

Watermarks follow the same logic as in the other syntaxes: the minimum watermark across all input tasks of the two streams determines the window's watermark, which triggers the window calculation.

1.4.4 Constraints

At present there are some restrictions on the use of the windowing table-valued function: both streams must use the same window type and the same window size, and Session Window functionality is not yet implemented.

2. New window types

Next, we introduce the two new window types we added.

■ 2.1 Incremental Window

Users want to draw PV/UV curves within a day, that is, output multiple intermediate results within a day (or any large window) rather than wait for the window to end and output a single result. For this requirement we extended Flink with the Incremental Window.

2.1.1 Multiple Triggers

We implemented a custom Incremental Trigger on top of the Tumble Window. It ensures that the window calculation is not triggered only when the window ends, but once for every interval defined in the SQL.

In the SQL example above, the total window size is one second and the trigger fires every 0.2 seconds, so five window calculations are triggered within the window, and each output is computed cumulatively on top of the previous one.
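
The exact Tencent syntax is not shown in the talk; the following is only a hypothetical sketch of the kind of definition described (INCREMENTAL_TUMBLE is an illustrative name, not a confirmed Tencent or Flink function): a one-second window that fires every 0.2 seconds.

SELECT page_id, COUNT(DISTINCT user_id) AS uv
FROM page_views
GROUP BY
  page_id,
  INCREMENTAL_TUMBLE(rowtime, INTERVAL '1' SECOND, INTERVAL '0.2' SECOND);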

2.1.2 Lazy Trigger

We also added an optimization called Lazy Trigger for the Incremental Window. In production it often happens that, for the same key, several consecutive triggers of the same window produce the same output value, and the downstream has no need to receive this data repeatedly. With Lazy Trigger configured, if the next output value for a key in a window is the same as the previous one, the update is not sent downstream, reducing downstream storage and concurrency pressure.

■ 2.2 Enhanced Tumble Window

Some users want late data not to be discarded after a Tumble Window has fired, but to trigger the window calculation again. With the DataStream API this can be done with side outputs, but it is currently not possible in SQL. We therefore extended the existing Tumble Window to collect late data; instead of re-triggering the window and emitting downstream for every late record, we define an additional trigger whose interval is the window size defined in the SQL, which reduces the frequency of data sent downstream.

Meanwhile, the side-output stream applies the window logic again to aggregate the late data it accumulates. Note that if the downstream sink is, for example, HBase, the result triggered by the normal window would be overwritten by the late result for the same window and key; conceptually, the late data is just as important as the normally triggered data and should not simply overwrite it. The downstream therefore performs a secondary aggregation of the normal data and the late data received for the same key in the same window.

III. Retraction stream optimization

Next, let's look at some of the optimizations made around the retraction stream.

1. Stream-table duality

First, a review of some concepts related to retraction streams in Flink SQL.

When a Continuous Query aggregates a stream, every record received from upstream produces an updated result for the downstream, unlike a batch query, which outputs a single result once. The downstream can therefore receive multiple update results for the same key.

2. Retraction stream

For example, in a word count, when the second "Java" record reaches the aggregation operator, it updates the state produced by the first "Java" record and sends the new result downstream. If the downstream does nothing special with these repeated updates, it will produce incorrect results. For this scenario Flink SQL introduces the concept of the retraction stream.

In a retraction stream, each record is prefixed with a True/False flag. If the flag is False, the record is a retraction message telling the downstream to delete that data; if the flag is True, the downstream performs a normal insert.
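
A minimal word-count sketch of this mechanism (table and field names are hypothetical): the non-windowed aggregate below emits an update for every input record, and the flag tells the downstream whether to delete the previous result.

SELECT word, COUNT(*) AS cnt
FROM words
GROUP BY word;

-- Input records:  "Java", "Flink", "Java"
-- Messages sent downstream (flag, word, cnt):
--   (True,  Java,  1)    -- first "Java"
--   (True,  Flink, 1)
--   (False, Java,  1)    -- retraction: delete the old result for "Java"
--   (True,  Java,  2)    -- insert the updated result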

■ 2.1 When is a retraction stream generated

Currently, Flink SQL generates a retraction stream in the following four scenarios:

  • Aggregate without Window
  • Rank
  • Over Window
  • Left/Right/Full Outer Join

Why does an Outer Join produce retractions? Take the Left Outer Join as an example, and assume the left-stream record arrives before the corresponding right-stream record. The left record probes the right stream's state and finds nothing, but it cannot know whether the matching right record does not exist or is merely late. To satisfy Outer Join semantics, the left stream still emits a joined record, similar to a MySQL LEFT JOIN: the left-side fields are filled with their actual values, the right-side fields are filled with Null, and this record is sent downstream, as shown in the figure below:

(Image from the Yunqi community)

Later, when the corresponding right-stream record arrives, it probes the left stream's state and joins again. To keep the semantics correct, the Null-padded record that was already sent downstream must be retracted, and the newly joined record is emitted instead. Note that for the same key there is at most one retraction: once more data for that key arrives, it can already find the corresponding data on the other stream.
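
A sketch of the Left Outer Join case described above (tables and data are hypothetical):

SELECT l.id, l.v AS left_v, r.v AS right_v
FROM left_stream AS l
LEFT JOIN right_stream AS r
ON l.id = r.id;

-- Left record (id=1, v=a) arrives first and finds no match on the right:
--   (True,  1, a, NULL)   -- Null-padded result emitted to satisfy LEFT JOIN semantics
-- The matching right record (id=1, v=b) arrives later:
--   (False, 1, a, NULL)   -- retract the Null-padded result
--   (True,  1, a, b)      -- emit the real join result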

■ 2.2 How retraction messages are processed

Here is how Flink processes retraction messages.

Intermediate compute nodes are controlled by the four flags shown in the figure above. They indicate whether the current node generates update or retraction messages, and whether it consumes retraction messages. Together, these four flags determine how retractions are generated and processed.

On the sink side, Flink has three sink types: AppendStreamTableSink, RetractStreamTableSink, and UpsertStreamTableSink. AppendStreamTableSink only describes append-only semantics, so it cannot accept retraction messages from upstream. RetractStreamTableSink handles retraction messages: when the upstream operator sends a retraction message it deletes the corresponding data, and when it sends a normal update message it inserts the data. UpsertStreamTableSink can be seen as a performance optimization of RetractStreamTableSink: if the sink storage supports idempotent writes or key-based updates, the upstream upsert key is passed to the TableSink during SQL translation, and updates are then performed based on that key.

■ 2.3 Related optimization

We made the following optimizations around the retraction stream.

2.3.1 Optimization of intermediate nodes

The root cause of retraction messages is that update results are continuously sent downstream many times; so to reduce the update frequency and the downstream pressure, some update results can be accumulated before they are sent out. As shown above:

  • The first scenario is nested aggregation (for example, two COUNT operations; a minimal example is sketched after this list): a cache is added where the first-level GROUP BY would send its update results downstream, which reduces the frequency of sending data downstream. When the cache's trigger condition is reached, the accumulated update results are sent downstream.

  • The second scenario is the Outer Join. As mentioned earlier, the Outer Join produces retraction messages because the left and right streams arrive at different rates. Taking the Left Outer Join as an example, the left-stream data can be cached: when a left record arrives it probes the right stream's state; if a matching record is found it is not cached, otherwise the record is cached under its key. When a trigger condition is reached, the right stream's state is probed again for that key; if there is still no match, a Null-padded join record is sent downstream. When the matching right-stream record finally arrives, the cached data for that key is cleared and, if a Null-padded record has already been emitted, a retraction message is sent downstream.

Both caches reduce the frequency of retraction messages sent to the downstream.
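
A minimal example of the nested aggregation shape in the first scenario (words is a hypothetical table): the outer COUNT consumes the update/retraction stream produced by the inner GROUP BY, so caching the inner updates directly reduces the retraction traffic the outer operator and the sink have to process.

SELECT cnt, COUNT(*) AS num_words
FROM (
  SELECT word, COUNT(*) AS cnt
  FROM words
  GROUP BY word
) AS word_counts
GROUP BY cnt;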

2.3.2 Optimization of Sink node

For the Sink node, a cache is added between the aggregation node and the Sink node to reduce pressure on the sink. Retraction messages are merged inside the cache, and the updated data is sent to the Sink node once the cache's trigger condition is reached. Take the SQL in the figure below as an example:

Comparing the output before and after the optimization, the amount of data received by the downstream decreases: for example, the updates for the user Sam pass through the cache layer before any retraction message is sent downstream, which greatly reduces the data volume the downstream receives.

IV. Future plans

Here is our team's follow-up work plan:

  • Cost-Based Optimization: at present, the logical plan optimization in Flink SQL is rule-based (RBO). Our team wants to move towards cost-based optimization (CBO), and the main task is collecting statistics. Statistics will come not only from Flink SQL itself but also from other products within the company, such as metadata, the data distribution of different keys, or other data-analysis results. By integrating with these products we can obtain the most accurate statistics and produce the best execution plan.

  • More new features (CEP syntax, etc.): define CEP syntax on top of Flink SQL to meet users' CEP needs.

  • Continuous performance optimization (join operator, etc.): our team works not only on execution-plan optimization but also on fine-grained optimizations of the join operator and data shuffle.

  • Easier to debug: finally, debugging and troubleshooting of Flink SQL jobs. Flink SQL is still relatively weak in this respect; in particular, data inconsistencies in production are very hard to investigate. Our current idea is to let SQL jobs be configured to emit trace and metrics information during execution and send it to other platforms, so that the trace and metrics can help users locate the faulty operator.