Brief introduction:Flink 1.13.0 makes the use of streaming applications as simple and natural as a normal application, and allows users to better understand the performance of streaming jobs.

Translation | Gao Yun Review | zhu Zhu, Ma Guowei

Making the address welcome to Flink thumb up to star ~

Flink 1.13 has been released! Flink 1.13 includes more than 1,000 fixes and optimizations submitted by over 200 contributors.

With this release, significant progress was made towards one of Flink’s main goals, which is to make the use of stream processing applications as simple and natural as a normal application. The new passive scale-out capacity introduced in Flink 1.13 makes scale-out as simple as any other application, with users only needing to modify the concurrency.

This release also includes a number of important changes to enable users to better understand the performance of streaming jobs. These changes allow users to better analyze the causes when stream job performance falls short of expectations. These changes include load and back pressure visualizations for identifying bottleneck nodes, CPU flame diagrams for analyzing operator hot spot code, and State access performance metrics for analyzing State Backend State.

In addition to these features, the Flink community has added a number of other optimizations, some of which we’ll discuss later in this article. We want users to enjoy the benefits of the new version and features, and at the end of this article, we’ll cover some of the changes you need to be aware of when upgrading to the Flink version.

We encourage users to download and try out the new version of Flink and report any problems they encounter via the mailing list and JIRA.

Important features

Passive expansion volume

Passive scaling is the latest development in Flink’s initial goal of making stream processing applications as simple and natural as normal applications.

When thinking about resource management and parts, there are two possible models for Flink. Users can deploy Flink applications on top of resource management systems such as K8S and YARN, and Flink takes the initiative to manage resources and allocate and release them as needed. This pattern is useful for jobs and applications that frequently change resource requirements, such as batch jobs and real-time SQL queries. In this mode, the number of workers started by Flink is determined by the degree of concurrency set by the application. In Flink we call this mode active scaling.

A more appropriate model for long-running streaming applications is for users to simply start jobs up like other long-running services, regardless of whether they are deployed on K8S, YARN, or other resource management platforms, and regardless of the number of resources that need to be applied. Instead, its size is determined by the number of workers assigned. When the number of workers changes, Flink automatically changes the application’s concurrency. In Flink we call this mode passive scaling.

While Flink’s Application Deployment pattern starts the effort to make a Flink job more like a normal Application (that is, starting a Flink job does not require two separate steps to start the cluster and submit the Application), passive scale-out accomplishes this goal: Users no longer need to use additional tools (such as scripts, K8s operators) to keep the number of workers consistent with the application concurrency Settings.

Users can now apply automatic scale-out tools to Flink applications just like normal applications, as long as they understand the cost of scale-out: stateful streaming applications need to redistribute state during scale-out.

If you want to try passive scaling, you can add scheduler-mode: reactive and start an application cluster (Standalone or K8s). See the documentation for passive scaling capacity for more details.

Analyze the performance of your application

Being able to easily analyze and understand application performance is a key feature for any application. This capability is even more important for Flink because Flink applications are typically data intensive (that is, they need to process large amounts of data) and need to deliver results in (near) real time latency.

When the Flink app can’t process data as fast as it can, or when an app takes up more resources than expected, these tools can help you figure out why.

Bottleneck detection and back pressure monitoring

The first question to be addressed in Flink performance analysis is often: Which operator is the bottleneck?

To answer this question, Flink introduces metrics that describe the degree to which the job is busy (that is, processing data) and back pressure (that the downstream operator cannot process the results in time) is unable to continue the output. Possible bottlenecks in applications are operators that are busy and upstream backpressed.

Flink 1.13 optimizes the logic for backpressure detection (using task-based Mailbox timing instead of stack sampling) and reimplements the UI presentation of job diagrams: Flink now displays the intensity of busyness and backpressure on the UI with colors and numbers.

CPU flame diagram in Web UI

Another often-answered question about performance from Flink is: which part of the calculation logic in the bottleneck operator is costly?

An effective visualization tool for this problem is the flame diagram. It can help answer the following questions:

  • Which method is currently using CPU?
  • What are the percentages of CPU used by the different methods?
  • What is the stack on which a method is called?

