1 Introduction

As a PaaS provider, NetEase Yunxin needs to monitor its online services in real time and continuously sense their health, that is, the “heartbeat”, “pulse” and “blood pressure” of each service. The buried-point (tracking) logs collected from the SDK, the server, and other endpoints form a huge, unordered data set. How can this data be put to effective use? The job of the service monitoring platform is to analyze this massive data in real time, aggregate the core metrics that represent the “heartbeat”, “pulse” and “blood pressure” of the service, and present them visually to the relevant engineers. Its core capabilities are therefore real-time analysis and real-time aggregation.

In a previous article, “Practice of the NetEase Yunxin Service Monitoring Platform”, we introduced the overall architecture of the platform around four stages: data collection, data processing, monitoring and alerting, and data application. This article elaborates further on how NetEase Yunxin computes its aggregation metrics.

Real-time aggregation over detailed data sets to produce aggregate metrics is commonly implemented in the industry with Spark Streaming or with Flink's SQL/Stream API. Either way, we have to write code that specifies the data source, the data-cleansing logic, the aggregation dimensions, the aggregation window size, the aggregation operators, and so on. Such complex logic and code requires a large human and material cost, whether in development, testing, or subsequent maintenance of the jobs. What we as engineers should do is simplify: true sophistication lies in simplicity.

In this article, we explain how NetEase Yunxin built a general aggregate-metric computation framework on top of Flink's Stream API.

2 Overall structure

The figure above shows the complete processing pipeline for aggregation metrics that we built on top of Flink. It involves the following modules:

  • Source: periodically loads the aggregation rules, creates Kafka consumers as required by those rules, and keeps consuming data.
  • Process: contains the grouping logic, window logic, aggregation logic, relative (sequential) computation logic, and so on. As the figure shows, we split the aggregation phase in two. What is the purpose of this, and what is the benefit? Anyone who has done distributed, concurrent computing has met a common enemy: data skew. In our PaaS service the head customers dominate the traffic, so the skew is severe; the rationale for the two-stage aggregation is explained in detail below.
  • Sink: the data output layer; by default it writes to Kafka and InfluxDB. The former drives downstream computation (such as alarm notifications), the latter serves data presentation and query services.
  • Reporter: full-link statistics, such as input/output QPS, computation time, consumption backlog, and the volume of late data.

The following sections will introduce the design and implementation ideas of these modules in detail.

3 Source

Rule configuration

To make it easy to create and maintain aggregation metrics, we abstracted the key parameters involved in metric computation and provide a visual configuration page, shown in the figure below. The following paragraphs describe how each parameter is used in concrete scenarios.

Rule loading

While the aggregation job runs, we load the configuration periodically. When a new topic is detected, we create a Kafka consumer thread to receive the upstream real-time data stream. Similarly, for a configuration that has expired or been removed, we close the consumer thread and clean up the associated reporter.

Data consumption

Aggregation metrics that share the same data source share one Kafka consumer. After records are pulled and parsed, collect() is called once per aggregation metric to distribute the data. If a metric's data filter rule (configuration item ⑤) is not empty, the data is filtered before distribution and records that do not match are discarded.
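
As a rough illustration of this distribution step, the sketch below models it with plain Java types; all names (MetricRule, RecordDistributor, the filter predicate) are illustrative stand-ins, not the platform's actual classes.

import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

// Hedged sketch: one shared consumer parses a record, then offers it to every aggregation
// metric configured on the same topic, applying each metric's filter rule (configuration
// item ⑤) first and discarding records that do not match.
class MetricRule {
    final String metricId;
    final Predicate<Map<String, Object>> filter;   // null means "no filter configured"

    MetricRule(String metricId, Predicate<Map<String, Object>> filter) {
        this.metricId = metricId;
        this.filter = filter;
    }
}

class RecordDistributor {
    void distribute(Map<String, Object> record,
                    List<MetricRule> rulesOnTopic,
                    BiConsumer<String, Map<String, Object>> collect) {
        for (MetricRule rule : rulesOnTopic) {
            if (rule.filter != null && !rule.filter.test(record)) {
                continue;                                   // filtered out: drop the record
            }
            collect.accept(rule.metricId, record);          // collect() once per matching metric
        }
    }
}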

4 Process

Overall calculation process

The core code to implement aggregate computing based on Flink’s Stream API is as follows:

SingleOutputStreamOperator<MetricContext> aggResult = src
        // extract the event timestamp and advance the watermark
        .assignTimestampsAndWatermarks(new MetricWatermark())
        // group by the configured aggregation dimensions
        .keyBy(new MetricKeyBy())
        // tumbling or sliding window, per the rule configuration
        .window(new MetricTimeWindow())
        // apply the configured aggregation operators
        .aggregate(new MetricAggFunction());
  • MetricWatermark(): obtains each record's timestamp from the configured time field (⑧) and drives the watermark of the stream forward.
  • MetricKeyBy(): specifies the aggregation dimensions, similar to GROUP BY in MySQL. The values of the grouping fields (configuration item ⑥) are read from the record and concatenated into the grouping key.
  • MetricTimeWindow(): configuration item ⑧ specifies the size of the aggregation window. If periodic output is configured we create a sliding window, otherwise a tumbling window (a sketch of this choice follows the list).
  • MetricAggFunction(): implements the operators specified by configuration item ②. The implementation of each operator is described in detail in the following sections.
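
The article does not show the internals of MetricTimeWindow, but with the standard Flink window assigners the tumbling-versus-sliding choice described above could be implemented roughly as follows; the helper class and parameter names are assumptions, not the framework's actual code.

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Hedged sketch: choose the window assigner from the rule configuration.
public final class MetricWindows {
    public static WindowAssigner<Object, TimeWindow> forRule(long windowSizeSeconds,
                                                             long outputIntervalSeconds,
                                                             boolean periodicOutput) {
        if (periodicOutput) {
            // periodic output configured: a sliding window emits a result every output interval
            return SlidingEventTimeWindows.of(Time.seconds(windowSizeSeconds),
                                              Time.seconds(outputIntervalSeconds));
        }
        // otherwise a plain tumbling window of the configured size
        return TumblingEventTimeWindows.of(Time.seconds(windowSizeSeconds));
    }
}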

Two-stage aggregation

For aggregations over large data volumes, data skew has to be considered. Data skew means that the aggregation key defined by the grouping fields in the rule (configuration item ⑥) has hot spots. Our computation framework was designed from the start to cope with data skew by splitting the aggregation into two stages:

  • Stage 1: Shatter the data randomly for pre-aggregation.
  • Stage 2: Take the pre-aggregation results of Stage 1 as input for the final aggregation.

Concretely: if the parallelism parameter is greater than 1, a random number between 0 and the parallelism is generated as a randomKey. In the stage-1 keyBy(), the key derived from the grouping fields (configuration item ⑥) is concatenated with the randomKey to form the final aggregation key, which shatters the data randomly across subtasks, as shown in the sketch below.
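
Below is a minimal sketch of such a stage-1 key selector. The class and the GroupKeyBuilder interface are illustrative, not the framework's actual API; the point is only how the random suffix spreads a hot grouping key over several subtasks before the stage-2 (final) aggregation, which keys by the plain grouping key alone.

import org.apache.flink.api.java.functions.KeySelector;

import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;

// Hedged sketch of the stage-1 key selector: append a random suffix to the grouping key.
public class ShatteredKeySelector<T> implements KeySelector<T, String> {

    /** Serializable extractor for the grouping key, so Flink can ship the selector to the cluster. */
    public interface GroupKeyBuilder<T> extends Serializable {
        String build(T event);
    }

    private final GroupKeyBuilder<T> groupKeyBuilder;
    private final int parallelism;

    public ShatteredKeySelector(GroupKeyBuilder<T> groupKeyBuilder, int parallelism) {
        this.groupKeyBuilder = groupKeyBuilder;
        this.parallelism = parallelism;
    }

    @Override
    public String getKey(T event) {
        String groupKey = groupKeyBuilder.build(event);   // key from the configured grouping fields
        if (parallelism <= 1) {
            return groupKey;                              // shattering disabled
        }
        int randomKey = ThreadLocalRandom.current().nextInt(parallelism);
        return groupKey + "#" + randomKey;                // stage-1 aggregation key
    }
}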

Aggregation operator

As a platform product, we provide the common aggregation operators listed below. Because of the two-stage aggregation, each operator uses a different strategy in stage 1 and stage 2; a sketch of the avg operator follows the list.

  • min/max/sum/count: stage 1 pre-aggregates the input data directly and outputs the partial result; stage 2 aggregates the stage-1 partial results once more and outputs the final value.
  • first/last: stage 1 compares the timestamps of the input data, keeps the minimum/maximum timestamp and its value, and outputs a <timestamp, value> pair; stage 2 compares the <timestamp, value> pairs again and outputs the final first/last value.
  • avg: stage 1 computes the group's sum and record count and outputs a <sum, cnt> pair; stage 2 merges the <sum, cnt> pairs and outputs sum/cnt as the final average.
  • median/tp90/tp95: stage 1 records the distribution of the input data and outputs a NumericHistogram; stage 2 merges the NumericHistograms and outputs the final median/tp90/tp95.
  • count-distinct: stage 1 outputs a RoaringArray holding bucket information and bitmaps; stage 2 merges the RoaringArrays and outputs the exact distinct count.
  • count-distinct (approximate): stage 1 outputs a HyperLogLog cardinality object; stage 2 merges the HyperLogLogs and outputs the approximate distinct count.
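
To make the two-stage pattern concrete, here is a hedged sketch of the avg operator using Flink's AggregateFunction; the class names are illustrative. Stage 1 outputs a <sum, cnt> pair and stage 2 merges the pairs, dividing only at the very end, so the average stays exact despite the random shattering.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Stage 1: raw values in, <sum, cnt> pairs out.
class AvgStage1 implements AggregateFunction<Double, Tuple2<Double, Long>, Tuple2<Double, Long>> {
    @Override public Tuple2<Double, Long> createAccumulator() { return Tuple2.of(0.0, 0L); }
    @Override public Tuple2<Double, Long> add(Double value, Tuple2<Double, Long> acc) {
        return Tuple2.of(acc.f0 + value, acc.f1 + 1);
    }
    @Override public Tuple2<Double, Long> getResult(Tuple2<Double, Long> acc) { return acc; }
    @Override public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}

// Stage 2: <sum, cnt> pairs in, final average out.
class AvgStage2 implements AggregateFunction<Tuple2<Double, Long>, Tuple2<Double, Long>, Double> {
    @Override public Tuple2<Double, Long> createAccumulator() { return Tuple2.of(0.0, 0L); }
    @Override public Tuple2<Double, Long> add(Tuple2<Double, Long> partial, Tuple2<Double, Long> acc) {
        return Tuple2.of(acc.f0 + partial.f0, acc.f1 + partial.f1);
    }
    @Override public Double getResult(Tuple2<Double, Long> acc) {
        return acc.f1 == 0 ? 0.0 : acc.f0 / acc.f1;   // divide only at the very end
    }
    @Override public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}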

For operators whose result depends on all of the data, such as count-distinct (deduplicated counting), the conventional approach is to exploit the deduplication property of a Set: put every element into a Set and finally output the Set's size in the aggregate function's getResult(). If the number of elements is very large, the Set becomes huge, and I/O on it takes an unacceptable amount of time.

In MapReduce-style big-data frameworks, the performance bottleneck often lies in the I/O of large objects during the shuffle phase, because the data has to be serialized, transferred, and deserialized; Flink is no exception. Median and tp95 are similar operators in this respect.

For this reason, these operators need to be specially optimized. The idea of optimization is to minimize the size of data objects used in the calculation process, where:

  • median/tp90/tp95: we follow the approximate algorithm of Hive's percentile_approx. It records the data distribution in a NumericHistogram (a non-equidistant histogram) and then obtains the corresponding percentile by interpolation (the median is tp50).
  • count-distinct: we use the RoaringBitmap algorithm, marking the input samples in a compressed bitmap to obtain an exact distinct count (a sketch follows this list).
  • count-distinct (approximate): we use the HyperLogLog algorithm to obtain an approximate distinct count through cardinality counting; it is suitable for distinct counting over very large data sets.
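
As an illustration of the exact count-distinct path, the sketch below pairs RoaringBitmap with Flink's AggregateFunction. It assumes each element has already been mapped to an int id (that mapping is not shown), and the class names are illustrative, not the platform's actual code.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.roaringbitmap.RoaringBitmap;

// Stage 1: emit the bitmap itself instead of a huge Set, keeping shuffled objects small.
class DistinctCountStage1 implements AggregateFunction<Integer, RoaringBitmap, RoaringBitmap> {
    @Override public RoaringBitmap createAccumulator() { return new RoaringBitmap(); }
    @Override public RoaringBitmap add(Integer elementId, RoaringBitmap acc) { acc.add(elementId); return acc; }
    @Override public RoaringBitmap getResult(RoaringBitmap acc) { return acc; }
    @Override public RoaringBitmap merge(RoaringBitmap a, RoaringBitmap b) { a.or(b); return a; }
}

// Stage 2: OR the partial bitmaps together and read the cardinality as the exact distinct count.
class DistinctCountStage2 implements AggregateFunction<RoaringBitmap, RoaringBitmap, Long> {
    @Override public RoaringBitmap createAccumulator() { return new RoaringBitmap(); }
    @Override public RoaringBitmap add(RoaringBitmap partial, RoaringBitmap acc) { acc.or(partial); return acc; }
    @Override public Long getResult(RoaringBitmap acc) { return acc.getLongCardinality(); }
    @Override public RoaringBitmap merge(RoaringBitmap a, RoaringBitmap b) { a.or(b); return a; }
}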

Post-processing

The post-processing module reprocesses the output of the stage-2 aggregation. It has two main functions:

  • Composite metric calculation: original statistical metrics are combined into new composite metrics. For example, to compute the login success rate we calculate the denominator (number of login attempts) and the numerator (number of successful logins) separately, then divide the numerator by the denominator to obtain the new composite metric. Configuration item ③ defines the calculation rules for composite metrics.
  • Relative metric calculation: alarm rules often need to judge the relative change of a metric (year-on-year or period-over-period). Using Flink state we can easily compute such relative metrics; configuration item ④ defines the relative-metric rules (a sketch follows this list).
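
A minimal sketch of the relative (period-over-period) calculation with Flink keyed state follows; the class and state names are illustrative, not the platform's actual implementation. The previous window's value is kept per group key, and the ratio current/previous is emitted.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hedged sketch: per group key, remember the previous window value and emit the relative change.
class ChainRatioFunction extends KeyedProcessFunction<String, Double, Double> {
    private transient ValueState<Double> previousWindowValue;

    @Override
    public void open(Configuration parameters) {
        previousWindowValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("previous-window-value", Double.class));
    }

    @Override
    public void processElement(Double current, Context ctx, Collector<Double> out) throws Exception {
        Double previous = previousWindowValue.value();
        if (previous != null && previous != 0) {
            out.collect(current / previous);   // period-over-period ratio for this group key
        }
        previousWindowValue.update(current);
    }
}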

Handling of abnormal data

There are two types of abnormal data: late data and early data.

  • Late data:

    • Severely late data (later than the window's allowedLateness) is collected through sideOutputLateData and counted by the reporter, so it can be monitored visually on the monitoring page.
    • Slightly late data (within the window's allowedLateness) triggers a recomputation of the window. If every late record triggered a full recomputation of the stage-1 window and the result were forwarded to the stage-2 aggregation, some data would be counted twice. To avoid this, we handle the stage-1 trigger specially: the window fires with FIRE_AND_PURGE (compute and purge), so data that has already been included in a computation is cleaned up promptly (see the trigger sketch after this list).
  • Early data: such data is usually caused by inaccurate clocks on the reporting side. The timestamps of these records have to be corrected with manual intervention so that they do not distort the watermark of the whole computation stream.
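
Below is a hedged sketch of such a stage-1 trigger, essentially Flink's event-time trigger changed to FIRE_AND_PURGE; the class name is illustrative, not the framework's actual code.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fire with FIRE_AND_PURGE so elements already counted are purged from window state;
// when a slightly late record re-triggers the window, only the late record itself is
// re-aggregated and forwarded to stage 2 instead of the whole window being re-emitted.
class FireAndPurgeTrigger<T> extends Trigger<T, TimeWindow> {
    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE_AND_PURGE;           // late element: emit and purge immediately
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());  // normal path: wait for the watermark
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}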

5 Sink

Aggregated metrics are written by default to Kafka and to the time-series database InfluxDB.

  • Kafka sink: the metric identifier (configuration item ①) is used as the Kafka topic, and the aggregation results are published to it. Downstream consumers can process the stream further, for example to produce alarm events (a sketch follows this list).
  • InfluxDB sink: the metric identifier (configuration item ①) is used as the measurement name in the time-series database, and the aggregation results are persisted there for API queries and visual reports.
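
As a rough sketch of the Kafka routing, the serialization schema below (for the legacy FlinkKafkaProducer) uses the metric identifier as the destination topic; AggResult and its fields are illustrative stand-ins for the framework's aggregation output.

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

// Illustrative aggregation result: the metric identifier doubles as the Kafka topic name.
class AggResult {
    final String metricId;     // configuration item ①, used as the Kafka topic
    final String payloadJson;  // serialized aggregation result

    AggResult(String metricId, String payloadJson) {
        this.metricId = metricId;
        this.payloadJson = payloadJson;
    }
}

// Hedged sketch: route each record to the topic named after its metric identifier.
class MetricTopicSerializationSchema implements KafkaSerializationSchema<AggResult> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(AggResult result, Long timestamp) {
        return new ProducerRecord<>(result.metricId,
                result.payloadJson.getBytes(StandardCharsets.UTF_8));
    }
}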

6 Reporter

To monitor the operation of all data sources and aggregation metrics in real time, we combine InfluxDB with Grafana to achieve full-link monitoring of the aggregation computation: input/output QPS of each stage, computation time, consumption backlog, the volume of late data, and so on.

7 Conclusion

Today, this general aggregation framework carries more than 100 metric computations of different dimensions for NetEase Yunxin and has brought considerable benefits:

  • Efficiency: aggregation metrics are produced through page configuration, shortening the development cycle from days to minutes; colleagues without data-development experience can configure metrics themselves.
  • Simple maintenance and high resource utilization: a single Flink job serves 100+ metrics, and resource consumption dropped from 300+ CU to 40 CU.
  • Transparent operation: with full-link monitoring, it is clear which computation stage is the bottleneck and which data source has a problem.

About the author

Sheng Shaoyou, senior development engineer on the NetEase Yunxin data platform, works on the data platform and is responsible for the design and development of the service monitoring platform, the data application platform, and the quality service platform.

For more technical articles, follow the “netease intelligent enterprise technology +” WeChat official account.