This article is compiled from the topic “Flink application and Practice in 58.com” shared by Feng Haitao, head of 58.com real-time computing platform, at Flink Forward Asia 2020. The content includes:

  1. Real-time computing platform architecture
  2. Real-time SQL construction
  3. Storm migrates Flink practices
  4. One-stop real-time computing platform
  5. Subsequent planning

I. Real-time computing platform architecture

The positioning of real-time computing platform is to provide efficient and stable real-time computing one-stop service for 58 Group’s massive data. One-stop service is divided into three main directions:

  • The first direction is real-time data storage, which is mainly responsible for providing high-speed real-time storage capability for online service access.
  • The second is real-time data computing, which mainly provides distributed computing framework for massive data processing.
  • The third is real-time data distribution, which is mainly responsible for distributing the calculated data to the subsequent real-time storage for upper-layer applications.

The platform construction is mainly divided into two parts:

  • The first part is basic capacity construction, which currently mainly includes Kafka cluster, Storm cluster, Flink cluster and SparkStreaming cluster.

  • The other part is platform construction, mainly including two points:

    • The first is data distribution, which is a platform built on Kafka Connect to integrate and distribute heterogeneous data sources. In the actual use of data scenarios, it is often necessary to gather different data sources together for computational analysis.

      The traditional method may require different data synchronization schemes for different storage devices. Our data distribution is to achieve the integration and distribution of different data sources by providing a complete set of architecture.

    • The second is the one-stop real-time computing platform we built based on Flink, which will be introduced in detail later.

The diagram above shows the architecture of our real-time computing platform.

  • In the part of real-time data access, Kafka is adopted, and Binlog provides canal and Debezium access in two ways.

  • In the part of service logs, we mainly use Flume to collect online business logs.

  • In the part of real-time computing engine, according to the development of open source community and the needs of users, from the earliest Storm to the later introduction of SparkStreaming, and now the mainstream Flink.

  • For real-time storage, we support Kafka, Druid, Hbase, ES, and ClickHouse to meet multiple real-time requirements.

  • At the same time, on the basis of the computing architecture, we have built some management platforms, such as cluster management, which is mainly responsible for cluster expansion and stability management.

  • Another is Nightfury, which is responsible for cluster governance, including data access, permission governance, resource management, and more.

We introduced the Flink computing framework in the process of business development. First of all, in terms of business, 58 is a one-stop life service platform, including many business lines. With the development of services, the amount of data is increasing, and the scenarios are becoming richer and richer. Therefore, a more powerful computing framework is needed to meet the needs of users.

  • The first scenario is real-time ETL, which is mainly used for information transformation and structured processing of original logs for subsequent computation, requiring computing capacity with high throughput and low latency.

  • The second block is real-time data warehouse, as a complement to offline data warehouse, mainly to improve the timeliness of some real-time indicators. The third scenario is real-time monitoring, which requires a flexible time window.

  • The final scenario is real-time data flow analysis, for example, data out-of-order processing, intermediate state management, and Exactly once semantics assurance.

The computing clusters we built based on Storm and SparkStreaming were largely inadequate for these scenarios. Therefore, an investigation was conducted on Flink, and it was found that Flink showed great advantages in both computing performance and stream data feature support. Therefore, we decided to adopt Flink as the mainstream computing framework.

Above is the construction of our Flink cluster. As a real-time computing framework, Flink often requires 7×24 hour availability. We need to consider a highly available architecture when building the underlying cluster.

  • In the deployment mode, Flink On YARN is used to implement high availability of clusters.

  • On the underlying HDFS, the HDFS federation mechanism can avoid the impact of offline cluster jitter on the real-time side and reduce the number of HDFS to be maintained.

  • In cluster isolation, the Node Labe mechanism is mainly used to realize the important services running on some designated nodes. On this basis, Cgroup is introduced to isolate CPUS and avoid CPU preemption between tasks.

  • At the management level, different services are submitted to different queues for management to avoid resource preemption between services.

  • In computing scenarios, data is submitted to different nodes based on different computing scenarios, such as computing and I/O, to improve resource utilization of the entire cluster.

