The background,

Flink 1.13 was recently released, and over 200 contributors participated in the development of Flink 1.13, committing over 1,000 Commits and completing a number of significant functions. Among them, PyFlink module has also added some important functions in this version, such as support for state, custom window, row-based operation and so on. With the introduction of these features, PyFlink functionality has become more and more complete, and users can use Python to complete the development of most types of Flink jobs. Next, we’ll detail how to use the State & Timer functionality in the Python DataStream API.

II. Introduction of STATE function

As a stream computing engine, state is one of the most core features in Flink.

  • In 1.12, the Python DataStream API does not yet support state, and users can only use the Python DataStream API to implement simple applications that do not require state.
  • In 1.13, the Python DataStream API supports this important feature.

Example use of state

Here is a simple example of how to use State in a Python DataStream API job:

from pyflink.common import WatermarkStrategy, Row from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors import NumberSequenceSource from pyflink.datastream.functions import RuntimeContext, MapFunction from pyflink.datastream.state import ValueStateDescriptor class MyMapFunction(MapFunction): def open(self, runtime_context: RuntimeContext): state_desc = ValueStateDescriptor('cnt', Value state self.cnt_state = runtime_context.get_state(state_desc) def map(self, value): cnt = self.cnt_state.value() if cnt is None: cnt = 0 new_cnt = cnt + 1 self.cnt_state.update(new_cnt) return value[0], new_cnt def state_access_demo(): # 1. Create StreamExecutionEnvironment env = StreamExecutionEnvironment. Get_execution_environment () # 2. Seq_num_source = numberSequencesource (1, 100) ds = env.from_source(source=seq_num_source, watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(), source_name='seq_num_source', type_info=Types.LONG()) # 3. Map (lambda a: Row(a % 4, 1), output_type= types.row ([types.long (), types.long ()])) \.key_by(lambda a: Row(a % 4, 1), output_type= types.row ([types.long ()])) \.key_by(lambda a: Row(a % 4, 1)) a[0]) \ .map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()])) # 4. Print () # execute() if __name__ == '__main__': state_access_demo()

In the above example, we define a mapFunction that defines a ValueState named “CNT_STATE” to keep track of the number of occurrences of each key.


  • In addition to ValueState, the Python Datastream API supports listState, mapState, reducingState, and aggregatingState;
  • When you define StateDescriptor for a State, you need to declare the type of data that is stored in the State (TypeInformation). Also note that the TypeInformation field is not currently in use and pickles are used by default for serialization, so it is recommended to define the TypeInformation field as types.pickled_byte_array (), Matches the serializer that is actually used. In this way, backward compatibility can be maintained after subsequent versions support the use of TypeInformation;
  • State can be used in other operations besides the map operation of a KeyedStream; In addition, you can also use state in connection flows, such as:
ds1 = ... # type DataStream ds2 = ... # type DataStream ds1.connect(ds2) \ .key_by(key_selector1=lambda a: a[0], key_selector2=lambda a: A [0]) \.map(myComapFunction ()) # You can use state in myComapFunction

The list of APIs that can be used with State is as follows:

operation Custom function
KeyedStream map MapFunction
flat_map FlatMapFunction
reduce ReduceFunction
filter FilterFunction
process KeyedProcessFunction
ConnectedStreams map CoMapFunction
flat_map CoFlatMapFunction
process KeyedCoProcessFunction
WindowedStream apply WindowFunction
process ProcessWindowFunction

Working principle of STATE

Above is an architectural diagram of how state works in PyFlink. As can be seen from the figure, the Python custom function runs in the Python worker process, while the state backend runs in the JVM process (managed by Java operators). When a Python custom function needs to access the state, it accesses the state backend via a remote call.

As we know, the cost of remote invocation is very high. In order to improve the performance of state read and write, PyFlink has optimized the state read and write in the following aspects:

  • Lazy to Read:

    For a state with multiple entries, such as a MapState, the state data is not read into the Python worker all at once when the state is traversed; it is read from the state backend only when it is really needed.

  • Async Write:

    When updating the state, the updated state is stored in the LRU cache and is not updated to the remote state backend synchronously. This avoids having to access the remote state backend for each state update operation. At the same time, multiple updates of the same key can be combined to avoid invalid state updates.

  • LRU cache:

    The cache for state reads and writes is maintained in the Python worker process. When a key is read, it is checked to see if it has already been loaded into the read cache. When a key is updated, it is first stored in the write cache. For a key that is read and written frequently, the LRU Cache can avoid accessing the remote state backend for each read and write operation. For scenarios with hot keys, the performance of state read and write can be greatly improved.

  • Flush on Checkpoint:

    In order to ensure the correctness of the checkpoint semantics, when the Java operator needs to perform a checkpoint, the Python worker’s write cache is flushed back to the state backend.

The LRU cache can be subdivided into two levels, as shown in the figure below:


  • The second level cache is the global cache, and the read cache in the second level cache stores all the original state data cached in the current Python worker process (not deserialized). All state objects created in the current Python worker process are stored in the write cache of the secondary cache.
  • The first-level cache resides within each state object, which caches the state data that the state object has read from the remote state backend and the state data that is to be updated back to the remote state backend.


  • When a state object is created in a Python UDF, it is first checked to see if the state object corresponding to the current key already exists (look for it in the “Global Write Cache” in the secondary cache). If it does exist, the state object is found in the “Global Write Cache”. The corresponding state object is returned; If it does not exist, create a new state object and store it in the “Global Write Cache”.
  • Read the state: When a Python UDF reads a state object, if the state data to be read already exists (in a first-level cache), for example, in a map state, the map key/map value to be read already exists. Return the corresponding map key/map value; Otherwise, access the secondary cache and read from the remote state backend if no state data exists in the secondary cache.
  • State writes: In Python UDF, when a state object is updated, it is first written to the state object’s internal write cache (level 1). When the size of the state data to be written back to the state backend in the state object exceeds the specified threshold or when a checkpoint is encountered, the state data to be written back is written back to the remote state backend.

STATE performance tuning

From the previous section, we learned that PyFlink uses a variety of optimizations to improve the performance of state reads and writes. These optimizations can be configured with the following parameters:

configuration instructions
python.state.cache-size The size of the read and write caches in the Python worker. It is not currently supported to configure the size of the read and write caches separately. The maximum number of entries that are read from the state backend and returned to the Python worker each time the map State is traversed. The maximum number of entries allowed in a map State read cache (Level 1 cache). When the number of entries in the read cache in a map State exceeds this threshold, the least-recently accessed entries are removed from the read cache using an LRU policy. The maximum number of entries allowed to be updated in a map State’s write cache (Level 1 cache). When the number of entries to be updated in a MapState exceeds this threshold, the state data to be updated in the MapState is written back to the remote state backend.

Note that the performance of state reads and writes depends not only on the above parameters, but also on other factors, such as:

  • Distribution of keys in input data:

    The more scattered the input keys are, the lower the probability of a read cache hit is, and the worse the performance is.

  • The number of reads and writes of state in Python UDF:

    State reads and writes may involve reading and writing the remote state backend, and you should try to optimize your Python UDF implementation to reduce unnecessary state reads and writes.

  • Checkpoint interval:

    In order to ensure the correctness of the checkpoint semantics, the Python worker writes all cached state data to the state backend when a checkpoint is encountered. If the checkpoint interval is configured too small, it may not effectively reduce the amount of data the Python worker writes back to the state backend.

  • Bundle size/bundle time:

    The current Python operator divides the input data into batches and sends them to the Python worker for execution. When a batch of data is processed, the state to be updated in the Python worker process is forced to be written back to the state backend. Like the checkpoint interval, this behavior may also affect state write performance. The size of the batch can be controlled using the python.fn-execution. Bundle. size and python.fn-execution.

III. Introduction to Timer Function

Examples of Timer usage

In addition to state, users can also use a timer in the Python DataStream API.

import datetime from pyflink.common import Row, WatermarkStrategy from pyflink.common.typeinfo import Types from pyflink.common.watermark_strategy import TimestampAssigner from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext from pyflink.datastream.state import ValueStateDescriptor from pyflink.table import StreamTableEnvironment class CountWithTimeoutFunction(KeyedProcessFunction): def __init__(self): self.state = None def open(self, runtime_context: RuntimeContext): self.state = runtime_context.get_state(ValueStateDescriptor( "my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()]))) def process_element(self, value, ctx: 'KeyedProcessFunction.Context'): # retrieve the current count current = self.state.value() if current is None: current = Row(value.f1, 0, 0) # update the state's count current[1] += 1 # set the state's timestamp to the record's assigned event time timestamp current[2] = ctx.timestamp() # write the state back self.state.update(current) # schedule the next timer 60 seconds from  the current event time ctx.timer_service().register_event_time_timer(current[2] + 60000) def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'): # get the state for the key that scheduled the timer result = self.state.value() # check if this is an outdated timer or  the latest timer if timestamp == result[2] + 60000: # emit the state on timeout yield result[0], result[1] class MyTimestampAssigner(TimestampAssigner): def __init__(self): self.epoch = datetime.datetime.utcfromtimestamp(0) def extract_timestamp(self, value, record_timestamp) -> int: return int((value[0] - self.epoch).total_seconds() * 1000) if __name__ == '__main__': env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) t_env.execute_sql(""" CREATE TABLE my_source ( a TIMESTAMP(3), b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10' ) """) stream = t_env.to_append_stream( t_env.from_path('my_source'), Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()])) watermarked_stream = stream.assign_timestamps_and_watermarks( WatermarkStrategy.for_monotonous_timestamps() .with_timestamp_assigner(MyTimestampAssigner())) # apply the process function onto a keyed stream watermarked_stream.key_by(lambda value: value[1])\ .process(CountWithTimeoutFunction()) \ .print() env.execute()

In the above example, we defined a KeyedProcessFunction that records the number of occurrences of each key. When a key has not been updated for more than 60 seconds, the key and the number of occurrences will be sent to the downstream node.

In addition to the Event Time Timer, users can also use Processing Time Timer.

How Timer Works

The Timer workflow is as follows:

  • Different from the State access using a separate communication channel, when the user registers the Timer, the registration message is sent to the Java operator through the data channel.
  • After receiving the Timer registration message, the Java operator first checks the trigger time of the Timer to be registered. If it has exceeded the current time, it will directly trigger. Otherwise, the timer is registered with the Timer Service of the Java operator;
  • When a timer is triggered, the trigger message is sent to the Python worker over the data channel, and the Python worker calls back to the user’s on_timer method in the Python UDF.

It is important to note that because Timer registration messages and trigger messages are sent asynchronously between Java operators and Python workers over the data channel, Timer firing may not be as timely in some situations. For example, when a user registers a Processing Time Timer, it may be several seconds before the trigger message is sent over the data channel to the Python UDF after the timer is triggered.


In this article, we focused on how to use State & Timer in Python Datastream API jobs, how State & Timer works, and how to perform performance tuning. We will continue our PyFlink series of articles to help PyFlink users learn more about PyFlink features, application scenarios, and best practices.

In addition, the real-time computing ecology team of Ali Cloud is long-term recruiting excellent big data talents (including internship + social recruitment). Our work includes:

  • Real-time machine learning: support the collaboration between real-time feature engineering and AI engine in machine learning scenarios, build real-time machine learning standards based on Apache Flink and its ecology, and promote comprehensive real-time scenarios such as search, recommendation, advertising, risk control, etc.
  • Integration of big data and AI: including programming language integration (PyFlink related work), execution engine integration (TF on Flink), workflow and management integration (Flink AI Flow).