Fault recovery and consistency assurance

Ideally, once a piece of data is delivered to a stream processing system, the system processes it exactly once, providing an Exactly-Once guarantee. If nothing ever went wrong, this would be trivial. In the real world, however, the system is often hit by unexpected events: traffic surges, network jitter, or cloud resource-allocation problems. When a failure occurs, Flink restarts the job, restores state from the latest Checkpoint, and performs the computation again.

Flink’s State and Checkpoint mechanisms were introduced in two earlier articles:

  • Stateful computation in Flink
  • Flink’s Checkpoint mechanism

A Checkpoint snapshot records two things (a minimal configuration sketch follows this list):

  1. The state of each operator in the job
  2. The Offset of the input data
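
Here is a minimal sketch of enabling this mechanism, assuming a standard Flink 1.x StreamExecutionEnvironment; the 60-second interval is an arbitrary illustrative choice.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Snapshot operator state and input Offsets every 60 seconds,
        // in Exactly-Once checkpointing mode.
        env.enableCheckpointing(60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
    }
}
```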

Replaying data is like watching a live game: even if we miss a highlight, we can watch it again from the recording, and the English word Replay describes the scene vividly. Replay causes a problem, however: the data between timestamps 3 and 10 is retransmitted. Before the failure, some of that data had already been processed by operators and may even have been sent to an external system; after the restart it is sent again. Instead of being processed exactly once, a piece of data may therefore be processed multiple times. From the standpoint of result accuracy, we expect each piece of data to affect the final result only once. A system that can guarantee this is said to provide an end-to-end Exactly-Once guarantee.

End-to-end Exactly-Once is one of the most challenging problems in distributed systems, and many frameworks are trying to solve it. In Flink, the consistency of internal state rests mainly on the Checkpoint mechanism, while consistency with external systems rests on capabilities of the Source and the Sink: the Source must support retransmission, and the Sink must use an appropriate write technique, such as idempotent write or transactional write.

For Source retransmission, as shown in the figure above, as long as we record the Offset of the input, the data sender can resume sending from that Offset after a failure and restart. Kafka, for example, not only transfers data but also persists it to log files. If the downstream application restarts, Kafka can locate the data in the persisted log based on the Offset the downstream provides and start sending it again.
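
As a concrete illustration of offset-based replay, here is a sketch using the Kafka consumer API; the broker address, the topic name events, the partition, and the starting offset 3 are all hypothetical.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReplayFromOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("events", 0); // hypothetical topic
            consumer.assign(Collections.singletonList(tp));
            // Resume from the Offset recorded in the last Checkpoint (here: 3).
            consumer.seek(tp, 3L);
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> r : records) {
                System.out.printf("offset=%d value=%s%n", r.offset(), r.value());
            }
        }
    }
}
```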

Source retransmission means a piece of data may be processed multiple times. To guarantee that it affects the downstream system only once, we must also rely on the Sink's idempotent write or transactional write. These two concepts are explained below.

Idempotent write

An Idempotent Write means that no matter how many times a piece of data is written to the system, it affects the target system as if it had been written only once. For example, repeatedly inserting the same key-value pair into a HashMap is idempotent: the first insertion changes the HashMap, and subsequent identical insertions leave it unchanged. Repeatedly adding a number to an integer is not idempotent, because the integer keeps growing with each operation.
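
The contrast can be made concrete in a few lines of Java, using the HashMap example above:

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotenceDemo {
    public static void main(String[] args) {
        // Idempotent: re-inserting the same key-value pair changes nothing.
        Map<String, Integer> map = new HashMap<>();
        map.put("a", 5);
        map.put("a", 5); // replayed write
        map.put("a", 5); // replayed write
        System.out.println(map.get("a")); // 5, regardless of how many writes

        // Not idempotent: every replayed addition changes the result.
        int sum = 0;
        sum += 5;
        sum += 5; // replayed write
        System.out.println(sum); // 10, not 5 -- the replay corrupted the result
    }
}
```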

Key-value databases such as Cassandra, HBase, and Redis are often used as Sinks to achieve end-to-end Exactly-Once, though note that not every KV database supports idempotent writes. Idempotent writing also places a requirement on the key-value pairs themselves: the Key must be Deterministic. If we design the Key as name + curTimestamp, where the timestamp is taken at processing time, each retransmission generates a different Key, producing multiple results, and the write is no longer idempotent. Therefore, to pursue end-to-end Exactly-Once, we should use deterministic computation logic and data models when designing business logic.
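
Here is a small sketch of the two Key designs; the names are hypothetical, eventTime stands for a timestamp carried by the data itself, and System.currentTimeMillis() stands for processing time.

```java
public class KeyDesign {
    // Deterministic: derived entirely from the event itself, so a replayed
    // event regenerates exactly the same Key and simply overwrites the old value.
    static String deterministicKey(String name, long eventTime) {
        return name + "#" + eventTime;
    }

    // Non-deterministic: processing time differs on every replay, so each
    // retransmission creates a brand-new Key and a duplicate result.
    static String nonDeterministicKey(String name) {
        return name + "#" + System.currentTimeMillis();
    }

    public static void main(String[] args) {
        System.out.println(deterministicKey("a", 5L)); // a#5 on every run
        System.out.println(nonDeterministicKey("a"));  // different on every run
    }
}
```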

A KV database used as a Sink can also exhibit a time-flashback phenomenon. Take data retransmission as an example. Suppose the data at timestamp 5 produces the key-value pair (a, t=5) and the data at timestamp 8 produces (a, t=8), so different elements affect the same Key. The write sequence becomes: (a, t=5); (a, t=8); (a, t=5); (a, t=8). The value that should end up in the database is (a, t=8). Unfortunately, after a failure and restart, (a, t=5) is committed to the database again during the initial replay. The database takes it to be the latest operation and updates the Key, but in fact the value has flashed back to timestamp 5. Only after all subsequent data has been retransmitted, and every Key that should be overwritten has been overwritten by the latest data, does the whole system reach a consistent state again. From this point of view, the data in the KV database may be inconsistent during the restart; consistency is restored only once retransmission completes, and only then does the system provide an end-to-end Exactly-Once guarantee.
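
The flashback can be replayed in miniature, with a HashMap standing in for the KV database:

```java
import java.util.HashMap;
import java.util.Map;

public class TimeFlashbackDemo {
    public static void main(String[] args) {
        Map<String, String> db = new HashMap<>();
        db.put("a", "t=5");
        db.put("a", "t=8"); // the latest, correct value
        // Failure and restart: data is retransmitted from the Checkpoint.
        db.put("a", "t=5"); // the database flashes back to the older value
        System.out.println(db.get("a")); // t=5 -- temporarily inconsistent
        db.put("a", "t=8"); // replay completes
        System.out.println(db.get("a")); // t=8 -- consistency restored
    }
}
```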

Transactional write

Transactions are a core problem that database systems must solve. Flink borrows transaction-processing techniques from databases and combines them with its own Checkpoint mechanism to ensure that the Sink affects the external output only once.

In a nutshell, Flink's Transactional Write means that Flink stages the data to be output and does not commit it to the external system until the Checkpoint completes and the state of all of Flink's upstream and downstream operators is consistent; only then is all the staged data committed. In other words, only data covered by a completed Checkpoint is written to the external system. In the data-retransmission scenario shown in the following figure, with transactional writes only the output before timestamp 3 is committed to the external system; data after timestamp 3 (for example, the output generated at timestamps 5 and 8) is staged and written out at the next Checkpoint. This prevents the timestamp-5 data from producing multiple results and being written to the external system multiple times.
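
Below is a minimal, framework-agnostic sketch of this staging pattern; the class and method names are illustrative inventions, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferingTransactionalSink {
    private final List<String> pending = new ArrayList<>();

    // Called for every output record: stage it rather than writing immediately.
    void write(String record) {
        pending.add(record);
    }

    // Called when a Checkpoint completes: the staged records are now covered
    // by a consistent snapshot, so it is safe to commit them downstream.
    void onCheckpointComplete() {
        for (String record : pending) {
            commitToExternalSystem(record);
        }
        pending.clear();
    }

    // Called on failure and restart: staged but uncommitted output is dropped;
    // it will be regenerated when the Source replays from the checkpointed Offset.
    void onRecovery() {
        pending.clear();
    }

    private void commitToExternalSystem(String record) {
        System.out.println("committed: " + record);
    }
}
```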

For the concrete implementation of transactional writes, Flink currently provides two ways: Write-Ahead Log (WAL) and Two-Phase Commit (2PC). These two protocols are widely used by databases and distributed systems to implement transactions, and Flink adapts them to its own scenario. The main difference between them: WAL is more general and works with almost any external system, but it cannot provide a 100% end-to-end Exactly-Once guarantee; if the external system itself supports transactions (as Kafka does), 2PC can provide 100% end-to-end Exactly-Once. We will cover both in more detail in the next article.

Transactional writes provide end-to-end Exactly-Once consistency, but at a significant cost in latency: output data is no longer written to the external system in real time, but committed in batches. Currently there is no mechanism that is perfect in both failure recovery and Exactly-Once guarantees; developers must weigh the trade-offs against their own requirements.