The flame map is constructed by repeatedly sampling the stack of threads. In the flame diagram, each method call is represented as a rectangle whose length is proportional to the number of times the method appears in the sample. An example of a flame diagram on the UI is shown below.

The documentation for the flame map includes more details and instructions to enable this feature.

STATE access delay metric

Another possible performance bottleneck is the State Backend, especially if the job’s State exceeds the memory capacity and must use the RocksDB State Backend.

This is not to say that RocksDB isn’t performing well (we love RocksDB!). , but it needs to meet some conditions to achieve the best performance. For example, users may easily encounter the problem of unintentionally using the wrong type of disk resource in the cloud that fails to meet RockDB’s IO performance requirements.

Based on the CPU flame diagram, the new State Backend latency metric can help users better determine if the performance is not as expected due to the State Backend. For example, if a user finds that a single access to RocksDB takes a few milliseconds, they need to look at the memory and I/O configuration. These indicators can be set by the state. The backend. Rocksdb. Latency – track – enabled this option to enabled. These metrics are sampled to monitor performance, so their impact on the performance of RocksDB State Backend is negligible.

Toggle State Backend with Savepoint

Users can now toggle the State Backend of a Flink application when restarting from a SavePoint. This allows Flink applications to no longer be restricted to using the State Backend selected when the application is first run.

Based on this feature, users can now start with a HashMap State Backend (a pure memory State Backend) and switch to the RocksDB State Backend if subsequent states become excessive.

At the implementation level, Flink now unifies the SavePoint format for all State Backends to achieve this functionality.

K8S is deployed using the POD mode specified by the user

Custom POD templates can now be used in native Kubernetes deployments (Flink proactively asks K8S to start POD).

Using these templates, users can set up PODs for JM and TM in a more K8S-compliant way, which is more flexible than the built-in configuration items integrated with Flink K8S.

Production of usable Unaligned Checkpoints

Unaligned Checkpoint is now production-ready and users are encouraged to try out this feature in the presence of backpressure.

Specifically, these features introduced in Flink 1.13 make Unaligned Checkpoint easier to use:

  • Users can now scale up applications when using Unaligned Checkpoint. This feature is handy if the user needs to use a Retained Checkpoint across performance reasons, so he or she cannot use a SavePoint.
  • For applications without backpressure, it is now cheaper to enable Unaligned Checkpoint. Unaligned Checkpoints can now be automatically triggered by a timeout, that is, by default an application uses Aligned Checkpoints (does not store data in transit), It automatically switches to an Unaligned Checkpoint (to store data in transit) only when alignment exceeds a certain time range.

Refer to the documentation on how to enable Unaligned Checkpoint.

Machine learning migrates to a separate repository

In order to accelerate the progress of Flink Machine Learning (Batch Stream Unified Machine Learning), Flink Machine Learning has opened a new Flink-ML repository. We adopted a management approach similar to that of the Stateful Function project, using a single repository that simplifies the process of code merging and allows for separate version releases, thus increasing development efficiency.

Users can follow Flink’s progress in machine learning, such as its interoperability with Alink (Flink’s common machine learning algorithm suite) and Flink’s integration with TensorFlow.

SQL/Table API advances

As with previous releases, the SQL and Table APIs still account for a large percentage of all development.

The time window is defined by the table-valued function

In streaming SQL queries, one of the most commonly used is to define a time window. Flink 1.13 introduced a new way of defining Windows: through the table-valued function. This approach is not only more expressive (allowing users to define new window types), but also more consistent with the SQL standard.

Flink 1.13 supports Tumble and Hop Windows in the new syntax and will also support Session Windows in future versions. Let’s demonstrate the expressive power of this approach with the following two examples:

  • Example 1: A newly introduced Cumulate window function that allows Windows to expand at a specific step size until the maximum window size is reached: Cumulate
SELECT window_time, window_start, window_end, SUM(price) AS total_price 
GROUP BY window_start, window_end, window_time;
  • Example 2: In the table-valued window function, the user can access the start and end time of the window, so that the user can realize new functions. For example, in addition to regular window-based aggregation and Join, users can now implement window-based top-k aggregation as well:
SELECT window_time, ...
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC) 
      as rank 
    FROM t
  ) WHERE rank <= 100; 

