This article is mainly a translation of the related content on the Flink website. Original address: ci.apache.org/projects/fl…

A join is a common and well-understood operation in batch data processing that connects the rows of two relations. However, the semantics of joins on dynamic tables are much less obvious, or even confusing.

Flink offers several ways to actually perform a join using either the Table API or SQL.

For the time attributes and temporal tables used in this article, please refer to the author's previous articles.

Regular JOIN

A regular join is the most generic type of join, in which any new record or change on either side of the join input is visible and affects the whole join result. For example, if there is a new record on the left side, it will be joined with all previous and future records on the right side.

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

These semantics allow any kind of update (INSERT, UPDATE, DELETE) to the input tables.

However, this operation has an important implication: it requires keeping both sides of the join input in Flink state forever. Thus, if one or both input tables grow continuously, resource usage will also grow indefinitely.
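To make the state growth concrete, here is a minimal Java sketch of an insert-only regular inner join between Orders and Product on productId. It is not Flink's actual operator. The class RegularJoinSketch and its record types are hypothetical, and updates and deletes (which Flink handles through retractions) are left out for brevity. Every arriving row is matched against everything stored for the other side and is then stored itself, so nothing can ever be evicted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of an insert-only regular inner equi-join
 * (Orders.productId = Product.id). Not Flink's implementation.
 */
class RegularJoinSketch {
    record Order(int productId, String orderId) {}
    record Product(int id, String name) {}

    // Both sides are kept forever: any future row on the other side may still match.
    private final Map<Integer, List<Order>> leftState = new HashMap<>();
    private final Map<Integer, List<Product>> rightState = new HashMap<>();

    /** A new order joins with all products seen so far and is then stored for future products. */
    List<String> onOrder(Order o) {
        List<String> out = new ArrayList<>();
        for (Product p : rightState.getOrDefault(o.productId(), List.of())) {
            out.add(o.orderId() + "-" + p.name());
        }
        leftState.computeIfAbsent(o.productId(), k -> new ArrayList<>()).add(o);
        return out;
    }

    /** A new product joins with all orders seen so far and is then stored for future orders. */
    List<String> onProduct(Product p) {
        List<String> out = new ArrayList<>();
        for (Order o : leftState.getOrDefault(p.id(), List.of())) {
            out.add(o.orderId() + "-" + p.name());
        }
        rightState.computeIfAbsent(p.id(), k -> new ArrayList<>()).add(p);
        return out;
    }

    /** Total number of rows held in state; it never shrinks. */
    int stateSize() {
        int n = 0;
        for (List<Order> l : leftState.values()) n += l.size();
        for (List<Product> l : rightState.values()) n += l.size();
        return n;
    }
}
```

Any stored row may still match a future row on the other side, so stateSize() only grows. This mirrors the implication described above.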

Time window JOIN

A time-windowed join is defined by a join predicate that checks whether the time attributes of the input records are within certain time constraints, i.e., a time window.

SELECT *
FROM
  Orders o,
  Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

In contrast to regular joins, this kind of join only supports append-only tables with time attributes. Since time attributes are quasi-monotonically increasing, Flink can remove old values from its state without affecting the correctness of the result.
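The following Java sketch shows why the time constraint in the query above keeps state bounded. It is a simplification, not Flink's implementation. The class IntervalJoinSketch is hypothetical, and timestamps are plain long minutes. It assumes that a watermark w means no further rows with a timestamp <= w will arrive.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the time-windowed join
 *   o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - 4h AND s.shiptime
 * Times are plain longs (minutes). Not Flink's implementation.
 */
class IntervalJoinSketch {
    static final long WINDOW = 4 * 60; // 4 hours in minutes

    record Order(String id, long orderTime) {}
    record Shipment(String orderId, long shipTime) {}

    private final List<Order> orders = new ArrayList<>();
    private final List<Shipment> shipments = new ArrayList<>();

    static boolean matches(Order o, Shipment s) {
        return o.id().equals(s.orderId())
            && o.orderTime() >= s.shipTime() - WINDOW
            && o.orderTime() <= s.shipTime();
    }

    List<String> onOrder(Order o) {
        List<String> out = new ArrayList<>();
        for (Shipment s : shipments) if (matches(o, s)) out.add(o.id() + "@" + s.shipTime());
        orders.add(o);
        return out;
    }

    List<String> onShipment(Shipment s) {
        List<String> out = new ArrayList<>();
        for (Order o : orders) if (matches(o, s)) out.add(o.id() + "@" + s.shipTime());
        shipments.add(s);
        return out;
    }

    /** Drops rows that no future row (timestamp > watermark) can still match. */
    void onWatermark(long watermark) {
        // A future shipment has shipTime > watermark, so it can only match orders with orderTime + WINDOW > watermark.
        orders.removeIf(o -> o.orderTime() + WINDOW <= watermark);
        // A future order has orderTime > watermark and needs shipTime >= orderTime, so old shipments are useless.
        shipments.removeIf(s -> s.shipTime() <= watermark);
    }

    int stateSize() { return orders.size() + shipments.size(); }
}
```

Unlike the regular join, each watermark lets this sketch discard rows. Its state is therefore bounded by the number of rows that fall within one window.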

The temporal table function JOIN

A join with a temporal table function joins an append-only table (left input/probe side) with a temporal table (right input/build side), i.e., a table that changes over time and tracks its changes. The following example shows an append-only table Orders that should be joined with the constantly changing currency rates table RatesHistory. Orders is an append-only table that represents payments for a given amount and a given currency. For example, at 10:15 there was an order for an amount of 2 Euro.

SELECT * FROM Orders;
 
rowtime amount currency
======= ====== =========
10:15        2 Euro
10:30        1 US Dollar
10:32       50 Yen
10:52        3 Euro
11:04        5 US Dollar

RatesHistory represents an ever-changing, append-only table of currency exchange rates with respect to the Japanese Yen (which has a rate of 1). For example, the exchange rate from Euro to Yen was 114 for the period from 09:00 to 10:45, and 116 from 10:45 to 11:15.

SELECT * FROM RatesHistory;
 
rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1
10:45   Euro        116
11:15   Euro        119
11:49   Pounds      108

Suppose we want to calculate the amount of all orders converted to a common currency (Yen). For example, we would convert the following order using the conversion rate that was valid at its rowtime (114):

rowtime amount currency
======= ====== =========
10:15        2 Euro

If you do not use temporal tables, you need to write the following query:

SELECT
  SUM(o.amount * r.rate) AS amount
FROM Orders AS o,
  RatesHistory AS r
WHERE r.currency = o.currency
AND r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = o.currency
  AND r2.rowtime <= o.rowtime);
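The Java sketch below shows what the subquery computes by evaluating it with brute force on the two sample tables above. The class RatesSubquerySketch is hypothetical, and times are encoded as minutes since midnight. For each order, it scans RatesHistory for rows with the same currency and takes the one with the largest rowtime that is <= the order's rowtime.

```java
import java.util.List;

/**
 * Hypothetical brute-force evaluation of the correlated-subquery formulation
 * on the sample Orders and RatesHistory data. Times are minutes since midnight.
 */
class RatesSubquerySketch {
    record Order(int rowtime, int amount, String currency) {}
    record Rate(int rowtime, String currency, int rate) {}

    static int t(int hh, int mm) { return hh * 60 + mm; }

    static final List<Order> ORDERS = List.of(
        new Order(t(10, 15), 2, "Euro"),
        new Order(t(10, 30), 1, "US Dollar"),
        new Order(t(10, 32), 50, "Yen"),
        new Order(t(10, 52), 3, "Euro"),
        new Order(t(11, 4), 5, "US Dollar"));

    static final List<Rate> RATES_HISTORY = List.of(
        new Rate(t(9, 0), "US Dollar", 102),
        new Rate(t(9, 0), "Euro", 114),
        new Rate(t(9, 0), "Yen", 1),
        new Rate(t(10, 45), "Euro", 116),
        new Rate(t(11, 15), "Euro", 119),
        new Rate(t(11, 49), "Pounds", 108));

    /** Mirrors r.rowtime = (SELECT MAX(rowtime) ... WHERE r2.currency = o.currency AND r2.rowtime <= o.rowtime). */
    static int rateAt(String currency, int time) {
        Rate best = null;
        for (Rate r : RATES_HISTORY) { // full scan per order, as the subquery implies
            if (r.currency().equals(currency) && r.rowtime() <= time
                    && (best == null || r.rowtime() > best.rowtime())) {
                best = r;
            }
        }
        if (best == null) throw new IllegalStateException("no rate for " + currency);
        return best.rate();
    }

    /** SUM(o.amount * r.rate) over all orders. */
    static int totalInYen() {
        int sum = 0;
        for (Order o : ORDERS) sum += o.amount() * rateAt(o.currency(), o.rowtime());
        return sum;
    }
}
```

With the sample data, totalInYen() returns 1238 (228 + 102 + 50 + 348 + 510). The 10:52 Euro order picks up the 10:45 rate of 116.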

With the help of a temporal table function Rates over RatesHistory, we can express such a query in SQL as:

SELECT
  o.amount * r.rate AS amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency

Each record from the probe side is joined with the version of the build side table that is valid at the time given by the probe record's time attribute. To support updates (overwrites) of previous values on the build side table, the table must define a primary key.

In our example, each record from Orders is joined with the version of Rates valid at time o.rowtime. The currency field has been defined as the primary key of Rates beforehand and is used to connect both tables in our example. If the query used a processing-time notion instead, a newly appended order would always be joined with the most recent version of Rates at the time the operation executes.

In contrast to a regular join, this means that a new record on the build side (the temporal table) does not affect previously emitted join results. This in turn allows Flink to limit the number of elements that must be kept in state.

In contrast to a time-windowed join, a temporal table function join does not define a time window within which records are joined. A probe-side record is always joined with the build-side version specified by its time attribute, so the matching build-side record can be arbitrarily old. As time progresses, versions of a record (for a given primary key) that are no longer needed are removed from the state.
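The version lookup described above can be sketched in Java with one sorted map per primary key. This is a hypothetical illustration: VersionedTableSketch is not a Flink class, and times are plain longs. floorEntry returns the newest version whose time is <= the probe time, which is why the matching version may be arbitrarily old.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of the versioned build side of a temporal table function join:
 * per primary key (currency), versions are ordered by time, and "the version valid
 * at time t" is a floor lookup. Not Flink's implementation.
 */
class VersionedTableSketch {
    // primary key -> (version time -> rate)
    private final Map<String, TreeMap<Long, Integer>> versions = new HashMap<>();

    /** An update on the build side adds a new version; older versions remain available. */
    void upsert(String currency, long time, int rate) {
        versions.computeIfAbsent(currency, k -> new TreeMap<>()).put(time, rate);
    }

    /** The rate valid at the given time for this key, or null if no version exists yet. */
    Integer lookup(String currency, long time) {
        TreeMap<Long, Integer> v = versions.get(currency);
        if (v == null) return null;
        Map.Entry<Long, Integer> e = v.floorEntry(time); // newest version with time <= probe time
        return e == null ? null : e.getValue();
    }
}
```

For example, the sample Euro history can be loaded as versions at 540 (114), 645 (116) and 675 (119), in minutes since midnight. A probe at 615 (10:15) then resolves to 114, and a probe at 652 (10:52) resolves to 116.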

Usage

Once the temporal table function is defined, we can use it. Temporal table functions are used in the same way as normal table functions.

The following code snippet solves our problem of converting the currencies in the Orders table:

SQL:
SELECT
  SUM(o_amount * r_rate) AS amount
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency

JAVA:
Table result = orders
    .join(new Table(tEnv, "rates(o_proctime)"), "o_currency = r_currency")
    .select("(o_amount * r_rate).sum as amount");

SCALA:
val result = orders
    .join(rates('o_proctime), 'r_currency === 'o_currency)
    .select(('o_amount * 'r_rate).sum as 'amount)

Note: State retention defined in the query configuration is not yet implemented for temporal table function joins. This means that, depending on the number of distinct primary keys in the history table, the state required to compute the query result may grow indefinitely.

Processing-time temporal join

With a processing-time attribute, it is impossible to pass a past time attribute as an argument to the temporal table function. By definition, it is always the current timestamp. Therefore, invocations of a processing-time temporal table function always return the latest known versions of the underlying table, and any updates in the underlying history table immediately overwrite the current values.

Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state. Updates on the build side have no effect on previously emitted join results.

You can think of a processing-time temporal join as a simple HashMap<K, V> that stores all the records from the build side. When a new record from the build side has the same key as a previous record, the old value is simply overwritten. Every record from the probe side is always evaluated against the latest/current state of the HashMap.
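The HashMap analogy translates directly into code. The Java sketch below is a hypothetical illustration of the idea, not Flink's operator; ProcTimeTemporalJoinSketch and its methods are invented names. onRate overwrites the value for its key, and onOrder reads whatever value is current when the order arrives. onOrder returns null when an inner join would emit nothing.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of a processing-time temporal join as a HashMap<K, V>:
 * build-side rows overwrite by key, probe-side rows see the current map contents.
 */
class ProcTimeTemporalJoinSketch {
    private final Map<String, Integer> latestRates = new HashMap<>();

    /** Build side: a newer row for the same key simply replaces the old one. */
    void onRate(String currency, int rate) {
        latestRates.put(currency, rate);
    }

    /** Probe side: joins with the current value; null means no match (an inner join emits nothing). */
    Integer onOrder(String currency, int amount) {
        Integer rate = latestRates.get(currency);
        return rate == null ? null : amount * rate;
    }

    /** State holds one entry per distinct key, regardless of how many updates arrived. */
    int stateSize() { return latestRates.size(); }
}
```

An order joined before a later onRate call keeps its earlier result, and stateSize() is bounded by the number of distinct keys.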

Event-time temporal join

With an event-time attribute (i.e., a rowtime attribute), it is possible to pass a past time attribute to the temporal table function. This allows the two tables to be joined at a common point in time.

In contrast to processing-time temporal joins, the temporal table keeps in state not only the latest version of the build side records (with respect to the defined primary key), but also all versions (identified by time) since the last watermark.

For example, according to the concept of temporal tables, an incoming row with an event-time timestamp of 12:30:00 that is appended to the probe side table is joined with the version of the build side table at time 12:30:00. Thus, the incoming row is only joined with rows whose timestamp is less than or equal to 12:30:00, with updates applied according to the primary key up to that point in time.

By the definition of event time, watermarks allow the join operation to move forward in time and to discard versions of the build table that are no longer needed, because no incoming rows with a lower or equal timestamp are expected.
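The following Java sketch shows how watermarks drive an event-time temporal join. It is a simplified, hypothetical model: EventTimeTemporalJoinSketch is not Flink's operator, times are plain longs, and late data is ignored. Probe rows are buffered until the watermark passes their timestamp and are then joined with the version valid at that timestamp. After that, for each key, every version older than the newest one at or before the watermark is discarded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of an event-time temporal join driven by watermarks.
 * Assumes no late rows (nothing arrives with a timestamp <= the last watermark).
 */
class EventTimeTemporalJoinSketch {
    record Order(long time, int amount, String currency) {}

    // primary key -> (version time -> rate)
    private final Map<String, TreeMap<Long, Integer>> versions = new HashMap<>();
    private final List<Order> pendingOrders = new ArrayList<>();

    void onRate(String currency, long time, int rate) {
        versions.computeIfAbsent(currency, k -> new TreeMap<>()).put(time, rate);
    }

    void onOrder(Order o) {
        pendingOrders.add(o); // not safe to join until the watermark reaches o.time()
    }

    /** Emits amount * rate for pending orders with time <= watermark, then prunes old versions. */
    List<Integer> onWatermark(long watermark) {
        List<Integer> out = new ArrayList<>();
        Iterator<Order> it = pendingOrders.iterator();
        while (it.hasNext()) {
            Order o = it.next();
            if (o.time() > watermark) continue;
            TreeMap<Long, Integer> v = versions.get(o.currency());
            Map.Entry<Long, Integer> e = v == null ? null : v.floorEntry(o.time());
            if (e != null) out.add(o.amount() * e.getValue()); // inner join: unmatched orders are dropped
            it.remove();
        }
        // Future orders have time > watermark, so per key only the newest version
        // at or before the watermark (plus any newer versions) can still be needed.
        for (TreeMap<Long, Integer> v : versions.values()) {
            Long keep = v.floorKey(watermark);
            if (keep != null) v.headMap(keep, false).clear();
        }
        return out;
    }

    int versionCount() {
        int n = 0;
        for (TreeMap<Long, Integer> v : versions.values()) n += v.size();
        return n;
    }
}
```

With the Euro history at 540 (114), 645 (116) and 675 (119), a watermark at 660 lets an order at 652 join with 116 and drops the 540 version. No future order can need that version any more.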

Temporal table JOIN

A join with a temporal table joins an arbitrary table (left input/probe side) with a temporal table (right input/build side), i.e., an external dimension table that changes over time.

Note: Users cannot use arbitrary tables as temporal tables; they need to use a table backed by a LookupableTableSource. A LookupableTableSource can only be used as a temporal table in temporal joins. For more details on how to define a LookupableTableSource, see How to Define a LookupableTableSource.

The following example shows an Orders stream that should be joined with the changing currency exchange rate table LatestRates.

LatestRates is a dimension table that is materialized with the latest exchange rates. At 10:15, 10:30 and 10:52, the contents of LatestRates are as follows:

10:15> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:30> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1


10:52> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        116     <==== changed from 114 to 116
Yen           1

The contents of LatestRates at 10:15 and 10:30 are identical. The Euro rate changed from 114 to 116 at 10:52. Orders is an append-only table that represents payments for a given amount and a given currency. For example, there was an order for 2 Euro at 10:15.

SELECT * FROM Orders;

amount currency
====== =========
     2 Euro             <== arrived at time 10:15
     1 US Dollar        <== arrived at time 10:30
     2 Euro             <== arrived at time 10:52

Suppose we want to calculate the amount of all orders converted into the common currency (Yen), using the latest exchange rate in LatestRates at the time each order arrives. The result would be:

amount currency     rate  amount*rate
====== ========= ======= ============
     2 Euro          114          228    <== arrived at time 10:15
     1 US Dollar     102          102    <== arrived at time 10:30
     2 Euro          116          232    <== arrived at time 10:52

With the help of temporal table joins, we can represent a query in SQL as:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

Each record from the probe side is joined with the current version of the build side table. In our example, the query uses the processing-time notion, so a newly appended order is always joined with the most recent version of LatestRates at the time the operation executes. Note that the result is not deterministic when processing time is used.

In contrast to a regular join, previous results of a temporal table join are not affected by later changes on the build side. Furthermore, the temporal table join operator is very lightweight and does not keep any state.

In contrast to a time-windowed join, a temporal table join does not define a time window within which records are joined. Records from the probe side are always joined with the latest version of the build side at processing time. Thus, the matching build-side records can be arbitrarily old.

The temporal table function join and the temporal table join come from the same motivation but have different SQL syntax and runtime implementations:

  • The SQL syntax of the temporal table function join is a join with a UDTF, while the temporal table join uses the standard temporal table syntax introduced in SQL:2011.
  • The temporal table function join is implemented by actually joining the two streams and keeping them in state, while the temporal table join receives only a single input stream and looks up the external database according to the key in each record.
  • The temporal table function join is typically used to join a changelog stream, while the temporal table join is typically used to join an external table (i.e., a dimension table).

This behavior makes the temporal table a good candidate for expressing stream enrichment in relational terms.

In the future, the temporal table join will support the features of the temporal table function join, i.e., temporally joining a changelog stream.

Usage

The syntax for a temporal table join is as follows:

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.column-name1

Currently, only INNER JOIN and LEFT JOIN are supported. The temporal table must be followed by FOR SYSTEM_TIME AS OF table1.proctime, where proctime is the processing time attribute of table1. This means that each record from the left table is joined with a snapshot of the temporal table taken at that record's processing time.
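The lookup-style semantics, including the difference between INNER JOIN and LEFT JOIN, can be sketched in Java. This is a hypothetical illustration, not Flink's LookupableTableSource API. The external table is modelled as a Function from key to an Optional row, and the operator is a static method with no state of its own. The result therefore depends only on what the external table contains at the moment each probe row arrives.

```java
import java.util.Optional;
import java.util.function.Function;

/**
 * Hypothetical sketch of a lookup-style temporal table join
 * (JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime). The operator keeps no state;
 * the external dimension table is modelled as a plain lookup function.
 */
class LookupJoinSketch {
    record Order(int amount, String currency) {}

    /** rate == null marks an unmatched probe row padded by LEFT JOIN. */
    record Joined(int amount, String currency, Integer rate) {}

    static Optional<Joined> onOrder(Order o,
                                    Function<String, Optional<Integer>> lookup,
                                    boolean leftJoin) {
        Optional<Integer> rate = lookup.apply(o.currency()); // whatever the external table holds right now
        if (rate.isPresent()) {
            return Optional.of(new Joined(o.amount(), o.currency(), rate.get()));
        }
        if (leftJoin) {
            return Optional.of(new Joined(o.amount(), o.currency(), null)); // LEFT JOIN pads with NULL
        }
        return Optional.empty(); // INNER JOIN drops unmatched probe rows
    }
}
```

If the external table changes between two lookups, the same order yields different rates. This is the non-determinism of processing time noted earlier.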

For example, after defining a temporal table, we can use it as follows.

SELECT
  SUM(o_amount * r_rate) AS amount
FROM
  Orders
  JOIN LatestRates FOR SYSTEM_TIME AS OF o_proctime
  ON r_currency = o_currency

Note:

  1. Only supported in the Blink planner.
  2. It is only supported in SQL and not yet in the Table API.
  3. Flink does not currently support event time temporal table joins.