This article is the second in a series on Flink SQL. The previous article described the new Flink 1.9 Table architecture and the use of the Flink 1.9 Table planners in detail. This article explains the five TableEnvironments and their application scenarios in detail, and introduces the Flink community's future plans for TableEnvironment. The main contents are as follows:

  1. Introduction to TableEnvironment
  2. Overview of the five TableEnvironments
  3. How to use TableEnvironment
  4. Future community plans

1. Introduction to TableEnvironment

TableEnvironment is used to create the context and execution environment of a Table & SQL program, and it is also the entry point of such a program: all the functionality of a Table & SQL program revolves around this core class. TableEnvironment's main responsibilities include connecting to external systems, registering and retrieving tables and metadata, executing SQL statements, and providing more fine-grained configuration options.
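A minimal Java sketch of these entry points is shown below. It assumes the Blink Planner and a hypothetical, already-registered table named Orders with columns product and amount; it illustrates the shape of the API rather than a complete job.

// A minimal sketch of the main TableEnvironment entry points (Flink 1.9 API).
// The table "Orders" and its columns are hypothetical and assumed to have been
// registered beforehand (e.g. via a connector).
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Retrieve a registered table by name and run a SQL query on it.
Table orders = tEnv.scan("Orders");
Table revenue = tEnv.sqlQuery("SELECT product, SUM(amount) AS total FROM Orders GROUP BY product");

// Register the query result so that later statements can refer to it by name.
tEnv.registerTable("Revenue", revenue);

// Fine-grained configuration is exposed through tEnv.getConfig().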

Flink 1.8 had seven TableEnvironments. In the latest release, Flink 1.9, the community refactored and streamlined them, leaving only five. This article explains these five TableEnvironments and their application scenarios in detail, and introduces the community's future plans for TableEnvironment.

2. Overview of the five TableEnvironments

Flink 1.9 retains five TableEnvironments. In terms of implementation they are five user-facing interfaces, each with its own implementation underneath: one TableEnvironment interface, two BatchTableEnvironment interfaces, and two StreamTableEnvironment interfaces. The full paths of the five interfaces are as follows:

  • org/apache/flink/table/api/TableEnvironment.java
  • org/apache/flink/table/api/java/BatchTableEnvironment.java
  • org/apache/flink/table/api/scala/BatchTableEnvironment.scala
  • org/apache/flink/table/api/java/StreamTableEnvironment.java
  • org/apache/flink/table/api/scala/StreamTableEnvironment.scala

TableEnvironment is the top-level interface and the base interface of all TableEnvironments. BatchTableEnvironment and StreamTableEnvironment each provide two interfaces, one for Java and one for Scala.





The five TableEnvironments

TableEnvironment, as the unified entry point, is unified in two respects. First, it is unified across all JVM-based languages (i.e. there is no difference between the Scala API and the Java API). Second, it treats unbounded data (i.e. stream data) and bounded data (i.e. batch data) uniformly. TableEnvironment provides a pure Table-ecosystem context and is suitable for jobs written entirely with the Table API & SQL. Currently, TableEnvironment does not support registering UDTFs and UDAFs; users who need to register them can choose one of the other TableEnvironments.

The two StreamTableEnvironments are used for Java and Scala stream processing scenarios, where the stream objects are the Java DataStream and the Scala DataStream respectively. Compared with TableEnvironment, StreamTableEnvironment additionally provides interfaces for converting between DataStream and Table. If a program needs to use the DataStream API in addition to the Table API & SQL, it must use a StreamTableEnvironment, for example as sketched below.
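As a rough illustration (not from the original article), the following Java sketch shows the extra DataStream/Table conversion interfaces of the Java StreamTableEnvironment; the in-memory source and the field names are hypothetical.

// Sketch: converting between DataStream and Table with the Java StreamTableEnvironment
// (Flink 1.9, Old Planner by default). The source data and field names are hypothetical.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<Tuple2<String, Long>> stream =
        env.fromElements(Tuple2.of("alice", 10L), Tuple2.of("bob", 20L));

// DataStream -> Table, assigning field names
Table table = tEnv.fromDataStream(stream, "name, amount");

// Table -> DataStream; a grouped aggregation produces updates, hence a retract stream
Table result = tEnv.sqlQuery("SELECT name, SUM(amount) FROM " + table + " GROUP BY name");
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(result, Row.class);
retractStream.print();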

The two BatchTableEnvironments are used for Java and Scala batch processing scenarios, where the batch objects are the Java DataSet and the Scala DataSet respectively. Compared with TableEnvironment, BatchTableEnvironment additionally provides interfaces for converting between DataSet and Table. If a program needs to use the DataSet API in addition to the Table API & SQL, it must use a BatchTableEnvironment; see the sketch below.
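Similarly, a minimal Java sketch of the DataSet/Table conversions that BatchTableEnvironment adds might look as follows; the in-memory data and field names are hypothetical.

// Sketch: converting between DataSet and Table with the Java BatchTableEnvironment
// (Flink 1.9, Old Planner). The data and field names are hypothetical.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

DataSet<Tuple2<String, Integer>> input =
        env.fromElements(Tuple2.of("flink", 3), Tuple2.of("sql", 1));

// DataSet -> Table, assigning field names
Table table = tEnv.fromDataSet(input, "word, frequency");

// Table -> DataSet, after a Table API filter
Table filtered = table.filter("frequency > 1");
DataSet<Row> result = tEnv.toDataSet(filtered, Row.class);
result.print();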

Comparing the five TableEnvironments in terms of the job types they support (stream jobs and batch jobs), the API types they support (DataStream API and DataSet API), and whether they support UDTFs/UDAFs, the capabilities of each TableEnvironment can be summarized as follows:





Comparison of the functionality supported by each TableEnvironment

You might wonder why the API distinguishes between Java and Scala StreamTableEnvironments (and BatchTableEnvironments), and why DataStream is likewise split into a Java DataStream and a Scala DataStream.

The main reason is that TableEnvironment's registerTableFunction method (used to register UDTFs) and registerAggregateFunction method (used to register UDAFs) need to extract generic type information, and the existing Java and Scala type extraction mechanisms differ: Java relies on reflection, while Scala relies on Scala macros. Because of this inconsistency in generic extraction, the unified TableEnvironment entry point does not support registering UDTFs and UDAFs at this stage. To address this problem, the community plans to introduce a new type extraction mechanism that unifies Java and Scala type extraction and thereby unifies the Java and Scala APIs.
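As an illustration of what the language-specific environments can already do, the following Java sketch registers a hypothetical UDTF (SplitFunction) on the Java StreamTableEnvironment, whose result type is extracted via reflection.

// Sketch: registering a UDTF on the Java StreamTableEnvironment (Flink 1.9 API).
// SplitFunction is a hypothetical table function that splits a line into words.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;

public class SplitFunction extends TableFunction<String> {
    public void eval(String line) {
        for (String word : line.split(" ")) {
            collect(word);  // emit one row per word
        }
    }
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// The Java environment extracts the generic result type (String) via reflection;
// the Scala environment does the same job with Scala macros, which is why the
// unified TableEnvironment cannot yet offer this method for both languages.
tEnv.registerFunction("split", new SplitFunction());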





Concrete implementations of the five TableEnvironments

Combining the Flink Planner and the Blink Planner, we can further examine how the TableEnvironments are organized, and notice some interesting details:

  • Since the Blink Planner has no concept of a DataSet, it no longer uses BatchTableEnvironment; only TableEnvironment and StreamTableEnvironment are used with it. The Flink Planner (Old Planner) supports all five TableEnvironments.
  • The implementation of BatchTableEnvironment lives in the Old Planner (the flink-table-planner module), which will be phased out according to the community's future plans.

3. How to use TableEnvironment

Depending on the planner and the job type, the application scenarios of the TableEnvironments can be divided into four categories. The following code examples illustrate how to use TableEnvironment in each scenario.

Scenario 1:

The user uses the Old Planner to develop a Table program (a program written with the Table API or SQL) for stream processing. In this scenario, the user can use either StreamTableEnvironment or TableEnvironment; the difference is that StreamTableEnvironment additionally provides interfaces for interacting with the DataStream API. Example code is as follows:

// **********************
// FLINK STREAMING QUERY USING JAVA
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// **********************
// FLINK STREAMING QUERY USING SCALA
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)

Scenario 2:

The user uses the Old Planner to develop a Table program for batch processing. In this scenario, the user can only use BatchTableEnvironment, because with the Old Planner the data operated on by a batch program is a DataSet, and only BatchTableEnvironment provides the corresponding interfaces for DataSet. Example code is as follows:

// ******************
// FLINK BATCH QUERY USING JAVA
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// ******************
// FLINK BATCH QUERY USING SCALA
// ******************
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
val fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)

Scenario 3:

The user uses the Blink Planner to develop a Table program for stream processing. In this scenario, the user can use either StreamTableEnvironment or TableEnvironment; the difference is that StreamTableEnvironment additionally provides interfaces for interacting with the DataStream API. The user declares the Blink Planner in EnvironmentSettings and sets the execution mode to streaming mode. Example code is as follows:

// **********************
// BLINK STREAMING QUERY USING JAVA
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// **********************
// BLINK STREAMING QUERY USING SCALA
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)

Scenario 4:

The user uses the Blink Planner to develop a Table program for batch processing. In this scenario, the user can only use TableEnvironment, because with the Blink Planner the data operated on by a batch program is already a bounded DataStream, so BatchTableEnvironment cannot be used. The user declares the Blink Planner in EnvironmentSettings and sets the execution mode to batch mode. The TableEnvironment interface supports both streaming and batch modes, while the StreamTableEnvironment interface does not currently support batch mode, so StreamTableEnvironment cannot be used in this scenario either. Example code is as follows:

// ******************
// BLINK BATCH QUERY USING JAVA
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
// ******************
// BLINK BATCH QUERY USING SCALA
// ******************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)

4. Future community plans

At present, the community is working to give DataStream batch processing capabilities so that the stream and batch stacks can be unified. Once that happens, the DataSet API will be retired, and the two BatchTableEnvironments will be retired along with it. The community is also working to unify the Java and Scala TableEnvironments. It is foreseeable that the future architecture of Flink's TableEnvironment will be more concise: TableEnvironment will be Flink's recommended entry class, supporting both the Java and Scala APIs as well as streaming and batch jobs. Only when conversion to and from DataStream is needed will StreamTableEnvironment be required.

