Author | Nico Kruber    Translator | Wang Qiang

In the previous article, we covered how the Flink network stack works at every level, from high-level abstractions to low-level details. This second article in the series builds on the first and focuses on monitoring network-related metrics to identify the effects of back pressure and bottlenecks in throughput and latency. It also briefly introduces ways of dealing with back pressure; subsequent articles will explore fine-tuning the network stack. If you are not familiar with the network stack, we highly recommend reading the first article, “Principle Analysis | An In-Depth Look at the Apache Flink Network Stack”, before this one.

Monitoring

Probably the most important part of network monitoring is monitoring back pressure, a situation in which a system receives data at a higher rate than it can process [1]. Back pressure puts pressure on the sender, and it can have two causes:

  • The receiver is slow.
This may be because the receiver is itself back-pressured, cannot keep processing data at the rate the sender produces it, or is temporarily stalled by garbage collection, a lack of system resources, or an I/O bottleneck.

  • The network channel is slow.
In this case the receiver may not be directly involved. We still call the sender back-pressured, because the network bandwidth shared by all subtasks running on the same machine may be oversubscribed. Note that besides Flink’s network stack there may be other network users, such as sources and sinks, distributed file systems (checkpointing, network-attached storage), logging, and metrics. An earlier article on capacity planning (www.ververica.com/blog/how-to…) covers this in more detail.

[1] If you are not familiar with back pressure and how it interacts with Flink, we recommend reading the article on back pressure published in 2015 (www.ververica.com/blog/how-fl…).

When back pressure occurs, it propagates upstream and eventually reaches your sources and slows them down. This is not a bad thing in itself; it merely indicates that you lack the resources to handle the current load. However, you may want to improve your job so that it can handle higher loads without using more resources. To do so, you need to find (1) where the bottleneck is (which task/operator) and (2) what causes it. Flink offers two mechanisms for identifying where the bottleneck is:

  • Directly, through Flink’s Web UI and its back pressure monitor
  • Indirectly, through some of the network metrics
Flink’s Web UI is probably the first choice for quick troubleshooting, but it has some drawbacks, which we explain below. Flink’s network metrics, on the other hand, are better suited for continuous monitoring and for reasoning about which bottleneck is causing back pressure and what the nature of that bottleneck is. We cover both in the sections below. In both cases, you need to trace the back pressure from the sources through to the sinks. The starting point of an investigation is usually the operator right after the last one that is experiencing back pressure; that operator is also very likely the one causing it.

Back pressure monitor

The back pressure monitor is exposed only through Flink’s Web UI [2]. Because it is an active component that is triggered only on request, it is currently not available through metrics. The back pressure monitor samples the task threads running on the TaskManagers via Thread.getStackTrace() and counts the samples in which a task is blocked on a buffer request. Tasks block there either because they cannot send network buffers at the rate at which they are produced, or because downstream tasks process them too slowly to grant credit for sending. The back pressure monitor displays the ratio of blocked requests to total requests. Since some back pressure is considered normal or temporary, the monitor shows one of the following statuses:

  • OK, ratio ≤ 0.10
  • LOW, 0.10 < ratio ≤ 0.5
  • HIGH, 0.5 < ratio ≤ 1
Although you can tune parameters such as the refresh interval, the number of samples, or the delay between samples, you usually do not need to, because the defaults already give good enough results.
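The thresholds above can be sketched as a small function. This is only an illustration of the mapping from ratio to status; the function name and its arguments are hypothetical and are not part of Flink.

```python
def backpressure_level(blocked_samples: int, total_samples: int) -> str:
    """Map the share of blocked stack-trace samples to the status listed above."""
    if total_samples <= 0:
        raise ValueError("total_samples must be positive")
    ratio = blocked_samples / total_samples
    if ratio <= 0.10:
        return "OK"
    if ratio <= 0.5:
        return "LOW"
    return "HIGH"
```

For example, 30 blocked samples out of 100 give a ratio of 0.3, which falls into the LOW band.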

[2] You can also access the back pressure monitor through the REST API: /jobs/:jobid/vertices/:vertexid/backpressure
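As a rough sketch, the REST endpoint from footnote [2] could be queried as follows. The base URL (for example http://localhost:8081), the helper names, and the timeout are assumptions for illustration; the response is returned as decoded JSON without assuming any particular fields.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def backpressure_url(base_url: str, job_id: str, vertex_id: str) -> str:
    """Build the back pressure endpoint URL for one job vertex."""
    return (
        f"{base_url.rstrip('/')}/jobs/{quote(job_id, safe='')}"
        f"/vertices/{quote(vertex_id, safe='')}/backpressure"
    )


def fetch_backpressure(base_url: str, job_id: str, vertex_id: str,
                       timeout: float = 10.0) -> dict:
    """Request the back pressure sample and return the decoded JSON body as-is."""
    url = backpressure_url(base_url, job_id, vertex_id)
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)
```

Since the monitor is triggered on request, a caller may have to repeat the request after a short wait before sample data is available.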

The back pressure monitor helps you find where back pressure originates (at which task/operator). However, it cannot help you reason about its cause. In addition, for larger jobs or higher parallelism, the back pressure monitor shows too much information to take in, and it may take a while to collect all the samples from the TaskManagers. Also note that sampling may affect the performance of your running job.

Network metrics

Network and task I/O metrics are more lightweight than the back pressure monitor and are continuously published for every running job. We can use them to obtain more information, and for more than just monitoring back pressure. The metrics most relevant to users are:

  • Flink 1.8 and earlier: outPoolUsage, inPoolUsage. These are estimates of the ratio of used to available buffers in the respective local buffer pool. When interpreting inPoolUsage in Flink 1.5 to 1.8 with credit-based flow control, note that it covers only floating buffers (exclusive buffers are not part of the pool).
  • Flink 1.9 and later: outPoolUsage, inPoolUsage, floatingBuffersUsage, exclusiveBuffersUsage. These are estimates of the ratio of used to available buffers in the respective local buffer pool. Starting with Flink 1.9, inPoolUsage is the sum of floatingBuffersUsage and exclusiveBuffersUsage.
  • numRecordsOut, numRecordsIn. Each metric comes in two scopes: one for the operator and one for the subtask. For network monitoring, the subtask-scoped metric is the relevant one; it shows the total number of records sent/received. You may need to look at the differences between successive values to find the number of records in a particular time span, or use the equivalent …PerSecond metrics.
  • numBytesOut, numBytesInLocal, numBytesInRemote. The total number of bytes this subtask has emitted or read from a local/remote source. These are also available as …PerSecond metrics.
  • numBuffersOut, numBuffersInLocal, numBuffersInRemote. Similar to numBytes…, but counting the number of network buffers.
Warning: For completeness, we briefly cover the outputQueueLength and inputQueueLength metrics. They are somewhat similar to the [out,in]PoolUsage metrics, but they show the number of buffers sitting in a sender subtask’s output queues and in a receiver subtask’s input queues, respectively. Reasoning about absolute numbers of buffers is difficult, however, and local channels add a subtlety: a local input channel has no queue of its own (it works directly with the output queue), so its value is always 0 for that channel (see FLINK-12576, issues.apache.org/jira/browse…). If you only have local input channels, inputQueueLength = 0.
In general, we discourage the use of outputQueueLength and inputQueueLength, because their interpretation depends heavily on the current parallelism of the operator and on the configured numbers of exclusive and floating buffers. Instead, we recommend the various *PoolUsage metrics, which also provide more detailed information.
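Because counters such as numRecordsOut and numBytesOut are cumulative totals, the number of records in a given time span comes from the difference between two readings. A minimal sketch, assuming you have sampled the counter yourself at two points in time (the function name is hypothetical):

```python
def per_second_rate(count_start: int, count_end: int, elapsed_seconds: float) -> float:
    """Average rate between two readings of a cumulative counter."""
    if elapsed_seconds <= 0:
        raise ValueError("elapsed_seconds must be positive")
    return (count_end - count_start) / elapsed_seconds
```

For example, readings of 1,000 and 4,000 records taken 60 seconds apart correspond to an average of 50 records per second. The built-in …PerSecond metrics spare you this calculation when their time window suits your needs.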

Note: If you want to reason about buffer usage, keep the following in mind: any outgoing channel that has been used at least once always occupies one buffer (Flink 1.5 and later). Flink 1.8 and earlier: this buffer (even if empty!) always counts as a backlog of 1, so the receiver tries to reserve a floating buffer for it. Flink 1.9 and later: a buffer counts toward the backlog only once it is ready for consumption, that is, when it is full or has been flushed (see FLINK-11082). The receiver releases a received buffer only after deserializing the last record in it.
We combine these metrics below to reason about back pressure, resource usage/efficiency, and throughput. A separate section further down covers latency-related metrics.

Back pressure

Back pressure can be indicated by two different sets of metrics: (local) buffer pool usages and input/output queue lengths. They differ in granularity, but neither is exhaustive, and there is considerable room for interpretation. Because queue lengths are inherently hard to interpret, we focus on the input and output pool usages below, which also provide more detail.

  • If a subtask’s outPoolUsage is 100%, it is back-pressured. Whether the subtask is already blocked or is still writing records into network buffers depends on how full the buffers are that the RecordWriters are currently writing into. Note that this differs from what the back pressure monitor shows!
  • An inPoolUsage of 100% means that all floating buffers are assigned to channels and that back pressure will eventually be exerted upstream. Each of these floating buffers is in one of the following states: it is reserved for future use on a channel because an exclusive buffer is in use (remote input channels always try to maintain #exclusive-buffers credits); it is reserved for a sender’s backlog and waiting for data; it contains data and is enqueued in an input channel; or it contains data that the receiver’s subtask is currently reading (one record at a time).
  • Flink 1.8 and earlier: because of FLINK-11082 (issues.apache.org/jira/browse…), an inPoolUsage of 100% is quite common even under normal circumstances.
  • Flink 1.9 and later: if inPoolUsage is constantly around 100%, this is a strong indicator that back pressure is being exerted upstream.
The following table summarizes all combinations and their interpretation. Keep in mind, however, that back pressure may be minor or temporary (and need no attention), may occur only on particular channels, or may be caused by other activity in the JVM process of a particular TaskManager (such as GC, synchronization, I/O, or resource shortages) rather than by a specific subtask.

| | outPoolUsage low | outPoolUsage high |
|---|---|---|
| inPoolUsage low | Normal | Caution (back-pressured; a temporary situation in which upstream is not back-pressured yet, or not anymore) |
| inPoolUsage high (Flink 1.9+) | If the outPoolUsage of all upstream tasks is low: caution (may eventually cause back pressure). If the outPoolUsage of any upstream task is high: problem (may be exerting back pressure upstream and may be the source of the back pressure) | Problem (back-pressured by downstream tasks or the network, probably forwarding back pressure upstream) |
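The table above can be read as a small decision function. The sketch below is one possible encoding, not an official rule set: the 0.9 cut-off for “high” is an assumption (the article gives no number), the function names are hypothetical, and the “caution” label for low inPoolUsage with high outPoolUsage reflects our reading of that cell.

```python
HIGH_USAGE = 0.9  # assumed cut-off; "high" is not defined numerically in the article


def is_high(usage: float, threshold: float = HIGH_USAGE) -> bool:
    """Treat a pool usage (0.0 to 1.0) at or above the threshold as high."""
    return usage >= threshold


def interpret_pool_usage(in_pool_high: bool, out_pool_high: bool,
                         any_upstream_out_high: bool = False) -> str:
    """Return 'normal', 'caution', or 'problem' for one subtask."""
    if out_pool_high:
        # Output buffers are exhausted, so the subtask itself is back-pressured.
        # If its input pool is exhausted too, it is likely forwarding the pressure upstream.
        return "problem" if in_pool_high else "caution"
    if in_pool_high:
        # Input pool exhausted while output is fine: severity depends on upstream tasks.
        return "problem" if any_upstream_out_high else "caution"
    return "normal"
```

A subtask with inPoolUsage at 100%, outPoolUsage at 20%, and at least one upstream task with high outPoolUsage would be classified as a problem and is a candidate for the source of the back pressure.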

We can go even further and reason about the cause of back pressure by looking at the network metrics of the subtasks of two consecutive tasks:

  • If all subtasks of the receiver task have low inPoolUsage and any upstream subtask has high outPoolUsage, a network bottleneck may be causing the back pressure. Since the network is a resource shared by all subtasks of a TaskManager, the bottleneck may not come directly from this subtask but from other concurrent activity, such as checkpoints, other streams, external connections, or other TaskManagers/processes on the same machine.
  • Back pressure can also be caused by all parallel instances of a task or by a single task instance.
The former usually happens because the task performs some time-consuming operation that applies to all input partitions. The latter is usually the result of some kind of skew, either data skew or skew in resource availability/allocation. The section “How to handle back pressure?” below describes what to do in both cases.

Flink 1.9 and above

  • If floatingBuffersUsage is below 100%, back pressure is unlikely. If it reaches 100% and any upstream task is back-pressured, this input is exerting back pressure on a single, some, or all input channels. You can use exclusiveBuffersUsage to distinguish between these three cases: assuming floatingBuffersUsage is close to 100%, the higher exclusiveBuffersUsage is, the more input channels are back-pressured. In the extreme case of exclusiveBuffersUsage approaching 100%, all channels are back-pressured.


  • The following table summarizes the relationship between exclusiveBuffersUsage, floatingBuffersUsage, and the outPoolUsage of the upstream tasks. It extends the table above, with inPoolUsage = floatingBuffersUsage + exclusiveBuffersUsage:

| | exclusiveBuffersUsage low | exclusiveBuffersUsage high |
|---|---|---|
| floatingBuffersUsage low + all upstream outPoolUsage low | Normal | [3] |
| floatingBuffersUsage low + any upstream outPoolUsage high | Problem (possibly a network bottleneck) | [3] |
| floatingBuffersUsage high + all upstream outPoolUsage low | Caution (only some input channels will eventually be back-pressured) | Caution (most or all input channels will eventually be back-pressured) |
| floatingBuffersUsage high + any upstream outPoolUsage high | Problem (only some input channels are back-pressured) | Problem (most or all input channels are back-pressured) |

[3] This should not happen.
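The Flink 1.9+ table can likewise be sketched as a decision function. The function name and the boolean inputs are hypothetical; deciding what counts as “high” is left to the caller, since the article gives no numeric threshold.

```python
def interpret_input_buffers(floating_high: bool, exclusive_high: bool,
                            any_upstream_out_high: bool) -> str:
    """Classify one input gate from its floating/exclusive buffer usage."""
    if not floating_high:
        if exclusive_high:
            # Footnote [3]: exclusive buffers should not fill up before floating ones.
            return "unexpected"
        return "problem: possible network bottleneck" if any_upstream_out_high else "normal"
    # Floating buffers are exhausted; exclusive usage tells how many channels are affected.
    scope = "most or all" if exclusive_high else "only some"
    severity = "problem" if any_upstream_out_high else "caution"
    return f"{severity}: {scope} input channels"
```

Here “caution” means that the affected channels will eventually be back-pressured, while “problem” means that upstream tasks already show high outPoolUsage.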

Resource usage/throughput

Besides using each of the metrics above on its own, a few combinations give deeper insight into what is happening in the network stack:

  • Low throughput with outPoolUsage frequently around 100% but low inPoolUsage on all receivers indicates that the round-trip time of our credit notifications (which depends on your network’s latency) is too high for the default number of exclusive buffers to make full use of your bandwidth. Consider increasing the buffers-per-channel parameter, or try disabling credit-based flow control to verify.
  • Combining numRecordsOut and numBytesOut gives the average size of serialized records, which helps with capacity planning for peak scenarios.
  • To reason about buffer fill rates and the influence of the output flusher, combine numBytesInRemote with numBuffersInRemote. When tuning for throughput (not latency!), low buffer fill rates may indicate reduced network efficiency. In that case, consider increasing the buffer timeout. Note that in Flink 1.8 and 1.9, numBuffersOut increases only when a buffer becomes full or when an event cuts off a buffer (such as a checkpoint barrier), so it may lag behind. Also note that there is no need to look at buffer fill rates on local channels, because buffering is an optimization for remote channels and has limited effect on local ones.
  • You can also use numBytesInLocal and numBytesInRemote to distinguish local from remote traffic, but in most cases this is not necessary.
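Two of the combinations above reduce to simple ratios. The sketch below assumes a network buffer size of 32 KiB (Flink’s default memory segment size; adjust it if your configuration differs), and the function names are hypothetical. Both results are approximations, since the byte counters may include more than record payloads.

```python
ASSUMED_BUFFER_SIZE_BYTES = 32 * 1024  # assumption: default network buffer size


def average_record_size(num_bytes_out: int, num_records_out: int) -> float:
    """Approximate serialized size per record, in bytes (0.0 if nothing was sent)."""
    if num_records_out == 0:
        return 0.0
    return num_bytes_out / num_records_out


def buffer_fill_ratio(num_bytes_in_remote: int, num_buffers_in_remote: int,
                      buffer_size: int = ASSUMED_BUFFER_SIZE_BYTES) -> float:
    """Approximate average fill level of received remote buffers (0.0 to 1.0)."""
    if num_buffers_in_remote == 0:
        return 0.0
    return num_bytes_in_remote / (num_buffers_in_remote * buffer_size)
```

A fill ratio well below 1.0 on a job tuned for throughput suggests that buffers are mostly sent by the output flusher rather than because they are full.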

How to handle back pressure?

Assuming you have identified where the source of back pressure (the bottleneck) is located, the next step is to analyze why it happens. Below, we list some potential causes of back pressure, from the more basic to the more complex. We recommend checking the basic causes first before diving into the more complex ones, so that you do not draw false conclusions. Also recall that back pressure may be temporary, the result of a load spike, checkpointing, or a job restart with a backlog of data waiting to be processed. If back pressure is temporary, you can simply ignore it. Otherwise, keep in mind that the process of analyzing and resolving the issue can be affected by the bottleneck itself, particularly if it is intermittent. Having said that, here are a few things to check.

System resources

First, check the basic resource usage of the affected machine(s), such as CPU, network, or disk I/O. If some resource is fully or heavily utilized, you can do the following:

  1. Try to optimize your code. Code profilers are useful here.
  2. Tune Flink for that specific resource.
  3. Scale out by increasing the parallelism and/or the number of machines in the cluster.

Garbage collection

In general, long garbage collection pauses cause performance problems. You can verify whether this is the case by printing GC debug logs (via -XX:+PrintGCDetails) or by using a memory/GC profiler. Because dealing with GC problems is highly application-specific and independent of Flink, we will not go into detail here (see Oracle’s garbage collection tuning guide, docs.oracle.com/javase/8/do…, or Plumbr’s Java garbage collection handbook, Plumbr.io/java-Garbag…).

CPU/thread bottleneck

If one or a few threads are causing a CPU bottleneck while the overall CPU usage of the machine remains relatively low, the bottleneck can be hard to spot. For example, a single CPU-bound thread on a 48-core machine results in only about 2% overall CPU utilization. Consider using code profilers here, because they show the CPU usage of each thread and thus let you identify hot threads.
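The arithmetic behind the 48-core example, together with a simple way to pick out hot threads from per-thread readings, can be sketched as follows. The thread names, the 0.9 threshold, and both function names are made up for illustration; real per-thread figures would come from a profiler.

```python
from typing import Dict, List


def machine_utilization(saturated_threads: int, cores: int) -> float:
    """Overall CPU utilization when only some threads are fully busy."""
    return saturated_threads / cores


def hot_threads(per_thread_cpu: Dict[str, float], threshold: float = 0.9) -> List[str]:
    """Names of threads whose share of one core is at or above the threshold."""
    return sorted(name for name, usage in per_thread_cpu.items() if usage >= threshold)
```

With one saturated thread on 48 cores, machine_utilization(1, 48) is about 0.021, roughly 2%, even though that one thread cannot go any faster.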

Thread contention

As with the CPU/thread bottleneck above, a subtask may become a bottleneck because of heavy thread contention on shared resources. Again, CPU profilers are your best friend here. Consider looking for synchronization overhead or lock contention in user code, although adding synchronization to user code should be avoided and may even be dangerous! Also consider investigating shared system resources. For example, the default JVM’s SSL implementation can become contended around the shared /dev/urandom resource.

Load imbalance

If your bottleneck is caused by data skew, you can try to remove the skew or mitigate its impact by changing the data partitioning to separate heavy keys or by implementing local/pre-aggregation.
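One common way to spread out a heavy key is to append a random “salt” so that its records land on several partitions, pre-aggregate per salted key, and then merge the partial results. The sketch below shows the idea in plain Python rather than with Flink’s API; the function names, the “#” separator, and the bucket count are all illustrative assumptions.

```python
import random
from collections import Counter
from typing import Iterable, Tuple


def salted_key(key: str, salt_buckets: int, rng: random.Random) -> str:
    """Append a random bucket number so one heavy key maps to several sub-keys."""
    return f"{key}#{rng.randrange(salt_buckets)}"


def two_phase_count(keys: Iterable[str], salt_buckets: int = 4,
                    seed: int = 0) -> Tuple[Counter, Counter]:
    """Count per salted key first (pre-aggregation), then merge per original key."""
    rng = random.Random(seed)
    # Phase 1: each salted key could be handled by a different parallel instance.
    partial = Counter(salted_key(k, salt_buckets, rng) for k in keys)
    # Phase 2: strip the salt and combine the much smaller partial results.
    final: Counter = Counter()
    for salted, count in partial.items():
        original, _, _ = salted.rpartition("#")
        final[original] += count
    return partial, final
```

The second phase still sees every original key on one instance, but it only has to merge a handful of partial counts instead of processing every record of the heavy key.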

This list is far from exhaustive. In general, to reduce a bottleneck and thus back pressure, first analyze where it occurs and then find out why. A good place to start reasoning about the “why” is to check which resources are fully utilized.

Latency tracking

Tracking latency at the various places where it may occur is a topic of its own. In this section, we focus on the time records spend waiting inside Flink’s network stack, including the system’s network connections. At low throughput, these latencies are influenced directly by the output flusher via the buffer timeout parameter, or indirectly by latencies in application code. When processing a record takes longer than expected, or when timers fire at the same time and block the receiver from processing incoming records, the time that subsequent records spend waiting in the network stack grows significantly. We strongly recommend adding your own metrics to your Flink job to better track latency in its components and to get a fuller picture of where delays come from.

Flink offers some support for tracking the latency of records passing through the system (outside of user code). This feature is disabled by default (see below for why!) and must be enabled by setting a latency tracking interval, either via metrics.latency.interval in Flink’s configuration or via ExecutionConfig#setLatencyTrackingInterval(). Once enabled, Flink collects latency histograms at the granularity defined by metrics.latency.granularity:

  • single: one histogram per operator subtask
  • operator (default): one histogram per combination of source task and operator subtask
  • subtask: one histogram per combination of source subtask and operator subtask (quadratic in the parallelism!)
These metrics are collected through special “latency markers”: each source subtask periodically emits a special record containing the timestamp of its creation. The latency markers then flow alongside regular records and do not overtake them on the wire or in buffer queues. However, latency markers do not enter the application logic and do overtake records there. They therefore measure only the waiting time between user code, not the full end-to-end latency. User code does affect these waiting times indirectly, though!
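To see why the finest granularity is costly, the number of histograms kept for a single operator can be estimated from the descriptions of the three levels. This is a back-of-the-envelope sketch derived from those descriptions, not from Flink’s implementation; the function and parameter names are hypothetical.

```python
def latency_histogram_count(granularity: str, operator_parallelism: int,
                            num_sources: int, source_parallelism: int) -> int:
    """Estimated number of latency histograms kept for one operator."""
    if granularity == "single":
        return operator_parallelism
    if granularity == "operator":
        return num_sources * operator_parallelism
    if granularity == "subtask":
        return num_sources * source_parallelism * operator_parallelism
    raise ValueError(f"unknown granularity: {granularity!r}")
```

With one source and a parallelism of 100 everywhere, this gives 100 histograms per operator at the single and operator levels, but 10,000 at the subtask level, since the count grows with the product of source and operator parallelism.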

Because LatencyMarkers sit in network buffers just like regular records, they also wait for the buffer to fill up or to be flushed after the buffer timeout. When a channel is under high load, buffering data in network buffers adds no latency. However, as soon as a channel is under low load, records and latency markers experience an expected average delay of at most buffer_timeout / 2. This delay adds up over every network connection leading to a subtask and should be taken into account when analyzing a subtask’s latency metric.
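The buffer_timeout / 2 rule can be turned into a quick upper-bound estimate for a chain of network connections. The sketch assumes a buffer timeout of 100 ms (Flink’s default, to our knowledge; substitute your own setting), and the function name is hypothetical.

```python
def max_expected_buffering_delay_ms(network_hops: int,
                                    buffer_timeout_ms: float = 100.0) -> float:
    """Upper bound on the average delay added by buffering on lightly loaded channels."""
    if network_hops < 0:
        raise ValueError("network_hops must not be negative")
    # Each lightly loaded connection adds at most buffer_timeout / 2 on average.
    return network_hops * buffer_timeout_ms / 2
```

For a record that crosses three network connections on its way to a subtask, the estimate is up to 150 ms of average latency from buffering alone, before any processing time is counted.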

By looking at the latency tracking metrics exposed for each subtask, for example at the 95th percentile, you should be able to identify the subtasks that contribute significantly to the overall source-to-sink latency and then optimize them.


This article is original content of the Yunqi Community and may not be reproduced without permission.