


Sherlock.IO is eBay's monitoring platform, handling tens of billions of logs, events, and metrics every day. Flink Streaming jobs are used to process these logs and events.






Drawing on the current state of the Flink-based monitoring system, this article describes how Flink is used in practice, in the hope of offering some reference and inspiration to engineers working on similar systems.



Status of the Flink monitoring system



EBay’s monitoring platform Sherlock.IO handles tens of billions of logs, events, and metrics every day.



By building a real-time processing system with Flink Streaming jobs, the monitoring team can deliver the results of log and event processing to users promptly.



Currently, the monitoring team maintains eight Flink clusters, with the largest cluster size reaching thousands of TaskManagers, running hundreds of jobs, some of which have been running steadily for more than six months.


Metadata-driven



To make it easier for users and administrators to create Flink jobs and adjust parameters, the monitoring team built a metadata service on Flink.



The service describes a job's DAG in JSON; jobs with the same DAG share the same Flink job, which makes it easy to create jobs without calling the Flink API directly.



The overall architecture of Sherlock.IO stream processing is shown in Figure 1:

Figure 1: Overall architecture of Sherlock.IO stream processing



Currently, jobs created through this metadata microservice support only Kafka as a data source. Once data is connected to Kafka, users can define a Capability to describe the logic for processing that data with Flink Streaming.



Metadata microservices



The metadata microservice framework is shown in Figure 2. At the top is the RESTful API provided by the metadata microservice; users describe and submit jobs by calling this API.

Figure 2: Metadata microservices framework



The metadata describing the job consists of three parts:

  • Capability

  • Policy

  • Resource



The Flink Adaptor connects the metadata microservice API to the Flink Streaming API: it translates the jobs described by the metadata microservice into calls to the Flink Streaming API, shielding users from that API.



Therefore, users can create Flink jobs without knowing the Flink Streaming API. If we need to migrate to another stream processing framework in the future, existing jobs can be migrated simply by adding a new adaptor.
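As an illustration only (not eBay's actual code), an adaptor layer of this kind might look roughly like the following sketch, where JobMetadata and its fields are hypothetical stand-ins for the metadata described above:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Hypothetical metadata describing a job; the field names are illustrative only. */
class JobMetadata {
    String jobName;
    int parallelism;
    // Capability, Policy and Resource definitions would also live here.
}

/** Adaptor abstraction: one implementation per stream processing framework. */
interface StreamingAdaptor {
    void submit(JobMetadata metadata) throws Exception;
}

/** Flink-specific adaptor: translates metadata into Flink Streaming API calls. */
class FlinkAdaptor implements StreamingAdaptor {
    @Override
    public void submit(JobMetadata metadata) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(metadata.parallelism);
        // Build the DAG described by the Capability here: add the sources,
        // operators and sinks, then launch the job under the metadata's name.
        env.execute(metadata.jobName);
    }
}
```

Migrating to another stream processing framework would then only require a new StreamingAdaptor implementation, leaving the metadata untouched.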


① Capability



A Capability defines the DAG for a job and the Class used by each Operator. Figure 3 shows the eventProcess Capability, which ultimately produces the DAG shown in Figure 4:

Figure 3: eventESSink Capability

Figure 4: Generated Flink jobs



The Capability reads data from Kafka and then writes it to Elasticsearch.



This Capability names the job "eventProcess", sets its parallelism to 5, specifies "EventEsIndexSinkCapability" as its operator, and defines the data flow from Source to Sink.
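For illustration, a hand-written Flink Streaming job roughly equivalent to what the adaptor would generate from this Capability is sketched below; the broker address, topic name, and the use of print() in place of the real Elasticsearch index sink are placeholders, not eBay's actual values:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class EventProcessJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);   // the parallelism "5" defined in the Capability

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder broker
        kafkaProps.setProperty("group.id", "eventProcess");

        // Source: read raw events from Kafka ("events" is a placeholder topic name).
        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), kafkaProps));

        // Sink: in the real job this would be the EventEsIndexSink writing to
        // Elasticsearch; print() is used here only as a stand-in.
        events.print();

        env.execute("eventProcess");
    }
}
```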


② Policy



Each Namespace defines one or more policies, and each Policy specifies a Capability, that is, which DAG is used to run the Policy.


A Policy also defines the configuration of the job, such as which Kafka topic to read from, which Elasticsearch index to write to, whether to skip some operators, and so on.


A Policy can also act as a simple filter: it can be configured with Jexl expressions to filter out unwanted data, increasing the throughput of the job.
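As a sketch of how a Jexl expression from a Policy could be plugged into a Flink job as a filter (the example expression and event field names are illustrative, not taken from eBay's policies):

```java
import java.util.Map;

import org.apache.commons.jexl3.JexlBuilder;
import org.apache.commons.jexl3.JexlEngine;
import org.apache.commons.jexl3.JexlExpression;
import org.apache.commons.jexl3.MapContext;
import org.apache.flink.api.common.functions.FilterFunction;

/** Filters events using a Jexl expression taken from the Policy (illustrative). */
public class JexlPolicyFilter implements FilterFunction<Map<String, Object>> {
    private final String expression;
    private transient JexlExpression compiled;

    public JexlPolicyFilter(String expression) {
        this.expression = expression;   // e.g. "severity == 'ERROR' && value > 90"
    }

    @Override
    public boolean filter(Map<String, Object> event) {
        if (compiled == null) {
            JexlEngine jexl = new JexlBuilder().create();
            compiled = jexl.createExpression(expression);
        }
        MapContext ctx = new MapContext(event);   // expose event fields to the expression
        Object result = compiled.evaluate(ctx);
        return Boolean.TRUE.equals(result);       // keep only events matching the Policy
    }
}
```

Such a filter would typically sit right after the source, so unwanted events are dropped before the heavier operators run.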


In addition, we implemented a ZooKeeper-based periodic update mechanism, so jobs no longer need to be restarted after a Policy changes: as long as the Policy is changed within the update interval, the namespace's Policy is automatically applied to the running job.
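One way such a periodic refresh could work is sketched below; the ZooKeeper path, the polling approach, and the applyPolicy hook are assumptions made for illustration rather than the actual mechanism:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.ZooKeeper;

/** Periodically re-reads a namespace's Policy from ZooKeeper (illustrative sketch). */
public class PolicyRefresher {
    private final ZooKeeper zk;
    private final String policyPath;   // e.g. "/policies/paas" (placeholder path)
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public PolicyRefresher(ZooKeeper zk, String policyPath) {
        this.zk = zk;
        this.policyPath = policyPath;
    }

    /** Poll the Policy every intervalSeconds and apply changes without restarting the job. */
    public void start(long intervalSeconds) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                byte[] data = zk.getData(policyPath, false, null);
                applyPolicy(new String(data));   // hypothetical hook that updates operators in place
            } catch (Exception e) {
                // In a real job this would be logged and metered rather than swallowed.
            }
        }, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    private void applyPolicy(String policyJson) {
        // Parse the JSON Policy and push it to the running operators.
    }
}
```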


Figure 5 is an example of a Policy under the "paas" namespace:

Figure 5: Paas alertESSink Policy


③ Resource



Resource defines the resources required by a namespace, such as Flink clusters, Kafka Brokers, ES clusters, and so on.


We have multiple Flink clusters and ES clusters. Through the Resource configuration, a job knows which ES cluster a given namespace's logs should be written to and which Kafka cluster that namespace's data should be read from.


Job sharing


To reduce the number of jobs, we reuse one job per DAG: we assign the same Capability to different policies, and as long as the Capability's job has sufficient resources, those policies are assigned to the same job.


Take the SQL Capability as an example. The SQL statement of each Policy is different. If a separate job were created for each Policy, the JobManager overhead would be high and the jobs hard to manage.


Instead, we can configure the SQL Capability with 20 slots, one for each Policy, so that the job generated from this Capability can run 20 policies.


When the job runs, data read from the Source is tagged with the matching Policy, and the SQL statement defined by that Policy is then executed. In this way different policies share the same job, greatly reducing the total number of jobs.
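A minimal sketch of this tagging step is shown below; the Policy interface with an id and a match predicate is a hypothetical stand-in, and the per-Policy SQL execution that would follow downstream is omitted:

```java
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/** Tags each record with every Policy it matches so one job can serve many policies (sketch). */
public class PolicyTagger implements FlatMapFunction<Map<String, Object>, Tuple2<String, Map<String, Object>>> {

    /** Minimal stand-in for a Policy: an id plus a match() predicate. */
    public interface Policy extends java.io.Serializable {
        String id();
        boolean matches(Map<String, Object> record);
    }

    private final List<Policy> policies;   // the policies sharing this job

    public PolicyTagger(List<Policy> policies) {
        this.policies = policies;
    }

    @Override
    public void flatMap(Map<String, Object> record,
                        Collector<Tuple2<String, Map<String, Object>>> out) {
        for (Policy policy : policies) {
            if (policy.matches(record)) {
                // Downstream operators (e.g. the per-Policy SQL) key on the policy id.
                out.collect(Tuple2.of(policy.id(), record));
            }
        }
    }
}
```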


Shared jobs have another benefit: if multiple namespaces consume the same Kafka topic, the data only needs to be read once, rather than being read once per namespace and then filtered, which greatly improves processing efficiency.


Optimization and monitoring of Flink jobs


With the metadata-driven design covered, let's look at how we optimize and monitor Flink jobs.


Heartbeat


While operating and maintaining the Flink clusters, it is difficult to monitor how jobs are actually running. Even with checkpointing turned on, we cannot be sure whether data has been lost or how much. Therefore, we inject a Heartbeat into each job to monitor its health.


Heartbeat works much like Flink's "LatencyMarker": it flows through the pipeline of every job and is used to monitor latency.


Unlike LatencyMarker, however, when a Heartbeat encounters a branch in the DAG, it splits and flows down every branch, rather than flowing down a single randomly chosen branch as LatencyMarker does.


Another difference is that Heartbeat is not generated by Flink itself, but is generated periodically by metadata microservices and then consumed by each job.


As shown in Figure 4 above, each job starts with a Heartbeat data source by default.


After a Heartbeat flows into a job, it follows the data flow through each node; at every node it is stamped with that node's label and then skips the node's processing logic and moves on to the next node.


When the Heartbeat reaches the last node, it is sent to Sherlock.IO (eBay's monitoring platform) as a metric.


This metric includes the time the Heartbeat is generated, the time it flows into the job, and the time it arrives at each node.


Using this metric, we can determine whether the job is slow reading from Kafka, how long a piece of data takes to be processed across the whole pipeline, and how long each node takes to process it, thereby locating the job's performance bottlenecks.


Because Heartbeats are sent periodically, each job should receive the same number of them. If the number of metrics finally emitted does not match the expectation, we can further determine whether data has been lost.
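A rough sketch of how an operator could let Heartbeats bypass its business logic while recording per-node timestamps is shown below; the marker field, the timestamp map, and the wrapper class are illustrative assumptions, not eBay's implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/** Wraps a node's business logic so Heartbeats bypass it and just collect timestamps (sketch). */
public abstract class HeartbeatAwareNode extends ProcessFunction<Map<String, Object>, Map<String, Object>> {

    private final String nodeName;

    protected HeartbeatAwareNode(String nodeName) {
        this.nodeName = nodeName;
    }

    @Override
    public void processElement(Map<String, Object> record, Context ctx,
                               Collector<Map<String, Object>> out) throws Exception {
        if (Boolean.TRUE.equals(record.get("isHeartbeat"))) {   // hypothetical marker field
            // Stamp the arrival time at this node, skip the business logic, pass it on.
            @SuppressWarnings("unchecked")
            Map<String, Long> hops = (Map<String, Long>) record
                    .computeIfAbsent("nodeTimestamps", k -> new LinkedHashMap<String, Long>());
            hops.put(nodeName, System.currentTimeMillis());
            out.collect(record);
            return;
        }
        process(record, out);   // normal data goes through the node's real logic
    }

    /** The node's actual processing logic for non-Heartbeat records. */
    protected abstract void process(Map<String, Object> record, Collector<Map<String, Object>> out);
}
```

The last node in the DAG would then emit the accumulated timestamps as the metric described above instead of forwarding the Heartbeat further.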


Figure 6 depicts the data flow in a Flink job and the Heartbeat running state:

Figure 6: Heartbeat in action



Availability


With Heartbeat in place, we can use it to define cluster availability. First, we need to define what counts as unavailable:


① The Flink job restarts



Jobs can restart unexpectedly because of OutOfMemory errors or bugs in the code. We consider data lost during a restart to be one form of unavailability, so one of our goals is to keep Flink jobs running stably for long periods.


② The Flink job is aborted



Sometimes a physical machine or container fails to start because of infrastructure problems, a restarting Flink job fails because there are not enough slots, or the number of restarts exceeds the configured maximum number of restart attempts; in these cases the Flink job is aborted.


Human intervention is then required to restart the job. We consider an aborted Flink job to be another form of unavailability.


③ The Flink job stops processing data while running



This usually happens because of backpressure, for example when upstream traffic is too heavy, an intermediate operator lacks processing capacity, or a downstream storage node hits a performance bottleneck.


Although short-lived backpressure does not cause data loss, it does affect the real-time delivery of the data; the most obvious symptom is that the latency metric grows.


We consider backpressure to be the third form of unavailability. In all three cases, Heartbeat can be used to monitor the jobs and calculate availability.


In the first case, if data is lost during a job restart, the Heartbeats in the corresponding section of the pipeline are lost as well, which lets us detect missing data and make a coarse-grained estimate of how much data was lost.


In the second case, when a job is aborted, Heartbeats are no longer processed, so we can quickly detect that the job is down and have on-call engineers intervene.


In the third case, when backpressure occurs, Heartbeats are blocked upstream of the backpressure as well, so on-call engineers can quickly detect it and intervene manually.


In summary, Heartbeat lets us quickly monitor the health of Flink jobs. So how do we evaluate availability?


Since Heartbeats are emitted on a timer, by default every 10 seconds, we expect each job to receive six Heartbeats carrying its job information per pipeline per minute, that is, 8,640 per day.


Thus, the availability of a job can be defined as:
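A plausible formulation, assuming availability is measured as the fraction of expected Heartbeats that actually arrive:

```latex
\text{Availability} \;=\; \frac{\text{Heartbeats received by the job}}{\text{Heartbeats expected (e.g. } 8640 \text{ per day)}}
```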

Flink job isolation


A slot is the smallest unit in which Flink runs jobs [1]; each TaskManager can be allocated one or more slots (typically equal to the number of CPUs allocated to that TaskManager).


Depending on the parallelism of Flink jobs, a job can be assigned to multiple TaskManagers, and a TaskManager can run multiple jobs.


However, a TaskManager is a single JVM, and when multiple jobs are assigned to the same TaskManager they can compete for resources.


For example, suppose a TaskManager is allocated 3 slots (3 CPUs) and 8 GB of heap memory.


When the JobManager schedules jobs, it may schedule threads from three different jobs onto that TaskManager, and the three jobs then compete for CPU and memory. If one job is particularly CPU- or memory-intensive, it affects the other two.


In this case, we can isolate jobs by configuring Flink, as shown in Figure 7:

Figure 7: Flink scheduling before and after job isolation

With this configuration, each TaskManager is restricted to exclusive CPU and memory resources, and jobs no longer preempt one another, achieving job isolation.
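The exact settings are not spelled out in the text; one common way to achieve this kind of isolation is to give each TaskManager a single slot and size it for one job. The sketch below expresses such settings programmatically through Flink's Configuration class purely for illustration (in a standalone cluster the same keys would normally live in flink-conf.yaml, and the values are examples):

```java
import org.apache.flink.configuration.Configuration;

/** Illustrative isolation settings; values are examples, not eBay's actual configuration. */
public class IsolationConfigSketch {
    public static Configuration taskManagerConfig() {
        Configuration conf = new Configuration();
        // One slot per TaskManager: each TaskManager runs tasks of only one job,
        // so CPU and heap are not shared across jobs.
        conf.setInteger("taskmanager.numberOfTaskSlots", 1);
        // Size the TaskManager for the single job it hosts (example value;
        // the exact memory key depends on the Flink version in use).
        conf.setString("taskmanager.heap.size", "8192m");
        return conf;
    }
}
```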


Back pressure


While operating and maintaining the Flink clusters, we found that backpressure was the most common problem. As mentioned above, backpressure can occur for many reasons, but whatever the cause, data eventually piles up in the local buffers of the operator upstream of the backpressure.


Each TaskManager has a local buffer pool: data flowing into an operator is placed into buffers from this pool, and the buffers are recycled once the data has flowed out of the operator.


Under backpressure, data cannot be sent downstream, the buffers cannot be released, and the operator ends up blocked waiting on buffer requests (requestBuffer).


Because Heartbeat can only tell us whether backpressure exists, it cannot locate which operator is the problem.


So we periodically print out the stack trace of each operator; when backpressure occurs, the stack traces tell us which operator is the bottleneck.
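A bare-bones sketch of periodic stack-trace dumping using only the JDK is shown below; filtering to operator threads, formatting, and shipping the output to a log or metrics backend are left out and would differ in a real deployment:

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Periodically dumps stack traces of the JVM's threads to help locate backpressure (sketch). */
public class StackTraceDumper {

    public static void start(long intervalSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces();
            for (Map.Entry<Thread, StackTraceElement[]> entry : traces.entrySet()) {
                StringBuilder sb = new StringBuilder("Thread " + entry.getKey().getName() + ":\n");
                for (StackTraceElement frame : entry.getValue()) {
                    sb.append("    at ").append(frame).append('\n');
                }
                System.out.println(sb);   // in practice this would go to a log or metrics backend
            }
        }, 0, intervalSeconds, TimeUnit.SECONDS);
    }
}
```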


As Figure 8 shows, we can clearly see the Flink job in which the backpressure occurred and the TaskManager on which it occurred. Combined with the thread dump, we can locate the problem in the code.

Figure 8: StackTrace with backpressure


Other monitoring tools


Flink itself provides a number of useful metrics [2] to monitor the performance of Flink jobs, to which we have added some business metrics. In addition, the following tools were used to monitor Flink jobs.


① History Server



Flink's History Server [3] lets us query the status and metrics of completed jobs, for example how many times a job restarted and how long it ran.


We often use it to find jobs that are not running properly. For example, the History Server's Attempt metric tells us how many times each job has restarted, so we can quickly investigate the cause of a restart and prevent it from happening again.


② Monitor jobs and clusters



Although Flink has an HA mode, in extreme cases, such as when the whole cluster has problems, on-call engineers need to detect the failure and intervene immediately.


The metadata microservice keeps the metadata of the last successful job submission, which records which jobs should be running on each Flink cluster.


Daemon threads compare this metadata with the jobs actually running on Flink every minute, and send an alert to the on-call engineer if the JobManager is unreachable or if any job is not running as expected.
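A simplified sketch of such a daemon check is given below; fetchExpectedJobs, fetchRunningJobs, and alertOnCall are hypothetical hooks standing in for calls to the metadata microservice, the Flink REST API, and the paging system:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Compares expected jobs (from the metadata service) with running jobs once a minute (sketch). */
public abstract class JobWatchdog {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void start() {
        scheduler.scheduleAtFixedRate(this::checkOnce, 1, 1, TimeUnit.MINUTES);
    }

    void checkOnce() {
        try {
            Set<String> expected = fetchExpectedJobs();   // from the metadata microservice
            Set<String> running = fetchRunningJobs();     // e.g. via the Flink REST API
            Set<String> missing = new HashSet<>(expected);
            missing.removeAll(running);
            if (!missing.isEmpty()) {
                alertOnCall("Jobs missing from cluster: " + missing);
            }
        } catch (Exception e) {
            // Failing to reach the JobManager is itself alert-worthy.
            alertOnCall("Unable to query Flink cluster: " + e.getMessage());
        }
    }

    /** Hypothetical hooks; real implementations would call the metadata service, Flink and the paging system. */
    protected abstract Set<String> fetchExpectedJobs() throws Exception;
    protected abstract Set<String> fetchRunningJobs() throws Exception;
    protected abstract void alertOnCall(String message);
}
```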


Examples


Here are some applications of the Flink stream processing system that are already running in the monitoring system:


Event Alerting


The monitoring team currently does event alerting based on Flink Streaming. We defined an alerting operator, EventAlertingCapability, which handles the rules defined by each Policy.


A performance monitoring rule is defined in Figure 9:

This rule states that when the performance detector's application is R1Rover, the host starts with R1Rover, and the value is greater than 90, an alert is generated. The generated alerts are sent to the specified Kafka topic for downstream processing.

Figure 9: Single-threshold1 Policy
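Expressed in plain Java, the condition in this rule and the hand-off to Kafka might look roughly like the sketch below; the field names, the case-insensitive match, and the alert serialization are illustrative guesses rather than the actual Policy implementation:

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Evaluates the single-threshold rule from Figure 9 and forwards alerts to Kafka (sketch). */
public class SingleThresholdAlerter {

    private final KafkaProducer<String, String> producer;
    private final String alertTopic;   // the Kafka topic the Policy points alerts at (placeholder)

    public SingleThresholdAlerter(Properties kafkaProps, String alertTopic) {
        this.producer = new KafkaProducer<>(kafkaProps);
        this.alertTopic = alertTopic;
    }

    public void evaluate(Map<String, Object> metric) {
        Object app = metric.get("application");
        Object host = metric.get("host");
        Object value = metric.get("value");
        // Rule: application is R1Rover, host starts with R1Rover, and value > 90.
        if (app instanceof String && ((String) app).equalsIgnoreCase("r1rover")
                && host instanceof String && ((String) host).toLowerCase().startsWith("r1rover")
                && value instanceof Number && ((Number) value).doubleValue() > 90) {
            producer.send(new ProducerRecord<>(alertTopic, (String) host, metric.toString()));
        }
    }
}
```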



Eventzon


Eventzon is like eBay's event hub: it collects events from various applications, frameworks, and pieces of infrastructure, and generates real-time alerts through the monitoring team's Flink Streaming jobs.


Because the data sources of each event are different, their metadata is also different, so there is no unified rule to describe it.


We defined a set of jobs to handle Eventzon events, composed of multiple Capabilities, such as the Filter Capability, which filters out illegal or unqualified events, and the Deduplicate Capability, which removes duplicate events.
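A minimal sketch of what such a deduplication step could look like on a keyed Flink stream, assuming events are keyed by some fingerprint and ignoring state expiry, is:

```java
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

/** Keeps only the first event per key; meant to run on a keyed stream (sketch). */
public class DeduplicateCapabilitySketch extends RichFilterFunction<String> {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public boolean filter(String event) throws Exception {
        if (Boolean.TRUE.equals(seen.value())) {
            return false;              // duplicate: drop it
        }
        seen.update(true);             // first occurrence: remember and keep
        return true;
    }
}
```

It would be used as events.keyBy(...).filter(new DeduplicateCapabilitySketch()); in practice, state TTL or windowing would be needed to keep the state bounded.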


After passing through this complete set of operators, qualifying Eventzon events generate valid alerts, which are sent to the relevant teams by email, Slack, or PagerDuty according to their notification settings.


Netmon


Netmon stands for Network Monitoring and is used to monitor the health of the entire eBay Network. Its data source is the logs of eBay’s switches, routers and other network devices.


Netmon mines these logs for specific information, usually error logs, and generates alerts from them.


Every eBay device is "registered". After a device sends its logs, we use the EnrichCapability to look the device up in the registry and enrich the log with information such as its IP address, data center, and rack, turning it into an event.


When certain error logs appear, they are matched by the corresponding rules and generate alerts. The alerts are saved to Elasticsearch by the EventProcess Capability and displayed on Netmon's dashboard in real time.


Sometimes network jitter causes some temporary errors, but the system will automatically recover after a while.


Netmon has rules that mark alerts caused by network jitter as "Resolved".


For alerts that require manual intervention, operators can click "Cleared" on the Netmon dashboard to complete the alert's life cycle.


Summary and Outlook


eBay's monitoring team wants to alert users in real time based on the metrics, events, and logs they provide, together with the corresponding alerting rules.


Flink Streaming provides the low-latency processing we need and is well suited to complex processing logic.


However, while operating and maintaining Flink we also found that job restarts and similar issues could cause alerts to be missed or misreported, misleading customers. So we will invest more in Flink's stability and high availability in the future.


We also hope to integrate sophisticated AI algorithms into the processing of metrics and logs, so as to generate more effective and accurate alerts and make the platform a powerful tool for operations engineers.

Translated by Gu Xinyi

Reference: https://yq.aliyun.com/articles/714592?utm_content=g_1000072645