This article was compiled by community volunteer Chen Zhengyu and is based on a preview of New Flink 1.14 features shared by Alibaba tech expert Song Xintong on Flink Meetup on August 7. The main contents are:

  1. Introduction to the
  2. Flow of an organic whole
  3. Checkpoint mechanism
  4. Performance and efficiency
  5. Table / SQL / Python API
  6. conclusion

This post was shared on August 7th, and the latest developments in version 1.14 are explained with notes at the end.

A list,

1.14 originally planned 35 important new features and optimizations, so far 26 of them have been completed; 5 tasks are not sure whether they can be completed on time; The other four features will be completed in a later version due to time or design reasons. [1]

Compared to previous versions, 1.14 does not include many optimizations and new feature points. In fact, if you look at the pace of releases, usually after one or two major releases a slightly less changed version will be released, mainly to stabilize some features.

Version 1.14 is such a positioning, we call it the Quality improvement and Maintenance release. This version is expected to stop the development of new features on August 16, and may be officially available in September. If you are interested, you can follow the following link to track the progress of feature release.

  • Wiki:cwiki.apache.org/confluence/…
  • Jira:issues.apache.org/jira/projec…

Two, flow batch one

Streaming batch integration has been on the radar since Flink 1.9, as an important part of the community RoadMap for real-time big data. On the other hand, the traditional offline computing requirements are not completely replaced by real-time tasks, but are here to stay.

Under the condition of simultaneous real-time and offline requirements, there are some pain points in the previous flow batch independent technology solutions, such as:

  • Need to maintain two sets of systems, the corresponding need two groups of developers, human input cost is very high;

  • In addition, two sets of data links deal with similar content, resulting in maintenance risks and redundancy;

  • The most important point is that if the stream batch uses a different set of data processing system, the engine itself may have the problem of inconsistent data caliber, resulting in some errors in business data. This error will have a great impact on big data analysis.

In this context, Flink community has identified the technical route of real-time offline integration as an important technical trend and direction.

Flink has done a lot of work in streaming batch integration over the past few releases. It can be considered that Flink achieves the real flow and batch operation with the same set of mechanisms at the engine level, API level and operator execution level. However, there are two different execution modes of the task:

  • For infinite data flow, the unified implementation mode of stream is adopted. The execution mode of flow refers to that all compute nodes are connected through Pipeline mode. Pipeline means that upstream and downstream computing tasks run simultaneously. As upstream continuously produces data, downstream continuously consumes data. This full Pipeline execution can:

    • EventTime indicates when the data was generated;

    • Watermark tells you at what point in time the data has arrived;

    • Maintain computing intermediate state through state;

    • Checkpoint is used for fault tolerance.

    Here are the different execution modes:

  • There are two modes of execution for a finite data set. We can treat it as a finite data stream, or we can treat it as a batch mode. The batch execution mode does have eventTime, but only supports plus infinity for Watermark. After sorting data and state, it has more options for scheduling and shuffling tasks.

    The execution modes of a stream batch are different. The most important one is that the batch execution mode has an intermediate process of falling disk. The downstream task is triggered only when the current task is completed.

    These two also have their own execution advantages:

    • For the execution mode of stream, it does not have the pressure of falling disk. Meanwhile, fault tolerance is based on data segmentation, and data is constantly Checkpoint to ensure breakpoint recovery.

    • However, in batch processing, the disk will be shuffled, so there will be pressure on the disk. However, since the data is sorted, the efficiency of subsequent calculations may be improved for batch. At the same time, tasks are executed in segments without simultaneous execution. In terms of fault tolerance calculation, fault tolerance is carried out according to stage.

    These two kinds have their own advantages and disadvantages, and can be selected according to the specific scene of the operation.

The optimization point of Flink 1.14 is mainly about how to deal with limited data sets in the execution mode of stream. The big difference between dealing with infinite data sets and dealing with finite data sets is the idea that the task might end. This situation brings up some new problems, as shown below:

  • Checkpoint mechanism in stream execution mode

    • For an infinite stream, its Checkpoint is triggered by all the source nodes. The source node sends a Checkpoint Barrier. When the Checkpoint Barrier flows through the job, All state of the current job is also stored.

    • In a finite flow Checkpoint mechanism, it is possible for tasks to end prematurely. An upstream Task may finish processing the Task first and exit early, while a downstream Task is still executing. Under the same stage and different concurrency, some tasks may be completed earlier due to inconsistent data amount. In this case, how do I Checkpoint subsequent jobs?

      In 1.14, JobManager dynamically determines where the Checkpoint Barrier starts based on the execution of the current task. At the same time, after some tasks are completed, the subsequent Checkpoint only saves the stage corresponding to the Task that is still running. In this way, the Task can be Checkpoint again to provide better fault tolerance in limited flow execution.

  • Two-phase commit after the Task ends

