Moment For Technology

Flink Table/SQL API planning -- Dynamic Table

Posted on Aug. 8, 2023, 1:29 a.m. by Lagan Datta
Category: The back-end Tag: java spark hadoop sql Cloud computing flink
The concept of dynamic tables was proposed by the community a long time ago, but not all of it has been implemented. Everything introduced below is based on existing plans and proposals, which may differ from the later implementation; it is for reference only.


A dynamic table is intuitively similar to the materialized view concept in databases. Dynamic tables change over time; like a static batch table, they can be queried with standard SQL, and the query result is a new dynamic table; they can be converted to and from streams losslessly (duality). The biggest improvement over the existing API is that the contents of a table can change over time, whereas the current model only supports append. The current streaming table can be considered a dynamic table in append mode.

Stream to Dynamic Table

When a stream is converted to a table, the choice of mode depends on whether a primary key is defined in the table's schema.

Append mode:

If the table schema does not include a key definition, append mode is used when converting the stream to a table. Each new record in the stream is appended to the table as a new row. Once a row has been added to the table, it cannot be updated or deleted.

Replace mode:

Conversely, if a key is defined, each record in the stream is inserted if its key is not yet in the table; otherwise it updates the existing row with that key.
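
The two conversion modes can be sketched in plain Python. This is not Flink API: the function names `to_table_append` and `to_table_replace`, the record layout, and the sample data are assumptions made only for illustration.

```python
from typing import Dict, Iterable, List, Tuple

Record = Tuple[str, int]  # hypothetical record layout: (user, clicks)


def to_table_append(stream: Iterable[Record]) -> List[Record]:
    """Append mode: no key is defined, so every record becomes a new row."""
    table: List[Record] = []
    for record in stream:
        table.append(record)
    return table


def to_table_replace(stream: Iterable[Record]) -> Dict[str, Record]:
    """Replace mode: the first field is treated as the key (an assumption)."""
    table: Dict[str, Record] = {}
    for record in stream:
        key = record[0]
        table[key] = record  # insert if the key is new, otherwise update
    return table


stream = [("alice", 1), ("bob", 1), ("alice", 2)]
append_table = to_table_append(stream)
replace_table = to_table_replace(stream)
print(append_table)   # three rows, one per record
print(replace_table)  # two rows, one per key
```

With the same input, append mode keeps both records for `alice`, while replace mode keeps only the latest one.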

Dynamic Table to Stream

The table-to-stream operation sends all changes of the table downstream as a Changelog Stream. There are also two modes for this step.

Retraction mode:

Retraction mode generates an INSERT or DELETE event for each insert or delete change on the dynamic table, respectively. An update change produces two events: a DELETE event for the previously emitted record with the same key, and an INSERT event for the current record. As shown below:
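
A minimal sketch of the retraction encoding in plain Python, assuming each table change is given as a hypothetical `(kind, old_row, new_row)` triple. The names are illustrative and not part of any Flink API.

```python
from typing import List, Optional, Tuple

Row = Tuple[str, int]  # hypothetical row layout: (key, value)
# A table change: (kind, old_row, new_row); the unused side is None.
Change = Tuple[str, Optional[Row], Optional[Row]]
Event = Tuple[str, Optional[Row]]


def to_retraction_stream(changes: List[Change]) -> List[Event]:
    """Encode table changes as INSERT / DELETE events."""
    events: List[Event] = []
    for kind, old_row, new_row in changes:
        if kind == "insert":
            events.append(("INSERT", new_row))
        elif kind == "delete":
            events.append(("DELETE", old_row))
        elif kind == "update":
            # An update retracts the old record, then inserts the new one.
            events.append(("DELETE", old_row))
            events.append(("INSERT", new_row))
        else:
            raise ValueError(f"unknown change kind: {kind}")
    return events


changes: List[Change] = [
    ("insert", None, ("alice", 1)),
    ("update", ("alice", 1), ("alice", 2)),
    ("delete", ("alice", 2), None),
]
retraction_events = to_retraction_stream(changes)
for event in retraction_events:
    print(event)
```

The single update in the input becomes two events, so three table changes yield four stream events.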

Update mode:

Update mode requires that the dynamic table has a key defined. Every change event is a key-value pair. The key is the value of the table's key for the current record. For insert and update changes, the value is the new record. For a delete change, the value is null, which indicates that the record with that key has been deleted. As shown below:
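
The update encoding can be sketched in plain Python under the same kind of assumptions: changes are hypothetical `(kind, old_row, new_row)` triples and the first field of a row is its key. `apply_update_stream` is an illustrative helper showing that the table can be rebuilt from the stream.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, int]  # hypothetical row layout: (key, value)
Change = Tuple[str, Optional[Row], Optional[Row]]
KeyedEvent = Tuple[str, Optional[Row]]  # (key, new_row or None for delete)


def to_update_stream(changes: List[Change]) -> List[KeyedEvent]:
    """Encode table changes as key-value pairs; None marks a delete."""
    events: List[KeyedEvent] = []
    for kind, old_row, new_row in changes:
        if kind in ("insert", "update"):
            events.append((new_row[0], new_row))
        elif kind == "delete":
            events.append((old_row[0], None))
        else:
            raise ValueError(f"unknown change kind: {kind}")
    return events


def apply_update_stream(events: List[KeyedEvent]) -> Dict[str, Row]:
    """Rebuild the keyed table from the update stream."""
    table: Dict[str, Row] = {}
    for key, value in events:
        if value is None:
            table.pop(key, None)  # delete
        else:
            table[key] = value    # insert or update
    return table


changes: List[Change] = [
    ("insert", None, ("alice", 1)),
    ("insert", None, ("bob", 5)),
    ("update", ("alice", 1), ("alice", 2)),
    ("delete", ("bob", 5), None),
]
update_events = to_update_stream(changes)
rebuilt = apply_update_stream(update_events)
print(update_events)
print(rebuilt)
```

Unlike retraction mode, an update is a single event here, but the consumer must track keys to interpret it.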


The contents of the table change over time, which means that the results of queries on the table also change over time. We define:

  • A[t]: table A at time t
  • Q(A[t]): the result of executing query Q on table A at time t

Here's an example to understand the concept of dynamic tables:
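
One way to make the A[t] and Q(A[t]) notation concrete is a small plain-Python sketch. The click log, the helper names `table_at` and `query`, and the choice of a per-user count as Q are all assumptions for illustration.

```python
from collections import Counter
from typing import Dict, List, Tuple

# Hypothetical append-only click log: (arrival_time, user, url).
CLICKS: List[Tuple[int, str, str]] = [
    (1, "alice", "/home"),
    (2, "bob", "/cart"),
    (3, "alice", "/prod"),
]


def table_at(t: int) -> List[Tuple[str, str]]:
    """A[t]: the rows of the table that have arrived by time t."""
    return [(user, url) for ts, user, url in CLICKS if ts <= t]


def query(table: List[Tuple[str, str]]) -> Dict[str, int]:
    """Q: SELECT user, COUNT(url) ... GROUP BY user."""
    return dict(Counter(user for user, _ in table))


for t in (1, 2, 3):
    print(t, query(table_at(t)))
```

The same query Q returns a different result at each time t because the table it reads keeps changing.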

Query restrictions

Since a stream is unbounded, the corresponding dynamic table is also unbounded. When querying an unbounded table, we need to ensure that the query is well defined, meaningful, and feasible.

1. In practice, Flink translates queries into continuous streaming applications. A query is evaluated only for the current logical time, so queries at an arbitrary point in time (A[t]) are not supported.

2. The possible state and computation of a query must be bounded, so only queries that can be computed incrementally are supported:

  • Queries that continuously update the current result. Such a query can produce INSERT, UPDATE, and DELETE changes and can be written as Q(t+1) = q'(Q(t), c(T, t, t+1)), where Q(t) is the previous result of query Q, c(T, t, t+1) is the set of changes to table T from time t to t+1, and q' is the incremental version of Q.
  • Queries that produce an append-only table, where new rows can be computed directly from the tail of the input table. Such a query can be written as Q(t+1) = q''(c(T, t-x, t+1)) ∪ Q(t), where q'' is an incremental version of Q that does not need the result at time t, and c(T, t-x, t+1) covers the last x+1 time steps at the tail of table T; x depends on the semantics of the query. For example, an aggregation over a last-hour window needs at least the last hour of data as state. Other query types that can be supported this way include SELECT ... WHERE, which operates on each row independently; GROUP BY on rowtime (such as time-based window aggregates); OVER windows with ORDER BY rowtime (row windows); and ORDER BY rowtime.
  • Queries on input tables that are small enough for every row of the table to be accessed. For example, joining two stream tables of fixed size (such as a fixed number of keys).
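
The relation Q(t+1) = q'(Q(t), c(T, t, t+1)) can be checked on a toy example. The sketch below is plain Python, assumes insert-only changes, and uses a per-user count as the query; `q_full` and `q_incremental` are hypothetical names.

```python
from collections import Counter
from typing import List, Tuple

Row = Tuple[str, str]  # hypothetical row layout: (user, page)


def q_full(table: List[Row]) -> Counter:
    """Q: recompute the per-user count from the whole table."""
    return Counter(user for user, _ in table)


def q_incremental(previous: Counter, inserted: List[Row]) -> Counter:
    """q': update the previous result using only the new rows."""
    result = Counter(previous)
    for user, _ in inserted:
        result[user] += 1
    return result


table_t = [("alice", "/home"), ("bob", "/cart")]
changes = [("alice", "/prod"), ("carol", "/home")]  # c(T, t, t+1), inserts only

q_t = q_full(table_t)
q_next_incremental = q_incremental(q_t, changes)
q_next_full = q_full(table_t + changes)
print(q_next_incremental == q_next_full)
```

The incremental version touches only the changed rows, which is what keeps the cost per update independent of the table size. Handling UPDATE and DELETE changes would require decrementing counts as well, which this sketch leaves out.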

The intermediate state is bounded

As mentioned above, some incremental queries need to retain some data (partial input data or intermediate results) as state. To ensure that a query does not fail, it is important to ensure that the space required by the query is bounded and does not grow indefinitely over time. There are two main reasons for state growth:

  1. Intermediate state that grows without being bounded by a time predicate (for example, an ever-growing set of aggregation keys).
  2. State that is bounded in time but must be kept around to accept late data (for example, window aggregation).

While the second case can be handled with the "Last Result Offset" parameter described below, the first case has to be detected by the optimizer. Queries whose intermediate state grows without a time bound should be rejected. The optimizer should indicate how to fix the query and require appropriate time predicates. For example, consider the following query:

SELECT user, page, COUNT(page) AS pCnt
FROM pageviews
GROUP BY user, page

The intermediate state grows over time as the number of users and pages grows. Requirements for storage space can be limited by adding time predicates:

SELECT user, page, COUNT(page) AS pCnt
FROM pageviews
WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() -- only last hour
GROUP BY user, page
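
The effect of such a time predicate on state size can be sketched in plain Python. The class `LastHourCounts` is hypothetical, timestamps are seconds, and events are assumed to arrive in time order; none of this is how Flink actually manages state.

```python
from collections import Counter, deque
from typing import Deque, Tuple


class LastHourCounts:
    """Per-(user, page) counts restricted to a sliding time horizon."""

    def __init__(self, horizon_s: int = 3600) -> None:
        self.horizon_s = horizon_s
        self.events: Deque[Tuple[int, str, str]] = deque()  # in time order
        self.counts: Counter = Counter()

    def add(self, ts: int, user: str, page: str) -> None:
        self.events.append((ts, user, page))
        self.counts[(user, page)] += 1
        self._evict(now=ts)

    def _evict(self, now: int) -> None:
        # Drop events older than now - horizon (BETWEEN is inclusive).
        while self.events and self.events[0][0] < now - self.horizon_s:
            _, user, page = self.events.popleft()
            self.counts[(user, page)] -= 1
            if self.counts[(user, page)] == 0:
                del self.counts[(user, page)]  # free state for idle keys


state = LastHourCounts()
state.add(0, "alice", "/home")
state.add(1800, "alice", "/home")
state.add(3600, "bob", "/cart")
snapshot_1h = dict(state.counts)
state.add(7300, "carol", "/help")
snapshot_2h = dict(state.counts)
print(snapshot_1h)
print(snapshot_2h)
```

Keys that receive no events within the horizon disappear from the state, so its size depends on the activity in the last hour rather than on the total number of users and pages ever seen.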

Because not every attribute's domain keeps growing, the size of a domain can be declared to the optimizer, which can then infer that the intermediate state does not grow over time and accept queries without time predicates.

val sensorT: Table = sensors
  .attributeDomain('loc.Domain.constant) // domain of 'loc is not growing 
env.registerTable("sensors", sensorT)

SELECT loc, AVG(temp) AS avgTemp
FROM sensors
GROUP BY loc

Timing of result computation and refinement

Some relational operators must wait for data to arrive before they can compute a final result. For example, a window that closes at 10:30 has to wait until at least 10:30 to compute its final result. Flink's logical clock (that is, what decides when it is 10:30) depends on whether event time or processing time is used. With processing time, the logical time is the wall clock of each machine; with event time, the logical clock is driven by the watermarks provided by the sources. Because data can arrive out of order or late, event-time mode waits for some additional time to reduce the incompleteness of the computed results. On the other hand, in some cases early results that are continuously refined are desired. So there are different requirements for when results are computed, refined, or made final.

The following diagram depicts how different configuration parameters are used to control the early results and refine the calculated results.

  • "First Result Offset" is the time at which the first early result is computed. The time is relative to the time at which the complete result can first be computed (for example, relative to a window's end time of 10:30). If the value is set to -10 minutes, then for a window that ends at 10:30 the first result is computed and emitted at logical time 10:20. The default value is 0, which means no result is computed before the window ends.
  • "Complete Result Offset" is the time at which the complete result is computed. The time is relative to the time at which the complete result can first be computed. If set to +5 minutes, the complete result for a window ending at 10:30 is produced at 10:35. This parameter can mitigate the effect of late data. The default is 0, which means the complete result is computed at the end of the window.
  • "Update Rate" is the interval (either a time interval or a count) between successive updates before the complete result is computed. For example, with an update rate of 5 minutes, a 30-minute tumbling window starting at 10:00, a "First Result Offset" of -15 minutes, and a "Complete Result Offset" of 2 minutes, the first result is emitted at 10:15, the result is updated at 10:20, 10:25, and 10:30, and the complete result is produced at 10:32.
  • "Last Updates Switch" controls whether late updates are computed for late-arriving data after the complete result has been emitted and until the computation state is cleared.
  • "Last Result Offset" is the time at which the last result can be computed. This is the time at which the internal state is cleared; any data that arrives after the state is cleared is discarded. Because of the Last Result Offset, the computed result is approximate and cannot be guaranteed to be exact.
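
The interaction of the first three parameters can be sketched as a small schedule calculation in plain Python. The function `emission_schedule` and its parameter names are hypothetical, the calendar date is arbitrary, and only a time-based update rate is modeled.

```python
from datetime import datetime, timedelta
from typing import List


def emission_schedule(
    window_end: datetime,
    first_result_offset: timedelta,
    update_rate: timedelta,
    complete_result_offset: timedelta,
) -> List[datetime]:
    """Logical times at which early, updated, and complete results are emitted."""
    if update_rate <= timedelta(0):
        raise ValueError("update_rate must be positive")
    first = window_end + first_result_offset
    complete = window_end + complete_result_offset
    times: List[datetime] = []
    t = first
    while t < complete:       # early result and periodic updates
        times.append(t)
        t += update_rate
    times.append(complete)    # the complete result is always emitted last
    return times


window_end = datetime(2023, 1, 1, 10, 30)  # 30-minute window starting at 10:00
schedule = emission_schedule(
    window_end,
    first_result_offset=timedelta(minutes=-15),
    update_rate=timedelta(minutes=5),
    complete_result_offset=timedelta(minutes=2),
)
labels = [t.strftime("%H:%M") for t in schedule]
print(labels)  # ['10:15', '10:20', '10:25', '10:30', '10:32']
```

With both offsets left at their default of 0, the schedule collapses to a single emission at the end of the window.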