Whether you run Apache Flink in production or have only evaluated it, you have probably asked one question: how do I access and update the state stored in a Flink savepoint? Apache Flink 1.9.0 answers it with the State Processor API, a powerful extension of the DataSet API that allows the state in Flink savepoints and checkpoints to be read, written, and modified.

In this article, we explain why this feature is an important step for Flink, as well as its purpose and usage. Finally, we discuss the future plans for the State Processor API and how they align with Flink's overall roadmap for unifying batch and stream processing.

The status quo of stateful stream processing as of Apache Flink 1.9

Almost all non-trivial stream processing applications are stateful, and most are designed to run for months or even years. Over time, these applications accumulate a lot of valuable state that can be costly or even impossible to rebuild if it is lost due to a failure. To guarantee the consistency and durability of application state, Flink was designed from the outset with a sophisticated checkpointing and recovery mechanism. With every release, the Flink community has added more and more state-related features to speed up checkpointing and recovery and to improve the maintenance and management of applications.

However, Flink users often ask for the ability to access the state of an application "from the outside." The motivation may be to validate or debug the state of an application, to migrate the state to another application, or to bootstrap the initial state of an application from an external system such as a relational database.

While the reasoning behind these requests is sound, the options for accessing application state from the outside have so far been quite limited. Flink's queryable state only supports key-based lookups (point queries), does not guarantee the consistency of the returned values (they may differ before and after an application recovers), and is read-only: queryable state cannot be modified or written. Savepoints, the consistent snapshots of application state, are not accessible either, because they are encoded in a custom binary format.

Reading and writing application state with the State Processor API

The State Processor API introduced in Flink 1.9 truly changes this and makes it possible to manipulate application state. The feature is built on the DataSet API and provides input and output formats to read and write savepoint or checkpoint data. Thanks to the interoperability of the DataSet and Table APIs, users can even use the relational Table API or SQL queries to analyze and process state data.
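To make this concrete, here is a minimal sketch of loading a savepoint and reading one operator state as a DataSet. The savepoint path, the operator UID "my-uid", the state name "os", and the state type (Long) are placeholders chosen for illustration; the class and method names follow the State Processor API as of Flink 1.9.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class ReadSavepointExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Load an existing savepoint; the path and state backend must match the job that wrote it.
        ExistingSavepoint savepoint =
            Savepoint.load(env, "hdfs://path/to/savepoint", new MemoryStateBackend());

        // Read an operator (list) state named "os" of the operator with UID "my-uid" as a DataSet.
        DataSet<Long> operatorState =
            savepoint.readListState("my-uid", "os", Types.LONG);

        // From here on, the state can be analyzed with the regular DataSet API
        // (or converted to a Table for SQL-style analysis).
        operatorState.print();
    }
}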

For example, a user can take a savepoint of a running stream processing application and analyze it with a batch job to verify that the application behaves correctly. Or the user can read data from any store, process it arbitrarily, and write the result into a savepoint that is used to bootstrap the initial state of a streaming application. It is also now possible to fix inconsistent state entries in a savepoint. In the end, the State Processor API opens up many ways to evolve stateful applications that were previously blocked by the need to keep savepoints restorable: users can now change the data type of a state, adjust the maximum parallelism of an operator, split or merge operator state, reassign operator UIDs, and so on.

Mapping application state to datasets

The State Processor API maps the state of a streaming application to one or more datasets that can be processed independently. To be able to use the API, you need to understand how this mapping works.

First, let’s look at what a stateful Flink job looks like. A Flink job is composed of operators, typically one or more source operators, a few operators that process the data, and one or more sink operators. Each operator runs in parallel across one or more tasks and can use different types of state: it can have zero, one, or more operator states, organized as lists that are scoped to the operator's tasks; if the operator is applied to a keyed stream, it can also have zero, one, or more keyed states, which are scoped to a key extracted from each processed record. You can think of keyed state as a distributed key-value map.
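As an illustration of the two kinds of state, the following hypothetical operator uses both: a keyed ValueState scoped to the current key, and an operator ListState scoped to the parallel task. The state names ("ks", "os") and the counting logic are made up for this example.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

// A hypothetical operator with a keyed state "ks" (one value per key)
// and an operator state "os" (one list per parallel task).
public class StatefulCounter extends RichFlatMapFunction<Long, Long>
        implements CheckpointedFunction {

    private transient ValueState<Long> keyedCount;   // keyed state, scoped to the current key
    private transient ListState<Long> operatorTotal; // operator state, scoped to the task
    private long localTotal;

    @Override
    public void open(Configuration parameters) {
        keyedCount = getRuntimeContext().getState(
            new ValueStateDescriptor<>("ks", Types.LONG));
    }

    @Override
    public void flatMap(Long value, Collector<Long> out) throws Exception {
        Long current = keyedCount.value();
        long updated = (current == null ? 0L : current) + value;
        keyedCount.update(updated);   // update the per-key count
        localTotal += value;          // update the per-task total
        out.collect(updated);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // Write the per-task total into the operator list state on every checkpoint.
        operatorTotal.clear();
        operatorTotal.add(localTotal);
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        operatorTotal = ctx.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("os", Types.LONG));
        for (Long total : operatorTotal.get()) {
            localTotal += total;      // restore the per-task total after recovery
        }
    }
}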

The application “MyApp” shown below consists of three operators called “Src”, “Proc”, and “Snk”. Src has one operator state (os1), Proc has one operator state (os2) and two keyed states (ks1, ks2), and Snk is stateless.





A savepoint (or checkpoint) of MyApp consists of the data of all states, organized in a way that allows the state of each task to be restored. When processing savepoint (or checkpoint) data with a batch job, we need a mental model that maps the state of each task to a dataset or table. In fact, a savepoint can be thought of as a database: every operator, identified by its UID, represents a namespace; each operator state of an operator maps to a dedicated single-column table in that namespace which holds the state data of all tasks; and all keyed states of an operator map to a single table consisting of one column for the key and one column for each keyed state. The figure below shows how MyApp's savepoint maps to such a database.





The figure shows how the values of the operator state os1 of “Src” map to a table with one column and five rows, one row for each list entry across all parallel instances of Src. Similarly, the operator state os2 of “Proc” maps to a single-column table. The keyed states ks1 and ks2 of “Proc” are combined into a single table with three columns: one for the key, one for ks1, and one for ks2. This table holds one row for each distinct key of the two keyed states. Since “Snk” has no state, its namespace is empty.
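The keyed-state table above can be produced with a KeyedStateReaderFunction. The following sketch assumes, purely for illustration, that ks1 and ks2 are ValueStates of type Long, that the key type is Integer, and that Proc's UID is "proc-uid"; in a real job these details depend on how the application declared its state.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

// Emits one (key, ks1, ks2) row per key -- the three-column table from the figure.
public class ProcStateReader
        extends KeyedStateReaderFunction<Integer, Tuple3<Integer, Long, Long>> {

    private ValueState<Long> ks1;
    private ValueState<Long> ks2;

    @Override
    public void open(Configuration parameters) {
        // The descriptors must match the names and types used by the streaming application.
        ks1 = getRuntimeContext().getState(new ValueStateDescriptor<>("ks1", Types.LONG));
        ks2 = getRuntimeContext().getState(new ValueStateDescriptor<>("ks2", Types.LONG));
    }

    @Override
    public void readKey(
            Integer key,
            Context ctx,
            Collector<Tuple3<Integer, Long, Long>> out) throws Exception {
        out.collect(Tuple3.of(key, ks1.value(), ks2.value()));
    }
}

// Usage, assuming 'savepoint' was loaded as shown earlier:
// DataSet<Tuple3<Integer, Long, Long>> keyedRows =
//     savepoint.readKeyedState("proc-uid", new ProcStateReader());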

The state handler API provides methods for creating, loading, and writing savepoints. The user can either read a dataset from a loaded savepoint or convert the dataset to a state and add it to a savepoint. In short, these datasets can be processed using the full feature set of the DataSet API. With these approaches, you can solve all of the aforementioned use cases (and more). If you want to learn more about how to use the state handler API, check out the documentation.
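As a sketch of the writing side, the following batch job bootstraps a keyed state and writes it into a new savepoint. The operator UID "proc-uid", the state name "ks1", the data types, the maximum parallelism of 128, and the output path are all assumptions chosen for illustration.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class BootstrapSavepointExample {

    // Writes each (key, value) pair into a keyed ValueState named "ks1".
    public static class Ks1Bootstrapper
            extends KeyedStateBootstrapFunction<Integer, Tuple2<Integer, Long>> {

        private ValueState<Long> ks1;

        @Override
        public void open(Configuration parameters) {
            ks1 = getRuntimeContext().getState(new ValueStateDescriptor<>("ks1", Types.LONG));
        }

        @Override
        public void processElement(Tuple2<Integer, Long> value, Context ctx) throws Exception {
            ks1.update(value.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Initial state, e.g. exported from a relational database.
        DataSet<Tuple2<Integer, Long>> initialState =
            env.fromElements(Tuple2.of(1, 100L), Tuple2.of(2, 200L));

        // Turn the dataset into keyed state for the operator with UID "proc-uid".
        BootstrapTransformation<Tuple2<Integer, Long>> transformation =
            OperatorTransformation
                .bootstrapWith(initialState)
                .keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, Long> value) {
                        return value.f0;
                    }
                })
                .transform(new Ks1Bootstrapper());

        // Create a new savepoint with max parallelism 128 and write it out.
        Savepoint
            .create(new MemoryStateBackend(), 128)
            .withOperator("proc-uid", transformation)
            .write("hdfs://path/to/new-savepoint");

        // Run the batch job that produces the savepoint.
        env.execute("bootstrap savepoint");
    }
}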

Why use the DataSet API?

If you are familiar with Flink's roadmap, you may be surprised that the State Processor API is based on the DataSet API, because the Flink community plans to extend the DataStream API with the concept of BoundedStreams and to deprecate the DataSet API. When designing this feature, however, we also evaluated the DataStream API and the Table API, and neither could yet provide the required functionality. Rather than block the feature on the evolution of those APIs, we decided to build it on the DataSet API while keeping its dependencies on the DataSet API to a minimum. Because of this, migrating it to another API should be fairly easy.

Conclusion

Flink users have long needed a way to access and modify the state of a streaming application from the outside. With the State Processor API, Flink opens up many new possibilities for how users maintain and manage streaming applications, including arbitrary evolution of applications and the export and bootstrapping of application state. In short, savepoints are no longer a black box.