Improve the interoperability of the DataStream API with the Table API/SQL

This version greatly simplifies mixing the DataStream API with the Table API.

The Table API is a very convenient interface for application development because it is written by programs that support expressions and provides a large number of built-in functions. But there are times when a user needs to switch back to DataStream, such as when the user has a need for expressionability, flexibility, or State access.

Flink newly introduced StreamTableEnvironment. ToDataStream () /. FromDataStream () would declare a DataStream API Source or Sink as a Table Source or Sink. Key optimizations include:

  • Automatic conversion between DataStream and Table API type systems.
  • Seamless integration of Event Time configuration and a high degree of consistency in WaterMark behavior.
  • There have been significant enhancements to the Row type (that is, the representation of data in the Table API), including optimizations for the toString()/hashCode() and equals() methods, support for access to field values by name, and sparse representation.
Table table = tableEnv.fromDataStream( dataStream, Schema.newBuilder() .columnByMetadata("rowtime", "TIMESTAMP(3)") .watermark("rowtime", "SOURCE_WATERMARK()") .build()); DataStream<Row> dataStream = tableEnv.toDataStream(table) .keyBy(r -> r.getField("user")) .window(...) ;

SQL Client: Initialize Statement Sets

SQL Client is an easy way to run and deploy SQL streams or batch jobs directly, without the user having to write code to invoke SQL from the command line or as part of a CI/CD process.

This release greatly improves the functionality of SQL Client. Now, based on all the syntax that can be supported through Java programming (that is, programmatically calling the TableEnvironment to initiate a query), both SQL Client and SQL scripts can now support it. This means that SQL users no longer need to add glue code to deploy their SQL jobs.

Configuration simplicity and code sharing

Flink will no longer support configuring SQL Client via YAML (note: it is currently supported, but it has been marked deprecated). Instead, SQL Client now supports the use of an initialization script to configure the environment before the main SQL script executes.

These initialization scripts can often be shared across teams/deployments. It can be used to load commonly used catalogs, apply common configurations, or define standard views.

./ -i init1.sql init2.sql -f sqljob.sql

More configuration items

By adding configuration items and optimizing the SET/RESET command, users can more easily control the execution flow within the SQL Client and SQL scripts.

Support for multiple queries through statement collections

Multiple queries allow users to execute multiple SQL queries (or statements) in a Flink job. This is useful for long-running streaming SQL queries.

Statement sets can be used to combine a set of queries into a set of simultaneous executions.

The following is an example of an SQL script that can be executed through the SQL Client. It initializes and configures the environment to execute multiple queries. This script includes all the queries and all the environment initialization and configuration work so that it can be used as a self-contained deployment component.

-- set up a catalog CREATE CATALOG hive_catalog WITH ('type' = 'hive'); USE CATALOG hive_catalog; -- or use temporary objects CREATE TEMPORARY TABLE clicks ( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP ) WITH ( 'connector' = 'kafka', 'topic' = 'clicks', 'properties.bootstrap.servers' = '... ', 'format' = 'avro' ); -- set the execution mode for jobs SET execution.runtime-mode=streaming; -- set the sync/async mode for INSERT INTOs SET table.dml-sync=false; -- set the job's parallelism SET parallism.default=10; -- set the job name SET = my_flink_job; -- restore state from the specific savepoint path SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-bb0dab; BEGIN STATEMENT SET; INSERT INTO pageview_pv_sink SELECT page_id, count(1) FROM clicks GROUP BY page_id; INSERT INTO pageview_uv_sink SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id; END;

Hive query syntax compatibility

Users can now use the Hive SQL syntax on Flink as well. In addition to the Hive DDL dialects, Flink now supports the commonly used Hive DML and DQL dialects.

To use the Hive SQL dialect, set Table.SQL-Dialect to Hive and load the HiveModule. The latter is important because Hive’s built-in functions must be loaded to achieve proper Hive syntactic and semantic compatibility. Examples are as follows:

CREATE CATALOG myhive WITH ('type' = 'hive'); -- setup HiveCatalog
LOAD MODULE hive; -- setup HiveModule
USE MODULES hive,core;
SET table.sql-dialect = hive; -- enable Hive dialect
SELECT key, value FROM src CLUSTER BY key; -- run some Hive queries 

