Website links: ci.apache.org/projects/fl…

Table API & SQL: concepts and general API

Apache Flink has two relational APIs, the Table API and SQL, for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that allows queries built from relational operators such as select, filter, and join to be composed in a very intuitive way. Flink's SQL support is based on Apache Calcite, which implements the SQL standard. Queries specified in either interface have the same semantics and produce the same results, regardless of whether the input is a batch input (DataSet) or a stream input (DataStream).

The Table API is tightly integrated with the SQL interface and with Flink’s DataStream and DataSet APIs. You can easily switch between all APIs and the libraries built on top of them. For example, you can use the CEP library to extract patterns from a DataStream and then use the Table API to analyze the patterns, or you can use SQL queries to scan, filter, and aggregate batch tables and then run a Gelly graph algorithm on the preprocessed data.

Note that the Table API and SQL are not yet feature complete and are under active development. Not every combination of [Table API, SQL] and [stream, batch] input supports all operations.

Table API & SQL program structure

All Table API and SQL programs for batch and streaming follow the same pattern. The following code example shows the common structure of a Table API and SQL program.

// Step 1: create a TableEnvironment for a specific planner, batch or streaming
val tableEnv = ... // see "Create a TableEnvironment" section

// Step 2: register an input Table
tableEnv.registerTable("table1", ...)           // or
tableEnv.registerTableSource("table2", ...)     // or
tableEnv.registerExternalCatalog("extCat", ...)

// Step 3: register an output Table
tableEnv.registerTableSink("outputTable", ...)

// Step 4: query the tables
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// create a Table from a SQL query
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// Step 5: emit the Table API result Table to the TableSink, same for the SQL result
tapiResult.insertInto("outputTable")

// Step 6: execute the program
tableEnv.execute("scala_job")

Note: Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. See "Integration with DataStream and DataSet APIs" to learn how to convert DataStreams and DataSets to Tables and vice versa.

Create a TableEnvironment

TableEnvironment is a central concept in the integration of the Table API and SQL. It is responsible for:

  • Registering a Table in the internal catalog
  • Registering an external catalog
  • Executing SQL queries
  • Registering user-defined (scalar, table, or aggregation) functions
  • Converting a DataStream or DataSet into a Table
  • Holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment

Tables are always bound to a specific TableEnvironment. It is not possible to combine tables of different TableEnvironments in the same query, for example, join or union them.

A TableEnvironment is created by calling the static BatchTableEnvironment.create() or StreamTableEnvironment.create() method with a StreamExecutionEnvironment or ExecutionEnvironment and an optional TableConfig. The TableConfig can be used to configure the TableEnvironment or to customize the query optimization and translation process (see Query optimization).

Make sure to choose the specific BatchTableEnvironment/StreamTableEnvironment that matches your programming language and the planner you want to use. If both planner JARs are on the classpath (the default behavior), you should explicitly set the planner to be used in the current program.

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)

// **********************
// FLINK BATCH QUERY
// **********************
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment

val fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)

// **********************
// BLINK BATCH QUERY
// **********************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)

Note: If there is only one planner JAR in the /lib directory, you can use useAnyPlanner to create the EnvironmentSettings.
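For example, a minimal sketch (assuming a Flink version whose EnvironmentSettings builder still offers useAnyPlanner()):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

// let Flink pick whichever planner JAR is on the classpath
val anySettings = EnvironmentSettings.newInstance().useAnyPlanner().inStreamingMode().build()
val anyEnv = StreamExecutionEnvironment.getExecutionEnvironment
val anyTableEnv = StreamTableEnvironment.create(anyEnv, anySettings)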

Register tables in the Catalog

Catalog: Flink stores all metadata about databases and tables in its internal catalog structure. The catalog holds all table-related metadata in Flink, such as table schemas and data source information.

The TableEnvironment maintains a catalog of tables that are registered by name. There are two types of tables: input tables and output tables. Input tables can be referenced in Table API and SQL queries and provide input data. Output tables can be used to emit the result of a Table API or SQL query to an external system.

An input table can be registered from a variety of sources:

  • An existing Table object, usually the result of a Table API or SQL query.
  • A TableSource, which accesses external data such as a file, database, or messaging system.
  • A DataStream (only for streaming jobs) or a DataSet (only for batch jobs translated by the old planner) from the program.

An output table is registered using a TableSink.

Register a Table

Register a Table in TableEnvironment as follows:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// table is the result of a simple projection query 
val projTable: Table = tableEnv.scan("X").select(...)

// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable)

Note: A registered Table is treated similarly to a VIEW as known from relational database systems, i.e., the query that defines the Table is not optimized, but is inlined when another query references the registered Table. If multiple queries reference the same registered Table, it is inlined for each referencing query and executed multiple times, i.e., the result of the registered Table is not shared.

Register a TableSource

A TableSource provides access to external data stored in a storage system such as a database (MySQL, HBase, etc.), a file with a specific encoding (CSV, Apache [Parquet, Avro, ORC], etc.), or a messaging system (Apache Kafka, RabbitMQ, etc.).

Flink aims to provide TableSources for common data formats and storage systems. See the “[Table Sources and Sinks]” page for a list of supported Table Sources and instructions on how to build custom Table Sources.

A TableSource is registered in the TableEnvironment as follows:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)

// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)

Note: A TableEnvironment used with the Blink planner only accepts StreamTableSource, LookupableTableSource, and InputFormatTableSource, and a StreamTableSource used with the Blink planner in batch mode must be bounded.

Register a TableSink

A registered TableSink can be used to emit the result of a Table API or SQL query to an external storage system, such as a database, key-value store, message queue, or file system (in various encodings, such as CSV, Apache [Parquet, Avro, ORC], …).

Flink is designed to provide TableSink for common data formats and storage systems. Refer to the documentation on the Table Sources and Sinks page for details on available sinks and instructions on how to implement custom TableSink.

A TableSink is registered in the TableEnvironment as follows:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)

// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

Register an external Catalog

External Catalogs can provide information about external databases and tables, such as their names, schemas, statistics, and information about how to access data stored in external databases, tables, or files.

An external catalog can be created by implementing the ExternalCatalog interface and registered in the TableEnvironment as shown below:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// create an external catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

Once registered in the TableEnvironment, all tables defined in an ExternalCatalog can be accessed from Table API or SQL queries by specifying the table's full path, for example catalog.database.table.

Currently, Flink provides an InMemoryExternalCatalog for demonstration and testing purposes. However, the ExternalCatalog interface can also be used to connect catalogs such as HCatalog or Metastore to the Table API.
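For illustration, a hedged sketch of the full-path reference (the database and table names below are hypothetical and assumed to exist inside the catalog registered above as "InMemCatalog"):

// reference a table of the external catalog by its full path;
// "someDb" and "someTable" are placeholder names for this sketch
val extResult = tableEnv.sqlQuery("SELECT * FROM InMemCatalog.someDb.someTable")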

Note: The Blink planner does not support external catalogs.

Query a Table

Table API

The Table API is a language-integrated query API for Scala and Java. In contrast to SQL, queries are not specified as strings but are composed step by step in the host language.

The API is based on the Table class, which represents a table (streaming or batch) and provides methods to apply relational operations. These methods return a new Table object that represents the result of applying the relational operation to the input Table. Some relational operations are composed of multiple method calls, such as table.groupBy(...).select(...), where groupBy(...) specifies a grouping of the table and select(...) the projection on that grouping.

The Table API documentation describes all Table API operations supported on streaming and batch tables.

The following example shows a simple Table API aggregate query:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table
val orders = tableEnv.scan("Orders")
// compute revenue for all customers from France
val revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum AS 'revSum)

// emit or convert Table
// execute query

Note: The Scala Table API uses Scala Symbols, which start with a single tick (') to reference the attributes of a Table. The Table API uses Scala implicits. Make sure to import org.apache.flink.api.scala._ and org.apache.flink.table.api.scala._ in order to use the Scala implicit conversions.

SQL API

Flink’s SQL integration is based on Apache Calcite, which implements the SQL standard. SQL queries are specified as regular strings. The SQL documentation describes Flink’s SQL support for streaming and batch tables.

The following example shows how to specify a query and return the result as a Table:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// register Orders table

// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// emit or convert Table
// execute query

The following example shows how to specify an update query that inserts its result into a registered table:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate("""
  |INSERT INTO RevenueFrance
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// execute query

Mixing the Table API and SQL

Table API and SQL queries can be easily mixed because both return Table objects, as sketched after the list below:

  • You can define a Table API query on the Table object returned by the SQL query.
  • You can define SQL queries on the results of a Table API query by registering the result Table in the TableEnvironment and referencing it in the FROM clause of the SQL query.
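A minimal sketch of both directions, reusing the Orders example from above (the derived table and field names are otherwise hypothetical):

// a Table API query on the Table returned by a SQL query
val frenchOrders = tableEnv.sqlQuery(
  "SELECT cID, cName, revenue FROM Orders WHERE cCountry = 'FRANCE'")
val perCustomer = frenchOrders
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum as 'revSum)

// a SQL query on the result of a Table API query:
// register the result Table and reference it in the FROM clause
tableEnv.registerTable("PerCustomer", perCustomer)
val total = tableEnv.sqlQuery("SELECT SUM(revSum) AS total FROM PerCustomer")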

Emit a Table

A Table is emitted by writing it to a TableSink. A TableSink is a generic interface that supports various file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g. JDBC, Apache HBase, Apache Cassandra, Elasticsearch), and messaging systems (e.g. Apache Kafka, RabbitMQ).

A batch Table can only be written to a BatchTableSink, while a streaming Table requires an AppendStreamTableSink, a RetractStreamTableSink, or an UpsertStreamTableSink.

Refer to the Table Sources & Sinks documentation for details on available sinks and instructions on how to implement a custom TableSink.

The Table.insertInto(String tableName) method emits a Table to a registered TableSink. The method looks up the TableSink in the catalog by its name and validates that the schema of the Table is identical to the schema of the TableSink.

The following example shows how to emit a Table:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// register the TableSink with a specific schema
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)

// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...

// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable")

// execute the program

Translate and execute the query

The behavior of translating and executing a query differs between the two planners.

Depending on whether their input is streaming or batch, Table API and SQL queries are translated into DataStream or DataSet programs. A query is internally represented as a logical query plan and is translated in two phases:

  1. Optimization of the logical plan,
  2. Translation into a DataStream or DataSet program.

The Table API or SQL query is translated in the following cases:

  • A Table is emitted to a TableSink, i.e., when Table.insertInto() is called.
  • A SQL update query is specified, i.e., when TableEnvironment.sqlUpdate() is called.
  • A Table is converted into a DataStream or DataSet (see Integration with the DataStream and DataSet APIs).

Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when StreamExecutionEnvironment.execute() or ExecutionEnvironment.execute() is called.

Integration with the DataStream and DataSet APIs

Both planners can be integrated with the DataStream API for streaming. Only the old planner can be integrated with the DataSet API; the Blink planner in batch mode cannot be combined with the DataSet API.

Note: The DataSet API discussed below is only relevant for the old planner in batch mode.

Table API and SQL queries can be easily integrated with and embedded into DataStream and DataSet programs. For example, you can query an external table (e.g. from an RDBMS), do some preprocessing such as filtering, projecting, aggregating, or joining with metadata, and then further process the data with the DataStream or DataSet API (and any library built on top of these APIs, such as CEP or Gelly). Conversely, a Table API or SQL query can also be applied to the result of a DataStream or DataSet program.

This interaction can be achieved by converting DataStream or DataSet to a Table and vice versa.

Implicit conversions in Scala

The Scala Table API provides implicit conversions for the DataSet, DataStream, and Table classes. These conversions are enabled by importing the package org.apache.flink.table.api.scala._ in addition to org.apache.flink.api.scala._ for the Scala DataStream API.
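A minimal import sketch, limited to the two packages named above:

// enable the Scala API type information and the Table API implicit conversions
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._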

Register DataStream or DataSet as a Table

You can register DataStream or DataSet as a table in the TableEnvironment. The schema of the resulting table depends on the data type of the DataStream or DataSet that has been registered.

// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

Note: The name of a DataStream table must not match the pattern ^_DataStreamTable_[0-9]+, and the name of a DataSet table must not match the pattern ^_DataSetTable_[0-9]+. These patterns are reserved for internal use only.

Convert DataStream or DataSet to a Table

In addition to registering DataStream or DataSet in the TableEnvironment, you can also convert it directly to a Table. This is handy if you want to use tables in your Table API queries.

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, String)] = ...

// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

Convert the Table to DataStream or DataSet

When converting a Table into a DataStream or DataSet, you need to specify the data type of the resulting DataStream or DataSet, i.e., the data type into which the rows of the Table are to be converted. The most convenient conversion type is often Row. The following list outlines the features of the different options:

  • Row: fields are mapped by position, arbitrary number of fields, support for null values, no type-safe access.
  • POJO: fields are mapped by name (POJO fields must be named as Table fields), arbitrary number of fields, support for null values, type-safe access.
  • Case Class: fields are mapped by position, no support for null values, type-safe access.
  • Tuple: fields are mapped by position, limited to 22 (Scala) or 25 (Java) fields, no support for null values, type-safe access.
  • Atomic Type: the Table must have a single field, no support for null values, type-safe access.

Convert the Table to DataStream

A Table produced by a streaming query is updated dynamically, i.e., it changes as new records arrive on the query's input streams. Therefore, the DataStream into which such a dynamic table is converted needs to encode the updates of the table.

There are two modes to convert a Table to DataStream:

1. Append Mode: This mode can only be used if the dynamic table is modified by INSERT changes only, i.e., it is append-only and previously emitted results are never updated.

2. Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.

// get TableEnvironment. 
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] =
  tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

Note: For a detailed discussion of Dynamic Tables and their properties, see the “Dynamic Tables” documentation.

Convert a Table to a DataSet

A Table is converted into a DataSet as follows:

// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv = BatchTableEnvironment.create(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

Mapping data types to tables

Flink’s DataStream and DataSet APIs support many different types. Composite types such as tuples (built-in Scala and Flink Java tuples), POJOs, Scala case classes, and Flink’s Row type allow nested data structures with multiple fields that can be accessed in Table expressions. Other types are treated as atomic types. Below, we describe how the Table API converts these types into an internal row representation and show examples of converting a DataStream into a Table.

Mapping of data types to Table schemas can occur in two ways: based on field location or based on field name.

Location-based mapping

Location-based mapping can be used to give fields more meaningful names while keeping their order. This mapping is available for composite data types with a defined field order as well as for atomic types. Composite data types such as tuples, rows, and case classes have such a field order. The fields of a POJO, however, must be mapped based on field names. Fields can be projected out, but cannot be renamed with an alias (as).

When defining a location-based mapping, the specified names must not exist in the input data type, otherwise the API assumes that the mapping should be based on field names. If no field names are specified, the default field names and field order of the composite type are used, or f0 for atomic types.

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, Int)] = ...

// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field "myLong" only
val table: Table = tableEnv.fromDataStream(stream, 'myLong)

// convert DataStream into Table with field names "myLong" and "myInt"
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myInt)

Name-based mapping

Name-based mapping can be used with any data type, including POJOs. It is the most flexible way of defining a table schema mapping. All fields in the mapping are referenced by name and can be renamed with an alias (as). Fields can be reordered and projected out.

If no field names are specified, the default field names and field order of the composite type are used, or f0 for atomic types.

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, Int)] = ...

// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field "_2" only
val table: Table = tableEnv.fromDataStream(stream, '_2)

// convert DataStream into Table with swapped fields
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)

// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myInt, '_1 as 'myLong)

Atomic types

Flink treats primitives (Integer, Double, String) and generic types (types that cannot be analyzed or decomposed) as atomic types. A DataStream or DataSet of an atomic type is converted into a Table with a single attribute. The type of the attribute is inferred from the atomic type, and the name of the attribute can be specified.

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[Long] = ...

// convert DataStream into Table with default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, 'myLong)

Tuples (Scala and Java) and Case Classes (Scala only)

Flink supports Scala’s built-in tuples and provides its own Tuple classes for Java. DataStreams and DataSets of both kinds of tuples can be converted into tables. Fields can be renamed by providing names for all fields (position-based mapping). If no field names are specified, the default field names are used. If the original field names are referenced (f0, f1, … for Flink tuples; _1, _2, … for Scala tuples), the API assumes that the mapping is name-based rather than position-based. Name-based mapping allows fields to be reordered and projected with aliases (as).

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, String)] = ...

// convert DataStream into Table with renamed default field names '_1, '_2
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)

// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2)

// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)

// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// convert DataStream into Table with default field names 'name, 'age
val table = tableEnv.fromDataStream(streamCC)

// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)

// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(streamCC, 'age as 'myAge, 'name as 'myName)

POJO (Java and Scala)

Flink supports POJOs as composite types. The rules that determine what constitutes a POJO are documented here.

When converting a POJO DataStream or DataSet into a Table without specifying field names, the names of the original POJO fields are used. The name mapping requires the original names and cannot be done by position. Fields can be renamed with an alias (the as keyword), reordered, and projected out.

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Row

The Row data type supports an arbitrary number of fields and fields with null values. Field names can be specified via a RowTypeInfo or when converting a Row DataStream or DataSet into a Table. The Row type supports mapping fields both by position and by name. Fields can be renamed by providing names for all fields (position-based mapping), or selected individually for projection/reordering/renaming (name-based mapping).

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...

// convert DataStream into Table with default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Query optimization

Apache Flink uses Apache Calcite to optimize and translate queries. Optimizations currently performed include projection and filter pushdown, subquery decorrelation, and other kinds of query rewriting. The old planner does not yet optimize the order of joins, but executes them in the order defined in the query (the order of the tables in the FROM clause and/or the order of the join predicates in the WHERE clause).

The set of optimization rules applied in the different phases can be adjusted by providing a CalciteConfig object. It is created via the builder CalciteConfig.createBuilder() and handed to the TableEnvironment by calling tableEnv.getConfig.setPlannerConfig(calciteConfig).
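A minimal sketch, assuming the old planner's CalciteConfig in org.apache.flink.table.calcite (its builder also exposes methods for replacing or extending the rule sets of the individual optimization phases):

import org.apache.flink.table.calcite.CalciteConfig

// build a (here unmodified) CalciteConfig via its builder
val calciteConfig: CalciteConfig = CalciteConfig.createBuilder().build()

// hand it to the TableEnvironment through its TableConfig
tableEnv.getConfig.setPlannerConfig(calciteConfig)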

Explaining a Table

The Table API provides a mechanism to explain the logical and optimized query plans used to compute a Table. This is done through the TableEnvironment.explain(table) method or the TableEnvironment.explain() method. explain(table) returns the plan of a given Table. explain() returns the result of a multi-sink plan and is mainly used for the Blink planner. It returns a String describing three plans:

  1. The abstract syntax tree of the relational query, i.e. the unoptimized logical query plan,
  2. the optimized logical query plan, and
  3. the physical execution plan.

The following code shows an example and the corresponding output of explain(table) for a given Table:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
  .where('word.like("F%"))
  .unionAll(table2)

val explanation: String = tEnv.explain(table)
println(explanation)
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : collect elements with CollectionInputFormat

Stage 2 : Data Source
	content : collect elements with CollectionInputFormat

	Stage 3 : Operator
		content : from: (count, word)
		ship_strategy : REBALANCE

		Stage 4 : Operator
			content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
			ship_strategy : FORWARD

			Stage 5 : Operator
				content : from: (count, word)
				ship_strategy : REBALANCE

The following code shows an example and the corresponding output of explain() for a multi-sink plan:

val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
val tEnv = TableEnvironment.create(settings)

val fieldNames = Array("count", "word")
val fieldTypes = Array[TypeInformation[_]](Types.INT, Types.STRING)
tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes))
tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2", fieldNames, fieldTypes))
tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes))
tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes))

val table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')")
table1.insertInto("MySink1")

val table2 = table1.unionAll(tEnv.scan("MySource2"))
table2.insertInto("MySink2")

val explanation = tEnv.explain(false)
println(explanation)

The result of the multi-sink plan is:

== Abstract Syntax Tree ==
LogicalSink(name=[MySink1], fields=[count, word])
+- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])

LogicalSink(name=[MySink2], fields=[count, word])
+- LogicalUnion(all=[true])
   :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
   +- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])

== Optimized Logical Plan ==
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])

Sink(name=[MySink1], fields=[count, word])
+- Reused(reference_id=[1])

Sink(name=[MySink2], fields=[count, word])
+- Union(all=[true], union=[count, word])
   :- Reused(reference_id=[1])
   +- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : collect elements with CollectionInputFormat

	Stage 2 : Operator
		content : CsvTableSource(read fields: count, word)
		ship_strategy : REBALANCE

		Stage 3 : Operator
			content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
			ship_strategy : FORWARD

			Stage 4 : Operator
				content : Calc(where: (word LIKE _UTF-16LE'F%'), select: (count, word))
				ship_strategy : FORWARD

				Stage 5 : Operator
					content : SinkConversionToRow
					ship_strategy : FORWARD

					Stage 6 : Operator
						content : Map
						ship_strategy : FORWARD

Stage 8 : Data Source
	content : collect elements with CollectionInputFormat

	Stage 9 : Operator
		content : CsvTableSource(read fields: count, word)
		ship_strategy : REBALANCE

		Stage 10 : Operator
			content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
			ship_strategy : FORWARD

			Stage 12 : Operator
				content : SinkConversionToRow
				ship_strategy : FORWARD

				Stage 13 : Operator
					content : Map
					ship_strategy : FORWARD

					Stage 7 : Data Sink
						content : Sink: CsvTableSink(count, word)
						ship_strategy : FORWARD

						Stage 14 : Data Sink
							content : Sink: CsvTableSink(count, word)
							ship_strategy : FORWARD