Origins

Relational APIs have many benefits. They are declarative: the user only states what result is needed, and the system decides how to compute it, so users do not have to implement the computation themselves. Declarative queries are also easier to optimize and can therefore be executed more efficiently. Flink itself is a distributed computing platform that unifies batch and stream processing, so one of the community's goals in designing the relational API was to make it a unified layer, with the same syntax and semantics for batch and streaming queries. The APIs of most stream processing frameworks are relatively low level: they are costly to learn, and a lot of logic has to be written into UDFs. Apache Flink therefore added a SQL-like API for relational data, the Table API. The most important concept in this API is the Table, a structured DataSet or DataStream on which relational operations can be performed. The Table API is tightly integrated with the DataSet and DataStream APIs, so a DataSet or DataStream can easily be converted to a Table and back again:

val execEnv = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

// obtain a DataSet from somewhere
val tempData: DataSet[(String, Long, Double)] = ??? // placeholder: the source elides how the DataSet is obtained

// convert the DataSet to a Table
val tempTable: Table = tempData.toTable(tableEnv, 'location, 'time, 'tempF)
// compute your result
val avgTempCTable: Table = tempTable
 .where('location.like("room%"))
 .select(
   ('time / (3600 * 24)) as 'day,
   'location as 'room,
   (('tempF - 32) * 0.556) as 'tempC
  )
 .groupBy('day, 'room)
 .select('day, 'room, 'tempC.avg as 'avgTempC)
// convert result Table back into a DataSet and print it
avgTempCTable.toDataSet[Row].print()

The example uses the Scala API; the Java version of the API offers the same functionality.

The following diagram shows the architecture of the Table API:

Create a Table from a DataSet or DataStream and apply relational operations such as filter, join, and select to it. Operations on a Table are translated into a tree of logical operators. When the Table is converted back to a DataSet or DataStream, this tree is translated into DataSet or DataStream operators. Expressions such as 'location.like("room%") are compiled into Flink functions through code generation.
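To make the idea of an operator tree concrete, here is a minimal, framework-free sketch in plain Scala. The names Scan, Filter, Project, the small Table wrapper, and explain are hypothetical and are not Flink's actual classes. The only point is that where and select record operators in a tree instead of executing anything.

```scala
// Hypothetical, simplified model of a logical operator tree; not Flink's real classes.
object LogicalTreeSketch {
  sealed trait LogicalNode
  case class Scan(table: String) extends LogicalNode
  case class Filter(predicate: String, input: LogicalNode) extends LogicalNode
  case class Project(fields: Seq[String], input: LogicalNode) extends LogicalNode

  // A tiny fluent wrapper: each call only records an operator, nothing is executed.
  case class Table(plan: LogicalNode) {
    def where(predicate: String): Table = Table(Filter(predicate, plan))
    def select(fields: String*): Table = Table(Project(fields, plan))
  }

  // Render the tree, one operator per line, with inputs indented below their consumer.
  def explain(node: LogicalNode, indent: Int = 0): String = {
    val pad = "  " * indent
    node match {
      case Scan(t)         => s"${pad}Scan($t)"
      case Filter(p, in)   => s"${pad}Filter($p)\n" + explain(in, indent + 1)
      case Project(fs, in) => s"${pad}Project(${fs.mkString(", ")})\n" + explain(in, indent + 1)
    }
  }

  // The query is just a value describing what to compute.
  val query: Table = Table(Scan("sensorData"))
    .where("location LIKE 'room%'")
    .select("time", "location", "tempF")
}
```

Calling LogicalTreeSketch.explain(LogicalTreeSketch.query.plan) renders the Project at the root, the Filter below it, and the Scan as the leaf, which is the shape a later translation step would walk.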

However, the original Table API had some limitations. First, it could not stand on its own: Table API queries had to be embedded in a DataSet or DataStream program. Queries on batch tables did not support outer joins, sorting, or many of the scalar functions common in SQL. Queries on streaming tables supported only filter, union, and projection; aggregations and joins were not supported. Moreover, the translation applied few query optimization techniques beyond those that apply to all DataSet programs.

The Table API is tightly integrated with SQL

With the increasing popularity of stream processing and Flink's growth in this area, the Flink community decided that a simpler API was needed to enable more users to analyze streaming data. A year ago the community decided to take the Table API to a new level by extending its streaming capabilities and adding SQL support. Not wanting to reinvent the wheel, it decided to build the new Table API on Apache Calcite, a popular SQL parsing and optimization framework. Apache Calcite is used in many projects, including Apache Hive, Apache Drill, and Cascading. In addition, the Calcite community has put SQL on streams on its roadmap, which makes it a good fit for Flink's SQL.

New architecture diagram with Calcite at its core:

The new architecture provides two APIs for relational queries, the Table API and SQL. Queries in both APIs are validated against a catalog that contains the registered tables and are then converted into a unified Calcite logical plan. In this representation, streaming and batch queries look exactly the same. Next, Calcite's cost-based optimizer applies transformation rules to optimize the logical plan. Different rule sets are applied depending on the nature of the data source (streaming or static). Finally, the optimized plan is translated into a regular Flink DataSet or DataStream program. This step also involves code generation (converting relational expressions into Flink functions).

Let’s take an example to understand the new architecture. The expression is converted to a Logical Plan as shown below:

Calls to the Table API create the corresponding LogicalNode objects and validate the entire query. For example, a table becomes a CatalogNode, a window groupBy followed by select creates a WindowAggregate and a Project, and where corresponds to a Filter. RelBuilder then translates these nodes into a Calcite logical plan. A SQL query is instead parsed directly by Calcite's parser and then validated to produce the Calcite logical plan.

The logical plan is optimized using some of Calcite's built-in rules; you can also add or override rules yourself. The optimized plan is still Calcite's internal representation and has to be transformed into a DataStream plan, which corresponds to the classes in the third column of the figure above. These classes encapsulate how to translate the plan into a regular DataStream or DataSet program. Their translateToPlan method is then called to perform the translation, using CodeGen to generate the code for the various Flink operators. The result is equivalent to a program written directly against Flink's DataSet or DataStream API.
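The optimize-then-translate steps described above can be illustrated with a small, self-contained Scala sketch. Everything in it is an assumption made for illustration: the node classes and the mergeFilters rule are hypothetical, and translateToPlan only borrows the method name from the text. It maps the plan onto ordinary collection operations rather than onto Flink operators or generated code.

```scala
// Hypothetical sketch of "optimize, then translate"; not Flink or Calcite classes.
object PlanPipelineSketch {
  type Row = Map[String, Any]

  sealed trait Node
  case class Scan(rows: Seq[Row]) extends Node
  case class Filter(pred: Row => Boolean, input: Node) extends Node
  case class Project(fields: Seq[String], input: Node) extends Node

  // One rewrite rule: two stacked filters become a single filter (their conjunction).
  def mergeFilters(node: Node): Node = node match {
    case Filter(p1, Filter(p2, in)) => mergeFilters(Filter(r => p2(r) && p1(r), in))
    case Filter(p, in)              => Filter(p, mergeFilters(in))
    case Project(fs, in)            => Project(fs, mergeFilters(in))
    case s: Scan                    => s
  }

  // Count the operators in a plan, to observe the effect of the rule.
  def size(node: Node): Int = node match {
    case Scan(_)        => 1
    case Filter(_, in)  => 1 + size(in)
    case Project(_, in) => 1 + size(in)
  }

  // "Translation": turn the plan into plain collection operations and run them.
  def translateToPlan(node: Node): Seq[Row] = node match {
    case Scan(rows)      => rows
    case Filter(p, in)   => translateToPlan(in).filter(p)
    case Project(fs, in) => translateToPlan(in).map(_.filter { case (k, _) => fs.contains(k) })
  }

  val data: Seq[Row] = Seq(
    Map("location" -> "room1", "tempF" -> 68.0),
    Map("location" -> "hall", "tempF" -> 70.0),
    Map("location" -> "room2", "tempF" -> 20.0)
  )

  // Project(location) over two stacked filters over a scan.
  val plan: Node = Project(Seq("location"),
    Filter(r => r("tempF").asInstanceOf[Double] > 32.0,
      Filter(r => r("location").toString.startsWith("room"), Scan(data))))

  val optimized: Node = mergeFilters(plan)
}
```

The rewritten plan has one operator fewer but produces the same rows as the original plan, which is the property any optimizer rule has to preserve.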

The new architecture of the Table API keeps the original principles and improves on them in many ways. It maintains a unified interface for relational queries on streaming and static data, and it uses Calcite's query optimization framework and SQL parser. The design builds on the APIs Flink already provides. The DataStream API offers low-latency, high-throughput stream processing with exactly-once semantics and event-time processing. The DataSet API has stable and efficient in-memory operators and pipelined data exchange. All improvements to Flink's core APIs and engine automatically benefit the Table API and SQL.

The new SQL interface is integrated into the Table API. DataStreams, DataSets, and external data sources can be registered as tables in the TableEnvironment so that they can be queried with SQL. The TableEnvironment.sql() method takes a SQL query and returns its result as a Table. Here is a complete example that reads a streaming table from a JSON-encoded Kafka topic, processes it with SQL, and writes the result to another Kafka topic.

// get environments
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

// configure Kafka connection
val kafkaProps = ...
// define a JSON encoded Kafka topic as external table
val sensorSource = new KafkaJsonSource[(String, Long, Double)](
    "sensorTopic",
    kafkaProps,
    ("location", "time", "tempF"))

// register external table
tableEnv.registerTableSource("sensorData", sensorSource)

// define query in external table
val roomSensors: Table = tableEnv.sql(
    "SELECT STREAM time, location AS room, (tempF - 32) * 0.556 AS tempC " +
    "FROM sensorData " +
    "WHERE location LIKE 'room%'"
  )

// define a JSON encoded Kafka topic as external sink
val roomSensorSink = new KafkaJsonSink(...)

// define sink for room sensor data and execute query
roomSensors.toSink(roomSensorSink)
execEnv.execute()

This example leaves out the most interesting parts of stream processing: window aggregates and joins. How are these operations expressed in SQL? The Apache Calcite community has put forward a proposal that discusses the syntax and semantics of SQL on streams. The community describes Calcite's streaming SQL as an extension of standard SQL rather than another SQL-like language. This has a number of benefits. First, someone familiar with standard SQL can analyze streaming data without learning a new syntax. Second, queries on static and streaming tables are nearly identical and can easily be ported. In addition, a single query can address both static and streaming tables, which is consistent with Flink's view of batch processing as a special case of stream processing (a batch is a finite stream). Finally, using standard SQL for stream processing means that a lot of mature tooling is available.

The following example shows how to express a tumbling window query in SQL and in the Table API:

SQL

SELECT STREAM
  TUMBLE_END(time, INTERVAL '1' DAY) AS day,
  location AS room,
  AVG((tempF - 32) * 0.556) AS avgTempC
FROM sensorData
WHERE location LIKE 'room%'
GROUP BY TUMBLE(time, INTERVAL '1' DAY), location

Table API

val avgRoomTemp: Table = tableEnv.ingest("sensorData")
  .where('location.like("room%"))
  .partitionBy('location)
  .window(Tumbling every Days(1) on 'time as 'w)
  .select('w.end, 'location, (('tempF - 32) * 0.556).avg as 'avgTempCs)
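As a rough, framework-free illustration of what the tumbling-window query computes, the following plain Scala sketch groups readings into one-day windows and averages the converted temperature per window and location. Reading, tumbleEnd, and avgTempC are hypothetical names. The sketch assumes that time is a non-negative timestamp in epoch milliseconds and that windows are aligned to the epoch. It works on a finite collection, so it says nothing about how a streaming engine emits results incrementally.

```scala
// Hypothetical, framework-free sketch of the semantics of the tumbling-window query.
object TumblingWindowSketch {
  // time is assumed to be a non-negative timestamp in epoch milliseconds
  case class Reading(location: String, time: Long, tempF: Double)

  val DayMillis: Long = 24L * 60 * 60 * 1000

  // Exclusive end of the tumbling window that contains `time` (windows aligned to the epoch).
  def tumbleEnd(time: Long, size: Long): Long = (time / size + 1) * size

  // WHERE location LIKE 'room%'
  // GROUP BY TUMBLE(time, INTERVAL '1' DAY), location
  // SELECT AVG((tempF - 32) * 0.556)
  def avgTempC(readings: Seq[Reading]): Map[(Long, String), Double] =
    readings
      .filter(_.location.startsWith("room"))
      .groupBy(r => (tumbleEnd(r.time, DayMillis), r.location))
      .map { case (key, rs) => key -> rs.map(r => (r.tempF - 32) * 0.556).sum / rs.size }
}
```

Every reading belongs to exactly one window because tumbling windows do not overlap, which is why a simple integer division on the timestamp is enough to assign it.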

The status of the Table API

Batch SQL & Table API support:

  • Selection, Projection, Sort, Inner & Outer Joins, Set operations
  • Windows for Slide, Tumble, Session

Streaming Table API support:

  • Selection, Projection, Union
  • Windows for Slide, Tumble, Session

Streaming SQL:

  • Selection, Projection, Union, Tumble

Streaming SQL use cases

Continuous ETL and data import

Take streaming data, transform it (normalization, aggregation, ...), and write it to another system (file, Kafka, DBMS). The results of these queries are typically stored in log-style systems.

Real-time Dashboards and reports

Take streaming data and aggregate it to serve online systems (dashboards, recommendations) or data analytics systems (Tableau). The results are usually written to key-value stores (Cassandra, HBase, queryable Flink state), indexes (Elasticsearch), or a DBMS (MySQL, PostgreSQL, ...). These queries can often be updated and improved.

Ad-hoc analysis

Ad-hoc queries on streaming data to analyze and explore data in real time. The query results are displayed directly in a notebook (Apache Zeppelin).

The Flink community has also proposed the concept of dynamic tables, which are similar to materialized views in a database and will be supported in a future version. The details will be covered in another article.

The original link http://mtunique.com/flink_sql/