Hello everyone, my name is Big Saint.

I didn’t update the Flink state article on time last week. The reason is that I got a little lazy: I had a day off last weekend, wanted to lie down, and ended up spending the whole day on my phone. From now on, friends, please hold me to it so that I update on time. Without further ado, let’s move on to today’s topic.

To recap, last time we covered the basic concepts of Flink state, its classification, and the use of Keyed State, that is, the four parts of the Flink State outline above: Flink State, Flink State categories, Flink Keyed State, and Operator State. If you are not familiar with these four topics, you can read the first Flink State article. Now let’s move on to the rest of the Flink State outline.

Flink State TTL

Managed State is managed by the Flink framework and includes Keyed State and Operator State. For Operator State, the Flink framework manages both creation and cleanup. Keyed State, however, is applied to a KeyedStream and is created by us, the users. For example, if I create a MapState to track users who have logged in for five consecutive days, that state has to keep its data across those five days. If cleanup were left entirely to the Flink framework, it might clear the state data on day 3, which obviously won’t work. Therefore, we have to specify ourselves when the state we create expires and is cleared. When I first started, I wondered whether the Keyed State we create should simply never expire and be kept forever. That cannot be the right answer, because the state would store more and more data, which is definitely not desirable. That is why we have State TTL.

What is Flink State TTL

The full name of State TTL is state time-to-live. Flink State TTL applies to Keyed State. Once you define an expiration time for a state, its data is treated as expired and can be cleared when the expiration condition is met. So when is the expiration condition met? When the current timestamp > the timestamp of the last access to the state + the TTL you configured. Don’t worry if this is not clear yet; I’ll explain it with actual code.

How to use Flink State TTL

Without further ado, here is how to use Flink State TTL:

    private transient ValueState<String> valueStateTTL;

    // This runs inside the function's open() method (the full class appears later in the article)
    @Override
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
                // Set the update type of the state timestamp
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                // Expired data that has not been cleaned up yet is not returned to the user
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // Cleanup strategy for expired entries: incremental cleanup
                .cleanupIncrementally(1, true)
                .build();

        ValueStateDescriptor<String> valueStateDescriptor =
                new ValueStateDescriptor<>("valueStateTTL", String.class);

        valueStateDescriptor.enableTimeToLive(stateTtlConfig);
        valueStateTTL = getRuntimeContext().getState(valueStateDescriptor);
    }

First of all, the ValueState is defined in three steps:

First, declare the valueStateTTL field.

Second, create a ValueStateDescriptor object: new ValueStateDescriptor<>("valueStateTTL", String.class).

Third, register the state descriptor with the runtime context through getRuntimeContext().getState(...), so that the valueStateTTL field we declared can be used.

Then let’s focus on the definition of state TTL:

    StateTtlConfig stateTtlConfig = StateTtlConfig
            .newBuilder(Time.seconds(1))
            // Set the update type of the state timestamp
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            // Expired data that has not been cleaned up yet is not returned to the user
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            // Cleanup strategy for expired entries: incremental cleanup
            .cleanupIncrementally(1, true)
            .build();

newBuilder(Time.seconds(1)): sets the TTL of this state, that is, how long the state lives before it expires.

setUpdateType(): controls when the timestamp of the state is refreshed. There are three options:

setUpdateType

Disabled

TTL is disabled and the state never expires.

OnCreateAndWrite

The timestamp of the state is refreshed only when the state is created or written, that is, when the update() method is called.

OnReadAndWrite

The timestamp of the state is refreshed whenever the state data is read or written, that is, when the value() or update() method is called.
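To make the difference between OnCreateAndWrite and OnReadAndWrite concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink code: the class TtlTimestampDemo, its enum, and the explicit now arguments are hypothetical names introduced only for illustration, and expiry itself is not modeled. Flink does this bookkeeping internally.

```java
/** Toy model (not Flink code) of how the update type affects a state's TTL timestamp. */
final class TtlTimestampDemo {
    enum UpdateType { ON_CREATE_AND_WRITE, ON_READ_AND_WRITE }

    private final UpdateType updateType;
    private String value;
    private long lastAccessTs; // the timestamp the TTL is measured from

    TtlTimestampDemo(UpdateType updateType) {
        this.updateType = updateType;
    }

    /** Writing always refreshes the timestamp, for both update types. */
    void update(String newValue, long now) {
        value = newValue;
        lastAccessTs = now;
    }

    /** Reading refreshes the timestamp only under ON_READ_AND_WRITE. */
    String value(long now) {
        if (updateType == UpdateType.ON_READ_AND_WRITE) {
            lastAccessTs = now;
        }
        return value;
    }

    long lastAccessTs() {
        return lastAccessTs;
    }

    public static void main(String[] args) {
        TtlTimestampDemo writeOnly = new TtlTimestampDemo(UpdateType.ON_CREATE_AND_WRITE);
        writeOnly.update("alice", 1_000L);
        writeOnly.value(5_000L); // the read does not move the timestamp
        System.out.println(writeOnly.lastAccessTs()); // 1000

        TtlTimestampDemo readAndWrite = new TtlTimestampDemo(UpdateType.ON_READ_AND_WRITE);
        readAndWrite.update("alice", 1_000L);
        readAndWrite.value(5_000L); // the read moves the timestamp
        System.out.println(readAndWrite.lastAccessTs()); // 5000
    }
}
```

In other words, with OnReadAndWrite a state that keeps being read keeps getting its lifetime extended, while with OnCreateAndWrite only writes extend it.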

setStateVisibility(): controls whether expired state data is still visible to the user. There are two options:

setStateVisibility

ReturnExpiredIfNotCleanedUp

If the state has expired but its data has not been cleaned up yet (expired data is only cleaned up when it is read), it is still returned to the user.

NeverReturnExpired

As long as the state has expired, it is never returned to the user, whether or not it has been cleaned up.
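The two visibility options can be sketched in a few lines of plain Java. This is a toy model under stated assumptions, not Flink code: TtlVisibilityDemo, its enum, and the read helper are hypothetical, and the expiry check simply reuses the condition given earlier in this article.

```java
/** Toy model (not Flink code) of the two state visibility options. */
final class TtlVisibilityDemo {
    enum Visibility { RETURN_EXPIRED_IF_NOT_CLEANED_UP, NEVER_RETURN_EXPIRED }

    /**
     * @param stored the value still physically present in the backend,
     *               or null if it has already been cleaned up
     */
    static String read(String stored, long lastAccessTs, long ttlMs, long now, Visibility visibility) {
        // Expiry condition as stated in the article: now > last access + TTL
        boolean expired = now > lastAccessTs + ttlMs;
        if (stored == null || !expired) {
            return stored;
        }
        // Expired but not cleaned up yet: visibility decides what the user sees
        return visibility == Visibility.NEVER_RETURN_EXPIRED ? null : stored;
    }

    public static void main(String[] args) {
        long lastAccessTs = 0L;
        long ttlMs = 60_000L;
        long now = 61_000L; // one second past the TTL

        System.out.println(read("alice", lastAccessTs, ttlMs, now,
                Visibility.RETURN_EXPIRED_IF_NOT_CLEANED_UP)); // alice
        System.out.println(read("alice", lastAccessTs, ttlMs, now,
                Visibility.NEVER_RETURN_EXPIRED)); // null
    }
}
```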

There are three CleanupStrategies:

cleanupFullSnapshot()

Expired entries are filtered out when a full snapshot is taken at checkpoint time. This only guarantees that the checkpoint contains no expired state. The expired state held in TaskManager heap memory is not cleaned up, so an OOM can still occur.

cleanupIncrementally(int cleanupSize, boolean runCleanupForEveryRecord)

This is incremental cleanup of expired state. Cleanup of expired data is triggered every time the state is accessed. Setting the second parameter to true additionally triggers cleanup for every record processed. The first parameter is the number of state entries checked each time cleanup is triggered. Note that this strategy only works for heap-based state backends, that is, Memory and FileSystem, not RocksDB.

cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)

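This policy is only valid for the RocksDB state backend. Expired entries are filtered out while RocksDB runs compaction; the queryTimeAfterNumEntries parameter is the number of state entries processed before the filter queries the current timestamp again.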

Let’s practice a state expiration case.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    String topic = "test";
    String groupId = "user-login";

    FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
    DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

    kafkaDS
            .flatMap(new FlatMapFunction<String, ValueBean>() {

                @Override
                public void flatMap(String value, Collector<ValueBean> out) throws Exception {
                    String[] s = value.split(" ");
                    ValueBean valueBean = new ValueBean();
                    valueBean.setUserId(s[0]);
                    valueBean.setTime(s[1]);
                    out.collect(valueBean);
                }
            })
            .keyBy(value -> value.getUserId())
            .process(new ValueProcessFunction());


    env.execute();

    
    

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class ValueProcessFunction extends KeyedProcessFunction<String, ValueBean, ValueBean> {

        private transient ValueState<String> valueStateTTL;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            StateTtlConfig stateTtlConfig = StateTtlConfig
                    .newBuilder(Time.minutes(1))
                    // Set the update type of the state timestamp
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    // Expired data that has not been cleaned up yet is not returned to the user
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    // Cleanup strategy for expired entries: incremental cleanup
                    .cleanupIncrementally(1, true)
                    .build();

            ValueStateDescriptor<String> valueStateDescriptor =
                    new ValueStateDescriptor<>("testValueStateTTL", String.class);
            // TTL has to be enabled on the descriptor before the state is obtained from it
            valueStateDescriptor.enableTimeToLive(stateTtlConfig);
            valueStateTTL = getRuntimeContext().getState(valueStateDescriptor);
        }

        @Override
        public void processElement(ValueBean value,
                                   KeyedProcessFunction<String, ValueBean, ValueBean>.Context ctx,
                                   Collector<ValueBean> out) throws Exception {
            // Returns null if nothing was stored yet or the stored value has expired
            String s = valueStateTTL.value();
            if (s == null) {
                s = value.getUserId();
            }
            // Writing refreshes the TTL timestamp (OnCreateAndWrite)
            valueStateTTL.update(s);
            System.out.println(valueStateTTL.value());
        }
    }

In the process() function we define a ValueState and set a TTL on it.

First, the ValueState is scoped to the keyBy value: each key gets its own ValueState.

Second, when does the state expire? When the current timestamp > the timestamp of the last access to the state data + the TTL of the state.

Suppose we access the data at 2021-12-04 15:20:27 and the TTL is set to 1 minute. Once the current time reaches 2021-12-04 15:21:28, the data has expired. Even though the state data has expired, whether we can still read it depends on how the parameters above are configured.
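The arithmetic in this example can be checked with a few lines of plain Java using java.time. This is only a sketch of the expiry condition as stated in this article; the class TtlExpiryExample and the method isExpired are hypothetical names, not part of Flink.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Worked example (not Flink code) of the condition: now > last access + TTL. */
final class TtlExpiryExample {

    static boolean isExpired(LocalDateTime lastAccess, Duration ttl, LocalDateTime now) {
        // Strictly after the expiration instant, matching the ">" in the article
        return now.isAfter(lastAccess.plus(ttl));
    }

    public static void main(String[] args) {
        LocalDateTime lastAccess = LocalDateTime.of(2021, 12, 4, 15, 20, 27);
        Duration ttl = Duration.ofMinutes(1);

        // Exactly one minute later: last access + TTL is not yet exceeded
        System.out.println(isExpired(lastAccess, ttl,
                LocalDateTime.of(2021, 12, 4, 15, 21, 27))); // false

        // One second after that, the state counts as expired
        System.out.println(isExpired(lastAccess, ttl,
                LocalDateTime.of(2021, 12, 4, 15, 21, 28))); // true
    }
}
```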

If setStateVisibility is set to NeverReturnExpired, your state backend is heap based, and you use cleanupIncrementally, then accessing the state again returns null.

Some insights into using State TTL

Set a reasonable TTL for your business

For example, I once made a very basic mistake in production. The requirement was to count users who had logged in for five consecutive days. I assumed that once a day had passed, the previous day’s state data was no longer needed, so I set the state’s TTL to 1 day. After the job had run in production for five days, not a single user with five consecutive login days showed up. I only found the problem by reviewing the business logic and the code.

The selected state backend must match the cleanup policy you configured

If your state backend is heap based, choose incremental cleanup (cleanupIncrementally). If your state backend is RocksDB, choose cleanupInRocksdbCompactFilter.

Configure the visibility of your data

Use the setStateVisibility parameter to decide, based on your specific business needs, whether expired data may still be returned to the user.

TTL of MapState

For MapState, the TTL applies to each key inside the MapState. If you set the TTL to 10s, it applies to the key in MapState<key, value>, not to the key you keyBy. Each keyBy value has its own MapState, but whether an entry has expired is decided per key inside that MapState<key, value>.

I verified locally that MapState expires per key inside the map. If you are interested, you can add me and I will share the source code with you.
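The per-entry behaviour described above can be illustrated with a small plain-Java model. This is a sketch under assumptions, not Flink code: MapEntryTtlDemo and its methods are hypothetical, the timestamp is refreshed only on writes (as with OnCreateAndWrite), and expired entries are simply hidden on read (as with NeverReturnExpired).

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code): every map entry carries its own TTL timestamp. */
final class MapEntryTtlDemo {
    private final long ttlMs;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> writeTs = new HashMap<>();

    MapEntryTtlDemo(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Writing an entry refreshes the timestamp of that entry only. */
    void put(String mapKey, String value, long now) {
        values.put(mapKey, value);
        writeTs.put(mapKey, now);
    }

    /** An entry is hidden once now > its own write timestamp + TTL. */
    String get(String mapKey, long now) {
        Long ts = writeTs.get(mapKey);
        if (ts == null || now > ts + ttlMs) {
            return null;
        }
        return values.get(mapKey);
    }

    public static void main(String[] args) {
        MapEntryTtlDemo state = new MapEntryTtlDemo(10_000L); // TTL of 10s
        state.put("day1", "login", 0L);
        state.put("day2", "login", 8_000L);

        // At t = 12s only the first entry has outlived its TTL
        System.out.println(state.get("day1", 12_000L)); // null
        System.out.println(state.get("day2", 12_000L)); // login
    }
}
```

The two entries live in the same map, yet they expire at different times because each one is measured from its own write.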

Here I would like to add that almost every article about Flink state TTL that I have read online gives the expiration condition as: if the timestamp of the last access + the TTL exceeds the current time, the state is expired. I concluded that this is not correct, and I wrote code and tested it many times before arriving at the following conclusion.

The state expires when the current timestamp > the timestamp of the last access to the state data + the TTL of the state.

That was a lot about Flink State TTL. If you don’t remember everything above, here are the key points:

Flink State TTL lets the Keyed State we write ourselves, such as ValueState and MapState, expire. It helps avoid running out of memory (OOM) and lets us use state more flexibly.

To use Flink State TTL, create the state with the usual three steps and then set the TTL properties on the descriptor.

Make sure the state backend you choose matches the cleanup strategy.

Flink StateBackend

As mentioned above, Managed State is managed by the Flink framework. To let users work with state more flexibly, Flink provides three out-of-the-box state backends for storing it. State is a local variable in a Flink subtask instance. Where is this local variable stored? It is stored in the state backend, and that is what the state backend is for. This is a bit abstract, so think of the state as our goods and the state backend as our warehouse: the warehouse is where we keep the goods.

MemoryStateBackend

MemoryStateBackend is used by default if a Flink program does not configure a state backend. With MemoryStateBackend, state is stored in TaskManager heap memory, and when a checkpoint completes, the state snapshot is stored in JobManager memory.

Advantages

It is fast.

Disadvantages

Each individual state is limited to a maximum size of 5 MB by default.

The data is stored in memory, so it is lost when the program goes down.

Usage scenarios

Local debugging

FsStateBackend

FsStateBackend also stores state in TaskManager heap memory and stores the state to the file system path you specify when a checkpoint occurs. We can set it like this in our code:

    env.setStateBackend(new FsStateBackend("file:///xxx"));

It can also be configured globally in flink-conf.yaml:

    state.backend: filesystem
    state.checkpoints.dir: hdfs://hadoop101:9000/checkpoints

Among them:

filesystem means that FsStateBackend is used

jobmanager means that MemoryStateBackend is used

rocksdb means that RocksDBStateBackend is used

FsStateBackend applies to:

Tasks with large state, long windows, and large key/value state

RocksDBStateBackend

RocksDB is a key-value database built on an LSM tree. RocksDBStateBackend keeps part of the state on disk and part in memory, and writes the state to the configured location when a checkpoint completes. Note that to use RocksDBStateBackend you first need to add its dependency (the flink-statebackend-rocksdb artifact) to your project.

Applicable scenario

Tasks with large state, long windows, and large key/value state

Summary of Keyed State and Operator State storage

Keyed State storage:

| StateBackend | Storage location | Checkpoint storage location | Application scenario |
| Memory | TM memory | JM memory | Debugging (do not use in production) |
| Filesystem | TM memory | HDFS | Small state, high performance |
| Rocksdb | Local RocksDB on the TM | HDFS | Large state |

Operator State storage:

| StateBackend | Storage location | Checkpoint storage location | Application scenario |
| Memory | TM memory | JM memory | Debugging (forbidden in production) |
| Filesystem | TM memory | HDFS | Production |
| Rocksdb | TM memory | HDFS | Production |

Finally, the summary of Keyed State and Operator State storage above comes from the WeChat official account Big data Dregs Rui. The author is a very capable senior big data development engineer at a major tech company, and his account has a lot of solid big data material. If you are interested, you can follow Big data Dregs Rui.

Well, that’s all for today. We covered two topics: the TTL of Flink state and Flink state backends. If something in the article is unclear, or if you want the sample code, you can scan the picture below, follow the WeChat official account, and click on the right to contact me; add me on WeChat and we can talk.

This article is published by OpenWrite!