preface

Following the basic use of Flink in the previous chapter, this chapter covers how Flink preserves information while a program is running, and how it recovers after errors or failures. For example, when consuming from Kafka you need to record the offset the consumer has reached, so that after a restart consumption continues from that position, ensuring exactly-once semantics. Or a window that is computing a lot of data suddenly crashes, and after a restart the previous statistics must still be available.

1. State

1.1 State as described on the official website

Stateful functions and operators store data across the processing of individual elements/events, making state a key building block for any type of more elaborate operation.

For example:

  • When the application searches for certain event patterns, the state stores the sequence of events encountered so far.
  • When events are aggregated per minute/hour/day, the state holds the pending aggregates.
  • When a machine learning model is trained on a stream of data points, the state retains the current version of the model parameters.
  • This state allows efficient access to past events when historical data needs to be managed.

1.2 Types of state

State generally refers to the state of a specific task/operator. State can be recorded and stored, allowing data to be recovered in the event of a failure.

1.2.1 Keyed State

In simple terms, Keyed State is state that comes after a shuffle; in code, it is the state available after keyBy is used.

1.2.2 Operator State

Operator State is state without a preceding shuffle; it is bound to an operator instance rather than to a key.

Managed state: state managed by the Flink framework. This is what we usually use.

Raw state: the user manages the state's data structure himself. During a checkpoint, the state content is read and written as byte[], and Flink knows nothing about its internal structure. Managed state is generally recommended on DataStream; raw state is only needed when implementing a user-defined operator, and even then it is rarely used.

1.2.3 State sample diagram and code demonstration

Word count demonstration

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> localhost = environment.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = localhost.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String s : value.split(",")) {
                    out.collect(new Tuple2<>(s, 1));
                }
            }
        }).keyBy(0).sum(1);

        sum.print();

        environment.execute("WordCount");
    }
}
Entering flink,hadoop the first time, the program prints:

13> (flink,1)
15> (hadoop,1)

Entering flink,hadoop,hive a second time, it prints:

2> (hive,1)
13> (flink,2)
15> (hadoop,2)

Conclusion: as you can see, the word counts accumulate across inputs. If you have used Spark, you will know that the same logic does not accumulate across batches in Spark.

1.3 Keyed State Demo

1.3.1 ValueState

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * For the same key, output the sum of the values once every 3 elements have been received.
 */
public class ValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<String, Integer>> source =
                env.fromElements(Tuple2.of("A", 2), Tuple2.of("A", 3), Tuple2.of("A", 4),
                        Tuple2.of("B", 5), Tuple2.of("B", 7), Tuple2.of("B", 9), Tuple2.of("C", 1));


        source.keyBy(0)
                .flatMap(new ValueStateSum())
                .print();

        env.execute("ValueStateDemo");
    }
}

class ValueStateSum extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    // f0: number of elements received so far, f1: running sum of their values
    private ValueState<Tuple2<Integer, Integer>> valueStateSum;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the state
        ValueStateDescriptor<Tuple2<Integer, Integer>> descriptor =
                new ValueStateDescriptor<>("valueStateName",          // state name
                        Types.TUPLE(Types.INT, Types.INT));           // state type
        valueStateSum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> element, Collector<Tuple2<String, Integer>> out) throws Exception {
        // Get the current state; it may be null the first time
        Tuple2<Integer, Integer> currentState = valueStateSum.value();
        if (currentState == null) {
            currentState = Tuple2.of(0, 0);
        }
        currentState.f0 += 1;            // update the element count
        currentState.f1 += element.f1;   // update the running sum
        valueStateSum.update(currentState);

        if (currentState.f0 >= 3) {
            out.collect(Tuple2.of(element.f0, currentState.f1));
            // Clear the state value
            valueStateSum.clear();
        }
    }
}
As the result shows, output is produced exactly as required, so you can implement whatever per-key logic you need. Key C never receives three elements, so it produces no output.

3> (B,21)
14> (A,9)

1.3.2 ListState

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.List;

/**
 * For the same key, output the sum of the values once every 3 elements have been received.
 */
public class ListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<String, Integer>> source =
                env.fromElements(Tuple2.of("A", 2), Tuple2.of("A", 3), Tuple2.of("A", 4),
                        Tuple2.of("B", 5), Tuple2.of("B", 7), Tuple2.of("B", 9), Tuple2.of("C", 1));


        source.keyBy(0)
                .flatMap(new ListStateSum())
                .print();

        env.execute("ListStateDemo");
    }
}

class ListStateSum extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private ListState<Integer> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the state
        ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>("listState", Integer.class);
        listState = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> element, Collector<Tuple2<String, Integer>> out) throws Exception {
        // Get the current state; it may be empty the first time
        Iterable<Integer> integers = listState.get();
        if (integers == null) {
            listState.addAll(Collections.emptyList());
        }
        // Add the new element to the state
        listState.add(element.f1);

        List<Integer> list = Lists.newArrayList(listState.get());
        if (list.size() >= 3) {
            Integer sum = list.stream().reduce((a, b) -> a + b).get();
            out.collect(Tuple2.of(element.f0, sum));
            // Clear the state value
            listState.clear();
        }
    }
}
14> (A,9)
3> (B,21)

1.3.3 MapState

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.UUID;

/**
 * For the same key, output the sum of the values once every 3 elements have been received.
 */
public class MapStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<String, Integer>> source =
                env.fromElements(Tuple2.of("A", 2), Tuple2.of("A", 3), Tuple2.of("A", 4),
                        Tuple2.of("B", 5), Tuple2.of("B", 7), Tuple2.of("B", 9), Tuple2.of("C", 1));


        source.keyBy(0)
                .flatMap(new MapStateSum())
                .print();

        env.execute("MapStateDemo");
    }
}

class MapStateSum extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private MapState<String, Integer> mapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the state
        MapStateDescriptor<String, Integer> descriptor =
                new MapStateDescriptor<>("mapState", String.class, Integer.class);
        mapState = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> element, Collector<Tuple2<String, Integer>> out) throws Exception {
        // Use a random UUID as the map key and the element value as the map value
        mapState.put(UUID.randomUUID().toString(), element.f1);

        Iterable<Integer> values = mapState.values();
        List<Integer> list = Lists.newArrayList(values);
        if (list.size() >= 3) {
            Integer sum = list.stream().reduce((a, b) -> a + b).get();
            out.collect(Tuple2.of(element.f0, sum));
            // Clear the state value
            mapState.clear();
        }
    }
}
Results:

3> (B,21)
14> (A,9)

1.3.4 ReducingState

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * For the same key, output the accumulated sum each time an element is received.
 */
public class ReducingStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<String, Integer>> source =
                env.fromElements(Tuple2.of("A", 2), Tuple2.of("A", 3), Tuple2.of("A", 4),
                        Tuple2.of("B", 5), Tuple2.of("B", 7), Tuple2.of("B", 9), Tuple2.of("C", 1));


        source.keyBy(0)
                .flatMap(new ReducingStateSum())
                .print();

        env.execute("ReducingStateDemo");
    }
}

class ReducingStateSum extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private ReducingState<Integer> reducingState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the state
        ReducingStateDescriptor<Integer> descriptor = new ReducingStateDescriptor<>("reducingState",
                new ReduceFunction<Integer>() {
                    @Override
                    public Integer reduce(Integer value1, Integer value2) throws Exception {
                        return value1 + value2;
                    }
                }, Integer.class);
        reducingState = getRuntimeContext().getReducingState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> element, Collector<Tuple2<String, Integer>> out) throws Exception {
        reducingState.add(element.f1);
        out.collect(Tuple2.of(element.f0, reducingState.get()));
    }
}
Results:

3> (B,5)
14> (A,2)
3> (B,12)
14> (A,5)
3> (B,21)
14> (A,9)
3> (C,1)

1.3.5 AggregatingState

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * For the same key, output all values received so far as a formatted string.
 */
public class AggregatingStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<String, Integer>> source =
                env.fromElements(Tuple2.of("A", 2), Tuple2.of("A", 3), Tuple2.of("A", 4),
                        Tuple2.of("B", 5), Tuple2.of("B", 7), Tuple2.of("B", 9), Tuple2.of("C", 1));


        source.keyBy(0)
                .flatMap(new AggregatingStateSum())
                .print().setParallelism(1);

        env.execute("AggregatingStateDemo");
    }
}

class AggregatingStateSum extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, String>> {

    private AggregatingState<Integer, String> aggregatingState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the state.
        // Constructor: AggregatingStateDescriptor(String name, AggregateFunction<IN, ACC, OUT> aggFunction, Class<ACC> stateType)
        AggregatingStateDescriptor<Integer, String, String> descriptor =
                new AggregatingStateDescriptor<>("AggregatingState",
                new AggregateFunction<Integer, String, String>() {
                    // Initialize the accumulator
                    @Override
                    public String createAccumulator() {
                        return "";
                    }

                    @Override
                    public String add(Integer value, String accumulator) {
                        return accumulator + value + ",";
                    }

                    @Override
                    public String getResult(String accumulator) {
                        return accumulator;
                    }

                    @Override
                    public String merge(String a, String b) {
                        return a + b;
                    }
                }, String.class);
        aggregatingState = getRuntimeContext().getAggregatingState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> element, Collector<Tuple2<String, String>> out) throws Exception {
        aggregatingState.add(element.f1);
        String str = aggregatingState.get();
        // Strip the trailing comma
        str = "list : " + str.substring(0, str.length() - 1) + " end";
        out.collect(Tuple2.of(element.f0, str));
    }
}

1.3.6 FoldingState (deprecated since 1.4)

Note that FoldingState and FoldingStateDescriptor were deprecated in Flink 1.4 and will be removed completely in the future. Please use AggregatingState and AggregatingStateDescriptor instead.

1.4 Operator State Demo

1.4.1 ListState

This demo is copied directly from the official website and then slightly modified.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<String, Integer>> dataStreamSource =
                env.fromElements(Tuple2.of("A", 2), Tuple2.of("A", 3),
                        Tuple2.of("B", 5), Tuple2.of("B", 7), Tuple2.of("C", 1));

        dataStreamSource
                .addSink(new BufferingSink(2)).setParallelism(1);

        env.execute("OperatorStateDemo");
    }
}

class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    // Flush threshold: emit once this many elements have been buffered
    private final int threshold;
    // Operator state that holds the buffered elements across checkpoints
    private transient ListState<Tuple2<String, Integer>> checkpointedState;
    // Local buffer
    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // Buffer the element
        bufferedElements.add(value);
        // Check whether the buffer has reached the threshold
        if (bufferedElements.size() == threshold) {
            System.out.println(bufferedElements.stream()
                    .reduce((a, b) -> Tuple2.of(value.f0, a.f1 + b.f1)).get());
            // Clear the buffer
            bufferedElements.clear();
        }
    }

    // Snapshot the buffered elements into operator state
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    // Initialize the state, or restore it after a failure
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>("buffered-elements",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
                        }));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
(A,5)
(B,12)

1.5 State backend

State backend refers to where state data is stored. Flink currently supports the following three options:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

1.5.1 MemoryStateBackend

By default, state data is stored in the TaskManager heap. When a checkpoint is taken, the state snapshot is stored in the JobManager heap.

Since it is purely memory-based it is fast, but the amount of state kept should not be too large and data may be lost. It is generally used for development and testing.
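As a minimal sketch of that limitation (Flink 1.x API; the 10 MB figure below is just an example value): MemoryStateBackend caps each individual state at 5 MB by default, and its constructor lets you raise the cap.

import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The default per-state cap is 5 MB; pass an explicit maxStateSize to raise it (10 MB here, as an example)
env.setStateBackend(new MemoryStateBackend(10 * 1024 * 1024));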

1.5.2 FsStateBackend

The state data is stored in the TaskManager heap. When a checkpoint is performed, the data is saved to HDFS.

Since the data is saved to HDFS, it is not lost, but the state size is still limited by the TaskManager memory.

1.5.3 RocksDBStateBackend

For background, have a look at the RocksDB Chinese website.

State is stored in a local RocksDB database (an embedded key-value store). When a checkpoint is taken, the state is persisted to a configured file system such as HDFS.

It can hold very large state without losing it, which is why it is the common choice in production.

1.5.4 How to Use State Backend

Connecting to Hadoop locally and using RocksDB requires additional dependencies:

<!-- RocksDB dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Hadoop dependencies -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>

(You can also set the address to a local disk path to see the effect.)

env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new FsStateBackend("hdfs://node2:8020/flink/checkpoints"));
env.setStateBackend(new RocksDBStateBackend("hdfs://node2:8020/flink/checkpoints", true));

You can also achieve the same configuration cluster-wide by modifying flink-conf.yaml.
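For reference, a minimal flink-conf.yaml sketch using the standard configuration keys; the path is an example value, and for MemoryStateBackend the value of state.backend is jobmanager rather than memory:

# jobmanager | filesystem | rocksdb
state.backend: rocksdb
# where checkpoint data is written
state.checkpoints.dir: hdfs://node2:8020/flink/checkpoints
# RocksDB only: enable incremental checkpoints
state.backend.incremental: true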

The three options above correspond to the implementation classes of the StateBackend interface.

2. Checkpoint

Checkpoints are used internally by Flink to recover when an application fails and restarts.

2.1 Overview

  • To ensure the fault tolerance of the state, Flink needs to checkpoint the state.
  • Checkpointing is the core of Flink's fault-tolerance mechanism. According to the configuration, it periodically takes snapshots of the state of each operator/task in the stream and persists them. When a Flink program crashes unexpectedly, you can selectively recover from these snapshots when re-running the program, correcting data anomalies caused by the failure.
  • Flink's checkpoint mechanism interacts with persistent storage under two premises: the source must be able to replay events for a certain amount of time (typical examples are persistent message queues such as Apache Kafka or RabbitMQ), and there must be persistent storage for state, typically a distributed file system (e.g. HDFS, S3, GFS).

In simple terms, snapshots of state are generated according to the configured policy and stored in a distributed file system. During fault-tolerant recovery, the checkpoint file is used to restore the state at that point in time, and processing continues from there.

2.2 Checkpoint Configuration

By default, checkpointing is disabled and must be enabled explicitly. The checkpoint mode can be exactly-once (the default) or at-least-once. Exactly-once is best for most applications; at-least-once may be used in some applications requiring ultra-low latency.

The following is copied directly from the official website:

// Enable checkpointing, with a checkpoint interval of 1000 ms
env.enableCheckpointing(1000);

// Advanced options:
// Set the mode to exactly-once (the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Make sure there are at least 500 ms between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoints must complete within one minute, or they are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Allow only one checkpoint at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Retain checkpoint data after the job is cancelled, so the job can later be
// restored from a specified checkpoint as required
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Prefer recovering from checkpoints even when a newer savepoint exists (added in Flink 1.9)
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

3. Data restoration

3.1 Restart Policy Overview

Flink supports different restart policies to control how jobs are restarted in the event of a failure. A cluster starts with a default restart policy, which is used when no specific restart policy is defined.

If a restart policy is specified when the job is submitted, it overrides the cluster's default policy. The default policy is set in the flink-conf.yaml configuration file, where the restart-strategy parameter defines which policy is used.

If checkpointing is not enabled, the no-restart policy is used. If checkpointing is enabled but no restart policy is configured, the fixed-delay policy is used, with Integer.MAX_VALUE restart attempts by default. The restart policy can be configured globally in flink-conf.yaml, or specified in the application code, which overrides the global configuration.

3.2 Common Restart Policies

3.2.1 Fixed delay

1. Code configuration:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                             // number of restart attempts
        Time.of(10, TimeUnit.SECONDS)  // delay between attempts
));

2. flink-conf.yaml configuration:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

3.2.2 Failure Rate

1. Code configuration:

env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3,                             // max failures within the interval
        Time.of(5, TimeUnit.MINUTES),  // the measuring interval
        Time.of(10, TimeUnit.SECONDS)  // delay between restarts
));

2. flink-conf.yaml configuration:

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

3.2.3 No restart

1. Code configuration:

env.setRestartStrategy(RestartStrategies.noRestart());

2. flink-conf.yaml configuration:

restart-strategy: none

3.3 Multi-checkpoint Configuration

If checkpointing is enabled, only one checkpoint is retained by default, and when the program fails it can recover from the most recent checkpoint. Retaining multiple checkpoints makes recovery more flexible: for example, if we discover that the data processing has had a problem recently, we may want to go back an hour or earlier and recover from that checkpoint instead.

Configure the following parameter in flink-conf.yaml (how far back you can go is tied to how often checkpoints are taken):

state.checkpoints.num-retained: 10

3.4 Recovering data from a checkpoint

bin/flink run -s hdfs://node2:8020/flink/checkpoints/cc6a5758cf31aad25833d0503e6bd042/chk-132/_metadata flink-job.jar


The checkpoint path can also be found on the web UI. However, if the task runs on YARN and fails, this screen is no longer available.

See the official Flink checkpoint documentation for more details.

3.5 Savepoint

3.5.1 What savepoints are for

A savepoint is a globally consistent snapshot that lets a program resume computation from the point just before an upgrade, without data interruption. It saves the data source offsets, operator state, and other information, so the application can continue consuming from any point in the past at which a savepoint was taken.

Savepoints are triggered manually by users. A savepoint is essentially a pointer to a checkpoint that does not expire, and it is typically used during upgrades.

One detail worth noting: to upgrade smoothly between different versions of the job, and between different versions of Flink, manually assign IDs to operators using the uid(String) method. These IDs are used to determine the state scope of each operator. If you do not specify an ID for each operator, Flink generates one automatically, and the program can be restored from a savepoint only as long as these IDs remain unchanged. The auto-generated IDs depend on the structure of the program and are sensitive to code changes, so manually setting the ID is strongly recommended.
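As a minimal sketch, here is the WordCount job from section 1.2.3 with stable operator IDs assigned; the uid strings ("tokenizer", "word-count-sum") are arbitrary examples, and the only requirement is that they never change between versions of the job:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountWithUids {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 8888)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String s : value.split(",")) {
                            out.collect(Tuple2.of(s, 1));
                        }
                    }
                })
                .uid("tokenizer")        // stable ID for this operator's state
                .keyBy(0)
                .sum(1)
                .uid("word-count-sum")   // stable ID: savepoint state maps back to this operator
                .print();

        env.execute("WordCountWithUids");
    }
}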

3.5.2 Using savepoints

1: Optionally set a default savepoint location in flink-conf.yaml (you can also specify the target directory when triggering a savepoint for a specific job):

state.savepoints.dir: hdfs://node2:8020/flink/savepoints

2: Trigger a savepoint (in on-YARN mode, -yid must be specified):

bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]
bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]

3: Start a job from the specified savepoint:

bin/flink run -s savepointPath [runArgs]

4. A final note about the port I use

Because I connect to HDFS from my local machine, the nameservice mycluster cannot be resolved, so I connect directly to the active NameNode of the current cluster, which is why I use port 8020. When operating in the server environment, you can use mycluster directly, that is, port 9000.

Here is my cluster configuration. Port 9000 is the default file system port:

<property>
    <name>fs.defaultFS</name>
    <value>hdfs://mycluster</value>
</property>

Port 8020 is the RPC port of the NameNode in active state:

<property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>node1:8020</value>
</property>

That's the end of this chapter. The last couple of demos are a little tricky, so I won't demonstrate them all myself, but they are all correct. In the next article, I will demonstrate the use of Flink event time and watermarks.