The Flink computing framework has been under development for about two years in 58. At present, our cluster has more than 900 machines and more than 2,000 real-time tasks, processing approximately 2.5 trillion real-time data per day, with data volume peaking at 30 million per second.

Second, real-time SQL construction

1. Real-time SQL evolution

SQL programming has the characteristics of low threshold, automatic optimization and unified version. At the same time, as the main tool of real-time data warehouse, Flink SQL is the main direction we consider in the construction of Flink platform.

Our first release, Flink, was based on version 1.6, which only supported DML at the time, and we added some extensions, mainly to DDL syntax support. In order to simplify the definition of DDL at the user level, a configuration approach is also adopted to automatically generate DDL. At the time of development, provide visual development function and online debugging function.

With the opening of the community, we switched to the community version of Flink SQL, and later upgraded the related version, as well as incorporating more community version features such as Blink related, batch streaming integration, and Hive support.

Finally, we also do some data warehouse for the real-time data warehouse of Flink SQL, including metadata management, blood relationship, data warehouse hierarchy, authority management and so on.

2. Storage expansion

In terms of storage extensions, we started with a DDL that Flink implemented itself. With the community open source, I switched to the Flink SQL version of the community and then made some extensions on it, mainly in several areas:

  • First, it gets through mainstream storage and internal real-time storage. For example, internal WMB is supported on the source table, which is a distributed message queue. Support this redis, internal Wtable, on dimension tables. ClickHouse, Redis, and our internal WTable are supported on the results table;
  • Second, customize format support. Because in real business, many data formats are not standard, you can’t define a table through DDL. We provide a generic way to take a field to represent a log, allowing users to customize and parse a log through udF.
  • Finally, based on the definition of source and sink DDL, the setting of concurrency is added. This gives users more flexibility to control the concurrency of tasks.

3. Performance optimization

There are two main aspects of performance optimization:

  • The first is the introduction of Blink feature, which provides a large number of features, such as mini Batch processing to improve the throughput of tasks. Local Global two-phase aggregation is used to alleviate data hotspots. There is also the ability to enhance Windows through EMIT. By integrating these functions into our computing platform, users can open them directly with a few buttons.

  • The other is the application of asynchronous lO. In the process of real-time data warehouse construction, the association between dimension tables is a relatively large application scenario, often because of the performance of the dimension table, the throughput of the whole task is not high. Therefore, we added an asynchronous IO mechanism, which has two main implementations:

    • One supports asynchronous client for the target storage, which is directly implemented based on asynchronous client. Like MySQL and Redis.

    • Another does not support asynchronous client, we use the existing mechanism to simulate, at the same time on this basis added a set of caching mechanism, avoid all data directly query to the target storage, reduce the target storage pressure. At the same time, on the basis of cache, also add LRU mechanism, more flexible control of the whole cache.

      Similarly, when data writing encounters a large amount of concurrent writing, try to improve the concurrency to solve the writability problem, which will lead to low CPU utilization of the whole task. Therefore, the single-concurrency multi-threaded writing mechanism is adopted, which is implemented by adding a buffer into the sink operator. After the data is poured into sink, it will first be written into buffer, and then the multithreading mechanism will be started to consume the buffer, and finally written into storage.

4. Warehouse construction

As a typical application scenario of Flink, real-time data warehouse may have some imperfect platformization compared with offline data warehouse:

  • First, the metadata management function is not perfect.

  • Then, with Flink SQL, we may need to redefine a table for each task. And because there is no hierarchical concept of data, it leads to relatively independent tasks, smokestack development, and low data and resource utilization.

  • In addition, there is a lack of data pedigree information.

In order to improve the efficiency of real-time data warehouse construction, we provide real-time SQL capability oriented to data warehouse, and fully align the offline data warehouse construction mode in data warehouse design, task development, and platform management.

4.1 number of warehouse

Data warehouse mainly refers to the offline data warehouse model, we real-time data warehouse this piece of model construction.

