Author: Shen Lei

Summary: This talk shares Youzan’s practice and application of Flink. It covers:

  1. Containerization of Flink: migration and practice
  2. Practice and application of Flink SQL
  3. Plans for the future

I. Containerization of Flink: migration and practice

1. Evolution of Youzan’s real-time clusters

  • The first Storm task went live in July 2014;
  • In 2016, Spark Streaming was introduced, running on Hadoop YARN;
  • In 2018, Flink was introduced, running in Flink on YARN per-job mode;
  • In June 2020, 100% of Flink JAR tasks ran on K8S. K8S became the default computing resource for Flink JAR tasks, Flink SQL tasks ran on YARN, and Flink became the unified engine for real-time development;
  • In November 2020, the Storm cluster was officially taken offline, and all the original Storm tasks were migrated to Flink;
  • In 2021, we plan to move all Flink tasks to K8S.

2. Business scenarios that Flink supports internally

Flink supports business scenarios such as risk control, real-time event tracking, payments, real-time feature processing for algorithms, real-time BI dashboards, and real-time monitoring. There are currently more than 500 real-time tasks.

3. Pain points of Flink on YARN

There are three main pain points:

  • First, CPUs are not isolated. With Flink on YARN, when one real-time task drives a machine’s CPU usage too high, it can affect the other real-time tasks on that machine.
  • Second, scaling for major promotions is costly. The YARN and HDFS services run on physical machines, which cannot be scaled flexibly during major promotions, so scaling out and back in takes considerable manpower and material resources.
  • Third, it requires dedicated operations staff. The company’s underlying application resources are unified on K8S, so operating a separate YARN cluster adds the staffing cost of maintaining one more type of cluster.

4. Advantages of Flink on K8S over YARN

These can be summarized in four points:

  • First, unified operations. The company operates K8S centrally, with a dedicated department responsible for it;
  • Second, CPU isolation. CPUs are isolated between K8S Pods, so real-time tasks do not affect each other and run more stably;
  • Third, separation of storage and compute. Flink computing resources are separated from state storage, so computing resources can be co-located with other components’ resources, which improves machine utilization;
  • Fourth, elastic scaling. During major promotions, the cluster can scale out and in elastically, which saves manpower and material costs.

5. Deployment of real-time cluster

The deployment is divided into three layers: the storage layer, the real-time computing resource layer, and the real-time computing engine layer.

  • The storage layer has two parts:

    • The first is cloud disks, which store the local state and the logs of Flink tasks.
    • The second is the real-time computing HDFS cluster, which stores the remote state of Flink tasks.
  • The real-time computing resource layer also has two parts:

    • One is the Hadoop YARN cluster;
    • The other is the Flink K8S cluster, which is further subdivided into resources shared between Flink K8S and the offline HDFS cluster, and resources dedicated to Flink K8S.
  • The top layer runs real-time Flink JAR tasks, Spark Streaming tasks, and Flink SQL tasks.

We considered co-location because the machines of the offline HDFS cluster had low utilization during the day. The computing resources of the offline HDFS cluster are given to real-time tasks, while offline jobs use the elastic computing resources of other internal components. This improves machine utilization and reduces cost.

6. Flink on K8S containerization process

As shown in the figure below:

  1. Step 1: a Flink JAR task is submitted on the real-time platform, the platform manages the version of the Flink JAR task, builds a Docker image for the Flink task, and uploads the image to the Docker image registry;
  2. Step 2: the task is started;
  3. Step 3: the YAML file is created;
  4. Step 4: commands are exchanged with the K8S API Server;
  5. Step 5: the Flink task image is pulled from the Docker image registry to the Flink K8S cluster;
  6. Finally, the task runs. A few tips:

    • The run mode is Flink Standalone per-job mode;
    • Each Flink JAR task has its own image, versioned by task name + timestamp;
    • The JobManager must be created as a Deployment rather than a Job;
    • The Dockerfile specifies HADOOP_USER_NAME, consistent with the online tasks.
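
Two of the tips above, versioning each image by task name + timestamp and creating the JobManager as a Deployment, can be sketched as follows. This is only an illustration under assumptions: the function names, the registry address `registry.example.com/flink`, the timestamp format, and the labels are hypothetical and not taken from the platform itself.

```python
from datetime import datetime

# Hypothetical registry address, used only for illustration.
REGISTRY = "registry.example.com/flink"


def image_tag(task_name: str, built_at: datetime) -> str:
    """Version an image as '<task name>-<timestamp>' (format is an assumption)."""
    return f"{task_name}-{built_at.strftime('%Y%m%d%H%M%S')}"


def jobmanager_deployment(task_name: str, tag: str) -> dict:
    """Build a minimal K8S Deployment manifest (not a Job) for the JobManager."""
    labels = {"app": task_name, "component": "jobmanager"}
    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": f"{task_name}-jobmanager"},
        "spec": {
            "replicas": 1,
            "selector": {"matchLabels": labels},
            "template": {
                "metadata": {"labels": labels},
                "spec": {
                    "containers": [
                        {"name": "jobmanager", "image": f"{REGISTRY}:{tag}"}
                    ]
                },
            },
        },
    }


tag = image_tag("order-etl", datetime(2021, 1, 2, 3, 4, 5))
manifest = jobmanager_deployment("order-etl", tag)
```

A real manifest would also carry resource requests, volumes, and the arguments that start the job; those are omitted here because the talk does not specify them.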

7. Some practices with Flink on K8S

  • The first practice addresses tasks that fail to start because of insufficient resources.

    First, the problem. Flink on K8S is not cloud native, so it cannot request resources for a real-time task on demand. When the resources the user configures on the platform are less than the resources the task actually uses (for example, the user hard-codes a parallelism in the code, and the parallelism configured on the platform is smaller than that value), the real-time task fails to start.

    To address this, we added an internal mechanism that automatically detects the parallelism of Flink JAR tasks. Its main flow is shown in the figure below. The user first submits the Flink JAR job on our platform. After submission, the platform builds a PackagedProgram in the background from the JAR and its run parameters, and obtains the task’s pre-execution plan from the PackagedProgram. From that plan it obtains the task’s true parallelism. If the parallelism set in the code is smaller than the one configured on the platform, we request resources using the platform configuration and then start the task; otherwise, we request resources using the task’s true parallelism and then start it.

  • The second practice is a resource analysis tool for Flink on K8S tasks.

    The background is that users configure the resources of Flink K8S tasks themselves. When the configured parallelism or memory is too large, computing resources are wasted, which raises the cost of the underlying machines. To solve this, we built a tool for platform administrators. An administrator can judge from two perspectives whether a task’s resources are over-allocated:

    • The first is the task memory perspective. We use an open source tool, GCViewer, to derive memory usage metrics for a real-time task from its GC log.
    • The second is the message processing capability perspective. At the Flink source code level, we added metrics for the data source input rate (records/s) and the task’s message processing time. Based on these metrics, we find the task or operator that processes messages most slowly and judge whether the parallelism is configured reasonably.

    Based on the memory analysis metrics and the parallelism assessment, combined with optimization rules, the administrator proposes a resource setting for the Flink task. We then discuss it with the business side and make adjustments. The figure on the right shows the two analysis results: the upper part is the pod memory analysis of a Flink on K8S task, and the lower part is the processing capability analysis of a Flink K8S task. Based on these metrics, we can readjust a task’s resources and reduce waste. We currently plan to turn this into an automated analysis and adjustment tool.

  • Next are other practices related to Flink on K8S.

    • First, access to the Flink Web UI and REST API through Ingress. Each task has an Ingress domain name, through which its Flink Web UI and REST API are accessed;
    • Second, mounting multiple hostPath volumes to work around the IO limit of a single cloud disk. The write bandwidth and IO capacity of a single cloud disk are bottlenecks, so using multiple cloud disks reduces the pressure that checkpoint state and local writes put on each disk;
    • Third, ConfigMaps for common Flink configuration, and a check that the Flink image was uploaded successfully. We create ConfigMaps for Filebeat and for the common Flink job configuration and mount them into real-time tasks, and we verify that each Flink task image has been uploaded to the image registry successfully;
    • Fourth, SSDs for HDFS disks and Filebeat-based log collection. SSDs are used mainly to reduce disk IO wait time, and we tuned dfs.block.invalidate.limit to reduce the number of HDFS blocks pending deletion. Task logs are collected with Filebeat, output to Kafka, and later viewed through a custom log server and the offline public log server.
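
The parallelism decision described in the first practice of this section can be sketched as below. It is a minimal sketch, not the platform’s code: the function names are hypothetical, and it assumes the pre-execution plan is available as JSON with a `nodes` list whose entries carry a `parallelism` field. When no plan can be obtained, the sketch falls back to the platform setting.

```python
import json
from typing import Optional


def max_plan_parallelism(plan_json: str) -> Optional[int]:
    """Return the largest operator parallelism found in the plan, if any.

    Assumes a plan shaped like {"nodes": [{"parallelism": N, ...}, ...]}.
    """
    nodes = json.loads(plan_json).get("nodes", [])
    values = [node["parallelism"] for node in nodes if "parallelism" in node]
    return max(values) if values else None


def effective_parallelism(platform_parallelism: int,
                          plan_parallelism: Optional[int]) -> int:
    """Pick the parallelism used to request resources.

    Use the platform setting when the code asks for less (or when the plan
    is unavailable); otherwise use the parallelism the code really needs.
    """
    if plan_parallelism is None:
        return platform_parallelism
    return max(platform_parallelism, plan_parallelism)


plan = json.dumps({"nodes": [{"id": 1, "parallelism": 2},
                             {"id": 2, "parallelism": 8}]})
requested = effective_parallelism(4, max_plan_parallelism(plan))
```

Here the platform is configured with 4 but one operator is hard-coded to 8, so resources are requested for 8 and the task can start.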

8. Current pain points of Flink on K8S

  • First, JobManager HA. If the JobManager Pod fails, the K8S Deployment restarts the JobManager from the YAML file, and state may be lost. If the YAML is configured to recover from a savepoint, however, a large number of messages may be reprocessed. We hope to support JobManager HA later with ZooKeeper or etcd;
  • Second, re-uploading after a code change takes a long time. Once the code logic changes, uploading the Flink JAR task plus building the image can take minutes, which may affect businesses with strict real-time requirements. We hope to follow the community implementation and pull the task JAR from HDFS to run it;
  • Third, JobManager recovery is slow when a K8S node goes down. Once a K8S node is down, it takes about 8 minutes for the JobManager Pod to resume running, mainly the time K8S needs to detect the failure internally plus the time to start the job. This affects some businesses, such as CPS real-time tasks. Our solution is for the platform to check the state of K8S nodes periodically. Once a node is detected as down, the platform stops the tasks whose JobManager is on that node and restores them from their previous checkpoints;
  • Fourth, Flink on K8S is not cloud native. Currently, the parallelism auto-detection tool for Flink JAR tasks solves the problem of tasks failing to start because of insufficient resources. However, if a task’s pre-execution plan cannot be obtained, the parallelism set in the code cannot be obtained either. Our thinking is that if the community delivers cloud-native Flink on K8S and solutions to the first two pain points fairly quickly, we may later align our Flink version with the community version.
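
The node-failure handling in the third point above (periodically check node state, then restore the affected tasks from their previous checkpoints) might look like the sketch below. The `Task` record, its fields, and the checkpoint paths are hypothetical; how node state is actually queried and how tasks are stopped and restarted is not described in the talk, so the sketch only computes which tasks to recover.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Task:
    name: str
    jobmanager_node: str    # K8S node hosting the task's JobManager Pod
    latest_checkpoint: str  # path of the most recent completed checkpoint


def plan_recovery(tasks: List[Task],
                  node_ready: Dict[str, bool]) -> List[Tuple[str, str]]:
    """Return (task name, checkpoint path) for tasks whose JobManager node is down.

    A node missing from node_ready is treated as down, which is an assumption.
    """
    return [
        (task.name, task.latest_checkpoint)
        for task in tasks
        if not node_ready.get(task.jobmanager_node, False)
    ]


tasks = [
    Task("cps-realtime", "node-1", "hdfs:///checkpoints/cps-realtime/chk-42"),
    Task("risk-control", "node-2", "hdfs:///checkpoints/risk-control/chk-7"),
]
to_recover = plan_recovery(tasks, {"node-1": False, "node-2": True})
```

A periodic job on the platform side could run this check and then stop and resubmit each returned task from its checkpoint path, instead of waiting for K8S to notice the failure on its own.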

9. Recommended approaches for Flink on K8S

  • The first approach is for the platform to build and manage task images itself.

    • The advantage is that the platform controls the whole process of building images and running real-time tasks, so problems can be fixed promptly.
    • The disadvantages are that it requires some understanding of Docker and K8S, so the barrier to entry is relatively high, and that the issues of not being cloud native must be handled as well. It is available in Flink 1.6 and later.
  • The second approach is the Flink K8S Operator.

    • The advantage is that it encapsulates many low-level details for the user, so the barrier to entry is lower.
    • The disadvantage is that it is less flexible than the first approach. Because users rely on its encapsulated functionality, the underlying layer is hard to modify when a problem occurs. It is available in Flink 1.7 and later.
  • The last approach is based on the community’s native Flink on K8S feature.

    • Advantages: it is cloud native and friendlier to resource requests. It is also easier to use, because it hides many underlying implementation details.
    • Disadvantages: the K8S cloud-native feature is still experimental, and related features, such as K8S per-job mode, are still under development. It is available in Flink 1.10 and later.

II. Practice and application of Flink SQL

1. History of Flink SQL at Youzan

  • In September 2019, we explored and experimented with the SQL capabilities of Flink 1.9 and 1.10, and enhanced some Flink SQL features.
  • In October 2019, we verified SQL functionality. Based on real-time event tracking requirements, we verified joining against an HBase dimension table in Flink SQL, and the results met expectations.
  • In February 2020, we extended SQL functionality. With Flink 1.10 as the SQL computing engine, we developed and optimized extensions to Flink SQL, and the real-time platform began to support fully SQL-based development.
  • In April 2020, we began to support real-time requirements from the real-time data warehouse, Youzan Education, the beauty industry, retail, and other businesses.
  • In August 2020, the new version of the real-time platform was officially launched. We now mainly promote Flink SQL for developing our real-time tasks.

2. Some practices in Flink SQL

They fall into three areas:

  • First, Flink connector practices: Flink SQL support for the Flink NSQ Connector, the HA HBase sink and dimension table, the password-free MySQL Connector, standard output (already supported by the community), and the ClickHouse sink;
  • Second, platform-level practices: Flink SQL support for UDFs and UDF management, task recovery from checkpoints, idempotency functions, JSON-related functions, and configuration of Flink runtime parameters such as state retention time and aggregation optimization parameters, as well as automatic collection of lineage data for Flink real-time tasks and Flink SQL syntax validation;
  • Third, Flink Runtime practices: we added per-record processing time metrics for individual Tasks and Operators in the Flink source code, and fixed a bug in Top-N over retract streams in Flink SQL.

3. Business practices

  • The first practice is our internal real-time dashboard for the customer service robot. The pipeline has three layers:

    • The first layer is the real-time data source: online MySQL business tables, whose Binlog is synchronized to the corresponding Kafka topics through the DTS service.
    • The ODS layer of the real-time tasks has three Kafka topics;
    • The real-time DWD layer has two Flink SQL tasks.

      • Flink SQL A consumes two topics and joins their data with an Interval Join over a time window. A state retention time is also set for this real-time task. After the join, some ETL processing is done, and the data is finally written to topic C.
      • The other real-time task, Flink SQL B, consumes one topic, cleans its data, and then joins an HBase dimension table to pick up the additional data it needs. The joined data is finally written to topic D.

    Downstream, Druid consumes the data from these two topics and serves metric queries to the business.

  • The second practice is the real-time user behavior middle layer. When users on our platform search, browse, add items to the shopping cart, and so on, corresponding events are generated. The original solution was offline: we loaded the data into Hive tables, and the algorithm team combined user features, machine learning models, and offline data to produce estimated user ratings, which were then written to HBase.

    In this context, the requirement was as follows: user rating was based mainly on offline tasks, while the algorithm team wanted to combine real-time user features to make recommendations more timely and accurate. This called for a real-time user behavior middle layer that writes user-generated events to Kafka, processes the data with a Flink SQL job, and outputs the results to HBase. The algorithm team then combines these results with their models, updates some model parameters in real time, and evaluates user ratings in real time. The ratings are also written to HBase and then used online.

The user behavior middle layer is built in three layers:

  • In the first layer, the data source is in Kafka;
  • The second layer is the ODS layer, where a Flink SQL job defines the stream tables, does some ETL processing, and defines the relevant sink tables, dimension tables, and so on. It also does some aggregation and then writes the output to Kafka;
  • In the DWS layer, there is also a user-written Flink SQL job, which involves the user’s own UDF JAR, multi-stream joins, and UDFs. It reads data from the ODS layer, writes it to HBase, and finally hands it to the algorithm team.

Here are a few practical lessons:

  • First, Kafka topics, Flink task names, and Flink SQL table names should follow the data warehouse naming convention.
  • Second, for metric aggregation, the Flink SQL task should set an idle state retention time to keep the task state from growing without bound.
  • Third, if there is data skew or heavy pressure from state reads, configure the Flink SQL optimization parameters.
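
To illustrate why the second lesson matters, here is a toy model of idle state retention for a keyed aggregation. It is not Flink’s implementation: the class and method names are hypothetical, and timestamps are passed in explicitly so the behavior is easy to follow. State for a key that has not been touched within the retention period is dropped, which bounds state size at the cost of restarting that key’s aggregate if it reappears later.

```python
from typing import Dict, Tuple


class IdleStateStore:
    """Keyed running sums whose entries expire after a period of inactivity."""

    def __init__(self, retention_seconds: float) -> None:
        self.retention = retention_seconds
        # key -> (running total, time of last access)
        self._state: Dict[str, Tuple[int, float]] = {}

    def expire(self, now: float) -> None:
        """Drop every key that has been idle for at least the retention period."""
        stale = [key for key, (_, last) in self._state.items()
                 if now - last >= self.retention]
        for key in stale:
            del self._state[key]

    def add(self, key: str, amount: int, now: float) -> int:
        """Add to a key's running total and return the new total."""
        self.expire(now)
        total, _ = self._state.get(key, (0, now))
        self._state[key] = (total + amount, now)
        return total + amount

    def __len__(self) -> int:
        return len(self._state)


store = IdleStateStore(retention_seconds=10)
store.add("user-a", 1, now=0)
store.add("user-a", 2, now=5)           # still active: total is 3
restarted = store.add("user-a", 1, now=20)  # idle for 15s: state was dropped
```

Without the expiry step, every key ever seen would stay in state forever, which is the unbounded growth the lesson warns about.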

4. HA HBase Connector practice

The community HBase Connector reads dimension data from and writes to a single HBase cluster. When that cluster is unavailable, the real-time task’s lookups and writes are affected, which can affect business usage. To solve this, the HBase side has two clusters, a primary and a standby, with primary-standby replication between them through the WAL. Flink SQL jobs write to the primary cluster first and automatically fall back to the standby cluster when the primary becomes unavailable, so online business is not affected.
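
The fallback behavior described above can be sketched as a writer that tries the primary cluster first and falls back to the standby. This is a simplified illustration: the class name and the two write callables are hypothetical stand-ins for real HBase clients, and treating `ConnectionError` as the signal that the primary is unavailable is an assumption.

```python
from typing import Callable, Dict

WriteFn = Callable[[str, Dict[str, str]], None]


class FailoverWriter:
    """Write to the primary cluster, falling back to the standby on failure."""

    def __init__(self, primary: WriteFn, standby: WriteFn) -> None:
        self.primary = primary
        self.standby = standby

    def write(self, row_key: str, row: Dict[str, str]) -> str:
        """Write one row and report which cluster accepted it."""
        try:
            self.primary(row_key, row)
            return "primary"
        except ConnectionError:
            # Primary is treated as unavailable: degrade to the standby.
            self.standby(row_key, row)
            return "standby"


standby_rows: Dict[str, Dict[str, str]] = {}


def broken_primary(row_key: str, row: Dict[str, str]) -> None:
    raise ConnectionError("primary cluster unavailable")


def standby_put(row_key: str, row: Dict[str, str]) -> None:
    standby_rows[row_key] = row


writer = FailoverWriter(broken_primary, standby_put)
used = writer.write("user#1", {"score": "0.9"})
```

A production connector would also need to decide when to switch back to the primary and how rows written to the standby are reconciled; the talk does not cover those details.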

5. Password-free MySQL Connector and metrics extension practice

On the left is the syntax of the Flink password-free MySQL sink, which solves three problems:

  • First, MySQL user names and passwords are neither exposed nor stored in clear text;
  • Second, MySQL user names and passwords can be rotated periodically;
  • Third, table permissions are authenticated automatically and internally based on the user name. The main purpose is to make database access from real-time tasks more secure.

The lower-left figure shows the per-message processing time metrics that we added for Tasks and Operators at the Flink source code level. The goal is to help the business troubleshoot and optimize Flink real-time tasks by monitoring message processing time.
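
One way to meet the first two goals above is to resolve credentials at run time from an internal service and refresh them periodically, so that SQL text never contains a password. The sketch below is an assumption about how such a connector could work, not the actual implementation: `CredentialCache`, the `fetch` callable, and the refresh interval are all hypothetical.

```python
import time
from typing import Callable, Dict, Tuple

Credentials = Tuple[str, str]  # (user name, password)


class CredentialCache:
    """Fetch credentials on demand and refresh them after ttl_seconds."""

    def __init__(self,
                 fetch: Callable[[str], Credentials],
                 ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.fetch = fetch
        self.ttl = ttl_seconds
        self.clock = clock
        # database name -> (credentials, time they were fetched)
        self._cache: Dict[str, Tuple[Credentials, float]] = {}

    def get(self, database: str) -> Credentials:
        """Return current credentials, refetching if the cached ones are old."""
        now = self.clock()
        cached = self._cache.get(database)
        if cached is None or now - cached[1] >= self.ttl:
            cached = (self.fetch(database), now)
            self._cache[database] = cached
        return cached[0]


# Stand-in for an internal credential service that rotates passwords.
issued = []


def fake_fetch(database: str) -> Credentials:
    issued.append(database)
    return (f"{database}_rw", f"secret-{len(issued)}")


fake_now = [0.0]
cache = CredentialCache(fake_fetch, ttl_seconds=60, clock=lambda: fake_now[0])
first = cache.get("orders")
fake_now[0] = 61.0
second = cache.get("orders")  # older than the TTL, so it is fetched again
```

The sink would call `get` whenever it opens a connection, so a rotated password is picked up without editing or redeploying the SQL job.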

6. Automatic collection of Flink task lineage metadata

The lineage metadata collection process for Flink tasks is shown in the figure below. After the platform starts a real-time task, it takes one of two paths to obtain the task’s lineage data, depending on whether the task is a Flink JAR task or a Flink SQL task, and then reports the lineage data to the metadata system. This has two benefits:

  • First, it helps the business side understand the real-time processing pipeline. The business side can see more clearly how real-time tasks relate to and affect each other, and can notify downstream business teams promptly when operating on a task.
  • Second, it helps build a better real-time data warehouse. Using the lineage graph of real-time tasks, we extract a common layer of real-time data to improve reusability.
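
As an illustration of the first benefit, the sketch below finds every task downstream of a given task from collected lineage data. The data shape (each task mapped to its input and output tables or topics) and all names are assumptions for the example; the talk does not describe the metadata system’s actual schema.

```python
from collections import deque
from typing import Dict, Set, Tuple

# task name -> (input tables/topics, output tables/topics)
Lineage = Dict[str, Tuple[Set[str], Set[str]]]


def downstream_tasks(lineage: Lineage, task: str) -> Set[str]:
    """Return all tasks that directly or indirectly consume the task's output."""
    # Index: table or topic -> tasks that read from it.
    consumers: Dict[str, Set[str]] = {}
    for name, (inputs, _) in lineage.items():
        for table in inputs:
            consumers.setdefault(table, set()).add(name)

    found: Set[str] = set()
    queue = deque([task])
    while queue:
        current = queue.popleft()
        for table in lineage[current][1]:
            for consumer in consumers.get(table, ()):
                if consumer != task and consumer not in found:
                    found.add(consumer)
                    queue.append(consumer)
    return found


lineage: Lineage = {
    "sql_a": ({"topic_a", "topic_b"}, {"topic_c"}),
    "sql_b": ({"topic_c"}, {"hbase_x"}),
    "sql_c": ({"hbase_x"}, {"topic_d"}),
    "sql_d": ({"topic_z"}, set()),
}
affected = downstream_tasks(lineage, "sql_a")
```

Before stopping or changing `sql_a`, the owners of the tasks in `affected` are the ones to notify; `sql_d` is unrelated and is not included.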

III. Future planning

Finally, our plans for the future cover four points:

  • First, promote SQL for Flink real-time tasks. Promote Flink SQL for developing real-time tasks and increase the proportion of Flink SQL tasks.
  • Second, automatically optimize the allocation of computing resources for Flink tasks. Analyze task resources in terms of memory, processing capacity, input rate, and so on, and automatically reconfigure tasks whose allocation is unreasonable, to reduce machine cost.
  • Third, move Flink SQL tasks to K8S and make Flink on K8S cloud native. Unifying Flink’s underlying computing resources on K8S reduces operations cost, and cloud-native Flink on K8S uses K8S resources more rationally.
  • Fourth, research Flink together with data lake and CDC technologies. Building up knowledge of new technologies lays the technical foundation for future real-time requirements.