Background of platform construction

Traditional offline data development has poor timeliness and cannot keep up with fast business iteration. With the rapid development of real-time technology, represented by Flink, more and more enterprises are adopting real-time computing. In practice, however, several problems arise: the barrier to entry for developers is high, the quality of the output business data is not guaranteed, and without unified platform management the jobs are hard to maintain. To address these problems, we decided to build a complete real-time computing platform on top of existing Flink technology.

Overall platform architecture

Viewed as a whole, the real-time computing platform can be roughly divided into three layers:

  • Computing platform
  • Scheduling platform
  • Resource platform

Each layer has its own responsibilities and interacts with the adjacent layers, which follows the design principle of high cohesion and low coupling. The architecture diagram is as follows:

Computing platform

This layer is used directly by developers. It can connect to various external data sources according to business needs and make them available to subsequent tasks. Once a data source is configured, developers can build visual, Flink-based data synchronization tasks and SQL data computation tasks on it, and apply multi-dimensional monitoring and alerting to running tasks.

Scheduling platform

This layer receives the task content and configuration from the computing platform and then does the core work, which is the focus of the rest of this article. Here is a brief overview. Depending on the type of task, a different plug-in is used for parsing:

  • Data synchronization task: After receiving the JSON from the upper layer, the task enters the FlinkX framework, which generates the corresponding DataStream according to the source and the target of the synchronization, and finally transforms it into a JobGraph.
  • Data computation task: After receiving the SQL from the upper layer, the task enters the FlinkStreamSQL framework, which parses the SQL, registers the tables, generates the Transformations, and finally transforms them into a JobGraph.

The scheduling platform submits the obtained JobGraph to the corresponding resource platform to complete the task submission.
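
The dispatch by task type described above can be sketched as follows. This is only an illustration: the task type names, the `PARSERS` registry and both parser functions are hypothetical stand-ins, and the real plug-ins produce a JobGraph rather than a dictionary.

```python
import json


def parse_sync_task(content: str) -> dict:
    """Stand-in for the FlinkX path: parse the JSON job description."""
    return {"plugin": "flinkx", "job": json.loads(content)}


def parse_sql_task(content: str) -> dict:
    """Stand-in for the FlinkStreamSQL path: split the script into statements."""
    # Naive split: assumes no semicolons inside string literals.
    statements = [s.strip() for s in content.split(";") if s.strip()]
    return {"plugin": "flinkstreamsql", "statements": statements}


# Hypothetical registry that maps a task type to the plug-in that parses it.
PARSERS = {"sync": parse_sync_task, "sql": parse_sql_task}


def dispatch(task_type: str, content: str) -> dict:
    """Pick the parser for the task type and apply it to the task content."""
    try:
        parser = PARSERS[task_type]
    except KeyError:
        raise ValueError(f"unsupported task type: {task_type}") from None
    return parser(content)
```

Keeping the mapping in a registry means a new task type only needs a new entry, without touching the submission path.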

Resource platform

The platform can currently connect to multiple resource clusters and to different resource types, such as YARN and K8S.

Data synchronization and data computation

After the scheduling platform receives the user’s task, it starts a series of transformation operations and finally runs the task. The following sections look at the low-level technical details of how the real-time computing platform is built on Flink, and how FlinkX and FlinkStreamSQL are used together for one-stop development.

FlinkX

Data synchronization is the first and most basic step in data processing, so let’s start with how FlinkX builds on Flink. Users only need to focus on the JSON script of the synchronization task and some configuration, without worrying about the details of calling Flink. FlinkX supports the functionality shown in the figure below.

Let’s first look at the process involved in submitting a Flink task. The interaction flow chart is as follows:

How does FlinkX encapsulate and invoke these components on top of Flink, so that Flink is easier to use as a data synchronization tool?

It mainly extends three components: the Client, the JobManager and the TaskManager. The details are as follows:

The Client side

FlinkX customizes the native Client. The flinkx-launcher module performs the following steps:

  1. Parse the parameters, such as the parallelism, the savepoint path, the program entry JAR (usually a program written in the style of a Flink demo), and the configuration in flink-conf.yaml.
  2. Build a PackagedProgram from the program entry JAR, the externally supplied parameters and the savepoint parameters.
  3. Call the main method of the entry JAR specified in the PackagedProgram through reflection. In the main method, load the corresponding plug-ins according to the Reader and Writer configured by the user.
  4. Build the JobGraph and add the required resources (the JARs required by Flink, the JARs of the Reader and Writer, the Flink configuration files, and so on) to the shipFiles of the YarnClusterDescriptor. The YarnClusterDescriptor then interacts with YARN to start the JobManager.
  5. After the task is submitted successfully, the Client gets the ApplicationId returned by YARN and can then track the status of the task through that application.
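
As a rough illustration of steps 3 and 4, the sketch below resolves the Reader and Writer named in a job description to plug-in JARs and appends them to the list of files to ship. The JSON shape, the directory layout `<plugin_root>/<name>/<name>.jar` and the function name are all assumptions made for this example and do not reflect FlinkX’s actual job format or packaging.

```python
import json
from pathlib import PurePosixPath
from typing import List


def collect_ship_files(job_json: str, plugin_root: str, flink_jars: List[str]) -> List[str]:
    """Return the files to ship: Flink's own JARs plus the Reader and Writer JARs."""
    job = json.loads(job_json)
    # Hypothetical job layout: {"reader": {"name": ...}, "writer": {"name": ...}}
    names = [job["reader"]["name"], job["writer"]["name"]]
    root = PurePosixPath(plugin_root)
    # Hypothetical plug-in layout: <plugin_root>/<name>/<name>.jar
    plugin_jars = [str(root / name / f"{name}.jar") for name in names]
    return list(flink_jars) + plugin_jars
```

Only the plug-ins that the task actually names are shipped, which keeps the set of uploaded files small.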

The JobManager side

After the Client submits the task, YARN starts the JobManager, which starts some of its own internal services and builds the ExecutionGraph.

During this process, FlinkX does two things:

  1. Each plug-in overrides the createInputSplits method of the InputFormat interface to create input splits. When the upstream data volume is large, or when the data must be read with a parallelism greater than one, this method creates a separate split for each parallel instance. For example, when MySQL is read with a parallelism of two, the configured split field (such as an auto-increment primary key id) yields the following two queries:

    • select * from table where id mod 2=0;
    • select * from table where id mod 2=1;
  2. After the splits are created, getInputSplitAssigner returns them in order, one to each parallel instance.
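
The two steps above can be sketched as follows. This is a minimal model, not FlinkX code: `create_input_splits` and `InOrderSplitAssigner` are illustrative names, and each split is represented simply by the query it would run.

```python
from collections import deque
from typing import List, Optional


def create_input_splits(table: str, split_key: str, parallelism: int) -> List[str]:
    """Create one query per parallel instance by taking the split key modulo the parallelism."""
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    return [
        f"SELECT * FROM {table} WHERE {split_key} MOD {parallelism} = {i}"
        for i in range(parallelism)
    ]


class InOrderSplitAssigner:
    """Hands out splits in creation order, one per request."""

    def __init__(self, splits: List[str]) -> None:
        self._pending = deque(splits)

    def next_split(self) -> Optional[str]:
        # Returns None once every split has been handed out.
        return self._pending.popleft() if self._pending else None
```

Because every row satisfies exactly one of the modulo conditions, the splits are disjoint and together cover the whole table.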

The TaskManager side

After the TaskManager receives a task dispatched by the JobManager, it starts the task’s life cycle, which mainly includes the following stages:

  1. initialize-operator-states(): Loop through all operators of the task and call the initializeState method of those that implement the CheckpointedFunction interface. In FlinkX these are DtInputFormatSourceFunction and DtOutputFormatSinkFunction. The method is called when the task starts and is used to restore state: when a task fails, the read position can be recovered from the last checkpoint so that the task continues from where it stopped, as shown in the figure below:

  2. open-operators(): This method calls the open method of every StreamOperator in the OperatorChain, and finally the open method of BaseRichInputFormat. That method mainly does the following:

    • Initialize the accumulators, which record the number of records read and written and the number of bytes;
    • Initialize the custom metrics;
    • Start the speed limiter;
    • Initialize the state;
    • Open the connection to the data source (each plug-in implements this itself, depending on the data source).
  3. run(): Call the nextRecord method of the InputFormat and the writeRecord method of the OutputFormat to process the data.
  4. close-operators(): Perform the closing operations, such as calling the close methods of the InputFormat and OutputFormat, and do some cleanup.
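
The life cycle above, including recovery of the read position, can be modeled with a small sketch. The class and function names are hypothetical, the data source is a plain list, and the checkpoint is an ordinary dictionary; Flink’s real state backends and operator chain are not modeled.

```python
class ListSource:
    """Toy source whose only state is the index of the next row to read."""

    def __init__(self, rows):
        self.rows = rows       # stand-in for the external data source
        self.position = 0      # index of the next row to read
        self.opened = False

    def initialize_state(self, restored):
        # restored is None on a fresh start, otherwise the last checkpointed state.
        if restored is not None:
            self.position = restored["position"]

    def open(self):
        self.opened = True

    def run(self, max_records, emit):
        """Emit up to max_records rows, advancing the read position."""
        end = min(len(self.rows), self.position + max_records)
        while self.position < end:
            emit(self.rows[self.position])
            self.position += 1

    def snapshot_state(self):
        return {"position": self.position}

    def close(self):
        self.opened = False


def run_task(source, restored_state, max_records):
    """Drive the stages in order: initialize state, open, run, close."""
    emitted = []
    source.initialize_state(restored_state)
    source.open()
    try:
        source.run(max_records, emitted.append)
    finally:
        source.close()
    return emitted, source.snapshot_state()
```

Running the task once, then starting a new instance with the returned state, continues from the saved position instead of re-reading from the beginning.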

This is the overall life cycle of a StreamTask in the TaskManager. In addition to calling the Flink interfaces as described above, FlinkX has the following features.

  • Custom accumulators: Accumulators collect distributed statistics or aggregated information from user functions and operators. Each parallel instance creates and updates its own accumulator object. The system merges the accumulators of the different parallel instances at the end of the job, and the results can be pushed to Prometheus, as shown in the figure:

  • Support for offline and real-time synchronization: FlinkX supports both offline and real-time synchronization. Let’s take the MySQL data source as an example to see how this works.

    • Offline task: The run method of DtInputFormatSourceFunction calls the open method of the InputFormat, which reads the data into a ResultSet. It then calls the reachedEnd method to determine whether all the data in the ResultSet has been read. If it has, the task goes through the subsequent close process.
    • Real-time task: The open method works the same way as for an offline task. In reachedEnd, the task checks whether it is a polling task. If it is, it enters the interval-polling branch: it takes the largest value of the incremental field read in the previous poll as the starting position of the next poll, and then polls again.

  • Dirty data management and error control: Records that fail when being written to the target data source are recorded, classified by the cause of the error, and written to the configured dirty data table. There are currently four error types: type conversion errors, null pointers, primary key conflicts and other errors. Error control is based on Flink’s accumulators: the number of failed records is counted while the task runs, and a separate thread periodically checks whether the count has exceeded the configured maximum. If so, an exception is thrown to make the task fail. In this way, different error tolerances can be set for tasks with different data accuracy requirements. The control flow chart is as follows:

  • Speed limiter: Some tasks read upstream data so fast that they put heavy pressure on the downstream database, so the rate needs to be controlled on the source side. FlinkX uses token bucket rate limiting. As shown in the figure below, when the rate at which the source produces data reaches a certain threshold, no new data is read. The speed limiter is initialized in the open phase of BaseRichInputFormat.
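
The token bucket idea behind the speed limiter can be sketched in a few lines. This is a generic, single-threaded illustration and not FlinkX’s implementation; the class name, the parameters and the injectable clock are assumptions made so that the example is easy to test.

```python
import time


class TokenBucket:
    """Allows bursts of up to `capacity` records and a sustained `rate` records per second."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        if rate <= 0 or capacity <= 0:
            raise ValueError("rate and capacity must be positive")
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = float(capacity)   # the bucket starts full
        self._last = clock()

    def _refill(self):
        # Add tokens for the time elapsed since the last call, up to the capacity.
        now = self._clock()
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now

    def try_acquire(self, n=1):
        """Take n tokens if available; return False when the caller should wait."""
        self._refill()
        if self._tokens >= n:
            self._tokens -= n
            return True
        return False
```

A reader would call `try_acquire()` before fetching each record and pause briefly whenever it returns `False`, which caps the read rate without dropping data.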

This is the basic principle of FlinkX data synchronization, but data synchronization is only the first step in a data business scenario. The current version of FlinkX covers only the E and L of ETL and cannot transform or compute on the data, so the synchronized data has to flow into the downstream FlinkStreamSQL.

FlinkStreamSQL

FlinkStreamSQL extends real-time SQL on top of Flink. Its main extension is the join between streams and dimension tables, and it supports all of the syntax of native Flink SQL. Currently, the source side of FlinkStreamSQL can only connect to Kafka, so the default upstream data source is Kafka.

Let’s look at how FlinkStreamSQL builds on Flink so that users only need to focus on the business SQL, and how it invokes the Flink API while hiding the underlying layers. The overall process is similar to that of FlinkX described above. The difference lies on the Client side, which mainly consists of three parts: SQL parsing, table registration and SQL execution.

Parse SQL

This step parses the four kinds of SQL statements written by users, namely CREATE FUNCTION, CREATE TABLE, CREATE VIEW and INSERT INTO, and encapsulates them in the structured SQLTree data structure. SQLTree contains the set of custom functions, the set of external data source tables, the set of view statements and the set of write statements.
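
The classification step can be sketched as follows. This is a simplified illustration: the `SqlTree` dataclass and `parse_sql` function are hypothetical, statements are kept as raw strings, and the script is split naively on semicolons, whereas a real parser would build structured objects for each statement.

```python
import re
from dataclasses import dataclass, field
from typing import List


@dataclass
class SqlTree:
    """Holds the four statement sets described in the text, as raw strings."""
    functions: List[str] = field(default_factory=list)
    tables: List[str] = field(default_factory=list)
    views: List[str] = field(default_factory=list)
    inserts: List[str] = field(default_factory=list)


# Each rule pairs a statement prefix with the SqlTree field that collects it.
_RULES = [
    (re.compile(r"CREATE\s+FUNCTION\b", re.IGNORECASE), "functions"),
    (re.compile(r"CREATE\s+TABLE\b", re.IGNORECASE), "tables"),
    (re.compile(r"CREATE\s+VIEW\b", re.IGNORECASE), "views"),
    (re.compile(r"INSERT\s+INTO\b", re.IGNORECASE), "inserts"),
]


def parse_sql(script: str) -> SqlTree:
    """Sort each statement of the script into the matching SqlTree field."""
    tree = SqlTree()
    # Naive split: assumes no semicolons inside string literals or comments.
    for statement in (s.strip() for s in script.split(";")):
        if not statement:
            continue
        for pattern, bucket in _RULES:
            if pattern.match(statement):
                getattr(tree, bucket).append(statement)
                break
        else:
            raise ValueError(f"unsupported statement: {statement[:40]}")
    return tree
```

Rejecting unknown statements early gives the user an error at submission time rather than after the job has been deployed.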

Register tables

With the SQLTree obtained above, the external data sources defined by the CREATE TABLE statements are registered in the TableEnv as tables, and the user-defined functions (UDFs) are registered in the TableEnv as well.

Execute SQL

After the data sources are registered as tables, the INSERT INTO statements can be executed. There are two cases:

  • The SQL does not join a dimension table: the SQL is executed directly.
  • The SQL joins a dimension table: Earlier versions of Flink did not support join syntax for dimension tables, so we extended this area ourselves. Since FlinkStreamSQL v1.11, we have aligned with the community and support the community join syntax for dimension tables. Depending on the type of dimension table, a different join method is used:

    • Full dimension table: The upstream data is the input and a RichFlatMapFunction is the query operator. During initialization, the whole dimension table is loaded into memory, and each input record is then combined with the in-memory data to produce a wide row. The result is registered as a new wide table for use by subsequent SQL.
    • Asynchronous dimension table: The upstream data is the input and a RichAsyncFunction is the query operator. The queried data is kept in an LRU cache, and each input record is then combined with it to produce a wide row. The result is registered as a new wide table for use by subsequent SQL.
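
The cached lookup behind the asynchronous dimension table can be illustrated with a small sketch. It is synchronous for brevity, so the asynchronous I/O of RichAsyncFunction is not modeled. `LruCache`, `join_with_dimension` and the `lookup` callable are hypothetical names, and unmatched rows are dropped (inner-join semantics), which is a choice made for this example.

```python
from collections import OrderedDict


class LruCache:
    """Fixed-size cache that evicts the least recently used key."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._data = OrderedDict()

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)          # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)   # evict the oldest entry


def join_with_dimension(stream_rows, key, lookup, cache):
    """Widen each stream row with its dimension row, querying only on a cache miss."""
    for row in stream_rows:
        k = row[key]
        dim = cache.get(k)
        if dim is None:
            dim = lookup(k)                  # stand-in for the external query
            if dim is not None:
                cache.put(k, dim)
        if dim is not None:                  # inner join: skip unmatched rows
            yield {**row, **dim}
```

A full dimension table corresponds to the special case where every key is loaded up front, so `lookup` is never called while the stream is being processed.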

This is the difference between FlinkX and FlinkStreamSQL on the Client side. Since the source side only supports Kafka and uses the community-native Kafka connector, there is no input split logic on the JobManager side. The TaskManager logic is similar to that of FlinkX and is not repeated here.

Task operations and maintenance

Once the business logic has been developed with FlinkX and FlinkStreamSQL, the next stage is operations and maintenance. In this stage we mainly monitor task running information, input and output data metrics, data latency, back pressure, data skew and other dimensions.

Task running information

FlinkStreamSQL is a wrapper around Flink SQL, so when a task is submitted it still goes through Flink SQL’s parsing, validation, logical planning, logical plan optimization and physical planning, and finally produces the DAG that we often see:

However, Flink SQL optimizes the task so heavily that only the coarse DAG above is visible, and it is hard to see what happens inside each sub-DAG. We therefore modified the way the DAG is generated so that we can see what happens in each operator and in each parallel instance within a sub-DAG. With the detailed DAG, the other monitoring dimensions, such as data input and output, latency, back pressure and data skew, can be displayed visually, and problems can be located precisely when they occur, as shown in the figure below:

With this structure in mind, let’s look at how it is implemented. When the Client submits a task, it generates a JobGraph, and the taskVertices collection in the JobGraph holds all the information shown in the figure above. We combine LatencyMarker with the related metrics to render the figure in the front end and to trigger the corresponding alerts.
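
As one example of turning per-vertex metrics into an alert, the sketch below flags vertices whose parallel instances receive very uneven record counts. The skew measure (largest subtask count divided by the mean), the default threshold and the function names are assumptions for illustration only; the text does not specify how the platform defines skew.

```python
from statistics import mean


def skew_ratio(records_per_subtask):
    """Ratio of the busiest subtask's record count to the mean across subtasks."""
    if not records_per_subtask:
        raise ValueError("at least one subtask is required")
    avg = mean(records_per_subtask)
    # With no records at all there is nothing to be skewed.
    return max(records_per_subtask) / avg if avg else 1.0


def skewed_vertices(metrics, threshold=2.0):
    """Return the names of the vertices whose skew ratio exceeds the threshold."""
    return sorted(
        name for name, counts in metrics.items() if skew_ratio(counts) > threshold
    )
```

For instance, a vertex whose four subtasks processed 1000, 10, 10 and 10 records has a ratio of about 3.9 and would be flagged with the default threshold.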

In addition to the DAG, there are custom metrics, data latency collection and other features that are not covered here. Interested readers can refer to the FlinkStreamSQL project.

Use case

With that in mind, let’s look at a real-world example of how these pieces are used on the platform. The complete example is as follows: FlinkX synchronizes new user data from MySQL to Kafka in real time, FlinkStreamSQL then consumes Kafka to calculate the number of new users per minute in real time, and the results are written to a downstream MySQL database for business use.

Synchronize new MySQL data in real time

Calculate new users per minute in real time

Running information

The overall DAG visually displays the indicators mentioned above

Drilling into the detailed DAG shows the indicators inside each sub-DAG



The above covers the overall architecture and some key technical points of the Flink-based real-time computing platform at Kangaroo Cloud. You are welcome to point out any shortcomings.