Flink 1.13 State Backend Optimization and Production Practices, shared by Tang Yun at the Flink Meetup on 22 May. The contents include:

  1. A bird's-eye view of Flink 1.13 state-backend changes

  2. Optimization of memory control in the RocksDB state-backend

  3. Development planning of the Flink state-backend module

1. Changes in state-backend in Flink 1.13

1. Monitor the performance of State access

First, Flink 1.13 introduces performance monitoring for State access, namely latency tracking state.

State access latency is measured by taking the difference of System#nanoTime before and after each access. This feature is not tied to any particular type of State Backend; custom State Backend implementations can reuse it as well. Because performance monitoring of State access itself has a cost, only one out of every 100 accesses is sampled by default. The above picture shows the monitoring results.

The impact of State access performance monitoring differs by State Backend:

  • For RocksDB State Backend, the performance loss is about 1%;

  • For Heap State Backend, the performance loss can be up to 10%.

The preceding figure shows the three related configuration items. This feature is disabled by default and must be enabled manually by setting the parameter state.backend.latency-track.keyed-state-enabled=true.
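
For illustration, a minimal sketch of enabling it programmatically might look as follows (the sample-interval key below is assumed to belong to the same state.backend.latency-track option group; both options can equally be placed in flink-conf.yaml):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTrackingSetup {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Enable state access latency tracking (disabled by default).
        conf.setString("state.backend.latency-track.keyed-state-enabled", "true");
        // Assumed key for the sampling interval: sample one out of every 100 accesses.
        conf.setString("state.backend.latency-track.sample-interval", "100");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define the job and call env.execute() as usual
    }
}
```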

2. Unified Savepoint format

Starting with Flink 1.13, Savepoints support switching the State Backend, which greatly improves the usability of the system. After a Savepoint is taken, the State Backend of the job topology can be changed, for example from RocksDB to Heap or from Heap to RocksDB. This change is limited to Savepoints: the file format of a Checkpoint is tied to the specific State Backend and is not a unified format, so Checkpoints do not support this function.
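
As a hedged sketch of how the switch might look in practice (the savepoint is assumed to have been taken with RocksDB, and the class name and savepoint path here are illustrative):

```java
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestoreWithDifferentBackend {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The savepoint may have been taken while the job used RocksDB; because the
        // savepoint format is now backend-independent, the restored job can simply
        // declare a different backend, here the heap-based HashMapStateBackend.
        env.setStateBackend(new HashMapStateBackend());

        // ... rebuild the same topology, then resume from the old savepoint, e.g.
        //     flink run -s <savepointPath> job.jar
    }
}
```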

3. Clearer APIs

Another important change is conceptual clarity: Flink 1.13 distinguishes between state and checkpoints.

In Flink, State Backend has two functions:

  • Provide state access and querying;
  • If Checkpointing is enabled, periodically upload data to durable storage and return the metadata to the JobManager (JM for short).

In previous versions of Flink, these two features were mixed together, and the concepts of state storage and checkpoint creation were lumped together in a general way, leading to confusion and difficulty for beginners.

The old types of State Backend are shown in the figure above. Because of this conceptual confusion, the RocksDB State Backend could embed a Memory State Backend or an Fs State Backend; the State Backend embedded in RocksDB actually describes the direction in which Checkpoint data is transferred.

For the Memory State Backend, the original constructor did not require a file path; if HA is not enabled, all Checkpoint data is returned to the JM. When a file path is specified for the Memory State Backend, Checkpoint data is uploaded directly to that path and is not returned to the JM.

For the Fs State Backend, Checkpoint data is uploaded directly to the configured file path.

In production, the RocksDB State Backend is most commonly used with a remote FS address. The old way of writing this configuration easily confuses users about the distinction between state and checkpoints.

Flink 1.13 breaks this down into two concepts:

  1. State Backend becomes a narrower concept that only describes state access and storage.

  2. Another concept is Checkpoint storage, which describes Checkpoint behavior, such as whether Checkpoint data is sent back to JM memory or uploaded to a remote location. Therefore, the corresponding configuration items are also separated.

You need to specify not only State Backend but also Checkpoint Storage. The following is the mapping between the old and new interfaces:

Of course, the old interfaces are still available, but it is recommended to migrate to the new ones, which are conceptually clearer.
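
For illustration only, a minimal sketch of the new style might look like this (the HDFS path is a placeholder, and the old-style line is shown only as a comment for comparison):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SeparatedStateBackendAndCheckpointStorage {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Old style (before 1.13), one object mixed both concerns:
        //   env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // New style (1.13+): the State Backend only describes local state access and storage,
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // while Checkpoint Storage describes where checkpoint data goes
        // (JobManager memory vs. a durable file system path).
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
    }
}
```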

4. RocksDB partitioned Index & Filter

Here are the optimizations for RocksDB:

Flink 1.13 exposes RocksDB's partitioned index and filter feature. As shown in the figure above, the data stored in the RocksDB Block Cache consists of three parts:

  1. Data Block (real Data)

  2. Index Block (Index of each piece of data)

  3. Filter Block (Bloom Filter for files)

The relative block sizes can be seen clearly from the size of the squares: Index and Filter blocks are significantly larger than Data blocks. For a 256 MB SST file, the Index Block is about 0.5 MB, the Filter Block is about 5 MB, and a Data Block is 4 KB by default. When the Block Cache is only a few hundred MB and the number of files is very large, Index and Filter blocks are constantly swapped in and out and performance deteriorates, especially after memory control is enabled by default. The obvious symptom is very frequent IO and consistently poor performance.

Flink 1.13 reuses RocksDB's partitioned Index & Filter. Simply put, RocksDB's partitioned index is organized into multiple levels: the top level stays resident in memory, and lower levels are loaded on demand, which greatly reduces the competition caused by data swapping. In our online tests, performance improved by roughly 10x in memory-constrained scenarios. So if RocksDB does not perform as expected under memory control, this can also be a performance optimization point.

There are currently two parameters that control this function:

  1. state.backend.rocksdb.memory.partitioned-index-filters: true (default: false)

  2. state.backend.rocksdb.block.metadata-blocksize (block size of the multi-level index metadata)
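
A minimal sketch of turning the feature on programmatically might look as follows (both keys can equally go into flink-conf.yaml; the metadata block size is left at its default just to show the key):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionedIndexFilterSetup {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Pin only the top level of the index/filter in the Block Cache and load
        // lower partitions on demand (disabled by default).
        conf.setString("state.backend.rocksdb.memory.partitioned-index-filters", "true");
        // Block size used for the multi-level index metadata; 4kb is the default,
        // shown here only to illustrate the key.
        conf.setString("state.backend.rocksdb.block.metadata-blocksize", "4kb");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define the job and call env.execute() as usual
    }
}
```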

5. Default behavior changes

In Flink 1.13, the default behavior changes as shown in the figure above.

  • The state.backend.async configuration item is no longer supported; all Checkpoints are asynchronous (synchronous Checkpoints were rarely used and have been removed).

  • The default value of state.backend.rocksdb.checkpoint.transfer.thread.num has been increased to 4, so RocksDB incremental Checkpoints upload files with four threads, and four threads are also used when RocksDB restores data from an incremental Checkpoint.

If HDFS becomes unstable after upgrading, check whether it is related to this change.
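
If the extra concurrency does turn out to be the cause, a hedged sketch of restoring single-threaded transfers might be (the option normally lives in flink-conf.yaml; the Configuration API is used here only for illustration):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTransferThreads {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Flink 1.13 raised the default number of transfer threads to 4; if the extra
        // concurrent uploads/downloads overload HDFS, it can be dialed back down.
        conf.setString("state.backend.rocksdb.checkpoint.transfer.thread.num", "1");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define the job and call env.execute() as usual
    }
}
```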

2. Optimization of memory control in the RocksDB state-backend

Memory control for the state-backend was introduced in Flink 1.10 and has been improved in each subsequent release.

The fundamental reason the RocksDB State Backend needs memory control is that each Flink state corresponds to a RocksDB Column Family.

Prior to Flink 1.10, if you declared two states, each got its own Write Buffer and Cache memory. Flink does not limit the number of states in an operator; in theory a user could declare thousands or tens of thousands of states, which could blow up the container memory. In addition, under Flink's slot-sharing mechanism, multiple operators containing keyed state can run in the same slot, making it hard to ensure that the total number of states stays bounded.
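
To make the state-to-Column-Family relationship concrete, here is a small illustrative function that declares two keyed states; under the RocksDB State Backend each of them is backed by its own Column Family (the class and state names are made up for this sketch):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TwoStatesFunction extends KeyedProcessFunction<String, Long, Long> {

    // Each declared state maps to its own RocksDB Column Family; before Flink 1.10
    // every Column Family also got its own Write Buffer and Cache memory.
    private transient ValueState<Long> count;
    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();
        count.update(current == null ? 1L : current + 1);
        lastSeen.update(ctx.timerService().currentProcessingTime());
        out.collect(count.value());
    }
}
```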

With multiple RocksDB instances there are multiple Write Buffer Managers. As shown in the figure above, taking a single Write Buffer Manager as an example, it reserves its share of memory in the Block Cache and does the accounting according to its own memory control logic. The Block Cache maintains an LRU handle, and entries are evicted once the budget is exceeded.

The arena block mentioned above is the minimum memory allocation unit of the Write Buffer. By default it is 1/8 of the configured Write Buffer size, which works out to 8 MB. In extreme cases this leads to too many small files on disk and poor performance: when the total memory budget is small, the amount of memory managed by the Write Buffer Manager is also small, yet the Write Buffer still allocates an 8 MB arena block when it first requests memory. Once 7/8 of the managed memory is used, the Write Buffer is marked Immutable and its data is flushed to disk.

If a single arena block takes up too large a share of that memory, a newly allocated arena block may hold only a few KB of data when the Write Buffer's memory control kicks in; the Write Buffer is then marked Immutable and flushed, producing many small files and very poor performance. For an LSM-style DB, such premature flushing significantly hurts read amplification, because the Write Buffer can no longer serve as many read requests from memory.

We therefore introduced a strict check on the arena block size: if the configured size is inappropriate, a warning is logged indicating that the arena block size should be adjusted. Reducing the arena block size addresses the problem of data being flushed prematurely and thus improves performance.
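
Since the arena block size is derived from the write buffer size, one hedged way to shrink it is to lower the per-Column-Family write buffer size (the value below is only illustrative; with RocksDB's default 64 MB write buffer the arena block is the 8 MB mentioned above):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ArenaBlockTuning {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The arena block is roughly 1/8 of the write buffer size, so RocksDB's
        // default 64 MB write buffer yields the 8 MB arena block mentioned above.
        // Halving the write buffer (illustrative value) also halves the arena block,
        // which helps avoid premature flushes when the managed memory budget is small.
        conf.setString("state.backend.rocksdb.writebuffer.size", "32mb");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define the job and call env.execute() as usual
    }
}
```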

The RocksDB Block Cache is split into shards to improve concurrency. When the Write Buffer Manager reserves memory, it essentially inserts dummy entries into the Block Cache; these entries only account for the reserved memory and occupy the cache's logical capacity without storing real data. In the RocksDB version currently used by Flink, this reservation unit defaults to 1 MB, so there is a risk that a shard's data exceeds its budget. Later RocksDB versions reduced this risk by shrinking the unit from 1 MB to 256 KB. Since Flink 1.13 does not upgrade the RocksDB version, the problem still exists. In addition, Flink 1.13 does not set the RocksDB Block Cache memory control to strict mode.

The RocksDB version currently used by Flink is 5.17.2, which is about 1,000 commits behind the latest community version, RocksDB 6.17+. When the community tried to upgrade RocksDB, it found that the newer version had some performance regressions; even after attempting fixes, only some of them were resolved, and certain access interfaces still showed a regression of close to 10%. As a result, Flink 1.13 decided not to upgrade RocksDB for the time being; the community expects to do the upgrade in Flink 1.14, bringing in new RocksDB features that can make up for the currently known ~10% performance regression.

To sum up, RocksDB's memory control is not yet perfect: the Write Buffer Manager does not strictly account for Data Blocks, so in theory there is a small probability of memory overuse. At present, however, the behavior is fairly stable and the amount of overuse is limited. If you want to give RocksDB extra headroom to absorb such overuse and avoid being OOM-killed by Kubernetes in a cloud-native environment, you can manually increase the JVM Overhead memory, as shown in the figure above.

The reason for not increasing Task Off-Heap memory instead is that Task Off-Heap is currently mixed together with Direct Memory, and even if it were increased, the extra memory would not necessarily go to RocksDB as a buffer. So we recommend enlarging JVM Overhead to address memory overuse. Similarly, if you run into comparable problems with other native libraries in Flink, you can also try enlarging JVM Overhead. If you want to track down the root cause of a memory leak, you can use analysis tools such as jemalloc + jeprof.
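
A hedged sketch of enlarging JVM Overhead (these are TaskManager-level options that normally live in flink-conf.yaml; the sizes below are purely illustrative):

```java
import org.apache.flink.configuration.Configuration;

public class JvmOverheadTuning {
    public static void main(String[] args) {
        // TaskManager memory options, normally placed in flink-conf.yaml;
        // shown through the Configuration API purely for illustration.
        Configuration conf = new Configuration();
        conf.setString("taskmanager.memory.jvm-overhead.min", "1gb");      // illustrative size
        conf.setString("taskmanager.memory.jvm-overhead.max", "2gb");      // illustrative size
        conf.setString("taskmanager.memory.jvm-overhead.fraction", "0.2"); // illustrative ratio
        System.out.println(conf);
    }
}
```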

3. Development planning of the Flink state-backend module

The development plan of the state-backend module in Flink 1.14 and 1.15 is as follows:

It should be noted that only RocksDB currently supports incremental Checkpoints.
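
For reference, a minimal sketch of enabling incremental Checkpoints with the 1.13 API might look like this (the checkpoint directory is a placeholder):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental checkpoints are a RocksDB-only feature: only the SST files
        // created since the previous checkpoint are uploaded. Heap-based backends
        // always snapshot their full state.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
        // ... define the job and call env.execute() as usual
    }
}
```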

As for Changelog, the concept already exists in Apache Kafka and Apache Pulsar. Introducing a Changelog is Flink's way, as a stream computing system, of borrowing from traditional message-oriented middleware: state updates can go through a proxy that periodically writes them to an external log, so that not all data has to be uploaded at every Checkpoint, which makes the Checkpoint duration much more controllable.

Flink 1.13 implements the proxy layer, but the actual logic layer has not been implemented yet; the concrete implementation, including the related log-cleanup logic, will be done in Flink 1.14. Flink 1.14 is expected to improve state and Checkpoint performance, especially for the current pain point where two-phase commit depends on Checkpoint completion; with the introduction of the Changelog State Backend, these pain points should be resolved as soon as possible.