Note that the Hive dialect no longer supports Flink syntax for DML and DQL statements. If you want to use Flink syntax, you need to switch back to the dialect configuration in Default.

Optimized SQL time function

Time processing is an important task in data processing. But at the same time, dealing with different time zones, dates, and times is an increasingly complex task.

In Flink 1.13, we put a lot of effort into simplifying the use of the time function. We have tweaked the return type of time-dependent functions to be more accurate, such as PROCTIME(), CURRENT\_TIMESTAMP(), and NOW().

Second, users can now define the Event Time attribute based on a column of type TIMESTAMP\_LTZ, which gracefully supports daylight saving Time in window processing.

Users can refer to the Release Note to see the complete changes in this section.

Pyflink core optimizations

The main improvements over Pyflink in this release are to make the Python-based DataStream API and Table API more consistent with the Java/ Scala counterpart.

Stateful operators in the Python DataStream API

In Flink 1.13, Python programmers can enjoy the full power of the Flink state-handling API. The Python DataStream API, refactored in Flink 1.12, now has full state access capabilities, allowing users to log information about data into state and access it later.

Stateful processing is the basis of many complex data-processing scenarios that rely on sharing state across records, such as Window Operators.

The following example shows the implementation of a custom calculation window:

class CountWindowAverage(FlatMapFunction):
    def __init__(self, window_size):
        self.window_size = window_size

    def open(self, runtime_context: RuntimeContext):
        descriptor = ValueStateDescriptor("average", Types.TUPLE([Types.LONG(), Types.LONG()]))
        self.sum = runtime_context.get_state(descriptor)

    def flat_map(self, value):
        current_sum = self.sum.value()
        if current_sum is None:
            current_sum = (0, 0)
        # update the count
        current_sum = (current_sum[0] + 1, current_sum[1] + value[1])
        # if the count reaches window_size, emit the average and clear the state
        if current_sum[0] >= self.window_size:
            yield value[0], current_sum[1] // current_sum[0]

ds = ...  # type: DataStream
ds.key_by(lambda row: row[0]) \

User-defined Windows in the PyFlink DataStream API

In Flink 1.13, the PyFlink DataStream interface added support for user-defined Windows, and users can now use window definitions outside of standard Windows.

Since Windows are the core mechanism for handling infinite streams of data (by splitting the stream into a number of finite “buckets”), this feature greatly improves the expressiveness of the API.

Row – based operations in the PyFlink Table API

The Python Table API now supports row-based operations, such as user-defined functions on row data. This feature allows users to use non-built-in data handling functions.

An example of a Python Table API using a map() operation is as follows:

  [DataTypes.FIELD("c1", DataTypes.BIGINT()),
   DataTypes.FIELD("c2", DataTypes.STRING())]))
def increment_column(r: Row) -> Row:
  return Row(r[0] + 1, r[1])

table = ...  # type: Table
mapped_result =

In addition to map(), the API also supports flat\_map(), aggregate(), flat\_aggregate(), and other row-based operations. This brings the Python Table API functionality closer to that of the Java Table API.

The PyFlink DataStream API supports Batch execution mode

For limited streams, the PyFlink DataStream API now supports the Batch execution mode introduced in the Flink 1.12 DataStream API.

Batch execution mode simplifies operations and improves performance for finite stream processing by reusing data finiteness to bypass State backend and Checkpoint processing.

Other optimization

Based on HUGO’s Flink documentation

Flink documentation was migrated from Jekyii to Hugo. If you find any problems, please do let us know, and we look forward to seeing how users feel about the new interface.

Web UI supports historical exceptions

Flink Web UI can now show n historical exceptions that caused a job to fail, improving the debugging experience in scenarios where one exception leads to multiple subsequent exceptions. The user can find the root exception in the exception history.

Report the exception and reason for the failed Checkpoint optimization

Flink now provides statistics on failed or canceled checkpoints, making it easier for users to determine the cause of failed checkpoints without having to look at the logs.

Previous versions of Flink reported metrics (such as persisted data size, trigger time, etc.) only when Checkpoint was successful.

