Author: Wang Kang, senior development engineer of Vipshop data platform

Since 2017, Vipshop has been building a high-performance, stable, reliable and easy-to-use real-time computing platform based on Kubernetes in order to ensure the smooth operation of its internal business in normal times and during big promotion periods. The current platform supports mainstream frameworks such as Flink, Spark and Storm.

This paper will be divided into five aspects to share Vipshop Flink’s containerized practical application and productization experience:

  • The development overview
  • Flink Container Practice
  • Flink SQL platform construction
  • The application case
  • The future planning

I. Development overview

1. Cluster size

In terms of the scale of the cluster, we have 2000+ physical machines. We mainly deploy Kubernetes’ remote double-live cluster, and use Kubernetes’ Namespaces, labels and taints to realize business isolation and preliminary computing load isolation.

Flink Quest, Flink SQL Quest, Storm Quest, Spark Quest, these live online apps add up to more than 1000. At the moment we mainly support the Flink SQL part, because SQLization is a trend, so we need to support the online platform of SQL tasks.

2. Platform architecture

We analyze the overall architecture of the real-time computing platform from the bottom up:

  • Resource scheduling layer (bottom level)

In fact, it is running on Kubernetes in deployment mode, and although the platform supports yarn scheduling, it shares resources with batch tasks, so the mainstream tasks are still running on Kubernetes. Also, the YARN scheduling layer is primarily a set of YARN clusters that are deployed offline. In 2017, we developed a set of scheme of Flink on Kubernetes. Because the underlying scheduling is divided into two layers, when resources are tight, real-time and offline resources can be seconded.

  • Storage layer

It is mainly used to support the company’s internal Kafka-based real-time data VMS, Binlog-based VDP data and native Kafka as a message bus, the state is stored on HDFS. The data is mainly stored in Redis, MySQL, HBase, Kudu, HDFS, ClickHouse, etc.

  • Computing engine layer

The main ones are Flink, Storm, Spark, and currently Flink is the main one. Each framework will support several versions of the image to meet different business needs.

  • Real-time platform layer

It mainly provides job configuration, scheduling, version management, container monitoring, job monitoring, warning, logging and other functions, and provides multi-tenant resource management (quota, label management) and Kafka monitoring. Resource allocation is also divided into big promotion day and ordinary day. Big promotion resources are different from ordinary resources, and the rights control of resources is also different. Before version 1.11 of Flink, the self-built metadata management system of the platform was Flink SQL Management Schema. Starting with version 1.11, it was integrated with the company’s metadata management system through Hive Metastore.

  • The application layer

It mainly supports some scenes of real-time large screen, recommendation, experimental platform, real-time monitoring and real-time data cleaning.

Second, Flink Containerization Practice

1. Containerization scheme

Above is the architecture diagram of the real-time platform Flink containerized. Flink containerization is actually deployed in the Standalone mode.

There are three roles in our Deployment pattern: Client, Job Manager, and Task Manager, each of which will be controlled by a Deployment.

Users upload task JARs, configurations, etc. through the platform and store them on HDFS. At the same time, the configuration and dependencies maintained by the platform are also stored in HDFS. When POD starts, it will perform initialization operations such as pull.

The main process in the Client is an agent developed by GO. When the Client starts, it will first check the status of the cluster. When the cluster is ready, it will pull the JAR package from the HDFS, and then submit the task to the cluster. The main task of Client is to do fault tolerance, and its main function is to monitor the state of the task, do savepoint and other operations.

The metrics from the smart-agent acquisition container deployed on each physical machine are written into M3, and metrics are written into Prometheus through the Flink burst interface, combined with Grafana presentation. Also through the deployment on each physical machine VFileBeat collected mounted out of the relevant log written ES, Dragonfly can achieve log retrieval.

1) Flink platformization

In practice, it is important to combine specific scenarios and ease of use before considering doing platform-based work.

2) Flink stability