For example, the most original data will enter the ODS layer, after some cleaning into the behavior detail layer, and then split to the specific topic detail layer, and then some related dimension table information is calculated, and then to the summary layer, and finally provided to the highest level of applications, including some real-time reports, ad-hoc queries, etc.

4.2 Warehouse counting platform

At present, real-time data warehouse is mainly based on the Lambda architecture to carry out the construction of platform.

  • First of all, in terms of metadata management, Flink adopts memory to manage metadata by default, so we adopted HiveCatalog mechanism to persist library tables.

  • At the same time, we use Hive ACL to manage the permission of the database.

  • With metadata persistence, you can provide global metadata retrieval.

  • Meanwhile, the task mode can be simplified from traditional DDL+DML to DML.

  • Finally, we also did the kinship, mainly in the Flink SQL submission process, automatic discovery of SQL task kinship dependencies.

Storm migrates Flink practice

1. Flink vs. Storm

Flink has many advantages over Storm.

  • In terms of data guarantee, Flink supports Exactly once semantics. In terms of throughput, resource management and state management, more and more users are developing based on Flink.

  • For Storm users, the programming model is simple, the development cost is high, the streaming computing feature is lacking, and the throughput is too low to meet the performance requirements. On the platform side, there are many independent clusters, difficult operation and maintenance, lack of platform-based task management, and poor user experience.

So we decided to move to Flink.

2. The Flink – Storm tools

When Storm migrates to Flink, it may take a lot of work for users to develop logic based on Flink again. So we did some research on Flink and found a Flink-Storm tool. It implements the transfer of Storm Topology to Flink Topology. For example, convert spout to Flink’s Source function and Bolt to Transform and sink Function.

We also found some problems in the process of using the tool. Flink-storm could not support Yarn mode and lacked the function of Storm engine. Finally, there was a big problem. However, the Flink-Storm tool only supports development based on one version. So we made some improvements.

3. Improvements to Flink-Storm

3.1 Message Guarantee

Storm has three features:

  • First, ack mechanism;
  • Second, rely on ZooKeeper.
  • Thirdly, at least once semantic guarantee.

We made four improvements:

  • First, flink-storm removed ack support;
  • Second, KafkaSpout implements CheckpointListener;
  • Third, KafkaSpout implements CheckpointedFunction;
  • Fourth, Flink-Storm opens Checkpoint.

3.2 Support for Storm timers

In earlier versions there was no windowing mechanism, we used the Storm timing mechanism for windowing calculations. Here’s how it works: The Storm engine periodically sends a system signal into Bolt that allows the user to perform a shard to simulate window operation.

Similarly, Flink does not have such a timer mechanism, so we considered to implement it from the flink-Storm level, and modified BoltWrapper class. BoltWrapper is an encapsulation of Bolt class, and its implementation mechanism is the same as Bolt, including 5 points:

  • Initialize the asynchronous thread in Open mode.
  • Simulate the StreamRecord of constructing tick;
  • Call the processeElement function to send a tuple;
  • The frequency is globally controlled by external parameters;
  • Close closes the thread.

3.3 Storm on Yarn

Storm on Yarn does not commit directly to the YARN cluster, it only commits to local or stand alone mode. Flink on YARN provides ClusterClient, which can be implemented in three steps:

  1. Initialize YarnClusterConfiguration Flink configuration perform JAR package/resource configuration load classpath;

  2. Start yarn Client.

  3. Reuse Flink on YARN to deploy the transformed jobGraph.

4. Task migration

With some of the above improvements in place, the migration is relatively easy. The first thing we do is package up the modified version and upload it to the company’s private server. The user then only needs to import the JAR package in his project. On the code side, you just need to change the storm-based commit to the Flink-based commit and the logic doesn’t change at all. In the task deployment pattern section, Flink commit pattern is also provided, such a script can implement the Flink Perjob pattern.

In summary, with the exception of extreme complexity, all tasks are migrated seamlessly. After migrating to Flink, the latency of most tasks was reduced to the millisecond level, and the overall throughput was increased three to five times. At the same time, the overall resource savings is approximately 40%, equivalent to about 80 machines. 5 Storm clusters were completely taken offline, and the task platform-based management was implemented.

