I. State consistency

When state is introduced in distributed systems, the problem of consistency is naturally introduced. Consistency is really just another way of saying “correctness level,” which means how correct are the results after a failure is successfully handled and recovered compared to the results without any failures? For example, suppose you want to count users who logged in in the last hour. What is the count after the system experiences a failure? If there is a deviation, is there a missing count or a double count?

  • Stateful flow processing, inside each operator task can have its own state
  • In terms of internal flow processing, the so-called state consistency is actually what we call the guarantee of accurate calculation results.
  • A single piece of data should not be lost, nor should it be double-counted
  • In the event of a failure, the state can be restored and the result should be completely correct after the recalculation.

1.1 Consistency Level

In stream processing, consistency can be divided into three levels:

  • At-most-once: This is a euphemism for having no guarantee of correctness — the count may be lost after a failure occurs. The same goes for UDP.
  • At-least-once: This means that the result of the count may be greater than the correct value, but never less than the correct value. In other words, the counting program may overcount after a failure, but it never undercounts.
  • Exactly -once: This means that the system guarantees that the count will match the correct value in the event of a failure. Processing exactly once is the strictest guarantee, and the hardest to achieve.

1.2 Differences among the three levels

At-least-once was very popular. The first generation of streaming processors (such as Storm and Samza) only guaranteed at-least-once when they first came out, for two reasons.

  • The system that ensures exactly-once is more complex to implement. This is challenging at both the infrastructure level (determining what represents true and what the exact-once scope is) and the implementation level.
  • Early users of streaming systems were willing to accept the limitations of the framework and find ways to compensate at the application level (such as making the application idempotent, or doing the calculations again with the batch computing layer).

The first systems to ensure exactly once (Storm Trident and Spark Streaming) pay a heavy price in terms of performance and performance. To ensure exact-once, these systems do not apply application logic to each record individually, but instead process multiple records (batches) simultaneously, ensuring that each batch will either all succeed or all fail.

This results in having to wait for a batch of records to finish processing before getting results. As a result, users often have to use two flow processing frameworks (one to ensure exactly once and the other to do low-latency processing for each element), resulting in further infrastructure complexity.

At one time, users had to weigh the pros and cons of ensuring exactly-once against achieving low latency and efficiency. Flink avoids this trade-off.

This is a significant value of Flink, as it not only guarantees exactly-once, but also has low latency and high throughput processing capabilities.

Basically, Flink avoids trade-offs by making itself available to all needs, and it is a significant technological leap forward for the industry. While this may seem amazing to the uninitiated, once you understand it, you’ll suddenly understand it.

2. Checkpoints of Consistency (Cpus)

  • Flink uses a lightweight snapshot mechanism —- checkpoint to ensure exact-once semantics
  • A consistent checkpoint for stateful applications is essentially a copy (a snapshot) of the state of all tasks at some point in time. This point in time should be when all tasks have just finished processing the same input data.
  • Consistent checkpoints of application state are at the heart of Flink’s fault recovery mechanism

3. End-to-end state consistency

  • At present, the consistency guarantee we see is implemented by the stream processor, that is, it is guaranteed within the Flink stream processor; In a real application, a flow processing application contains a data source (such as Kafka) and output to a persistent system in addition to the flow processor.
  • End-to-end consistency guarantee means that the correctness of the results runs through the whole flow processing application. Each component guarantees its own consistency
  • The overall end-to-end consistency level depends on the least consistent component of all.

4. End-to-end Ecactly -once guarantee

We know that end-to-end state consistency depends on the weakest link of all its components, which is the classic barrel theory.

It can be divided as follows:

  • Internal assurance — checkpoint dependence
  • Source side – External sources are required to reset the data read location
  • Sink side – You need to ensure that data is not repeatedly written to the external system during recovery from failure

For the sink side, there are two specific implementations: Idempotent writes and Transactional writes.

4.1 Idempotent Writes (Idempotent Writes)

An idempotent operation is an operation that can be repeated many times but only causes a change in the result. In other words, repeated operations do not work.

Write insertions in a Hashmap are idempotent operations that repeat writes with the same result.

4.2 Transaction Write

Transactions need to be constructed to write to the external system. The constructed transactions correspond to checkpoint. When checkpoint is completed, all corresponding results will be written to the sink system.

Transaction

  • A series of rigorous operations in an application, all of which must complete successfully or all changes made in each operation will be undone
  • Atomic: a sequence of operations in a transaction either all succeed or none

Implementation idea: constructed transactions correspond to checkpoint, and when checkpoint is actually completed, all corresponding results will be written into the sink system

For transactional writes, there are two implementations: write-ahead logging (WAL) and two-phase commit (2PC). DataStream API provides GenericWriteAheadSink template class and TwoPhaseCommitSinkFunction interface, can easily achieve this transactional writes in one of two ways.

