Preface

It’s been a while since the last Flink article, in which we walked through some of the common operators, so before we get into this one, let’s warm up and revisit the code.

Now we want to implement a word count with a twist: the sink buffers elements and prints them every time a custom threshold is reached. If you already know Flink, you will know that we just need to write a custom sink downstream.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestOperatorState {

    public static void main(String[] args) throws Exception {

        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromElements(
                Tuple2.of("spark", 3),
                Tuple2.of("kafka", 3),
                Tuple2.of("flink", 3),
                Tuple2.of("hive", 3),
                Tuple2.of("hbase", 3),
                Tuple2.of("es", 3)
        );

        dataStreamSource.addSink(new MySink(2));

        env.execute("TestOperatorState");
    }
}


This function is then implemented through a MySink

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;



import java.util.ArrayList;

import java.util.List;



public class MySink implements SinkFunction<Tuple2<String, Integer>> {



    private List<Tuple2<String,Integer>> bufferElements;



    // Define a threshold

    private int threshold;

    public MySink(int threshold){

        this.threshold = threshold;

        bufferElements = new ArrayList<Tuple2<String, Integer>>();

    }



    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {

        bufferElements.add(value);

        if (bufferElements.size() == threshold){

            System.out.println("Data:"+bufferElements);

            bufferElements.clear();

        }

    }

}


At runtime you will always see the warning “Failed to load class org.slf4j.impl.StaticLoggerBinder”. If you find it annoying, you can add this to the pom:

<dependency>

        <groupId>org.slf4j</groupId>

        <artifactId>slf4j-simple</artifactId>

        <version>1.7.25</version>

</dependency>


However, this turns three lines of output into a screen full of rather puzzling log messages, so decide for yourself whether it is worth it.

However, there is an obvious problem with this program: the buffered data lives only in memory and is lost when the program restarts. To make the state fault-tolerant, Flink needs to checkpoint it.

1. Main Content

1.1 Let’s go back to the program

If we want to checkpoint the program we just wrote, we first add a piece of list state to MySink:

private ListState<Tuple2<String, Integer>> checkPointState;


Then we implement the CheckpointedFunction interface. It has two methods to implement, snapshotState and initializeState, and the names already say what they do: one takes a snapshot of the state, the other initializes (and restores) it.

For those of you who have seen Flink’s various kinds of state before, the pattern will look familiar: register the state first, then use it. The overall shape of the class is sketched below.
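As an orientation, here is a minimal sketch of what MySink looks like once it also implements CheckpointedFunction. It only shows the shape of the class (same field names as above, made-up comments); the two new method bodies are filled in right after.

// New imports on top of the ones already shown:
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class MySink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private List<Tuple2<String, Integer>> bufferElements;        // working buffer in memory
    private ListState<Tuple2<String, Integer>> checkPointState;  // state registered with Flink

    // ... constructor and invoke(...) stay exactly as before ...

    public void snapshotState(FunctionSnapshotContext context) throws Exception { /* filled in below */ }

    public void initializeState(FunctionInitializationContext context) throws Exception { /* filled in below */ }
}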

OK, following the example: initializeState is the method that recovers the data when the job restarts. Note the data types used here; they have to match. Checkpointing really just means maintaining a special piece of state that records our status, the same kind of state we have seen before.

public void initializeState(FunctionInitializationContext context) throws Exception {

    ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<Tuple2<String, Integer>>(
            "buffer",
            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
    );

    checkPointState = context.getOperatorStateStore().getListState(descriptor);

    // If the task is being restarted, read the saved state back into memory
    if (context.isRestored()) {
        for (Tuple2<String, Integer> lostData : checkPointState.get()) {
            bufferElements.add(lostData);
        }
    }
}


snapshotState is called periodically and writes the data currently buffered in memory (bufferElements) into the checkpoint state:

public void snapshotState(FunctionSnapshotContext context) throws Exception {
    checkPointState.clear();
    // Copy the in-memory buffer into the checkpointed state
    for (Tuple2<String, Integer> data : bufferElements) {
        checkPointState.add(data);
    }
}


This example is essentially the one from the official website, with much the same code. In practice, however, we usually do not write it this way, because here I have to maintain an extra piece of state myself just to record the sink’s status; normally we let Flink manage the state for us.

So where is the state stored in Flink?

1.2 State storage location

Depending on which StateBackend Flink is configured with, state is stored in one of the following three places:

1.2.1 MemoryStateBackend

State is kept in memory. Flink itself is a master-slave architecture: it starts a JobManager service that coordinates the TaskManagers. With MemoryStateBackend, the working state lives on the TaskManager heap, and at checkpoint time it is copied into the JobManager heap.

And we can specify it manually in our program, like this
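For example, a minimal sketch (MemoryStateBackend is the default anyway, so this call mostly makes the choice explicit):

// org.apache.flink.runtime.state.memory.MemoryStateBackend
env.setStateBackend(new MemoryStateBackend());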

So if our program dies, the data held in memory is naturally lost; to deal with that, we change this setting.

1.2.2 FsStateBackend

FsStateBackend is an improvement over MemoryStateBackend: the working state still lives on the TaskManager, but at checkpoint time the state is persisted to a configured file system path (such as HDFS).

env.setStateBackend(new FsStateBackend("hdfs path"));


Disadvantage: while the job is running, the state still lives in TaskManager memory, so its size is limited by the available memory (the often-quoted 5 MB default limit actually belongs to MemoryStateBackend and is configurable); if the state outgrows memory before it is checkpointed to HDFS, the job is in trouble. The advantage is that state access stays in memory and is therefore fast.

1.2.3 RocksDBStateBackend

env.setStateBackend(new RocksDBStateBackend("path"));


This is the configuration usually used in production. State is stored in RocksDB (an embedded key-value store) on the TaskManager’s local disk, and at checkpoint time it is persisted to the specified location (such as HDFS).

The disadvantage is that state access is slower than with FsStateBackend. The advantage: it can hold an enormous amount of state, since the state lives on local disk and is checkpointed to a distributed file system.
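One practical note: RocksDBStateBackend is shipped as a separate module, so the project needs an extra dependency. A sketch of the pom entry (the Scala suffix and version are assumptions here and must match your Flink build):

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
</dependency>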

1.3 Introduction to checkpoint

To ensure the fault tolerance of the state, Flink needs to checkpoint the state.

Checkpointing is the core mechanism behind Flink’s fault tolerance. According to the configuration, it periodically takes snapshots of the state of each operator/task in the stream and persists this state data. When a Flink program crashes unexpectedly, you can re-run it and selectively recover from these snapshots, correcting the data anomalies caused by the failure.

The prerequisite for Flink’s checkpoint mechanism is persistent storage for both stream and state: a durable source that can replay events for a certain amount of time, typically a persistent message queue (e.g. Apache Kafka, RabbitMQ), plus persistent storage for the state, typically a distributed file system (e.g. HDFS, S3, GFS).

A Flink job contains many tasks, all of which produce state, and this state is periodically stored somewhere (in our example it is the checkPointState). When the job fails, the state is simply taken back out and used for recovery.

1.4 Setting Parameters

Because checkPoint is not enabled by default, you need to enable it by setting it

env.enableCheckpointing(10000);


Set it to checkPoint every 10 seconds. However, the recommended value is between 20 and 120 seconds.

CheckpointConfig checkpointConfig = env.getCheckpointConfig();

checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);


This sets the checkpoint mode to exactly-once semantics, which is the default.

At-least-once certainly performs better, but data may be duplicated, so choose according to the scenario. Note that this is not the same thing as Spark Streaming, which gets EXACTLY_ONCE thanks to RDD fault tolerance.
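If at-least-once is good enough for the scenario, the mode can be switched explicitly; a minimal sketch, reusing the checkpointConfig variable from above:

checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);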

checkpointConfig.setMinPauseBetweenCheckpoints(500);


This is the minimum pause between two checkpoints. You might wonder what it is for when we already set a 10-second checkpoint interval. The point is that a checkpoint itself takes time to complete: it might take 10 seconds, or it might not even have finished after 10 seconds. setMinPauseBetweenCheckpoints(500) guarantees at least this much idle time between the end of one checkpoint and the start of the next, giving the previous checkpoint a little room to finish.

checkpointConfig.setCheckpointTimeout(60000);


This sets the checkpoint timeout: if a checkpoint has not completed within one minute, it is aborted so that the next one can run.

checkpointConfig.setMaxConcurrentCheckpoints(1);


The default value is 1, which means at most one checkpoint can be in progress at the same time.

env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


With RETAIN_ON_CANCELLATION, the checkpoint data is kept even when the Flink job is cancelled, so the job can later be restored from a chosen checkpoint as required. You can also have the data deleted automatically when the job stops, as sketched below.
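The alternative cleanup mode, which discards externalized checkpoint data when the job is cancelled, is a one-line change; a minimal sketch:

env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);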

1.5 Using checkPoint to Restore Data

Restoring data essentially comes down to restarting the job:

Flink supports different restart strategies that control how a job is restarted after a failure. A cluster starts with a default restart strategy, which is used whenever no job-specific strategy is defined. If a restart strategy is specified when the job is submitted, it overrides the cluster default. The default strategy is set in the flink-conf.yaml configuration file, where the restart-strategy parameter defines which strategy is used.

Common restart policies

(1) Fixed delay strategy

(2) Failure rate

(3) No restart


If checkpointing is not enabled, the no-restart strategy is used.

If checkpointing is enabled but no restart strategy is configured, the fixed-delay strategy is used, with Integer.MAX_VALUE restart attempts by default. The restart strategy can be configured in flink-conf.yaml as a global default, or specified dynamically in the application code, which overrides the global configuration.

1.5.1 Fixed-Delay Restart Strategy

The first option: global configuration in flink-conf.yaml

restart-strategy: fixed-delay

restart-strategy.fixed-delay.attempts: 3

restart-strategy.fixed-delay.delay: 10 s


The second option: set it in the application code

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3, // Number of restart attempts
        Time.of(10, TimeUnit.SECONDS) // The interval between each retry
));


1.5.2 Failure-Rate Restart Strategy (rarely used)

The first option: global configuration in flink-conf.yaml

restart-strategy: failure-rate

restart-strategy.failure-rate.max-failures-per-interval: 3

restart-strategy.failure-rate.failure-rate-interval: 5 min

restart-strategy.failure-rate.delay: 10 s


The second option: set it in the code

env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3, // Maximum number of failures within the interval
        Time.of(5, TimeUnit.MINUTES), // The time window over which failures are counted
        Time.of(10, TimeUnit.SECONDS) // The delay between two consecutive restart attempts
));


This means at most 3 restarts are allowed within any 5-minute window, with 10 seconds between restart attempts.

1.5.3 No Restart

The job’s dead. It’s dead. Let it go

The first option: global configuration in flink-conf.yaml

restart-strategy: none


The second option: set it in code

env.setRestartStrategy(RestartStrategies.noRestart());


Of course, in day-to-day development we rarely touch the global configuration; this part is usually adjusted per job according to the requirements.

1.6 Multiple checkpoint Settings

By default, if checkpointing is enabled, Flink keeps only the most recent checkpoint, and when a failure occurs it recovers from that latest checkpoint. However, it is more flexible to keep multiple checkpoints and pick one to recover from according to the actual need. For example, if we find that the data recorded over the last four hours is wrong, we may want to roll the whole state back to four hours ago. Flink supports keeping multiple checkpoints for exactly this purpose.

Add the following configuration to the Flink configuration file conf/flink-conf.yaml to specify the maximum number of checkpoints to be saved

state.checkpoints.num-retained: 20


Note that this is a flink-conf.yaml setting. The setMaxConcurrentCheckpoints call we saw earlier in the code is something different: it controls how many checkpoints may be in progress at the same time, not how many completed checkpoints are kept.

After the task restarts, the latest checkPoint is used to restore data

With this setting you can see the retained checkpoints in the configured HDFS directory, e.g. with hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints. If you want to roll back to a particular checkpoint, you only need to specify the corresponding checkpoint path.

1.7 Manually Restore Data using checkPoint

Each Flink job has its own JobID, and the checkpoint data saved to HDFS is named after that JobID, so take note of it. If we need to restore data from a checkpoint manually, we go to the HDFS directory and find our checkpoint folder; it is named chk-xx by default, where the trailing number is the checkpoint ID. The command is:

bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar


The -s argument points at the _metadata file inside the chk-xx directory of the checkpoint you want to restore from.

Of course, there is still a problem with this, because all Flink has guaranteed so far is

checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);


which is not end-to-end exactly-once semantics. When the business changes and we need to stop the job to modify the code, the data may well be processed a second time.

This is where savepoints come in. With savepoints, after a program upgrade Flink can continue the computation from the point reached before the upgrade, so the data is not interrupted. A savepoint can hold source offsets, operator state and other information, and the application can resume consuming from any point in the past at which a savepoint was taken.

1: Set the savepoint storage location in flink-conf.yaml

This parameter is not mandatory; if it is configured, you do not need to specify a target directory when manually triggering a savepoint for a job later.

state.savepoints.dir: hdfs://namenode:9000/flink/savepoints


2: Trigger a savepoint (either directly, or when cancelling the job)

bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]    # in YARN mode the -yid parameter must be specified


// Cancel the job with this command; it takes one more savepoint before exiting
bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]    # in YARN mode, -yid is the application_xxx id


3: Start the job from the specified savepoint

bin/flink run -s savepointPath [runArgs]


A savepoint is executed manually by the user, is essentially a pointer to a checkpoint that does not expire, and is used during upgrades.

Note: In order to upgrade between different versions of the job and between different versions of Flink, it is highly recommended that the programmer manually assign an ID to the operator through the uid(String) method.


These IDs are used to scope the state of each operator. If you do not manually specify an ID for each operator, Flink generates one automatically. As long as these IDs stay the same, the program can be restored from a savepoint; but automatically generated IDs depend on the structure of the program and are sensitive to code changes, so it is strongly recommended to set them by hand.
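For example, a minimal sketch reusing the sink from earlier (the uid string here is made up; pick your own stable names):

dataStreamSource
        .addSink(new MySink(2))
        .uid("threshold-sink"); // stable ID so the operator's state can be mapped back on restore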

Finally

Next up: watermark.