The article is mainly to Flink related contents in the website translation, the original address: ci.apache.org/projects/fl…

The Table API is a unified relational API for stream and batch processing. Table API queries can be run on batch or stream inputs without modification. The Table API is a superset of the SQL language, designed specifically for use with Apache Flink. The Table API is an API for language integration between Scala and Java. Instead of specifying a query to a String value, as is common in SQL, Table API queries are defined in a langue-embedded style in Java or Scala, with IDE support such as auto-completion and syntax validation.

The Table API shares many of the concepts and parts of its API with Flink’s SQL integration. Take a look at the Common Concepts&API to see how to register a Table or create a Table object. The “Streaming Concepts” page discusses flow-specific Concepts such as dynamic tables and temporal properties. The following example assumes a registry called Orders with properties (A, B, C, rowtime). The RowTime field is either a logical time attribute in a flow or a regular timestamp field in a batch.

Overview and Examples

The Table API is available for Scala and Java. The Scala Table API makes use of Scala expressions, and the Java Table API is based on strings that are parsed and converted to equivalent expressions.

The following example shows the difference between Scala and the Java Table API. The Table program executes in a batch environment. It will scan the “Orders” table, group by field A, and calculate the result rows for each group. The result of the Table program is converted to a DataSet of type Row and printed.

Java

By importing the org. Apache. Flink. Table. API. Java. _ enabling Java table API. The following example shows how to construct a Java Table API program and how to specify an expression as a string.

import org.apache.flink.table.api._
import org.apache.flink.table.api.java._

// environment configuration
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

// register Orders table in table environment
// ...

// specify table program
Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime)

Table counts = orders
        .groupBy("a")
        .select("a, b.count as cnt");

// conversion to DataSet
DataSet<Row> result = tEnv.toDataSet(counts, Row.class);
result.print();
Copy the code

Scala

By importing the org. Apache. Flink. API. Scala. _ and org., apache. Flink. Table. API. Scala. _ to enable scala table API. The following example shows how the Scala Table API program is constructed. The Table attribute is referenced using the Scala symbol, which begins with an apostrophe (‘).

import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._

// environment configuration
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)

// register Orders table in table environment
// ...

// specify table program
val orders = tEnv.from("Orders") // schema (a, b, c, rowtime)

