Brief introduction:“Real-time Database Bootcamp” by Ali cloud researcher Wang Feng, Ali cloud senior product expert Liu Yiming and other real-time computing Flink version and Hologres technology/product experts to join the battle, together to build the training camp curriculum system, carefully polishing the course content, directly hit the current students encountered pain points. Analyse the architecture, scenario and practical application of the real-time database from the simple to the deep. 7 excellent courses will help you grow from a little white to a great man in 5 days!

The paper sorting from live the real-time computing Flink SQL practice – Li Lin (seals) video link:

Brief content:

Real-time computing Flink version of SQL Introduction

Second, real-time calculation Flink version of SQL starting example

Third, the development of common problems and solutions

An introduction to real-time computing Flink version of SQL

(a) about real-time computing Flink version of SQL

Real-time computing Flink version of the choice of SQL this declarative language as the top-level API, more stable, but also convenient for users to use. Flink SQL has the feature of stream-batch unification, giving users a unified development experience and consistent semantics. In addition, Flink SQL is capable of automatic tuning, including the complexity of State in masking stream calculations, as well as auto-tuning plans, and has integrated Autopilot auto-tuning capabilities. Flink SQL is also used in a wide range of scenarios, including data integration, real-time reporting, real-time risk control, and online machine learning.

(2) Basic operation

In terms of basic operations, you can see that the syntax of SQL is very similar to standard SQL. Basic SELECT and FILTER operations are included in the example. , you can use built-in functions, such as date formatting, or you can use custom functions, such as the exchange rate conversion in the example, which is a user-defined function that can be used directly after being registered on the platform.

Lookup Join Lookup Join Lookup Join

In actual data processing, Lookup Join of dimension table is also a more common example.

An example of a dimension table INNER JOIN is shown here.

The Source table shown in the example is a real-time changing order information table, which uses Inner Join to associate dimension table information. Here, highlighted in yellow is the syntax of dimension table JOIN. We can see that there is a difference between it and traditional batch processing in writing. The FOR SYSTEM\_TIME AS OF clause is added to indicate that it is an operation on a dimension table JOIN. Every time the Source table receives an order message, it triggers the dimension table operator to perform a query on the dimension table information, so it is called a Lookup Join.

Window Aggregation is very important

Window Aggregation is also a common operation. Flink SQL has built-in support for several common Window types, such as Tumble Window, Session Window, Hop Window, and Tumble Window. There is also the newly introduced Cumulate Window.


Tumble Windows can be understood as Windows of a fixed size, also known as rolling Windows, such as those that are spaced at fixed intervals of 5, 10, or 1 hour, with no overlap between them.


The Session Window defines a range of consecutive events. A parameter in the Window definition is called the Session Gap, which means that if the interval between two pieces of data exceeds the defined length of time, the previous Window ends and a new Window is created.


Unlike scrolling Windows, Hop Windows do not overlap. Sliding Windows can overlap each other. Sliding Windows take two parameters: Size and Slide. Slide is the size of each slide. If Slide < Size, the Windows will overlap and the same data may be assigned to multiple Windows. If slide = Size, this is equivalent to Tumble Window. If Slide > Size, there is no overlap and gaps between Windows.


Cumulate Window Cumulate Window Cumulate Window is a new addition to the Flink community in version 1.13. You can compare this with Hop Window. The difference is that it starts with Window Start and accumulates. In the example, Window 1, Window 2, and Window 3 are increments. If it has a maximum Window length, let’s say the Window Size is one day and the Step Size is one hour, then it will generate aggregate results that accumulate up to the current hour for each hour of the day.

Take a look at a concrete example of Window aggregation handling.

As shown in the figure above, for example, we need to count the number of clicks per user every 5 minutes.

The source data is the user’s click log. We expect to calculate the total number of clicks per 5 minutes for a single user. We use the latest Windows VF syntax for the community. Count (*) is the COUNT of hits.

