This article is based on Wang Xinchun's talk at the Flink China offline Meetup in Shanghai on July 29, 2018. Wang Xinchun is currently in charge of the real-time platform at VIPshop, covering the real-time computing framework, real-time basic data, and the machine learning platform. Previously he was responsible for the big data platform at Meituan-Dianping, and he has accumulated rich experience in real-time big data processing.

This article covers the following topics:

  1. The current status of VIPshop's real-time platform
  2. Flink's practice at VIPshop
  3. Flink on K8S
  4. Follow-up plans

I. Current status of VIPshop's real-time platform

At present, the real-time platform at VIPshop is not built on a single unified computing framework, but on three major frameworks: Storm, Spark, and Flink. Historically, Storm has the largest number of jobs, but the focus has shifted to Flink since last year, so the number of Flink applications has grown significantly this year.

The core business of the real-time platform consists of eight parts: real-time recommendation, a key e-commerce business that depends on multiple real-time features; the big promotion dashboard (Kanban), which provides statistical indicators across many dimensions (orders, UV, conversion rate, funnels, etc.) for leadership, operations, and product decisions; real-time data cleaning, which cleans and associates the data collected from user tracking (buried) points in real time to provide better data to the downstream businesses; plus internet finance, security risk control, and competitor price comparison, as well as the internal service monitoring systems Logview, Mercury, and Titan, and the VDRC real-time data synchronization system.

The responsibilities of the real-time platform cover two areas: the real-time computing platform and real-time basic data. Built on the Storm, Spark, and Flink computing frameworks, the real-time computing platform guarantees monitoring and stability and provides data input and output for business development. Real-time basic data covers the definition and standardization of upstream tracking points, and the cleaning and widening of user behavior data and MySQL Binlog logs, to provide quality-assured data to downstream consumers.

The architecture includes two large data sources. One is the tracking data from the App, WeChat, H5, and other applications; the raw data is collected and sent to Kafka. The other is the MySQL Binlog of online real-time data. The data is cleaned and associated within the computing framework, and real-time ETL turns the raw data into a form that downstream business applications can use more easily, including offline wide tables.

II. Flink's practice at VIPshop

Scenario 1: Dataeye real-time Kanban

The Dataeye real-time Kanban supports real-time computation over all tracking data, order data, and so on. The data volume is large, and many statistical dimensions are required, such as whole-site, second-level platform, department, schedule, crowd, activity, and time dimensions, which raises the computational complexity. The output can reach hundreds of thousands of statistical indicators per second.

Take UV calculation as an example. First, the tracking data in Kafka is cleaned and then associated with data in Redis, and the associated data is written back to Kafka; downstream Flink computing tasks then consume this associated data. The result volume of these tasks is generally very large (given the number of dimensions and indicators, it can reach tens of millions of records), so the output is again buffered through Kafka. Finally, synchronization tasks sync the data into HBase for real-time display. The synchronization tasks throttle the data flow written to HBase and merge indicators of the same type to protect HBase. In parallel, a second computing pipeline is kept as a disaster-recovery solution.
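
As an illustration of the write-protection idea in the synchronization task, here is a minimal Java sketch, not VIPshop's actual code: indicators of the same type (same row key) are merged in a buffer, and the batched HBase writes are rate-limited. The table name, row-key layout, and the 5k/s limit are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.util.concurrent.RateLimiter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class IndicatorSyncTask {

    // Throttle puts so a burst of indicators cannot overload HBase (5k/s is hypothetical).
    private static final RateLimiter LIMITER = RateLimiter.create(5000);

    /** Writes one merged batch: each row key appears once, no matter how many records fed it. */
    static void flush(Map<String, Long> mergedIndicators, Table table) throws Exception {
        List<Put> batch = new ArrayList<>(mergedIndicators.size());
        for (Map.Entry<String, Long> e : mergedIndicators.entrySet()) {
            Put put = new Put(Bytes.toBytes(e.getKey()));
            put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("value"),
                    Bytes.toBytes(e.getValue().longValue()));
            batch.add(put);
        }
        LIMITER.acquire(batch.size());
        table.put(batch);
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("rt_indicators"))) {
            // Merge indicators of the same type before writing: the row key here
            // is dimension|indicator (hypothetical layout), later values win.
            Map<String, Long> merged = new HashMap<>();
            merged.put("dept_123|uv", 42L);
            flush(merged, table);
        }
    }
}
```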

With Storm as the computing engine, Redis was needed to store intermediate state. After the switch to Flink, its built-in state storage made Redis unnecessary, which saves storage space; without the Redis round-trips, performance improved and overall resource consumption dropped by a third.
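
Below is a minimal sketch of that pattern, assuming the events have already been parsed into (dimension, userId) pairs: the set of seen users that previously lived in Redis is held in Flink keyed state instead. This illustrates the approach and is not VIPshop's actual operator.

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/** Input: (dimensionKey, userId); output: (dimensionKey, current UV count). */
public class UvDedup extends RichFlatMapFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    private transient MapState<String, Boolean> seenUsers; // one entry per distinct user per key
    private transient ValueState<Long> uvCount;

    @Override
    public void open(Configuration parameters) {
        seenUsers = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("seen-users", String.class, Boolean.class));
        uvCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("uv-count", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, String> in, Collector<Tuple2<String, Long>> out) throws Exception {
        if (!seenUsers.contains(in.f1)) {          // first time this user hits this dimension
            seenUsers.put(in.f1, true);
            long next = (uvCount.value() == null ? 0L : uvCount.value()) + 1L;
            uvCount.update(next);
            out.collect(Tuple2.of(in.f0, next));   // emit the updated UV for the dimension
        }
    }
}
```

It would be applied after keying the stream by dimension, e.g. `events.keyBy(0).flatMap(new UvDedup())`, so the state is scoped per dimension and backed by the configured state backend.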

During the gradual migration of computing tasks from Storm to Flink, the two pipelines were migrated one after the other, and the computing tasks were separated from the synchronization tasks, which relieved the pressure of writing data into HBase.

Some issues still need tracking and improvement after the switch to Flink. For FlinkKafkaConsumer, Kafka auto-commit had to be adjusted for business reasons, offsets had to be set explicitly, and support for switching between Kafka clusters had to be implemented in-house. For operators without windows, state data must be cleared manually. Data skew, a common problem across computing frameworks, also has to be addressed. Meanwhile, when the synchronization task needs to catch up on data, Storm could read the current values from Redis, whereas with Flink one can only wait for the state to be rebuilt.
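
Two of these points can be sketched in code. The first method below shows consumer settings with Kafka auto-commit disabled (with checkpointing enabled, Flink commits offsets on checkpoint completion); the second class shows manual state cleanup for a windowless keyed operator using a processing-time timer. The topic name, broker address, and 24-hour retention are hypothetical, and the in-house offset and cluster-switch logic is not shown.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

public class KafkaAndStateSketch {

    /** Kafka source with auto-commit disabled; offsets are then managed by Flink. */
    static FlinkKafkaConsumer011<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "dataeye");
        props.setProperty("enable.auto.commit", "false");
        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("associated-events", new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets(); // explicit start position instead of the default
        return consumer;
    }

    /** Windowless counter (applied on a keyed stream) whose state is cleared via a timer. */
    static class ExpiringCounter extends ProcessFunction<String, Long> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
            long next = (count.value() == null ? 0L : count.value()) + 1L;
            count.update(next);
            out.collect(next);
            // Drop this key's state 24 hours after the last update.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 24 * 60 * 60 * 1000L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
            count.clear();
        }
    }
}
```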

Scenario 2: Landing Kafka data on HDFS

This was previously implemented with Spark Streaming and is now gradually being switched to Flink, using OrcBucketingTableSink to land the tracking data into Hive tables on HDFS. In Flink, a single write Task can reach around 3.5k records per second. After the switch to Flink, resource consumption dropped by 90% and the latency fell from about 30s to 3s. Flink support for Spark bucket tables is currently in progress.
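
OrcBucketingTableSink is a VIPshop-internal sink. As a rough approximation of the pattern, the sketch below uses Flink's stock BucketingSink (from flink-connector-filesystem) to partition output into time-bucketed HDFS directories that Hive partitions can point at; the path, roll size, and plain-text format are hypothetical stand-ins (the real sink writes ORC).

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class HdfsLanding {
    public static void attachSink(DataStream<String> events) {
        BucketingSink<String> sink = new BucketingSink<>("hdfs:///warehouse/tracking");
        sink.setBucketer(new DateTimeBucketer<String>("yyyyMMdd--HH")); // one directory per hour
        sink.setWriter(new StringWriter<String>());     // plain text here; ORC in the real sink
        sink.setBatchSize(128 * 1024 * 1024L);          // roll part files at 128 MB
        sink.setInactiveBucketThreshold(60 * 1000L);    // close buckets idle for 1 minute
        events.addSink(sink);
    }
}
```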

Scenario 3: Real-time ETL

One pain point in ETL processing is that dictionary tables are stored on HDFS and change constantly, while the real-time data streams need to be joined against them. The dictionary-table changes are produced by offline batch tasks. The current approach uses ContinuousFileMonitoringFunction and ContinuousFileReaderOperator to monitor HDFS for data changes at regular intervals, keep flushing in the new data, and join the real-time data against the latest version.
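
These two operators are what back Flink's readFile(..., PROCESS_CONTINUOUSLY, ...) API, so the pattern can be sketched as follows. The paths, the socket stand-in for the Kafka stream, the 60-second scan interval, and the comma-separated record format are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.util.Collector;

public class DictJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // PROCESS_CONTINUOUSLY is implemented by ContinuousFileMonitoringFunction
        // plus ContinuousFileReaderOperator: the path is re-scanned every 60s.
        String dictPath = "hdfs:///dim/dict";
        DataStream<String> dict = env.readFile(
                new TextInputFormat(new Path(dictPath)), dictPath,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000L);

        DataStream<String> events = env.socketTextStream("localhost", 9999);

        // Broadcast the dictionary so every subtask holds a full copy, then
        // enrich each event against the latest version. Records are "key,payload".
        events.connect(dict.broadcast())
                .flatMap(new RichCoFlatMapFunction<String, String, String>() {
                    private final Map<String, String> dictionary = new HashMap<>();

                    @Override
                    public void flatMap1(String event, Collector<String> out) {
                        String key = event.split(",", 2)[0];
                        out.collect(event + "," + dictionary.getOrDefault(key, "UNKNOWN"));
                    }

                    @Override
                    public void flatMap2(String dictRow, Collector<String> out) {
                        String[] kv = dictRow.split(",", 2);
                        dictionary.put(kv[0], kv[1]); // refreshed whenever HDFS data changes
                    }
                })
                .print();

        env.execute("dict-join");
    }
}
```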

We plan to implement a more general approach that supports joining Hive tables with streams and pushes the data automatically when the Hive table changes.

III. Flink on K8S

VIPshop runs several different computing workloads, such as real-time computing, machine learning, and offline computing, so a unified underlying framework is needed to manage them; this is why Flink is being migrated onto K8S.

On K8S, Cisco's network components are used, and each Docker container has an independent IP that is also visible to the outside. The overall architecture of the integrated real-time platform is shown in the figure below.

VIPshop's implementation on K8S differs considerably from the one provided by the Flink community: Flink is deployed in K8S StatefulSet mode, with some cluster-related interfaces implemented; one job corresponds to one mini cluster, and HA is supported. For Flink, the biggest reason to use a StatefulSet is that a pod's hostname is ordered. The potential benefits include:

  1. The pods with hostname suffixes -0 and -1 can be directly designated as JobManagers, so a whole cluster can be started with a single StatefulSet, whereas a Deployment-based setup would need two, since the JobManager and TaskManager would have to be deployed independently.

  2. If a pod fails for whatever reason, the cluster can theoretically recover faster than with a Deployment (whose pods get random hostnames), because a pod's hostname does not change when the StatefulSet pulls it up again.

The following environment variables need to be set in the image's Docker entrypoint script.

| Environment variable | Parameter | Example | Notes |
| --- | --- | --- | --- |
| JOB_MANGER_HOSTS | StatefulSet.name-0, StatefulSet.name-1 | flink-cluster-0,flink-cluster-1 | JM hostnames; short hostnames, no FQDN needed |
| FLINK_CLUSTER_IDENT | namespace/StatefulSet.name | default/flink-cluster | Used as the root directory for ZK HA and HDFS checkpoints |
| TASK_MANAGER_NUMBER_OF_TASK_SLOTS | containers.resources.cpu.limits | 2 | Number of slots per TM, set according to resources.cpu.limits |
| FLINK_ZK_QUORUM | env:FLINK_ZK_QUORUM | 10.198.199.112:2181 | ZK address for HA |
| JOB_MANAGER_HEAP_MB | env:JOB_MANAGER_HEAP_MB, value: containers.resources.memory.limit - 1024 | 4096 | JM heap size; because of off-heap memory it must be smaller than containers.resources.memory.limits, otherwise the pod is easily OOM-killed |
| TASK_MANAGER_HEAP_MB | env:TASK_MANAGER_HEAP_MB, value: containers.resources.memory.limit - 1024 | 4096 | TM heap size; because Netty uses off-heap memory it must be smaller than containers.resources.memory.limits, otherwise the pod is easily OOM-killed |

Other configurations that the Flink cluster depends on, such as HDFS, can be managed and maintained by creating a ConfigMap:

kubectl create configmap hdfs-conf --from-file=hdfs-site.xml --from-file=core-site.xml

IV. Follow-up plans

The data handled by the current real-time systems and the machine learning platform is spread across many kinds of data storage components, such as Kafka, Redis, Tair, and HDFS. How to access, process, and share this data conveniently and efficiently is a big challenge; at present, accessing and parsing the data often takes a great deal of effort. The main pain points include:

  1. For binary (PB/Avro, etc.) data in Kafka, Redis, and Tair, users cannot quickly and directly understand the schema and the data content; the cost of obtaining the data content and communicating with the writer is very high.
  2. Due to the lack of an independent, unified data-schema service, access to binary data in Kafka, Redis, Tair, etc. depends on information provided by the writer, such as generated protobuf classes or data-format definitions on a wiki; the maintenance cost is high and this is error-prone.
  3. The lack of a relational schema prevents users from developing their business directly on top of the more efficient and easier-to-use SQL or LINQ-style layer APIs.
  4. Data cannot be easily published and shared through a single service.
  5. Real-time data cannot be consumed directly by batch SQL engines.
  6. In addition, most current data-source access lacks auditing, permission management, access monitoring, tracing, and similar features.

UDM (Unified Data Management System) consists of the Location Manager, Schema Metastore, and Client Proxy modules. Its main functions include:

  1. Provides a name-to-address mapping service, so consumers access data through abstract names rather than concrete addresses.

  2. Users can easily view data schemas and explore data content through a Web GUI.

  3. Provides a Client API Proxy that adds functions such as auditing, monitoring, and traceability.

  4. In frameworks such as Spark/Flink/Storm, provides encapsulations of these data sources in the form most convenient to use.

The following figure shows the overall architecture of the UDM.

The users of UDM include both the producers and the consumers of data on the real-time, machine learning, and offline platforms. When using the SQL API or Table API, they first register the schema and can then develop in SQL, reducing the amount of development code.

The internal flow of UDM can be illustrated with the sequence diagram of Spark accessing Kafka PB data.

In Flink, UDMExternalCatalog serves as the bridge between the Flink computing framework and UDM. By implementing the ExternalCatalog interfaces and a TableSourceFactory for each data source, it provides functions such as schema resolution and access control.
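
UDMExternalCatalog itself is internal to VIPshop, but the wiring on the Flink side follows the external-catalog mechanism of the Table API of that era (pre-1.9). Below is a hedged sketch of how such a catalog would be registered and queried; the registration call is shown as a comment because the class is not public, and the catalog, database, table, and column names are all hypothetical.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class UdmSqlJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // The internal UDMExternalCatalog implements Flink's ExternalCatalog interface;
        // registering it makes all UDM-managed sources addressable from SQL, with table
        // schemas resolved from the Schema Metastore instead of hand-maintained code:
        //     tEnv.registerExternalCatalog("udm", new UDMExternalCatalog(...));

        // Once registered, tables are referenced as catalog.database.table:
        Table orders = tEnv.sqlQuery("SELECT user_id, amount FROM udm.realtime.orders");

        env.execute("udm-sql-job");
    }
}
```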