Exceptions are inevitable during the deployment and operation of our applications. At this time, the platform needs to make some strategies to ensure that tasks remain stable after abnormal conditions occur.

  • POD health and availability:

    The Livenessprobe and ReadInessprobe are used to detect and specify the restart policy of POD. Kubernetes itself can do a pull up of POD.

  • When the Flink task generates an exception:

    • Flink has its own restart strategy and failover mechanism as its first layer of assurance.
    • The Client will periodically monitor the state of Flink, update the latest checkpoint address to its cache, and report it to the platform, and then solidify it into MySQL. When Flink cannot restart again, the Client resubmits the task from the latest successful checkpoint. This is its second protection.

      After this layer solidifies Checkpoint into MySQL, the Flink HA mechanism is no longer used, eliminating the component dependency of ZK.

    • In the event that the current two layers fail to restart or there is an exception to the cluster, the platform automatically pulls up a new cluster from the latest checkpoint solidified into MySQL to submit tasks, which is its third layer of guarantee.
  • Disaster recovery in machine room:

    • The user’s JAR package, Checkpoint have done the remote double HDFS storage.
    • Double room and double cluster in different places.

2. Kafka monitoring scheme

Kafka monitoring is a very important link in task monitoring. The overall process is as follows:

The platform provides monitoring of Kafka accumulation. On the interface, users can configure their own Kafka monitoring, inform what cluster they are in, and configure information such as user consumption message. You can extract the user’s Kafka monitoring configuration from MySQL and then monitor Kafka through JMX. After such information is collected, it can be written into downstream Kafka, and then another Flink task can monitor alarms in real time. At the same time, these data can be written into CK synchronously. So we can give feedback to our users (we can use Prometheus for monitoring instead of CK, but CK is more suitable), and finally we can use Grafana component to show the user.

3. Flink SQL platform construction

With the previous Flink container solution, it is time to start the Flink SQL platform construction. As we all know, there is a certain cost to the development of streaming APIs. Flink is definitely faster, more stable and easier than Storm, but for some users, especially those who are Java developers, there is a barrier to entry.

The Flink Container implementation of Kubernetes facilitates the publishing of Flink API applications, but it is still not convenient for Flink SQL tasks. So the platform provides a more convenient online editing and publishing, SQL management and other stack development platform.

1. Flink SQL solution

The Flink SQL solution for the platform is shown in the figure above. The task publishing system is fully decoupled from the metadata management system.

1) Flink SQL task publishing platform

In practice, it is necessary to consider the ease of use and do platform-based work. The main operation interface is shown in the figure below:

  • Flink SQL version management, syntax verification, topology management, etc.
  • UDF general and task level management, support user-defined UDF;
  • Parameterized configuration interface is provided to facilitate users to launch tasks.

Here is an example of a user interface configuration:

Here is an example of a cluster configuration:

2) Metadata management

Before 1.11, the platform built its own metadata management system UDM, MySQL stored Kafka, Redis and other schemas, and through the custom catalog through Flink and UDM, so as to achieve metadata management.

After 1.11, Flink integration with Hive was improved. The platform reconfigured the Flink SQL framework and connected with offline metadata by deploying an SQL-Gateway Service and calling the SQL-Client JAR it maintains. It realizes the unification of real-time off-line metadata and lays a good foundation for the integration of stream and batch in the future.

Create metadata from Flink tables and persist them in Hive. When Flink SQL is started, it reads the table schema information from Hive.

2. Flink SQL related practices

The platform integrates and develops connectors that are officially supported or not. The mirror is decoupled from connectors, Formats and other related dependencies so that updates and iterations can be carried out quickly.

1) Flink SQL related practices

Flink SQL is divided into three main layers:

  • The connector layer

    • Support VDP Connector to read source data;
    • Support Sink & dimension table association with Redis String, Hash and other data types;
    • Support for Kudu Connector & Catalog & dimension table associations;
    • Support Protobuf format parsing real-time cleaning data;
    • Support VMS Connector to read source data;
    • Support ClickHouse Connector Sink distributed table & local high TPS write;
    • Hive Connector supports WaterMark Commit Policy partitioning Commit policies for multiple locations & complex data types such as ArRAY < String > and DECIMAL.
  • The runtime layer

    • The main support for topology execution plan modification;
    • Dimension table association keyBy optimizes cache performance and improves query performance;
    • Dimension table association delay join.
  • The platform layer

    • Hive UDF;
    • Support JSON HLL related handler functions;
    • Support Flink to run related parameter Settings such as minibatch and aggregation optimization parameters;
    • Flink upgrades Hadoop 3.