As you can see, when processing the data between 12:00 and 12:04, 2 users generated 4 clicks, respectively, we can calculate that user Mary had 3 clicks and user Bob had 1 click. In the next batch of data, there are three more data, corresponding to the next window, is 1 and 2 times respectively.

(V) Group Aggregation

Compared with Window Aggregation, Group Aggregation directly triggers calculation without waiting until the end of the Window. An applicable scenario is to calculate the cumulative value.

The example above is the number of hits a single user has accumulated to the current count. GROUP BY user to calculate COUNT(*); GROUP BY user to calculate COUNT(*);

As can be seen, the output of the results is different from that of Window. In the first four input data that are the same as Window, the output of Group Aggregation shows that Mary’s click number has been updated to 3 times. The specific calculation process may change from 1 to 2 and then to 3, while that of Bob is once. With the input of the next three pieces of data, the number of hits corresponding to Bob will be updated twice again. The result is a process of continuous update, which is different from the calculation scene of Window.

The data output in the previous Window Window will not change after the end of the Window. However, under Group Key, the result of the same Group Key will be updated continuously.

Window Aggregation Vs Group Aggregation

To compare some differences between Window and Group Aggregation in a more comprehensive way.

Window Aggregation is output on time in the output mode and only outputs after the expiration of the defined data. For example, if the window is defined for 5 minutes, the result is delayed output. For example, during the period of 00:00~00:05, it will wait for the whole window data to arrive before the complete output, and the result will only be output once and will not be changed again.

Group Aggregation is data triggered. For example, the first data will output results, while the second data of the same Key will update the results. Therefore, the two are different in terms of the nature of the output stream. Window Aggregation normally outputs APPEND STREAM, whereas Group Aggregation outputs UPDATE STREAM.

There is also a big difference between the two in the treatment of State. Window Aggregation automatically clears stale data so that users do not have to pay any extra attention to State bloat. Group Aggregation is based on unlimited State, so users need to define the TTL of the State according to their own computing scenario, that is, how long the State is stored.

For example, the cumulative PV and UV within a day should be counted. Regardless of the data delay, TTL of State should be at least one day greater than or equal to one day, so as to ensure the accuracy of calculation. If the TTL of STATE is defined as half a day, the statistics may not be accurate.

The storage requirements for the output are also determined by the nature of the output stream. On the output of Window, since it is the APPEND stream, all types are available for output. The Group Aggregatio outputs the update stream, so the target store is required to support updates, which can be used with Hologres, MySQL, or HBase.

Real-time computing Flink version of SQL starting examples

The following are concrete examples of how each SQL operation would be used in a real business scenario, such as basic SQL syntax operations, including the use of some common aggregations.

(I) Example scenario: E-commerce transaction data – real-time data warehouse scenario

The example here is an e-commerce transaction data scenario, which simulates hierarchical data processing in a real-time data warehouse.

In the data access layer, we simulate the transaction order data of e-commerce, which includes order ID, commodity ID, user ID, transaction amount, leaf category of commodity, transaction time and other basic information. This is a simplified table.

Example 1 will be from a detailed data access layer to layer, a data cleaning work, moreover also do link category information, then we will demonstrate how data aggregation layer levels complete minutes transaction statistics, hours caliber do real-time transaction statistics, finally introduces the clinch a deal the scene on the accumulation level, how to do it quasi real-time statistics.

– Sample environment: private beta

The demo environment is the current private version of the real-time computing Flink product, where you can directly do one-stop job development, including debugging, and online operation and maintenance work.

– Access layer data

Generate simulated e-commerce transaction data using SQL DataGen Connector.

Access Layer Data: For demonstration purposes, the link is simplified and the built-in SQL DataGen Connector is used to simulate the generation of e-commerce data.

The order\_id is designed as an incrementing sequence. Connector parameters are not fully posted. DataGen Connector supports several generation modes. For example, the SEQUENCE can be used to generate self-incrementing sequences, and the RANDOM mode can simulate Random values. Here, different generation strategies are selected based on the business meaning of fields.

