Summary: In 1.13, Flink optimized the performance of network shuffles in batch execution mode for large-scale job scheduling, as well as improved exit semantics for finite-stream jobs in the DataStream API for the streaming-batch goal.

This article is organized by community volunteer Miao Wenting. The content is from “Flink Runtime and Datastream API Optimization for Streaming and Batching”, which was shared by Alibaba technical expert Gao Yun (Yun Qian) at Flink Meetup in Beijing Station on May 22. This paper is mainly divided into four parts:

  1. Review the Flink stream batch integrated design
  2. Describes optimization points for the runtime
  3. Describes the optimization points for the DataStream API
  4. Summary and some follow-up planning.

Making the address https://github.com/apache/flink welcome to Flink thumb up to star ~

1. Flink with batch integration

1.1 Introduction to architecture

Let’s start with the overall logic of Flink streams and batches. In its early days, Flink was a framework that supported both streaming and batching, but its stream and batch implementations were separate, both in the API layer and in the Shuffle, scheduling, and operator layers underneath. The two sets of implementations are completely separate and not particularly closely related.

With the goal of stream-batch integration, Flink has now abstracted the underlying operators, scheduling, and shuffles in a unified way to support both the DataStream API and the Table API. The DataStream API is a more physical interface and the Table API is a Declearetive interface, both of which are consistent for both streams and batches.

1.2 the advantages

  • Code reuse

    Based on the DataStream API and the Table API, users can write the same set of code to handle both historical and real-time data, such as data reflow scenarios.

  • Easy to develop

    Unified Connector and Operator implementation reduces development and maintenance costs.

  • Easy to learn

    Reduce learning costs and avoid learning two sets of similar interfaces.

  • Easy to maintain

    Use the same system to support both stream and batch jobs, reducing maintenance costs.

1.3 Data processing process

Here’s how Flink abstracts batch into batch. Flink split the job into two types:

  • The first type of job is an infinite stream job that deals with infinite data

    This type of job is known as a stream job. For this type of job, Flink uses a standard stream execution mode, which takes into account the recording time, pushes the whole system time through the Watermark alignment to achieve some data aggregation and output purposes, and maintains the intermediate State through the State.

  • The second type of jobs is those that work with limited data sets

    The data may be a limited set of data kept in a file or otherwise preserved in advance. The finite data set can be thought of as a special case of the infinite data set, so it can naturally run above the previous stream processing mode, without code modification, and can be supported directly.

    However, it is possible to ignore the limited nature of the limited data set and introduce additional complexity by dealing with finer grained semantics such as time, Watermark, etc., on the interface. In addition, in terms of performance, since it is processed in a streaming way, all tasks need to be pulled up at the beginning, which may require more resources. If RocksDB backend is used, it is equivalent to a large Hash table. In the case of more keys, there may be random IO access problems.

    However, in batch execution mode, the whole data processing process can be implemented in a more IO friendly way by sorting. Therefore, under the premise of considering the limited data, batch processing mode provides us with a larger choice space in the implementation of scheduling, Shuffle and operator.

    Finally, for limited data streams, we want the final result to be consistent regardless of the processing mode used.

1.4 Recent Evolution

In recent releases of Flink, a lot of effort has been made towards stream-batch integration both in the API and in the implementation layer.

  • Before Flink 1.11:

    Flink unifies the Table/SQL API and introduces a unified Blink Planner where both stream and batch are translated onto the DataStream operator. In addition, a unified shuffle architecture is introduced for convection and batch.

  • In Flink 1.12:

    A new Sort-Merge based shuffle mode has been introduced for batch shuffles, providing a significant performance improvement over the previous Flink built-in Hash shuffles. In terms of scheduling, Flink introduces a batch-in-stream scheduler based on Pipeline Region.

  • In Flink 1.13:

    The Sort-Merge Shuffle has been improved and the performance of the Pipeline Region Scheduler has been optimized for large-scale operations. In addition, as mentioned earlier, we expect the execution results to be consistent for the two execution modes of finite flow. But now Flink still has some problems at the end of job execution, which leads to it’s not completely consistent.

    So in 1.13, another part of the work is on the limited data set job, how to make its results consistent with the expected results in the stream batch, especially in the stream mode.

  • Future Flink 1.14:

    It is necessary to continue to complete the work of limited job consistency guarantee, batch stream switching Source, and gradually discarding DataSet API.

2. Runtime optimization

2.1 Optimization of large-scale job scheduling

2.1.1 Time complexity of edges

When Flink submits a job, it generates a DAG graph of the job, consisting of multiple vertices that correspond to our actual processing nodes, such as Map. Each processing node has a degree of concurrency. In Flink’s previous implementation, when we submitted a job to JM, JM expanded the job and generated an Execution Graph.

As shown in the figure below, the job has two nodes with concurrency of 2 and 3. In the actual data structure maintained in JM, two tasks and three tasks are maintained respectively and consist of six execution edges. Based on this data structure, Flink maintains the topology information of the entire job. Based on this topology information, Flink can separately maintain the state of each task and identify the tasks that need to be pulled when the task is suspended.

