This article was compiled by Miao Wating, a community volunteer. It is based on "Flink Operation and DataStream API Optimization", a talk given by Gao Yun (Yun Qian), a technology expert at Alibaba, at the Flink Meetup in Beijing on May 22. The article is divided into four parts:

  1. A review of Flink's unified stream-batch design
  2. Optimizations at the runtime layer
  3. Optimizations for the DataStream API
  4. Summary and follow-up plans

1. Stream-Batch Unified Flink

1.1 Architecture

First, let's look at the overall picture of stream-batch unification in Flink. In its early days, Flink was a framework that supported both stream and batch processing, but its stream and batch implementations were separate, both at the API layer and in the shuffle, scheduling, and operator layers below it. The two implementations were completely independent and not particularly closely related.

Driven by the goal of stream-batch unification, Flink has now unified the underlying abstractions for operators, scheduling, and shuffle, and supports both the DataStream API and the Table API on top of them. The DataStream API is a more physical-level interface, while the Table API is a declarative interface, and both are unified across streams and batches.
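
To make this concrete, here is a minimal sketch (not taken from the talk; the sample data and job name are made up) of a DataStream job that runs unchanged in either execution mode on Flink 1.12+, switching only the runtime mode:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnifiedWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The same pipeline can run in STREAMING or BATCH mode; BATCH is only
        // valid when all sources are bounded (e.g. a file or fromElements).
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements("flink", "stream", "batch", "flink")
           .map(word -> Tuple2.of(word, 1))
           .returns(Types.TUPLE(Types.STRING, Types.INT))
           .keyBy(t -> t.f0)
           .sum(1)
           .print();

        env.execute("unified word count");
    }
}
```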

1.2 Advantages

  • Code reuse

    Based on the DataStream API and the Table API, users can write a single set of code that processes both historical and real-time data, for example in data backfill (reprocessing) scenarios.

  • Easy to develop

    Unified connector and operator implementations reduce development and maintenance costs.

  • Easy to learn

    Learning costs are reduced, since users no longer need to learn two similar sets of interfaces.

  • Easy to maintain

    The same system supports both stream and batch jobs, reducing maintenance costs.

1.3 Data Processing Flow

Here is a brief look at how Flink abstracts stream and batch processing into one. Flink divides jobs into two kinds:

  • The first kind of job processes an unbounded, infinite data stream

    For this kind of job, Flink adopts the standard stream execution mode. It needs to take record time into account, advance the system-wide time through watermark alignment in order to aggregate and emit results, and keep intermediate results in State (a code sketch follows this list).

  • The second kind of job processes a finite data set

    The data may be stored in a file, or be some other finite data set that has been materialized in advance. A finite data set can be regarded as a special case of an infinite data set, so it can naturally run on top of the stream processing mode described above and be supported directly without any code changes.

    However, this overlooks the fact that the data is finite. Processing it with stream semantics means dealing with finer-grained concepts on the interface, such as time and watermarks, which introduces extra complexity. In terms of performance, stream mode also needs to start all tasks at the beginning, which may require more resources; and if RocksDB is used as the state backend, which is essentially one large hash table, random I/O access problems may appear when there are many keys.

    In batch execution mode, by contrast, the whole data processing flow can be implemented in a much more I/O-friendly way through sorting. So, once we take advantage of the fact that the data is finite, batch mode gives us a much larger design space for the scheduling, shuffle, and operator implementations.

    Finally, for a finite data stream, we want the final results to be consistent no matter which execution mode is used.
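
To make the stream-mode semantics above concrete, here is a minimal sketch (illustrative only; the event type, timestamps, and out-of-orderness bound are assumptions) of a job that declares event time and watermarks and keeps windowed state, which it must do even when the input happens to be finite:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (timestampMillis, key) pairs; in a real job these would come from Kafka, files, etc.
        DataStream<Tuple2<Long, String>> events = env.fromElements(
                Tuple2.of(1000L, "a"), Tuple2.of(2000L, "a"), Tuple2.of(9000L, "b"));

        events
            // Watermarks advance the job's notion of event time; windowed results
            // are emitted only when the watermark passes the end of a window.
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, ts) -> event.f0))
            .keyBy(e -> e.f1)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .sum(0)   // per-key, per-window aggregate kept in keyed state until the window fires
            .print();

        env.execute("event time sketch");
    }
}
```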

1.4 Recent Evolution

In recent releases, Flink has put a lot of effort into stream-batch unification in both the API and the implementation layers.

  • Before Flink 1.11:

    Flink unified the Table/SQL API and introduced the unified Blink planner, in which both streams and batches are translated onto DataStream operators. In addition, a unified shuffle architecture was introduced for both streams and batches.

  • In Flink 1.12:

    A new sort-merge shuffle mode was introduced for batch shuffle. Compared with the hash shuffle previously built into Flink, it greatly improves shuffle performance. In terms of scheduling, Flink introduced a stream-batch unified scheduler based on pipeline regions.

  • In Flink 1.13:

    Sort-merge shuffle was further improved, and the performance of the pipeline region scheduler for large-scale jobs was optimized. In addition, as mentioned earlier, for the two execution modes of a finite stream we expect the results to be consistent, but Flink still has some problems at the end of job execution, so the results are not yet fully consistent.

    So part of the work in 1.13 is on making the results of finite data set jobs, especially when executed in stream mode, match the expected results.

  • Future Flink 1.14:

    Work will continue on the consistency guarantees for finite jobs, a Source that can switch between batch and stream, the gradual deprecation of the DataSet API, and so on.

2. Runtime optimization

2.1 Optimization of large-scale job scheduling

2.1.1 Complexity introduced by execution edges

When a job is submitted, Flink generates a DAG for it composed of multiple vertices. Each vertex corresponds to an actual processing node, such as a Map operator, and each has its own parallelism. In the previous Flink implementation, when a job was submitted to the JobManager (JM), the JM expanded it into an ExecutionGraph.

As shown in the figure below, the job has two nodes, with parallelism 2 and 3 respectively. In the data structure actually maintained in the JM there are two tasks and three tasks, connected by six execution edges. Flink maintains the topology of the entire job with this data structure, and based on it can track the state of each task individually and identify which tasks need to be restarted when one fails.

In the all-to-all case, where every upstream task has an edge to every downstream task, the number of edges is upstream parallelism * downstream parallelism, so the data structure is O(N^2); with 10K * 10K parallelism that is 10^8 execution edges. The memory footprint in this case is staggering: with 10K * 10K edges, the JM's memory usage reaches 4.18 GB. In addition, most of the computational work in scheduling is related to the number of edges, with complexity of O(N^2) or even O(N^3), so with 10K * 10K edges the initial scheduling time of a job reaches 62 s.

Beyond the initial scheduling, batch jobs also schedule downstream stages as upstream stages finish, and this intermediate scheduling has the same O(N^2) or O(N^3) complexity, which causes significant performance overhead. In addition, GC does not perform particularly well under such a high memory footprint.

2.1.2 Symmetry of the Execution Graph

In view of these memory and performance problems of Flink on large-scale jobs, some in-depth analysis shows that there is a certain symmetry between the upstream and downstream nodes in the example above.

Edges in Flink fall into two types:

  • One is the pointwise type, where upstream and downstream are connected one-to-one, or one upstream task corresponds to a few downstream tasks rather than all of them. In this case the number of edges is basically linear, O(N), on the same order of magnitude as the number of operators.
  • The other type is all-to-all, where every upstream task is connected to every downstream task. Here the data set produced by each upstream task is consumed by all downstream tasks, which is a symmetric relationship. As long as we record that an upstream data set is consumed by all downstream tasks, there is no need to store each individual edge.

Therefore, in 1.13 Flink introduced the concepts of ResultPartitionGroup and VertexGroup for upstream data sets and downstream vertices respectively. For all-to-all edges in particular, because upstream and downstream are symmetric, all upstream data sets can be put into one group and all downstream vertices into another group. During actual maintenance there is no need to store the individual edges; it is enough to know which group of downstream vertices consumes an upstream data set, or which group of upstream data sets a downstream vertex consumes. This greatly reduces memory usage.

In addition, for scheduling-related computations such as those in batch processing, if all edges are blocking edges then every vertex belongs to its own region. Previously, computing the upstream-downstream relationships between regions required every upstream vertex to traverse all downstream vertices, an O(N^2) operation. With the consumer group, this becomes a linear O(N) operation.
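
The idea can be sketched with a simple data structure (illustrative only, with made-up class and field names; this is not Flink's actual ResultPartitionGroup/VertexGroup code): for an all-to-all distribution, each side stores one shared group instead of one edge object per upstream/downstream pair.

```java
import java.util.List;

/** Illustrative only: how grouping removes the O(N^2) edge objects. */
class AllToAllDistribution {
    // One shared group of upstream result partitions ...
    List<Integer> producerPartitionIds;   // size N
    // ... consumed by one shared group of downstream vertices.
    List<Integer> consumerVertexIds;      // size M

    // Memory: O(N + M) instead of N * M edge objects.
    // "Which partitions does consumer j read?" is answered by returning the
    // whole producer group, instead of scanning j's individual edges.
    List<Integer> partitionsConsumedBy(int consumerVertexId) {
        return producerPartitionIds;
    }

    // "Which consumers read partition i?" is symmetric.
    List<Integer> consumersOf(int producerPartitionId) {
        return consumerVertexIds;
    }
}
```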

2.1.3 Optimization Results

After this data structure optimization, with 10K * 10K edges the JM memory usage drops from 4.18 GB to 12.08 MB, and the initial scheduling time drops from 62 s to 12 s. The optimization is quite significant, and users can reap the benefits simply by upgrading to Flink 1.13, without any additional configuration.

2.2 Sort-Merge Shuffle

Another optimization concerns the shuffle of batch jobs. In general, after the upstream stage of a batch job finishes running, it writes its results to an intermediate file, and the downstream stage then pulls data from that intermediate file for processing.

The advantage of this approach is that it saves resources, because upstream and downstream do not have to run at the same time, and in the event of a failure the job does not have to be re-executed from scratch. This is the common way to execute batch processing.

2.2.1 Hash Shuffle

So, during the shuffle, how are the intermediate results saved to intermediate files and then pulled by the downstream?

Previously, Flink used a hash shuffle. Taking the all-to-all edge as an example, the data set produced by each upstream task is written into a separate file for each downstream task, so the system may produce a huge number of small files. Each file, whether accessed through file I/O or mmap, uses at least one buffer, which wastes memory. Downstream tasks reading upstream data files at random also generate a large number of random I/Os.

Therefore, Flink's previous hash shuffle could only be used in production for batch processing at a small scale or on SSDs. At larger scale, or on SATA disks, it runs into serious problems.

2.2.2 Sort Shuffle

So, over the two releases Flink 1.12 and Flink 1.13, a new shuffle based on sort-merge was introduced. The sort here does not refer to sorting the data itself, but to sorting by the downstream task the data is written to.

The general principle is that when the upstream task outputs data, it uses a fixed-size buffer, so the buffer does not grow with the scale of the job. All data is written into the buffer; when the buffer is full, it is sorted (by target downstream task) and written out to a single file, and subsequent data keeps being written into the buffer, with each spilled batch appended to the same file. In the end, a single upstream task produces one intermediate file made up of many segments, each of which is internally ordered.
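
The write path can be sketched as follows (a simplified illustration of the idea rather than Flink's actual sort-merge shuffle implementation; the buffer size and record format are assumptions): records are tagged with their target subpartition, collected in a fixed-size buffer, and every time the buffer fills up it is sorted by subpartition and appended to the single data file as one region.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Simplified sketch: sort buffered records by target subpartition before spilling. */
class SortShuffleWriterSketch {

    private static class PendingRecord {
        final int subpartition;
        final byte[] payload;
        PendingRecord(int subpartition, byte[] payload) {
            this.subpartition = subpartition;
            this.payload = payload;
        }
    }

    private static final int BUFFER_RECORDS = 1024;  // stand-in for a fixed-size memory buffer
    private final List<PendingRecord> buffer = new ArrayList<>();
    private final FileOutputStream dataFile;         // one intermediate file per upstream task

    SortShuffleWriterSketch(String path) throws IOException {
        this.dataFile = new FileOutputStream(path, true);
    }

    void emit(int targetSubpartition, byte[] payload) throws IOException {
        buffer.add(new PendingRecord(targetSubpartition, payload));
        if (buffer.size() >= BUFFER_RECORDS) {
            spillRegion();
        }
    }

    /** Sort by target subpartition and append the buffer contents to the file as one region. */
    private void spillRegion() throws IOException {
        buffer.sort(Comparator.comparingInt((PendingRecord r) -> r.subpartition));
        for (PendingRecord r : buffer) {
            dataFile.write(r.payload); // data for the same subpartition is now contiguous
        }
        buffer.clear();
    }

    void finish() throws IOException {
        if (!buffer.isEmpty()) {
            spillRegion();
        }
        dataFile.close();
    }
}
```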

Unlike many other batch processing frameworks, the approach here is not based on a conventional external sort. A conventional external sort would go on to merge these segments into one fully ordered file, which gives better I/O continuity for downstream reads and prevents each read from being too small because the segments are too small. However, the merge itself consumes a lot of I/O resources, and the time spent merging may far outweigh the benefit of sequential downstream reads.

So another approach is taken here: when downstream tasks request data, for example when the three downstream tasks in the figure below read the upstream intermediate file, a scheduler sorts the file offsets requested by the downstream tasks and performs I/O scheduling at the upper layer, so that the whole file is read sequentially and a large number of random I/Os on SATA disks is avoided.

On SATA disks, the I/O performance of the sort shuffle is 2 to 8 times that of the hash shuffle. With sort shuffle, Flink batch processing becomes basically production-ready: disk I/O throughput on SATA disks can exceed 100 MB/s, while the maximum read/write speed of a SATA disk is about 200 MB/s.

To maintain compatibility, sort shuffle is not enabled by default. Users can control above what downstream parallelism the sort-merge shuffle is enabled, and can further improve batch performance by enabling compression. Sort-merge shuffle does not take up additional memory; the read/write buffers it currently uses are taken from the framework's memory.
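
As a rough sketch of how this can be switched on (the exact keys and defaults should be checked against the Flink 1.13 documentation; in a real deployment these are set in flink-conf.yaml on the TaskManagers, and a local environment is used here only for illustration):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SortShuffleConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Use sort-merge shuffle for result partitions whose consumer parallelism
        // is at least this value; 1 effectively enables it for all batch shuffles.
        conf.setInteger("taskmanager.network.sort-shuffle.min-parallelism", 1);

        // Compress shuffle data to further reduce disk I/O for batch jobs.
        conf.setBoolean("taskmanager.network.blocking-shuffle.compression.enabled", true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4, conf);
        // ... build a bounded/batch pipeline on env as usual ...
    }
}
```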

3. DataStream API Optimizations

3.1 2PC and End-to-end Consistency

To ensure end-to-end consistency, Flink streaming jobs use a two-phase commit (2PC) mechanism that combines Flink's checkpointing and failover with the transactional features of external systems.

The logic is as follows: to achieve end-to-end consistency, for example when reading from and then writing to Kafka, the data is first written into a Kafka transaction and pre-committed at checkpoint time, so that the data will not be lost.

If the checkpoint succeeds, a formal commit is performed. This keeps the external system's transactions consistent with Flink's failover: if a failover occurs in Flink and the job rolls back to the previous checkpoint, the corresponding transactions in the external system are aborted; if the checkpoint succeeds, the external transactions are committed.
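
Here is a minimal sketch of that pattern using the universal Kafka connector available around Flink 1.13 (the broker address, topic name, and timeout values are placeholders):

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceKafkaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoints drive the two-phase commit: preCommit happens when the
        // checkpoint barrier reaches the sink, commit when the checkpoint completes.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        // The Kafka transaction must stay open across checkpoint interval + recovery time.
        props.setProperty("transaction.timeout.ms", "900000");

        KafkaSerializationSchema<String> schema = new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                return new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));
            }
        };

        env.fromElements("a", "b", "c")
           .addSink(new FlinkKafkaProducer<>(
                   "output-topic", schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("two-phase-commit sketch");
    }
}
```

Note that transaction.timeout.ms must be larger than the checkpoint interval plus the expected recovery time; otherwise Kafka may abort a transaction before Flink has a chance to commit it.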

Flink's end-to-end consistency therefore depends on the checkpoint mechanism. However, there are some problems when dealing with finite streams:

  • For jobs over finite streams, Flink does not support taking checkpoints after some tasks have finished. For example, in a job that mixes bounded and unbounded parts, some tasks finish while others keep running; Flink can then no longer take checkpoints, so the remaining data is never committed.

  • When a finite stream of data ends, because checkpoints are triggered on a schedule, there is no guarantee that a final checkpoint runs after all data has been processed. As a result, the last part of the data may never be committed.

As a result, for finite-stream jobs, execution in stream mode and in batch mode can produce inconsistent results.

3.2 Supporting Checkpoint after Some Tasks Are Completed (in progress)

Starting from Flink 1.13, checkpoints can be taken after some tasks have finished. A checkpoint maintains, for each operator, a list of the states of all its tasks.

After some tasks have finished (see the dashed part of the figure below), Flink divides the operators into two categories:

  • If all subtasks of an operator have finished, a finished flag is stored for that operator.

  • If only some of an operator's tasks have finished, only the states of the unfinished tasks are stored.

    Based on such a checkpoint, all operators are still started after a failover. If Flink recognizes that an operator had already finished in the previous execution, i.e. finished = true, the execution of that operator is skipped. In particular, for the Source operator, if it had finished, it will not send data again. In this way, the consistency of the whole state is guaranteed, and checkpoints can still be taken even though some tasks have finished.

Flink has also reworked the semantics of job termination. A Flink job can now end in several ways:

  • Job end: the data is finite and the finite-stream job ends normally;
  • stop-with-savepoint: stop the job and take a savepoint;
  • stop-with-savepoint --drain: stop the job with a savepoint and advance the watermark to positive infinity.

Previously these paths had two different implementation logics, and both had the problem that the last part of the data could not be committed.

  • For both job end and the stop-with-savepoint --drain semantics, the job is not expected to restart, so endOfInput() is called on the operators and a checkpoint is then taken through a unified set of methods.

  • With the stop-with-savepoint semantics, the job is expected to be restarted from the savepoint later, so endOfInput() is not called on the operators; a checkpoint is still taken afterwards. In this way, when a job is destined to end and not be restarted, the last part of the data can be committed to the external system.

4. Summary

One of Flink's overall goals is to build a unified platform for efficient processing of both finite and infinite data sets. A preliminary prototype is basically in place, in terms of both the API and the runtime. Here is an example that illustrates the benefits of stream-batch unification.

Consider a user's data backfill scenario. Normally the job processes an infinite stream. One day the logic needs to be changed, so the stream job is stopped with stop-with-savepoint. However, the data of the previous two months then has to be recomputed to keep the results consistent. At this point, a batch job can be started: the job code is unchanged, it runs over the input data cached in advance, and batch mode corrects the previous two months of data as quickly as possible. In addition, a new streaming job with the new logic can be restarted from the previously taken savepoint.

As you can see, in the whole process above, if stream and batch were separate, a separate job would have to be developed for the data correction. With stream-batch unification, the data correction can be done naturally based on the existing stream job, with no extra development by the user.

In future releases, Flink will further consider more scenarios that combine streams and batches, for example one where the user first does batch processing to initialize the state and then switches to an infinite stream. Of course, the separate stream and batch capabilities will also be further optimized and improved, making Flink a competitive computing framework for both stream and batch processing.