For example, the order\_id is self-increasing, the product ID is randomly selected from 10 ~ 100,000, the user ID is 1~10 million, the transaction amount is in units, and the cate\_id is leaf category ID. Here, a total of 100 leaf categories are simulated, and the product ID is generated directly by taking the remainder of the computed column. The order creation time is simulated using the current time. This allows debugging on the development platform without the need to create Kafka or DataHub emulation for the access layer.

(2) Example 1-1 Data cleaning

-E-commerce transaction data -Order filtering

This is a scenario of data cleaning. For example, the business party may have the maximum and minimum abnormal filtering for the transaction amount, for example, the transaction amount shall be greater than 1 yuan and less than 10,000 yuan to be retained as valid data.

The creation time of the transaction is selected after a certain moment, and this logic is completed by filtering the WHERE condition combination.

A real business scenario can be much more complex, so let’s take a look at how SQL works.

This is the debug mode. Click the “Run” button on the platform for local debugging, and you can see that the column of amount is filtered, and the order creation time is also greater than the required time value.

As you can see from this simple cleaning scenario, there is not much difference in the way the output is written compared to traditional batch processing. The main difference of a stream job is that it runs for a long period of time after it has been run, unlike traditional batch processing, which processes the data and then ends.

(3) Example 1-2 Categories Information Association

Now let’s look at how to do dimension table associations.

According to the data access layer order just now, because the raw data is inside the leaves of category information, dimension table need association class in our business purpose, dimension table records inside the leaves to the class object of class relationships, ID and Name, cleaning processes need to complete the goal is to use the original table inside leaf category ID to correlation dimension table, lacking purpose class ID and Name. Here, through the writing method of Inner Join dimension table, the corresponding fields of dimension table are selected after association.

The only difference between batch and SYSTEM is the special syntax OF dimension tables FOR SYSTEM\_TIME AS OF.

As shown above, the platform can upload its own data for debugging. For example, the test data of 1 CSV is used here to map 100 leaf categories to 10 first-level categories.

The single digit of the corresponding leaf category ID is the ID of its first level class, which is associated with the corresponding first level category information and returns its name. The advantage of local debugging is that the speed is relatively fast, you can see the results immediately. In the local debug mode, the terminal will automatically pause after receiving 1000 pieces of data to prevent the result from being too large to affect the use.

(4) Example 2-1 minute level transaction statistics

Next, let’s look at Window-based statistics.

The first scenario is minute-level transaction statistics, which is a common calculation logic at the summary level.

Minute-level statistics are easily thought of as Tumble Window, where every minute is calculated individually, requiring several indicators to be calculated, including total orders, total money, number of items traded, number of users traded, etc. Transaction number of goods and users to do to double, so in the writing method to do a Distinct processing. The Window is the Tumble Window just described, which is a one-minute Window with the order creation time and then counts every minute’s worth of transactions in the Level 1 category dimension.

– Operation mode

The above diagram is a little different from the debug mode just now. After going online, it is actually submitted to the cluster to run a job. Its output adopts debug output, and directly prints to the Log. Expand the job topology and you can see that the two-phase optimization of local-global is automatically turned on.

– Run Log – View the debug output

After running for some time, the final output can be seen through the log in the Task.

If I use Print Sink, it will go directly to the Log. On the output of the real scenario, such as writing to Hologres/MySQL, you need to look at the corresponding stored database.

It can be seen that the output data is lagging behind the original time of the data.

At 19:46:05, the data of the window of 19:45:00 was output, with a delay of about 5 seconds to output the aggregate results of the previous 1 minute.

This 5 seconds is actually related to the WATERMARK setting when defining the source table. When declaring the WATERMARK, the GMT \ _CREATE field is offset for 5 seconds. The effect of this is that when the earliest data arrived at 19:46:00, we thought the water level was at 19:45:55, which is the 5-second delay effect to realize the tolerance of out-of-order data processing.