In this kind of all-to-all communication, where there are edges between every two upstream and downstream tasks, upstream concurrency occursDownstream concurrency, a data structure of O(N^2) will appear. In this case, the memory footprint is staggering if it’s 10KWith 10K edges, the memory footprint of JM will reach 4.18G. In addition, a lot of computational complexity of the job is related to the number of edges. In this case, the space complexity is O(N^2) or O(N^3). If the edge is 10k * 10k, the initial scheduling time of the job will reach 62s.

As you can see, in addition to the initial scheduling, it is possible for batch jobs to execute upstream and then downstream, with intermediate scheduling complexity of either O(N^2) or O(N^3), which leads to a significant performance overhead. In addition, GC performance may not be particularly good with a large memory footprint.

2.1.2 Symmetry of the Execution Graph

After some in-depth analysis of the memory and performance problems of Flink in large-scale operations, it can be seen that there is a certain symmetry between the upstream and downstream nodes in the above example.

There are two types of “edges” in Flink:

  • One is the Pointise type, where the upstream and downstream are one-to-one, or one of the upstream corresponds to several of the downstream, not all of them connected, in which case the number of edges is basically linear O(N), on the same order of magnitude as the number of operators.
  • In this case, it can be seen that the data set generated by each upstream task will be consumed by All downstream tasks. In fact, it is a symmetric relationship. Just remember that the upstream data set is consumed by all downstream tasks, so there is no need to store the middle edge separately.

Therefore, in 1.13, Flink introduced the concepts of ResultPartitionGroup and VertexGroup respectively for the upstream data set and downstream nodes. Especially for all-to-all edges, since upstream and downstream are symmetrical, All data sets generated by upstream can be put into a Group, and All nodes generated by downstream can also be put into a Group. In actual maintenance, there is no need to save the relationship between the middle edges. You only need to know which upstream data set is consumed by which downstream Group, or which downstream vertex is consumed by which upstream data set. In this way, the memory footprint is reduced.

In addition, when we do some computations related to scheduling, for example, in batch processing, if all edges are blocking edge, each node belongs to a separate region. Before calculating the upstream and downstream relationship between regions, for each vertex in the upstream, it is necessary to traverse all vertices in the downstream, so it is an O(N^2) operation. When the ConsumerGroup is introduced, it will become an O(N) linear operation.

2.1.3 Optimization results

After the optimization of the above data structure, the memory occupation of JM can be reduced from 4.18G to 12.08M in the case of 10K * 10K edges, and the long initial scheduling time can be reduced from 62s to 12s. This optimization is so significant that users can benefit from upgrading to Flink 1.13 without any additional configuration.

2.2 Sort-Merge Shuffle

Another optimization is the data shuffle for batch jobs. In general, the batch work is finished in the upstream, will first write the results to an intermediate file, and then the downstream from the intermediate file to pull data for processing.

The advantage of this approach is that it saves resources, does not require both upstream and downstream operations, and in the case of failure, does not require implementation from scratch. This is a common way of executing batch processing.

2.2.1 Hash Shuffle

So how do intermediate results in a shuffle be saved to an intermediate file and pulled downstream?

Previously, Flink introduced a Hash shuffle. Taking the all-to-all edge as an example, the data set generated by the upstream task will be written to a separate file for each downstream task. In this way, the system may produce a large number of small files. And whether using file IO or mmap, each file is written using at least one buffer, resulting in a waste of memory. Upstream data files randomly read by a downstream task also generate a large number of random IO.

As a result, Flink’s previous Hash Shuffle application in batch processing can only be compared to work in production on a small scale or when SSD is used. There are major problems on large scale or SATA disks.

2.2.2 Sort Shuffle

So, in Flink 1.12 and Flink 1.13, after two versions, a new Sort merge-based shuffle was introduced. This Sort does not Sort the data, but rather the task target written downstream.

General principle is that the upstream in the output data, USES a fixed size buffers, avoid the buffer size increases with the increase of the scale, all data written to the buffer, when the buffer is full, can make a order and wrote in a separate file, the back of the data is based on the cache area continue to write, written a will spell behind the original file. Finally, a single upstream task produces an intermediate file, consisting of many segments, each of which has an ordered structure.

Unlike other batch frameworks, this is not based on a normal external sort. In general, external sorting means that these segments are merged again to form an overall ordered file. In this way, there will be better IO continuity when the downstream reads, and each task will not read a small data segment. However, the merge itself is also very IO intensive, and it is possible that the time overhead of the merge will far outweigh the benefit of the downstream sequential reads.

So, here’s another approach: At the time of downstream to request data, such as image below three downstream to upstream in the middle of the file to read, there will be a scheduler to downstream request to read the file location to do a sorting, through the way of IO scheduling in the upper, to realize the continuity of the entire file IO read, prevent to produce a large number of random IO on SATA disk.

