This article is mainly a translation of Flink-related content from the official website. Original address: ci.apache.org/projects/fl…

Table API and SQL queries have the same semantics regardless of whether their input is a bounded batch input or an unbounded stream input. In many cases, a continuous query on streaming input can compute exactly the same results as an offline computation. However, this is not possible in general, because continuous queries have to restrict the size of the state they maintain in order to avoid running out of storage and to be able to process unbounded streaming data over a long period of time. As a result, a continuous query may be able to provide only approximate results, depending on the characteristics of the input data and the query itself.

Flink’s Table API and SQL interface provide parameters to tune the accuracy and resource consumption of continuous queries. The parameters are specified through a QueryConfig object. The QueryConfig can be obtained from the TableEnvironment and is passed back when a Table is translated, that is, when it is converted into a DataStream or emitted through a TableSink.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// obtain query configuration from TableEnvironment
val qConfig: StreamQueryConfig = tableEnv.queryConfig
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))

// define query
val result: Table = ???

// create TableSink
val sink: TableSink[Row] = ???

// register TableSink
tableEnv.registerTableSink(
  "outputTable",                  // table name
  Array[String](...),             // field names
  Array[TypeInformation[_]](...), // field types
  sink)                           // table sink

// emit result Table via a TableSink
result.insertInto("outputTable", qConfig)

// convert result Table into a DataStream[Row]
val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)

In the following sections, we describe the parameters of QueryConfig and how they affect query accuracy and resource consumption.

Idle state retention time

Many queries aggregate or join records on one or more key attributes. When such a query is executed on a stream, the continuous query needs to collect records or maintain partial results per key. If the key domain of the input stream is evolving, that is, the active key values change over time, the continuous query accumulates more and more state as more and more distinct keys are observed. Often, however, keys become inactive after some time, and their corresponding state becomes stale and useless.

For example, the following query counts the number of clicks per session.

SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

The sessionId attribute is used as the grouping key, and the continuous query maintains a count for each sessionId it observes. The sessionId attribute evolves over time, and a sessionId value is only active until the session ends, that is, for a limited period of time. However, the continuous query cannot know this property of sessionId and expects that every sessionId value can occur at any point in time. It therefore maintains a count for each observed sessionId value. Consequently, the total state size of the query grows continuously as more and more sessionId values are observed.

The Idle State Retention Time parameters define how long the state of a key is retained without being updated before it is removed. For the previous example query, the count of a sessionId would be removed as soon as it has not been updated for the configured period of time.

By removing the state of a key, the continuous query completely forgets that it has seen this key before. If a record with a key whose state has been removed is processed, the record is treated as if it were the first record with that key. For the example above, this means that the count of that sessionId would start again at 0.
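The effect of forgetting a key can be illustrated with a small simulation in plain Scala. This is only a sketch and not Flink code: the class `IdleCountState`, its single `retentionMillis` parameter, and the explicit `nowMillis` timestamps are hypothetical names introduced for illustration, and the cleanup is simplified to one retention time rather than a minimum and a maximum.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the per-key state of
// SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId
// with a simplified idle-state cleanup. Not part of Flink.
final class IdleCountState(retentionMillis: Long) {
  // sessionId -> (count, timestamp of the last update)
  private val state = mutable.Map.empty[String, (Long, Long)]

  /** Processes one click and returns the updated count for its sessionId. */
  def onClick(sessionId: String, nowMillis: Long): Long = {
    expire(nowMillis)
    // A key whose state was removed is treated like a key never seen before.
    val previous = state.get(sessionId).map(_._1).getOrElse(0L)
    val updated = previous + 1
    state(sessionId) = (updated, nowMillis)
    updated
  }

  /** Number of keys currently held in state. */
  def size: Int = state.size

  // Remove every key that has not been updated within the retention time.
  private def expire(nowMillis: Long): Unit = {
    val stale = state.collect {
      case (key, (_, lastUpdate)) if nowMillis - lastUpdate >= retentionMillis => key
    }.toList
    stale.foreach(key => state.remove(key))
  }
}

object IdleCountDemo {
  def main(args: Array[String]): Unit = {
    val hour = 60L * 60 * 1000
    val counts = new IdleCountState(retentionMillis = 12 * hour)
    println(counts.onClick("s1", 0L))        // prints 1
    println(counts.onClick("s1", 1 * hour))  // prints 2
    println(counts.onClick("s1", 20 * hour)) // prints 1: state was dropped after 19 idle hours
  }
}
```

The third click arrives after the key has been idle for longer than the retention time, so its count starts over and the result differs from what an offline computation over all three clicks would return. In exchange, the number of keys held in state stays bounded by the keys that were active recently.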

There are two parameters to configure the idle state retention time:

  • The minimum idle state retention time defines how long the state of an inactive key is kept at least before it is removed.
  • The maximum idle state retention time defines how long the state of an inactive key is kept at most before it is removed.

val qConfig: StreamQueryConfig = ???

// set idle state retention time: min = 12 hours, max = 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))

Cleaning up state requires additional bookkeeping, which becomes less expensive the larger the difference between minTime and maxTime is. The difference between minTime and maxTime must be at least 5 minutes.
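The 5-minute rule can be checked before a configuration is applied. The following is a minimal sketch in plain Scala, assuming a hypothetical helper `RetentionCheck.validate` that is not part of Flink; how Flink itself reports a violation is not covered here.

```scala
import scala.concurrent.duration._

// Hypothetical helper, not a Flink API: checks the constraint stated above.
object RetentionCheck {
  // Minimum required gap between minTime and maxTime.
  val MinGap: FiniteDuration = 5.minutes

  /** Returns Right((minTime, maxTime)) if the gap is large enough, Left(message) otherwise. */
  def validate(
      minTime: FiniteDuration,
      maxTime: FiniteDuration): Either[String, (FiniteDuration, FiniteDuration)] =
    if (maxTime - minTime < MinGap)
      Left(s"maxTime ($maxTime) must exceed minTime ($minTime) by at least $MinGap")
    else
      Right((minTime, maxTime))
}
```

For example, `RetentionCheck.validate(12.hours, 24.hours)` yields a `Right`, while a pair that is only four minutes apart, or one where maxTime is smaller than minTime, yields a `Left` with an explanatory message.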