This article provides an in-depth look at the Apache Flink SQL 1.13 release: five practical FLIPs covering core features and important improvements.

The Apache Flink community released version 1.13 in May, bringing many new changes. This article was compiled from "An In-Depth Interpretation of Flink SQL 1.13", shared by Xu Bangjiang at the Flink Meetup in Beijing on May 22. The contents include:

  1. Flink SQL 1.13 Overview
  2. Interpretation of Core Features
  3. Interpretation of Important Improvements
  4. Flink SQL 1.14 Future Planning
  5. Summary

GitHub address: https://github.com/apache/flink. Everyone is welcome to give Flink a like and a star!

I. Flink SQL 1.13 Overview

Flink 1.13 is a large community release, with more than 1,000 issues resolved. As can be seen from the figure above, most of the resolved issues concern the Table/SQL module: over 400 issues, about 37% of the total. These issues mainly revolve around five FLIPs, which are also the focus of this article. They are:

  • FLIP-145: Support for Window TVF
  • FLIP-162: Time zones and time functions
  • FLIP-152: Improved Hive syntax compatibility
  • FLIP-163: Improved SQL Client
  • FLIP-136: Enhanced DataStream and Table conversion

Here is a closer look at each of these FLIPs.

II. Interpretation of Core Features

1. FLIP-145: Support for Window TVF

As many in the community know, the internal branches of companies such as Tencent, Alibaba, and ByteDance had already developed basic versions of this feature. In Flink 1.13, the Flink community has introduced Window TVF support and optimizations as well. In the following sections, we examine this new feature in terms of Window TVF syntax, near-real-time cumulative computation scenarios, window performance optimizations, and multi-dimensional data analysis.

1.1 Window TVF syntax

Prior to version 1.13, windows were expressed with special grouped window functions in SQL:

SELECT
    TUMBLE_START(bidtime, INTERVAL '10' MINUTE),
    TUMBLE_END(bidtime, INTERVAL '10' MINUTE),
    TUMBLE_ROWTIME(bidtime, INTERVAL '10' MINUTE),
    SUM(price)
FROM MyTable
GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTE)

In version 1.13, the syntax is standardized as a table-valued function (TVF):

SELECT window_start, window_end, window_time, SUM(price)
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTE))
GROUP BY window_start, window_end

Comparing the two syntaxes, we can see that the TVF syntax is more flexible: it does not have to be followed by a GROUP BY, and because Window TVF is based on relational algebra it is also more standard. When you only need to assign rows to windows, you can use the TVF alone without any GROUP BY aggregation. This makes TVFs more extensible and expressive, and allows custom TVFs (for example, a TVF that implements Top-N).

The example in the figure above uses the TVF to assign data to tumbling windows: the data only needs to be divided into windows, with no aggregation; if aggregation is needed later, a GROUP BY can simply be added on top, as in the sketch below. For users familiar with batch SQL this is quite natural, and we no longer have to bind window assignment and aggregation together with the special grouped window functions used before 1.13.
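A minimal sketch of this pattern, assuming a hypothetical Bid table with bidtime, item, and price columns:

-- Window assignment only: each row is enriched with window_start / window_end,
-- no aggregation is performed (table and column names are illustrative).
SELECT bidtime, price, item, window_start, window_end
FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));

-- If an aggregation is needed later, add a GROUP BY on the window columns:
SELECT window_start, window_end, SUM(price)
FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;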

Window TVF now supports tumble, hop, and cumulate windows; session windows are expected to be supported in version 1.14.

1.2 Cumulate Window

The Cumulate window is a cumulative window. Put simply:

  • The first window counts a range of data;
  • The second window counts the data in the first and second ranges;
  • The third window counts data from the first, second, and third ranges.

Cumulative computation is very common in business scenarios, for example cumulative UV. For a UV dashboard curve, we report the day's cumulative UV every 10 minutes.

Before version 1.13, when we needed to do this calculation, we typically wrote SQL like this:

INSERT INTO cumulative_UV
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as UV
FROM (
    SELECT
        DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str,
        SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0' as time_str,
        user_id
    FROM user_behavior
)
GROUP BY date_str

First, each record's time window fields are stitched together as strings; then all records are aggregated by the stitched time window field with a GROUP BY, achieving the effect of an approximate cumulative calculation.

  • The pre-1.13 approach has a number of drawbacks. First, each incoming record triggers an aggregation computation. Second, when catching up on data (consuming a backlog), the UV dashboard curve jumps.
  • With the TVF syntax in version 1.13, we can use the Cumulate window to express this directly. Its semantics are based on event time, so the curve does not jump even when catching up on data:
INSERT INTO cumulative_UV
SELECT window_end, COUNT(DISTINCT user_id) as UV
FROM TABLE(
    CUMULATE(TABLE user_behavior, DESCRIPTOR(ts), INTERVAL '10' MINUTES, INTERVAL '1' DAY)
)
GROUP BY window_start, window_end

The resulting UV dashboard curve is shown in the figure below:

1.3 Window performance optimization

In Flink 1.13, the community implemented a number of performance optimizations for Window TVF, including:

  • Memory optimization: pre-allocate memory, buffer window data, and trigger computation on the window watermark, using an in-memory buffer to avoid high-frequency state access;
  • Slice optimization: slice windows and reuse computed results wherever possible, e.g. for hop and cumulate windows; slices that have already been computed are not recomputed, only their results reused;
  • Operator optimization: the window operator supports local-global optimization and automatic hot-key optimization for COUNT(DISTINCT);
  • Late data: late data can be computed into subsequent slices to ensure accuracy.

Based on these optimizations, we ran performance tests with the open-source benchmark NexMark. The results show roughly a 2x improvement for general window workloads, and even better gains in COUNT(DISTINCT) scenarios.

1.4 Multidimensional data analysis

The standardized syntax brings more flexibility and extensibility, allowing users to perform multi-dimensional analysis directly on the window function. As shown in the figure below, GROUPING SETS, ROLLUP, and CUBE can be computed directly. In versions prior to 1.13, we would have had to write separate SQL aggregations for each grouping and then UNION the results to achieve a similar effect. Now such multi-dimensional analysis is supported directly on Window TVF, as in the sketch below.
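A minimal sketch of this, assuming a hypothetical Bid table with bidtime, supplier_id, item, and price columns:

-- Multi-dimensional aggregation directly on the window TVF result:
-- one pass produces per-supplier, per-item, and overall totals per window.
SELECT window_start, window_end, supplier_id, item, SUM(price) AS total_price
FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), (item), ());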

Support for Window Top-N

In addition to multi-dimensional analysis, Window TVF also supports Top-N syntax, which makes it much easier to write a Top-N query on a window, as in the sketch below.
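A minimal sketch, again assuming a hypothetical Bid table: the top 3 rows by price in each 10-minute tumbling window, expressed with ROW_NUMBER() over the window TVF result.

-- Window Top-N: rank rows within each window and keep the top 3 by price.
SELECT *
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY window_start, window_end
            ORDER BY price DESC) AS rownum
    FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
)
WHERE rownum <= 3;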

2. FLIP-162: Time Zones and Time Functions

2.1 Analysis of time zone problems

Users have reported many time zone problems when using Flink SQL. There are three main causes:

  • The PROCTIME() function should take the local time zone into account, but it did not;
  • CURRENT_TIMESTAMP / CURRENT_TIME / CURRENT_DATE / NOW() did not take the local time zone into account;
  • Flink's time attribute could only be defined on the TIMESTAMP data type, which has no time zone; but users want the time attribute to reflect their local time zone.

To address the fact that the TIMESTAMP type carries no time zone, we propose supporting the TIMESTAMP_LTZ type (TIMESTAMP_LTZ is short for TIMESTAMP WITH LOCAL TIME ZONE). The table below compares it with TIMESTAMP:

TIMESTAMP_LTZ differs from the TIMESTAMP we used before in that it represents absolute time. By comparison, we can see that:

  • If we use TIMESTAMP, the value can be written as a string, and it reads the same whether the user observes it from a UK or a China time zone;
  • With TIMESTAMP_LTZ, however, the source value is simply a LONG representing the time elapsed since the epoch. At any given instant, the elapsed time since the epoch is the same in all time zones, so the LONG value is a notion of absolute time. When we observe this value in different time zones, we interpret it in the local time zone and render it in the readable "year-month-day hour:minute:second" format; that is the TIMESTAMP_LTZ type, and it is better suited to users spread across different time zones.

The following example shows the difference between the TIMESTAMP and TIMESTAMP_LTZ types.

2.2 Corrections to time functions

Correcting the PROCTIME() function

With TIMESTAMP_LTZ available, we corrected the PROCTIME() function: before 1.13 it always returned a UTC TIMESTAMP; now its return type is TIMESTAMP_LTZ. Besides being a function, PROCTIME can also serve as a marker for a time attribute.

Correcting the CURRENT_TIMESTAMP / CURRENT_TIME / CURRENT_DATE / NOW() functions

The values of these functions differ between time zones. For example, when UTC time is 2:00 a.m., a user in the UK sees 2:00 a.m., but with the time zone set to UTC+8 the value is 10:00 a.m. The actual value varies by time zone, as shown below:
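A minimal sketch in the SQL Client of how the session time zone affects these functions (table.local-time-zone is the standard Flink option; the output values are illustrative):

SET table.local-time-zone=UTC;
SELECT CURRENT_TIMESTAMP;   -- e.g. 2021-04-15 02:00:00

SET table.local-time-zone=Asia/Shanghai;   -- UTC+8
SELECT CURRENT_TIMESTAMP;   -- e.g. 2021-04-15 10:00:00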

Fix Processing Time Window time zone issues

PROCTIME() can also be used to declare a time attribute.

  • Prior to 1.13, if we needed to window by day, we had to manually work around the time zone: apply an 8-hour offset and then shift it back;
  • FLIP-162 solves this problem. Now the user simply declares a proctime attribute; since PROCTIME() returns TIMESTAMP_LTZ, the local time zone is taken into account. The following example shows that a window on the proctime attribute is aggregated according to the local time zone in different time zones.
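A minimal sketch of declaring a processing-time attribute and aggregating by day (table and column names are hypothetical; the datagen connector is used only so the example is self-contained):

CREATE TABLE orders (
    order_id  STRING,
    amount    DOUBLE,
    proc_time AS PROCTIME()   -- processing-time attribute; now typed as TIMESTAMP_LTZ(3)
) WITH ('connector' = 'datagen');

-- A daily window on proc_time: the day boundaries now follow the configured
-- local time zone instead of UTC.
SELECT TUMBLE_START(proc_time, INTERVAL '1' DAY), SUM(amount)
FROM orders
GROUP BY TUMBLE(proc_time, INTERVAL '1' DAY);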

Revised time function evaluation in streaming and batch modes

The evaluation of time functions also differs between streaming and batch mode. This revision mainly makes their behavior match users' actual habits. For these functions:

  • In streaming mode they are evaluated per record, i.e. once for each row;
  • In batch mode they are evaluated at query start, i.e. once before the job begins. Common batch engines such as Hive behave the same way, evaluating them once at the start of each batch.

2.3 Time type usage

Version 1.13 also supports defining event time on a TIMESTAMP_LTZ column, which means event time can now be defined on both TIMESTAMP and TIMESTAMP_LTZ columns. So which type should be used in which scenario?

  • When the upstream source data carries time as a string (e.g. 2021-04-15 14:00:00), declare the column as TIMESTAMP and define the event time on it; windows will be split based on the time string, and the final result will match what you actually expect;

  • When the upstream source reports time as a LONG value, it represents an absolute instant. In version 1.13 you can define event time on a TIMESTAMP_LTZ column; window aggregations defined on TIMESTAMP_LTZ automatically handle the 8-hour time zone offset, without the extra time zone conversions and corrections required by the previous SQL approach. A sketch of this follows the list.
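A minimal sketch of the second case, with hypothetical table and column names (TO_TIMESTAMP_LTZ is the conversion function added in 1.13; the datagen connector is used only to make the example self-contained):

CREATE TABLE user_actions (
    user_id STRING,
    ts_ms   BIGINT,                              -- epoch milliseconds from the source
    ts_ltz  AS TO_TIMESTAMP_LTZ(ts_ms, 3),       -- absolute time, local-time-zone aware
    WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen');

Windows defined on ts_ltz then follow the configured local time zone without any manual offsets.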

Note: these time function and time zone enhancements in Flink SQL are not backward compatible. When upgrading, users should check whether their job logic uses such functions to avoid impacting the business.

2.4 Daylight Saving Time Support

Prior to Flink 1.13, window-related calculations were difficult for users in daylight saving time (DST) zones because of the jump between daylight saving time and standard (winter) time.

Flink 1.13 supports daylight saving time gracefully by allowing the time attribute to be defined on a TIMESTAMP_LTZ column, while Flink SQL cleverly combines the TIMESTAMP and TIMESTAMP_LTZ types in window processing. This is valuable for users in DST time zones and for companies with overseas business.

III. Interpretation of Important Improvements

1. FLIP-152: Improved Hive Syntax Compatibility

FLIP-152 mainly improves Hive syntax compatibility, supporting a number of common Hive DML and DQL syntaxes.

These Hive syntaxes are enabled via the Hive dialect. The Hive dialect is used together with HiveCatalog and the Hive Module; the Hive Module provides all of Hive's built-in functions, which can be called directly once it is loaded.

At the same time, the Hive dialect also lets us create and drop catalog functions as well as some custom functions. This greatly improves the compatibility of Flink SQL with Hive and makes it more convenient for users who are familiar with Hive. A sketch of switching dialects is shown below.
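A minimal sketch of switching the SQL Client to the Hive dialect (table.sql-dialect is the standard option; this assumes a HiveCatalog has already been registered and selected as the current catalog, and the table definition is illustrative):

-- Switch the dialect, then use Hive syntax directly.
SET table.sql-dialect=hive;

-- Hive-style DDL, parsed by the Hive dialect:
CREATE TABLE page_view (
    user_id STRING,
    cnt     BIGINT
) PARTITIONED BY (dt STRING) STORED AS parquet;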

2. FLIP-163: Improved SQL Client

Prior to version 1.13, the Flink SQL Client was regarded as a peripheral gadget. FLIP-163, however, brings important improvements in version 1.13:

  1. The -i parameter initializes DDL up front, which is convenient for initializing multiple table DDL statements at once; there is no need to run table-creation commands repeatedly, and it replaces defining tables via YAML files;
  2. The -f parameter is supported, and the SQL file it executes may contain DML (INSERT INTO) statements;
  3. Support for more practical configurations:

    • More detailed errors with SET sql-client.verbose = true. With verbose enabled, the full error stack is printed, which is much easier to trace than the previous one-line message;
    • Switching execution mode with SET execution.runtime-mode = streaming / batch;
    • Setting the job name with SET pipeline.name = my_Flink_job;
    • Setting the savepoint path to restore from with SET execution.savepoint.path = /tmp/flink-savepoints/savepoint-bb0dab;
    • For jobs that depend on each other, SET table.dml-sync = true selects synchronous execution; for example, if job B must wait for job A to finish, setting it to true enables this kind of dependent pipeline scheduling.
  4. Statement SET syntax is also supported:

    A query may not write to just a single sink but need to output to multiple sinks, for example one sink to JDBC and another to HBase.

    • Before version 1.13, you had to launch two separate queries to do this;
    • In version 1.13, the INSERT statements can be wrapped in a STATEMENT SET and executed as a single job, which allows node reuse and saves resources, as in the sketch below.
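A minimal sketch of the STATEMENT SET syntax in the SQL Client, with hypothetical source and sink tables:

BEGIN STATEMENT SET;

-- Both INSERTs are compiled into one job, so the shared source is read only once.
INSERT INTO jdbc_sink  SELECT user_id, cnt FROM enriched_clicks;
INSERT INTO hbase_sink SELECT user_id, cnt FROM enriched_clicks;

END;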

3. FLIP-136: Enhanced DataStream and Table Conversion

While Flink SQL greatly lowers the barrier to real-time computation, the high-level Table/SQL abstraction also hides some low-level implementations such as timers and state. Many power users want the flexibility of manipulating the DataStream directly, which requires converting between Table and DataStream. FLIP-136 enhances this conversion, making it easier for users to switch between the two.

  • Event time and watermarks can now be passed through DataStream and Table conversions;
Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByMetadata("rowtime", "TIMESTAMP(3)")
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build());
  • Changelog data streams can be converted to and from Tables:
// DataStream -> Table
StreamTableEnvironment.fromChangelogStream(DataStream<Row>): Table
StreamTableEnvironment.fromChangelogStream(DataStream<Row>, Schema): Table
// Table -> DataStream
StreamTableEnvironment.toChangelogStream(Table): DataStream<Row>
StreamTableEnvironment.toChangelogStream(Table, Schema): DataStream<Row>

IV. Flink SQL 1.14 Future Planning

Version 1.14 mainly has the following planning:

  • Remove the Legacy Planner: since Flink 1.9, when Alibaba contributed the Blink planner, many new features have been built on top of it; the old Legacy planner will now be removed completely;
  • Improve Window TVF: support session windows, allow-lateness for Window TVF, etc.;
  • Improve schema handling: end-to-end schema handling and key validation improvements;
  • Enhance Flink CDC support: strengthen integration with upstream CDC systems and let more Flink SQL operators support CDC data streams.

V. Summary

This article provides a detailed look at the core features and important improvements of Flink SQL 1.13.

  • Support Window TVF;
  • Systematically solve time zone and time function problems;
  • Improved compatibility between Hive and Flink;
  • Improve SQL Client;
  • Enhanced conversion between DataStream and Table.

It also shared the community's plans for Flink SQL 1.14. I hope readers now have a better understanding of how Flink SQL has changed in this release, and that in practice they pay attention to these new capabilities and experience the convenience they bring at the business level.
