preface

This article takes a look at Flink's CheckpointedFunction.

example

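import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink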
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            // restore the elements that were still buffered when the last checkpoint was taken
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
  • BufferingSink implements the CheckpointedFunction interface and declares two fields: checkpointedState, of type ListState, and bufferedElements, of type List
  • The invoke method first adds value to bufferedElements; once the number of buffered elements reaches the threshold, it sends them to the sink and clears bufferedElements
  • snapshotState copies bufferedElements into the ListState. initializeState creates a ListStateDescriptor, obtains the ListState through context.getOperatorStateStore().getListState(descriptor), and, if the state was restored from a snapshot of a previous execution, copies the ListState contents back into bufferedElements
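To make the buffer/snapshot/restore cycle above concrete without a Flink cluster, here is a minimal plain-Java sketch. It assumes a hypothetical FakeListState class standing in for Flink's ListState and a hypothetical SimpleBufferingSink that mirrors BufferingSink with String elements instead of Tuple2; the constructor's isRestored flag plays the role of context.isRestored() in initializeState. None of these names are Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferingSinkLifecycle {

    // Hypothetical stand-in for Flink's ListState: the checkpointed storage of one instance.
    static final class FakeListState<T> {
        private final List<T> items = new ArrayList<>();

        void clear() { items.clear(); }

        void add(T item) { items.add(item); }

        Iterable<T> get() { return items; }
    }

    // Hypothetical class mirroring BufferingSink, with String elements and no Flink interfaces.
    static final class SimpleBufferingSink {
        private final int threshold;
        private final FakeListState<String> checkpointedState;
        private final List<String> bufferedElements = new ArrayList<>();
        final List<String> emitted = new ArrayList<>(); // records that reached the "external sink"

        // Plays the role of initializeState(): refill the buffer when restoring.
        SimpleBufferingSink(int threshold, FakeListState<String> state, boolean isRestored) {
            this.threshold = threshold;
            this.checkpointedState = state;
            if (isRestored) {
                for (String element : state.get()) {
                    bufferedElements.add(element);
                }
            }
        }

        void invoke(String value) {
            bufferedElements.add(value);
            if (bufferedElements.size() == threshold) {
                emitted.addAll(bufferedElements); // "send it to the sink"
                bufferedElements.clear();
            }
        }

        // Same logic as snapshotState(): replace the stored list with the current buffer.
        void snapshotState() {
            checkpointedState.clear();
            for (String element : bufferedElements) {
                checkpointedState.add(element);
            }
        }

        List<String> buffered() { return new ArrayList<>(bufferedElements); }
    }

    public static void main(String[] args) {
        FakeListState<String> storage = new FakeListState<>();

        SimpleBufferingSink first = new SimpleBufferingSink(3, storage, false);
        first.invoke("a");
        first.invoke("b");
        first.invoke("c");     // threshold reached: a, b, c are emitted
        first.invoke("d");     // d is only buffered
        first.snapshotState(); // the checkpoint stores [d]
        first.invoke("e");     // arrives after the checkpoint; then the job "fails"

        SimpleBufferingSink restored = new SimpleBufferingSink(3, storage, true);
        System.out.println(first.emitted);       // [a, b, c]
        System.out.println(restored.buffered()); // [d]
    }
}
```

The sketch only shows what the sink itself keeps. In a real job with a replayable source, the source is also rewound to the checkpoint, so an element like "e" that arrived after the snapshot would be delivered again rather than lost.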

CheckpointedFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java

@PublicEvolving
@SuppressWarnings("deprecation")
public interface CheckpointedFunction {

	/**
	 * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
	 * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
	 * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
	 *
	 * @param context the context for drawing a snapshot of the operator
	 * @throws Exception
	 */
	void snapshotState(FunctionSnapshotContext context) throws Exception;

	/**
	 * This method is called when the parallel function instance is created during distributed
	 * execution. Functions typically set up their state storing data structures in this method.
	 *
	 * @param context the context for initializing the operator
	 * @throws Exception
	 */
	void initializeState(FunctionInitializationContext context) throws Exception;

}
  • CheckpointedFunction is the core interface for stateful transformation functions, i.e. functions that maintain state across individual stream records
  • snapshotState is called when a checkpoint is taken and is used to snapshot the state; it is also commonly used to flush, commit, or synchronize with external systems
  • initializeState is called when the parallel function instance is initialized (either for the first time or when recovering from a previous checkpoint); it is typically used to initialize state and to implement the state recovery logic
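The flush/commit use of snapshotState mentioned above can be sketched in plain Java. BatchingWriter and ExternalStore are hypothetical names, not Flink classes, and the checkpointId parameter only imitates what FunctionSnapshotContext.getCheckpointId() would provide. The point is that snapshotState pushes every pending record to the external system, so when a checkpoint completes nothing received before it exists only in memory.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushOnCheckpoint {

    // Hypothetical external system (e.g. a database table) that receives batched writes.
    static final class ExternalStore {
        final List<String> rows = new ArrayList<>();

        void writeBatch(List<String> batch) { rows.addAll(batch); }
    }

    // Hypothetical writer illustrating the "flush in snapshotState" pattern.
    static final class BatchingWriter {
        private final ExternalStore store;
        private final int batchSize;
        private final List<String> pending = new ArrayList<>();

        BatchingWriter(ExternalStore store, int batchSize) {
            this.store = store;
            this.batchSize = batchSize;
        }

        void invoke(String value) {
            pending.add(value);
            if (pending.size() >= batchSize) {
                flush();
            }
        }

        // Called on checkpoint: write out everything received so far.
        // checkpointId is unused here; it mirrors context.getCheckpointId().
        void snapshotState(long checkpointId) {
            flush();
        }

        private void flush() {
            if (!pending.isEmpty()) {
                store.writeBatch(new ArrayList<>(pending));
                pending.clear();
            }
        }

        int pendingCount() { return pending.size(); }
    }

    public static void main(String[] args) {
        ExternalStore store = new ExternalStore();
        BatchingWriter writer = new BatchingWriter(store, 100);
        writer.invoke("r1");
        writer.invoke("r2");
        System.out.println(store.rows.size());     // 0: still below the batch size
        writer.snapshotState(1L);
        System.out.println(store.rows);            // [r1, r2]
        System.out.println(writer.pendingCount()); // 0
    }
}
```

Records written after a checkpoint but before a failure may be written again after recovery, so on its own this pattern yields at-least-once rather than exactly-once output.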

FunctionSnapshotContext

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/FunctionSnapshotContext.java

/**
 * This interface provides a context in which user functions that use managed state (i.e. state that is managed by state
 * backends) can participate in a snapshot. As snapshots of the backends themselves are taken by the system, this
 * interface mainly provides meta information about the checkpoint.
 */
@PublicEvolving
public interface FunctionSnapshotContext extends ManagedSnapshotContext {
}
  • FunctionSnapshotContext extends the ManagedSnapshotContext interface

ManagedSnapshotContext

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ManagedSnapshotContext.java

/**
 * This interface provides a context in which operators that use managed state (i.e. state that is managed by state
 * backends) can perform a snapshot. As snapshots of the backends themselves are taken by the system, this interface
 * mainly provides meta information about the checkpoint.
 */
@PublicEvolving
public interface ManagedSnapshotContext {

	/**
	 * Returns the ID of the checkpoint for which the snapshot is taken.
	 *
	 * <p>The checkpoint ID is guaranteed to be strictly monotonously increasing across checkpoints.
	 * For two completed checkpoints <i>A</i> and <i>B</i>, {@code ID_B > ID_A} means that checkpoint
	 * <i>B</i> subsumes checkpoint <i>A</i>, i.e., checkpoint <i>B</i> contains a later state
	 * than checkpoint <i>A</i>.
	 */
	long getCheckpointId();

	/**
	 * Returns timestamp (wall clock time) when the master node triggered the checkpoint for which
	 * the state snapshot is taken.
	 */
	long getCheckpointTimestamp();
}
  • ManagedSnapshotContext defines the getCheckpointId and getCheckpointTimestamp methods
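The javadoc's guarantee that checkpoint IDs strictly increase, and that a higher ID subsumes a lower one, is what allows code to keep only the newest snapshot. The sketch below is a hypothetical illustration (SnapshotInfo and CompletedCheckpoints are made-up names; Flink's runtime does its own checkpoint bookkeeping): the fields mimic what getCheckpointId() and getCheckpointTimestamp() return, and a TreeMap keyed by checkpoint ID discards every entry older than the latest one.

```java
import java.util.TreeMap;

public class CheckpointSubsumption {

    // Hypothetical record of what a function could read from ManagedSnapshotContext.
    static final class SnapshotInfo {
        final long checkpointId; // as from getCheckpointId()
        final long timestamp;    // as from getCheckpointTimestamp()
        final String state;

        SnapshotInfo(long checkpointId, long timestamp, String state) {
            this.checkpointId = checkpointId;
            this.timestamp = timestamp;
            this.state = state;
        }
    }

    // Hypothetical registry that keeps only the newest completed snapshot.
    static final class CompletedCheckpoints {
        private final TreeMap<Long, SnapshotInfo> byId = new TreeMap<>();

        void add(SnapshotInfo info) {
            byId.put(info.checkpointId, info);
            // A higher ID subsumes every lower one, so drop all older entries.
            byId.headMap(byId.lastKey(), false).clear();
        }

        SnapshotInfo latest() {
            return byId.isEmpty() ? null : byId.lastEntry().getValue();
        }

        int size() { return byId.size(); }
    }

    public static void main(String[] args) {
        CompletedCheckpoints completed = new CompletedCheckpoints();
        completed.add(new SnapshotInfo(1L, 1_000L, "count=10"));
        completed.add(new SnapshotInfo(2L, 2_000L, "count=25"));

        SnapshotInfo latest = completed.latest();
        System.out.println(latest.checkpointId + " " + latest.state); // 2 count=25
        System.out.println(completed.size());                       // 1
    }
}
```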

FunctionInitializationContext

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/FunctionInitializationContext.java

/**
 * This interface provides a context in which user functions can initialize by registering to managed state (i.e. state
 * that is managed by state backends).
 *
 * <p>
 * Operator state is available to all functions, while keyed state is only available for functions after keyBy.
 *
 * <p>
 * For the purpose of initialization, the context signals if the state is empty or was restored from a previous
 * execution.
 *
 */
@PublicEvolving
public interface FunctionInitializationContext extends ManagedInitializationContext {
}
  • FunctionInitializationContext extends the ManagedInitializationContext interface

ManagedInitializationContext

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ManagedInitializationContext.java

/**
 * This interface provides a context in which operators can initialize by registering to managed state (i.e. state that
 * is managed by state backends).
 *
 * <p>
 * Operator state is available to all operators, while keyed state is only available for operators after keyBy.
 *
 * <p>
 * For the purpose of initialization, the context signals if the state is empty (new operator) or was restored from
 * a previous execution of this operator.
 *
 */
public interface ManagedInitializationContext {

	/**
	 * Returns true if state was restored from the snapshot of a previous execution. This returns always false for
	 * stateless tasks.
	 */
	boolean isRestored();

	/**
	 * Returns an interface that allows for registering operator state with the backend.
	 */
	OperatorStateStore getOperatorStateStore();

	/**
	 * Returns an interface that allows for registering keyed state with the backend.
	 */
	KeyedStateStore getKeyedStateStore();

}
  • The ManagedInitializationContext interface defines the isRestored, getOperatorStateStore, and getKeyedStateStore methods
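The difference between the state handed out by getOperatorStateStore() and getKeyedStateStore() can be modelled in plain Java. This is an assumption-laden sketch, not Flink code: ParallelInstance is a hypothetical class, its operator state is a single list shared by every record the instance sees, and its keyed state is a per-key counter. In Flink the runtime sets the current key after keyBy; here the key is passed explicitly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OperatorVsKeyedState {

    // Hypothetical model of one parallel instance; not a Flink class.
    static final class ParallelInstance {
        // Like operator state: one list per instance, shared by every record it processes.
        final List<String> operatorListState = new ArrayList<>();
        // Like keyed state: one value per key, touched only by records with that key.
        private final Map<String, Integer> keyedCounts = new HashMap<>();

        void process(String key, String value) {
            operatorListState.add(value);
            keyedCounts.merge(key, 1, Integer::sum);
        }

        int countFor(String key) {
            return keyedCounts.getOrDefault(key, 0);
        }
    }

    public static void main(String[] args) {
        ParallelInstance instance = new ParallelInstance();
        instance.process("user-a", "click");
        instance.process("user-b", "view");
        instance.process("user-a", "buy");

        System.out.println(instance.operatorListState);  // [click, view, buy]
        System.out.println(instance.countFor("user-a")); // 2
        System.out.println(instance.countFor("user-b")); // 1
    }
}
```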

summary

  • Flink has two basic kinds of state: keyed state and operator state (non-keyed state). Keyed state can only be used in functions and operators on a KeyedStream. Each operator state is bound to one parallel operator instance, and operator state supports redistribution when the parallelism changes
  • Both keyed state and operator state exist in two forms, managed and raw. Managed state is controlled by the Flink runtime, which takes care of encoding it and writing it to checkpoints. Raw state is kept by operators in their own data structures; the Flink runtime does not know its structure and treats it as raw bytes. Managed state can be used by all DataStream functions, while raw state is generally only used when implementing your own operators
  • Managed operator state can be used by implementing either the CheckpointedFunction or the ListCheckpointed interface. CheckpointedFunction defines snapshotState and initializeState. snapshotState is called whenever a checkpoint is taken; initializeState is called every time the user-defined function is initialized (either for the first time or when recovering from a previous checkpoint), so it can be used both to initialize state and to implement the state recovery logic
  • For managed operator state, only the list-style form is currently supported: the state must be a list of serializable objects, which makes it easy to redistribute when rescaling. There are two redistribution schemes: even-split redistribution (on restore/redistribution, each operator instance receives only a sublist of the whole state) and union redistribution (on restore/redistribution, each operator instance receives the complete list of state)
  • FunctionSnapshotContext extends the ManagedSnapshotContext interface, which defines the getCheckpointId and getCheckpointTimestamp methods. FunctionInitializationContext extends the ManagedInitializationContext interface, which defines the isRestored, getOperatorStateStore, and getKeyedStateStore methods; these can be used to determine whether the state was restored from a snapshot of a previous execution and to obtain the OperatorStateStore and KeyedStateStore objects
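The two redistribution modes for list-style operator state can be illustrated with a small simulation. It is a simplified model, not Flink's repartitioning code: evenSplit concatenates the old lists and cuts them into contiguous chunks whose sizes differ by at most one (Flink's actual assignment may differ), while union gives every new instance the full list. In Flink these modes correspond to registering the state through OperatorStateStore.getListState and getUnionListState respectively.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListStateRedistribution {

    // Logically, the whole operator state is the concatenation of every instance's list.
    static List<String> concat(List<List<String>> oldState) {
        List<String> all = new ArrayList<>();
        for (List<String> part : oldState) {
            all.addAll(part);
        }
        return all;
    }

    // Even-split (simplified): contiguous sublists whose sizes differ by at most one.
    // A sublist may be empty when there are fewer elements than instances.
    static List<List<String>> evenSplit(List<List<String>> oldState, int newParallelism) {
        List<String> all = concat(oldState);
        List<List<String>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union: every new instance receives the complete list.
    static List<List<String>> union(List<List<String>> oldState, int newParallelism) {
        List<String> all = concat(oldState);
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // State of an operator that ran with parallelism 2, rescaled to 3.
        List<List<String>> old = Arrays.asList(Arrays.asList("e1", "e2"), Arrays.asList("e3"));
        System.out.println(evenSplit(old, 3)); // [[e1], [e2], [e3]]
        System.out.println(union(old, 3));     // [[e1, e2, e3], [e1, e2, e3], [e1, e2, e3]]
    }
}
```

Union redistribution is useful when every instance needs to see the global picture on restore (for example, to pick the entries that belong to it), at the cost of each instance receiving the whole state.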

doc

  • Using Managed Operator State