Iv. One-stop real-time computing platform

1. Wstream platform

In order to improve management efficiency, we built the Wstream platform, which is built between the bottom engine and the top application, and can shield the bottom cluster information for users, such as some information across multiple clusters in the machine room.

  • In terms of task access, Flink Jar, Flink SQL, Flink-Storm and PyFlink are supported to meet diversified user needs.

  • In terms of product functions, it mainly supports task management, task creation, start deletion and so on.

  • In addition, we provide an alarm monitoring and task diagnosis system to better enable users to manage their own tasks and locate problems in tasks.

  • For the data warehouse, it provides some data warehouse platform functions, including rights management, blood relationship and so on.

  • Debugging probes are also provided for Flink SQL.

Users can build their applications on top of the Wstream platform.

2. Status management

As an important feature of Flink, state is widely used in practical scenarios. Users couldn’t interact with the underlying Flink tool when using the platform, so we integrated some of the underlying capabilities.

  • To save tasks, the system supports Checkpoint, Savepoint, and Cancel With Savepoint.

  • In terms of fault tolerance, allowNonRestoredState is supported to skip unrecoverable states.

  • In terms of analysis, Queryable State can be queried in real time, and the status can be downloaded and analyzed by offline State Processor.

For the whole task status management, we set the direction to the specified Hdfs directory through JobGraph for unified directory management. Control concurrency, JobGraph optimization, checkpoint interval, and number of reserved versions in the state small file.

SQL debugging. 3

We also provide some debugging capabilities for Flink SQL. There are two main parts:

  • First, syntactic functions include:

    • Intelligent prompt;
    • Syntax verification;
    • Convert graph logic validation.
  • Second, functions at the logical level include:

    • Analog input, DataGen custom data source;
    • The result output, Print redirects to standard output.

In this way, we can debug the whole business logic more easily.

4. Task monitoring

With regard to task monitoring, for Flink real-time computing tasks, our main concerns were task stability, performance aspects, and whether the business logic met expectations. How to monitor these indicators mainly includes four aspects:

  • The first is Flink metrics, which provides a lot of information, such as flow information, status information, backvoltage, checkpoints, CPU, network, etc.

  • The second is the YARN level, which provides the running duration and task status.

  • Third, information accumulation is provided from the Kafka level.

  • Finally, with some user-defined metrics, we can see if the business logic is meeting expectations.

5. Monitoring system

In order to collect these indicators, we also built a monitoring system based on Prometheus. For all Flink tasks, metrics will be pushed to PushGateway in real time, and metrics collected will be pushed to Prometheus, where we mainly use federation. All sub-nodes collect indicators, and then converge to a central node to provide external services in a unified manner. Finally, the whole indicator can be calculated and the alarm can be generated.

6. Monitor alarms

With the above indicators, we can be more convenient in the area of alarm. In terms of task stability, which is concerned by real-time computing, we can observe the operation of the whole task from the information of Topic message consumption accumulation, QPS fluctuation of task calculation, Flink task Restart, Flink Checkpoint failed, task failure, delay and so on.

7. Visualization of indicators

In the aspect of index visualization, there are mainly two aspects:

  • The first level is the Job level, which mainly gathers some core indicators into our real-time computing platform. For example, QPS information, input and output information, delay information and so on;

  • For task-level metrics, Grafana can be used to know specific task information, such as traffic information and back-pressure information.

5. Follow-up planning

Our follow-up planning mainly includes four aspects:

  • The first is the popular batch stream syndication in the community. Since most of our current real-time architecture is based on the Lambda architecture, which requires a lot of maintenance, we also want to simplify the architecture with the ability to integrate batch streams.
  • The second is resource tuning, because streaming computing lacks some mechanism for dynamic resource management, so we want to have the means to do some of this tuning.
  • The third is intelligent monitoring, where our current monitoring and alarms are ex post facto, and we want some way to alert tasks before they go wrong.
  • Finally, embrace the new capabilities of the community, including exploring new scenarios.