2) Modification of topology execution plan

To solve the problem that the parallelism of Stream Graph generated by SQL cannot be modified at present, the platform provides a modified topology preview to modify relevant parameters. The platform will provide users with the resolved Excution Plan JSON of FlinkSQL, ensure the uniqueness of the operators by using UID, modify the parallelism degree of each operator, chain strategy, etc., and also provide methods for users to solve the back pressure problem. For example, in the scenario of small concurrency of ClickHouse Sink for a large number of times, we support modifying the parallelism of ClickHouse Sink, source parallelism = 72, and sink parallelism = 24 to improve the TPS of ClickHouse Sink.

3) dimension table association keyBy cache optimization

For the situation of dimension table association, in order to reduce the number of IO requests, reduce the read pressure of dimension table database, thus reduce latency and improve throughput, there are three measures as follows:

Here is a graph of the dimension table associated with KeyBy for cache optimization:

Before the optimization, the dimension table associates the LookupJoin operator with the normal operator chain, and the dimension table associates the LookupJoin operator with the normal operator chain, using the Join key as the hash policy key.

After optimization in this way, for example, in the original 3000W data volume dimension table, there are 10 TM nodes, each node needs to cache 3000W data, and a total of 300,000,000 volumes need to be cached. After KeyBy optimization, each TM node only needs to cache 3000W/10 = 300W of data, and the total cached data is only 3000W, which greatly reduces the amount of cached data.

4) Dimension table association delay join

In dimension table association, there are many business scenarios. Before adding new data to dimension table data, join operation has already taken place on mainstream data, so the association cannot occur. Therefore, in order to ensure the correctness of the data, the data that cannot be associated is cached and a delayed join is performed.

The simplest approach is to set the retry number and retry interval in the associated function of the dimension table. This method increases the latency of the entire stream, but can solve the problem in cases where mainstream QPS is not high.

When the join dimension table is not associated, it will be cached first. Then, a delayed join can be performed according to the retry number and retry interval.

4. Application cases

1. Real-time data storehouse

1) Real-time data warehousing

Real-time data warehouse is mainly divided into three processes:

  • After real-time data cleaning in Level 1 Kafka, traffic data can be written to Level 2 Kafka, mainly in Protobuf format, and then written to Hive 5min table through Flink SQL for subsequent near-real-time ETL to speed up the preparation time of data sources in the ODS layer.
  • The MySQL business database data is parsed through VDP to form a BinLog CDC message flow, and then written into Hive 5min table through Flink SQL. At the same time, the data will be submitted to the custom partition, and the partition status will be reported to the service interface. Finally, an offline scheduling will be done.
  • The business system generates the business Kafka message flow through the VMS API, which is parsed by Flink SQL and written into the Hive 5min table. Message formats such as String, JSON, CSV can be supported.

Flink SQL is very convenient for streaming data wareaging, and version 1.12 already supports automatic merging of small files, addressing a very common pain point in the big data layer.

We customize the partition commit policy. When the current partition is ready, we will adjust the partition commit API of the real-time platform, and check whether the partition is ready through this API during offline scheduling.

After adopting Flink SQL unified warehousing scheme, we can obtain the following results:

  • First of all, we not only solve the problem of instability of the previous FLUME program, but also enable users to realize self-service warehousing, which greatly reduces the maintenance cost of warehousing task and ensures stability.
  • Secondly, we also improved the timeliness of offline warehouse, from the hour level to 5min granularity warehousing, which can enhance the timeliness.

