Hi, everyone. Today I’m going to share with you Flink’s Checkpoint, which is divided into four parts. Checkpoint: Checkpoint: Checkpoint: Checkpoint: Checkpoint: Checkpoint: Checkpoint: Checkpoint: Checkpoint: Checkpoint: Checkpoint

Relationship between Checkpoint and State

A Checkpoint is a global operation triggered from a source to all downstream nodes. In the red box, you can see that 569K Checkpoint attempts have been made, and all of them have been successfully completed.





State is the main data stored in the persistent backup performed by Checkpoint. The state is 9kb.





What is the state

So let’s see what state is. Let’s take a look at a very classic Word count code, this code will monitor the local 9000 port data and the network port input word frequency statistics, we local action netcat, and then enter hello world on the terminal, the execution program output what?





The answer is pretty obvious, (Hello,1) and (word,1)

So the question is, if I type Hello world again on the terminal, what does it say?

The answer is pretty obvious, (hello, 2) and (world, 2). The reason why Flink knows that hello world has been processed once before is that state comes into play. Keyed state stores the data that needs to be counted before, so it helps Flink know that Hello and world have appeared once respectively.

Let’s go back to the word count code. A call to the keyby interface creates a Keyed stream to partition keys, which is a prerequisite for using keyed state. After that, the sum method calls the built-in StreamGroupedReduce implementation.





What is a keyed state

For keyed state, there are two characteristics:

  • It can be used only in KeyedStream functions and operations, such as Keyed UDF and window state
  • The keyed state is partitioned. Each key can belong to only one keyed state
To understand the concept of partitioned, we need to look at the semantics of keyby. You can see that there are three concurrent entries on the left, three concurrent entries on the right, and the words on the left are distributed through Keyby as they come in. For example, in the case of Hello Word, the word “hello” will always hash to the concurrent task in the lower right.





What is operator State

  • Also known as non-keyed state, each operator state is bound to only one instance of an operator.
  • A common operator state is source state, such as recording the offset of the current source
Let’s look at the word count code that uses operator state:





Here fromElements calls the class of FromElementsFunction, which uses the operator state of type List State. Make a classification according to state type as shown in the figure below:





In addition to this classification point of view, there is also a classification point of view from whether Flink directly takes over:

  • Managed State: Managed State Managed by Flink. All the preceding examples are Managed states
  • Raw State: Flink only provides streams for storing data. For Flink, Raw State is just bytes
In actual production, only managed State is recommended, and this article will discuss this topic.

How do I use state in Flink

The StreamGroupedReduce class is used for word count sum.





The following figure dissects the FromElementsFunction class in the Word Count example and shares how to use operator State in your code:





Checkpoint execution mechanism

Before introducing the Checkpoint execution mechanism, we need to look at the storage of state, because state is Checkpoint’s primary role in performing persistent backups.

The classification of Statebackend

The following figure illustrates three types of state Backend built into Flink. MemoryStateBackend and FsStateBackend are stored in the Java heap when they are running. FsStateBackend persists data to remote storage in file format. RocksDBStateBackend borrows RocksDB (LSM DB mixed with memory disks) to store state.





For HeapKeyedStateBackend, there are two implementations:

  • Supports asynchronous Checkpoint (default) : Storage format CopyOnWriteStateMap
  • Only Checkpoint: the storage format is NestedStateMap
Especially when HeapKeyedStateBackend is used in MemoryStateBackend, the Checkpoint serialization data stage has a maximum limit of 5 MB by default

For RocksDBKeyedStateBackend, each state is stored in a separate column family, where keyGroup, Key, and Namespace are serialized and stored in DB as keys.





Checkpoint execution mechanism explained

This section explains the Checkpoint execution process step by step. The Checkpoint Coordinator on the left is the initiator of the Checkpoint. In the middle is a Flink job consisting of two sources and one sink. On the far right is persistent storage, which in most user scenarios corresponds to HDFS.

  1. The Checkpoint Coordinator triggers a Checkpoint on all source nodes. .




  1. In the second step, the source node broadcasts barriers to the downstream, which is the core of chandy-Lamport distributed snapshot algorithm. The downstream task performs Checkpoint only after receiving all the input barriers.




  1. Third, after the task completes state backup, the task notifies the Checkpoint coordinator of the state handle of the backup data.




  1. Step 4. After the downstream sink node collects the two upstream input barriers, it will perform local snapshot. The RocksDB incremental Checkpoint process is specifically shown here. RocksDB first flusher the data to disk in full (big red triangle), and Flink framework then selects the unuploaded files for a persistent backup (purple triangle).




  1. Similarly, after the sink node completes its Checkpoint, it returns the state Handle to the Coordinator.




  1. Finally, when the Checkpoint coordinator collects the state Handle of all tasks, the Checkpoint meta-file is considered to be globally completed and backed up to the persistent storage.




Checkpoint EXACTLY_ONCE semantics

To implement EXACTLY ONCE semantics, Flink uses an input buffer to cache data received during the alignment phase and wait for it to be processed ONCE alignment is complete. For AT LEAST ONCE semantics, the collected data does not need to be cached and will be directly processed later. As a result, the data may be processed multiple times when restore is implemented. Check out the documentation on Checkpoint Align below:





It should be noted that the Checkpoint mechanism of Flink can only ensure that the calculation process of Flink can achieve EXACTLY ONCE, and the end-to-end EXACTLY ONCE needs the support of source and sink.

Difference between Savepoint and Checkpoint

During job recovery, both can be used. The main differences are as follows:

SavepointExternalized Checkpoint Triggered by the user and managed by the user to create and delete the Checkpoint. When Checkpoint is completed, the standardized format storage is saved in the external persistent storage given by the user. Allow job upgrades or configuration changes When a job FAILED (or CANCELED), The Checkpoint of the external storage is reserved. During job recovery, the user needs to provide the savepoint path used to restore job status

Tang Yun (Tea Dried)

The original link

This article is the original content of the cloud habitat community, shall not be reproduced without permission.