Copyright notice: This technical column is the author's (Qin Kaixin) summary and distillation of day-to-day work. It extracts cases from real business environments to summarize and share, and offers tuning suggestions for business applications, capacity planning for cluster environments, and related content. Please keep following this blog series. Reprinting is not permitted; you are welcome to learn from it. QQ email address: [email protected]. If you have any questions, feel free to get in touch at any time.

1 Starting the Flink application

Submit the job to YARN. The -c option names the entry class, the argument after it is the path to the jar, and port 9010 is passed as the program argument:

    ./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -c streaming.SoetWindowWordCountJavaCheckPoint /usr/local/install/testJar/FlinkExample-1.0-SNAPSHOT-jar-with-dependencies.jar --port 9010

2 Saving and restoring data with checkpoints

2.1 Checkpoint settings and retention

  • By default, when checkpointing is enabled, Flink keeps only the most recent checkpoint, and when a job fails Flink recovers from that latest checkpoint. It is more flexible to keep several checkpoints and choose which one to recover from as needed. For example, if we discover that the data processed over the last four hours is wrong, we may want to restore the entire state to what it was four hours ago.

  • Flink can retain multiple checkpoints. Add the following setting to Flink's conf/flink-conf.yaml configuration file to specify the maximum number of checkpoints to keep:

      state.checkpoints.num-retained: 20
  • With this setting in place, you can list the retained checkpoints in the HDFS directory where they are stored by running hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints. To roll back to a particular checkpoint, you only need to specify the path of that checkpoint.
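When many checkpoints are retained, picking the newest one by eye is error-prone, because chk-9 sorts after chk-56 alphabetically. The following is a minimal sketch, not part of Flink itself: latest_checkpoint is a hypothetical helper name, and it assumes that the listing comes from hdfs dfs -ls on a job's checkpoint directory, with the path as the last field of each line.

```shell
#!/bin/sh
# Hypothetical helper (not a Flink or Hadoop command).
# Reads directory-listing text on stdin and prints the path of the
# chk-N entry with the highest N, compared numerically.
# Assumption: the path is the last whitespace-separated field of each line.
latest_checkpoint() {
  awk '
    {
      path = $NF
      n = path
      # Strip everything up to and including the last "/chk-";
      # keep the entry only if what remains is a plain number.
      if (sub(/^.*\/chk-/, "", n) && n ~ /^[0-9]+$/ && n + 0 >= best) {
        best = n + 0
        best_path = path
      }
    }
    END { if (best_path != "") print best_path }
  '
}

# Possible usage (the job ID directory is a placeholder):
#   hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints/<job-id> | latest_checkpoint
```

Lines that do not end in a chk-N path, such as the "Found N items" header or other subdirectories, are simply ignored.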

2.2 Checkpoint recovery

  • If the Flink program fails abnormally, or if data has been processed incorrectly over some recent period, we can restore the program from a checkpoint.

  • The -s option specifies the path of the checkpoint to restore from.

    bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar
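The path in the command above has three variable parts: the checkpoint root directory, the job ID, and the checkpoint number. As a rough sketch, assuming that layout holds for your cluster, a small helper can assemble the path so that only the checkpoint number needs to change between attempts. checkpoint_metadata_path is a hypothetical name, and the command is only printed here, not executed.

```shell
#!/bin/sh
# Hypothetical helper: compose the metadata path passed to `flink run -s`.
# Assumed layout (taken from the example above):
#   <checkpoint-root>/<job-id>/chk-<n>/_metadata
checkpoint_metadata_path() {
  root=${1%/}      # drop one trailing slash, if present
  job_id=$2
  chk_no=$3
  case $chk_no in
    ''|*[!0-9]*)
      echo "checkpoint number must be a non-negative integer" >&2
      return 1
      ;;
  esac
  printf '%s/%s/chk-%s/_metadata\n' "$root" "$job_id" "$chk_no"
}

# Dry run: print the restore command instead of running it.
echo bin/flink run -s \
  "$(checkpoint_metadata_path hdfs://namenode:9000/flink/checkpoints 467e17d2cc343e6c56255d222bae3421 56)" \
  flink-job.jar
```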

3 Savepoint analysis

3.1 Global Consistency Snapshot

  • With savepoints, a Flink program that has been upgraded can continue computing from the point it had reached before the upgrade, which preserves data continuity.
  • A savepoint is a global, consistent snapshot. It can store data source offsets, operator state, and other information.
  • The application can resume consumption from any past point at which a savepoint was taken.

3.2 Checkpoint theory

  • Triggered periodically by the application to save state; checkpoints expire.
  • Used when the application fails internally and restarts.

3.3 Savepoint theory

  • A savepoint is triggered manually by the user. It is a pointer to a checkpoint and does not expire. It is used during upgrades.
  • Note: To upgrade smoothly between different versions of a job and between different versions of Flink, it is strongly recommended to assign an ID to each operator manually with the uid(String) method. These IDs determine the scope of each operator's state. If you do not specify an ID for an operator, Flink generates one automatically.
  • As long as these IDs remain unchanged, the program can be restored from a savepoint. Automatically generated IDs depend on the structure of the program and are sensitive to code changes, which is why setting the IDs manually is strongly recommended.

3.4 Using savePoint

  • 1: Configure the savepoint storage location in flink-conf.yaml

    This setting is optional. Once it is configured, however, you no longer need to specify a target directory when triggering a savepoint for a job.

      state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
  • 2: Trigger a savepoint, either directly or when cancelling the job. In YARN mode you also need to specify -yid.

      bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]
      bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]

  • 3: Start the job from the specified savepoint

      bin/flink run -s savepointPath [runArgs]
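Steps 2 and 3 are often chained: trigger a savepoint, read the path the CLI reports, then pass it to run -s. The sketch below shows one way to pull that path out of the CLI output. It rests on an assumption that should be checked against your Flink version: that the savepoint command reports its result on a line of the form "Savepoint completed. Path: <path>". The function name savepoint_path is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: read `flink savepoint` output on stdin and print
# the reported savepoint path.
# ASSUMPTION: the CLI prints a line "Savepoint completed. Path: <path>".
# Verify this against the output of your Flink version before relying on it.
savepoint_path() {
  path=$(sed -n 's/^Savepoint completed\. Path: \(.*\)$/\1/p' | tail -n 1)
  if [ -z "$path" ]; then
    echo "no savepoint path found in input" >&2
    return 1
  fi
  printf '%s\n' "$path"
}

# Possible usage (jobId and runArgs are placeholders, as in the steps above):
#   sp=$(bin/flink savepoint jobId | savepoint_path) && bin/flink run -s "$sp" runArgs
```

Failing with a non-zero status when no path is found keeps the chained run command from starting the job without its saved state.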

4 Conclusion

Flink is a relatively new big data processing engine, and reference material on it is still scarce. Writing this up took real effort, so please make good use of it. Thank you!

Qin Kaixin in Shenzhen 201811252101