Introduction: This article describes some features of Flink 1.12 resource management, including memory management, resource scheduling, and the extended resource framework.

This article was compiled by community volunteer Chen Zhengyu, based on a talk shared by Apache Flink Committer and Alibaba technical expert Song Xindong and Apache Flink Contributor and Alibaba senior development engineer Guo Yangze. It mainly introduces the resource management features of Flink 1.12, in four parts:

  1. Memory management
  2. Resource scheduling
  3. Extended Resource Framework
  4. Future plans

GitHub address: https://github.com/apache/flink. Everyone is welcome to give Flink a like and star the project!

I. Memory management

First, let us review the changes to Flink's memory model. On the left are the memory models introduced by Flink 1.10 and Flink 1.11, respectively. Although many modules are involved, 80%-90% of users only need to pay attention to the four parts that are actually used for task execution: Task Heap Memory, Task Off-Heap Memory, Network Memory, and Managed Memory.

Most of the other modules are Flink's framework memory, which does not normally need to be adjusted; even when problems arise, they can be solved by following the community documentation. Beyond that, "how much memory does a job need to meet actual production demand" is a question every user has to face: which metrics to consult, whether job performance suffers from insufficient memory, whether resources are being wasted, and so on.

In response, the community has provided a new Task Manager and Job Manager Web UI in Flink version 1.12.

On the new Web UI, the configured value and the actual usage of each memory component are displayed directly on the memory model. Based on this, users can better understand how a job is running, how to tune it, and which configuration parameters to adjust (the community also has documentation for this). With the new Web UI, memory management becomes much easier.

1. Managed Memory

Flink managed memory is a type of native memory unique to Flink: it is not managed by the JVM or its garbage collector, but by Flink itself.

The benefits of this native memory are mainly embodied in two aspects:

  • On the one hand, slot-level budget planning ensures that no operator or task fails during job execution because of insufficient memory, and that memory is not wasted by reserving too much and leaving it unused. At the same time, Flink can guarantee that memory is released precisely when a task finishes, so the TaskManager has enough memory available when executing new tasks.
  • On the other hand, resource adaptability is an important property of managed memory: the memory requirements of operators are dynamically adjustable. With adaptability, an operator is neither given so much memory that resources are wasted, nor so little that the whole job cannot run, keeping memory utilization within a reasonable range.

    Of course, with a relatively small memory allocation the job is constrained, for example by frequent spilling to disk, which can affect performance.

Currently, Flink uses managed memory in the following scenarios:

  • RocksDB state backend: in stream processing, all stateful operators within a slot share the same underlying RocksDB cache.
  • Flink built-in operators: the operators of batch processing, Table SQL, the DataSet API, and so on. Each operator has an independent resource budget and does not share it with the others.
  • Python processes: when users define UDFs in Python with PyFlink, a Python VM process needs to be started.

2. Planning of managed memory

Flink's management of managed memory is mainly divided into two stages.

2.1 Job Graph compilation phase

There are three issues to pay attention to at this stage:

  • The first question is: which operators or tasks will be executed simultaneously in the slot. This issue relates to how memory is planned in a query job and whether there are other tasks that need to use management Memory to keep the corresponding memory out. In streaming jobs, this problem is relatively simple, because we need all operators to execute at the same time, to ensure that the upstream output data can be timely consumed by the downstream, so that the data can flow in the whole job grep. But if we’re in some scenarios of batch processing, we actually have two modes of shuffle,

    • The first is the pipelined mode, which works the same way as streaming, namely the bounded-stream processing mentioned above. It also requires upstream and downstream operators to run simultaneously, the upstream producing and the downstream consuming at any time.

    • The other is the so-called batch blocking mode, which requires the upstream to produce all of its data and write it to disk before the downstream can start reading it.

    These two modes determine which tasks can run simultaneously. Currently, based on the type of each edge in the job topology (shown above), Flink defines a concept called the pipelined region: a subgraph in which all nodes are connected by pipelined edges. Identifying these subgraphs tells us which tasks will execute simultaneously.

  • The second question is: what are the usage scenarios for slots? We’ve just described three different use scenarios for Manage Memory. At this stage, Python UDFs and Stateful operators might arise for streaming jobs. It is important to note at this stage that it is not certain that the State Operator will use management memory, as this is dependent on its State type.

    • If it uses the RocksDB state backend, it needs managed memory.
    • If it uses the Heap state backend, it does not.

    However, the job does not actually know the state backend type at compile time, so the possible usage has to be planned for here.

  • The third question only concerns batch jobs: besides identifying the usage scenarios, we must remember that the batch operators mentioned earlier use managed memory in an operator-exclusive way, rather than sharing it at slot granularity. We need to know how much memory each operator should be allocated, which Flink currently sets automatically when compiling the job.

2.2 Execution Phase

The first step is to determine, from the configured state backend type, whether RocksDB is used. As shown in the figure above, suppose a slot contains three operators A, B, and C, where B and C use Python and C also uses a stateful operator. The second step is to decide, according to the user configuration, how the slot's managed memory is shared between the different uses. In this streaming example, suppose Python is given a 30% weight and the state backend a 70% weight. Two cases arise:

  • In the first case (the Heap State Backend branch of streaming), only one use in the whole slot needs managed memory, namely Python, so the Python part naturally gets 100% of it.

  • In the second case (the RocksDB State Backend branch of streaming), the operators B and C share the 30% of memory reserved for Python UDFs, while C additionally has exclusive use of the 70% reserved for the RocksDB state backend. Finally, Flink derives the actual amount of memory available to each operator from the TaskManager's resource configuration and the amount of managed memory per slot (a worked example follows).
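Assuming, purely for illustration, a slot budget of 1000 MB of managed memory and the default weights from the configuration section below, the split works out as:

    Slot managed memory: 1000 MB (assumed)
    Weights: DATAPROC (state backend) = 70, PYTHON = 30

    RocksDB State Backend branch (both uses present):
        Python UDFs of B and C share:  1000 MB * 30 / (70 + 30) = 300 MB
        RocksDB state backend of C:    1000 MB * 70 / (70 + 30) = 700 MB

    Heap State Backend branch (only Python present):
        Python UDFs of B and C share:  1000 MB * 30 / 30 = 1000 MB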

The batch case differs from the streaming case in two ways. First, there is no need to determine the state backend type, which is a simplification. Second, as mentioned above, each batch operator has its own resource budget: Flink first works out, from the weights, how much memory each usage scenario gets in the slot, and then further subdivides that share among the individual operators.

3. Configuration parameters

Configuration parameter                        Default value            Note
taskmanager.memory.managed.size                (none)                   Absolute size
taskmanager.memory.managed.fraction            0.4                      Relative size (fraction of Flink total memory)
taskmanager.memory.managed.consumer-weights    DATAPROC:70,PYTHON:30    Weights for sharing when multiple uses coexist

As the table above shows, the managed memory size can be configured in two ways:

  • one is as an absolute size;
  • the other is as a fraction of the TaskManager's total Flink memory.

taskmanager.memory.managed.consumer-weights is a new configuration option whose value is of map type, that is, a comma-separated list of key:value pairs. Two consumer keys are currently supported:

  • One is DATAPROC, which covers both the state backend memory in stream processing and the batch operators in batch processing.
  • The other is PYTHON.
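Putting this together, a minimal flink-conf.yaml sketch; the values are only examples, and size and fraction are alternatives rather than meant to be combined:

    # Option 1: absolute size of managed memory
    # taskmanager.memory.managed.size: 1024m
    # Option 2: fraction of Flink total memory (default 0.4)
    taskmanager.memory.managed.fraction: 0.4
    # Weights used when several consumers share the slot's managed memory
    taskmanager.memory.managed.consumer-weights: DATAPROC:70,PYTHON:30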

II. Resource scheduling

Several features related to resource scheduling are frequently asked about on the mailing lists and elsewhere; we introduce them here.

1. Maximum number of slots

In 1.12, Flink supports limiting the maximum number of slots (slotmanager.number-of-slots.max). As mentioned earlier, a streaming job requires all of its operators to execute simultaneously to keep data flowing smoothly, so the job's parallelism determines how many slots and resources it needs.

However, this is not the case for batch processing. A batch job often has high parallelism but does not actually need that many resources: as earlier tasks finish, they free their slots for subsequent tasks, so tasks can be executed serially in stages with very few resources, avoiding excessive resource occupation on a YARN/Kubernetes cluster. Currently, this parameter can be used on YARN, Mesos, and native Kubernetes. A configuration sketch follows.
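A minimal flink-conf.yaml sketch; the cap of 100 is an arbitrary example value:

    # Never allocate more than 100 slots in total, regardless of job parallelism
    slotmanager.number-of-slots.max: 100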

2. TaskManager fault tolerance

In actual production there may be program errors, network jitter, hardware failures, and so on, which can make a TaskManager unreachable or even crash it outright. One of the most common errors in the logs is TaskManagerLost. In this case the job has to be restarted, and during the restart resources must be re-applied for and TaskManager processes restarted, which is very costly in terms of performance.

For jobs with relatively high stability requirements, Flink 1.12 provides a new feature: the Flink cluster can keep a small number of redundant TaskManagers, which are used to recover quickly from single points of failure without waiting for a new resource application to complete.

Redundant TaskManagers are configured through slotmanager.redundant-taskmanager-num. Setting it to two does not mean two TaskManagers run with an empty load; it means the cluster holds two more TaskManagers than the total resources the job actually needs.

Tasks can be distributed relatively evenly across them, achieving a fairly good load balance while still making use of the otherwise idle TaskManagers. On failure, tasks can be dispatched quickly to the surviving TaskManagers, and a new round of resource applications is made afterwards. Currently, this parameter can be used on YARN, Mesos, and native Kubernetes. A configuration sketch follows.
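A minimal flink-conf.yaml sketch matching the example above:

    # Keep 2 redundant TaskManagers available for fast failover
    slotmanager.redundant-taskmanager-num: 2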

3. Task tiling distribution

Task tiling issues mainly arise in Flink Standalone mode or in older-version Kubernetes deployments. In these modes, the number of TaskManagers and the number of slots on each TaskManager are defined in advance, which often leads to uneven scheduling: some TaskManagers are packed with tasks while others are left almost empty.

The configuration option cluster.evenly-spread-out-slots was introduced in version 1.11 to control this and produce a more balanced schedule.

Note:

  • First, this parameter takes effect only in Standalone mode, because in YARN and Kubernetes modes the actual number of TaskManagers is determined by the requirements of the job: TaskManagers are created after the requirements are known, rather than existing first and having slots scheduled onto them.

    Each time a task is scheduled, Flink can only see the TaskManagers that are currently registered; it has no way of knowing how many TaskManagers will register globally. This is a question many people ask, and it is the first reason the feature may not seem to work as expected when turned on.

  • The second point to note is that Flink can only decide based on the number of free slots on each TaskManager, not on the number of operator instances. Flink cannot guarantee that instances of each operator are spread evenly across TaskManagers, because in Flink's resource scheduling logic, tasks are completely invisible at the slot allocation layer. A configuration sketch follows this list.
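A minimal flink-conf.yaml sketch for a Standalone cluster:

    # Prefer spreading slots across all registered TaskManagers
    cluster.evenly-spread-out-slots: true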

III. Extended resource framework

1. Background

In recent years, with the continuous development of artificial intelligence, deep learning models have been applied to a variety of production scenarios, such as recommendation systems, advertising push, and intelligent risk control. These are also scenarios where Flink is widely used, so supporting AI has long been one of the Flink community's long-term goals, and a number of third-party open source extensions already exist for this purpose. The open source work by Alibaba mainly includes two projects:

  • One is Flink AI Extended, a deep learning extension framework based on Flink that currently supports integrating TensorFlow, PyTorch, and other frameworks. It allows users to use TensorFlow as an operator inside a Flink task.
  • The other is Alink, which is a general algorithm platform based on Flink. It also has many commonly used machine learning algorithms built in.

Both of the above extend Flink's functionality. From the perspective of computing power, however, deep learning models and machine learning algorithms are usually the computational bottleneck of the whole task, and GPUs are widely used in this area to accelerate training and prediction. Therefore, supporting GPU resources to accelerate computation is an essential feature of Flink's development in the AI field.

2. Using extended resources

Currently, the only resource dimensions Flink lets users configure are CPU and memory. In practice we encounter other resource requirements as well, such as GPUs, SSDs, or network acceleration devices like RDMA. We therefore want to provide a general extended resource framework to which any resource can be added as a plug-in, GPU being just one such resource.

For the use of extended resources, two common requirements can be abstracted:

  • Configuration and scheduling of extended resources. Users can declare requirements for extended resources in the configuration, for example one GPU card per TaskManager. When Flink is deployed on a resource base such as Kubernetes or Yarn, it must forward the user's extended resource requirements to ensure that the requested Container/Pod contains the corresponding resource.
  • Runtime information about extended resources for the operators. Users may need some runtime information in a custom operator to use the resource; for example, an operator using a GPU needs to know which GPU card its internal model can be deployed on, so this information has to be provided to the operator.

3. Using the extended resource framework

There are three steps to using the extended resource framework:

  • first, set the configuration for the extended resource;
  • then, prepare the plug-in for the required extended resource in the framework;
  • finally, in the operator, obtain the extended resource information from the RuntimeContext and use it.

3.1 Setting Parameters

The following is an example configuration using GPU resources:

    # Define the names of the extended resources
    external-resources: gpu
    # Define the number of GPUs required on each TaskManager
    external-resource.gpu.amount: 1
    # Define the configuration keys of the extended resource for Yarn and Kubernetes
    external-resource.gpu.yarn.config-key: yarn.io/gpu
    external-resource.gpu.kubernetes.config-key: nvidia.com/gpu
    # Define the factory class of the GPUDriver plug-in
    external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory

  • For any extended resource, the user first needs to add its name to "external-resources"; this name is also used as the prefix of every other configuration option related to that resource. In our example, we define a resource called "gpu".
  • At the scheduling layer, extended resource requirements can currently be configured at TaskManager granularity. In our example, the number of GPU devices on each TaskManager is defined as 1.
  • When Flink is deployed on Kubernetes or Yarn, the configuration key of the extended resource on the corresponding resource base must be set so that Flink can forward the resource requirements. The example shows the keys for GPU.
  • If a plug-in is provided, its factory class name must also be put into the configuration.

3.2 Preparations

Before using the extended resources, you need to do some preparatory work. Take the GPU as an example:

  • In the Standalone mode, the cluster administrator must ensure that the GPU resources are visible to the TaskManager process.
  • In Kubernetes mode, the cluster needs to support the Device Plugin[6]; the corresponding Kubernetes version is 1.10 or later, and the GPU plug-in must be installed in the cluster.
  • In Yarn mode, GPU scheduling requires Hadoop 2.10 or later and a correctly configured resource-types.xml file.

3.3 Extended resource framework plug-ins

After an extended resource has been scheduled, a user-defined operator may also need its runtime information in order to use it. The plug-in in the extended resource framework is responsible for obtaining this information; its interface is as follows:

    public interface ExternalResourceDriverFactory {
        /**
         * Create an ExternalResourceDriver for the extended resource from the given configuration.
         */
        ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
    }

    public interface ExternalResourceDriver {
        /**
         * Retrieve the information of the extended resources for the required amount.
         */
        Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;
    }

An ExternalResourceDriver is started on each TaskManager. The extended resource framework calls each driver's retrieveResourceInfo interface to obtain the extended resource information on that TaskManager, and the resulting information is passed on to the RuntimeContext of the operators. ExternalResourceDriverFactory is the factory class of the plug-in.
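As a concrete illustration, here is a minimal sketch of a hypothetical plug-in for a made-up device type. All the MyDevice* names are invented for this example; only the interfaces are the real ones shown above. A real driver would probe the hardware instead of simply enumerating indexes:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Optional;
    import java.util.Set;

    import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
    import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
    import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
    import org.apache.flink.configuration.Configuration;

    /** Factory named in external-resource.<name>.driver-factory.class. */
    public class MyDeviceDriverFactory implements ExternalResourceDriverFactory {
        @Override
        public ExternalResourceDriver createExternalResourceDriver(Configuration config) {
            return new MyDeviceDriver();
        }
    }

    /** Driver that simply enumerates device indexes 0..amount-1. */
    class MyDeviceDriver implements ExternalResourceDriver {
        @Override
        public Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) {
            Set<MyDeviceInfo> infos = new HashSet<>();
            for (long i = 0; i < amount; i++) {
                infos.add(new MyDeviceInfo(String.valueOf(i)));
            }
            return infos;
        }
    }

    /** Resource info exposing a single "index" property, mirroring the GPU plug-in. */
    class MyDeviceInfo implements ExternalResourceInfo {
        private final String index;

        MyDeviceInfo(String index) {
            this.index = index;
        }

        @Override
        public Optional<String> getProperty(String key) {
            return "index".equals(key) ? Optional.of(index) : Optional.empty();
        }

        @Override
        public Collection<String> getKeys() {
            return Collections.singleton("index");
        }
    }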

4. The GPU plug-in

At present, Flink has a built-in plug-in for GPU resources. It obtains the GPUs available in the current environment by executing a script called the discovery script; currently the information obtained is the index of each GPU device.

Flink provides a default script, located in the "plugins/external-resource-gpu/" directory of the distribution. Users can also implement a custom discovery script and configure Flink to use it. The protocol between the script and the GPU plug-in is as follows:

  • When the script is invoked, the number of GPUs required is passed as the first argument, followed by the user-defined list of arguments.
  • If the script runs successfully, it outputs a list of GPU indexes, separated by commas.
  • If the script fails or the execution result is not as expected, the script exits with a non-zero value. This causes TaskManager initialization to fail, and the script's error message is printed in the log.

The default script provided by Flink uses the "nvidia-smi" tool to obtain the number and indexes of the GPUs available on the current machine, and returns an index list of the required size. When the required number of GPUs cannot be obtained, the script exits with a non-zero value.

GPU resources have two dimensions: stream processors and device memory, and the device memory can only be used exclusively. If multiple TaskManagers run on the same machine and one GPU is used by multiple processes, out-of-memory errors may result. Therefore, Standalone mode needs a TaskManager-level GPU resource isolation mechanism.

The default script provides a Coordination Mode to support GPU resource isolation between multiple TaskManager processes on a single machine. It synchronizes GPU usage information between processes through a file lock, coordinating which of the TaskManager processes on the same machine uses which GPU.
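As a sketch, a custom script can be wired in through the plug-in's parameters; the path below is a placeholder, and the parameter keys and the coordination flag should be checked against the documentation of your Flink version:

    # Point the GPU plug-in at a custom discovery script
    external-resource.gpu.param.discovery-script.path: /path/to/my-discovery.sh
    # Extra arguments passed to the script, e.g. enabling coordination mode
    external-resource.gpu.param.discovery-script.args: --enable-coordination-mode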

5. Obtaining extended resource information in the operator

In a user-defined operator, the resource name defined in "external-resources" can be used to call the getExternalResourceInfos interface of the RuntimeContext to obtain information about the corresponding extended resource. Taking GPU as an example, each ExternalResourceInfo obtained represents one GPU card, and its "index" property contains the device index of that card.

    public class ExternalResourceMapFunction extends RichMapFunction<String, String> {
        private static final String RESOURCE_NAME = "gpu";

        @Override
        public String map(String value) {
            // Retrieve the info of all GPU cards visible to this task.
            Set<ExternalResourceInfo> gpuInfos =
                    getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME);
            List<String> indexes = gpuInfos.stream()
                    .map(gpuInfo -> gpuInfo.getProperty("index").get())
                    .collect(Collectors.toList());
            // Map function logic using the GPUs identified by 'indexes' ...
            return value; // placeholder; the real logic would use the GPUs above
        }
    }

6. MNIST Demo

The following figure illustrates how to use GPUs to accelerate a Flink job, taking a recognition task on the MNIST dataset as an example.

As shown in the figure above, MNIST is a dataset of handwritten digit images, each of which can be represented as a 28x28 matrix. In this task we use a pre-trained DNN model: the image input passes through a single fully connected layer to produce a 10-dimensional vector, and the index of the vector's largest element is the recognition result.

We start a Standalone cluster with two TaskManager processes on an ECS machine with two GPU cards. With the Coordination Mode provided by the default script, we can ensure that each TaskManager uses exactly one of the GPU cards.

The core operator of this task is the image recognition function MNISTClassifier. Its core implementation is shown below:

    class MNISTClassifier extends RichMapFunction<List<Float>, Integer> {

        @Override
        public void open(Configuration parameters) {
            // Get the GPU information and select the first GPU card.
            // 'resourceName' is the name configured in "external-resources", e.g. "gpu".
            Set<ExternalResourceInfo> externalResourceInfos =
                    getRuntimeContext().getExternalResourceInfos(resourceName);
            final Optional<String> firstIndexOptional =
                    externalResourceInfos.iterator().next().getProperty("index");

            // Bind JCuda to the selected GPU and initialize the JCublas library.
            JCuda.cudaSetDevice(Integer.parseInt(firstIndexOptional.get()));
            JCublas.cublasInit();
        }
    }

In the open method, we obtain the GPUs currently available to the TaskManager from the RuntimeContext, select the first card, and initialize JCuda and the JCublas library on it.

    class MNISTClassifier extends RichMapFunction<List<Float>, Integer> {

        @Override
        public Integer map(List<Float> value) {
            // Multiply the pre-trained weight matrix by the input vector on the GPU.
            // The pointers and DIMENSIONS constants are initialized elsewhere in the job.
            JCublas.cublasSgemv('n', DIMENSIONS.f1, DIMENSIONS.f0, 1.0f, matrixPointer,
                    DIMENSIONS.f1, inputPointer, 1, 0.0f, outputPointer, 1);

            // Copy the result vector from GPU memory back into host memory.
            JCublas.cublasGetVector(DIMENSIONS.f1, Sizeof.FLOAT, outputPointer, 1,
                    Pointer.to(output), 1);

            JCublas.cublasFree(inputPointer);
            JCublas.cublasFree(outputPointer);

            // The index of the largest element is the recognized digit.
            int result = 0;
            for (int i = 0; i < DIMENSIONS.f1; ++i) {
                result = output[i] > output[result] ? i : result;
            }
            return result;
        }
    }

In the map method, the pre-trained model parameters and the input matrix are placed into GPU memory, JCublas performs the matrix multiplication on the GPU, and finally the result vector is copied back from GPU memory and the recognized digit is derived from it.

Watch the video or check out the GitHub link for a demo of the process.

IV. Future plans

In addition to the released features described above, the Apache Flink community is working on additional resource management enhancements that will be available in future releases.

  • Passive resource scheduling mode: managed memory allows Flink tasks to adapt flexibly to different TaskManager/slot resources, making full use of what is available and providing the best computing power under a given resource limit. However, the user still needs to specify the job's parallelism, and Flink must acquire TaskManagers/slots that satisfy that parallelism before the job can run. Passive resource scheduling will enable Flink to dynamically change the parallelism according to the available resources, processing data on a best-effort basis when resources are insufficient and restoring the specified parallelism to guarantee performance once resources are sufficient.
  • Fine-grained resource management: Flink's current slot-based resource management and scheduling mechanism assumes that all slots have the same specification. For some complex, large-scale production jobs, it is often necessary to split the computation into multiple subgraphs, each executed in its own slot. When the resource requirements of the subgraphs differ greatly, slots of uniform specification can hardly be resource-efficient, especially for expensive extended resources such as GPUs. Fine-grained resource management will allow users to specify resource requirements for each subgraph of a job, and Flink will use TaskManagers/slots of different specifications to execute them, optimizing resource efficiency.

V. Summary

Through this article, you should now have a clearer understanding of Flink memory management.

  • First, we walked through memory management and the allocation details of each stage, from native managed memory through the Job Graph compilation and execution phases, and showed how TaskManager memory allocation is controlled by the new configuration parameters;
  • then we covered frequently encountered resource scheduling problems: limiting the maximum number of slots, TaskManager fault tolerance, and distributing task resources evenly through task tiling;
  • finally, since GPUs are widely used to accelerate computation in machine learning and deep learning, we explained how Flink 1.12 uses the extended resource framework, illustrated it with a demo, and presented two future plans the community is working on for resource utilization: the passive resource scheduling mode and fine-grained resource management.

VI. Appendix

[1] Accelerating your workload with GPU and other external resources

[2] Extended Resource Framework documentation

[3] FLIP-108: Add GPU support in Flink

[4] flink-mnist project