2) Real-time index calculation

  • Real-time application consumes Kafka after cleaning, is associated through Redis dimension table, API, etc., and then incremental calculation of UV is made through Flink Window, and persistence is written to HBase.
  • After the real-time application consumes the VDP message flow, it is associated through the Redis dimension table, API, etc., and then through the Flink SQL calculation of sales and other related indicators, incremental upsert to Kudu, convenient according to the range partition batch query. Finally, it provides the final service for the real-time large screen through the data service.

In the past, Storm method was usually used for index calculation, which requires customized development through API. After adopting such Flink scheme, we can obtain the following results:

  • Cut the calculation logic to Flink SQL to reduce the fast change of calculation task diameter and solve the problem of slow modification on-line cycle.
  • Switching to Flink SQL allows for quick modifications and quick rollout, reducing maintenance costs.

3) Real-time offline integration of ETL data integration

The specific process is shown in the figure below:

Recent releases of Flink SQL have continued to enhance the ability to join dimension table data in real-time, not only in a database, but also in Hive and Kafka, allowing flexibility to meet different workload and timeliness requirements.

Based on Flink’s powerful streaming ETL capability, we can do data access and data conversion in the real-time layer, and then flow the fine layer data back into the offline database.

By introducing the HyperLogLog (HLL) implementation used internally by Presto into Spark UDAF functions, we are able to communicate the HLL objects between Spark SQL and the Presto engine. For example, Spark SQL generates HLL objects with prepare functions. You can merge queries in Spark SQL as well as in Presto.

The specific process is as follows:

UV approximate calculation example:

2. Experimental platform (FLINK real-time data into OLAP)

Vipshop experimental platform is an integrated platform for A/B-test experimental effect analysis by configuring multi-dimensional analysis and downhole analysis to provide massive data. An experiment consists of a stream of traffic (such as user requests) and modifications to a comparative experiment performed on that stream. The experimental platform has the demand of low delay, low response and super-large scale data (tens of billions) for massive data query.

The overall data structure is as follows:

  • Offline data is imported into ClickHouse via WaterDrop;
  • After the real-time data is processed by Flink SQL, such as data cleaning, parsing and expansion in Kafka, the product attributes are associated with the Redis dimension table, and written to ClickHouse through the distributed table. Then, the external interface is provided through the data service Ad Hoc query.

The business data flow is as follows:

There is a very important ES scene in our experiment platform. After we launch an application scene, if I want to see the effect, including the exposure, click, purchase and collection generated by the launch. We need to write the details of each data, such as some data of the stream, into CK according to the scene partition.

Through Flink SQL Redis Connector, we support Redis sink, source dimension table association and other operations. It is very convenient to read and write Redis and realize dimension table association. Cache can be configured in dimension table association, which greatly improves application TPS. Pipeline real-time data flow through Flink SQL, and finally sink the large wide table into CK, and store MurmurHash3_64 according to the granularity of a certain field to ensure that the data of the same user are stored in the same shard node group. In this way, the join between large CK tables becomes the join between local local tables, which reduces data shuffle operation and improves the efficiency of join query.

5. Future planning

1. Improve the usability of Flink SQL

Flink SQL is a little different for Hive users. Both Hive and Spark SQL are batch processing scenarios.

So there are still a lot of inconveniences with Flink SQL debugging, and some barriers to use for offline Hive users, such as manually configurable Kafka monitoring, and pressure test tuning for tasks. Therefore, it is a big challenge to reduce the threshold of users’ use to the lowest level, so that users only need to understand SQL or understand the business, shield the concept of Flink SQL from users, and simplify the use process of users.

In the future, we will consider doing some intelligent monitoring to inform the user of the problems of the current task, which does not require the user to learn too much, and can be automated as much as possible and give some optimization suggestions to the user.

2. The CDC analysis plan of Data Lake was implemented

On the one hand, we did the data lake mainly to address our real-time binlog update scenario. Currently our VDP binlog message flows are written to the Hive ODS layer via Flink SQL to speed up the preparation time of the ODS layer data sources, but generate a large number of duplicate messages to be recombined. We will consider the CDC warehousing option of Flink + Data Lake for incremental warehousing.