Provides a JDBC Sink that is “exactly once” consistent

Starting in 1.13, JDBC Sink can provide “exactly once” consistency support for databases that support XA transactions by committing data using transactions. This feature requires that the target database must have (or be linked to) an XA transaction handler.

This Sink is now available only in the DataStream API. Users can use jdbcSink. ExactlyonCesink (…) To create this Sink (or by explicitly initializing a JDBCXAsinkFunction).

The Pyflink Table API supports user-defined aggregate functions on the Group window

The PyFlink Table API now supports both Python-based user-defined Aggregate Functions (UDAFs) and Pandas UDAFs for the Group window. These functions are important for many data analysis or machine learning training programs.

Prior to Flink 1.13, these functions could only be used in unlimited group-by aggregation scenarios. Flink 1.13 optimizes this limitation.

Sort-merge Shuffle optimization in Batch execution mode

Flink 1.13 optimizes the performance and memory footprint of sort-merge Blocking Shuffle for batch applications. This Shuffle mode was introduced in the Flink 1.12 Flip-148.

This optimization avoids the constant OutOfMemoryError: Direct Memory problem for large operations and improves performance (especially on mechanical hard disks) through I/O scheduling and broadcast optimization.

The HBase connector supports asynchronous dimension table queries and query caching

HBase Lookup Table Source can now support asynchronous query mode and query caching. This greatly improves the performance of Table/SQL dimension Table joins using this Source, and in some typical cases can reduce the number of I/O requests to HBase.

In previous versions, HBase Lookup Source only supported synchronous communication, which resulted in job throughput and reduced resource utilization.

Changes required to upgrade Flink 1.13

  • Flink-21709 — The old Table & SQL API planner has been marked deprecated and will be removed in Flink 1.14. The Blink scheduler has been set as the default scheduler several releases ago and will be the only scheduler in future releases. This means that BatchTableEnvironment and DataSet API interoperability will no longer be supported. Users need to switch to a unified TableEnvironment to write stream or batch jobs.
  • Flink-22352 — The Flink community has decided to deprecate support for Apache Mesos and may remove this feature further in the future. Users are better off switching to another resource management system.
  • Flink-21935 — State. Backend. Async This configuration has been disabled because Flink is now always saving snapshots asynchronously (the previous configuration default) and there is no implementation to save snapshots synchronously.
  • Flink-17012 — The RUNNING state of the Task is divided into two steps: INITIALIZING and RUNNING. The INITIALIZING phase of the Task includes loading the state and restoring the in-flight data when unaligned checkpoint is enabled. By explicitly distinguishing between the two states, the monitoring system can better distinguish whether a task is actually working or not.
  • Flink-21698 — Direct conversions between NUMERIC and TIMESTAMP types were problematic and have now been disabled, such AS CAST(NUMERIC AS TIMESTAMP(3)). Users should use TO\_TIMESTAMP(FROM\_UNIXTIME(numeric)) instead.
  • Flink-22133 — The new Source interface has a small incompatible change, Namely SplitEnumerator. SnapshotState () method now accept a checkpoint id parameter to represent the ongoing snapshot operation id belonging to a checkpoint.
  • Flink-19463 — The old StateBackend interface was marked obsolete because it was loaded with too much semantics and was confusing. This is a pure API layer change and does not affect the application runtime. Please refer to the Job Migration Guidelines for how to upgrade an existing job.

Other resources

The binaries and code are available on the Flink website download page, and the latest PyFlink releases are available from PyPI.

Refer to the release notes if you want to upgrade to Flink 1.13. This version is compatible with the previous 1.x version on interfaces marked @public.

Users can also view the list of new release changes and updated documentation for a detailed list of changes and new features.

The original link:

Copyright notice: The content of this article is contributed by Aliyun real-name registered users. The copyright belongs to the original author. Aliyun developer community does not own the copyright and does not bear the corresponding legal liability. For specific rules, please refer to User Service Agreement of Alibaba Cloud Developer Community and Guidance on Intellectual Property Protection of Alibaba Cloud Developer Community. If you find any suspected plagiarism in the community, fill in the infringement complaint form to report, once verified, the community will immediately delete the suspected infringing content.