Background of platform construction

Traditional offline data development has poor timeliness and cannot keep up with the rapid iteration of Internet businesses. With the rapid development of real-time technologies represented by Flink, more and more enterprises are adopting real-time computing. However, various problems arise in practice: the barrier to entry for developers is high, the quality of the business data produced is not guaranteed, and without unified platform management the tasks are hard to maintain. Given these problems, we decided to build a complete real-time computing platform on top of existing Flink technology.

Overall Platform Architecture

In terms of the overall architecture, the real-time computing platform can be roughly divided into three layers:

  • Computing platform

  • Scheduling platform

  • Resource platform

Each layer has its own responsibilities, and the layers interact with one another, which follows the design principle of high cohesion and low coupling. The architecture diagram is as follows:

Computing platform

This layer is used directly by developers. It can connect to various external data sources according to business requirements and make them available to subsequent tasks. Once a data source is configured, developers can perform Flink-based data synchronization, SQL-based data calculation, and multi-dimensional monitoring and alerting for running tasks.

Scheduling platform

After this layer receives the task content and configuration from the computing platform, the core work begins, which is also the focus of the rest of this article. In brief, different plug-ins are used for parsing depending on the type of task.

  • Data synchronization task: After receiving the JSON from the upper layer, the task enters the FlinkX framework, which generates the corresponding DataStream according to the source and the write target, and finally converts it into a JobGraph.

  • Data calculation task: After receiving the SQL from the upper layer, the task enters the FlinkStreamSQL framework, which parses the SQL, registers tables, generates transformations, and finally converts them into a JobGraph.

The scheduling platform then submits the resulting JobGraph to the corresponding resource platform, which completes task submission.
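
As a rough illustration of the dispatch described above, the sketch below routes a task to a parser according to its type. Everything in it is hypothetical: `TaskType`, `parse_sync_task`, `parse_sql_task`, and the dictionaries standing in for a JobGraph are made up for the example, whereas the real platform hands the payload to FlinkX or FlinkStreamSQL.

```python
import json
from enum import Enum
from typing import Callable, Dict


class TaskType(Enum):
    SYNC = "sync"  # data synchronization task, described by JSON
    SQL = "sql"    # data calculation task, described by SQL


def parse_sync_task(payload: str) -> dict:
    """Stand-in for the FlinkX path (JSON -> DataStream -> JobGraph)."""
    job = json.loads(payload)
    return {"framework": "FlinkX", "reader": job["reader"], "writer": job["writer"]}


def parse_sql_task(payload: str) -> dict:
    """Stand-in for the FlinkStreamSQL path (SQL -> tables -> JobGraph)."""
    statements = [s.strip() for s in payload.split(";") if s.strip()]
    return {"framework": "FlinkStreamSQL", "statements": statements}


# One parser per task type; adding a new type means adding one entry here.
PARSERS: Dict[TaskType, Callable[[str], dict]] = {
    TaskType.SYNC: parse_sync_task,
    TaskType.SQL: parse_sql_task,
}


def build_job_graph(task_type: TaskType, payload: str) -> dict:
    """Pick the parser for the task type and return its (mock) job graph."""
    return PARSERS[task_type](payload)
```

Keeping the parsers in a lookup table mirrors the plug-in idea: the scheduling layer does not need to know how each framework builds its graph.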

Resource platform

Currently, the platform can connect to multiple resource clusters of different resource types, such as YARN and K8S.

Data synchronization and data calculation

In the scheduling platform, receiving a user’s task triggers a series of transformations that eventually get the task running. Below we look at the underlying technical details of building a real-time computing platform on Flink, and at how FlinkX and FlinkStreamSQL are used for one-stop development.

FlinkX

Data synchronization is the first and most basic step of data processing, so let’s start with how FlinkX extends Flink through secondary development. Users only need to care about the JSON script and a few configuration items for a synchronization task, not the details of calling Flink. The functions FlinkX supports are shown in the following figure.

Let’s first take a look at the process involved in Flink task submission. The interaction flowchart is as follows:

How does FlinkX encapsulate and invoke the above components on the basis of Flink, making it easier to use Flink as a data synchronization tool?

FlinkX extends three parts: the Client, the JobManager, and the TaskManager. The details are as follows:

The Client side

FlinkX customizes the native Flink Client in the FlinkX-Launcher module. The main steps are:

  1. Parse parameters such as the parallelism, the savepoint path, the program’s entry JAR package (typically an ordinary hand-written Flink program), and the flink-conf.yaml configuration;

  2. Generate a PackagedProgram from the program’s entry JAR package, the external parameters passed in, and the savepoint parameters;

  3. Use reflection to call the main method of the JAR package specified in the PackagedProgram. In the main method, the corresponding plug-ins are loaded according to the reader and writer configured by the user;

  4. Create the JobGraph and add the required resources (Flink JAR packages, reader and writer JAR packages, Flink configuration files, and so on) to the shipFiles of the YarnClusterDescriptor. The YarnClusterDescriptor then interacts with YARN to start the JobManager;

  5. After the task is submitted successfully, the Client obtains the applicationId returned by YARN, which can then be used to track the task status.

The JobManager end

After the Client submits the task, YARN starts the JobManager. The JobManager starts its own internal services and builds the ExecutionGraph.

During this process, FlinkX does two things:

  1. Each plug-in overrides the createInputSplits method of the InputFormat interface to create splits. This method is what shards the data when the upstream data volume is large or when reading with multiple degrees of parallelism is required. For example, to read MySQL with a parallelism of two, the configured split field (for example, an auto-increment primary key id) is used to shard the data as follows:

    • Select * from table where id mod 2=0;

    • Select * from table where id mod 2=1;

  2. After the splits are created, getInputSplitAssigner hands them out to the parallel instances in sequence.
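
The two steps above can be sketched in a few lines. This is a simplified illustration rather than FlinkX's implementation: `create_input_splits` and `SequentialSplitAssigner` are hypothetical names, and each split is represented simply as the query one parallel reader would run.

```python
from typing import List, Optional


def create_input_splits(table: str, split_key: str, parallelism: int) -> List[str]:
    """Return one query per parallel reader, sharded by split_key mod parallelism."""
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    return [
        f"SELECT * FROM {table} WHERE {split_key} mod {parallelism} = {i}"
        for i in range(parallelism)
    ]


class SequentialSplitAssigner:
    """Hands out splits one at a time, in the order they were created."""

    def __init__(self, splits: List[str]) -> None:
        self._splits = list(splits)
        self._next = 0

    def get_next_split(self) -> Optional[str]:
        if self._next >= len(self._splits):
            return None  # no work left for the requesting reader
        split = self._splits[self._next]
        self._next += 1
        return split
```

With a parallelism of two, the generator produces exactly the two `mod 2` queries shown in the list above, and each reader instance receives one of them.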

The TaskManager end

After receiving a task from the JobManager, the TaskManager starts the task’s lifecycle, which includes the following important phases:

  1. **initialize-operator-states():** Loops through all operators of the task and calls initializeState on those that implement the CheckpointedFunction interface. In FlinkX these are DtInputFormatSourceFunction and DtOutputFormatSinkFunction. The method is called when the task starts and is used to restore state, so that if a task fails, it can resume reading from the position recorded at the last checkpoint, as shown in the following figure:

  2. **open-operators():** Calls the open method of every StreamOperator in the OperatorChain, which finally calls the open method of BaseRichInputFormat. This method mainly does the following:

    • Initialize the accumulators that record the number of records and bytes read and written;

    • Initialize custom metrics;

    • Open the speed limiter;

    • Initialize the state;

    • Open a connection to read the data source (implemented by each plug-in, depending on the data source).

  3. **run():** Calls the nextRecord method of InputFormat and the writeRecord method of OutputFormat to process the data.

  4. **close-operators():** Performs cleanup, for example by calling the close methods of InputFormat and OutputFormat.
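
To make the four phases concrete, here is a minimal sketch of a reader that goes through them and resumes from a checkpoint. It is a toy model in plain Python, not Flink code: `CheckpointedReader`, its method names, and the `position` state key are assumptions chosen to echo the phases above.

```python
from typing import Dict, Iterator, List, Optional


class CheckpointedReader:
    """Toy source that remembers how many records it has already emitted."""

    def __init__(self, rows: List[str]) -> None:
        self._rows = rows
        self._position = 0
        self._opened = False

    def initialize_state(self, restored: Optional[Dict[str, int]]) -> None:
        # On a fresh start there is nothing to restore; after a failure,
        # resume from the position stored in the last checkpoint.
        self._position = restored["position"] if restored else 0

    def open(self) -> None:
        # A real reader would open its connection to the data source here.
        self._opened = True

    def run(self) -> Iterator[str]:
        if not self._opened:
            raise RuntimeError("open() must be called before run()")
        while self._position < len(self._rows):
            row = self._rows[self._position]
            self._position += 1
            yield row

    def snapshot_state(self) -> Dict[str, int]:
        # Called at checkpoint time to record the current read position.
        return {"position": self._position}

    def close(self) -> None:
        self._opened = False
```

If a reader fails after a snapshot taken at position 2, a new instance initialized with that snapshot continues from the third record instead of starting over.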

That is the StreamTask lifecycle in the TaskManager. Beyond the way it calls the Flink interfaces described above, FlinkX also has the following features.

  • **Custom accumulators:** Accumulators collect or aggregate information from user functions and operations in a distributed way. Each parallel instance creates and updates its own Accumulator object; the system merges the accumulators of the different parallel instances at the end of the job, and the results can be pushed to Prometheus, as shown in the figure:

  • **Support for offline and real-time synchronization:** FlinkX is a framework that supports both offline and real-time synchronization.

    • Offline task: In the run method of DtInputFormatSourceFunction, the open method of the InputFormat is called to read the data records into a resultSet, and then the reachedEnd method is called to determine whether all the data in the resultSet has been read. Once reading is complete, the close process follows.

    • Real-time task: The open method is the same as for an offline task. In reachedEnd, the task checks whether it is a polling task. If it is, it enters the interval-polling branch, takes the largest value of the increment field read in the previous poll as the starting position of the current poll, and then polls again.

  • **Dirty data management and error control:** Records that fail to be written to the target data source are logged, the cause of the error is classified, and the records are written to the configured dirty data table. There are four categories of error: type conversion error, null pointer, primary key conflict, and other errors. Error control is based on Flink accumulators, which record the number of errors during the run; a separate thread periodically checks whether the number of errors has exceeded the configured maximum and, if so, throws an exception to fail the task. In this way, tasks with different data accuracy requirements can apply different levels of error control. The control flow chart is as follows:

  • **Speed limiter:** Some upstream sources produce data so fast that they put heavy pressure on the downstream database, so the rate needs to be controlled at the source. FlinkX uses token-bucket rate limiting for this. As shown in the following figure, when the rate of data produced by the source reaches a certain threshold, no new data is read. The speed limiter is initialized in the open phase of BaseRichInputFormat.
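
For readers unfamiliar with the token-bucket approach mentioned in the last item, the following is a minimal, generic sketch of the algorithm. It is not FlinkX's limiter: the `TokenBucket` class, its parameters, and the injectable `clock` (added here only to make the behavior easy to check) are assumptions for illustration.

```python
import time
from typing import Callable


class TokenBucket:
    """Token-bucket limiter: tokens refill at `rate` per second up to `capacity`."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        if rate <= 0 or capacity <= 0:
            raise ValueError("rate and capacity must be positive")
        self._rate = rate
        self._capacity = capacity
        self._clock = clock
        self._tokens = capacity  # start with a full bucket
        self._last = clock()

    def _refill(self) -> None:
        # Add the tokens earned since the last call, capped at the capacity.
        now = self._clock()
        elapsed = now - self._last
        self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
        self._last = now

    def try_acquire(self, permits: float = 1.0) -> bool:
        """Take `permits` tokens if available; otherwise the caller should wait."""
        self._refill()
        if self._tokens >= permits:
            self._tokens -= permits
            return True
        return False
```

A reader would call `try_acquire()` before fetching the next record and pause when it returns `False`, which is what keeps the source from outrunning the downstream database.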

This is the basic principle of FlinkX data synchronization. In real business scenarios, however, data synchronization is only the first step. Because the current version of FlinkX covers only the E and L of ETL, it cannot transform or compute on the data, so the data it produces needs to flow downstream to FlinkStreamSQL.

FlinkStreamSQL

FlinkStreamSQL extends real-time SQL on top of Flink, mainly by adding joins between streams and dimension tables, and it supports all the syntax of native Flink SQL. Currently, the FlinkStreamSQL source can only connect to Kafka, so the default upstream data source is Kafka.

Let’s look at how FlinkStreamSQL builds on Flink so that users only need to focus on the business SQL, while the calls to the underlying Flink API are hidden from them. The overall process is similar to that of FlinkX described above; the difference lies on the Client side, which mainly consists of SQL parsing, table registration, and SQL execution.

Parse SQL

This step parses the four kinds of SQL statements written by the user, CREATE FUNCTION, CREATE TABLE, CREATE VIEW, and INSERT INTO, and encapsulates them in a structured SQLTree data structure. The SQLTree contains a set of custom functions, a set of external data source tables, a set of view statements, and a set of write statements.
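
The idea of sorting statements into four collections can be sketched as follows. This is only loosely modeled on the SQLTree described above: the `SqlTree` dataclass, its field names, and the regex-based classification are assumptions, and the naive split on `;` does not handle semicolons inside string literals.

```python
import re
from dataclasses import dataclass, field
from typing import List


@dataclass
class SqlTree:
    functions: List[str] = field(default_factory=list)  # CREATE FUNCTION
    tables: List[str] = field(default_factory=list)     # CREATE TABLE
    views: List[str] = field(default_factory=list)      # CREATE VIEW
    inserts: List[str] = field(default_factory=list)    # INSERT INTO


# Each pattern recognizes one statement kind by its leading keywords.
_PATTERNS = [
    ("functions", re.compile(r"^CREATE\s+FUNCTION\b", re.IGNORECASE)),
    ("tables", re.compile(r"^CREATE\s+TABLE\b", re.IGNORECASE)),
    ("views", re.compile(r"^CREATE\s+VIEW\b", re.IGNORECASE)),
    ("inserts", re.compile(r"^INSERT\s+INTO\b", re.IGNORECASE)),
]


def parse_sql(script: str) -> SqlTree:
    """Split a script into statements and file each one under its kind."""
    tree = SqlTree()
    for raw in script.split(";"):
        stmt = raw.strip()
        if not stmt:
            continue
        for attr, pattern in _PATTERNS:
            if pattern.match(stmt):
                getattr(tree, attr).append(stmt)
                break
        else:
            raise ValueError(f"unsupported statement: {stmt[:30]}")
    return tree
```

Once the statements are grouped like this, the later steps can register all tables and functions first and run the INSERT INTO statements last, regardless of the order in which the user wrote them.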

Register tables

With the SQLTree parsed above, the external data sources that correspond to the CREATE TABLE statements can be registered as tables in tableEnv, and the user-defined UDFs can be registered in tableEnv as well.

Execute SQL

After the data sources are registered as tables, the subsequent INSERT INTO statements can be executed. There are two cases:

  • If no dimension table is joined in the SQL, the SQL is executed directly;

  • If a dimension table is joined in the SQL: earlier versions of Flink did not support dimension table join syntax, so we extended it here; as of FlinkStreamSQL v1.11 we are aligned with the community and support its dimension table join syntax. Different join methods are used depending on the type of dimension table:

    • Full dimension table: The upstream data is used as input and a RichFlatMapFunction serves as the query operator. During initialization the whole dimension table is loaded into memory; each input record is then joined with it to produce widened data, which is registered again as a wide table for subsequent SQL to use.

    • Asynchronous dimension table: The upstream data is used as input and a RichAsyncFunction serves as the query operator. Queried data is kept in an LRU cache; each input record is joined with it to produce widened data, which is registered again as a wide table for subsequent SQL to use.
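
The caching side of the dimension table join can be illustrated with a small sketch. It shows only the LRU cache and the widening of a record; the asynchronous I/O that RichAsyncFunction provides is left out, and `LruCache`, `widen`, and the `lookup` callback standing in for the external query are hypothetical names.

```python
from collections import OrderedDict
from typing import Any, Callable, Dict, Optional

Row = Dict[str, Any]


class LruCache:
    """Keeps at most `capacity` entries, evicting the least recently used."""

    def __init__(self, capacity: int) -> None:
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self._capacity = capacity
        self._data: "OrderedDict[Any, Row]" = OrderedDict()

    def get(self, key: Any) -> Optional[Row]:
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key: Any, value: Row) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self._capacity:
            self._data.popitem(last=False)  # drop the oldest entry


def widen(row: Row, key_field: str, cache: LruCache,
          lookup: Callable[[Any], Optional[Row]]) -> Row:
    """Join one input row with its dimension row, querying only on a cache miss."""
    key = row[key_field]
    dim = cache.get(key)
    if dim is None:
        dim = lookup(key)  # stand-in for the query against the dimension table
        if dim is not None:
            cache.put(key, dim)
    return {**row, **(dim or {})}
```

A full dimension table corresponds to the degenerate case where every row is loaded up front and `lookup` is never needed; the LRU variant trades some external queries for bounded memory.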

This is the difference between FlinkX and FlinkStreamSQL on the Client side. Since the only source is Kafka and the community’s native Kafka connector is used, there is no data sharding logic on the JobManager side. The TaskManager logic is similar to that of FlinkX and is not covered here.

Task operation and maintenance

Once the business logic has been developed with FlinkX and FlinkStreamSQL, the next step is task operation and maintenance. In this phase we mainly monitor task running information, data input and output metrics, data latency, backpressure, data skew, and other dimensions.

Task Running Information

FlinkStreamSQL is based on Flink SQL, so when a task is submitted, Flink SQL performs parsing, validation, logical planning, logical plan optimization, and physical planning before finally running the task, which yields the DAG diagram we often see:

However, because Flink SQL optimizes the task heavily, we can only see a coarse DAG like the one above, and what happens inside each sub-DAG is not directly visible. We therefore modified the way the DAG is generated so that we can see what happens in each operator and each parallel instance within a sub-DAG. With the detailed DAG, the other monitoring dimensions, such as data input and output, latency, backpressure, and data skew, can be displayed intuitively and pinpointed when problems occur, as shown in the figure below:

With the above structure in mind, let’s look at how it is implemented. When the Client submits a task, a JobGraph is generated, and the taskVertices collection in the JobGraph holds all the information shown in the figure above. After the taskVertices are serialized to JSON and combined with LatencyMarker and the relevant metrics, the front end can render the graph above and raise the corresponding alarms.
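
As a simplified picture of that last step, the sketch below serializes a list of vertices to JSON and flags the ones whose latency exceeds a threshold. The `VertexInfo` fields, the `latency_ms` metric name, and both helper functions are invented for the example and do not reflect the actual JobGraph structure or the platform’s metric names.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Dict, List


@dataclass
class VertexInfo:
    vertex_id: str
    name: str
    parallelism: int
    inputs: List[str] = field(default_factory=list)          # upstream vertex ids
    metrics: Dict[str, float] = field(default_factory=dict)  # e.g. latency in ms


def dag_to_json(vertices: List[VertexInfo]) -> str:
    """Serialize the vertices so a front end can draw the nodes and edges."""
    return json.dumps({"vertices": [asdict(v) for v in vertices]}, sort_keys=True)


def vertices_over_latency(vertices: List[VertexInfo], threshold_ms: float) -> List[str]:
    """Return the names of vertices whose reported latency exceeds the threshold."""
    return [
        v.name for v in vertices
        if v.metrics.get("latency_ms", 0.0) > threshold_ms
    ]
```

Attaching the metrics to each vertex before serialization is what lets a single document drive both the rendering and the alerting.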

In addition to the DAG above, there are also custom metrics, data latency collection, and so on, which are not detailed here. Interested readers can refer to the FlinkStreamSQL project.

Use case

With that in mind, let’s look at a practical example on the platform. The complete example below uses FlinkX to synchronize new user data from MySQL to Kafka in real time, then uses FlinkStreamSQL to consume Kafka in real time and calculate the number of new users per minute, and finally writes the results to a downstream MySQL database for business use.

Synchronize new MySQL data in real time

Calculate the number of new users per minute in real time
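
The per-minute count amounts to a one-minute tumbling window aggregation. The following is a minimal sketch of that logic in plain Python, outside Flink, assuming each record carries a hypothetical `created_at` field in epoch seconds; it is not the SQL used on the platform.

```python
from collections import Counter
from typing import Dict, Iterable, Mapping

WINDOW_SECONDS = 60


def new_users_per_minute(records: Iterable[Mapping[str, int]]) -> Dict[int, int]:
    """Count records per one-minute tumbling window, keyed by window start."""
    counts: Counter = Counter()
    for record in records:
        # Round the timestamp down to the start of its one-minute window.
        window_start = record["created_at"] // WINDOW_SECONDS * WINDOW_SECONDS
        counts[window_start] += 1
    return dict(counts)
```

Each record falls into exactly one window, so the counts for different minutes never overlap.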

Running information

The overall DAG visually displays the metrics mentioned above

In the parsed, detailed DAG you can see the metrics inside each sub-DAG

The above covers the overall architecture of Flink in the Kangaroo Cloud real-time computing platform and some of its key technical points. If you find any shortcomings, please point them out.