Authors | Stephan Ewen & Johannes Moser

Translation | Song Xintong

In a recent annual report released by the Apache Software Foundation, Apache Flink once again ranked among the top 5 most active projects! The project’s latest release, version 1.14.0, also shows remarkable activity, with more than 1,000 contributions from over 200 contributors. We are proud of the community’s persistent efforts to move this project forward.

The new release brings a number of new features and improvements in the SQL API, more connector support, checkpointing, PyFlink, and more. One of the major improvements concerns the unified stream and batch processing experience. We believe that, in practice, processing unbounded data streams is inseparable from processing bounded batch data, as many scenarios require handling historical data from various sources alongside real-time streams: for example, data exploration while developing new applications, state bootstrapping for new applications, training models to be applied in streaming applications, or reprocessing data after upgrades or fixes.

In Flink 1.14, we can finally mix bounded and unbounded streams in the same application: Flink now supports checkpointing applications that are partially running and partially finished (where some operators have already processed their bounded inputs to the end). In addition, Flink triggers a final checkpoint when it reaches the end of a bounded stream, to ensure that all results are committed to the sink.

Batch execution mode now supports mixing the DataStream API and the SQL/Table API in the same application (previously only one of the two could be used on its own).

We have also updated the unified Source and Sink APIs and begun consolidating the connector ecosystem around them. We added a hybrid source that can transition across multiple storage systems: you can now, for example, read old data from Amazon S3 and then switch seamlessly over to Apache Kafka.

In addition, this release is another step toward our goal of making Flink more self-tuning and usable without deep stream-processing expertise. As a first step toward this goal, we introduced reactive scaling in the previous release. Now we have added automatic tuning of network memory (buffer debloating). This feature speeds up checkpoints under high load while maintaining high throughput and without increasing checkpoint size. By continuously adjusting the size of the network buffers, the mechanism achieves the best throughput with the least amount of buffered data. See the buffer debloating section below for more details.

There are many more new features and improvements across components, which we cover below. At the same time, we say goodbye to some components and features that have been gradually replaced and deprecated in recent releases: most notably, the old SQL query engine and the integration with Apache Mesos have been removed.

We hope you enjoy the new release, and we are eager to learn about your experience: which previously unsolved problems does it address, and which new scenarios does it unlock for you?

I. Unified stream and batch processing experience

One of Flink’s distinctive features is its unified treatment of stream and batch processing: it uses the same set of APIs on a runtime that supports multiple execution paradigms.

As mentioned earlier, we believe that stream processing and batch processing are inseparable. The following quote, from a report on streaming data processing at Facebook, nicely echoes this point.

Streaming versus batch processing is not an either-or choice. Originally, all of Facebook’s data warehouse processing was batch processing. We began developing Puma and Swift about five years ago. As shown in the section […], mixing stream and batch processing can speed up longer pipelines by hours.

Using the same engine to process real-time and historical data also ensures semantic consistency, making results more comparable. Here is an article about how Alibaba uses Apache Flink to generate unified, consistent business reports.

Previous versions could already process data in both stream and batch modes. The new version adds features for more usage scenarios, along with a series of user-experience improvements.

Checkpointing with bounded streams

Flink’s checkpointing mechanism originally supported creating checkpoints only while all tasks in the application DAG were running. This made it virtually impossible for an application to read from both bounded and unbounded sources. In addition, applications that process bounded input in streaming mode (rather than batch mode) stopped checkpointing once the data was nearly fully processed and some tasks had finished. This prevented the last part of the output from being committed to sinks that require exactly-once semantics, causing delays.

With FLIP-147, Flink supports taking checkpoints after some tasks have finished, and triggers a final checkpoint at the end of bounded stream processing to ensure that all output is committed to the sink before the job ends (similar to stop-with-savepoint).

This feature is enabled by setting execution.checkpointing.checkpoints-after-tasks-finish.enabled: true in the configuration. In the tradition of letting users opt in to and try out major new features, it is not enabled by default in Flink 1.14. We hope to make it the default mode in the next release.
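As a minimal sketch (assuming the standard Configuration-based setup; the checkpoint interval below is illustrative), the option can also be set from code:

```java
// Sketch: enable checkpointing after some tasks have finished (FLIP-147).
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointsAfterTasksFinish {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.enableCheckpointing(60_000L); // illustrative: checkpoint every 60 s

        // A bounded source: its tasks finish early, yet checkpointing continues.
        env.fromElements(1, 2, 3).print();
        env.execute("checkpoints-after-tasks-finish demo");
    }
}
```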

Background: while batch mode is generally preferred when working with bounded data, there are situations where streaming mode is required. For example, the sink may only be supported in streaming mode (e.g., the Kafka sink), or the application may want to exploit the approximately time-based ordering inherent in streaming (e.g., the Kappa+ architecture).

Batch execution mode for mixed DataStream and Table/SQL applications

The SQL and Table APIs are becoming the default starting point for new projects; their declarative nature and rich built-in types and operations make application development easy and fast. However, it is not uncommon for developers to run into specific, event-driven business logic for which SQL is not expressive enough (or for which forcing the logic into SQL would be inappropriate).

The natural approach at that point is to switch to the stateful DataStream API for that piece of logic and then switch back to SQL.

In Flink 1.14, a SQL/Table application in bounded batch execution mode can convert an intermediate Table into a DataStream, process it with operators defined via the DataStream API, and convert it back into a Table. Internally, Flink builds a single dataflow DAG that mixes optimized declarative SQL execution with DataStream batch execution. See the relevant documentation for details.
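For illustration, here is a minimal sketch of such a mixed application. The orders data (user_id, amount) is hypothetical and inlined so the example is self-contained:

```java
// Sketch: mixing Table/SQL and DataStream operators in one batch job.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class MixedBatchJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // bounded, batch execution

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Declarative part: a bounded table of (user_id, amount) rows.
        Table orders = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("user_id", DataTypes.STRING()),
                        DataTypes.FIELD("amount", DataTypes.DOUBLE())),
                Row.of("alice", 10.0), Row.of("bob", -1.0), Row.of("alice", 5.0));

        // Switch to the DataStream API for logic that is awkward to express in SQL.
        DataStream<Row> rows = tableEnv.toDataStream(orders);
        DataStream<Row> cleaned = rows.filter(r -> ((Double) r.getField(1)) > 0); // field 1 = amount

        // Switch back to a Table and finish declaratively.
        tableEnv.createTemporaryView("cleaned_orders", tableEnv.fromDataStream(cleaned));
        tableEnv.executeSql(
                "SELECT user_id, SUM(amount) AS total FROM cleaned_orders GROUP BY user_id")
                .print();
    }
}
```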

Hybrid Source

The new HybridSource reads data from multiple sources in sequence, switching seamlessly between them to produce a single stream that combines the data of all of them.

Hybrid sources target scenarios where data is read from tiered storage, as if from one stream that spans all tiers. For example, new data may land in Kafka and eventually be migrated to S3 (usually in a compressed columnar format, for cost and efficiency reasons). A hybrid source can read the historical data from S3 and then switch over to Kafka for the latest data, as if it were one continuous logical stream.
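A minimal sketch, assuming hypothetical S3 paths, Kafka brokers, topic, and switch timestamp, could look like this:

```java
// Sketch: a HybridSource that reads history from files on S3, then switches to Kafka.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

public class HybridSourceExample {
    public static HybridSource<String> build() {
        long switchTimestamp = 1_633_046_400_000L; // hypothetical hand-over point

        FileSource<String> historical = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("s3://bucket/events/"))
                .build();

        KafkaSource<String> realtime = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("events")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Start reading Kafka just after the last record migrated to S3.
                .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp))
                .build();

        return HybridSource.builder(historical)
                .addSource(realtime)
                .build();
    }
}
```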

We believe this is an exciting step toward realizing the full promise of log-based architectures and the Kappa architecture. Even when the older portion of the event log has been physically moved to different storage (for cost, compression efficiency, or read speed), it can still be treated and processed as one continuous log.

Flink 1.14 adds the core functionality of the hybrid source. In future releases, we hope to add more utilities and patterns for typical switching strategies.

Consolidating Sources and Sinks

As the new stream-batch unified Source and Sink APIs become stable, we have started a major effort to consolidate all connectors around them. At the same time, we are aligning the DataStream connectors more closely with those of the SQL/Table API, starting with the Kafka and file sources and sinks on the DataStream API.

This effort is expected to continue for another release or two; along the way, Flink users will get a smoother, more consistent experience when connecting to external systems.
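For illustration, a sketch of the unified Kafka connector API on the DataStream side (broker address and topic names are hypothetical):

```java
// Sketch: end-to-end pipeline on the unified KafkaSource/KafkaSink API.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaPipeline {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("input-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(String::toUpperCase)
                .sinkTo(sink);

        env.execute("Unified Kafka pipeline");
    }
}
```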

II. Operations and maintenance improvements

Buffer debloating

Buffer debloating is a new technology in Flink that minimizes checkpoint latency and overhead. It automatically adjusts the amount of network memory so that buffers hold as little data as possible while still sustaining high throughput.

Apache Flink buffers a certain amount of data in its network stack to exploit the high bandwidth of fast networks. A Flink application running at high throughput uses some (or all) of that network buffer memory. Aligned checkpoint barriers flow through the network buffers along with the data within milliseconds.

When a Flink application experiences (temporary) backpressure (for example, from a backpressured external system or data skew), the network buffers tend to hold more data than the application’s current (backpressure-reduced) throughput requires. Worse, more buffered data means more work for the checkpointing mechanism: aligned checkpoints must wait for more data to be processed, and unaligned checkpoints must persist more in-flight data.

This is where buffer debloating comes in. It changes the network stack from holding up to X bytes of data to holding however much data the receiving end can process in X milliseconds. The default is 1,000 milliseconds, meaning the network stack buffers the amount of data that downstream tasks can process in 1,000 milliseconds. Through continuous measurement and adjustment, the system maintains this property under changing conditions. As a result, Flink’s aligned checkpoints have stable, predictable alignment times, and the amount of data persisted by unaligned checkpoints under backpressure is greatly reduced.

Buffer debloating can be used as a complement to, or even an alternative to, unaligned checkpoints. Refer to the documentation for how to enable this feature.
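A minimal sketch, assuming a local test environment (in a real deployment these keys would normally go into flink-conf.yaml, and the 500 ms target is purely illustrative):

```java
// Sketch: enable buffer debloating via configuration for a local run.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferDebloatExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");
        // Target time for in-flight data to be consumed by the receiver (illustrative).
        conf.setString("taskmanager.network.memory.buffer-debloat.target", "500 ms");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements(1, 2, 3).print();
        env.execute("Buffer debloating demo");
    }
}
```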

Fine-grained resource management

Fine-grained resource management is a new advanced feature used to improve resource utilization in large shared clusters.

Flink clusters run a wide variety of data-processing workloads, and different processing steps typically need different resources, such as compute and memory. For example, most mapping functions are lightweight, while large, long-lived window functions benefit from plenty of memory. By default, Flink manages resources in coarse-grained slots, where one slot represents a resource slice of a TaskManager. A slot can hold one parallel subtask instance of each operator in the pipeline, i.e., one slot can hold a parallel slice of the entire pipeline. Through slot sharing groups, users can influence how subtasks are distributed across slots.

With fine-grained resource management, slots on a TaskManager can be sized dynamically. Transformations and operators specify the resource profiles they need (such as CPU, memory, and disk), and Flink’s ResourceManager and TaskManagers carve slices of the requested size out of a TaskManager’s total resources. You can think of this as a minimal, lightweight layer of resource orchestration within Flink. The figure below shows the difference between fine-grained resource management and the current default of shared fixed-size slots.

You may ask why Flink adds such a feature when it already integrates with mature resource orchestration frameworks such as Kubernetes and YARN. There are several situations where an extra layer of resource management inside Flink significantly improves resource utilization:

  • When slots are small, requesting a TaskManager for each slot is expensive (JVM overhead, Flink framework overhead, and so on). Slot sharing solves this to some extent by allowing operators of different types to share slots, i.e., sharing resources between lightweight operators (which need small slots) and heavyweight operators (which need large slots). However, this only works well when all operators share the same parallelism, and it is not always optimal. Moreover, some operators are better run in isolation (for example, operators responsible for training in machine learning need dedicated GPU resources).

  • Kubernetes and YARN can take a while to satisfy resource requests, especially when cluster load is high. For some batch jobs, the time spent waiting for resources reduces the job’s efficiency.

So when should you enable this feature? The default resource management mechanism suits most stream and batch jobs. If your job is a long-running streaming job or a fast batch job whose processing stages need significantly different resources, and you have already set different parallelisms for different operators, you can try fine-grained resource management to improve resource efficiency.

Alibaba’s internal Flink-based platform has been using this mechanism for some time, and in practice cluster resource utilization has improved significantly.

Refer to the documentation for more details on how to use fine-grained resource management.
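For illustration, a sketch of the DataStream-side API for declaring a slot sharing group with an explicit resource profile (the group name and resource figures are hypothetical, and the cluster must have fine-grained resource management enabled):

```java
// Sketch: a slot sharing group with explicit resources (assumes
// cluster.fine-grained-resource-management.enabled: true on the cluster).
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.datastream.DataStream;

public class FineGrainedResourcesExample {
    public static DataStream<String> withDedicatedResources(DataStream<String> input) {
        SlotSharingGroup heavy = SlotSharingGroup.newBuilder("heavy") // hypothetical name
                .setCpuCores(2.0)
                .setTaskHeapMemory(MemorySize.ofMebiBytes(512))
                .build();

        // Operators in this group run in slots carved to the requested size.
        return input
                .map(String::toUpperCase)
                .slotSharingGroup(heavy);
    }
}
```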

III. Connectors

Connector metrics

This release standardizes connector metrics (see FLIP-33). Over the next several releases, the community will roll out standardized metrics across all connectors while gradually reworking them around the new unified APIs. In Flink 1.14, we covered the Kafka connector and (partially) the file system connector.

Connectors are the data gateways of a Flink job, and connector metrics are among the first things to check when a job does not perform as expected. We believe this will be a valuable improvement for the production operation of Flink applications.

Pulsar connector

This release adds the Apache Pulsar connector. The Pulsar connector supports reading from Pulsar topics in both stream and batch execution modes. Backed by Pulsar transactions (introduced in Pulsar 2.8.0), the connector supports exactly-once delivery semantics, ensuring that a message is delivered to the consumer exactly once, even if the producer retries sending it.

The Pulsar source connector supports four subscription types: exclusive, shared, failover, and key_shared, covering different requirements on message ordering and scale.

The connector currently supports the DataStream API; SQL/Table API support is expected in a later release. Refer to the documentation for how to use the Pulsar connector.
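A minimal sketch of building a Pulsar source (service and admin URLs, topic, and subscription name are hypothetical):

```java
// Sketch: reading a Pulsar topic with the new Pulsar source connector.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.pulsar.client.api.SubscriptionType;

public class PulsarExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        PulsarSource<String> source = PulsarSource.builder()
                .setServiceUrl("pulsar://localhost:6650")
                .setAdminUrl("http://localhost:8080")
                .setTopics("my-topic")
                .setStartCursor(StartCursor.earliest())
                .setDeserializationSchema(
                        PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
                .setSubscriptionName("my-subscription")
                .setSubscriptionType(SubscriptionType.Exclusive)
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source").print();
        env.execute("Pulsar example");
    }
}
```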

IV. PyFlink

Performance improvements through chaining

Just as the Java API chains transformation functions and operators within tasks to avoid serialization overhead, PyFlink now chains Python functions. In PyFlink, chaining not only eliminates serialization overhead but also reduces RPC round trips between the Java and Python processes. This dramatically improves PyFlink’s overall performance.

In previous versions, the SQL/Table API already chained Python functions together. In Flink 1.14, this optimization also covers cPython functions in the Python DataStream API.

Loopback debugging mode

Normally, Python functions are executed by a Python process separate from the Flink JVM. This architecture makes it difficult to debug Python code.

PyFlink 1.14 introduces loopback mode, which is enabled automatically in local deployments. In this mode, user-defined Python functions are executed by the Python process of the client, which is the entry point that starts the PyFlink application and runs all the DataStream API and Table API code building the dataflow DAG. Users can now debug their Python functions simply by setting breakpoints in the IDE when running PyFlink jobs locally.

Other improvements

PyFlink includes many other improvements, such as support for job execution in YARN application mode and support for Python archives in tgz compression format. Refer to the Python API documentation for more details.

V. Saying goodbye to the old SQL engine and Mesos support

Maintaining an open source project also means sometimes parting with some beloved features.

When we added the Blink SQL engine to Flink two years ago, it was clear that it would eventually replace the original SQL engine: Blink is faster and more feature-complete. For the past year, Blink has been the default SQL engine. In Flink 1.14 we finally removed all code of the old SQL engine. This allowed us to drop many outdated interfaces and to remove the confusion for users about which interfaces to use when implementing custom connectors or functions. It will also help us iterate faster on the SQL engine in the future.

This release also removes the Apache Mesos integration, because we found that few users were still interested in it and too few contributors were willing to maintain this part of the system. Flink 1.14 can no longer run on Mesos without the help of auxiliary projects like Marathon, and Flink’s ResourceManager no longer supports dynamically acquiring and releasing resources from Mesos based on a workload’s resource requirements.

VI. Upgrade notes

We have worked hard to make the upgrade as smooth as possible, but some changes still require users to adjust parts of their applications when upgrading to the new Flink version. Please refer to the release notes for the adjustments and checks that may be needed during the upgrade.

Original link: flink.apache.org/news/2021/0…

List of contributors

The Apache Flink community would like to thank each of the contributors who made this release possible:

adavis9592, Ada Wong, aidenma, Aitozi, Ankush Khanna, anton, Anton Kalashnikov, Arvid Heise, Ashwin Kolhatkar, Authuir, bgeng777, Brian Zhou, camile.sing, caoyingjie, Cemre Mengu, chennuo, Chesnay Schepler, chuixue, CodeCooker17, comsir, Daisy T, Danny Cranmer, David Anderson, David Moravek, Dawid Wysakowicz, dbgp2021, Dian Fu, Dong Lin, Edmondsky, Elphas Toringepi, Emre Kartoglu, ericliuk, Eron Wright, est08zw, Etienne Chauchot, Fabian Paul, fangliang, fangyue1, fengli, Francesco Guardiani, FuyaoLi2017, fuyli, Gabor Somogyi, gaoyajun02, Gen Luo, gentlewangyu, GitHub, godfrey he, godfreyhe, gongzhongqiang, Guokuai Huang, GuoWei Ma, Gyula Fora, hackergin, hameizi, Hang Ruan, Han Wei, hapihu, Hehuiyuan, HSTDream, Huachao Mao, HuangXiao, Huangxingbo, Huxixiang, Ingo Burk, Jacklee, Jan Brusch, Jane, Jane Chan, Jark Wu, JasonLee, Jiajie Zhong, Jiangjie (Becket) Qin, Jianzhang Chen, Jiayi Liao, Jing, Jingsong Lee, JingsongLi, Jing Zhang, jinxing64, junfan.zhang, Jun Qin, Jun Zhang, kanata163, Kevin Bohinski, kevin.cyj, Kevin Fan, Kurt Young, kylewang, Lars Bachmann, lbb, LB Yu, LB-Yu, LeeJiangchuan, Leeviiii, leiyanfei, Leonard Xu, LightGHLi, Lijie Wang, liliwei, lincoln lee, Linyu, liuyanpunk, lixiaobao14, luoyuxia, Lyn Zhang, lys0716, MaChengLong, mans2singh, Marios Trivyzas, martijnvisser, Matthias Pohl, Mayi, mayue.fight, Michael Li, Michal Ciesielczyk, Mika, Mika Naylor, MikuSugar, movesan, Mulan, Nico Kruber, Nicolas Raga, Nicolaus Weidner, paul8263, Paul Lin, pierre xiong, Piotr Nowojski, Qingsheng Ren, Rainie Li, Robert Metzger, Roc Marshal, Roman, Roman Khachatryan, Rui Li, sammieliu, sasukerui, Senbin Lin, Senhong Liu, Serhat Soydan, Seth Wiesman, sharkdtu, Shengkai, Shen Zhu, shizhengchao, Shuo Cheng, shuo.cs, simenliuxing, sjwiesman, Srinivasulu Punuru, Stefan Gloutnikov, SteNicholas, Stephan Ewen, sujun, sv3ndk, Svend Vanderveken, syhily, Tartarus0zm, Terry Wang, Thesharing, Thomas Weise, tiegen, Till Rohrmann, Timo Walther, tison, Tony Wei, trushev, tsreaper, TsReaper, Tzu-Li (Gordon) Tai, wangfeifan, wangwei1025, wangxianghu, wangyang0918, weizheng92, Wenhao Ji, Wenlong Lyu, wenqiao, WilliamSong11, wuren, wysstartgo, Xintong Song, yanchenyun, yangminghua, yangqu, Yang Wang, Yangyang ZHANG, Yangze Guo, Yao Zhang, yfhanfei, yiksanchan, Yik San Chan, Yi Tang, yljee, Youngwoo Kim, Yuan Mei, Yubin Li, Yufan Sheng, yulei0824, Yun Gao, Yun Tang, yuxia Luo, Zakelly, zhang chaoming, zhangjunfan, zhangmang, zhangzhengqi3, zhao_wei_nan, zhaown, zhaoxing, ZhiJie Yang, Zhilong Hong, Zhiwen Sun, Zhu Zhu, Zoran, Zor X. LIU, Zoucao, Zsombor Chikan, Zyang, Mo Ci

