Author: Li Hui

In the early stages of Palfish's development, a series of real-time requirements emerged. For example, algorithm engineers wanted real-time user feature data for real-time recommendation, and product managers wanted the data team to provide real-time metrics dashboards for real-time operations analysis.

At that stage, the data platform's engineers mainly used the Spark real-time computing engine to develop jobs for the business side. However, there was no unified platform to manage these jobs, and the development style, submission method, and availability guarantees of tasks varied from person to person.

With the rapid development of the business, more and more real-time scenarios emerged, placing higher demands on the development efficiency and quality assurance of real-time jobs. To this end, last year we started building Palink, a company-level real-time computing platform whose name combines Palfish + Flink.

Flink was chosen as the platform's only real-time computing engine because of its outstanding performance and dominant position in the real-time space in recent years, and because its active community provides a wealth of good practical experience to learn from. The Palink project has now been launched and put into use, and it serves Palfish's real-time business scenarios well.

Core principles

After investigating the real-time computing services offered by Alibaba Cloud, NetEase, and other major vendors, we settled on the overall product form of Palink. Throughout the system design process we closely followed these core principles:

  • **Minimalism:** keep the design simple and land quickly; do not over-pursue functional completeness; focus on meeting core needs;
  • **High quality:** maintain strict standards of project quality and think the core modules through thoroughly;
  • **Scalability:** maintain high extensibility to ease iterative upgrades of subsequent solutions.

System design

Overall platform architecture

The following is a schematic diagram of the overall platform architecture:

The platform is composed of four parts:

  • **Web UI:** the front-end operation pages;
  • **Palink (Go) service:** the real-time job management service, responsible for job meta-information and all states within the job life cycle, and for handling all front-end traffic. It comprises several core modules: job scheduling, job submission, job status synchronization, and job HA management;
  • **PalinkProxy (Java) service:** the SQL service, through which Flink SQL jobs are compiled and submitted to the remote cluster. It comprises several core modules: SQL syntax checking, SQL job debugging, and SQL job compilation and submission;
  • **Flink on YARN:** cluster resources are managed by Hadoop YARN.

There are three main reasons why the backend service is split into two parts, implemented in Go and Java respectively:

  • First, Palfish has a very mature Go-based microservice framework that lets us build services quickly, together with a full set of supporting facilities including service monitoring. More than 95% of the company's services are currently built on this framework.

  • Second, the SQL module is a secondary development of an open-source project (more on this later), which is written in Java.

  • Third, the cost of one extra remote call between internal services is acceptable.

This also reflects our minimalism principle of landing fast. In fact, using Go as the core development language is very characteristic of Palfish, which will come up again in the next article in the Palfish big data series.

Next, this article focuses on the design of Palink's core modules.

Job scheduling & Execution

After receiving a job-creation request from the front end, the backend service generates a PalinkJob record and a PalinkJobCommand record and persists them to the DB. PalinkJobCommand is the entity abstracted for the execution stage of job submission; the entire job scheduling process advances around the state changes of this entity. Its structure is as follows:

type PalinkJobCommand struct {
	ID            uint64 `json:"id"`             // command id
	PalinkJobID   uint64 `json:"palink_job_id"`  // id of the associated PalinkJob
	CommandParams string `json:"command_params"` // parameters needed to execute the command
	CommandState  int8   `json:"command_state"`  // state machine state (see below)
	Log           string `json:"log"`            // execution log
	CreatedAt     int64  `json:"created_at"`     // creation timestamp
	UpdatedAt     int64  `json:"updated_at"`     // last update timestamp
}

The scheduling process is not driven directly by the PalinkJob entity, because job status synchronization also mutates that entity directly; if scheduling were based on it as well, the two pieces of logic would be tightly coupled.

Scheduling process

The following is the flow chart of job scheduling:

Each Palink pod asynchronously competes for a distributed lock, ensuring that one and only one instance holds the periodic monitoring privilege at any time. Commands that meet the scheduling conditions are sent straight to a Kafka queue, and their state is advanced so that they will not be scheduled again. In addition, all Palink pods act as consumers of this pending queue and belong to the same consumer group, and the instance that consumes a message gets the final execution right.
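How this works can be sketched in Go. The sketch below is illustrative rather than the actual Palink code: the DistributedLock, CommandStore, and Producer interfaces, the topic name, and the tick period are all assumptions standing in for whatever lock service, DB layer, and Kafka client the service really uses.

package palink

import "time"

// Assumed abstractions over the real lock service, DB layer, and Kafka client.
type DistributedLock interface {
	TryAcquire(key string) bool
	Release(key string)
}
type CommandStore interface {
	ListByState(state int8) ([]PalinkJobCommand, error)
	UpdateState(id uint64, state int8) error
}
type Producer interface {
	Send(topic string, cmd PalinkJobCommand)
}

const (
	cmdUndo  int8 = iota // UNDO: awaiting scheduling
	cmdDoing             // DOING: claimed by a scheduler instance
)

// scheduleLoop: on each tick exactly one pod wins the lock, scans for
// schedulable commands, flips them to DOING so they cannot be scheduled
// twice, and publishes them to Kafka for any pod in the group to execute.
func scheduleLoop(lock DistributedLock, store CommandStore, producer Producer) {
	for range time.Tick(10 * time.Second) { // assumed monitoring period
		if !lock.TryAcquire("palink-scheduler") {
			continue // another instance holds the monitoring privilege
		}
		cmds, err := store.ListByState(cmdUndo)
		if err == nil {
			for _, c := range cmds {
				// Advance the state first so a crash cannot double-schedule.
				if store.UpdateState(c.ID, cmdDoing) == nil {
					producer.Send("palink-commands", c)
				}
			}
		}
		lock.Release("palink-scheduler")
	}
}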

Execution process

Executing a job is in essence submitting it, and the submission workflow falls into three types according to the job type:

  • **Flink JAR jobs:** we eliminated the step of users uploading JAR files directly. The user only needs to provide the address of a GitLab repository, and the platform completes the whole process. Since every service instance embeds a Flink client, the task is submitted directly via `flink run` (see the sketch after this list).
  • **PyFlink jobs:** similar to the Flink JAR approach, minus the compilation step and with a different submission command.
  • **Flink SQL jobs:** different from the two above. For Flink SQL jobs, users only submit relatively simple SQL text, which we maintain directly in the platform's meta-information, so there is no interaction with a GitLab repository at all. The SQL text is handed to the PalinkProxy service for subsequent compilation and then submitted using the YARN client.
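For the JAR path, submission amounts to shelling out to the embedded client. Below is a minimal Go sketch under stated assumptions: the client path, YARN target, and flag values are illustrative, and only standard `flink run` options are used; parsing of the returned YARN application id is left out.

package palink

import (
	"os/exec"
	"strconv"
)

// submitJarJob submits a JAR already built from the user's GitLab repository
// through the Flink client embedded in each service instance.
func submitJarJob(jarPath, mainClass string, parallelism int) ([]byte, error) {
	cmd := exec.Command(
		"/opt/flink/bin/flink", "run", // assumed client location
		"-m", "yarn-cluster", // submit to YARN in per-job mode
		"-d",                 // detached: return as soon as the job is submitted
		"-p", strconv.Itoa(parallelism),
		"-c", mainClass, // entry class of the user JAR
		jarPath,
	)
	// The combined output contains the YARN application id, which the
	// platform would parse and persist for later status synchronization.
	return cmd.CombinedOutput()
}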

The Command state machine

The state flow of PalinkJobCommand is shown below; a small Go sketch of the transition rules follows the list:

  • **UNDO:** the initial state, monitored by the scheduling instance.
  • **DOING:** the executing state, also monitored by the scheduling instance to prevent a long-lived dirty state.
  • **SUCCESSED:** execution succeeded. Subsequent user actions, such as resubmission or restart, return the state to UNDO.
  • **FAILED:** execution failed. As above, the state may return to UNDO again.
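Extending the constants from the scheduling sketch, the legal transitions can be captured in a small table. Again this is a minimal illustration inferred from the description above, not the platform's actual code:

// SUCCESSED and FAILED complete the command states from the earlier sketch.
const (
	cmdSuccessed int8 = iota + 2 // SUCCESSED: submission succeeded
	cmdFailed                    // FAILED: submission failed
)

// validCommandTransitions encodes the flow described above. Terminal
// command states return to UNDO through user actions (resubmit/restart),
// and DOING may be reset to UNDO when the monitor rescues a dirty state.
var validCommandTransitions = map[int8][]int8{
	cmdUndo:      {cmdDoing},
	cmdDoing:     {cmdSuccessed, cmdFailed, cmdUndo},
	cmdSuccessed: {cmdUndo},
	cmdFailed:    {cmdUndo},
}

// canTransition reports whether a command may move from one state to another.
func canTransition(from, to int8) bool {
	for _, next := range validCommandTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}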

Job status synchronization

After a job is successfully submitted to the cluster, it may terminate unexpectedly because of cluster instability or other factors. How can the platform find out in time? This brings us to the next topic: status synchronization.

Status Synchronization Process

The first question to answer here is: Synchronize whose state?

If you have experience developing on YARN, offline or otherwise, you will know that after a job is deployed to YARN there is a corresponding application, and each application has its own states and operations. For example, we can perform the kill application operation in the YARN UI to kill the entire task.

Similarly, browsing the official Flink documentation or the Flink UI, we can see that each job likewise has its own states and a series of operations. The most immediate thought is to use the Flink job state; after all, that is what we ultimately care about.

On careful analysis, however, the two states make no essential difference to the platform: the YARN application states have a coarser granularity, but they can be mapped onto the Flink job states. What does matter is that when Flink runs in HA mode, the URL a job exposes can change, in which case the latest address can only be obtained through the job's corresponding application information.

At the same time, a status synchronization pass should fetch not only the latest state but also related information such as checkpoints. Since both pieces of information have to be obtained during synchronization anyway, the final status synchronization design is as follows:

The preliminary flow is similar to job scheduling: one and only one instance is responsible for periodic monitoring, and the qualifying job IDs (note that not all jobs are synchronized; jobs already in a final state, for example, are skipped) are sent to an internal delay queue. A delay queue is used instead of a Kafka queue mainly to spread a batch of synchronization requests arriving at the same point in time randomly over a window, reducing the synchronization pressure. Finally, once the complete job information has been obtained, state mapping is performed again to map it onto the platform's abstract state types.
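The dispersion step can be sketched as follows; DelayQueue is an assumed abstraction over the service's internal queue, and the 60-second window is an assumed value:

package palink

import (
	"math/rand"
	"time"
)

// DelayQueue is an assumed interface: items become visible to consumers
// only after their individual delay elapses.
type DelayQueue interface {
	Offer(jobID uint64, delay time.Duration)
}

// enqueueForSync spreads a batch of job ids randomly across a window
// instead of syncing them all at the same instant, smoothing the load
// placed on the YARN/Flink endpoints during synchronization.
func enqueueForSync(q DelayQueue, jobIDs []uint64) {
	const window = 60 * time.Second
	for _, id := range jobIDs {
		q.Offer(id, time.Duration(rand.Int63n(int64(window))))
	}
}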

Because status synchronization is periodic, it lags. Therefore, when the platform fetches job details it also triggers a status synchronization, ensuring the data returned is the latest.

Job state machine

The state flow of PalinkJob is shown below; a sketch of the state mapping mentioned above follows the list:

  • **DEPLOYING:** the deploying state, driven to DEPLOY_SUCCESSED or DEPLOY_FAILED by the PalinkJobCommand flow.
  • **DEPLOY_SUCCESSED:** deployment succeeded; driven to RUNNING or another final state by job status synchronization.
  • **DEPLOY_FAILED:** deployment failed; relies on the user resubmitting to flow back to the deploying state.
  • **RUNNING:** the running state. It can move to FINISHED, to KILLED, or, on an internal exception, to FAILED.
  • **FINISHED:** the finished state, one of the job's final states. A job reaches this state when the user performs the pause operation.
  • **KILLED:** the killed state, one of the job's final states. A job reaches this state when the user performs the terminate operation.
  • **FAILED:** the failed state. A job moves to this state on an exception.
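To connect this with the synchronization flow above, here is a minimal sketch of the final mapping step. The left-hand values are standard Flink job statuses plus YARN's KILLED; the exact mapping table is our inference from the states just described, not an authoritative one:

package palink

// Platform job states, matching the list above.
const (
	jobRunning  = "RUNNING"
	jobFinished = "FINISHED"
	jobKilled   = "KILLED"
	jobFailed   = "FAILED"
)

// mapClusterState folds cluster-side statuses onto the platform's coarser
// abstract states. A pause issues a Flink cancel, so CANCELED lands in
// FINISHED; a terminate kills the YARN application, which lands in KILLED.
func mapClusterState(status string) string {
	switch status {
	case "FINISHED", "CANCELED":
		return jobFinished
	case "KILLED":
		return jobKilled
	case "FAILED":
		return jobFailed
	default: // RUNNING, RESTARTING and other transitional statuses
		return jobRunning
	}
}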

Job HA Management

With those issues resolved, the next topic is job HA management. Here we need to answer two questions from users:

  • A job is stateful but its code needs upgrading. How is this handled?
  • After a job fails, how do I recover from the point of failure?

Flink provides two mechanisms for recovering jobs: Checkpoint and Savepoint, collectively referred to in this article as savepoints. A Savepoint is a special kind of Checkpoint; but unlike Checkpoints, which the system generates periodically, a Savepoint is triggered by a user command, so the user controls when one is generated.

When a task is started, it can be recovered by specifying the external path of a Checkpoint or Savepoint. Our platform's job HA management is built on both. The figure below shows the management flow:

Users have two ways to manually stop a job: pause and terminate.

  • The pause operation is implemented by calling the Flink cancel API and triggers the job to generate a Savepoint.
  • The terminate operation is implemented by calling the YARN kill application API and is used to end a task quickly.

When a paused job is restarted, the system compares the generation times of the latest Savepoint and Checkpoint and starts the job from the most recent one. When a paused job is resubmitted, the user decides whether to restore from the Savepoint, since the code logic may have changed. For terminated jobs, whether restarted or resubmitted, the choice is left entirely to the user, because termination inherently discards the job's state.
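The restart decision for paused jobs can be sketched as below. RestorePoint is a hypothetical record type; the only Flink-specific piece is that the chosen path would be passed to the submission command via the standard -s flag (flink run -s <path> …):

package palink

import "time"

// RestorePoint is a hypothetical record of a savepoint or checkpoint
// external path together with its generation time.
type RestorePoint struct {
	Path      string
	CreatedAt time.Time
}

// latestRestorePath picks the newer of the last Savepoint and the last
// Checkpoint, which is how a paused job is restarted. Either argument may
// be nil if that kind of point was never generated.
func latestRestorePath(savepoint, checkpoint *RestorePoint) string {
	switch {
	case savepoint == nil && checkpoint == nil:
		return "" // nothing to restore: cold start
	case savepoint == nil:
		return checkpoint.Path
	case checkpoint == nil:
		return savepoint.Path
	case checkpoint.CreatedAt.After(savepoint.CreatedAt):
		return checkpoint.Path
	default:
		return savepoint.Path
	}
}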

A job in the FAILED state has stopped because of an exception. For this type of job there are three safeguards:

  • First, the task itself can set a restart strategy to recover automatically, with the platform unaware of it.

  • Second, if internal restarts still fail, a further restart policy can be set on the platform side.

  • Third, manual restart or resubmission. Only on resubmission does the user decide how to start; in all other scenarios the job starts from the nearest savepoint.

Job SQL-ization

Both Flink JAR and PyFlink jobs require developing against the Flink API, which greatly raises the learning cost for users and hurts development efficiency. Engineers with these development skills would have to be continuously recruited and trained to keep up with the steady stream of business needs.

The product is positioned to serve not only the data platform's development engineers. As with our offline development tooling, the target audience can extend to analysts and even business developers and some product managers, so that simple needs can be implemented by the requesters themselves. To reach this goal, real-time development has to take the same form as offline development, and SQL-izing jobs is imperative.

We hoped Flink would offer a submission mode similar to the Hive CLI or Hive JDBC, with the user never writing a line of Java or Scala. According to the official documentation, Flink does provide a SQL Client for writing, debugging, and submitting table programs to a Flink cluster in a simple way. However, as of the latest release, 1.13, the SQL Client supports embedded mode only, the related functionality is not yet complete, and connector support is limited. We therefore had to look for a more stable and more extensible implementation.

After some research, we found that Kangaroo Cloud's open-source FlinkStreamSQL basically meets our current requirements. The project builds on open-source Flink to extend it with real-time SQL, and supports all of native Flink SQL's syntax.

Implementation mechanisms

The figure below is the job role flow chart from the Flink documentation. As it shows, the code submitted by the user is processed and transformed on the client side (ultimately producing a JobGraph) and then submitted to the remote cluster.

To give users SQL-level jobs, the underlying implementation cannot bypass this process either. The FlinkStreamSQL project implements the client-side logic through customization. The whole process can be briefly described as:

  • Build a PackagedProgram;
  • Generate a JobGraph via PackagedProgramUtils;
  • Submit the job through the ClusterDescriptor.

Among these, the first step is the most critical. The PackagedProgram is constructed as follows:

PackagedProgram.newBuilder()
                // JAR containing the core module's "template" main class
                .setJarFile(coreJarFile)
                // external input arguments, including the user-submitted SQL
                .setArguments(execArgs)
                // settings for restoring the job from a savepoint, if any
                .setSavepointRestoreSettings(savepointRestoreSettings)
                .build();

execArgs are the external input arguments and contain the SQL submitted by the user. coreJarFile corresponds to the JAR file a user would submit in the API development mode, except that here the system supplies it for us. Its code lives in the project's core module, which is essentially a "template" for API-style development; the module implements custom SQL parsing and the injection of the various connector plugins. More details can be found in the open-source project.

Custom development

We carried out secondary development on top of FlinkStreamSQL to meet more diverse internal needs, mainly in the following areas:

  • Servitization: the whole SQL module is deployed and managed independently as a proxy, exposing its capabilities as HTTP services (a client-side sketch follows this list);
  • Syntax validation;
  • Debugging: by parsing the SQL structure, the schemas of the source and sink tables are obtained directly. The platform builds a test data set either by hand or by capturing online source-table data, and the sink operator is replaced with a localTest connector operator to intercept the result output;
  • Support for more connector plugins, such as a Pulsar connector;
  • Other features.
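To illustrate the servitized interface, here is a Go-side client sketch. The endpoint path and the payload fields are hypothetical placeholders for whatever contract Palink and PalinkProxy actually agree on:

package palink

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// Hypothetical payloads for the proxy's syntax-validation endpoint.
type checkSQLRequest struct {
	SQL string `json:"sql"`
}
type checkSQLResponse struct {
	Valid bool   `json:"valid"`
	Error string `json:"error"`
}

// checkSQL asks PalinkProxy over HTTP to validate a Flink SQL text before
// the job is persisted. proxyAddr is the proxy's base URL.
func checkSQL(proxyAddr, sql string) (*checkSQLResponse, error) {
	body, err := json.Marshal(checkSQLRequest{SQL: sql})
	if err != nil {
		return nil, err
	}
	resp, err := http.Post(proxyAddr+"/api/sql/check", "application/json", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("proxy returned status %d", resp.StatusCode)
	}
	var out checkSQLResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return &out, nil
}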

In addition to some of the features mentioned above, the platform also supports:

  • DDL statement injection
  • UDF management
  • Tenant management
  • Version management
  • Operation monitoring
  • Log collection

These points are not covered in detail in this article, but each is essential to a real-time computing platform.

Online results

Job overview

Job details

Operation monitoring

Future work

As the business continues to advance, the platform will continue to be iteratively optimized in the following aspects:

  • **Stability construction:** the stability of real-time tasks is bound to be a priority in future work. How job parameters should be set, how jobs can be tuned automatically, and how jobs can keep performing stably during traffic peaks all need continued exploration and refinement;
  • **Improved development efficiency:** SQL construction. Although SQL-ization has taken shape, it still carries some learning cost, the most obvious part being DDL construction: users are often unclear about the schemas of sources and sinks. The best answer is for the platform to integrate with our metadata center and automate DDL construction, which is exactly what we are working on;
  • **Better user experience:** to some extent, experience problems directly affect development efficiency. We will keep improving by collecting user feedback;
  • **More business scenarios:** Palfish has started building AI, real-time data warehouse, and other scenarios on top of Flink, and we will continue to push Flink's practice into more scenarios.