I have been working with Flink for some time and have run into a number of problems, including checkpoint failures that cause the job to restart. This has happened many times, and because the job usually recovers after the restart, I did not pay much attention to it.

Our Flink test environment has three nodes. Each Flink node also runs an HDFS DataNode, and HDFS is used for Flink checkpoints and savepoints.

The symptom

The log shows that three DataNodes are alive and the file replication factor is 1, yet the file fails to be written:

There are 3 datanode(s) running and no node(s) are excluded

I searched the Internet for this error and found no direct answer. The NameNode log offered no more direct information either.

Everything looks fine on the web UI at port 50070. The DataNodes still have plenty of space left, with usage below 10%.

I tried putting a file onto HDFS and then getting it back. Both operations worked, which indicates that the HDFS service is healthy and the DataNodes are working.

Log Symptom 1

Reading further through the NameNode log, we noticed some warning messages.



The warnings point to a problem with the block placement policy.

Enable the corresponding debug logging, as the warning suggests, in:

etc/hadoop/log4j.properties

Find the line

log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN

Copy its format and add the following lines below it:

log4j.logger.org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy=DEBUG
log4j.logger.org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor=DEBUG
log4j.logger.org.apache.hadoop.net.NetworkTopology=DEBUG

Restart the NameNode and rerun the Flink job.

Log Symptom 2

The debug log now shows that the rack-aware placement policy cannot be satisfied: we did not provide a rack mapping script, so all nodes default to the same rack. That should not be the cause of the failure, however.

In many production environments HDFS is not configured with a rack mapping script, and checkpoint failures do not occur all the time. Besides, putting and getting files works normally.

At this point we turned to the HDFS source code. Following the call stack in the log above, we started with BlockPlacementPolicyDefault and the related DatanodeDescriptor.

In essence, when the NameNode selects a DataNode for a block, the source code runs several checks on the candidate, such as how much free space it has and how busy it is.

When the problem recurred, the log revealed the key information:

The scheduled size exceeds 43 GB, so the NameNode concludes that an otherwise healthy DataNode is running out of space.

Why

What does the scheduled size mean? According to the code, it is the block size multiplied by a counter of the new file blocks scheduled for that DataNode. HDFS uses these two values to estimate the storage that may be needed, which amounts to reserving space on each DataNode in advance. Once the actual space used is known, the reservation is adjusted back.
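The check described above can be modeled roughly as follows. This is a simplified sketch, not the HDFS source: the function and parameter names are hypothetical, the 43 GB and 128 MB figures are used only for illustration, and it assumes the NameNode rejects a DataNode when one more full block no longer fits into the remaining space minus the scheduled size.

```python
# Simplified model of the NameNode's free-space check.
# All names here are hypothetical and do not match the HDFS source.
MB = 1024 * 1024
GB = 1024 * MB


def scheduled_size(block_size: int, blocks_scheduled: int) -> int:
    """Space assumed to be already promised to blocks that are still pending."""
    return block_size * blocks_scheduled


def has_enough_space(remaining: int, block_size: int, blocks_scheduled: int) -> bool:
    """True if one more full block still fits after subtracting the reservations."""
    return block_size <= remaining - scheduled_size(block_size, blocks_scheduled)


# 344 pending blocks of 128 MB reserve exactly 43 GB, so a node with 43 GB
# free is treated as full, even if every pending file is only a few KB.
print(has_enough_space(43 * GB, 128 * MB, 344))  # False
print(has_enough_space(43 * GB, 128 * MB, 10))   # True
```

The point of the model is that the decision depends on the configured block size and the number of pending blocks, not on how many bytes the files actually contain.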

Given this mechanism, we can conclude that too much space was being reserved on the DataNodes within a short period.

For the Flink checkpoint mechanism, see this article: www.jianshu.com/p/9c587bd49… In short, many task threads on each TaskManager write to HDFS.

Looking at the HDFS directory structure, there are a large number of checkpoint files with UUID-like names, and each file is very small.

When many jobs run with high parallelism, many checkpoint files are created on HDFS at the same time. Although each file is only a few KB, the space reserved on each DataNode is 128 MB multiplied by the number of files being allocated (each file is smaller than 128 MB, so it needs only one block). With 43 GB of space, how many files can be reserved at most? Dividing gives a little over 300 per node, so roughly 900 across three nodes. We run multiple jobs with a high total parallelism, so this problem occurs easily before the reserved space is fully released.
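The division above can be checked with a few lines of Python. This is only a worked version of the arithmetic; the function name is hypothetical, and the inputs are the 43 GB and 128 MB figures from the text.

```python
MB = 1024 * 1024
GB = 1024 * MB


def max_reserved_blocks(free_bytes: int, block_size: int) -> int:
    """How many full-block reservations fit into the given free space."""
    return free_bytes // block_size


per_node = max_reserved_blocks(43 * GB, 128 * MB)
print(per_node)      # 344 files per DataNode
print(per_node * 3)  # 1032 files across three DataNodes
```

The exact results, 344 and 1032, are in line with the rounded "more than 300" and "about 900" used in the text.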

HDFS is not well suited to storing small files: a large number of them consumes inodes and inflates block location metadata, which strains NameNode memory. This case shows a second problem: when the block size is large and the files are much smaller than the block size, a large number of small files can make DataNodes directly “unavailable”.

The solution

The block size is a property of each file, not of the cluster, and it can be set by the client. Here, every Flink TaskManager and JobManager is an HDFS “client”. According to the Flink documentation, you can configure it as follows:

1. Specify an HDFS configuration directory in conf/flink-conf.yaml

fs.hdfs.hadoopconf: /home/xxxx/flink/conf

Here we use the same directory as Flink’s own configuration files.

2. Put two configuration files in that directory: core-site.xml and hdfs-site.xml

core-site.xml can be used if the checkpoint and savepoint paths specify HDFS addresses.

Add the block size property (dfs.blocksize in Hadoop 2.x and later) to hdfs-site.xml and set it to, for example, 1 MB.
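As a minimal sketch of what such a file could contain, the snippet below generates an hdfs-site.xml with a single property. It assumes the dfs.blocksize property name used by Hadoop 2.x and later, since the original configuration is not reproduced here; the function name build_hdfs_site is hypothetical.

```python
import xml.etree.ElementTree as ET


def build_hdfs_site(block_size_bytes: int) -> str:
    """Return the text of a minimal hdfs-site.xml that sets dfs.blocksize."""
    configuration = ET.Element("configuration")
    prop = ET.SubElement(configuration, "property")
    ET.SubElement(prop, "name").text = "dfs.blocksize"
    ET.SubElement(prop, "value").text = str(block_size_bytes)
    ET.indent(configuration)  # pretty-print; requires Python 3.9 or later
    return ET.tostring(configuration, encoding="unicode")


xml_text = build_hdfs_site(1024 * 1024)  # 1 MB expressed in bytes
print(xml_text)
```

The printed text would be saved as hdfs-site.xml in the directory that fs.hdfs.hadoopconf points to.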

Tune the block size to match the needs of your own jobs.
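To get a feel for the trade-off, the following sketch compares the worst-case reservation for a fixed number of in-flight checkpoint files at several block sizes. The figure of 900 files and the candidate block sizes are illustrative assumptions, and reserved_bytes is a hypothetical helper that assumes one full block is reserved per file.

```python
MB = 1024 * 1024


def reserved_bytes(inflight_files: int, block_size: int) -> int:
    """Worst-case reservation: one full block per file still being written."""
    return inflight_files * block_size


for size_mb in (128, 16, 1):
    total = reserved_bytes(900, size_mb * MB)
    print(f"{size_mb:>3} MB blocks -> {total / MB:,.0f} MB reserved")
```

With 128 MB blocks the 900 files reserve 115,200 MB in total, while with 1 MB blocks they reserve only 900 MB.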

Restart the Flink cluster and submit the job. While it runs, watch the size of the HDFS fsimage to make sure the metadata does not grow too large because of overly small blocks or too many small files.

Summary

We have carried this fix over into our cluster deployment automation scripts, which now add the block size configuration at deployment time.

Flink’s checkpoint solution depends on HDFS, which is rather heavyweight for lightweight stream computing scenarios. Whether the filesystem or the RocksDB state backend is used, distributed checkpoint storage requires HDFS. Considering how checkpoints work and the type of data involved, Elasticsearch (ES) would also be a good choice, but unfortunately the community does not provide such a solution.


Article source: club.perfma.com/article/173…