For some sinks, such as Kafka Sink in the following figure, tasks need to rely on Checkpoint mechanism for two-stage submission, so as to ensure the exact-once consistency of data.

In the Checkpoint process, each operator performs only operations that are ready to commit. For example, data is committed to an external temporary storage directory. After all tasks are Checkpoint completed, a signal is received and a formal commit is performed. All distributed temporary files are committed to the external system in a one-time transaction.

In the case of limited flow, Checkpoint is not guaranteed at the end of the job. How to submit the last part of data?

In 1.14, this problem was addressed. After a Task has finished processing all data, it must wait until the Checkpoint is complete before it can officially exit. This is an improvement to the end of finite flow tasks.

Checkpoint

1. Pain points of existing Checkpoint mechanisms

At present, when a Flink Checkpoint is triggered, the barrier flows between operators. The barrier is sent downstream along with the operator. When the operator encounters a barrier, the operator snapshots the barrier and sends it downstream again. In the case of multiple barriers, we will align the barrier and temporarily block the data that reaches the barrier first. After both barriers arrive, we will take snapshots and finally send further barriers.

Existing Checkpoint mechanisms have the following problems:

  • ** Barrier cannot Checkpoint when backpressure is applied: ** Barrier cannot flow downstream with data when backpressure is applied, causing barrier to Checkpoint when backpressure is applied. But in fact, when backpressure occurs, we need to Checkpoint data more, because at this time, performance is bottleneck, and problems are more likely to occur.
  • ** Blocking alignment has an impact on performance;
  • ** Recovery performance is limited by Checkpoint interval: ** The delay is often affected by the Checkpoint interval. The larger the interval, the more data needs to be replayed and the greater the impact of the interruption. However, the Checkpoint interval is currently limited by the duration of the persistence operation, so it cannot be done quickly.

2. Unaligned Checkpoint

To address these pain points, Flink has been continuously optimized in recent releases, and Unaligned Checkpoint is one of them. The barrier operator starts to Checkpoint when it reaches the top of the input buffer. It immediately passes the barrier to the top of the operator’s OutPut Buffer, i.e. it is immediately read by the downstream operator. In this way, the barrier is not blocked and cannot be Checkpoint when the backpressure is reversed.

When we send down the barrier, we need to make a short pause. During the pause, we will mark the State of the operator and the data in the input/output buffer, so as to facilitate subsequent uploading at any time. In the case of multi-path, all data will be marked until the arrival of another barrier.

In this way, no barrier alignment is required during Checkpoint operation. The only pause required is to check all buffers and states. This method can solve the problem that Checkpoint and Barrier alignment cannot be made during backpressure, which affects performance processing.

3. Generalized Incremental Checkpoint [2]

Generalized Incremental Checkpoint is mainly used to reduce Checkpoint interval, as shown in Figure 1 on the left. In Incremental Checkpoint, the operator is first written to the state Changelog. Write the actual change data to the StateTable after the write is complete. The state Changelog continuously implements persistent storage externally. During this process, we do not need to wait for the StateTable to perform a persistent operation. We only need to ensure that the corresponding Checkpoint of Changelog can be persisted, and then we can start the next Checkpoint. StateTable is an independent process of external maintenance in a periodic manner.

By splitting these two processes, we have changed from Per Checkpoint persistence ** to incremental persistence + background periodic full persistence to achieve the same fault tolerance. During this process, the amount of data to be persisted at each Checkpoint is reduced, which greatly reduces the Checkpoint interval.

Incremental Checkpoint is also supported in RocksDB. But there are two problems:

  • The first problem is that Incremental Checkpoint of RocksDB relies on some of its own implementations. There will be some data compression, and the time consumed by compression and the compression effect are uncertain, which is related to data.
  • The second problem is that it can only be used for specific StateBackend. The Generalized Checkpoint we are doing now actually guarantees that it is independent of StateBackend. The run-time mechanism ensures a more stable and smaller Checkpoint interval.

Unaligned Checkpoint is currently available in Flink 1.13. In version 1.14, it is mainly for bug fixes and supplements. The community is still making the final push, hopefully we will meet in 1.14. [2]

Four, performance and efficiency