(V) Example of 2-2-hour real-time transaction statistics

The second example is to do hourly level real-time transaction statistics.

As shown in the figure above, when real-time statistics are required, you can directly set Tumble Window to a one-hour Size. Does this meet the real time requirement? According to the output just shown, there is a certain delay effect. Therefore, when the window is opened for one hour, the results of the last hour can only be output at the beginning of the next hour after all the data have been received. If the delay is at the hour level, it cannot meet the requirements of real-time performance. Recall that Group Aggregation, as described previously, can meet real-time requirements.

To be specific, the GROUPING SETS function commonly used in traditional batch processing is also supported on real-time Flink, for example, the GROUPING SETS function with hours plus categories and the GROUPING SETS with hours only need to be completed.

We can directly GROUP BY GROUPING SETS. The first one is the total aperture per hour, the second one is the statistical aperture of category + hours, and then calculate the order number, including the total amount, the number of goods and the number of users.

In this way, the null value conversion processing is added to the results to facilitate the view of the data, that is, the statistics of the hour full caliber. The output first-level category is empty, so a null value conversion processing is needed.

In the debug mode running above, you can see the data generated by DATAGEN updated in real time to the first category and its corresponding hours.

It can be seen here that the results of two different groups BY are output together, and there is a column of ALL in the middle which is converted BY null value, and this is the statistic value of full caliber. Local debugging is relatively intuitive and convenient. If you are interested, you can also go to the official website of Ali Cloud to apply or purchase for experience.

(VI) Example of 2-3 days cumulative transaction of quasi-real-time statistics

A third example is day-level cumulative transaction statistics, where business requirements are quasi-real-time, such as being able to accept minute-level update delays.

According to the real-time statistics of Group Aggregation just now, it is easy to think that this requirement can be realized by directly changing Query into day dimension. Moreover, the real-time performance is relatively high, and second level updates can be achieved after data is triggered.

Recall the differences between Windows and Group Aggregation in the built-in State handling mentioned earlier. Window Aggregation can automatically clean up State, while Group Aggregation requires the user to adjust the TTL. The Cumulate Window can be used to calculate the cumulative Window. The Cumulate Window can be used to calculate the cumulative Window. The Cumulate Window can be used to calculate the cumulative Window, and then the minute step size can be used to achieve the quasi-real-time requirement of updates per minute.

Recall the Cumulate Window as shown above. The maximum Size of the Window is in days, and the Window Step is one minute, so it can represent the cumulative statistics of days.

Cumulate Window refers to the input TABLE. Cumulate Window refers to the input TABLE. Cumulate Window refers to the input TABLE and defines its time properties, step size, and size parameters. Group BY is the normal way of writing it, because it has pre-output, so we print the start and end time of the window together.

This example also looks at the Log output in terms of running on line.

– Operation mode

As you can see, it works in much the same structure as Tumble Window before, in the form of pre-aggregation plus global aggregation, but unlike Tumble Window, you don’t need to wait until all the data is available for the day before you send out results.

– Run Log – Observe the debug results

As you can see from the above example, at 20:47:00, there has been a cumulative result from 00:00:00 to 20:47:00, and the corresponding four columns of statistics. The next output is the following cumulative window, from 20:47:00 to 20:48:00 is a cumulative step size, which not only meets the day-level cumulative statistical requirements, but also meets the requirements of quasi-real-time.

(7) Example summary: E-commerce transaction data – real-time warehouse scenario

Let’s then summarize the above example as a whole.

The cleaning process from the access layer to the detail layer is relatively simple and clear. For example, the business logic needs to do fixed filtering conditions, including the expansion of dimensions, which are very clear and direct.

From the detail layer to the summary layer, Tumble Window is used for the minute level statistics in the example, while Group Aggregation is changed to Group Aggregation for the hour level due to the real-time requirements. Then, Group Aggregation and newly introduced Cumulate Window were presented at the sky level.

