Checkpointing is the core mechanism of Flink’s fault tolerance. Flink periodically stores snapshots of the state of each operator. If the Flink program goes down, its state can be recovered from these snapshots.

I. Consistency

In stream processing, consistency can be divided into three levels:

At-most-once: This is a euphemism for having no guarantee of correctness — the count may be lost after a failure occurs.

At-least-once: This means that the result of the count may be greater than the correct value, but never less than the correct value. In other words, the counting program may overcount after a failure, but it never undercounts.

Exactly-once: This means that the system guarantees that the count is the same as the correct value even if a failure occurs: neither too high nor too low.

The first generation of stream processors (such as Storm and Samza) guaranteed only at-least-once when they first came out, because exactly-once is more complex to implement. It is challenging both at the infrastructure level (deciding what counts as correct and what the scope of exactly-once is) and at the implementation level. Early users of stream processing systems were willing to accept the limitations of the framework and find ways to compensate at the application level (such as making the application idempotent, or redoing the calculation in a batch computing layer).
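To make the three levels concrete, here is a minimal sketch of a counter that crashes once. It is a toy model, not Flink code: the function name `run_count`, the checkpoint interval, and the way each mode recovers are assumptions chosen for illustration.

```python
def run_count(num_events, fail_at, mode, checkpoint_every=4):
    """Count `num_events` events with one simulated crash at offset `fail_at`.

    A checkpoint of (count, offset) is taken every `checkpoint_every` events.
    Each mode recovers differently (illustrative assumptions):
      at-most-once : state rolls back, the source does NOT rewind (events lost)
      at-least-once: the source rewinds, state does NOT roll back (events recounted)
      exactly-once : state and source offset roll back together
    """
    saved_count = saved_offset = 0  # last durable checkpoint
    count = offset = 0              # live state and source read position
    crashed = False
    while offset < num_events:
        count += 1
        offset += 1
        if offset % checkpoint_every == 0:
            saved_count, saved_offset = count, offset
        if offset == fail_at and not crashed:
            crashed = True
            if mode == "at-most-once":
                count = saved_count
            elif mode == "at-least-once":
                offset = saved_offset
            elif mode == "exactly-once":
                count, offset = saved_count, saved_offset
            else:
                raise ValueError(f"unknown mode: {mode}")
    return count


# 10 events, crash after the 6th, checkpoint after the 4th and 8th.
results = {m: run_count(10, 6, m)
           for m in ("at-most-once", "at-least-once", "exactly-once")}
# results == {"at-most-once": 8, "at-least-once": 12, "exactly-once": 10}
```

The undercount and the overcount both come from restoring only half of the (state, offset) pair; exactly-once restores both from the same checkpoint.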

End-to-end state consistency

The specific division is as follows:

The source end

The external source must be able to reset its read position. The Kafka source we currently use has this ability: the offset can be specified when reading data.

Flink internal

Relies on the checkpoint mechanism.

The sink end

Ensure that data is not repeatedly written to the external system during fault recovery.

There are two implementations:

A) Idempotent writing. An idempotent operation is one that can be repeated many times but changes the result only once, i.e., repeating it later has no further effect.

B) Transactional writing
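The two sink strategies can be sketched as follows. These classes are hypothetical stand-ins for an external system, not Flink sink interfaces; the method names `write`, `commit`, and `abort` are assumptions for illustration.

```python
class IdempotentSink:
    """Upserts by key, so replaying a record leaves the store unchanged."""

    def __init__(self):
        self.store = {}

    def write(self, key, value):
        self.store[key] = value  # same key + value again is a no-op


class TransactionalSink:
    """Buffers writes and makes them visible only when the checkpoint completes."""

    def __init__(self):
        self.committed = []  # visible to the external system
        self.pending = []    # written since the last completed checkpoint

    def write(self, record):
        self.pending.append(record)

    def commit(self):
        # Called when the checkpoint covering the pending writes completes.
        self.committed.extend(self.pending)
        self.pending = []

    def abort(self):
        # Called on failure: discard writes that no checkpoint covers.
        self.pending = []


# Idempotent: records replayed after recovery do not create duplicates.
idem = IdempotentSink()
for key, value in [("a", 1), ("b", 2), ("a", 1), ("b", 2)]:
    idem.write(key, value)
# idem.store == {"a": 1, "b": 2}

# Transactional: "r3" is written, lost in a failure, then replayed.
tx = TransactionalSink()
tx.write("r1"); tx.write("r2"); tx.commit()
tx.write("r3"); tx.abort()   # failure before the checkpoint completed
tx.write("r3"); tx.commit()  # replay after recovery
# tx.committed == ["r1", "r2", "r3"]
```

Idempotent writing needs a natural key in the external system; transactional writing works without one but delays visibility until the checkpoint completes.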

Checkpoint Principle

The “checkpoint” feature resets the system to a correct state in the event of a failure.

The checkpoint mechanism is the cornerstone of Flink’s reliability. When an operator fails for some reason (such as an abnormal exit), the Flink cluster can restore the entire application flow graph to a state from before the failure, which keeps the state of the flow graph consistent.

Snapshot implementation algorithm:

A) Simple algorithm: pause the application, take the checkpoint, and then resume the application.

B) Flink’s improved checkpoint algorithm. Flink’s checkpoint mechanism is based on a variant of the Chandy-Lamport algorithm: asynchronous barrier snapshotting.

When an application that requires checkpointing starts, Flink’s JobManager creates a CheckpointCoordinator for it. The CheckpointCoordinator is responsible for creating the application’s snapshots.

Barrier

The stream barrier is a core concept in Flink’s checkpointing. Barriers are inserted into the data stream and flow with the records as part of the stream (somewhat like watermarks). Barriers never overtake records; they flow strictly in line with them.

A barrier carries no data of its own. It divides the records in the stream into two consecutive groups: the records that belong to the current snapshot and the records that belong to the next one. Each barrier carries the ID of the snapshot whose records it pushes in front of it. In the figure, data moves from left to right (records on the right enter the system first), so snapshot N contains the records between barrier N-1 on its right and barrier N; the part between the two gray vertical lines belongs to checkpoint N. Barriers do not interrupt the flow of the stream, so they are very lightweight. Barriers from several snapshots can be in the stream at the same time, which means that several snapshots can be in progress concurrently.

If there are multiple parallel input streams, barriers are injected into each of them at the sources. The point at which the barrier for snapshot N is injected into a stream (call it Sn) is the position in that stream up to which the snapshot covers the data: snapshot N contains everything before Sn.

The barriers then flow downstream. When an intermediate operator has received the barrier for snapshot N from all of its input streams, it emits a barrier for snapshot N into all of its output streams. Once a sink operator has received the barrier for snapshot N from all of its input streams, it acknowledges snapshot N to the checkpoint coordinator. The snapshot is considered complete only after all sinks have acknowledged it. Once snapshot N is complete, the job never again asks the sources for records from before Sn, because those records have already passed through the entire dataflow topology.
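The way a barrier separates the records of one snapshot from the next can be sketched with plain generators. This is a single-input toy model, not Flink code; `Barrier`, `source`, and `summing_operator` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Barrier:
    checkpoint_id: int


def source(records, barrier_every):
    """Yield records, injecting a Barrier after every `barrier_every` records."""
    checkpoint_id = 0
    for i, record in enumerate(records, start=1):
        yield record
        if i % barrier_every == 0:
            checkpoint_id += 1
            yield Barrier(checkpoint_id)


def summing_operator(stream, snapshots):
    """Keep a running sum; on a barrier, snapshot the sum and forward the barrier."""
    total = 0
    for element in stream:
        if isinstance(element, Barrier):
            # Everything that arrived before the barrier is in this snapshot.
            snapshots[element.checkpoint_id] = total
        else:
            total += element
        yield element  # records and barriers both continue downstream


snapshots = {}
out = list(summing_operator(source([1, 2, 3, 4, 5], barrier_every=2), snapshots))
# snapshots == {1: 3, 2: 10}
```

Record 5 arrives after barrier 2, so it is not part of snapshot 2; it would belong to the next snapshot.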

Step 1: The checkpoint coordinator triggers a checkpoint on all source nodes. Each source task then injects checkpoint barriers into its data stream.

Step 2: The source nodes broadcast the barriers downstream. These barriers are the core of the Chandy-Lamport distributed snapshot algorithm. A downstream task performs its checkpoint only after it has received the barriers from all of its inputs.

Step 3: After the task completes state backup, the task notifies the Checkpoint coordinator of the state handle of the backup data.

Step 4: After the downstream sink node has collected the barriers from both of its upstream inputs, it takes its local snapshot. The figure shows the RocksDB incremental checkpoint process in particular. First, RocksDB flushes all data to disk (represented by the big red triangle). The Flink framework then selects the files that have not yet been uploaded and backs them up to persistent storage (purple triangle).

Step 5: Similarly, after the sink node completes its Checkpoint, it returns the state Handle to the Coordinator.

Step 6: Finally, when the checkpoint coordinator has collected the state handles of all tasks, the checkpoint is considered globally complete, and a checkpoint metadata file is backed up to persistent storage.
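The bookkeeping on the coordinator side (steps 3, 5, and 6) can be sketched like this. The class below is a hypothetical simplification that only shares its name with Flink’s CheckpointCoordinator; the task IDs and handle strings are made up.

```python
class CheckpointCoordinator:
    """Toy coordinator: a checkpoint completes once every task has acknowledged."""

    def __init__(self, task_ids):
        self.task_ids = set(task_ids)
        self.pending = {}    # checkpoint_id -> {task_id: state_handle}
        self.completed = {}  # checkpoint_id -> {task_id: state_handle}

    def trigger(self, checkpoint_id):
        # Step 1: start tracking a new checkpoint.
        self.pending[checkpoint_id] = {}

    def acknowledge(self, checkpoint_id, task_id, state_handle):
        # Steps 3 and 5: a task reports where its state backup lives.
        handles = self.pending[checkpoint_id]
        handles[task_id] = state_handle
        if set(handles) == self.task_ids:
            # Step 6: all handles collected, the checkpoint is globally complete.
            self.completed[checkpoint_id] = self.pending.pop(checkpoint_id)
            return True
        return False


coordinator = CheckpointCoordinator(["source", "map", "sink"])
coordinator.trigger(1)
acks = [
    coordinator.acknowledge(1, "source", "handle-source"),
    coordinator.acknowledge(1, "map", "handle-map"),
    coordinator.acknowledge(1, "sink", "handle-sink"),
]
# acks == [False, False, True]
```

In this sketch the `completed` entry plays the role of the checkpoint metadata: it is the list of state handles needed to restore every task.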

Original link: blog.csdn.net/weixin\_427…

Flink handles barriers in two ways, each implemented by its own class:

  1. Barriers are aligned

  2. Barriers are not aligned

Code: blog.csdn.net/jsjsjs1789/…
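The first mode, barrier alignment, can be sketched as below. This is a toy model under stated assumptions (only one checkpoint in flight, a summing task, channels identified by integers); `AligningTask` is a hypothetical name and not one of Flink’s classes.

```python
from collections import deque


class AligningTask:
    """Multi-input task that aligns barriers before taking its snapshot."""

    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.blocked = set()     # channels whose barrier has already arrived
        self.buffered = deque()  # records held back from blocked channels
        self.state = 0
        self.snapshots = {}
        self.output = []

    def on_record(self, channel, value):
        if channel in self.blocked:
            # This record belongs to the NEXT checkpoint: hold it back.
            self.buffered.append((channel, value))
        else:
            self.state += value
            self.output.append(value)

    def on_barrier(self, channel, checkpoint_id):
        self.blocked.add(channel)
        if len(self.blocked) == self.num_inputs:
            # Barriers from all inputs have arrived: snapshot, forward, unblock.
            self.snapshots[checkpoint_id] = self.state
            self.output.append(("barrier", checkpoint_id))
            self.blocked.clear()
            while self.buffered:
                buffered_channel, buffered_value = self.buffered.popleft()
                self.on_record(buffered_channel, buffered_value)


task = AligningTask(num_inputs=2)
task.on_record(0, 1)
task.on_barrier(0, 1)   # channel 0 is now blocked
task.on_record(0, 10)   # buffered, not yet applied to state
task.on_record(1, 2)    # channel 1 is still open
task.on_barrier(1, 1)   # alignment complete
# task.snapshots == {1: 3} and task.state == 13
```

Without alignment, the record 10 would already be in the state when the snapshot is taken, and it would be counted again when the source replays it after recovery. That is why skipping alignment gives at-least-once rather than exactly-once.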

The Savepoint principle

A Flink savepoint is a global image of the state of a real-time job. Savepoints share their underlying code with checkpoints, because a savepoint is in effect a checkpoint taken at a specific point in time.

In principle, savepoints are created using exactly the same algorithm as checkpoints, so a savepoint can be considered a checkpoint with some extra metadata. Flink does not create savepoints automatically, so the user (or an external scheduler) must explicitly trigger their creation. Savepoints are a powerful feature. In addition to failover, they can be used for planned manual backups, updating applications, version migration, pausing and restarting applications, and so on.

Flink Savepoint trigger mode

There are three ways to trigger a Flink savepoint:

1. Run the flink savepoint command, which triggers a savepoint while the program keeps running.

2. Run the flink cancel -s command, which cancels the job and triggers a savepoint.

3. Use the REST API to trigger a savepoint, in the following format: **/jobs/:jobid/savepoints**
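As a rough sketch of the third option, the request could be built with Python’s standard library as shown below. The host, port, job ID, and target directory are placeholders, and the JSON field names `target-directory` and `cancel-job` are my understanding of the request body; check them against the REST API reference for your Flink version.

```python
import json
import urllib.request


def build_savepoint_request(base_url, job_id, target_directory, cancel_job=False):
    """Build (but do not send) a POST request to /jobs/:jobid/savepoints."""
    body = json.dumps({
        "target-directory": target_directory,  # assumed field name
        "cancel-job": cancel_job,              # assumed field name
    }).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/jobs/{job_id}/savepoints",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder values; replace them with your JobManager address and job ID.
request = build_savepoint_request(
    "http://localhost:8081", "my-job-id", "hdfs:///flink/savepoints"
)
# To actually send it: urllib.request.urlopen(request)
```

Triggering a savepoint is asynchronous, so the response should be treated as a handle for polling the result rather than as the finished savepoint.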

Flink Savepoint considerations

1. A problem can occur when the flink cancel -s command is used to cancel a job and trigger a savepoint at the same time. For example, suppose the real-time job is already unhealthy (its checkpoints are failing, say) and you stop it with a savepoint. The savepoint then fails as well. The real-time platform shows the job as stopped, but the job is actually still running on Yarn. To handle this, catch the exception raised when the savepoint fails, and kill the job on Yarn when it is thrown.

2. When using the DataStream API, it is best to assign a uid to each operator, so that the operator can be restored to its previous state even if the job topology changes. If you do not, Flink generates a uid for each operator automatically. In that case, changing some of the program’s logic may change an operator’s uid, so the previous state can no longer be matched to it, and the program reports an error at startup.

3. Because a savepoint captures the global state of the program, triggering one for a job with large state may affect the running job. For jobs with large state, I suggest not triggering savepoints too frequently; choose the trigger interval based on the size of the state.

4. Before restoring from a savepoint, check that the files in the savepoint directory are usable. If the last savepoint attempt failed, the savepoint in the HDFS directory may be unusable or missing data files. If you try to restore from such a damaged savepoint, the job cannot start.
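Point 2 above (stable operator uids) can be illustrated with a toy model. The ID generation here is an assumption made for illustration: it hashes the operator’s position and name, which is not how Flink derives its IDs, but it has the same property that a topology change can alter the generated ID.

```python
import hashlib


def auto_id(position, operator_name):
    """Stand-in for a generated ID that depends on the operator's place in the job."""
    return hashlib.md5(f"{position}:{operator_name}".encode()).hexdigest()[:8]


def assign_ids(topology):
    """topology: list of (name, explicit_uid_or_None). Returns {name: operator_id}."""
    return {
        name: uid if uid is not None else auto_id(position, name)
        for position, (name, uid) in enumerate(topology)
    }


def restore(savepoint, ids):
    """Look up each operator's state by ID; None means no matching state was found."""
    return {name: savepoint.get(operator_id) for name, operator_id in ids.items()}


# Without uids: inserting "filter" shifts "count" and changes its generated ID.
v1 = [("source", None), ("count", None)]
v2 = [("source", None), ("filter", None), ("count", None)]
savepoint = {assign_ids(v1)["count"]: 42}
lost = restore(savepoint, assign_ids(v2))["count"]  # None

# With explicit uids: the ID survives the topology change.
v1_uid = [("source", "source-uid"), ("count", "count-uid")]
v2_uid = [("source", "source-uid"), ("filter", "filter-uid"), ("count", "count-uid")]
savepoint_uid = {assign_ids(v1_uid)["count"]: 42}
kept = restore(savepoint_uid, assign_ids(v2_uid))["count"]  # 42
```

The sketch returns `None` for unmatched state to keep things simple; as described above, the real job reports an error at startup instead.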

Difference between checkpoint and SavePoint

Savepoint: A savepoint is triggered by a command and is created and deleted by the user. It is stored in a standardized format, so the job version can be upgraded and its configuration changed. The user must provide the path to the savepoint used to restore job state.

Checkpoint: A checkpoint is saved in the external path specified by the user and is triggered automatically by Flink. When the job fails or is canceled, the checkpoint in external storage is retained. The user must provide the path to the checkpoint used to restore job state.

Flink serialization

Java serialization

Java serialization is typically used by implementing the Serializable interface and declaring a serialVersionUID. Objects are serialized to and deserialized from byte streams. The serialVersionUID acts as a version check: deserialization fails if it has changed. Main uses:

  • Network transmission
  • Deep copy of objects
  • Storing objects

Disadvantages:

  1. Inability to cross languages
  2. The serialized stream is too large
  3. Poor serialization performance

Serialization of Flink

Flink implements its own serialization framework and combines it with its own memory model to store objects compactly and operate on them efficiently.

  • This serialization approach has a fairly compact storage density. The PojoSerializer itself serializes only the object header and delegates each field to the serializer for that field’s type.
  • Memory pool: a data structure made up of MemorySegments, which store key + pointer entries (a fixed-length serialized key plus a pointer to the complete binary data) and the binary data of the objects.

  • The benefits of using memory pools to manage memory and storing data in binary form:
  1. Avoids OOM. All runtime data structures and algorithms can request memory only through the memory pool, so the amount of memory they use is bounded and they cannot cause an OOM. When memory is tight, algorithms (sort, join, etc.) can efficiently write large chunks of memory to disk and read them back later. OutOfMemoryErrors are therefore effectively avoided.
  2. Saves memory. Java objects carry a lot of extra storage overhead, which is avoided by storing data in binary form.
  3. Efficient binary operations and cache-friendly computation. Swapping fixed-length entries (key + pointer) is cheap, because neither the real data nor other keys and pointers have to be moved. Storing key + pointer entries contiguously also greatly improves the hit rate of the L1, L2, and L3 caches. In Flink, sorting first compares the binary keys, so entries can be compared directly without deserializing the whole object. Because the key has a fixed length, if two keys are equal (or no binary key is provided), the real binary data must be deserialized and compared. After the comparison, only the key + pointer entries are swapped to produce the sorted order; the real data does not move.
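The key + pointer sort described in point 3 can be sketched as follows. The record layout, the 4-byte key length, and all function names are assumptions for illustration; Flink’s actual sorter works on MemorySegments rather than Python lists.

```python
import struct
from functools import cmp_to_key

KEY_LEN = 4  # fixed length of the binary key prefix (illustrative choice)


def serialize(record):
    """(name, score) -> bytes: 4-byte name length, UTF-8 name, 8-byte score."""
    name = record[0].encode("utf-8")
    return struct.pack(">I", len(name)) + name + struct.pack(">q", record[1])


def deserialize(data):
    (name_len,) = struct.unpack_from(">I", data, 0)
    name = data[4:4 + name_len].decode("utf-8")
    (score,) = struct.unpack_from(">q", data, 4 + name_len)
    return (name, score)


def normalized_key(name):
    """Fixed-length byte prefix whose byte order agrees with the string order."""
    return name.encode("utf-8")[:KEY_LEN].ljust(KEY_LEN, b"\x00")


def sort_records(records):
    data = [serialize(r) for r in records]  # binary data region, never reordered
    index = [(normalized_key(r[0]), pointer) for pointer, r in enumerate(records)]

    def compare(a, b):
        if a[0] != b[0]:
            # Fast path: the fixed-length binary keys decide the order.
            return -1 if a[0] < b[0] else 1
        # Keys tie: deserialize the real data and compare the full sort field.
        name_a = deserialize(data[a[1]])[0]
        name_b = deserialize(data[b[1]])[0]
        return (name_a > name_b) - (name_a < name_b)

    index.sort(key=cmp_to_key(compare))  # only (key, pointer) entries move
    return [deserialize(data[pointer]) for _, pointer in index]


sorted_records = sort_records(
    [("delta", 4), ("alphabet", 1), ("alpha", 2), ("bob", 3)]
)
# sorted_records == [("alpha", 2), ("alphabet", 1), ("bob", 3), ("delta", 4)]
```

Here "alpha" and "alphabet" share the prefix "alph", so only that pair falls back to deserialization; every other comparison is decided by the 4-byte keys alone.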