On SATA disks, Sort Shuffle can improve IO performance by a factor of 2-8 relative to Hash Shuffle. With Sort Shuffle, Flink batch processing is almost ready for production. IO performance on SATA disks can reach over 100 MB, and SATA disks can reach a maximum read/write speed of 200 MB.

For compatibility, Sort Shuffle is not enabled by default and users can control how much downstream concurrency is achieved to enable Sort Merge Shuffle. And you can further improve the performance of batch processing by enabling compression. Sort Merge Shuffle does not take up any additional memory. It now takes up the cache of upstream reads and writes, which is drawn from the framework.off-heap.

3. Datastream API optimization

3.1 2PC & end-to-end consistency

To ensure end-to-end consistency, for Flink stream jobs, this is achieved through a two-phase commit mechanism that combines Flink’s checkpoint and failover mechanisms with some features of external systems.

The logic is that if I want to do end-to-end consistency, such as reading Kafka and then writing to Kafka, I will preCommit the data to a Kafka transaction as a checkpoint so that the data will not be lost.

If the checkpoint succeeds, a formal commit is made. This ensures that the external system transactions are consistent with the internal failover of Flink. For example, if a Flink failover occurs and needs to roll back to the previous checkpoint, the external system transactions corresponding to this part will also be aborted. If the checkpoint succeeds, so does the commit of the external transaction.

Flink’s end-to-end consistency relies on the Checkpoint mechanism. However, there are some problems with finite streams:

  • For tasks with limited flow, Flink does not support checkpoints after completion of tasks. For tasks with mixed streams, some of them will end. After that, Flink will not be able to check points again and data will not be submitted.
  • At the end of a finite stream of data, because checkpoints are timed, there is no guarantee that the last checkpoint will be executed after all the data has been processed, which may cause the last part of the data to be uncommitted.

This can lead to inconsistent results in stream/batch execution of finite-stream jobs in stream mode.

3.2 Support Checkpoint after the end of some tasks (in progress)

Starting with Flink 1.13, you can also checkpoint tasks after they have finished. A checkpoint is actually a list of the states of all the tasks that maintain each operator.

After some tasks have finished, the dotted part of the figure below. Flink divides the terminated tasks into two types:

  • If all subTasks of an operator have ended, a Finished flag is stored for the operator.
  • If an operator terminates only part of its tasks, it stores only the unfinished task state.

    Based on this checkpoint, all operators will still be pulled after the failover. If it is recognized that the last execution of the operator has ended, i.e., finsihed = true, the operator’s execution will be skipped. Especially for the Source operator, if it has already ended, the send data will not be re-executed later. In this way, you can ensure the consistency of the whole state. Even if some tasks end, you still go to checkpoint.

Flink also rearranges the closing semantics. Now that the Flink job is done, there are several possibilities:

  • Job end: The data is limited, and the finite-flow job ends normally.
  • Stop-with-savepoint = “stop-with-savepoint”;
  • Stop-with-savepoint –drain, which ends with a savepoint and pushes the watermark to infinity.

Before, there were two different implementations of logic, and both had problems with the last part of the data being uncommitted.

  • For both stop-with-savepoint –drain semantics, where the job is not expected to be restarted, endOfInput() is called to the operator, telling the operator to checkpoint in a consistent way.
  • For stop-with-savePoint semantics, the job is expected to continue the savePoint restart, so endOfInput() is not called to the operator. A second checkpoint is followed, so that for jobs that must end and not be restarted, the last part of the data must be committed to the external system.

4. To summarize

Part of the overall goal of Flink is to create a unified platform for efficient processing of both finite and infinite data sets. There is basically a prototype in place, both in terms of the API and in terms of Runtime. Here’s an example to illustrate the benefits of integrating streams and batches.

For the backflow operations of users, they usually deal with unlimited streams. If they want to change the logic one day, they can stop the flow by stop-with-savepoint. However, the change logic also needs to recover the data within the previous two months to ensure the consistency of the results. At this point, you can start a batch of jobs: jobs are not modified, run ahead of the cached input data, using the batch mode can be corrected as soon as possible from the previous two months of data. In addition, based on the new logic, a new stream job can be restarted using the savepoint saved earlier.

It can be seen that in the whole process above, if the flow batch is separated before, it is necessary to develop a separate operation for data correction. However, in the case of stream batch integration, data correction can be carried out naturally based on the stream job, without additional development by the user.

In future versions of Flink, more batch-combined scenarios will be considered, such as one where the user does a batch process, initializes the state, and then cuts to an infinite stream. Further improvements will also be made to the streaming and batching functions alone, making Flink a competitive computing framework for both streaming and batching.

Copyright Notice:The content of this article is contributed by Aliyun real-name registered users, and the copyright belongs to the original author. Aliyun developer community does not own the copyright and does not bear the corresponding legal liability. For specific rules, please refer to User Service Agreement of Alibaba Cloud Developer Community and Guidance on Intellectual Property Protection of Alibaba Cloud Developer Community. If you find any suspected plagiarism in the community, fill in the infringement complaint form to report, once verified, the community will immediately delete the suspected infringing content.