From the calculation characteristics of the summary layer, we need to pay attention to the real-time requirements and data accuracy requirements of the business, and then choose Window aggregation or Group aggregation according to the actual situation.

Why mention data accuracy here?

When comparing Window Aggregation and Group Aggregation at the beginning, it was mentioned that Group Aggregation has good real-time performance, but its data accuracy is dependent on TTL of State. When the statistical period is greater than TTL, Then the TTL data may be distorted.

On the contrary, on Window Aggregation, there is an upper limit of tolerance for out-of-order Aggregation, such as one minute at most. However, in actual business data, perhaps 99% of the data can meet such requirements, and 1% of the data may take an hour to arrive. Based on the WaterMark processing, it is a discarding strategy by default. If the data exceeds the maximum offset, it will be discarded and not included in the statistics. At this time, the data will also lose its accuracy, so this is a relative index, which needs to be selected according to specific business scenarios.

Develop solutions to common problems

(a) common problems in the development

At the top are some of the more frequent questions used to compute real business contacts in real time.

First of all, I don’t know how to do real-time computing, how to start to do real-time computing, for example, some students have a batch processing background, and then just started to contact Flink SQL, I don’t know where to start.

Another kind of problem is that you have written SQL and you know what level of data you are going to input, but you don’t know how much resource you need to set when the real-time job is running

There is also a kind of SQL writing is more complex, this time to do debugging, such as to find out why the calculated data does not meet the expectations and other similar problems, many students say that they have no way to start.

How to tune the job after it has run is also a very frequent question.

(2) to develop solutions to common problems

1. How to do real-time computing?

For getting started, the community has a lot of official documentation, but also provides some examples, you can start from a simple example, slowly understand the SQL inside the different operators, in the streaming calculation will have some of what kind of features.

In addition, you can also pay attention to the developer community real-time computing Flink version, website, B site Apache Flink public account and other sharing content.

After gradually getting familiar with SQL, if you want to apply it to the production environment to solve real business problems, Ali Cloud industry solutions also provide some typical architecture design, which can be used as a reference.

2. How to debug complex jobs?

If there are any level one thousand lines of complex SQL, even for the development of the Flink classmate to locate the problem can’t be clear at a glance, but still need to follow by Jane to numerous process, may need to use some debugging tools, such as the demonstration in front of the platform debugging, and then do section of validation, the small piece of SQL local result is correct after debugging, And put them together step by step to get the complex job right.

In addition, you can take advantage of SQL syntax features to organize the SQL a little more cleanly. Real-time Computation The Flink product has a code structure feature that makes it easy to locate specific statements in long SQL. These are some of the AIDS.

3. How to adjust the initial resource setting of the job?

A rule of thumb for us is to do a little bit of concurrent testing initially, based on input data, to see how well it performs, and then to estimate. It is a straightforward but reliable way to approach the expected performance configuration based on the required throughput during a large concurrent run test.

The tuning part is mainly based on the operation situation of the job. We will pay attention to some key indicators, such as whether the data is skewed, the Lookup Join of the dimension table needs to access external storage, and whether the bottleneck of IO is generated. These are common bottleneck points that affect the performance of the job and need to be paid attention to.

The Flink product for real-time computing includes a feature called Autopilot, which can be understood as similar to AutoPilot, where the initial resource set is not a problem.

On the product, after setting the maximum resource limit for the job, the engine can automatically help us adjust the amount of resources to the optimal state according to the actual data processing capacity, and make scaling according to the load situation.

Copyright Notice:The content of this article is contributed by Aliyun real-name registered users, and the copyright belongs to the original author. Aliyun developer community does not own the copyright and does not bear the corresponding legal liability. For specific rules, please refer to User Service Agreement of Alibaba Cloud Developer Community and Guidance on Intellectual Property Protection of Alibaba Cloud Developer Community. If you find any suspected plagiarism in the community, fill in the infringement complaint form to report, once verified, the community will immediately delete the suspected infringing content.