1. Overall architecture of Flink Runtime

The overall architecture of Flink is shown in Figure 1. Flink can run in a variety of environments: for example, it can run directly in a single-process, multi-threaded manner to support debugging, it can run on resource management systems such as YARN or Kubernetes, and it can run in various cloud environments.


Figure 1. The overall architecture of Flink. The Runtime layer provides a unified distributed execution engine for the different execution environments

For these different execution environments, Flink provides a unified distributed job execution engine: the Flink Runtime layer. On top of the Runtime layer, Flink provides the DataStream and DataSet APIs for writing streaming and batch jobs respectively, as well as a set of higher-level APIs that simplify the writing of specific kinds of jobs. This article mainly introduces the overall architecture of the Flink Runtime layer.

The main architecture of the Flink Runtime layer is shown in Figure 2, which depicts the basic structure of a Flink cluster. The architecture of the Runtime layer was essentially established in FLIP-6. Overall, it adopts a standard master-slave structure: the part inside the white circle on the left is the master, responsible for managing the resources and jobs of the whole cluster, while the two TaskExecutors on the right are the slaves, which provide the actual resources and execute the jobs.

Figure 2. Basic structure of a Flink cluster. The Flink Runtime layer adopts a standard master-slave architecture

The master contains three components: the Dispatcher, the ResourceManager, and the JobManager. The Dispatcher is responsible for receiving the user's jobs and launching a new JobManager component for each newly submitted job. The ResourceManager is responsible for resource management; there is only one ResourceManager in the entire Flink cluster. The JobManager is responsible for managing the execution of a single job; multiple jobs may execute simultaneously in a Flink cluster, each with its own JobManager component. All three components run inside the AppMaster process.

Based on the above structure, when a user submits a job, the submission script first starts a client process responsible for compiling and submitting the job. The client compiles the user's code into a JobGraph, performing some checks and optimizations along the way; for example, it determines which operators can be chained into the same Task. It then submits the resulting JobGraph to the cluster for execution. There are two submission paths. In Standalone or Session mode, the AM is started in advance, and the client establishes a connection directly with the Dispatcher and submits the job. In per-job mode, the AM is not started in advance: the client first requests resources from a resource management system such as YARN or Kubernetes to start the AM, and then submits the job to the Dispatcher inside that AM.
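To make chaining concrete, here is a minimal DataStream sketch (the class name and sample data are made up for illustration). The map and filter below have the same parallelism and a forward connection, so the client can typically chain them into a single Task while building the JobGraph, whereas startNewChain() forces a break:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                // map and filter have the same parallelism and a forward connection,
                // so the client can chain them into a single Task in the JobGraph
                .map(s -> s.toUpperCase()).returns(Types.STRING)
                .filter(s -> !s.isEmpty())
                // startNewChain() begins a new chain at this map,
                // breaking the chain to its input (a separate Task)
                .map(s -> s + "!").returns(Types.STRING).startNewChain()
                .print();

        env.execute("chaining-example");
    }
}
```

Chaining reduces thread switching and serialization overhead between operators, which is why the client performs it as an optimization when producing the JobGraph.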

After the job is handed to the Dispatcher, the Dispatcher starts a JobManager component for it. The JobManager then requests resources from the ResourceManager to start the specific Tasks of the job. At this point, depending on whether Session or per-job mode is used, TaskExecutors may or may not already be running. In Session mode, the ResourceManager has already recorded the resources registered by the TaskExecutors, so it can directly select and allocate idle resources. Otherwise, the ResourceManager requests resources from the external resource management system to start new TaskExecutors, waits for them to register their resources, and then allocates the idle resources. Currently, TaskExecutor resources in Flink are described in terms of Slots. A Slot normally executes one specific Task, but in some cases it can execute multiple related Tasks.

After the ResourceManager selects an idle Slot, it notifies the corresponding TaskExecutor to allocate that Slot to the requesting JobManager. The TaskExecutor records the allocation and registers the Slot with that JobManager. Once the JobManager receives the Slot registered by the TaskExecutor, it can actually submit Tasks.

When a TaskExecutor receives a Task submitted by the JobManager, it starts a new thread to execute it. Once started, Tasks perform their pre-specified computation and exchange data with each other through the data Shuffle module.
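Conceptually, the thread-per-Task model can be pictured with the following simplified sketch; the class and method names are illustrative and not Flink's actual implementation:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of the thread-per-Task execution model; not Flink's actual classes.
class MiniTaskExecutor {
    private final Map<String, Thread> runningTasks = new ConcurrentHashMap<>();

    // Invoked when a JobManager submits a Task to this TaskExecutor
    void submitTask(String taskId, Runnable taskLogic) {
        Thread worker = new Thread(taskLogic, "task-" + taskId);
        runningTasks.put(taskId, worker);
        worker.start(); // each Task runs in its own thread
    }
}
```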

This is the basic process of executing a job at the Runtime layer. As the above shows, Flink supports two different modes, per-job mode and Session mode. As shown in the figure below, in per-job mode a Flink cluster executes only a single job: each job has its own Dispatcher and ResourceManager components, and both the AppMaster and the TaskExecutors are requested on demand. This makes per-job mode well suited to large, long-running jobs that require high stability and are insensitive to the time spent acquiring resources. Conversely, in Session mode, Flink starts the AppMaster and a set of TaskExecutors in advance and then executes multiple jobs over the lifetime of the cluster, so Session mode is better suited to small jobs with short execution times.

Figure 3. Flink Runtime supports two modes of job execution

2. Resource management and job scheduling

Job scheduling can be viewed as a process of matching resources to tasks. In Flink, resources are represented by Slots, each of which can be used to execute a Task; a Task is an actual task of a job, containing the user logic to be executed. The main goal of scheduling is to find a matching Slot for each Task.

Logically, each Slot should have a vector describing the amounts of the various resources it can provide, and each Task should have a corresponding description of the amounts of the various resources it needs. Prior to 1.9, however, Flink did not support fine-grained resource descriptions: it assumed that the resources provided by each Slot equaled the resources required by each Task. Since 1.9, Flink has been adding support for fine-grained resource matching, but this is still a work in progress.
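Conceptually, fine-grained matching boils down to comparing two resource vectors dimension by dimension. The following sketch illustrates the idea with two made-up dimensions; it is not Flink's actual ResourceProfile API:

```java
// Hypothetical sketch of fine-grained resource matching; the two dimensions
// and method names are illustrative, not Flink's actual ResourceProfile API.
class ResourceSpec {
    final double cpuCores;
    final int memoryMb;

    ResourceSpec(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    // A Slot can host a Task if every resource dimension is sufficient
    boolean canFulfill(ResourceSpec request) {
        return cpuCores >= request.cpuCores && memoryMb >= request.memoryMb;
    }
}
```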

Job scheduling relies on resource management, so let's first look at how resource management is implemented in Flink. As mentioned above, resources in Flink are represented by the Slots on TaskExecutors. As shown in the figure below, a component inside the ResourceManager called the SlotManager maintains the information and state of all Slots of all TaskExecutors in the current cluster, for example which TaskExecutor each Slot belongs to and whether it is idle.

When a JobManager requests resources for a Task, the ResourceManager may, depending on per-job or Session mode, first request resources to start a new TaskExecutor. After a TaskExecutor starts, it discovers the active ResourceManager through service discovery and registers with it; the registration information includes all of the TaskExecutor's Slots. When the ResourceManager receives the registration, the SlotManager records the Slot information. When a JobManager then requests resources for a Task, the SlotManager selects an idle Slot from the registered Slots according to certain rules and allocates it.

Once the allocation is decided, as described in the previous section, the RM first sends an RPC to the TaskManager asking it to assign the selected Slot to the specific JobManager. If the TaskManager has not yet executed any Task of that JobManager, it establishes a connection to the JobManager and then sends an RPC offering the Slot. In the JobManager, all Slot requests for Tasks are cached in the SlotPool; when a Slot is offered, the SlotPool matches it against a cached request and completes the request.

Figure 4. Interaction among the resource management modules in Flink
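The bookkeeping performed by the SlotManager can be sketched roughly as follows; the class, the states, and the first-fit selection rule are illustrative assumptions, not Flink's actual code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of SlotManager bookkeeping; not Flink's actual classes.
class MiniSlotManager {
    enum SlotState { FREE, ALLOCATED }

    // slotId -> state, for all Slots registered by all TaskExecutors
    private final Map<String, SlotState> slots = new LinkedHashMap<>();

    // Called when a TaskExecutor registers; the registration lists all of its Slots
    void registerTaskExecutorSlots(Iterable<String> slotIds) {
        for (String id : slotIds) {
            slots.putIfAbsent(id, SlotState.FREE);
        }
    }

    // Called when a JobManager requests a Slot; pick the first idle one (one possible rule)
    String allocateSlot() {
        for (Map.Entry<String, SlotState> e : slots.entrySet()) {
            if (e.getValue() == SlotState.FREE) {
                e.setValue(SlotState.ALLOCATED);
                return e.getKey(); // the RM would now RPC the owning TaskExecutor
            }
        }
        return null; // no idle Slot: the RM may request a new TaskExecutor instead
    }
}
```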

When a Task ends, whether normally or abnormally, the JobManager is notified of its final state, and on the TaskManager side the Slot is marked as occupied but no longer running a Task. The JobManager keeps the corresponding Slot cached in the SlotPool rather than releasing it immediately, so that failed Tasks can be rescheduled back onto the original TaskManager as quickly as possible, speeding up failover. If a Slot cached in the SlotPool is not reused within a specified period, the SlotPool releases it: mirroring the allocation process, the SlotPool notifies the TaskManager to release the Slot, and the TaskExecutor then notifies the ResourceManager that the Slot has been released.
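The caching behavior of the SlotPool can be sketched as follows; the class name and the timeout value are illustrative assumptions:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the SlotPool caching behavior described above; names and timing are illustrative.
class MiniSlotPool {
    private static final long IDLE_TIMEOUT_MS = 50_000;

    // slotId -> timestamp at which the Slot became idle
    private final Map<String, Long> idleSince = new HashMap<>();

    void taskFinished(String slotId) {
        // Keep the Slot cached instead of releasing it, so a failover
        // can reuse the same TaskManager quickly
        idleSince.put(slotId, System.currentTimeMillis());
    }

    // Invoked periodically: release Slots that have been idle too long;
    // for each expired Slot, the TaskManager is notified, which in turn notifies the RM
    void releaseExpiredSlots() {
        long now = System.currentTimeMillis();
        idleSince.entrySet().removeIf(e -> now - e.getValue() > IDLE_TIMEOUT_MS);
    }
}
```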

In addition to this normal communication logic, scheduled heartbeat messages are exchanged between the ResourceManager and the TaskExecutors to synchronize Slot state. In a distributed system, message loss and corruption are unavoidable, and such problems introduce inconsistent state into the system's components; without periodic messages, the components would have no way to recover from these inconsistencies. Furthermore, if one component does not receive heartbeats from its counterpart for a long time, it considers the counterpart to have failed and triggers a failover.
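A minimal sketch of such heartbeat-based failure detection might look like this (the timeout value is an illustrative assumption):

```java
// Sketch of heartbeat-based failure detection; intervals and names are illustrative.
class HeartbeatMonitor {
    private static final long TIMEOUT_MS = 50_000;
    private volatile long lastHeartbeat = System.currentTimeMillis();

    // Called whenever a heartbeat (carrying, e.g., the current Slot status) arrives
    void onHeartbeat() {
        lastHeartbeat = System.currentTimeMillis();
        // ...reconcile the reported Slot status with the local view here...
    }

    // Called periodically; if the counterpart has been silent too long, trigger failover
    boolean counterpartFailed() {
        return System.currentTimeMillis() - lastHeartbeat > TIMEOUT_MS;
    }
}
```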

Based on Slot management, Flink can schedule Tasks into the corresponding Slots. As mentioned above, Flink has not yet fully introduced fine-grained resource matching, so by default each Slot is assigned one Task. However, this can lead to low resource utilization in some cases. As shown in Figure 5, if A, B, and C execute their computing logic in sequence, giving A, B, and C separate Slots results in low utilization. To address this, Flink provides the Slot sharing mechanism: multiple Tasks from different JobVertexes may be deployed into the same Slot, but no two Tasks of the same JobVertex may share a Slot. In the Figure 5 example, each Slot can hold at most one Task each of A, B, and C, but it may hold one Task of A, one of B, and one of C simultaneously. When individual Tasks occupy few resources, Slot sharing improves resource utilization; it also provides a simple way to maintain load balancing.

Figure 5. Example of Flink Slot sharing. Slot sharing allows multiple Tasks from different JobVertexes to be deployed in the same Slot
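Slot sharing can also be influenced from the DataStream API via slot sharing groups. In the following minimal sketch (class name and data are made up), all operators share slots by default, while moving the filter into its own group forces its tasks into separate Slots:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
                // by default, all operators belong to the "default" slot sharing group,
                // so one Task of each JobVertex can be deployed into the same Slot
                .map(x -> x * 2).returns(Types.INT)
                // placing the filter in its own group keeps its Tasks
                // out of the Slots used by the other operators
                .filter(x -> x > 2).slotSharingGroup("isolated")
                .print();

        env.execute("slot-sharing-example");
    }
}
```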

On top of the above Slot management and allocation logic, the JobManager is responsible for maintaining the state of Task execution within a job. As mentioned above, the client submits a JobGraph to the JobManager, and the JobGraph represents the logical structure of the job. The JobManager expands the JobGraph according to the parallelism of each vertex, producing the key data structure inside the JobManager: the ExecutionGraph, whose structure is shown in Figure 6. In contrast to the JobGraph, the ExecutionGraph additionally creates objects for each individual Task, each intermediate result partition, and so on, in order to maintain the information and state of these entities.

Figure 6. JobGraph and ExecutionGraph in Flink. The ExecutionGraph is the concurrent expansion of the JobGraph and the core data structure in the JobManager
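The expansion itself is mechanical: a JobVertex with parallelism p becomes p execution vertices, one per Task. A simplified sketch, with made-up class names rather than Flink's actual ExecutionGraph classes:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "concurrent expansion" from JobGraph to ExecutionGraph; names are illustrative.
class JobVertexDef {
    final String name;
    final int parallelism;

    JobVertexDef(String name, int parallelism) {
        this.name = name;
        this.parallelism = parallelism;
    }
}

class ExecutionExpander {
    // Each JobVertex with parallelism p becomes p execution vertices (one per Task)
    static List<String> expand(List<JobVertexDef> jobGraph) {
        List<String> executionVertices = new ArrayList<>();
        for (JobVertexDef v : jobGraph) {
            for (int i = 0; i < v.parallelism; i++) {
                executionVertices.add(v.name + "[" + i + "/" + v.parallelism + "]");
            }
        }
        return executionVertices;
    }
}
```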

A Flink job contains multiple Tasks, so another key question is the order in which Tasks are scheduled. As shown in Figure 7, Flink currently provides two basic scheduling strategies: Eager scheduling and Lazy From Source scheduling. Eager scheduling, as the name implies, requests resources for all Tasks when the job starts; it is mainly used for streaming jobs, which may never terminate. Lazy From Source, by contrast, starts from the Sources and schedules in topological order. Simply put, it first schedules the Source Tasks, which have no upstream Tasks, and when those Tasks finish it caches their output data in memory or writes it to disk. Each subsequent Task is scheduled once all of its predecessor Tasks have finished; these Tasks perform their own computation by reading the cached output of their upstream Tasks. This process continues until all Tasks have finished their computation.

Figure 7. Two basic scheduling strategies in Flink. Eager scheduling applies to streaming jobs, while Lazy From Source applies to batch jobs
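The Lazy From Source order is essentially a topological traversal in which a Task becomes schedulable once all its predecessors have finished. A simplified sketch of this bookkeeping, with illustrative names:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Sketch of the Lazy From Source order: schedule a task only after all of its
// predecessors have finished. Names and data structures are illustrative.
class LazyScheduler {
    // task -> list of downstream tasks, and task -> number of unfinished predecessors
    private final Map<String, List<String>> downstream;
    private final Map<String, Integer> pendingPredecessors;

    LazyScheduler(Map<String, List<String>> downstream, Map<String, Integer> pendingPredecessors) {
        this.downstream = downstream;
        this.pendingPredecessors = pendingPredecessors;
    }

    // Start with the Sources (no predecessors); schedule successors as tasks finish
    void run() {
        Queue<String> ready = new ArrayDeque<>();
        pendingPredecessors.forEach((task, n) -> { if (n == 0) ready.add(task); });
        while (!ready.isEmpty()) {
            String task = ready.poll();
            System.out.println("scheduling " + task); // run task; its output is cached/spilled
            for (String next : downstream.getOrDefault(task, List.of())) {
                if (pendingPredecessors.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next); // all predecessors finished: schedule it
                }
            }
        }
    }
}
```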

3. Error recovery

During the execution of a Flink job, in addition to the normal flow, various kinds of errors may occur due to environmental causes. Overall, errors fall into two broad categories: errors in Task execution, and failures of the Flink cluster's master. Because errors are unavoidable, Flink needs an automatic error recovery mechanism that retries execution in order to improve availability. For the first category, errors in Task execution, Flink provides several recovery strategies. The first, shown in Figure 8, is restart-all, which simply restarts all Tasks. For streaming jobs, Flink provides the Checkpoint mechanism, so after the Tasks restart, execution can resume from the last Checkpoint; this strategy is therefore well suited to streaming jobs. The second strategy is restart-individual, which applies only when no data is transferred between Tasks; in that case, the failed Task can simply be restarted on its own.

Figure 8. Example of the restart-all error recovery policy. This policy directly restarts all tasks.

Figure 9. Example of the restart-individual error recovery policy. This policy applies only to jobs that do not require data transfer between tasks. For such jobs, you can restart only the faulty Task.

Flink batch jobs have no Checkpoint mechanism, so for jobs that require data transfer, restarting all Tasks means recomputing everything from scratch, which causes performance problems. To improve batch jobs, Flink introduced a new region-based failover strategy in 1.9. In a Flink batch job, there are two modes of data transmission between Tasks. One is the Pipeline mode, in which data is transmitted directly over the network between the upstream and downstream Tasks, so the two must run simultaneously. The other is the Blocking mode, described in the previous section, in which the upstream Task first caches its data, so the upstream and downstream Tasks can execute separately. Based on these two transfer types, Flink partitions the ExecutionGraph into subgraphs of Tasks connected by Pipeline edges, calling each such subgraph a Region. Tasks within a Region must be restarted together, but because Region boundaries consist of Blocking edges, Tasks in different Regions can be restarted separately.

With this partitioning, when a Task in a Region fails, two scenarios must be considered. As shown in Figure 10, if the error was caused by the failed Task itself, only the Tasks in that Task's own Region need to be restarted; after restarting, they can pull their input directly from the cached output of the upstream Region and continue computing. On the other hand, if the error was caused by a problem reading the upstream results, for example the network connection was interrupted or the TaskExecutor caching the upstream output exited unexpectedly, then the upstream Region must also be restarted to regenerate the data. Moreover, if the upstream Region's output distribution pattern is non-deterministic (for example, KeyBy and Broadcast are deterministic, while Rebalance and Random are not, since each execution produces a different distribution), then to guarantee correct results, all downstream Regions of that upstream Region must be restarted as well.

Figure 10. Example 1 of the Region-based error recovery policy. If the fault is caused by the downstream task itself, restart only the Region corresponding to the downstream task.

Figure 11. Example 2 of the Region-based error recovery policy. If the fault is caused by an upstream failure, both the upstream and downstream Regions are restarted. Furthermore, if the upstream Region uses a non-deterministic data distribution pattern, all downstream Regions of that upstream Region need to be restarted at the same time to ensure data consistency.
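Deriving the Regions themselves amounts to finding the connected components of the ExecutionGraph when only Pipeline edges are considered. A simplified sketch, with illustrative names:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of how pipelined Regions can be derived: tasks connected by Pipeline edges
// fall into one Region; Blocking edges separate Regions. Names are illustrative.
class RegionBuilder {
    // pipelineEdges: undirected adjacency over tasks, containing only Pipeline edges
    static List<Set<String>> buildRegions(Set<String> tasks, Map<String, List<String>> pipelineEdges) {
        List<Set<String>> regions = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        for (String start : tasks) {
            if (visited.contains(start)) continue;
            Set<String> region = new HashSet<>();
            Deque<String> stack = new ArrayDeque<>(List.of(start));
            while (!stack.isEmpty()) {           // traversal over Pipeline edges only
                String t = stack.pop();
                if (!visited.add(t)) continue;
                region.add(t);
                stack.addAll(pipelineEdges.getOrDefault(t, List.of()));
            }
            regions.add(region);                 // all Tasks in a Region restart together
        }
        return regions;
    }
}
```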

Besides errors in Task execution, the other category of failure is an error in the master of the Flink cluster. Flink currently supports starting multiple masters as standbys; leader election among them is performed via ZooKeeper, guaranteeing that exactly one master is active at any time. If the active master fails, a standby master can take over coordination. To guarantee that the master accurately restores the state of the jobs, Flink currently uses the simplest implementation: restarting the whole job. Since the jobs themselves may still be running, there is room for improvement here.

This article is from Ververica