Summary: This article explains several features of resource management in Flink 1.12, including memory management, resource scheduling, and the extended resource framework. It was organized by Chen Zhengyu, a community volunteer; Song Xintong, an Apache Flink Committer and Alibaba technical expert; and Guo Yangze, an Apache Flink Contributor and senior development engineer at Alibaba. The content is divided into four parts:

  • Memory management
  • Resource scheduling
  • Extended resource framework
  • Future plans

First, memory management

Let’s first review how Flink’s memory model has evolved. The new memory models introduced in Flink 1.10 and Flink 1.11 are shown on the left of the figure below. Although many components are involved, 80%-90% of users only need to focus on the four parts that are actually used for task execution: Task Heap Memory, Task Off-Heap Memory, Network Memory, and Managed Memory.

Most of the other components are Flink’s framework memory, which normally does not need to be adjusted; if you do run into problems, the community documentation can help. In addition, “how much memory does a job need to meet actual production demand” is a question we all have to face: how much of each memory component is actually used, whether job performance suffers from a lack of memory, whether resources are being wasted, and so on.

In response, the community provides a new Web UI for the TaskManager and JobManager in Flink 1.12.

In the new Web UI, the configured value and actual usage of each monitored metric are mapped directly onto the memory model for an intuitive display. This gives a clearer picture of how the job is running, what to adjust, and which configuration parameters to use (the community documentation covers this), which makes memory management easier.

1. Managed memory

Flink’s managed memory is a Flink-specific kind of native (off-heap) memory: it is not managed by the JVM or the GC, but by Flink itself.

Managed memory has two main features:

  • On the one hand, budgets are planned at the slot level. This ensures that operators or tasks will not fail to run because of insufficient memory while the job is running, and that resources are not wasted by reserving memory that goes unused. At the same time, Flink ensures that memory is released precisely when a task ends, so that enough memory is available when the TaskManager executes a new task.
  • On the other hand, resource adaptability is one of the most important characteristics of managed memory: the amount of memory an operator uses adjusts dynamically to what it is given. The operator neither wastes resources when a task is given plenty of memory, nor makes the whole job fail when it is given relatively little, so memory use stays within a reasonable range.

Of course, when relatively little memory is allocated, the operator is subject to some restrictions; for example, it may need to spill to disk frequently to keep the job running, which can affect performance.

Currently, Flink uses managed memory in the following scenarios:

  • RocksDB state backend: in streaming scenarios, all stateful operators in a slot share the same underlying RocksDB cache;
  • Flink built-in operators: operators for batch processing, Table/SQL, the DataSet API and so on; each operator has an independent resource budget that is not shared with other operators;
  • Python processes: users who define UDFs in Python with PyFlink need to start a Python virtual machine process.

2. Job Graph compilation phase

Flink’s handling of managed memory is divided into two stages.

2.1 Job Graph compilation phase of the Job

Three questions need to be answered at this stage:

The first question is which operators or tasks in a slot are executed simultaneously. This determines how memory is planned within a job: if other tasks will also need managed memory, the corresponding memory must be set aside for them. In a streaming job the answer is simple, because all operators must run at the same time so that data produced upstream is consumed downstream in a timely manner and can flow through the whole job graph. In batch scenarios, however, there are two data shuffle modes.

One is the pipelined mode, which behaves like streaming as described above: upstream and downstream operators run simultaneously, with the upstream producing and the downstream consuming at any time.

The other is the blocking mode used in batch processing, which requires the upstream to produce all of its data first; the downstream can only start reading after the upstream has finished writing.

These two modes determine which tasks can run simultaneously. Currently, based on the type of each edge in the job topology (see figure), Flink defines a concept called the pipelined region: a subgraph whose tasks are all connected by pipelined edges. Flink identifies these subgraphs and uses them to determine which tasks are executed simultaneously.
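The idea of a pipelined region can be sketched as a connected-components computation: tasks joined by pipelined edges end up in the same region, while blocking edges act as region boundaries. The following is a minimal sketch under that assumption, not Flink’s actual implementation; the class `PipelinedRegions`, the `Edge` type and the integer task ids are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: groups tasks into regions connected by pipelined edges. */
final class PipelinedRegions {

    /** An edge between two tasks; 'pipelined' is false for a blocking edge. */
    static final class Edge {
        final int from;
        final int to;
        final boolean pipelined;

        Edge(int from, int to, boolean pipelined) {
            this.from = from;
            this.to = to;
            this.pipelined = pipelined;
        }
    }

    /** Union-find lookup with path halving. */
    private static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]];
            x = parent[x];
        }
        return x;
    }

    /** Returns the regions as lists of task ids; tasks are numbered 0..taskCount-1. */
    static List<List<Integer>> compute(int taskCount, List<Edge> edges) {
        int[] parent = new int[taskCount];
        for (int i = 0; i < taskCount; i++) {
            parent[i] = i;
        }
        // Only pipelined edges merge regions; blocking edges are region boundaries.
        for (Edge e : edges) {
            if (e.pipelined) {
                parent[find(parent, e.from)] = find(parent, e.to);
            }
        }
        Map<Integer, List<Integer>> byRoot = new TreeMap<>();
        for (int task = 0; task < taskCount; task++) {
            byRoot.computeIfAbsent(find(parent, task), k -> new ArrayList<>()).add(task);
        }
        return new ArrayList<>(byRoot.values());
    }

    public static void main(String[] args) {
        // 0 -> 1 is pipelined, 1 -> 2 is blocking, 2 -> 3 is pipelined.
        List<Edge> edges = Arrays.asList(
                new Edge(0, 1, true), new Edge(1, 2, false), new Edge(2, 3, true));
        System.out.println(compute(4, edges)); // two regions: tasks {0, 1} and tasks {2, 3}
    }
}
```

In this toy topology the blocking edge between tasks 1 and 2 splits the job into two regions, so tasks 0 and 1 must run together, as must tasks 2 and 3, but the two groups need not run at the same time.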

The second question is: which managed memory use cases are present in a slot? We have just described three use cases for managed memory. At this stage, a streaming job may contain Python UDFs and stateful operators. Note that a stateful operator is not guaranteed to use managed memory, because that depends on the type of its state backend.

If it uses the RocksDB state backend, it needs managed memory; if it uses the heap state backend, it does not. However, the job does not actually know the type of the state backend at compile time, which is something to be aware of.

The third question concerns batch jobs. Besides identifying the use cases, we need to remember that, as mentioned earlier, batch operators use managed memory exclusively per operator rather than sharing it at the slot level. We therefore need to know how much memory each operator should be allocated, which is currently set automatically by Flink’s planner.

2.2 Execution phase

The first step is to determine, from the type of the state backend, whether RocksDB is used. As shown in the figure above, suppose a slot has three operators A, B and C, where B and C use Python and C is also a stateful operator. With the heap state backend we take the upper branch, and the only use case in the whole slot is Python. With the RocksDB state backend there are two use cases: Python and RocksDB.

After this first step, the second step decides, according to the user’s configuration, how the slot’s managed memory is shared among the different use cases. In this streaming example, Python has a weight of 30% and the state backend a weight of 70%. If Python is the only use case (the streaming heap state backend branch), the Python part naturally gets 100% of the managed memory.

In the second case (the streaming RocksDB state backend branch), operators B and C share 30% of the memory for Python UDFs, and C additionally has 70% of the memory exclusively for the RocksDB state backend. Finally, Flink determines how much memory each operator can actually use, based on the TaskManager’s resource configuration and the amount of managed memory in a slot.

The batch case differs from the streaming case in two ways. First, there is no need to determine the type of the state backend, which is a simplification. Second, as mentioned above, each batch operator has its own exclusive resource budget. In this case, Flink first calculates, according to the weights, how much of the shared memory each use case gets, and then subdivides that proportion among the operators.
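The weight-based sharing described in this section can be sketched as a simple normalization: only the use cases that actually occur in the slot take part, and each receives its weight divided by the sum of the participating weights. This is a minimal sketch under that reading, not Flink’s internal code; the class `ManagedMemoryShares` and its method are hypothetical, and the keys `DATAPROC` and `PYTHON` stand for the state backend (or batch operator) and Python use cases.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative only: splits a slot's managed memory among the use cases present in it. */
final class ManagedMemoryShares {

    /** Returns, for each use case present in the slot, its fraction of the managed memory. */
    static Map<String, Double> fractions(Map<String, Integer> weights, Set<String> present) {
        int totalWeight = 0;
        for (String useCase : present) {
            totalWeight += weights.getOrDefault(useCase, 0);
        }
        Map<String, Double> result = new LinkedHashMap<>();
        for (String useCase : present) {
            int weight = weights.getOrDefault(useCase, 0);
            // Weights are normalized over the use cases that actually occur in the slot.
            result.put(useCase, totalWeight == 0 ? 0.0 : weight / (double) totalWeight);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> weights = new LinkedHashMap<>();
        weights.put("DATAPROC", 70); // state backend or batch operators
        weights.put("PYTHON", 30);

        // Heap state backend branch: Python is the only consumer and gets everything.
        System.out.println(fractions(weights, new LinkedHashSet<>(Arrays.asList("PYTHON"))));
        // RocksDB state backend branch: both consumers are present and split 70/30.
        System.out.println(
                fractions(weights, new LinkedHashSet<>(Arrays.asList("DATAPROC", "PYTHON"))));
    }
}
```

For a batch job, the fraction obtained this way for the operator use case would then be divided further among the individual operators.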

3. Parameter configuration



The diagram above shows the parameters for configuring managed memory. There are two ways to configure its size:

  • One is to configure an absolute value;
  • The other is to configure it as a fraction of the TaskManager’s total memory.

taskmanager.memory.managed.consumer-weights is a new configuration option. Its data type is a map: a key, a colon and a value, followed by a comma and the next key:value pair, and so on. Two consumer keys are currently supported:

  • One is DATAPROC, which covers the memory of the state backend in stream processing and of batch operators in batch processing.
  • The other is PYTHON.
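To make the map format concrete, the sketch below parses a value of the form key:value,key:value into a Java map. It is only an illustration of the syntax, not the parser Flink uses; the class `ConsumerWeights` is hypothetical, and the sample value `DATAPROC:70,PYTHON:30` is an assumption that mirrors the 70%/30% split used in the example earlier in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: parses a "key:value,key:value" string into a map of weights. */
final class ConsumerWeights {

    static Map<String, Integer> parse(String value) {
        Map<String, Integer> weights = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] keyAndValue = entry.trim().split(":");
            if (keyAndValue.length != 2) {
                throw new IllegalArgumentException("Expected key:value but got '" + entry + "'");
            }
            weights.put(keyAndValue[0].trim(), Integer.parseInt(keyAndValue[1].trim()));
        }
        return weights;
    }

    public static void main(String[] args) {
        // Sample value for taskmanager.memory.managed.consumer-weights (assumed for illustration).
        System.out.println(parse("DATAPROC:70,PYTHON:30"));
    }
}
```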

Second, resource scheduling

Features related to resource scheduling are among those that users ask about most often, in other versions and on the mailing lists. We introduce them here as well.

1. Maximum number of slots

Flink 1.12 supports limiting the maximum number of slots (slotmanager.number-of-slots.max). As mentioned earlier, for streaming jobs all operators must run simultaneously to keep data flowing smoothly. In that case, the parallelism of the job determines how many slots and resources are needed to execute it.

Batch processing is different. Batch jobs often have a high parallelism but do not actually need that many resources at once: a batch job can run with a small amount of resources, and once the earlier tasks have finished, their slots are freed for subsequent tasks. Executing tasks serially in this way avoids occupying too many resources in the YARN/K8s cluster. This parameter is currently supported on YARN, Mesos and native K8s.

2. TaskManager fault tolerance

In production, program errors, network jitter, hardware failures and similar problems can make a TaskManager unreachable or even crash it outright. TaskManagerLost is a common error in the logs. In this case the job has to be restarted, during which resources must be requested again and the TaskManager process restarted, which is very costly in terms of performance.

For jobs with high stability requirements, Flink 1.12 provides a new feature that keeps a small number of redundant TaskManagers in the Flink cluster at all times. When a single point fails, these redundant TaskManagers can be used for quick recovery, without waiting for new resources to be requested.

Redundant TaskManagers are enabled by configuring slotmanager.redundant-taskmanager-num. With a value of 2, for example, this does not mean that exactly two TaskManagers sit idle, but that the cluster has two more TaskManagers than the total resources the job needs.

Tasks may be distributed relatively evenly across all of them, so the otherwise idle TaskManagers are used and a relatively good load balance is achieved. When a failure occurs, tasks can first be quickly scheduled onto the surviving TaskManagers, and new resources are then requested. This parameter is currently supported on YARN, Mesos and native K8s.
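As a rough back-of-the-envelope sketch, the number of TaskManagers in such a cluster can be thought of as the number needed to host the job’s slots plus the configured redundancy. The class and method names below are hypothetical, and the calculation is a simplification for illustration rather than Flink’s actual resource manager logic.

```java
/** Illustrative only: estimates the TaskManager count when redundancy is configured. */
final class RedundantTaskManagers {

    static int taskManagersToKeep(int requiredSlots, int slotsPerTaskManager, int redundant) {
        // Ceiling division: TaskManagers needed to host all required slots.
        int needed = (requiredSlots + slotsPerTaskManager - 1) / slotsPerTaskManager;
        return needed + redundant;
    }

    public static void main(String[] args) {
        // 10 slots at 4 slots per TaskManager need 3 TaskManagers; 2 redundant ones make 5.
        System.out.println(taskManagersToKeep(10, 4, 2));
    }
}
```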

3. Task tiling

The task tiling problem mainly occurs in Flink Standalone mode or in older versions of the K8s deployment mode. In these modes, because the number of TaskManagers and the number of slots on each TaskManager are predefined, scheduling is often uneven: some TaskManagers are packed full of tasks while others are only lightly loaded.

In the 1.11 release, the parameter cluster.evenly-spread-out-slots was introduced, which enables relatively balanced scheduling.

Note:

First, this parameter only applies to Standalone mode. In YARN and K8s mode, the number of TaskManagers to start is determined by the requirements of the job, so the requirements come first and the TaskManagers second, rather than the TaskManagers existing first and the slot scheduling requirements coming afterwards.

When scheduling tasks, Flink can only see the TaskManagers that are currently registered; it has no global knowledge of how many more TaskManagers will register later. This is why, as many people ask, the feature does not seem to work as well as expected once it is turned on. That is the first point.

The second point is that Flink can only determine how many slots are free on each TaskManager, not how many parallel instances of each operator there are. Flink therefore cannot guarantee that each operator is evenly distributed across the TaskManagers, because in Flink’s resource scheduling logic, tasks are completely invisible at the slot allocation layer.
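The effect of spreading slots out can be illustrated with a small sketch that always places the next task on the registered TaskManager with the most free slots. This is a simplified stand-in for the real strategy, not Flink’s scheduler: the class `SlotSpreading` and its methods are hypothetical, and, as the notes above point out, the choice is based only on free slots of the TaskManagers known at that moment.

```java
import java.util.Arrays;

/** Illustrative only: places tasks on the TaskManager that currently has the most free slots. */
final class SlotSpreading {

    /** Returns the index of the TaskManager with the most free slots, or -1 if none is free. */
    static int pickLeastLoaded(int[] freeSlots) {
        int best = -1;
        for (int i = 0; i < freeSlots.length; i++) {
            if (freeSlots[i] > 0 && (best == -1 || freeSlots[i] > freeSlots[best])) {
                best = i;
            }
        }
        return best;
    }

    /** Returns how many tasks end up on each TaskManager. */
    static int[] place(int tasks, int[] freeSlots) {
        int[] free = freeSlots.clone();
        int[] placed = new int[free.length];
        for (int t = 0; t < tasks; t++) {
            int tm = pickLeastLoaded(free);
            if (tm < 0) {
                throw new IllegalStateException("No free slot left");
            }
            free[tm]--;
            placed[tm]++;
        }
        return placed;
    }

    public static void main(String[] args) {
        // Four tasks on two TaskManagers with four slots each end up two and two.
        System.out.println(Arrays.toString(place(4, new int[] {4, 4})));
    }
}
```

Note that the sketch balances slots, not operators: it has no notion of which operator a task belongs to, which matches the second caveat above.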

Third, the extended resource framework

1. Background

In recent years, with the continuous development of artificial intelligence, deep learning models have been applied to a variety of production needs; typical scenarios include recommendation systems, advertising delivery, and intelligent risk control. These are also scenarios in which Flink has long been widely used, so supporting artificial intelligence has been a long-term goal of the Flink community. Much third-party open source extension work has been done toward this goal. Alibaba has open-sourced two main projects:

  • One is Flink AI Extended, a deep learning extension framework based on Flink that currently supports integrating frameworks such as TensorFlow and PyTorch. It allows users to run TensorFlow as an operator in Flink tasks.
  • The other is Alink, a general-purpose algorithm platform based on Flink with many common machine learning algorithms built in.

Both of these projects extend Flink functionally. From a computational perspective, however, deep learning models and machine learning algorithms are often the computational bottleneck of the whole task. GPUs are widely used in this field to speed up training and prediction, so supporting GPU resources to accelerate computation is an essential feature for Flink’s evolution in the AI field.

2. Using extended resources

At present, the only resource dimensions that users can configure in Flink are CPU and memory. In practice, however, we encounter other resource requirements besides GPUs, such as SSDs or network acceleration devices like RDMA. We therefore want to provide a general extended resource framework into which any extended resource can be added as a plug-in; the GPU is just one such resource.

For the use of extended resources, two general requirements can be abstracted:

  • The configuration and scheduling of the extended resource must be supported. Users can specify their requirements for the extended resource in the configuration, such as one GPU card per TaskManager, and when Flink is deployed on a resource platform such as Kubernetes or YARN, it must forward those requirements so that the requested Container/Pod actually contains the corresponding extended resources.
  • Information about the extended resource must be provided to operators at run time. In the case of a GPU, the operator needs to know which GPU card its internal model can be deployed on, so this information has to be passed to the operator.

3. How to use the extended resource framework

Using the extended resource framework can be divided into three steps:

  • First, set the relevant configuration for the extended resource;
  • Then, prepare the plug-in of the extended resource framework for the required extended resource;
  • Finally, in the operator, retrieve the information about the extended resource from the RuntimeContext and use it.

3.1 Configuration parameters

    # Define the name of the extended resource, "gpu"
    external-resources: gpu
    # Define the number of GPUs required per TaskManager
    external-resource.gpu.amount: 1
    # Define the configuration key of the extended resource in YARN or Kubernetes
    external-resource.gpu.yarn.config-key: yarn.io/gpu
    external-resource.gpu.kubernetes.config-key: nvidia.com/gpu
    # Define the factory class of the GPUDriver plug-in
    external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory

The above is an example configuration for using GPU resources:

  • For any extended resource, the user first needs to add its name to “external-resources”; the name is also used as the prefix of every other configuration option related to that resource. In our example, we define a resource called “gpu”.
  • At the scheduling layer, users can currently configure extended resource requirements at TaskManager granularity. In our example, each TaskManager is defined to have 1 GPU device.
  • When deploying Flink on Kubernetes or YARN, the configuration key of the extended resource on the corresponding resource platform must be set so that Flink can forward the resource requirements. The keys for GPUs are shown in the example.
  • If a plug-in is provided, the class name of the plug-in’s factory must be set in the configuration.

3.2 Preparation

Before extended resources can actually be used, some preparatory work is needed. Taking the GPU as an example:

  • In Standalone mode, the cluster administrator needs to ensure that GPU resources are visible to the TaskManager process.
  • In Kubernetes mode, the cluster must support Device Plugins [6], which corresponds to Kubernetes version 1.10, and the plug-in for the GPU must be installed in the cluster.
  • In YARN mode, GPU scheduling requires the cluster’s Hadoop version to be 2.10 or above, or 3.1 or above, with files such as resource-types.xml configured correctly.

3.3 Plug-ins of the extended resource framework

Once the extended resource has been scheduled, user-defined operators may still need information about it at run time in order to use it. The plug-in of the extended resource framework is responsible for retrieving this information. Its interface is as follows:

    public interface ExternalResourceDriverFactory {
        /**
         * Creates the driver of the extended resource according to the given configuration.
         */
        ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
    }

    public interface ExternalResourceDriver {
        /**
         * Retrieves the information of the extended resource.
         */
        Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;
    }

An ExternalResourceDriver is started on each TaskManager, and the extended resource framework calls the retrieveResourceInfo method of each driver to obtain the extended resource information on that TaskManager. The resulting information is passed to the RuntimeContext of the operators. ExternalResourceDriverFactory is the factory class of the plug-in.

4. GPU plug-in

Flink currently has a built-in plug-in for GPU resources. Internally it executes a script, called the discovery script, to obtain the GPU information available in the current environment; at present this information consists of the index of each GPU device.

Flink provides a default script, located in the “plugins/external-resource-gpu/” directory of the project. Users can also implement a custom discovery script and configure the plug-in to use it. The protocol between the script and the GPU plug-in is:

  • When the script is invoked, the number of GPUs required is passed as the first argument, followed by a user-defined argument list.
  • If the script executes correctly, it prints a list of GPU indexes separated by commas.
  • If the script fails or the result is not as expected, the script exits with a non-zero value, which causes TaskManager initialization to fail, and the script’s error message is printed in the log.

The default script provided by Flink uses the “nvidia-smi” tool to obtain the number and indexes of the GPUs available on the current machine, and returns a list of GPU indexes matching the required number. If the required number of GPUs cannot be obtained, the script exits with a non-zero value.
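The protocol above can be mirrored in a few lines of Java: parse the comma-separated index list that a script would print, then hand out the required number of indexes or fail if there are too few. This is only a sketch of the protocol’s logic, not the code of Flink’s GPU plug-in or of the default script; the class `GpuDiscovery` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: mimics the logic of the discovery script protocol. */
final class GpuDiscovery {

    /** Parses the comma-separated index list that a discovery script prints on success. */
    static List<String> parseIndexes(String scriptOutput) {
        List<String> indexes = new ArrayList<>();
        for (String part : scriptOutput.trim().split(",")) {
            String index = part.trim();
            if (!index.isEmpty()) {
                indexes.add(index);
            }
        }
        return indexes;
    }

    /** Returns the first 'amount' indexes; throws where a script would exit with a non-zero value. */
    static List<String> select(List<String> available, int amount) {
        if (available.size() < amount) {
            throw new IllegalStateException(
                    "Required " + amount + " GPUs but only " + available.size() + " available");
        }
        return new ArrayList<>(available.subList(0, amount));
    }

    public static void main(String[] args) {
        List<String> available = parseIndexes("0,1,2\n");
        System.out.println(select(available, 2)); // the two indexes handed to the TaskManager
    }
}
```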

The resources of a GPU device have two dimensions: stream processors and video memory. Video memory only supports exclusive use, so when several TaskManagers run on the same machine and one GPU is used by more than one process, an out-of-memory (OOM) error may result. In Standalone mode, therefore, a resource isolation mechanism at the TaskManager level is required.

The default script provides a Coordination Mode to support GPU resource isolation among multiple TaskManager processes on a single machine. This mode uses file locks to synchronize GPU usage information among the processes and coordinates their use of the GPU resources on the same machine.

5. Obtaining extended resource information in the operator

In a user-defined operator, you can call the getExternalResourceInfos method of the RuntimeContext with the resource name defined in “external-resources” to obtain information about the extended resource. In the case of a GPU, each resulting ExternalResourceInfo represents one GPU card, and its property named “index” holds the device index of that card.

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
    import org.apache.flink.api.common.functions.RichMapFunction;

    public class ExternalResourceMapFunction extends RichMapFunction<String, String> {
        // Must match the name configured in "external-resources"
        private static final String RESOURCE_NAME = "gpu";

        @Override
        public String map(String value) {
            // Each ExternalResourceInfo describes one GPU visible to this TaskManager
            Set<ExternalResourceInfo> gpuInfos =
                    getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME);
            // Collect the device index of every GPU
            List<String> indexes = gpuInfos.stream()
                    .map(gpuInfo -> gpuInfo.getProperty("index").get())
                    .collect(Collectors.toList());
            // Map function with GPU
            // ...
        }
    }

6. MNIST Demo

The following figure illustrates using a GPU to accelerate a Flink job, with the recognition task on the MNIST dataset as the example.

MNIST is a dataset of handwritten digit images, as shown in the figure above. Each image can be represented as a 28*28 matrix. In this task, we use a pre-trained DNN model: the image is fed through a single fully connected layer to obtain a 10-dimensional vector, and the index of the largest element of that vector is the recognition result.

We started a Standalone cluster with two TaskManager processes on an ECS instance with two GPU cards. With the Coordination Mode provided by the default script, we can ensure that each TaskManager uses one of the GPU cards.

The core operator of this task is the image recognition function MNISTClassifier, whose core implementation is shown below.

    class MNISTClassifier extends RichMapFunction<List<Float>, Integer> {

        @Override
        public void open(Configuration parameters) {
            // Get the GPU information and select the first GPU
            Set<ExternalResourceInfo> externalResourceInfos =
                    getRuntimeContext().getExternalResourceInfos(resourceName);
            final Optional<String> firstIndexOptional =
                    externalResourceInfos.iterator().next().getProperty("index");
            // Initialize the JCuda components with the index of the first GPU
            JCuda.cudaSetDevice(Integer.parseInt(firstIndexOptional.get()));
            JCublas.cublasInit();
        }
    }

In the open method, the GPUs currently available to the TaskManager are obtained from the RuntimeContext, and the first one is selected to initialize the JCuda and JCublas libraries.

    class MNISTClassifier extends RichMapFunction<List<Float>, Integer> {

        @Override
        public Integer map(List<Float> value) {
            // (Excerpt: copying the input to GPU memory is not shown here.)
            // Perform the matrix-vector multiplication with JCublas
            JCublas.cublasSgemv('n', DIMENSIONS.f1, DIMENSIONS.f0, 1.0f,
                    matrixPointer, DIMENSIONS.f1, inputPointer, 1, 0.0f, outputPointer, 1);

            // Copy the result vector out of GPU memory
            JCublas.cublasGetVector(DIMENSIONS.f1, Sizeof.FLOAT, outputPointer, 1, Pointer.to(output), 1);

            JCublas.cublasFree(inputPointer);
            JCublas.cublasFree(outputPointer);

            // The index of the largest element is the recognized digit
            int result = 0;
            for (int i = 0; i < DIMENSIONS.f1; ++i) {
                result = output[i] > output[result] ? i : result;
            }
            return result;
        }
    }

In the map method, the pre-trained model parameters and the input matrix are placed in GPU video memory, and JCublas performs the matrix multiplication on the GPU. Finally, the result vector is copied out of GPU video memory and the recognized digit is obtained from it.
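For readers without a GPU at hand, the same computation can be sketched on the CPU: multiply the weight matrix of the single fully connected layer by the input vector, then take the index of the largest output. This is an illustrative plain-Java stand-in for the JCublas calls, not part of the demo; the class `CpuClassifier` is hypothetical, and it uses a row-major matrix with one row per class, which differs from the column-major layout cuBLAS expects.

```java
/** Illustrative CPU version of the classification step; the demo itself runs this on the GPU. */
final class CpuClassifier {

    /** Computes weights * input, where weights has one row per output class. */
    static float[] multiply(float[][] weights, float[] input) {
        float[] output = new float[weights.length];
        for (int row = 0; row < weights.length; row++) {
            float sum = 0.0f;
            for (int col = 0; col < input.length; col++) {
                sum += weights[row][col] * input[col];
            }
            output[row] = sum;
        }
        return output;
    }

    /** Returns the index of the largest element, which is the recognized class. */
    static int argMax(float[] values) {
        int result = 0;
        for (int i = 1; i < values.length; i++) {
            if (values[i] > values[result]) {
                result = i;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Toy model with 3 classes and 2 input features instead of 10 classes and 784 pixels.
        float[][] weights = {{1f, 0f}, {0f, 1f}, {1f, 1f}};
        float[] input = {2f, 3f};
        System.out.println(argMax(multiply(weights, input))); // class 2 has the largest score
    }
}
```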

You can watch the video or check out the link on GitHub to try the whole process yourself.

Fourth, future plans

In addition to the released features described above, the Apache Flink community is working on additional resource management optimizations that will be available in future releases.

  • Passive resource scheduling mode: managed memory allows Flink tasks to adapt flexibly to different TaskManager/slot resources, making full use of what is available and providing the best computing power possible within the given resource limits. However, the user still has to specify the parallelism of the job, and Flink has to acquire as many TaskManagers/slots as that parallelism requires before the job can run smoothly. Passive resource scheduling will enable Flink to change the parallelism dynamically based on the available resources: it processes data on a best-effort basis when resources are scarce and restores the specified parallelism to guarantee processing performance when resources are sufficient.
  • Fine-grained resource management: Flink’s current slot-based resource management and scheduling mechanism assumes that all slots have the same specification. For some complex, large-scale production jobs, it is often necessary to split the job into multiple subgraphs, each executed in one slot. When the resource requirements of the subgraphs differ greatly, slots of a single specification can hardly be resource-efficient, especially for costly extended resources such as GPUs. Fine-grained resource management allows users to specify resource requirements for the subgraphs of a job, and Flink then uses TaskManagers/slots of different specifications to execute them, optimizing resource efficiency.

Fifth, summary

After reading this article, you should have a clearer understanding of Flink memory management.

  • First, we covered managed memory, the Job Graph compilation phase and the execution phase to explain how memory is managed and allocated in each stage, and how the memory allocation of the TaskManager is controlled through the new configuration parameters.
  • Then, we looked at resource scheduling issues that are commonly encountered, including limiting the maximum number of slots, TaskManager fault tolerance, and distributing tasks evenly through task tiling.
  • Finally, since GPUs are often used to accelerate computation in machine learning and deep learning, we explained how the extended resource framework in Flink 1.12 is used and demonstrated it with an example. On resource utilization, we also presented two plans the community is working on for the future: the passive resource scheduling mode and fine-grained resource management.

This article is original content from Aliyun and may not be reproduced without permission.