Backpressure is a very common problem in real-time computing applications, especially in streaming computing. Backpressure means that a node in the data pipeline has become a bottleneck: its processing rate cannot keep up with the rate at which upstream sends data. Because real-time computing applications typically use message queues, which are pull-based, to decouple producers and consumers, backpressure usually propagates from that node all the way back to the data source and reduces its intake rate (for example, the Kafka consumer).

There are already many blog posts on the Internet about Flink’s backpressure mechanism; for Chinese readers, two articles in particular are recommended. To put it simply, data is transmitted between the nodes (Tasks) of a Flink topology through blocking queues. When a queue fills up because the downstream does not consume fast enough, the upstream’s production is also blocked, and eventually the intake of the data source is blocked. This article follows the official blog [4] and shares the author’s practical experience in analyzing and handling Flink backpressure.

The impact of backpressure

Backpressure does not directly affect the availability of a job; it indicates that the job is in a sub-healthy state, has a potential performance bottleneck, and may lead to larger processing latency. For applications with low latency requirements or small data volumes, backpressure is usually not a big deal, but for larger-scale Flink jobs it can cause serious problems.

Moreover, because of Flink’s checkpoint mechanism, backpressure also affects two metrics: checkpoint duration and state size.

  • The former is because checkpoint barriers do not overtake regular data, so when data processing is blocked, it also takes longer for the barrier to travel through the whole data pipeline, and the end-to-end duration of the checkpoint becomes longer.
  • The latter is because, to guarantee exactly-once semantics (EOS), operators with two or more input channels need to perform checkpoint barrier alignment. After the barrier arrives on the faster input channel, the data that follows it is buffered rather than processed until the barrier also arrives on the slower input channel. This buffered data is stored in state, resulting in a larger checkpoint.
Both effects are dangerous in a production environment, because checkpoints are key to guaranteeing data consistency. A longer checkpoint duration may cause checkpoints to time out and fail, and a larger state size slows down checkpoints further and can even lead to OOM (with a heap-based StateBackend) or stability problems such as physical memory usage exceeding container limits (with RocksDBStateBackend).

As a result, we try to avoid backpressure in production. (Incidentally, the community has proposed FLIP-76: Unaligned Checkpoints [4] to decouple checkpointing from backpressure and mitigate this problem.)
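
For reference, the sketch below shows how the related knobs look in the DataStream API: raising the checkpoint timeout so that transient backpressure does not immediately turn into timeout failures, and enabling unaligned checkpoints, which is the FLIP-76 feature and only available from Flink 1.11 onwards. The interval and timeout values are illustrative assumptions, not recommendations.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 s (illustrative value).
        env.enableCheckpointing(60_000L);

        // Give checkpoints more headroom so that transient backpressure does not
        // immediately cause timeout failures (10 minutes here, adjust as needed).
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);

        // Unaligned checkpoints (FLIP-76) only exist from Flink 1.11 on;
        // this call is not available in older versions.
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        // ... build the job topology here, then call env.execute("job name");
    }
}
```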

Locating the backpressure node

To solve backpressure, the first step is to locate the node that causes it. There are two main approaches:

  1. The backpressure monitoring panel of the Flink Web UI;
  2. Flink Task Metrics.
The former is easier to use and suitable for simple analysis, while the latter provides more information and is suitable for use in monitoring systems. Because backpressure propagates upstream, both approaches require us to check node by node from Source to Sink until we find the root cause of the backpressure [4]. These two approaches are introduced below.

Backpressure monitoring panel

The backpressure monitoring in the Flink Web UI works at the SubTask level. Its principle is to periodically sample the stack traces of the Task threads and determine, from how often a thread is blocked requesting an output Buffer (that is, blocked because the downstream queue is full), whether the node is backpressured. By default, a ratio below 0.1 is shown as OK, 0.1 to 0.5 as LOW, and above 0.5 as HIGH.





Figure 1. Flink 1.8’s Web UI backpressure panel (from official blog)
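
As a small worked example of the thresholds above, the sketch below maps a sampled blocked ratio to the OK / LOW / HIGH levels; the sampling itself is done by Flink internally, so only the classification step is shown here.

```java
public class BackpressureLevel {
    // Map the fraction of samples in which the task thread was blocked requesting
    // an output buffer to the panel's levels, using the default thresholds above.
    static String level(double blockedRatio) {
        if (blockedRatio <= 0.10) return "OK";
        if (blockedRatio <= 0.50) return "LOW";
        return "HIGH";
    }

    public static void main(String[] args) {
        // e.g. 23 of 100 samples were blocked on a buffer request
        System.out.println(level(23 / 100.0)); // LOW
    }
}
```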

If a node is in a backpressured state, there are two possibilities:

  1. The node’s sending rate cannot keep up with the rate at which it produces data. This typically happens in an Operator whose single input produces multiple outputs (such as flatMap).
  2. The downstream node receives data at a lower rate and, through the backpressure mechanism, limits this node’s sending rate.
In the first case, this node is the root of the backpressure; it is the first backpressured node found when walking from the Source Task towards the Sink Task. In the second case, we need to continue checking the downstream nodes.

It is worth noting that the root node of backpressure does not necessarily show high backpressure in the backpressure panel, because the panel monitors the sending side. If a node is the performance bottleneck, it will not itself show high backpressure; instead, its upstream will. In general, once we find the first node with backpressure, the root cause is either that node or its immediately downstream node.

So how do we distinguish these two cases? Unfortunately, we cannot tell directly from the backpressure panel; we also need Task Metrics or other monitoring methods to decide. In addition, if the job has many nodes or a high degree of parallelism, collecting the backpressure panel data becomes very expensive, since the stack traces of all Tasks have to be sampled, and the panel may even become unusable.

Task Metrics

The Task Metrics provided by Flink are a better way to monitor backpressure, but they require more background knowledge.

Let’s start with a brief review of the network stack used since Flink 1.5; readers already familiar with it can skip this part.

When TaskManagers transmit data, two Subtasks on different TaskManagers usually have multiple Channels between them, depending on the number of keys. These Channels reuse the same TaskManager-level TCP connection and share the receiving Subtask-level Buffer Pool.

On the receiving side, each Channel is initially allocated a fixed number of Exclusive Buffers, which hold the data it receives until the data is consumed by the Operator and the Buffers are released. The number of Buffers available on a Channel’s receiving side is called its Credit; the Credit is periodically synchronized to the sender, which uses it to decide how many Buffers of data to send.

When a Channel’s Exclusive Buffers are used up, Flink requests the remaining Floating Buffers from the Buffer Pool. Floating Buffers are standby Buffers that go wherever they are needed. On the sending side, all Channels of a Subtask share one Buffer Pool, so there is no distinction between Exclusive and Floating Buffers.





Figure 2. Flink credit-based network

The Metrics we use to monitor backpressure are mainly related to Buffer usage on the Channel’s receiving side. The most useful ones are the following:

  • inPoolUsage: Buffer usage on the receiving side
  • floatingBuffersUsage (1.9 and above): Floating Buffer usage on the receiving side
  • exclusiveBuffersUsage (1.9 and above): Exclusive Buffer usage on the receiving side

Here inPoolUsage is equal to the sum of floatingBuffersUsage and exclusiveBuffersUsage.
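
These metrics can also be read from the JobManager REST API instead of the Web UI. The sketch below is a minimal example of doing that in Java; the REST address, the endpoint, and the subtask-prefixed metric names (for example 0.buffers.inPoolUsage) follow the convention of recent Flink versions but may differ in your deployment, so treat them as assumptions to verify against your cluster.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class BufferUsagePoller {
    public static void main(String[] args) throws Exception {
        String jobManager = "http://localhost:8081"; // assumed REST address
        String jobId = args[0];                      // job ID from the Web UI
        String vertexId = args[1];                   // vertex ID of the suspected Task

        // Buffer-usage metrics of subtask 0; adjust the prefix for other subtasks.
        String metrics = "0.buffers.inPoolUsage,0.buffers.outPoolUsage";
        URL url = new URL(jobManager + "/jobs/" + jobId + "/vertices/" + vertexId
                + "/metrics?get=" + metrics);

        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
            // The response is a small JSON array such as
            // [{"id":"0.buffers.inPoolUsage","value":"0.85"}, ...];
            // printing it is enough for a quick check.
            in.lines().forEach(System.out::println);
        }
    }
}
```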

The general approach to analyzing backpressure is as follows: if a Subtask’s sending-side Buffer usage is high, it is being backpressured by its downstream; if a Subtask’s receiving-side Buffer usage is high, it is propagating backpressure upstream. In a backpressure situation, we can locate it with the following table (picture from the official blog):





Figure 3. Backpressure analysis table

There is little doubt about the cases where outPoolUsage and inPoolUsage are both low or both high: they indicate that the current Subtask is, respectively, normal or being backpressured by its downstream. The more interesting cases are those where outPoolUsage and inPoolUsage differ, which may be an intermediate state of backpressure propagation or may indicate that this Subtask is the source of the backpressure.

If a Subtask’s outPoolUsage is high, it is usually being affected by its downstream Task, so we can rule out the possibility that this Subtask itself is the root of the backpressure. If a Subtask’s outPoolUsage is low but its inPoolUsage is high, it may be the root of the backpressure. Since backpressure usually propagates upstream, the outPoolUsage of some of its upstream Subtasks will then be high, which can be used for further confirmation. It is also worth noting that backpressure is sometimes transient and of little consequence, for example a brief network delay on a Channel or a normal GC in a TaskManager; in such cases we can simply ignore it.
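
The decision logic of Figure 3 can be summarized in a small sketch. The 0.5 threshold for what counts as “high” usage is an illustrative assumption, not a value prescribed by Flink.

```java
public class BackpressureClassifier {

    // Classify one Subtask from its sending-side and receiving-side buffer usage,
    // following the cases described above.
    static String classify(double outPoolUsage, double inPoolUsage) {
        boolean outHigh = outPoolUsage > 0.5; // illustrative threshold
        boolean inHigh = inPoolUsage > 0.5;

        if (!outHigh && !inHigh) {
            return "normal";
        } else if (outHigh && inHigh) {
            return "backpressured by downstream and propagating it upstream";
        } else if (outHigh) {
            return "backpressured by downstream; not the root cause itself";
        } else {
            // outPoolUsage low, inPoolUsage high
            return "possible root of backpressure (check upstream outPoolUsage to confirm)";
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(0.2, 0.9)); // likely the root of backpressure
        System.out.println(classify(0.9, 0.9)); // being backpressured by downstream
    }
}
```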

For Flink 1.9 and above, in addition to the table above, we can also use floatingBuffersUsage, exclusiveBuffersUsage, and the upstream Task’s outPoolUsage to further analyze the data transmission between a Subtask and its upstream Subtasks.





Figure 4. Flink 1.9 Backpressure analysis table

In general, a high floatingBuffersUsage indicates that backpressure is being propagated upstream, while exclusiveBuffersUsage shows whether the backpressure is skewed: a high floatingBuffersUsage together with a low exclusiveBuffersUsage means the backpressure is skewed, because only a few Channels are occupying most of the Floating Buffers.

At this point we have plenty of means to locate the node where the backpressure originates, but no way yet to determine the specific cause. In addition, network-based Metrics can only locate backpressure at the granularity of Tasks, not individual Operators. In particular, for embarrassingly parallel jobs (where all Operators are chained into a single Task, so there is only one node), backpressure Metrics are of no use.

Analyzing the specific cause and handling it

Once we have located the backpressure node, analyzing the cause is very similar to analyzing the performance bottleneck of an ordinary program, though perhaps a little simpler, because what we mainly need to observe is the Task Thread.

In practice, backpressure is in many cases caused by data skew, which can be confirmed with the Records Sent and Records Received figures of each SubTask in the Web UI. In addition, the State size of different SubTasks in the Checkpoint detail view is another useful indicator for analyzing data skew.
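
A quick way to quantify such skew is to compare each SubTask’s record count against the average. The sketch below does this with numbers copied from the Web UI; the figures are made up for illustration.

```java
import java.util.Arrays;

public class SkewCheck {
    // A max/mean ratio well above 1 suggests a few SubTasks receive most of the data.
    static double skewRatio(long[] recordsPerSubtask) {
        double mean = Arrays.stream(recordsPerSubtask).average().orElse(0);
        long max = Arrays.stream(recordsPerSubtask).max().orElse(0);
        return mean == 0 ? 0 : max / mean;
    }

    public static void main(String[] args) {
        // "Records Received" of four SubTasks, copied from the Web UI (made-up values)
        long[] received = {1_200_000, 1_150_000, 9_800_000, 1_300_000};
        System.out.printf("skew ratio = %.2f%n", skewRatio(received)); // about 2.9 here
    }
}
```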

Beyond that, the most common cause is probably the execution efficiency of the user code (frequent blocking or performance problems). The most useful approach is to take a CPU profile of the TaskManager, from which we can see whether the Task Thread is saturating a CPU core. If it is, analyze which functions most of the CPU time is spent in; for example, in our production environment we occasionally see user functions stuck in regular-expression matching (ReDoS). If it is not, look at where the Task Thread is blocked: it may be that the user function itself makes synchronous calls, or it may be a system activity such as checkpointing or GC that temporarily pauses the system.
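
If a full profiler is not at hand, repeatedly dumping the stacks of the Task Threads already tells us whether they are RUNNABLE (CPU-bound) or blocked. The sketch below does this remotely via JMX; it assumes the TaskManager was started with remote JMX enabled (for example through env.java.opts) and that Flink task thread names contain the task name, which is the usual convention.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class TaskThreadSampler {
    public static void main(String[] args) throws Exception {
        String host = args[0];         // TaskManager host
        String port = args[1];         // JMX port, e.g. 9999 (must be enabled on the TM)
        String taskNamePart = args[2]; // substring of the suspected task's name

        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            ThreadMXBean threads = ManagementFactory.newPlatformMXBeanProxy(
                    mbsc, ManagementFactory.THREAD_MXBEAN_NAME, ThreadMXBean.class);

            // Dump all threads and keep only those matching the task name: a RUNNABLE
            // state points to CPU-bound code, BLOCKED/WAITING to locks or slow calls.
            for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
                if (info != null && info.getThreadName().contains(taskNamePart)) {
                    System.out.println(info.getThreadName() + " -> " + info.getThreadState());
                    for (StackTraceElement frame : info.getStackTrace()) {
                        System.out.println("    at " + frame);
                    }
                }
            }
        } finally {
            connector.close();
        }
    }
}
```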

Of course, the performance analysis may also turn out normal, with the job simply not having enough resources, which in turn causes backpressure; this usually requires increasing the parallelism. It is worth mentioning that in future releases Flink will provide the JVM’s CPU flame graph directly in the Web UI [5], which will greatly simplify the analysis of performance bottlenecks.

In addition, TaskManager memory and GC problems can also cause backpressure, for example frequent Full GC or even loss of connection caused by unreasonable memory settings for the various areas of the TaskManager JVM. A recommended way to observe GC problems is to enable the G1 garbage collector on the TaskManager to optimize GC and add -XX:+PrintGCDetails to print GC logs.

Conclusion

Backpressure is a common problem when operating Flink applications; it indicates not only a performance bottleneck but also potential instability. Backpressure can be located with the backpressure monitoring panel in the Web UI and with Task Metrics; the former is convenient for simple analysis, while the latter is suitable for deeper investigation. After locating the backpressure node, we can further analyze the specific cause behind it through data distribution, CPU profiling, and GC metrics and logs, and then apply targeted optimizations.