val result = orders
               .groupBy('a)
               .select('a.'b.count as 'cnt)
               .toDataSet[Row] // conversion to DataSet
               .print()
Copy the code

The next example shows a more complex Table API program. The program scans the “Orders” table again. It filters null values, normalizes field A of String type, and calculates and generates an average bill amount b for a per hour.

Java

// environment configuration
// ...

// specify table program
Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime)

Table result = orders
        .filter("a.isNotNull && b.isNotNull && c.isNotNull")
        .select("a.lowerCase() as a, b, rowtime")
        .window(Tumble.over("1.hour").on("rowtime").as("hourlyWindow"))
        .groupBy("hourlyWindow, a")
        .select("a, hourlyWindow.end as hour, b.avg as avgBillingAmount");
Copy the code

Scala

// environment configuration
// ...

// specify table program
val orders: Table = tEnv.from("Orders") // schema (a, b, c, rowtime)

val result: Table = orders
        .filter('a.isNotNull && 'b.isNotNull && 'c.isNotNull)
        .select('a.lowerCase() as 'a.'b.'rowtime)
        .window(Tumble over 1.hour on 'rowtime as 'hourlyWindow)
        .groupBy('hourlyWindow.'a)
        .select('a.'hourlyWindow.end as 'hour.'b.avg as 'avgBillingAmount)
Copy the code

Because the Table API is a unified API for batch and stream data, both sample programs can be executed on batch and stream inputs without any modifications to the Table program itself. In both cases, the program produces the same result as long as the stream is not logged late.

operator

The Table API supports the following operators. Note that not all operations are available for batch and streaming; They are marked accordingly.

Scan, Projection, and Filter

Operators Description
From

Batch

Streaming
Similar to the FROM clause in an SQL query. Perform a scan of the registry.

val orders: Table = tableEnv.from("Orders")
Select

Batch

Streaming
Similar to the SQL SELECT statement. Perform the selection operation.

val orders: Table = tableEnv.from("Orders")

val result = orders.select('a, 'c as 'd)

You can use the asterisk (*) as a wildcard to select all the columns in the table.

val orders: Table = tableEnv.from("Orders")

val result = orders.select('*)
As

Batch

Streaming
Rename the field.

val orders: Table = tableEnv.from("Orders").as('x, 'y, 'z, 't)
Where / Filter

Batch

Streaming
Similar to the SQL WHERE clause. Filters out rows that do not pass the filter predicate.

val orders: Table = tableEnv.from("Orders")

val result = orders.filter('a % 2 === 0)

orval orders: Table = tableEnv.from("Orders")

val result = orders.where('b === "red")

Column Operations

Operators Description
AddColumns

Batch

Streaming
The field is added. If the added field already exists, it throws an exception.

val orders = tableEnv.from("Orders");

val result = orders.addColumns(concat('c, "Sunny"))
AddOrReplaceColumns

Batch

Streaming
The field is added. If you add a column with the same name as an existing column, the existing field will be replaced. Also, if the added fields have duplicate field names, the last one is used.

val orders = tableEnv.from("Orders");

val result = orders.addOrReplaceColumns(concat('c, "Sunny") as 'desc)
DropColumns

Batch

Streaming
The field is deleted. Field expressions should be field reference expressions, and only existing fields can be deleted.

val orders = tableEnv.from("Orders");

val result = orders.dropColumns('b, 'c)
RenameColumns

Batch

Streaming
Perform a field rename operation. Field expressions should be alias expressions, and only existing fields can be renamed.

val orders = tableEnv.from("Orders");

val result = orders.renameColumns('b as 'b2, 'c as 'c2)

Aggregations

Operators Description
GroupBy polymerization

Batch

Streaming

Result Updating
Similar to the SQL GROUP BY clause. Use the following running aggregation operator to group the rows on the grouping key to aggregate the rows line by line.

val orders: Table = tableEnv.scan("Orders")

val result = orders.groupBy('a).select('a, 'b.sum as 'd)

Note: For streaming queries, the amount of state required to calculate the query results can grow indefinitely, depending on the type of aggregation and the number of different grouping keys. Provide a query configuration with a valid retention interval to prevent the state from becoming too large. Please refer to theQuery configuration
GroupBy Window aggregation

Batch

Streaming
Group and aggregate tables on one or more possible group keys in the groups window.

val orders: Table = tableEnv.scan("Orders")

val result: Table = orders

.window(Tumble over 5.minutes on 'rowtime as 'w) // define window

.groupBy('a, 'w) // group by key and window

.select('a, w.start, 'w.end, 'w.rowtime, 'b.sum as 'd) // access window properties and aggregate
Over window aggregation

Batch

Streaming
Similar to the SQL OVER clause. Calculates window aggregation for each row based on the window (range) of the preceding and following rows.

val orders: Table = tableEnv.from("Orders")

val result: Table = orders // define window

.window(Over

partitionBy 'a

orderBy 'rowtime

preceding UNBOUNDED_RANGE

following CURRENT_RANGE

as 'w)

.select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w) // sliding aggregate

**Note:** All aggregations must be defined on the same window (that is, the same partition, sort, and scope). Currently, only the PRECEDING (unbounded and bounded) window into the CURRENT ROW scope is supported. Ranges with FOLLOWING are not supported at this time. Must be in a singleTime propertySelect ORDER BY.
Distinct polymerization

Batch

Streaming

Result Updating
Similar to the SQL DISTINCT AGGREGATION clause, for example COUNT (DISTINCT a). Different aggregate declarations Aggregate functions (built-in or user-defined) apply only to different input values. Distinct aggregation can be used for GroupBy aggregation, GroupBy window aggregation, and Over window aggregation.

val orders: Table = tableEnv.from("Orders");



// Distinct aggregation on group by

val groupByDistinctResult = orders

.groupBy('a)

.select('a, 'b.sum.distinct as 'd)



// Distinct aggregation on time window group by

val groupByWindowDistinctResult = orders

.window(Tumble over 5.minutes on 'rowtime as 'w).groupBy('a, 'w)

.select('a, 'b.sum.distinct as 'd)



// Distinct aggregation on over window

val result = orders

.window(Over

partitionBy 'a

orderBy 'rowtime

preceding UNBOUNDED_RANGE

as 'w)

.select('a, 'b.avg.distinct over 'w, 'b.max over 'w, 'b.min over 'w)

User-defined aggregate functions can also be used with the DISTINCT modifier. To calculate the aggregation results only for different values, simply add the DISTINCT modifier to the aggregation function.

val orders: Table = tEnv.from("Orders");



// Use distinct aggregation for user-defined aggregate functions

val myUdagg = new MyUdagg();

orders.groupBy('users).select('users, myUdagg.distinct('points) as 'myDistinctResult);

**Note:** For streaming queries, the state required to calculate the query result may grow indefinitely, depending on the type of aggregation and the number of different grouping keys. Provide a query configuration with a valid retention interval to prevent the state from becoming too large. Please refer to theQuery configuration
Distinct

Batch

Streaming

Result Updating
Similar to the SQL DISTINCT clause. Returns records with different combinations of values.

val orders: Table = tableEnv.from("Orders")

val result = orders.distinct()

**Note:** For flow queries, depending on the number of query fields, the state required to calculate the query result may grow indefinitely. Provide query configurations with valid retention intervals to prevent excessive states. If status clearance is enabled, DISTINCT must issue messages to prevent downstream operators from prematurely exiting status, which can result in DISTINCT containing results updates. For more information, seeQuery configuration

Joins

Operators Description
Inner Join

Batch

Streaming
Similar to the SQL JOIN clause. Join the two tables. The two tables must have different field names, and at least one equal join predicate must be defined by the join operator or using the WHERE or filter operators.

val left = ds1.toTable(tableEnv, 'a, 'b, 'c)

val right = ds2.toTable(tableEnv, 'd, 'e, 'f)

val result = left.join(right).where('a === 'd).select('a, 'b, 'e)

Note:For flow queries, the amount of state required to calculate the query result can grow indefinitely, depending on the number of different input rows. Provide query configurations with valid retention intervals to prevent excessive states. For more information, seeQuery configuration
Outer Join

Batch

Streaming

Result Updating
Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Join the two tables. The two tables must have different field names, and at least one equality join predicate must be defined.

val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)

val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)



val leftOuterResult = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e)

val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)

val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)

Note:For flow queries, the amount of state required to calculate the query result can grow indefinitely, depending on the number of different input rows. Provide query configurations with valid retention intervals to prevent excessive states. For more information, seeQuery configuration
Time-windowed Join

Batch

Streaming
Note:Time window connections are a subset of general connections that can be processed as streams.

Time window connections require at least one equal connection and a connection condition that limits the time of both parties. This condition can be defined with two appropriate range predicates (<, <=, > =, >) or a single equality predicate that compares two input tables for the same type of time attribute (that is, processing time or event time).

For example, the following are valid window connection conditions:

'ltime === 'rtime

'ltime >= 'rtime && 'ltime < 'rtime + 10.minutes



val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime)

val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime)



val result = left.join(right)

.where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes)

.select('a, 'b, 'e, 'ltime)
Inner Join with Table Function (UDTF)

Batch

Streaming
Join the table using the result of the table function. Each row of the left (outer) table is joined to all rows resulting from the corresponding call to the table function. If its table function call returns an empty result, the row in the left (outer) table is deleted.

// instantiate User-Defined Table Function

val split: TableFunction[_] = new MySplitUDTF()



// join

val result: Table = table

.joinLateral(split('c) as ('s, 't, 'v))

.select('a, 'b, 's, 't, 'v)
Left Outer Join with Table Function (UDTF)

Batch

Streaming
Join tables using the results of table functions. Each row of the left (outer) table is joined to all rows resulting from the corresponding call to the table function. If the table function call returns an empty result, the corresponding external row is retained and the result is populated with null values.

**Note:** Currently, the left outer join of table functions can only be null or true.

// instantiate User-Defined Table Function

val split: TableFunction[_] = new MySplitUDTF()

// join val result: Table = table

.leftOuterJoinLateral(split('c) as ('s, 't, 'v))

.select('a, 'b, 's, 't, 'v)
Join with Temporal Table

Streaming
Temporal tablesIs a table that tracks its change over time.

The temporal table feature provides access to the state of a temporal table at a particular point in time. The syntax for joining tables using temporal table functions is the same as for inner joins using table functions.

**Note:** Currently only internal joins using temporary tables are supported.

val ratesHistory = tableEnv.from("RatesHistory")



// register temporal table function with a time attribute and primary key

val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency)

// join with "Orders" based on the time attribute and key

val orders = tableEnv.from("Orders")

val result = orders

.joinLateral(rates('o_rowtime), 'r_currency === 'o_currency)

A collection of operator

Operators Description
Union

Batch
Similar to the SQL UNION clause. Merge two tables where duplicate records have been deleted. Both tables must have the same field type.

val left = ds1.toTable(tableEnv, 'a, 'b, 'c)

val right = ds2.toTable(tableEnv, 'a, 'b, 'c)

val result = left.union(right)
UnionAll

Batch

Streaming
Similar to the SQL UNION ALL clause. Merge two tables, both tables must have the same field type.val left = ds1.toTable(tableEnv, 'a, 'b, 'c)

val right = ds2.toTable(tableEnv, 'a, 'b, 'c)

val result = left.unionAll(right)
Intersect

Batch
Similar to the SQL INTERSECT clause. Intersection returns records that exist in both tables. If a record occurs multiple times in one or two tables, it is returned only once, that is, there are no duplicate records in the resulting table. Both tables must have the same field types.

val left = ds1.toTable(tableEnv, 'a, 'b, 'c)

val right = ds2.toTable(tableEnv, 'e, 'f, 'g)

val result = left.intersect(right)
IntersectAll

Batch
Similar to the SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record exists more than once in both tables, the number of times it is returned is the same as the number of times it exists in both tables, that is, the resulting table may have duplicate records. Both tables must have the same field types.

val left = ds1.toTable(tableEnv, 'a, 'b, 'c)

val right = ds2.toTable(tableEnv, 'e, 'f, 'g)

val result = left.intersectAll(right)
Minus

Batch
Similar to the SQL EXCEPT clause. Minus Returns records that do not exist in the right table in the left table. The duplicate entries in the left table are returned only once, that is, the duplicate entries are deleted. Both tables must have the same field types.

val left = ds1.toTable(tableEnv, 'a, 'b, 'c)

val right = ds2.toTable(tableEnv, 'a, 'b, 'c)

val result = left.minus(right)
MinusAll

Batch
Similar to the SQL EXCEPT ALL clause. MinusAll returns records that do not exist in the right table. A record that occurs n times in the left table and m times in the right table returns (n-m) times, that is, the number of duplications that occur in the right table is deleted. Both tables must have the same field types.

val left = ds1.toTable(tableEnv, 'a, 'b, 'c)

val right = ds2.toTable(tableEnv, 'a, 'b, 'c)

val result = left.minusAll(right)
In

Batch

Streaming
Similar to the SQL IN clause. Returns true if the expression exists in the given table subquery. The subquery table must contain one column. This column must have the same data type as the expression.

val left = ds1.toTable(tableEnv, 'a, 'b, 'c)

val right = ds2.toTable(tableEnv, 'a)

val result = left.select('a, 'b, 'c).where('a.in(right))

Note:For flow queries, this operation will be rewritten as a Join and group operation. Depending on the number of input rows, the state required to calculate the query result can grow indefinitely. Provide query configurations with valid retention intervals to prevent excessive states. For more information, seeQuery configuration

OrderBy, Offset & Fetch

Operators Description
Order By

Batch
Similar to the SQL ORDER BY clause. Returns the globally sorted records on all parallel partitions.

val in = ds.toTable(tableEnv, 'a, 'b, 'c)

val result = in.orderBy('a.asc)
Offset & Fetch

Batch
Similar to the SQL OFFSET and FETCH clauses. Offset and extract limit the number of records that can be returned from the sort result. Offset and extract are technically part of the Order By operator and must therefore precede them.

val in = ds.toTable(tableEnv, 'a, 'b, 'c)



// returns the first 5 records from the sorted result

val result1: Table = in.orderBy('a.asc).fetch(5)



// skips the first 3 records and returns all following records from the sorted result

val result2: Table = in.orderBy('a.asc).offset(3)



// skips the first 10 records and returns the next 5 records from the sorted result

val result3: Table = in.orderBy('a.asc).offset(10).fetch(5)

Insert

Operators Description
Insert Into

Batch

Streaming
Similar to the INSERT INTO clause in an SQL query. Performs the insert in the inserted output table.

The output tables must be registered in the TableEnvironment. In addition, the schema of the registry must match the schema of the query.

val orders: Table = tableEnv.from("Orders")

orders.insertInto("OutOrders")

Group Windows

The groups window aggregates row groups into finite groups by time or row-count interval and aggregates functions by group. For batch tables, Windows are a convenient shortcut to group records by time interval.

Windows is defined using the window (w: GroupWindow) clause and requires the alias specified by the as clause. In order to group tables by window, you must use the groupBy (…) Clause refers to a window alias.

The following example shows how to define window aggregations on a table.

val table = input
  .window([w: GroupWindow] as 'w)  // define window with alias w
  .groupBy('w)   // group the table by window w
  .select('b.sum)  // aggregate
Copy the code

In a streaming environment, if window aggregations are grouped on one or more properties in addition to Windows, they can only be computed in parallel. GroupBy (…) Clause refers to the window alias and at least one other property. GroupBy (…) that only references the window alias Clauses, such as the one in the above example, can only be evaluated by a single non-parallel task. The following example shows how to define window aggregations using additional grouping properties.

val table = input
  .window([w: GroupWindow] as 'w) // define window with alias w
  .groupBy('w.'a)  // group the table by attribute a and window w 
  .select('a.'b.sum)  // aggregate
Copy the code

You can add window properties (such as the start, end, or line timestamp of a time window) to the properties of the window alias, w.start, W.End, and W.rawtime, respectively, in the SELECT statement. Window start and line time timestamps are the upper and lower boundaries of the containing window. In contrast, the window end timestamp is the only window upper boundary. For example, a 30-minute scrolling window starting at 2:00 p.m. would have a start timestamp of 14:00:00.000, a line time timestamp of 14:29:59.999, and an end time stamp of 14:30:00.

val table = input
  .window([w: GroupWindow] as 'w)  // define window with alias w
  .groupBy('w.'a)  // group the table by attribute a and window w 
  .select('a.'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps
Copy the code

The Window parameter defines how rows are mapped to Windows. A window is not an interface that the user can implement. Instead, the Table API provides a set of predefined Window classes with specific semantics that are transformed into the underlying DataStream or DataSet operations. The supported window definitions are listed below.

Tumble window

Scrolling Windows assign rows to fixed – length contiguous Windows that do not overlap.

For example, a 5-minute tumble window groups rows at 5-minute intervals. The tumble window can be defined at event time, processing time, or line count. Use the Tumble class to define the Tumble window as follows:

The scroll window is defined using the Tumble class, as shown below:

Method Description
over Defines the length of the window, either in time interval or in line interval.
on The time attribute is group (time interval) or sort (row count). For batch queries, this could be any Long or Timestamp attribute. For streaming queries, this must be the declared event time or processing time attribute.
as Specifies an alias for the window. Aliases are used to refer to Windows in the following groupBy () clause and optionally select window properties such as Window start, end or RowTime timestamp in the SELECT () clause.
/ Tumbling Event-time Window
.window(Tumble over 10.minutes on 'rowtime as 'w)

// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.minutes on 'proctime as 'w)

// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.rows on 'proctime as 'w)
Copy the code

Slide window

The sliding window is of a fixed size and slides at the specified sliding interval. If the slide interval is smaller than the window size, the slide window overlaps. Therefore, rows can be assigned to multiple Windows.

For example, a sliding window with a 15-minute size and a 5-minute sliding interval allocates each row to three different 15-minute size Windows that are called at 5-minute intervals. Sliding Windows can be defined at event time, processing time, or line count.

The Slide window is defined using the Slide class, as shown below:

Method Description
over Defines the length of the window, which can be time or row count interval.
every Define the slide interval, which can be either the time interval or the number of rows. The slide interval must be of the same type as the size interval.
on The time attribute is group (time interval) or sort (row count). For batch queries, this could be any Long or Timestamp attribute. For streaming queries, this must beDeclared event time or processing time attribute.
as Specifies an alias for the window. Aliases are used to refer to Windows in the following groupBy () clause and optionally select window properties such as Window start, end or RowTime timestamp in the SELECT () clause.
// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)

// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)

// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide over 10.rows every 5.rows on 'proctime as 'w)
Copy the code

Session (Session window)

The session window does not have a fixed size, but its boundary is defined by the inactivity interval, that is, if no event occurs within the defined interval, the session window closes.

For example, a session window that is 30 minutes apart starts when a row is observed after 30 minutes of inactivity (otherwise the row will be added to an existing window), and closes if no row is added within 30 minutes. The session window can work at event time or processing time.

The Session window is defined using the Session class, as shown below:

Method Description
withGap Define the interval between two Windows as the time interval.
on The time attribute is group (time interval) or sort (row count). For batch queries, this could be any Long or Timestamp attribute. For streaming queries, this must be the declared event time or processing time attribute.
as Specifies an alias for the window. Aliases are used to refer to Windows in the following groupBy () clause and optionally select window properties such as Window start, end or RowTime timestamp in the SELECT () clause.
// Session Event-time Window
.window(Session withGap 10.minutes on 'rowtime as 'w)

// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session withGap 10.minutes on 'proctime as 'w)
Copy the code

Over Windows

Window aggregation is known by the standard SQL (OVER clause) and is defined in the SELECT clause of the query. Unlike the GROUP window specified in the GROUP BY clause, the row does not collapse above the window. Instead, in the window aggregation, the aggregation is computed for each input line in the range of its adjacent rows.

The window is defined using the window (w: OverWindow) clause (using over_window (OverWindow) in the Python API) and referred to by an alias in the select() method. The following example shows how to define window aggregations on a table.

val table = input
  .window([w: OverWindow] as 'w)              // define over window with alias w
  .select('a.'b.sum over 'w.'c.min over 'w) // aggregate over the over window w
Copy the code

OverWindow defines the range of rows to compute the aggregation. OverWindow is not an interface that the user can implement. Instead, the Table API provides an Over class to configure the properties of the Over window. You can define the top of the window at the event time or processing time and at a range specified for time intervals or row counts. Supported over window definitions are exposed as methods on over (and other classes) and are listed below:

Method Required Description
partitionBy Optional Defines the partition of the input on one or more properties. Each partition is sorted individually, and aggregate functions are applied to each partition separately.

Note:In a streaming environment, if the window contains a Partition by clause, the entire window aggregation can only be computed in parallel. No partitionBy (…) , the flow will be handled by a single non-parallel task.
orderBy Required Define the order of rows within each partition, thereby defining the order in which aggregate functions are applied to rows.

Note:For a flow query, it must be the declared event time or processing time time attribute. Currently, only a single sort attribute is supported.
preceding Optional Defines the interval of rows to be included in the window before the current row. This interval can be specified as a time interval or a row count interval.

Specify the bounds on the window with the size of the time interval, for example, the time interval is 10 minutes and the row count interval is 10 lines.

Use constants to specify an unbound on the window, that is, UNBOUNDED_RANGE for time intervals or UNBOUNDED_ROW for row count intervals. Borderless on Windows starts on the first line of the partition.

If the preceding clause is omitted, UNBOUNDED_RANGE and CURRENT_RANGE are used as the default before and after the window.
following Optional Defines the window interval between the rows in the window that contain and immediately follow the current row. This interval must be specified in the same units as the previous interval (time or line count). Currently, Windows with rows after the current row are not supported. Instead, you can specify one of two constants:

1. CURRENT_ROW sets the upper limit of the window to the current row.

2. CURRENT_RANGE sets the upper limit of the window to the sort key of the current row, that is, all rows that have the same sort key as the current row are included in the window.

If the subsentence is omitted, the upper limit of the interval window is defined as CURRENT_RANGE, and the upper limit of the row count interval window is defined as CURRENT_ROW.
as Required Assigns an alias to the upper window. Aliases are used to refer to the over window in the following select () clause.

Note: Currently, all aggregate functions in the same select () call must be evaluated within the same window scope.

Unbounded Over Windows

// Unbounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)

// Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)

// Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
 
// Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
Copy the code

Bounded Over Windows

// Bounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)

// Bounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)

// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
  
// Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)
Copy the code

Row-based operations

Row-based operations generate output with multiple columns.

  • Map Batch Streaming

    Perform mapping operations using user-defined scalar functions or built-in scalar functions. If the output type is compound, the output will be flattened.

    class MyMapFunction extends ScalarFunction {
      def eval(a: String) :Row = {
        Row.of(a, "pre-" + a)
      }
    
      override def getResultType(signature: Array[Class[_]]) :TypeInformation[_] =
        Types.ROW(Types.STRING.Types.STRING)}val func = new MyMapFunction(a)val table = input
      .map(func('c)).as('a.'b)
    Copy the code
  • FlatMap Batch Streaming

    Perform the flatMap operation using the table function.

    class MyFlatMapFunction extends TableFunction[Row] {
      def eval(str: String) :Unit = {
        if (str.contains("#")) {
          str.split("#").foreach({ s =>
            val row = new Row(2)
            row.setField(0, s)
            row.setField(1, s.length)
            collect(row)
          })
        }
      }
    
      override def getResultType: TypeInformation[Row] = {
        Types.ROW(Types.STRING.Types.INT)}}val func = new MyFlatMapFunction
    val table = input
      .flatMap(func('c)).as('a.'b)
    Copy the code
  • Aggregate Batch Streaming Result Updating

    Perform an aggregate operation using an aggregate function. You must turn off aggregation with a SELECT statement, and the select statement does not support aggregation functions. If the output type is compound, the aggregated output will be flattened.

    case class MyMinMaxAcc(var min: Int, var max: Int)
    
    class MyMinMax extends AggregateFunction[Row.MyMinMaxAcc] {
    
      def accumulate(acc: MyMinMaxAcc, value: Int) :Unit = {
        if (value < acc.min) {
          acc.min = value
        }
        if (value > acc.max) {
          acc.max = value
        }
      }
    
      override def createAccumulator() :MyMinMaxAcc = MyMinMaxAcc(0.0)
      
      def resetAccumulator(acc: MyMinMaxAcc) :Unit = {
        acc.min = 0
        acc.max = 0
      }
    
      override def getValue(acc: MyMinMaxAcc) :Row = {
        Row.of(Integer.valueOf(acc.min), Integer.valueOf(acc.max))
      }
    
      override def getResultType: TypeInformation[Row] = {
        new RowTypeInfo(Types.INT.Types.INT)}}val myAggFunc = new MyMinMax
    val table = input
      .groupBy('key)
      .aggregate(myAggFunc('a) as ('x.'y))
      .select('key.'x.'y)
    Copy the code
  • Group Window Aggregate Batch Streaming

    Group and aggregate tables in the group window and possibly on one or more grouping keys. You must turn off aggregation with a SELECT statement. And the SELECT statement does not support ‘*’ or aggregate functions.

    val myAggFunc = new MyMinMax
    val table = input
        .window(Tumble over 5.minutes on 'rowtime as 'w) // define window
        .groupBy('key.'w) // group by key and window
        .aggregate(myAggFunc('a) as ('x.'y))
        .select('key.'x.'y.'w.start, 'w.end) // access window properties and aggregate results
    Copy the code
  • FlatAggregate Batch Streaming

    Similar to GroupBy aggregation. Use the following run table aggregation operator to group the rows on the grouping key to aggregate the rows row by row. The difference with an AggregateFunction is that a TableAggregateFunction can return zero or more records for a group. You must turn off “flatAggregate” with a SELECT statement. And the select statement does not support aggregate functions. In addition to using emitValue to print results, you can also use the emitUpdateWithRetract method. Unlike emitValue, emitUpdateWithRetract is used to issue updated values. This method outputs data incrementally in undo mode, that is, once there is an update, we must undo the old record and then send the new update record. If either method is defined in the table aggregation function, the emitUpdateWithRetract method is preferred because it is more efficient than emitValue because it increments the output value.

    import java.lang.{Integer= >JInteger}
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.functions.TableAggregateFunction
    
    /** * Accumulator for top2. */
    class Top2Accum {
      var first: JInteger = _
      var second: JInteger= _}/** * The top2 user-defined table aggregate function. */
    class Top2 extends TableAggregateFunction[JTuple2[JInteger.JInteger].Top2Accum] {
    
      override def createAccumulator() :Top2Accum = {
        val acc = new Top2Accum
        acc.first = Int.MinValue
        acc.second = Int.MinValue
        acc
      }
    
      def accumulate(acc: Top2Accum, v: Int) {
        if (v > acc.first) {
          acc.second = acc.first
          acc.first = v
        } else if (v > acc.second) {
          acc.second = v
        }
      }
    
      def merge(acc: Top2Accum, its: JIterable[Top2Accum) :Unit = {
        val iter = its.iterator()
        while (iter.hasNext) {
          val top2 = iter.next()
          accumulate(acc, top2.first)
          accumulate(acc, top2.second)
        }
      }
    
      def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger.JInteger]]) :Unit = {
        // emit the value and rank
        if(acc.first ! =Int.MinValue) {
          out.collect(JTuple2.of(acc.first, 1))}if(acc.second ! =Int.MinValue) {
          out.collect(JTuple2.of(acc.second, 2))}}}val top2 = new Top2
    val orders: Table = tableEnv.from("Orders")
    val result = orders
        .groupBy('key)
        .flatAggregate(top2('a) as ('v.'rank))
        .select('key.'v.'rank)
    Copy the code

    **Note:** For flow queries, depending on the type of aggregation and the number of different grouping keys, the state required to calculate the query result may grow indefinitely. Provide query configurations with valid retention intervals to prevent excessive states. For more information, see Querying configurations.

  • Group Window FlatAggregate Batch Streaming

    Group and aggregate tables in the group window and possibly on one or more grouping keys. You must turn off “flatAggregate” with a SELECT statement. And the select statement does not support aggregate functions.

    val top2 = new Top2
    val orders: Table = tableEnv.from("Orders")
    val result = orders
        .window(Tumble over 5.minutes on 'rowtime as 'w) // define window
        .groupBy('a.'w) // group by key and window
        .flatAggregate(top2('b) as ('v.'rank))
        .select('a, w.start, 'w.end, 'w.rowtime, 'v.'rank) // access window properties and aggregate results
    Copy the code

Data Types

See the dedicated page on data types. Generic and (nested) compound types (such as POjos, tuples, rows, Scala case classes) can also be one-line fields. Fields with arbitrary nested compound types can be accessed using the value access feature. Generic types are treated as black boxes that can be passed or handled by user-defined functions.

Expression syntax

Some of the operators in the previous sections expect one or more expressions. You can specify expressions using an embedded Scala DSL or a string. Refer to the above example to see how to specify an expression.

This is the EBNF syntax for expressions:

expressionList = expression , { "," , expression } ;

expression = overConstant | alias ;

alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")");logic = comparison , [ ( "&" | "| |" ) , comparison ] ;

comparison = term , [ ( "=" | "= =" | "= = =" | ! "" =" | ! "" = =" | ">" | "> =" | "<" | "< =" ) , term ] ;

term = product , [ ( "+" | "-" ) , product ] ;

product = unary , [ ( "*" | "/" | "%") , unary ] ;

unary = [ "!" | "-" | "+" ] , composite ;

composite = over | suffixed | nullLiteral | prefixed | atom ;

suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | suffixFunctionCall | timeIndicator ;

prefixed = prefixAs | prefixCast | prefixIf | prefixDistinct | prefixFunctionCall ;

interval = timeInterval | rowInterval ;

timeInterval = composite , "." , ("year" | "years" | "quarter" | "quarters" | "month" | "months" | "week" | "weeks" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis");rowInterval = composite , "." , "rows" ;

suffixCast = composite , ".cast(" , dataType , ")" ;

prefixCast = "cast(" , expression , dataType , ")" ;

dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "MAP" , "(" , dataType , "," , dataType , ")" ) | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")");suffixAs = composite , ".as(" , fieldReference , ")" ;

prefixAs = "as(" , expression, fieldReference , ")" ;

suffixIf = composite , ".? (" , expression , "," , expression , ")" ;

prefixIf = "? (" , expression , "," , expression , "," , expression , ")" ;

suffixDistinct = composite , "distinct.()" ;

prefixDistinct = functionIdentifier , ".distinct"["(" , [ expression , { "," , expression } ] , ")"];suffixFunctionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")"];prefixFunctionCall = functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")"];atom = ( "(" , expression , ")" ) | literal | fieldReference ;

fieldReference = "*" | identifier ;

nullLiteral = "nullOf(" , dataType , ")" ;

timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "QUARTER" | "WEEK" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;

timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;

over = composite , "over" , fieldReference ;

overConstant = "current_row" | "current_range" | "unbounded_row" | "unbounded_row" ;

timeIndicator = fieldReference , "." , ( "proctime" | "rowtime");Copy the code

Text: The text here is valid Java text. String literals can be specified in single or double quotation marks. Copy quotes for escape (for example, “It’s me.” Or “I” like “dogs.” ).

Empty text: Empty text must have a type attached. Use nullOf (type) (for example, nullOf (INT)) to create null values.

FieldReference: fieldReference specifies one column in the data (or all columns if * is used), and functionIdentifier specifies the supported scalar function. The column and function names follow the Java identifier syntax.

Function calls: Expressions specified as strings can also use prefix notation instead of postfix notation to call operators and functions.

Decimals: The Table API also supports Java’s BigDecimal type if you need to use exact numeric values or large decimals. In the Scala Table API, decimals can be defined by BigDecimal (” 123456 “), whereas in Java, they can be defined precisely by appressing the “p” for example, page 123456

Time representation: To use time values, the Table API supports Java SQL date, time, and timestamp types. In the Scala Table API, you can define a literal using java.sql.date.valueof (” 2016-06-27 “), java.sql.time.valueof (” 10:10:42 “), or java.sql. ValueOf (” 2016-06-27 10:10:42.123 “). The Java and Scala table apis also support calls to “2016-06-27”.todate (), “10:10:42”.totime (), and “2016-06-27 10:10:42.123”.totimestamp () to convert strings to the time type. Note: Since Java’s temporal SQL type depends on the time zone, make sure that Flink Client and all TaskManagers use the same time zone.

Intervals: Intervals can be expressed as months (types.interval_months) or milliseconds (types.interval_millis). You can add or subtract the same type of interval (e.g. 1. Hour + 10 minutes). You can add a millisecond interval to a time point (for example, 2016-08-10. ToDate + 5.days).

Scala expressions: : Scala expressions use implicit conversions. Therefore, please make sure that the wildcard import org. Apache. Flink. Table. API. Scala. _ added to the program. If the literal is not considered an expression, use.toexpr (e.g. 3.toexpr) to cast the literal.