In Flink, state is used to store intermediate results or cached data. Many DataStream operators depend on such intermediate results, that is, on state, to perform their computation. Examples include deduplication, CEP pattern matching, and exactly-once processing. State is therefore a core part of Flink's processing system.
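Deduplication shows why an operator needs state: whether a record is a duplicate depends on the records seen before it. Below is a minimal plain-Java sketch. It assumes each record is identified by a String id and uses a HashSet as a stand-in for Flink-managed state. Deduplicator is a hypothetical name, not a Flink class.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch, not Flink API: the HashSet plays the role of state,
// i.e. data that must survive from one record to the next.
class Deduplicator {
    private final Set<String> seenIds = new HashSet<>();

    // Set.add returns false if the id was already present, so duplicates are dropped.
    boolean firstOccurrence(String id) {
        return seenIds.add(id);
    }

    public static void main(String[] args) {
        Deduplicator dedup = new Deduplicator();
        List<String> output = new ArrayList<>();
        for (String id : new String[] {"a", "b", "a", "c", "b"}) {
            if (dedup.firstOccurrence(id)) {
                output.add(id);
            }
        }
        System.out.println(output); // [a, b, c]
    }
}
```

Without the set, every record would have to be judged in isolation, and the second "a" could not be recognized as a duplicate.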

1. Types of state data structures

The State class hierarchy is complex and has many levels, but the following state data structures are the ones mainly used:

  • ValueState<T>

    A single value of type T. It is read with value() and updated with update(T).

  • ListState<T>

    A list of elements of type T.

  • ReducingState<T>

    Each call to add(T) immediately aggregates the new value with the current result using a ReduceFunction. The aggregated value is read with get().

  • MapState<UK, UV>

    Holds a set of key-to-value mappings and provides many Java Map-like methods, such as get, put, contains, and remove.

  • AggregatingState<IN, OUT>

    Behaves similarly to ReducingState, but aggregates with an AggregateFunction, which is more general: the input type IN and the output type OUT can differ.

  • Internal*State

    Internal state types such as InternalValueState and InternalListState. They are used to access state data inside the Flink runtime and are generally not used directly by user code.

  • BroadcastState

    Stores the state data of a BroadcastStream. Its data is sent to all parallel instances of the specified operator, so every instance holds the same data.
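The read/update semantics of the first three types can be illustrated with a minimal plain-Java sketch. SimpleValueState, SimpleListState and SimpleReducingState are hypothetical in-memory stand-ins, not Flink classes. They only mirror the value()/update() and add()/get() behavior described above, and a java.util.function.BinaryOperator takes the place of ReduceFunction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical in-memory stand-ins; NOT Flink classes. No keys, namespaces, or checkpoints.
class SimpleValueState<T> {
    private T value; // null until the first update, like an empty ValueState

    T value() {
        return value;
    }

    void update(T newValue) {
        value = newValue;
    }
}

class SimpleListState<T> {
    private final List<T> elements = new ArrayList<>();

    void add(T element) {
        elements.add(element);
    }

    Iterable<T> get() {
        return elements;
    }
}

class SimpleReducingState<T> {
    private final BinaryOperator<T> reduceFunction; // stand-in for Flink's ReduceFunction
    private T accumulated; // null while nothing has been added

    SimpleReducingState(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    // Folds the new value into the current result immediately, as ReducingState.add does.
    void add(T value) {
        accumulated = (accumulated == null) ? value : reduceFunction.apply(accumulated, value);
    }

    T get() {
        return accumulated;
    }
}

class StateTypesDemo {
    public static void main(String[] args) {
        SimpleValueState<Long> count = new SimpleValueState<>();
        count.update(count.value() == null ? 1L : count.value() + 1);

        SimpleListState<String> events = new SimpleListState<>();
        events.add("a");
        events.add("b");

        SimpleReducingState<Integer> sum = new SimpleReducingState<>(Integer::sum);
        sum.add(3);
        sum.add(4);

        System.out.println(count.value() + " " + events.get() + " " + sum.get()); // 1 [a, b] 7
    }
}
```

The key contrast is that ListState keeps every element, while ReducingState keeps only the running result.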

2. KeyedState and OperatorState

Depending on whether a DataStream is partitioned by key, its state is divided into KeyedState and OperatorState.

  • KeyedState: on a KeyedStream, each key has its own State object.
  • OperatorState: bound to a parallel operator instance rather than to a key. Each operator instance holds a single OperatorState.
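The two kinds of state compare as follows:

  • Scope: KeyedState is available only to operators on a KeyedStream. OperatorState is available to all types of operators.
  • Number of state objects: for KeyedState, each key on the KeyedStream has its own State object. For OperatorState, each parallel operator instance has one state.
  • Access: KeyedState is accessed by overriding a RichFunction. OperatorState is accessed by implementing the CheckpointedFunction or ListCheckpointed interface.
  • Supported data structures: KeyedState supports ValueState, ListState, ReducingState, AggregatingState, and MapState. OperatorState supports ListState and BroadcastState.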
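The difference in scope can be sketched in plain Java as one hypothetical operator instance that holds both kinds of state. A per-key map stands in for KeyedState, and a single list stands in for OperatorState. CountingOperatorInstance and its methods are illustrative names, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of ONE parallel operator instance; not Flink API.
class CountingOperatorInstance {
    // Stand-in for KeyedState: a separate value for every key this instance sees.
    private final Map<String, Long> countPerKey = new HashMap<>();
    // Stand-in for OperatorState: one list for the whole instance, regardless of key.
    private final List<String> bufferedRecords = new ArrayList<>();

    void process(String key, String record) {
        countPerKey.merge(key, 1L, Long::sum);
        bufferedRecords.add(record);
    }

    long countFor(String key) {
        return countPerKey.getOrDefault(key, 0L);
    }

    int bufferedSize() {
        return bufferedRecords.size();
    }

    public static void main(String[] args) {
        CountingOperatorInstance instance = new CountingOperatorInstance();
        instance.process("user1", "click");
        instance.process("user2", "view");
        instance.process("user1", "buy");
        // Keyed view: 2 records for user1; operator view: 3 records in total.
        System.out.println(instance.countFor("user1") + " " + instance.bufferedSize()); // 2 3
    }
}
```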

3. StateBackend: where state is stored and maintained

A StateBackend has two main responsibilities:

  • It creates state and provides the interface for reading and writing state data.
  • When the system takes a checkpoint, it persists the state.
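Both responsibilities can be illustrated with a small hypothetical interface, MiniStateBackend. Its in-memory implementation hands out named state and produces a deep-copied snapshot on checkpoint. Neither type is Flink API. Real backends also deal with key groups, serializers, and durable storage, and only the two responsibilities are modeled here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified model of the two StateBackend duties. Not Flink API.
interface MiniStateBackend {
    // Duty 1: create (or look up) named state and give access to it.
    Map<String, String> getOrCreateState(String stateName);

    // Duty 2: on checkpoint, produce an independent copy of all state.
    Map<String, Map<String, String>> snapshot();
}

class InMemoryMiniStateBackend implements MiniStateBackend {
    private final Map<String, Map<String, String>> states = new HashMap<>();

    @Override
    public Map<String, String> getOrCreateState(String stateName) {
        return states.computeIfAbsent(stateName, name -> new HashMap<>());
    }

    @Override
    public Map<String, Map<String, String>> snapshot() {
        // Deep copy so that later writes do not change the checkpointed data.
        Map<String, Map<String, String>> copy = new HashMap<>();
        states.forEach((name, data) -> copy.put(name, new HashMap<>(data)));
        return copy;
    }

    public static void main(String[] args) {
        InMemoryMiniStateBackend backend = new InMemoryMiniStateBackend();
        backend.getOrCreateState("lastClick").put("user1", "10:00");
        Map<String, Map<String, String>> checkpoint = backend.snapshot();
        backend.getOrCreateState("lastClick").put("user1", "10:05");
        System.out.println(checkpoint.get("lastClick").get("user1")); // 10:00
    }
}
```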

3.1 Implementation of KeyedStateBackend

As the class diagram shows, AbstractKeyedStateBackend implements KeyedStateBackend, which in turn extends PriorityQueueSetFactory and KeyedStateFactory. The backend can therefore create priority queues of queue elements and create InternalKvState instances. It also provides a snapshot method that takes snapshots of the state data held in the KeyedStateBackend. HeapKeyedStateBackend and RocksDBKeyedStateBackend are the two basic implementations of KeyedStateBackend. RocksDBKeyedStateBackend is shipped in a separate JAR that must be added as a dependency. Without that JAR, the inheritance hierarchy of RocksDBKeyedStateBackend is not visible.

3.1.2 Introduction to HeapKeyedStateBackend

HeapKeyedStateBackend stores state in JVM heap memory and is the KeyedStateBackend that Flink uses by default.

  • HeapKeyedStateBackend state types

Because HeapKeyedStateBackend keeps state on the heap, the names of its state types start with Heap, for example HeapAggregatingState, HeapListState, and HeapReducingState. All of these state types extend the AbstractHeapState class.

  • Design and implementation of StateTable

  1. In HeapKeyedStateBackend, state data is stored mainly in StateTable structures. AbstractHeapState declares a StateTable field, and the concrete implementation of the StateTable interface is passed in when an AbstractHeapState is instantiated.
  2. HeapKeyedStateBackend keeps all of its tables in the registeredKVStates map. The key is the name of a registered state, and the value is the StateTable that stores that state's data.
private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
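The registry idea can be sketched in plain Java. SimpleStateTable is a hypothetical stand-in for StateTable that addresses state S by key K and namespace N through nested maps. SimpleHeapKeyedBackend keeps one such table per registered state name, in the spirit of registeredKVStates. Neither class is Flink's.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a StateTable: state S addressed by key K and namespace N.
class SimpleStateTable<K, N, S> {
    private final Map<K, Map<N, S>> data = new HashMap<>();

    S get(K key, N namespace) {
        Map<N, S> byNamespace = data.get(key);
        return byNamespace == null ? null : byNamespace.get(namespace);
    }

    void put(K key, N namespace, S state) {
        data.computeIfAbsent(key, k -> new HashMap<>()).put(namespace, state);
    }
}

// Hypothetical backend keeping one table per registered state name.
class SimpleHeapKeyedBackend<K> {
    private final Map<String, SimpleStateTable<K, ?, ?>> registeredKVStates = new HashMap<>();

    // The caller is trusted to use consistent N and S types for a given state name.
    @SuppressWarnings("unchecked")
    <N, S> SimpleStateTable<K, N, S> getOrRegister(String stateName) {
        return (SimpleStateTable<K, N, S>) registeredKVStates.computeIfAbsent(
                stateName, name -> new SimpleStateTable<K, N, S>());
    }

    int registeredStateCount() {
        return registeredKVStates.size();
    }

    public static void main(String[] args) {
        SimpleHeapKeyedBackend<String> backend = new SimpleHeapKeyedBackend<>();
        SimpleStateTable<String, String, Long> counts = backend.getOrRegister("counts");
        counts.put("user1", "window-1", 3L);
        System.out.println(backend.getOrRegister("counts").get("user1", "window-1")); // 3
    }
}
```

The wildcard value type mirrors the StateTable<K, ?, ?> declaration above: one map holds tables whose namespace and state types differ per registered state.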

  3. StateTable has two implementations, CopyOnWriteStateTable and NestedMapsStateTable, and CopyOnWriteStateTable is the one generally used by default. CopyOnWriteStateTable stores its data in CopyOnWriteStateMap structures and supports asynchronous snapshots during checkpoints. NestedMapsStateTable stores its elements in NestedStateMap structures and only supports synchronous snapshots during checkpoints.

  4. CopyOnWriteStateTable trades some performance and memory for asynchronous snapshots and incremental rehashing.

  5. Unlike an ordinary hash table, CopyOnWriteStateTable locates a value with two hash keys, the key K and the namespace N, instead of using a nested structure. Its main storage structure is an array of StateMapEntry objects:

    StateMapEntry<K, N, S>[] primaryTable
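The copy-on-write idea can be sketched in plain Java. CowStateMap is a hypothetical, heavily simplified stand-in, not Flink's CopyOnWriteStateMap. Entries are located by the (key, namespace) pair in one flat bucket array. Entries are immutable, and every write rebuilds only the touched bucket chain, so a snapshot can be a shallow copy of primaryTable. Flink avoids most of this copying by tracking version numbers and copying only entries that a running snapshot still shares. It also rehashes incrementally, which this sketch omits by using a fixed capacity.

```java
import java.util.Objects;

// Hypothetical copy-on-write sketch; not Flink's CopyOnWriteStateMap.
class CowStateMap<K, N, S> {
    // An entry is located by BOTH key and namespace, so no nested Map<K, Map<N, S>> is needed.
    private static final class Entry<K, N, S> {
        final K key;
        final N namespace;
        final S state;
        final Entry<K, N, S> next;

        Entry(K key, N namespace, S state, Entry<K, N, S> next) {
            this.key = key;
            this.namespace = namespace;
            this.state = state;
            this.next = next;
        }
    }

    private final Entry<K, N, S>[] primaryTable;

    @SuppressWarnings("unchecked")
    CowStateMap(int capacity) {
        primaryTable = (Entry<K, N, S>[]) new Entry[capacity];
    }

    private CowStateMap(Entry<K, N, S>[] table) {
        primaryTable = table;
    }

    private int indexOf(K key, N namespace) {
        return Math.floorMod(Objects.hash(key, namespace), primaryTable.length);
    }

    S get(K key, N namespace) {
        for (Entry<K, N, S> e = primaryTable[indexOf(key, namespace)]; e != null; e = e.next) {
            if (Objects.equals(e.key, key) && Objects.equals(e.namespace, namespace)) {
                return e.state;
            }
        }
        return null;
    }

    // Entries are never modified in place: the touched chain is rebuilt,
    // so a snapshot that still references the old chain is unaffected.
    void put(K key, N namespace, S state) {
        int i = indexOf(key, namespace);
        Entry<K, N, S> rebuilt = new Entry<>(key, namespace, state, null);
        for (Entry<K, N, S> e = primaryTable[i]; e != null; e = e.next) {
            boolean sameSlot = Objects.equals(e.key, key) && Objects.equals(e.namespace, namespace);
            if (!sameSlot) {
                rebuilt = new Entry<>(e.key, e.namespace, e.state, rebuilt);
            }
        }
        primaryTable[i] = rebuilt;
    }

    // Snapshot = shallow copy of the bucket array; entry objects are shared, not copied.
    CowStateMap<K, N, S> snapshot() {
        return new CowStateMap<>(primaryTable.clone());
    }

    public static void main(String[] args) {
        CowStateMap<String, String, Integer> map = new CowStateMap<>(16);
        map.put("user1", "window-1", 1);
        CowStateMap<String, String, Integer> checkpoint = map.snapshot();
        map.put("user1", "window-1", 2);
        System.out.println(map.get("user1", "window-1") + " " + checkpoint.get("user1", "window-1")); // 2 1
    }
}
```

Because the snapshot only copies bucket references, it is cheap to take synchronously and can then be written out while processing continues. This is the property that makes asynchronous snapshots possible.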

3.2 Implementation of OperatorStateBackend

  • OperatorState can be created through two kinds of state management backends: OperatorStateBackend and RawKeyedStateInput. OperatorStateBackend is Flink's default backend for managed operator state. Managed state is state managed by the Flink framework, such as ValueState, ListState, and MapState. Raw state, by contrast, is user-defined state that Flink is unaware of. Developers must manage it themselves and read and write its content as byte arrays.
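The practical difference from managed state is that raw state reaches Flink only as bytes. The following plain-Java sketch uses a hypothetical RawCounterState with two fields to show the serialization work the developer then owns. It uses java.io data streams and calls no Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical example of "raw" state: only the byte[] would be handed over,
// so the developer owns the format (field order, encoding) in both directions.
class RawCounterState {
    final long count;
    final String lastKey;

    RawCounterState(long count, String lastKey) {
        this.count = count;
        this.lastKey = lastKey;
    }

    byte[] toBytes() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(count);  // 8 bytes
            out.writeUTF(lastKey); // 2-byte length prefix + modified UTF-8 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static RawCounterState fromBytes(byte[] raw) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            // Fields must be read back in exactly the order they were written.
            long count = in.readLong();
            String lastKey = in.readUTF();
            return new RawCounterState(count, lastKey);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] raw = new RawCounterState(7L, "user42").toBytes();
        RawCounterState restored = RawCounterState.fromBytes(raw);
        System.out.println(raw.length + " " + restored.count + " " + restored.lastKey); // 16 7 user42
    }
}
```

With managed state, the framework's serializers would handle this round trip. With raw state, any change to the field order breaks restoring older checkpoints.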

As shown in the following figure, OperatorStateBackend inherits from four interfaces. The OperatorStateStore interface provides methods to get BroadcastState, ListState, and the names of the states registered in the OperatorStateStore. DefaultOperatorStateBackend is the only default implementation of OperatorStateBackend, and it keeps the state data of all operators in JVM memory.
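A store that offers named ListState and BroadcastState and keeps everything on the JVM heap can be modeled in a few lines. SimpleOperatorStateStore is hypothetical. Its method names loosely echo OperatorStateStore, but the signatures are simplified and are not Flink's.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified operator state store held entirely in JVM memory.
class SimpleOperatorStateStore {
    private final Map<String, List<String>> listStates = new HashMap<>();
    private final Map<String, Map<String, String>> broadcastStates = new HashMap<>();

    List<String> getListState(String name) {
        return listStates.computeIfAbsent(name, n -> new ArrayList<>());
    }

    Map<String, String> getBroadcastState(String name) {
        return broadcastStates.computeIfAbsent(name, n -> new HashMap<>());
    }

    // Names of every registered state, sorted for stable output.
    Set<String> getRegisteredStateNames() {
        Set<String> names = new TreeSet<>(listStates.keySet());
        names.addAll(broadcastStates.keySet());
        return names;
    }

    public static void main(String[] args) {
        SimpleOperatorStateStore store = new SimpleOperatorStateStore();
        store.getListState("source-offsets").add("partition-0:42");
        store.getBroadcastState("rules").put("rule-1", "amount > 100");
        System.out.println(store.getRegisteredStateNames()); // [rules, source-offsets]
    }
}
```

Because everything lives in plain heap collections, the total size of operator state is bounded by the JVM heap.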

3.3 Overall design of StateBackend

StateBackend is an interface that defines how the state of a streaming application is stored and checkpointed. Different StateBackends store data in different ways. As shown in the following figure, the main implementations of StateBackend are MemoryStateBackend, FsStateBackend, and RocksDBStateBackend. The StateBackend interface declares the createKeyedStateBackend() and createOperatorStateBackend() methods, which create the KeyedStateBackend and OperatorStateBackend described above. The three backends differ mainly in the KeyedStateBackend they use and in where checkpoint data is stored.

Comparison of the three state backends:

  • MemoryStateBackend
    KeyedStateBackend: HeapKeyedStateBackend
    Checkpoint data: stored in JobManager heap memory
    Usage scenario: local development, testing, and debugging
    Notes: limited by the size of the JobManager's memory. Each State is limited to 5 MB by default, which can be adjusted through the constructor. No single State may exceed the Akka frame size.

  • FsStateBackend
    KeyedStateBackend: HeapKeyedStateBackend
    Checkpoint data: stored in files under a specified file system path
    Usage scenario: production environments; suitable for high-availability setups
    Notes: state data is held in TaskManager memory and cannot exceed it. The TaskManager writes checkpoint data to external storage asynchronously.

  • RocksDBStateBackend
    KeyedStateBackend: RocksDBKeyedStateBackend
    Checkpoint data: state is kept in the embedded local database RocksDB
    Usage scenario: compared with FsStateBackend, RocksDB's LSM-tree storage can hold a much larger amount of state data. It is currently the only backend that supports incremental checkpoints.
    Notes: the total state size is limited by disk size rather than memory. An external file system must be configured for saving state. Because RocksDB's JNI API is based on bytes, a single key or value cannot exceed 2^31 bytes.
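The two size limits in the comparison above can be turned into byte counts with a little arithmetic. The sketch assumes that "5 MB" means 5 × 1024 × 1024 bytes. It accounts for the RocksDB limit by the fact that a Java byte[] is indexed by int, so it holds at most Integer.MAX_VALUE = 2^31 - 1 bytes, just under 2^31. StateSizeLimits and its members are hypothetical names, not Flink constants.

```java
// Hypothetical helper turning the size limits above into byte counts; not Flink API.
class StateSizeLimits {
    // Assumed default per-state limit of MemoryStateBackend: 5 * 1024 * 1024 = 5,242,880 bytes.
    static final int MEMORY_BACKEND_DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;

    // RocksDB's JNI bridge passes keys and values as Java byte[] arrays. A Java array is
    // indexed by int, so its length is at most Integer.MAX_VALUE = 2^31 - 1.
    static boolean fitsInSingleByteArray(long serializedSizeInBytes) {
        return serializedSizeInBytes >= 0 && serializedSizeInBytes <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(MEMORY_BACKEND_DEFAULT_MAX_STATE_SIZE);        // 5242880
        System.out.println(fitsInSingleByteArray(1L << 31));              // false
        System.out.println(fitsInSingleByteArray(100L * 1024 * 1024));    // true
    }
}
```

In practice, a single value approaching either limit usually signals that the state should be split, for example by using MapState instead of one large ValueState.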

4. References

Flink Design and Implementation: Core Principles and Source Code Analysis