Preface

Architecture transformation, embrace the cloud native service ecosystem

WeChat's internal big data computing platform is currently built on Yard, a resource scheduling system developed in-house. Besides providing resource isolation for online services, Yard was designed to raise the overall resource utilization of the machines that run online services; its core strategy is to run offline big data tasks on those machines while they are idle. However, integrating Yard with each big data computing framework (such as Hadoop MapReduce, Spark, and Flink) requires custom development. This makes iteration and maintenance inflexible and makes it hard to keep pace with the open source community. We therefore turned to Kubernetes and began to gradually build our big data computing platform on Tencent Cloud's TKE container platform.

Because there were not many Flink jobs on our Yard platform and the historical burden was relatively small, we started with Flink and put Flink on Kubernetes into practice first.

WeChat Flink real-time computing platform overview

WeChat Flink job data flow diagram

The following figure shows the real-time data flow for the Flink jobs of most of our businesses. Data is collected and reported to the Pulsar message queue; users' Flink jobs consume it from Pulsar and perform the computation, accessing other external storage such as Redis and FeatureKV when necessary. Results can be written to several storage systems depending on the use case. For reporting services, results are written to MySQL/PG. For real-time sample and feature joining jobs, results are written to HDFS to continuously supply samples for downstream model training. Some intermediate results are written back to Pulsar for consumption by downstream Flink jobs.

The following sections describe how the Flink jobs in the figure above are submitted and deployed.

Cluster and Flink job deployment

Flink on TKE semi-managed service, the ultimate cloud native Flink experience

Flink on TKE semi-managed service provides one-stop services for Flink, including cluster deployment, logging, monitoring, and storage. Users can run other online services alongside Flink in the same cluster to maximize resource utilization and to unify resources, the technology stack, and O&M capabilities.

We built our Flink Kubernetes computing cluster on Tencent Cloud's TKE container platform. Looking at our existing Flink jobs, we found that the vast majority are memory-bound, while CPU utilization is generally low. For machine type selection we therefore recommend memory-optimized machines.

Flink on Kubernetes offers several deployment modes for submitting Flink jobs (for details, see the TKE team's article: Flink on Kubernetes deployment mode analysis). The Flink open source community provides two: Standalone Kubernetes declarative deployment and Kubernetes Native deployment. Standalone Kubernetes declarative deployment is cumbersome and hard to manage, so we did not consider it. Kubernetes Native deployment was only officially released in Flink 1.12; its functionality was not yet complete and it had not been verified at scale in production, and we had in fact started our investigation before that release. After comparing the options, we adopted the Flink on TKE semi-managed service (based on a Kubernetes Operator) provided by the TKE container team. The submission and deployment process is roughly as shown in the figure below.

With the Flink Operator, clients can submit and deploy Flink jobs through a simple declarative API, and the life cycle of every component is managed uniformly by the Operator. For example:

    apiVersion: flinkoperator.k8s.io/v1beta1
    kind: FlinkCluster
    metadata:
      name: flink-hello-world
    spec:
      image:
        name: flink:1.11.3
      jobManager:
        resources:
          limits:
            memory: "1024Mi"  # value garbled in the source; 1024Mi is an assumption
            cpu: "200m"
      taskManager:
        replicas: 2
        resources:
          limits:
            memory: "2024Mi"
            cpu: "200m"
      job:
        jarFile: /opt/flink/examples/streaming/helloword.jar
        className: org.apache.flink.streaming.examples.wordcount.WordCount
        args: ["--input", "/opt/flink/README.txt"]
        parallelism: 2
      flinkProperties:
        taskmanager.numberOfTaskSlots: "2"

The Flink Operator submission process is roughly as shown in the figure below. The Operator first starts a Flink Standalone Session Cluster, then launches a Job Pod that runs the user code and submits the job to the Standalone Session Cluster. After submission, the Operator continuously tracks the job's running status. Three types of Pods are therefore involved at run time: JobManager, TaskManager, and Job Pod.

Source: github.com/lyft/flinkK…

The advantages of deploying Flink jobs with the Flink Operator are clear. The client does not need a kubeconfig, as it would with Flink on Kubernetes Native deployment, and can access the API Server directly through its HTTP interface. Kubernetes Native deployment can request TaskManagers automatically on demand, but our workloads are almost entirely single-job stream computing, so asking users to plan resources in advance is acceptable. Moreover, with the Flink Operator we can do batch scheduling (gang scheduling), which prevents jobs from blocking one another by holding resources while waiting for more when the cluster is short of resources. For example, a large job is submitted first and some of its TaskManagers wait a long time for resources; a small job submitted later then also gets stuck because it cannot obtain the resources it needs.
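The difference between placing Pods one at a time and gang scheduling can be illustrated with a toy model. This is only a sketch of the idea, not the scheduler used on TKE: the Job class, the memory figures, and both functions are hypothetical and exist purely for illustration.

```python
from dataclasses import dataclass


@dataclass
class Job:
    name: str
    pods: int          # number of TaskManager Pods the job needs
    mem_per_pod: int   # memory per Pod, in GiB (illustrative unit)


def schedule_pod_by_pod(jobs, free_mem):
    """Naive scheduling: place Pods one at a time, in submission order.

    Returns {job name: Pods placed}. A job may end up partially placed,
    holding memory it cannot use while it waits for the rest.
    """
    placed = {}
    for job in jobs:
        placed[job.name] = 0
        for _ in range(job.pods):
            if free_mem >= job.mem_per_pod:
                free_mem -= job.mem_per_pod
                placed[job.name] += 1
    return placed


def schedule_gang(jobs, free_mem):
    """Gang scheduling: admit a job only if all of its Pods fit at once."""
    placed = {}
    for job in jobs:
        need = job.pods * job.mem_per_pod
        if free_mem >= need:
            free_mem -= need
            placed[job.name] = job.pods
        else:
            placed[job.name] = 0  # nothing is held; the job waits as a whole
    return placed


# A large job is submitted first, a small job afterwards; 30 GiB are free.
jobs = [Job("large", pods=4, mem_per_pod=8), Job("small", pods=2, mem_per_pod=4)]
naive = schedule_pod_by_pod(jobs, free_mem=30)
gang = schedule_gang(jobs, free_mem=30)
```

In this toy run the naive strategy leaves both jobs partially placed, so neither can start, while the gang strategy keeps the large job waiting as a whole and lets the small job run.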

Automatically downloading user-uploaded resources

Jobs are dynamically separated from the Flink kernel for increased flexibility

As the declarative API above shows, the user's JAR package has to be built into the image in advance. As the platform provider we cannot expect every user to build Docker images themselves, and some users do not know Docker at all. We therefore hide the Docker image from users, who only need to upload their JAR packages and other resources. The Flink Operator provides an initContainer option that could be used to download user-uploaded resources automatically. For simplicity, however, we modified the Docker entrypoint startup script directly so that it first downloads the resources uploaded by the user and then starts the Flink processes. The uploaded resources are declared through environment variables, for example:

    apiVersion: flinkoperator.k8s.io/v1beta1
    kind: FlinkCluster
    metadata:
      name: flink-hello-world
    spec:
      image:
        name: flink:1.11.3
      envVars:
        - name: FLINK_USER_JAR
          value: hdfs://xxx/path/to/helloword.jar
        - name: FLINK_USER_DEPENDENCIES
          value: hdfs://xxx/path/to/config.json,hdfs://xxx/path/to/vocab.txt
      ...

A dependency uploaded by the user can be any file. Unlike Flink on YARN, we do not distribute dependencies at submission time; instead, the Docker entrypoint startup script downloads them directly into the container's working directory. If a dependency is a JAR, it must also be added to the classpath. To avoid modifying Flink's own scripts, we append the JAR to the HADOOP_CLASSPATH environment variable, which ends up on the Java classpath when the Flink processes start.

The JAR containing the user's main class (FLINK_USER_JAR) only needs to be downloaded in the container of the Job Pod. If it were also downloaded into the working directory, it would be added to the classpath as well, and submission could then fail with an error like the LinkageError shown below. This is because the classes would be loaded once by Java at startup and again by Flink when it executes the user's main method. We therefore download the main JAR to a dedicated fixed directory, such as /opt/workspace/main/, and at submission time set the spec.job.jarFile parameter to /opt/workspace/main/XXX.jar.

    java.lang.LinkageError: loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "org/apache/pulsar/client/api/Authentication"
        at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_152]
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[?:1.8.0_152]
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[?:1.8.0_152]
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[?:1.8.0_152]
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73) ~[?:1.8.0_152]
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[?:1.8.0_152]
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362) ~[?:1.8.0_152]
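The download rules described in this section can be sketched as a small planning function. The real logic lives in the Docker entrypoint shell script, which is not shown in this article, so the Python below is only an illustration: the function name, the /opt/workspace working directory, and the udf.jar dependency are assumptions, and the actual download step (for example from HDFS) is left out. It also does not distinguish Pod types, whereas the main JAR is in fact only fetched in the Job Pod.

```python
import posixpath

MAIN_JAR_DIR = "/opt/workspace/main"   # fixed directory named in the text
WORK_DIR = "/opt/workspace"            # assumed container working directory


def plan_downloads(env, work_dir=WORK_DIR):
    """Decide where each uploaded resource goes and what joins the classpath.

    Returns (downloads, hadoop_classpath), where downloads is a list of
    (source URI, local path) pairs.
    """
    downloads = []
    classpath = [p for p in env.get("HADOOP_CLASSPATH", "").split(":") if p]

    deps = env.get("FLINK_USER_DEPENDENCIES", "")
    for uri in (u.strip() for u in deps.split(",")):
        if not uri:
            continue
        local = posixpath.join(work_dir, posixpath.basename(uri))
        downloads.append((uri, local))
        if local.endswith(".jar"):
            classpath.append(local)   # dependency JARs join the classpath

    main_jar = env.get("FLINK_USER_JAR")
    if main_jar:
        # The main JAR is deliberately kept off the classpath so that
        # only Flink loads it when it runs the user's main method.
        local = posixpath.join(MAIN_JAR_DIR, posixpath.basename(main_jar))
        downloads.append((main_jar, local))

    return downloads, ":".join(classpath)


env = {
    "FLINK_USER_JAR": "hdfs://xxx/path/to/helloword.jar",
    "FLINK_USER_DEPENDENCIES": "hdfs://xxx/path/to/config.json,hdfs://xxx/path/to/udf.jar",
}
downloads, hadoop_classpath = plan_downloads(env)
```

With this input, only the dependency JAR ends up in HADOOP_CLASSPATH, while the main JAR lands under /opt/workspace/main/, the path that spec.job.jarFile then points to.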

In general, the startup process of each type of Pod is as follows:

Connecting to WeChat backend services

The DaemonSet resource type in cloud native architecture simplifies the architecture transformation

Users' Flink jobs often need to interact with WeChat backend services at run time. On traditional bare-metal machines, accessing these services requires deploying an Agent on the machine and configuring routing. For the Kubernetes cluster, with the support of our colleagues in the infrastructure center, the basic WeChat backend Agent was packaged and deployed on every node as a DaemonSet. When we start a Flink container, we enable the hostIPC option and mount the routing configuration path, so the container can access WeChat backend services just as it would on a bare-metal machine.

In addition, because some of the Agent's Unix socket files are stored in /tmp, we need to mount the /tmp directory into the container. However, Flink by default also writes shuffle data, web files, and some temporary files (such as extracted .so libraries) to /tmp at run time. We therefore set -Djava.io.tmpdir=/opt/workspace/tmp to change Java's default temporary directory to a path inside the container, so that nothing is left behind when a job fails and its container is destroyed.

Property configuration, logging, and monitoring

Logging and monitoring to improve observability

As the declarative YAML configuration above shows, Flink property parameters are specified through the flinkProperties option when a job is submitted. The Flink Operator deploys the flinkProperties parameters as a ConfigMap that overrides the flink/conf directory in the image, so we cannot bake the system default property configuration into the Flink image. Instead, we maintain a set of Flink system defaults on the client side and merge them with the flinkProperties filled in by the user at submission time. This lets us adjust the system defaults flexibly.
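The client-side merge can be pictured as a simple dictionary overlay. This is a minimal sketch rather than our actual client code: the default values are made up for illustration, and letting user-supplied keys take precedence over the system defaults is an assumption, since the precedence rule is not spelled out above.

```python
# Hypothetical platform-wide defaults kept on the client side.
SYSTEM_DEFAULTS = {
    "taskmanager.numberOfTaskSlots": "1",
    "state.backend": "rocksdb",
    "restart-strategy": "fixed-delay",
}


def merge_flink_properties(defaults, user_properties):
    """Return the defaults overlaid with the user's flinkProperties.

    User-supplied keys win (an assumption). Values are converted to
    strings because flinkProperties values in the manifest are strings.
    """
    merged = dict(defaults)  # copy, so the defaults are never mutated
    merged.update({key: str(value) for key, value in user_properties.items()})
    return merged


user_properties = {"taskmanager.numberOfTaskSlots": 2}
merged = merge_flink_properties(SYSTEM_DEFAULTS, user_properties)
```

The merged dictionary is what would be written into the flinkProperties section of the submitted manifest, so changing a platform default only requires a client-side change, not a new image.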

By default, jobs deployed with Flink on Kubernetes run their processes in the foreground inside Docker containers and use the log4j-console.properties configuration, which logs directly to the console. As a result, the Flink UI cannot display logs, and they can only be viewed as Pod logs. Output that users print with System.out.println is also mixed in with the log4j output, which makes it hard to read. We therefore redefined log4j-console.properties to write log4j logs to a file in the FLINK_LOG_DIR directory, rolled by size. To make the user's stdout visible in the Flink UI as well, we append 2>&1 | tee ${FLINK_LOG_PREFIX}.out to the process launch command in flink-console.sh, which copies the console output to a file in the log directory. The logs finally displayed in the Flink UI look as follows:

We also deployed a Flink History Server on Kubernetes to keep the records of failed historical jobs. It can be scaled up and down flexibly, so we no longer have to worry about being unable to trace the cause when a job restarts automatically in the middle of the night.

For resource and job monitoring, TKE provides TPS, a free cloud native Prometheus service that can be deployed and associated with a TKE cluster in one click. We do not use TPS, however, because we had already built our monitoring platform on the mainstream Prometheus + Grafana combination at an early stage. We currently monitor cluster resources, Namespace resources, and job resources, as shown in the following figure. Later we will push each job's Flink metrics to Prometheus so that we can monitor backpressure, GC, operator traffic, and so on at the job level.

Data application platform docking

With the basic Flink on Kubernetes capabilities described above in place, Flink can be connected to our various data application platforms. As shown in the figure below, users can use Flink in several ways: they can drag and drop nodes or register custom nodes on the machine learning platform, using Flink as a Jar package or through PyFlink, or they can write Flink SQL on the SQL analysis platform.

We will not go into detail on Jar and PyFlink usage. For Flink SQL support, we currently combine our own metadata system with Flink's existing SQL functionality. The real-time data warehouse is widely discussed in the industry today. A traditional offline data warehouse such as Hive is essentially a schema layer on top of HDFS; a real-time data warehouse is similar, except that the data source is usually a message queue system such as Kafka or Pulsar, with a schema layer on top to manage the real-time data. We build our Flink SQL capability on the metadata management system of the SQL analysis platform, where users can register and manage database table metadata. To keep the architecture simple, we did not implement our own Flink Catalog: metadata operations are completed directly on the SQL analysis platform, so there is no need to implement the CREATE, DROP, and other APIs. SQL is submitted using the process shown in the figure below.

The user registers database table metadata on the SQL analysis platform (with fine-grained authorization control) and then edits and submits SQL. The SQL analysis platform first performs syntax, permission, and validity checks, then encrypts and packages the metadata involved in the SQL together with the declarative YAML configuration and submits it to the unified scheduling platform. On the unified scheduling platform we developed a FlinkSQL job type, which is essentially a regular Flink Jar job named FlinkSQLDriver that accepts the SQL and its accompanying parameters. Once submitted, FlinkSQLDriver parses the configuration it receives, assembles the complete SQL statements (both DDL and DML), and calls tableEnvironment.executeSql to execute them one by one. In essence, the database tables are temporarily registered in the default catalog.
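The assembly step performed by FlinkSQLDriver can be sketched as follows. This is not the real driver, which is a Flink Jar job: the metadata layout, the function names, and the example tables are all hypothetical, and the final call to tableEnvironment.executeSql is only indicated in a comment.

```python
def build_create_table(table):
    """Render one CREATE TABLE statement from registered table metadata."""
    columns = ",\n".join(f"  `{name}` {dtype}" for name, dtype in table["columns"])
    options = ",\n".join(f"  '{k}' = '{v}'" for k, v in table["options"].items())
    return f"CREATE TABLE `{table['name']}` (\n{columns}\n) WITH (\n{options}\n)"


def assemble_statements(tables, user_sql):
    """Return DDL for every referenced table, followed by the user's DML.

    Splitting on ';' is a simplification that ignores semicolons inside
    string literals; a real driver would need a proper SQL parser.
    """
    ddl = [build_create_table(table) for table in tables]
    dml = [stmt.strip() for stmt in user_sql.split(";") if stmt.strip()]
    return ddl + dml


# Hypothetical metadata as it might arrive from the SQL analysis platform.
tables = [
    {
        "name": "clicks",
        "columns": [("user_id", "BIGINT"), ("url", "STRING")],
        "options": {"connector": "datagen"},
    },
    {
        "name": "click_counts",
        "columns": [("user_id", "BIGINT"), ("cnt", "BIGINT")],
        "options": {"connector": "print"},
    },
]
user_sql = "INSERT INTO click_counts SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id;"

# In the driver, each statement would be passed to tableEnvironment.executeSql in order.
statements = assemble_statements(tables, user_sql)
```

Emitting the DDL first is what registers the tables in the default catalog for the lifetime of the job, so the user's DML can refer to them without any custom Catalog implementation.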

Summary

This article has described WeChat's practical experience with Flink on Kubernetes and given an overview of our Flink data application platform. On the one hand, we provide the basic capabilities of a Flink computing platform and use Kubernetes to manage the cluster effectively. On the other hand, we build a real-time data warehouse on the existing data channels and metadata platform and provide Flink SQL capability, further lowering the barrier to entry for users. Our support for Flink SQL is still preliminary, and we will explore deeper optimizations based on how the business uses it.