This article is from Ververica

Overview of the Flink architecture

Flink Architecture Overview – Job

Users write Flink jobs with the DataStream API, DataSet API, SQL, or Table API, and the program is compiled into a JobGraph. The JobGraph is made up of operators such as source, map(), keyBy(), window(), apply(), and sink. Once the JobGraph is submitted to a Flink cluster, it can run in Local, Standalone, Yarn, or Kubernetes mode.
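
As a minimal sketch of how such a JobGraph comes about (the socket source, window size, and aggregation used here are illustrative assumptions, not taken from this article), a DataStream program whose operators map onto a source, map(), keyBy(), window(), and sink JobGraph might look like this:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WordCountJob {
    public static void main(String[] args) throws Exception {
        // The execution environment builds the JobGraph that is submitted to the cluster.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> words = env.socketTextStream("localhost", 9999); // Source operator

        words
                .map(new MapFunction<String, Tuple2<String, Integer>>() {   // map()
                    @Override
                    public Tuple2<String, Integer> map(String word) {
                        return Tuple2.of(word, 1);
                    }
                })
                .keyBy(value -> value.f0)                                    // keyBy()
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))   // window()
                .sum(1)                                                      // aggregation
                .print();                                                    // sink

        // execute() turns the program into a JobGraph and submits it.
        env.execute("WordCount over a socket source");
    }
}
```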

Flink Architecture Overview – JobManager

JobManager provides the following functions:

  • Converts the JobGraph into an ExecutionGraph and ultimately executes the ExecutionGraph
  • Its Scheduler component is responsible for scheduling tasks
  • Its Checkpoint Coordinator component coordinates when tasks start and complete checkpoints (a configuration sketch follows this list)
  • Communicates with the TaskManagers through the Actor System
  • Offers other functions, such as Recovery Metadata, which reads metadata for fault recovery
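
The Checkpoint Coordinator only coordinates checkpoints for jobs that have enabled them. A minimal sketch of switching checkpointing on from the user program (the interval, mode, and limits below are illustrative values, not recommendations from this article):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Ask the JobManager's Checkpoint Coordinator to trigger an exactly-once
        // checkpoint every 10 seconds.
        env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);

        // Keep at most one checkpoint in flight and fail checkpoints that take over 60 s.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setCheckpointTimeout(60_000);

        env.fromElements(1, 2, 3).print();
        env.execute("checkpointing demo");
    }
}
```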

Flink Architecture Overview – TaskManager

The TaskManager is responsible for executing specific tasks; it is started after the JobManager applies for resources. The main components of the TaskManager are:

  • Memory & I/O Menager is the management of Memory I/O
  • Network Manager, used to manage Network aspects
  • Actor system, which is responsible for network communication

The TaskManager is divided into a number of TaskSlots; each task runs in one TaskSlot, which is the smallest unit of resource scheduling.
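
The number of slots per TaskManager is a configuration choice rather than something fixed by Flink. A minimal sketch (the slot count of 4 and the use of a local embedded environment are assumptions made for illustration):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TaskSlotExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Give the embedded TaskManager 4 task slots; each slot hosts one parallel
        // subtask and is the smallest unit of resource scheduling.
        conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);

        // Local environment with parallelism 4, so all 4 slots are used.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4, conf);

        env.fromElements(1, 2, 3, 4).map(i -> i * 10).print();
        env.execute("task slot demo");
    }
}
```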

Before introducing Yarn, let us first look at the Flink Standalone mode, which makes the Yarn and Kubernetes architectures easier to understand.

  • In Standalone mode, the Master and TaskManager can run on the same machine or on different machines
  • Within the Master process, the role of the Standalone ResourceManager is to manage resources. When the user submits a JobGraph to the Master via the Flink Cluster Client, the JobGraph first goes through the Dispatcher.
  • When the Dispatcher receives the client request, it generates a JobManager. The JobManager process then applies for resources from the Standalone ResourceManager and finally starts the TaskManagers.
  • After a TaskManager starts, it goes through a registration process, after which the JobManager sends specific tasks to the TaskManager for execution.

This is how a task runs in Standalone mode.
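
One way to see this flow from the client side is to submit a program directly to a running Standalone cluster with the remote environment; a minimal sketch (host name, port, and jar path are placeholders):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StandaloneSubmitExample {
    public static void main(String[] args) throws Exception {
        // Connect to the Standalone Master's REST endpoint and ship the user jar.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "master-host",            // placeholder Master address
                8081,                     // default REST port of a Standalone cluster
                "/path/to/user-job.jar"); // jar containing the user code

        env.fromElements("a", "b", "c").print();

        // execute() hands the JobGraph to the Dispatcher, which creates a JobManager for it.
        env.execute("standalone submission demo");
    }
}
```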

Flink runtime related components

Let’s summarize the basic architecture of Flink and some of its components at runtime, as follows:

  • Client: The user submits a task through SQL or an API; after submission, a JobGraph is generated.
  • JobManager: After receiving the user request, the JobManager schedules the tasks and applies for resources to start the TaskManagers.
  • TaskManager: Responsible for executing specific tasks. The TaskManager registers with the JobManager and executes the tasks that the JobManager assigns to it.

Principles and practices of Flink on Yarn

Yarn Architecture – Overview

The Yarn mode is widely used in China, and most companies run it in their production environments. This section describes the architecture and principles of Yarn, which you need to understand in order to know how Flink works on Yarn.

The most important role in the Yarn architecture is the ResourceManager, which manages resources. The Client submits tasks to the ResourceManager.

After a user submits a task on the Client, the task is first sent to the ResourceManager. The ResourceManager starts a Container and then the ApplicationMaster in it, that is, it starts the Master node. After the ApplicationMaster starts, it applies to the ResourceManager for resources again. Once the ResourceManager allocates resources to the ApplicationMaster, the ApplicationMaster schedules the specific tasks for execution.

Yarn architecture principles – Components

Components in a Yarn cluster include:

  • ResourceManager (RM): processes client requests, starts and monitors ApplicationMasters, monitors NodeManagers, and allocates and schedules resources. It consists of the Scheduler and the Applications Manager.
  • ApplicationMaster (AM): runs on a Slave node and is responsible for data splitting, applying for and allocating resources, task monitoring, and fault tolerance.
  • NodeManager (NM): runs on each Slave node and handles resource management for that single node, communication with the AM/RM, and status reporting.
  • Container: an abstraction of resources, including memory, CPU, disk, and network.

Yarn architecture principles – Interaction

This section uses MapReduce jobs on Yarn as an example to explain the interaction mechanism of the Yarn architecture.

  • First, the user writes the MapReduce code and submits the task from the Client.
  • After receiving the request from the Client, the ResourceManager allocates a Container for the ApplicationMaster and notifies a NodeManager to start the ApplicationMaster in this Container (a client-side sketch of this step follows the list).
  • After the ApplicationMaster starts, it registers with the ResourceManager and then applies to it for resources. Based on the resources obtained, the ApplicationMaster communicates with the relevant NodeManagers and asks them to start the program.
  • One or more NodeManagers start Map/Reduce tasks.
  • The NodeManagers continuously report the status and progress of the Map/Reduce tasks to the ApplicationMaster.
  • When all Map/Reduce tasks are complete, the ApplicationMaster reports completion to the ResourceManager and deregisters itself.
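
To make the Client-to-ResourceManager step concrete, the sketch below uses Hadoop’s YarnClient API to create an application, size the first Container, and submit it. The application name, resource sizes, and the placeholder ApplicationMaster command are illustrative; a real framework would ship its own AM jars and launch command instead:

```java
import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnSubmitSketch {
    public static void main(String[] args) throws Exception {
        // The Client talks to the ResourceManager through the YarnClient API.
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        // Ask the ResourceManager for a new application.
        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        appContext.setApplicationName("example-application");

        // Resources for the first Container, in which the ApplicationMaster will run.
        appContext.setResource(Resource.newInstance(1024 /* MB */, 1 /* vcores */));

        // Launch context of the ApplicationMaster process; a real framework would point
        // the command at its AM main class and ship its jars (placeholder command here).
        ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
                Collections.emptyMap(), Collections.emptyMap(),
                Collections.singletonList("echo placeholder ApplicationMaster"),
                Collections.emptyMap(), null, Collections.emptyMap());
        appContext.setAMContainerSpec(amContainer);

        // The ResourceManager now allocates the Container and tells a NodeManager
        // to start the ApplicationMaster in it.
        yarnClient.submitApplication(appContext);
    }
}
```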

Flink on Yarn – PerJob

In the PerJob mode of Flink on Yarn, a dedicated cluster is started for each submitted task, and the resources are released after the task completes. Once you understand how Yarn works, the PerJob process is easy to follow (a client-side configuration sketch appears after the list):

  • First, the Client submits the Yarn application, such as the JobGraph and the JARs.
  • Then Yarn’s ResourceManager allocates the first Container, which is used to start the ApplicationMaster process. The ApplicationMaster runs Flink, that is, the Flink-Yarn ResourceManager and the JobManager.
  • Finally, the Flink-Yarn ResourceManager applies for resources from the Yarn ResourceManager. When the resources are allocated, the TaskManagers are launched. After a TaskManager starts, it registers with the Flink-Yarn ResourceManager. Once registration succeeds, the JobManager assigns specific tasks to the TaskManager for execution.

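From the user-code side, the difference from the Standalone example is mainly the execution target. A minimal sketch, assuming a Flink version that still ships the per-job deployment target and a correctly configured Hadoop/Yarn environment (in practice this is usually done with the Flink CLI rather than in code):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PerJobSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // One dedicated Yarn application per job: the ApplicationMaster hosts the
        // Flink-Yarn ResourceManager and the JobManager for this job only.
        conf.set(DeploymentOptions.TARGET, "yarn-per-job");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements("per-job").print();
        env.execute("per-job demo");
    }
}
```
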
Flink on Yarn – Session

In PerJob mode, all resources, including the JobManager and the TaskManagers, are released after the task finishes. Session mode is different: its Dispatcher and ResourceManager are reused. In Session mode, when the Dispatcher receives a request, it starts JobManager (A) and lets JobManager (A) complete the start of its TaskManagers; JobManager (B) and the corresponding TaskManagers are then started in the same way. The resources are not released after tasks A and B complete. Session mode is also called multi-threaded mode: the resources remain allocated rather than being released, and multiple JobManagers share one Dispatcher and one Flink-Yarn ResourceManager.
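
To attach a job to an already running session cluster, the execution target changes and the session’s Yarn application id is supplied. A minimal sketch under the same assumptions as the PerJob example above (the application id is a placeholder):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

public class SessionSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Attach to an already running session: its Dispatcher and
        // Flink-Yarn ResourceManager are reused.
        conf.set(DeploymentOptions.TARGET, "yarn-session");
        conf.set(YarnConfigOptions.APPLICATION_ID, "application_1234567890123_0001"); // placeholder id

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements("session").print();
        env.execute("session demo");
    }
}
```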

The applicable scenarios of Session mode and PerJob mode are different. PerJob mode suits tasks that are insensitive to startup time and run for a long time. Session mode suits tasks that run for a short time, usually batch jobs. If short-running tasks were executed in PerJob mode, resources would have to be applied for frequently, released after each run, and applied for again the next time. Such frequently started and stopped tasks clearly do not fit PerJob mode; they fit Session mode instead.

Yarn Mode Features

The Yarn mode has the following advantages:

  • Unified resource management and scheduling. The resources (such as memory, CPU, disk, and network) of all nodes in a Yarn cluster are abstracted as Containers. When a computing framework needs resources to run a task, it applies for Containers from the ResourceManager, and Yarn schedules and allocates the Containers according to specific policies. Multiple scheduling policies, such as the FIFO Scheduler, the Capacity Scheduler, and the Fair Scheduler, can be used to improve cluster resource utilization and to set task priorities.
  • Resource isolation: Yarn uses the lightweight Cgroups mechanism to isolate resources from one another. If the amount of resources used by a Container exceeds its preset upper limit, the Container is killed.
  • Automatic failover, for example, monitoring of Yarn NodeManagers and recovery of the ApplicationMaster after an abnormal exit.

The Yarn mode has many advantages, but it also has drawbacks, such as high O&M and deployment costs and low flexibility.