4.2.1 Write-ahead-log (WAL)

  • The result data is first saved as status and then written to the sink system once the checkpoint completion notification is received
  • It is simple and easy to implement. Since the data is cached in the state backend in advance, no matter what sink system is used, it can be done in batches in this way
  • The DataStream API provides a template class, GenericWriteAheadSink, to implement this transactional sink

4.2.2 Two-Phase-commit (2PC)

  • For each checkpoint, the sink task starts a transaction and adds any incoming data to the transaction
  • Then write this data to the external sink system, but do not commit them —– this is just “pre-commit”
  • When it receives notification that checkpoint has completed, it formally commits the transaction and actually writes the result
  • This approach truly implements exactly-once, which requires an external sink system that provides transaction support. Flink provides TwoPhaseCommitSinkFunction interface.

4.2.3 2PC’s requirements for the external Sink System

  • The external sink system must provide transaction support, or the Sink task must be able to simulate transactions on the external system
  • During the checkpoint interval, it must be possible to open a transaction and accept data writes
  • Transactions must be in the “pending commit” state until they are notified that checkpoint has completed. In the case of failover, this may take some time. If the sink system closes the transaction at this time (column timeout), the uncommitted data will be lost
  • The Sink task must be able to resume transactions if the process fails
  • Commit transactions must be idempotent operations

4.2.4 Consistency of different sources and sinks

Five, Flink+Kafka end-to-end state consistency guarantee

For Flink + Kafka’s data pipeline system (Kafka in, Kafka out), how can each component ensure exactly once semantics?

  • Internal: The checkpoint mechanism is used to store status on disks. When a fault occurs, data can be recovered to ensure internal status consistency
  • Source — Kafka Consumer, as the source, can save the offset. If a subsequent task fails, the connector can reset the offset and re-consume data during recovery to ensure consistency
  • Sink, kafka producer as a sink, use two-phase commit sink, need to implement a TwoPhaseCommitSinkFunction

We already know the internal checkpoint mechanism, but how do source and sink work? Let’s do an analysis step by step.

5.1 Exactly-once Two-stage submission

As we know, JobManager coordinates various TaskManagers to checkpoint storage in Flink. Checkpoint storage is stored in StateBackend. By default, StateBackend is memory level. You can also change to file-level persistence. * * * *

When checkpoint starts, JobManager injects the barrier into the data stream. Barriers are passed between operators. * * * *

Each operator takes a snapshot of the current state and saves it to the state back end. For the Source task, the current offset is saved as the state. The next time the source task recovers from checkpoint, it can recommit the offset and re-consume the data from where it was last saved. * * * *

Every internal transform task that encounters a Barrier stores the state to its checkpoint. The Sink task first writes data to the external Kafka, which is a pre-committed transaction (not yet consumed). When a Barrier is encountered, the state is saved to the state back end and a new pre-committed transaction is opened.

When the checkpoint is complete, JobManager sends a notification to all tasks to confirm that the checkpoint is complete.

When the Sink task receives an acknowledgement, it will formally commit the previous transaction. The unacknowledged data in Kafka is changed to “confirmed” and the data can actually be consumed.

Therefore, we can see that the execution process is actually a two-stage submission. After the execution of each operator is completed, “pre-submission” will be carried out until the sink operation is completed, and “confirmation submission” will be initiated. If the execution fails, the withholding will be abandoned.

5.2 Two-phase Submission Procedure Summary

The specific two-phase commit steps are summarized as follows:

  • After the first data comes in, a Kafka transaction is started and written to the Kafka partition log normally but marked as uncommitted. This is called “pre-committed”.
  • The JobManager triggers the checkpoint operation, and the barrier passes down from the source. The barrier operator stores the status to the status backend and notifies the JobManager
  • The Sink connector receives the barrier, saves the current state, stores it to checkpoint, notifies jobManager, and starts the transaction of the next stage for submitting the data of the next checkpoint
  • The JobManager receives notification of all tasks and sends a confirmation message indicating that checkpoint completion is complete
  • The Sink task receives the confirmation message from jobManager and formally submits the data during this period
  • The external Kafka closes the transaction and the submitted data can be consumed normally.

As you can see, if you need to use StateBackend to recover from an outage, you can only recover all confirmed committed operations. I will talk to you about this later

Today is the first to learn here, people that good night ( ̄o ̄). Z z

Phase to recommend

Hadoop data warehouse construction practice

Introduction to dry | Flink CEP technology

Dry goods | Apache Flink portal technology PPT to share

Share | double tenth Kafka Flink + + Redis electricity screen real-time calculation examples

Flink: Providing a Powerful Fault Tolerance Feature