On the other hand, we want to replace our Kudu through the data lake, which is used by some of our important businesses. Although Kudu is not widely used, we started to investigate CDC+ Data Lake as a solution, given that Kudu operations are much more complex than normal database operations, relatively niche, and require very strong real-time upsert capabilities such as the Kafka message flow after the order is wide and the aggregated results. Replace the Kudu incremental upsert scenario with the incremental upsert capability of this scenario.

Q&A

Q1: Is the VDP Connector read by MySQL Binlog? Is Canal Canal a tool?

A1: VDP is a component of the company’s binlog synchronization, which is parsed and sent to Kafka. It is based on secondary development of Canal. We have defined a CDC format that can connect to the company’s VDP Kafka data source, which is somewhat similar to the Canal CDC format. Currently there is no open source solution that enables our company to use Binlog for a synchronization.

Q2: UV data is exported to HBase, sales data is exported to Kudu, and output to different data sources. What is the main reason for this strategy?

A2: Kudu is not used as widely as HBase. UV real-time writing TPS is relatively high, HBase is more suitable for single query scenario, write HBase high throughput + low delay, small range query delay is low; Kudu has some OLAP features, such as saving order class details, column saving acceleration, and OLAP analysis in combination with Spark, Presto, etc.

Q3: May I ask, how did you solve the data update problem of ClickHouse? For example, data indicators are updated.

A3: CK update is an asynchronous merge, which can only be performed on the same shard, the same node and the same partition. This is weak consistency. CK is not recommended for metric update scenarios. If you have an update scenario in CK, you can try the AggregatingMergetree solution, replacing the Update with INSERT, and doing the merge at the field level.

Q4: How to ensure data de-duplication and consistency for binlog writes?

A4: Binlog has not yet written to CK’s scenario, this scheme looks immature. This is not recommended and can be done using a CDC + Data Lake solution.

Q5: If CK nodes write unevenly, how to monitor and how to solve it? What about data skew?

A5: The data partition can be viewed by monitoring the amount and size of written data on each machine, each table and each partition through CK’s System.parts local surface, so as to locate a table, a machine and a partition.

Q6: How do you do mission monitoring or health checks on the real-time platform? And how does it automatically recover after an error? Are you in the yarn-application mode now? Is there a case for one yarn application for multiple Flink jobs?

A6 : For Flink 1.12+, the PrometheusReporter approach is supported to expose Flink Metrics, such as operator watermark, checkpoint-related metrics such as size, time spent, number of failures, and other key metrics. It is then collected and stored for task monitoring alerts.

Flink’s native Restart policy and failover mechanism serve as the first layer of assurance.

The Client will monitor the state of Flink regularly, update the latest checkpoint address to its cache, and report it to the platform, and solidify it into MySQL. When Flink cannot restart again, the Client resubmits the task from the latest successful checkpoint. As a second guarantee. After this layer solidifies Checkpoint into MySQL, the Flink HA mechanism is no longer used, eliminating the component dependency of ZK.

When the current two layers fail to restart or the cluster is abnormal, the platform automatically pulls a new cluster from the latest ChekCPoint solidified into MySQL and submits the task as the third layer guarantee.

We support the arn-per-job pattern, which is primarily based on the Flink on Kubernetes pattern for deploying standalone clusters.

Q7: Are all the components on your big data platform containerized or mixed at the moment?

A7: At present, our real-time component Flink, Spark, Storm, Presto and other computing frameworks have been containerized. For details, please refer to 1.2 Platform Architecture above.

Q8: Kudu doesn’t run on Kubernetes, does he?

A8: Kudu does not run on Kubernetes, for which there is no particularly mature solution. And Kudu is based on Cloudera Manager operation and maintenance, there is no need to go to Kubernetes.

Q9: Is it possible to save the dimension table of Flink real-time data storehouse into CK and then query CK?

A9: It can be done. It can be worth trying. The data from both the fact table and the dimension table can be saved and can be hashed on a field (for example, user_id) to create a Local Join.