1. Optimization of large-scale job scheduling

  • ** Building Pipeline regions improves performance: ** All subgraphs connected by Pipline edges. In Flink task scheduling, it is necessary to identify Pipeline Region to ensure that tasks connected by the same Pipline side can be scheduled at the same time. Otherwise, it is possible that upstream tasks start scheduling, but downstream tasks do not run. As a result, upstream data cannot be consumed by downstream nodes, which may result in deadlock
  • ** Task Deployment phase: ** What upstream data is read from for each task, and this information is generated in the Result Partition Deployment Descriptor.

Both of these build processes had O (n^2) time complexity in previous versions, and the main problem was traversing each upstream node for each downstream node. For example, to traverse the relationship between each upstream and whether it is a Pipeline side connection, or to traverse its each upstream to generate corresponding Result Partition information.

At present, by introducing the concept of group, it is assumed that the connection mode of the two upstream and downstream tasks is all-to-all, which is equivalent to combining all Pipeline Region information or Result Partition information in the form of group. In this way, we can optimize the complexity of an O (n^2) to O (n) by knowing which upstream group the downstream corresponds to. We did some testing with the WordCount task to compare performance before and after optimization.

It can be seen from the table that the construction speed is greatly improved, and the performance of Pipeline Region construction is improved from second level to millisecond level. Task deployment We deploy from the first task to the state when all tasks start running. Here we only count flows, because the batch can’t end scheduling until upstream ends. In terms of overall time, the overall task initialization, scheduling, and deployment phases can reduce time consumption in minutes.

2. Fine-grained resource management

Fine-grained resource management has been in the works for many releases in the past, but in Flink1.14 this part of the API was finally made available to users on DataSteam. You can customize SlotSharingGroup allocation in DataStream. The following figure shows how to define Slot resource allocation, which supports the DataStream API. Customize SSG division and resource configuration TaskManager Dynamic resource deduction.

Fine-grained configuration is available for each Slot, and the Runtime automatically performs dynamic resource cutting based on user resource configuration.

The advantage of this method is that there is no fixed Slot for resources as before, but dynamic deduction for resources. In this way, we hope to achieve more refined resource management and resource utilization.

Table/SQL/Python API

1. Table API / SQL

Window table-valued Function supports more operators and Window types

As can be seen from the table, the existing three window types are strengthened, Session window types are added, and the Aggregate operation is supported.

1.1 Support declarative registration Source/Sink

  • The Table API supports declaratively registering Source/Sink functionality to align SQL DDL;
  • Support flip-27 new Source interface;
  • New Source replaces the old Connect () interface.

1.2 New code generator

To solve the problem of generating code that exceeds Java’s maximum code limit, the new code generator will disassemble the code and completely solve the problem of excessively long code.

1.3 Remove Flink Planner

In the new release, Blink Planner will be the only implementation of Flink Planner.

2. Python API

In previous versions, if there were two UDFs executed successively, it would be executed on the left side of the figure below. The JVM has a Java Operator that sends data to a Python UDF for execution, sends it back to Java, then sends it downstream to an Operator, and then performs Python’s cross-process transfer. This results in many redundant data transfers.

In version 1.14, improvements like the one on the right allow them to be linked together, requiring only one round-trip Java and Python data communication, and achieving a significant performance improvement by reducing the number of data transfers.

3. LoopBack mode is supported

Native execution in the past actually ran the client program in a Python process, submitting the Java process to start a mini-cluster to execute the Java portion of the code. The Java part of the code, like the production part of the code, starts a new Python process to execute the corresponding Python UDF. As you can see from the diagram, the new process does not need to exist in local debugging.

Lookback allows Java opt to run udFs directly in the same process that Python clients run before:

  • First, it avoids the overhead of starting additional processes.
  • The most important thing is that in local debugging, we can make better use of some tools to debug in the same process, which is an improvement on the developer experience.

Six, summarized

This article introduces the main new features of Flink1.14.

  • Firstly, the current community work on batch integration is introduced, which can better compatible with batch jobs by introducing different execution modes of batch and optimization and improvement triggered by JM node tasks.

  • Then, we analyze the pain points of the existing Checkpoint mechanism, how to improve it in the new version, and how to optimize the performance in large-scale job scheduling optimization and fine-grained resource management.

  • Finally, the performance optimization of TableSQL API and Pyhton is introduced.

Welcome to stay tuned for some of the latest developments in the Release and some of our other technical sharing and topics in the subsequent Release process.

annotation

[1] As of August 31, 33 are confirmed to enter the new version, and all have been completed.

[2] Generalized Checkpoint was eventually not completed in 1.14.