Extending FlinkSQL to join a stream with a dimension table

Why extend FlinkSQL?

1. Real-time computing needs to be fully SQL-based

SQL is the most widely used language for data processing. It allows users to declare their business logic concisely. Using SQL for big data batch computing is common, but few real-time computing systems support SQL. In fact, developing real-time tasks in SQL can greatly lower the barrier to data development, so in the Kangaroo Cloud Stack real-time computing module we decided to make the module fully SQL-based.

Advantages of using SQL for data calculation

Declarative. Users only need to express what they want; how it is computed is the system's concern, and users do not need to care.

Automatic optimization. The query optimizer generates the best execution plan for the user's SQL. Users get the performance benefits of the optimizer automatically, without having to understand how it works.

Easy to understand. SQL is understood by people across many industries and fields. The barrier to learning SQL is low, and using SQL as a cross-team development language can greatly improve efficiency.

Stable. SQL has been around for decades and is a very stable language that changes very little. So when we upgrade the engine version, or even swap in a different engine, the upgrade can be compatible and smooth.

Refer to the link: https://blog.csdn.net/weixin_…

2. Real-time computation also requires JOIN of stream and dimension table

In the real-time computing world, we need not only joins between streams but also joins between a stream and a dimension table. When we developed version 3.0 of the Kangaroo Cloud Stack last year, FlinkSQL in Flink 1.6 (the latest version at the time) had already brought the advantages of SQL to the Flink engine, but it did not yet support joining a stream with a dimension table.

FlinkSQL began serving stream computing workloads within Alibaba Group in July 2017. Although it was a very young product, it already supported thousands of jobs by Singles' Day. During the Singles' Day period, Blink jobs peaked at over 500 million records processed per second, and Flink SQL jobs alone peaked at 300 million records per second overall.

Refer to the link: https://yq.aliyun.com/article…

Let's first explain what a dimension table is. A dimension table is a table that changes over time: its data may stay the same or be updated periodically, but it is not updated frequently. In business development, dimension-table data is generally stored in relational databases such as MySQL or Oracle, and may also be stored in NoSQL databases such as HBase or Redis.

3. How FlinkSQL implements the join of a stream and a dimension table, step by step

1. Using the Flink API to implement the dimension-table functionality

We use Flink Async I/O to implement the dimension-table lookup.

Async I/O was contributed by Alibaba to the community and introduced in Flink 1.2. Its main purpose is to keep network latency from becoming the system bottleneck when interacting with external systems.

See this article for details: http://wuchong.me/blog/2017/0…

The corresponding Flink API is RichAsyncFunction, an abstract class with open, asyncInvoke, and close methods; the main work is implementing asyncInvoke.
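Below is a minimal sketch of what such a function can look like. The async client (SideDbClient), its connect/queryAsync methods, and the row layout (join key in field 0) are assumptions made purely for illustration; they are not part of Flink's API or of the implementation described here.

import java.util.Collections;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.types.Row;

// Minimal sketch: look up the dimension row for each stream record asynchronously.
public class SideTableAsyncFunction extends RichAsyncFunction<Row, Row> {

    private transient SideDbClient client; // hypothetical non-blocking client for the dimension store

    @Override
    public void open(Configuration parameters) throws Exception {
        client = SideDbClient.connect("side-db-host", 3306); // assumed endpoint
    }

    @Override
    public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) {
        String key = (String) input.getField(0); // join key, e.g. channel
        client.queryAsync(key).whenComplete((sideRow, err) -> {
            if (err != null || sideRow == null) {
                resultFuture.complete(Collections.emptyList()); // no match: emit nothing
            } else {
                resultFuture.complete(Collections.singletonList(Row.join(input, sideRow)));
            }
        });
    }

    @Override
    public void close() throws Exception {
        client.close();
    }
}

On the stream side such a function is applied with AsyncDataStream, for example AsyncDataStream.unorderedWait(stream, new SideTableAsyncFunction(), 10000, TimeUnit.MILLISECONDS, 100).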

There are two problems with the join of a stream with a dimension table:

1) The first is a performance issue.

If the stream is very fast, every record needs to be joined against the dimension table, but the dimension-table data lives in a third-party storage system. Accessing that system in real time for every record not only makes the join slow (each lookup costs a network round trip), it also puts heavy pressure on the third-party store and may even bring it down.

Therefore, the solution is to cache the dimension-table data. There are two options: cache the full table, which works when the dimension table is small, or use an LRU cache, which works when the dimension table is large. A sketch of the LRU option follows below.
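As a sketch of the LRU option, a Guava cache in front of the external lookup might look like the following. Guava, the cache size, and the TTL are illustrative assumptions, and client, key, input, and resultFuture refer to the names used in the RichAsyncFunction sketch above; this fragment would sit inside its asyncInvoke method.

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.types.Row;

// LRU cache for dimension rows: bounded size, entries expire so periodic updates are picked up.
Cache<String, Row> lruCache = CacheBuilder.newBuilder()
        .maximumSize(10_000)                    // evict least-recently-used rows beyond this
        .expireAfterWrite(10, TimeUnit.MINUTES) // refresh stale dimension rows eventually
        .build();

// Inside asyncInvoke: consult the cache first, hit the external store only on a miss.
Row cached = lruCache.getIfPresent(key);
if (cached != null) {
    resultFuture.complete(Collections.singletonList(Row.join(input, cached)));
} else {
    client.queryAsync(key).thenAccept(sideRow -> {
        lruCache.put(key, sideRow);
        resultFuture.complete(Collections.singletonList(Row.join(input, sideRow)));
    });
}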

2) The second problem is that late-arriving data from the stream needs to be joined against the dimension-table data as it was at that earlier point in time.

This requires keeping snapshots of the dimension-table data, so in such scenarios HBase is a better fit for the dimension table, because HBase natively supports multiple versions of the data.
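As a sketch, reading the dimension row "as of" a record's event time from HBase could look like this, assuming each dimension row is written with its effective time as the cell timestamp; the table, column family, and qualifier names are illustrative, not from the original article.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Read the newest dimension-row version written at or before the record's event time.
Result lookupAsOf(Connection connection, String key, long eventTime) throws Exception {
    try (Table sideTable = connection.getTable(TableName.valueOf("side_table"))) {
        Get get = new Get(Bytes.toBytes(key));
        get.setTimeRange(0L, eventTime + 1); // only versions written at or before eventTime
        get.setMaxVersions(1);               // newest version inside that range
        return sideTable.get(get);
    }
}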

2. Parsing the SQL for a stream/dimension-table join and converting it into the underlying Flink API

Since FlinkSQL already covers most SQL scenarios, it is neither feasible nor necessary for us to parse all of the SQL syntax ourselves and translate it into the underlying Flink API.

So what we do is parse the SQL and check whether any of the joined tables is a dimension table. If there is one, we split the dimension-table part out separately, use Flink's Table API and DataStream API to generate a new DataStream, and then join that DataStream with the other tables. In this way the join between a stream and a dimension table can be expressed in SQL.

The SQL parsing tool is Apache Calcite, the same framework Flink itself uses for SQL parsing, so all of the syntax can be parsed.
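As a rough illustration (not the actual implementation), detecting a dimension table in the FROM clause with Calcite might look like the sketch below; isSideTable is a hypothetical helper that checks a table reference against the registered dimension tables.

import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParser;

// Parse the statement and check whether the FROM clause is a join involving a dimension table.
void checkForSideTable(String sql) throws Exception {
    SqlParser parser = SqlParser.create(sql);
    SqlNode stmt = parser.parseStmt();
    if (stmt instanceof SqlInsert) {
        SqlNode source = ((SqlInsert) stmt).getSource();
        if (source instanceof SqlSelect && ((SqlSelect) source).getFrom() instanceof SqlJoin) {
            SqlJoin join = (SqlJoin) ((SqlSelect) source).getFrom();
            boolean hasSideTable = isSideTable(join.getRight()); // hypothetical helper
            // if hasSideTable: split this sub-query out and execute it with the DataStream API
        }
    }
}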

1) DEMO SQL

insert into MyResult
select d.channel, d.info
from (
    select a.*, b.info
    from MyTable a
    join sideTable b on a.channel = b.name
    where a.channel = 'xc2' and a.pv = 10
) as d

2) Calcite parses the Insert into statement

select a.*,b.info from MyTable a join sideTable b on a.channel=b.name

  where a.channel = 'xc2' and a.pv=10

select d.channel, d.info from d

insert into MyResult

3) Calcite continues to parse the SELECT statement

old: select a.*, b.info from MyTable a join sideTable b on a.channel = b.name
     where a.channel = 'xc2' and a.pv = 10
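Putting the pieces together: the split-out inner query is executed with the DataStream API (the async dimension lookup from earlier), the result is registered back as a table named d, and the remaining SQL runs unchanged. The sketch below uses the Flink 1.6-era Table API; myTableStream, tableEnv (a StreamTableEnvironment), the field order, and SideTableAsyncFunction (from the earlier sketch) are assumptions for illustration.

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// 1) Execute the split-out inner query with the DataStream API:
//    apply the filter on a.channel / a.pv, then do the async dimension lookup against sideTable.
DataStream<Row> filtered = myTableStream                      // DataStream<Row> behind MyTable (assumed)
        .filter(r -> "xc2".equals(r.getField(0)) && Integer.valueOf(10).equals(r.getField(1)));
DataStream<Row> joined = AsyncDataStream.unorderedWait(
        filtered, new SideTableAsyncFunction(), 10_000, TimeUnit.MILLISECONDS, 100);

// 2) Register the joined result back as a table named d, matching the alias in the parsed SQL.
tableEnv.registerDataStream("d", joined, "channel, pv, info"); // field order is an assumption

// 3) The remaining SQL runs unchanged and is written into MyResult.
Table result = tableEnv.sqlQuery("select d.channel, d.info from d");
result.insertInto("MyResult");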





The Kangaroo Cloud Stack is a cloud-native, one-stop data platform PaaS, and we have an interesting open source project on GitHub and Gitee: FlinkX. FlinkX is a unified batch/stream data synchronization tool based on Flink that can collect both static data and data that changes in real time; it is a global, heterogeneous, batch-and-stream-unified data synchronization engine. If you like it, please give us a star!

GitHub open source project: https://github.com/DTStack/fl…

Gitee open source project: https://gitee.com/dtstack_dev…