This is the fifth day of my participation in Gwen Challenge

Flink State

Flink is a stateful stream processing engine. By default it keeps intermediate results (state) on the TaskManager heap. If a task fails, its in-memory state is lost, so the job can no longer guarantee correct results; without a persisted copy, all the data would have to be reprocessed from the start, which is inefficient. To support at-least-once and exactly-once guarantees, state must be persisted to more durable storage. Flink can keep state in on-heap memory, off-heap memory, HDFS, or RocksDB.

Let’s take a look at the states provided by Flink.

Flink has two types of state:

  • Keyed State

Keyed State is state on a KeyedStream and is bound to a specific key: each key on the KeyedStream has its own state. An operator can run as multiple parallel threads, but records with the same key are always processed by the same thread. A given key's state therefore lives in exactly one thread, while one thread holds the keyed state of many keys.
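To make the per-key scoping concrete, here is a minimal plain-Java model. It is not the Flink API: the hypothetical `KeyedValueState` class keeps one value per key and exposes `value()` and `update()` against a "current key" that, in this sketch, stands in for what the runtime would set before each record. All names are assumptions for illustration only, and real state backends are considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of keyed state scoping; NOT the Flink API.
// KeyedStateSketch, KeyedValueState and setCurrentKey are hypothetical names.
class KeyedStateSketch {

    /** One logical "state" that internally stores a separate value per key. */
    static class KeyedValueState<K, V> {
        private final Map<K, V> valuesByKey = new HashMap<>();
        private K currentKey;

        // In this model the caller sets the key of the record being processed.
        void setCurrentKey(K key) { currentKey = key; }

        // Returns the value for the current key, or null if none was stored yet.
        V value() { return valuesByKey.get(currentKey); }

        // Overwrites the value for the current key only.
        void update(V v) { valuesByKey.put(currentKey, v); }
    }

    /** Tracks the maximum speed seen so far for one car and returns it. */
    static long maxSpeed(KeyedValueState<String, Long> state, String carId, long speed) {
        state.setCurrentKey(carId);
        Long current = state.value();
        if (current == null || speed > current) {
            state.update(speed);
        }
        return state.value();
    }

    public static void main(String[] args) {
        KeyedValueState<String, Long> state = new KeyedValueState<>();
        System.out.println(maxSpeed(state, "1", 200)); // first value for car 1
        System.out.println(maxSpeed(state, "2", 101)); // car 2 has its own slot
        System.out.println(maxSpeed(state, "1", 102)); // lower speed, car 1 keeps its max
    }
}
```

The function body never mentions other keys: it only reads and writes "the" state, and the scoping to a key happens underneath. That is the property keyed state gives you.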

  • Non-keyed State (Operator State)

Operator State is independent of keys and is bound to an operator instance: each parallel instance of the operator has a single state. For example, the Kafka connector in Flink uses Operator State, with each connector instance storing the (partition, offset) mappings for the topic partitions it consumes.
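The offset example can be sketched in plain Java as well. This is only a model under stated assumptions, not the Flink or Kafka connector API: `OffsetTrackingSource`, `snapshot()` and `restore()` are hypothetical names that mimic one parallel source instance checkpointing its partition offsets and resuming from them after a failure.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of operator state; NOT the Flink or Kafka connector API.
// OffsetTrackingSource, snapshot() and restore() are hypothetical names.
class OperatorStateSketch {

    /** One parallel source instance; its state is not partitioned by record key. */
    static class OffsetTrackingSource {
        private Map<Integer, Long> offsets = new HashMap<>();

        // Remember the position reached in a partition after reading a record.
        void recordRead(int partition, long offset) { offsets.put(partition, offset); }

        // A checkpoint copies the current offsets so later reads cannot change it.
        Map<Integer, Long> snapshot() { return new HashMap<>(offsets); }

        // After a failure, the instance resumes from the last checkpointed offsets.
        void restore(Map<Integer, Long> checkpoint) { offsets = new HashMap<>(checkpoint); }

        Map<Integer, Long> currentOffsets() { return offsets; }
    }

    public static void main(String[] args) {
        OffsetTrackingSource source = new OffsetTrackingSource();
        source.recordRead(0, 41L);
        source.recordRead(1, 7L);
        Map<Integer, Long> checkpoint = source.snapshot();

        source.recordRead(0, 42L);   // progress made after the checkpoint
        source.restore(checkpoint);  // simulated failure and recovery
        System.out.println(source.currentOffsets());
    }
}
```

After the simulated recovery the instance is back at the checkpointed offsets, so the record read after the checkpoint would be read again rather than lost.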

  • Flink provides the following data structures for Keyed State
  1. ValueState<T>: a single value of type T bound to the current key. This is the simplest state: update(T) sets the value and value() reads it
  2. ListState<T>: the state for the key is a list. Elements are appended with add(T), and get() returns an Iterable over the stored values
  3. ReducingState<T>: each call to add(T) invokes the user-supplied ReduceFunction, so all added values are merged into a single state value
  4. MapState<UK, UV>: the state for the key is a map. Entries are added with put(UK, UV) or putAll(Map), get(UK) returns the value for a given key, and entries(), keys() and values() iterate over the contents
  5. AggregatingState<IN, OUT>: keeps a single value that represents the aggregation of all values added to the state. Unlike ReducingState, the aggregate type may differ from the type of the elements added. Elements added with add(IN) are aggregated by the user-specified AggregateFunction
  6. FoldingState<T, ACC>: deprecated; AggregatingState is recommended instead. It keeps a single value that represents the aggregation of all values added to the state. Unlike ReducingState, the aggregate type may differ from the element type. Elements added with add(T) are folded into the aggregate by the user-specified FoldFunction
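MapState is the one structure in this list without a demo in this post, so here is a rough plain-Java model of its semantics. It is not the Flink API: `MapStateSketch`, `countSpeed` and `entries` are hypothetical names, and the outer map merely stands in for the per-key scoping that Flink does for you.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a map-valued keyed state; NOT the Flink API.
// MapStateSketch, countSpeed and entries are hypothetical names.
class MapStateSketch {

    // Outer map: one entry per key (car ID). Inner map: that key's map state.
    private final Map<String, Map<Long, Integer>> stateByKey = new HashMap<>();

    /** Records one speed reading for a car and returns how often that speed was seen. */
    int countSpeed(String carId, long speed) {
        Map<Long, Integer> mapState = stateByKey.computeIfAbsent(carId, k -> new TreeMap<>());
        int seen = mapState.getOrDefault(speed, 0) + 1; // like get(key), defaulting to 0
        mapState.put(speed, seen);                      // like put(key, value)
        return seen;
    }

    /** Returns the entries stored for one car, like iterating entries(). */
    Map<Long, Integer> entries(String carId) {
        return stateByKey.getOrDefault(carId, new TreeMap<>());
    }

    public static void main(String[] args) {
        MapStateSketch sketch = new MapStateSketch();
        sketch.countSpeed("2", 201);
        sketch.countSpeed("2", 201);
        sketch.countSpeed("3", 301);
        System.out.println(sketch.entries("2"));
        System.out.println(sketch.entries("3"));
    }
}
```

Each car only ever sees its own inner map, which is the same scoping rule that applies to the other keyed state types.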
  • ValueState

    ValueState<T>: a single value of type T bound to the current key. It is the simplest state: update(T) sets the value and value() reads it

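    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    // CarInfo is a POJO with carId and speed fields and a builder; its definition is not shown here.
    public class ValueStateDemo {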
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> stream = environment
              .socketTextStream("192.168.150.110", 8888);
            stream.flatMap(new RichFlatMapFunction<String, CarInfo>() {
    
                @Override
                public void flatMap(String s, Collector<CarInfo> collector) throws Exception {
                    try {
                        String[] sArr = s.split(",");
                        collector.collect(CarInfo.builder().carId(sArr[0]).speed(Long.parseLong(sArr[1])).build());
                    } catch (Exception ex) {
                        System.out.println(ex);
                    }
                }
            }).keyBy(CarInfo::getCarId).map(new RichMapFunction<CarInfo, CarInfo>() {
    
                private ValueState<Long> valueState;
    
                @Override
                public void open(Configuration parameters) {
                    ValueStateDescriptor<Long>  state = new ValueStateDescriptor<Long> ("state",
                            BasicTypeInfo.LONG_TYPE_INFO);
                    valueState = getRuntimeContext().getState(state);
                }
                @Override
                public CarInfo map(CarInfo carInfo) throws Exception {
                    if (valueState.value()==null||carInfo.getSpeed()>valueState.value()){
                        valueState.update(carInfo.getSpeed());
                    }
                    carInfo.setSpeed(valueState.value());
                    return carInfo;
                }
            }).print(">>>>>>>>");
    
            environment.execute("execute");
        }
    }

    ### console 192.168.88.180
    nc -lk 8888
    ### Input parameters
    1,200
    2,101
    3,103
    1,102
    2,201
    3,303
    #### sout
    >>>>>>>>:2> CarInfo(carId=1, speed=200)
    >>>>>>>>:1> CarInfo(carId=2, speed=201)
    >>>>>>>>:2> CarInfo(carId=3, speed=303)
    >>>>>>>>:2> CarInfo(carId=1, speed=200)
    >>>>>>>>:1> CarInfo(carId=2, speed=201)
    >>>>>>>>:2> CarInfo(carId=3, speed=303)
    ## Display the maximum speed for each vehicle
  • ReducingState

    Each call to add() invokes the user-supplied ReduceFunction, so all added values are merged into a single state value. It provides aggregation in the style of a reduce function

    
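    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    // CarInfo is a POJO with carId and speed fields and a builder; its definition is not shown here.
    public class ReducingStateDemo {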
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> stream = environment
                    .socketTextStream("192.168.88.180", 8888);
            stream.flatMap(new RichFlatMapFunction<String, CarInfo>() {
    
                @Override
                public void flatMap(String s, Collector<CarInfo> collector) throws Exception {
                    try {
                        String[] sArr = s.split(",");
                        collector.collect(CarInfo.builder().carId(sArr[0]).speed(Long.parseLong(sArr[1])).build());
                    } catch (Exception ex) {
                        System.out.println(ex);
                    }
                }
            }).keyBy(CarInfo::getCarId).map(new RichMapFunction<CarInfo, CarInfo>() {
    
                private ReducingState<Long> valueState;
    
                @Override
                public void open(Configuration parameters) {
                    ReducingStateDescriptor<Long> state = new ReducingStateDescriptor<Long>("state", new ReduceFunction<Long>() {
                        @Override
                        public Long reduce(Long t1, Long t2)  {
                            return t1+t2;
                        }
                    },
                    BasicTypeInfo.LONG_TYPE_INFO);
                    valueState = getRuntimeContext().getReducingState(state );
                }
                @Override
                public CarInfo map(CarInfo carInfo) throws Exception {
                    valueState.add(carInfo.getSpeed());
                    carInfo.setSpeed(valueState.get());
                    return carInfo;
                }
            }).print(">>>>>>>>");
    
            environment.execute("execute");
        }
    }

    ### console 192.168.88.180
    nc -lk 8888
    ### Input parameters
    1,100
    2,101
    3,103
    1,101
    2,201
    3,303
    1,100
    2,101
    3,103
    1,101
    2,201
    3,303
    #### sout
    >>>>>>>>:2> CarInfo(carId=1, speed=100)
    >>>>>>>>:1> CarInfo(carId=2, speed=201)
    >>>>>>>>:2> CarInfo(carId=3, speed=103)
    >>>>>>>>:2> CarInfo(carId=1, speed=201)
    >>>>>>>>:1> CarInfo(carId=2, speed=302)
    >>>>>>>>:2> CarInfo(carId=3, speed=406)
    >>>>>>>>:1> CarInfo(carId=2, speed=503)
    >>>>>>>>:2> CarInfo(carId=3, speed=509)
    >>>>>>>>:2> CarInfo(carId=1, speed=301)
    >>>>>>>>:2> CarInfo(carId=3, speed=812)
    >>>>>>>>:2> CarInfo(carId=1, speed=402)
    >>>>>>>>:1> CarInfo(carId=2, speed=604)
    ## Aggregation functionality similar to reduce functionality
  • ListState

    The state for the key is a list of values. Elements are appended with add(), and get() returns an Iterable over the stored values

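    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    // CarInfo is a POJO with carId and speed fields and a builder; its definition is not shown here.
    public class ListStateDemo {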
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> stream = environment
                    .socketTextStream("192.168.150.110", 8888);
            stream.flatMap(new RichFlatMapFunction<String, CarInfo>() {
    
                @Override
                public void flatMap(String s, Collector<CarInfo> collector) throws Exception {
                    try {
                        String[] sArr = s.split(",");
                        collector.collect(CarInfo.builder().carId(sArr[0]).speed(Long.parseLong(sArr[1])).build());
                    } catch (Exception ex) {
                        System.out.println(ex);
                    }
                }
            }).keyBy(CarInfo::getCarId).map(new RichMapFunction<CarInfo, String>() {
    
                private ListState<Long> listState;
    
                @Override
                public void open(Configuration parameters) {
                    ListStateDescriptor<Long> state = new ListStateDescriptor<Long>("state",
                            BasicTypeInfo.LONG_TYPE_INFO);
                    listState = getRuntimeContext().getListState(state);
                }
                @Override
                public String map(CarInfo carInfo) throws Exception {
                    listState.add(carInfo.getSpeed());
                    return String.format("Car ID :%s, speed history %s",carInfo.getCarId(),listState.get().toString());
                }
            }).print(">>>>>>>>");
    
            environment.execute("execute");
        }
    }

    ### console 192.168.88.180
    nc -lk 8888
    ### Input parameters
    1,100
    2,201
    3,301
    1,102
    2,201
    3,303
    1,101
    2,201
    3,301
    1,102
    2,201
    3,302
    #### sout
    >>>>>>>>:2> Car ID :1, speed history [100]
    >>>>>>>>:2> Car ID :3, speed history [301]
    >>>>>>>>:2> Car ID :1, speed history [100, 101]
    >>>>>>>>:1> Car ID :2, speed history [201]
    >>>>>>>>:1> Car ID :2, speed history [201, 201]
    >>>>>>>>:2> Car ID :3, speed history [301, 303]
    >>>>>>>>:2> Car ID :1, speed history [100, 101, 102]
    >>>>>>>>:1> Car ID :2, speed history [201, 201, 201]
    >>>>>>>>:2> Car ID :3, speed history [301, 303, 301]
    >>>>>>>>:1> Car ID :2, speed history [201, 201, 201, 201]
    >>>>>>>>:2> Car ID :1, speed history [100, 101, 102, 102]
    >>>>>>>>:2> Car ID :3, speed history [301, 303, 301, 302]
    ## You can see that the speed of each section is recorded
  • AggregatingState

    Keeps a single value that represents the aggregation of all values added to the state. Unlike ReducingState, the aggregate type may differ from the type of the elements added. Elements added with add(IN) are aggregated by the user-specified AggregateFunction

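    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.AggregatingState;
    import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    // CarInfo is a POJO with carId and speed fields and a builder; its definition is not shown here.
    public class AggregatingStateDemo {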
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> stream = environment
                    .socketTextStream("192.168.150.110", 8888);
            stream.flatMap(new RichFlatMapFunction<String, CarInfo>() {
    
                @Override
                public void flatMap(String s, Collector<CarInfo> collector) throws Exception {
                    try {
                        String[] sArr = s.split(",");
                        collector.collect(CarInfo.builder().carId(sArr[0]).speed(Long.parseLong(sArr[1])).build());
                    } catch (Exception ex) {
                        System.out.println(ex);
                    }
                }
            }).keyBy(CarInfo::getCarId).map(new RichMapFunction<CarInfo, CarInfo>() {
    
                private AggregatingState<Long,Long> valueState;
    
                @Override
                public void open(Configuration parameters) {
                    AggregatingStateDescriptor<Long,Long,Long> state = new AggregatingStateDescriptor<Long,Long,Long>("state", new AggregateFunction<Long, Long, Long>() {
                        @Override
                        // Initializes the accumulator value
                        public Long createAccumulator() {
                            return 0L;
                        }
                        @Override
                        // Add values to the accumulator
                        public Long add(Long v, Long acc) {
                            return v+acc;
                        }
                        @Override
                        // Return the final result
                        public Long getResult(Long result) {
                            return result;
                        }
                        @Override
                        // Merge two accumulator values
                        public Long merge(Long acc1, Long acc2) {
                            return acc1+acc2;
                        }
                    },BasicTypeInfo.LONG_TYPE_INFO);
                    valueState = getRuntimeContext().getAggregatingState(state);
                }
                @Override
                public CarInfo map(CarInfo carInfo) throws Exception {
                    valueState.add(carInfo.getSpeed());
                    carInfo.setSpeed(valueState.get());
                    return carInfo;
                }
            }).print(">>>>>>>>");
    
            environment.execute("execute");
        }
    }

    ### console 192.168.88.180
    nc -lk 8888
    ### Input parameters
    1,100
    2,201
    3,301
    1,102
    2,201
    3,303
    1,101
    2,201
    3,301
    1,102
    2,201
    3,302
    #### sout
    >>>>>>>>:2> CarInfo(carId=1, speed=100)
    >>>>>>>>:2> CarInfo(carId=1, speed=202)
    >>>>>>>>:1> CarInfo(carId=2, speed=201)
    >>>>>>>>:2> CarInfo(carId=3, speed=302)
    >>>>>>>>:1> CarInfo(carId=2, speed=402)
    >>>>>>>>:2> CarInfo(carId=3, speed=603)
    >>>>>>>>:1> CarInfo(carId=2, speed=603)
    >>>>>>>>:1> CarInfo(carId=2, speed=804)
    >>>>>>>>:2> CarInfo(carId=3, speed=906)
    >>>>>>>>:2> CarInfo(carId=1, speed=304)
    >>>>>>>>:2> CarInfo(carId=3, speed=1207)
    >>>>>>>>:2> CarInfo(carId=1, speed=405)
    ## id 1 -> 100+102+101+102=405  id 2 -> 201+201+201+201=804  id 3 -> 301+303+301+302=1207
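The demo above uses Long for the input, accumulator and output types, so it does not show what sets AggregatingState apart from ReducingState. As a rough sketch of the case where the three types differ, the plain-Java model below computes an average speed. It is not the Flink API: the nested `Aggregate` interface is a hypothetical stand-in that only mirrors the four method names used in the demo, and `AverageSpeed` is an illustrative name.

```java
// Plain-Java model of the AggregateFunction contract; NOT the Flink API.
// Aggregate and AverageSpeed are hypothetical names used for illustration.
class AverageSpeedSketch {

    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** IN = Long (a speed), ACC = long[]{sum, count}, OUT = Double (the average). */
    static class AverageSpeed implements Aggregate<Long, long[], Double> {
        @Override
        public long[] createAccumulator() {
            return new long[] {0L, 0L};
        }

        @Override
        public long[] add(Long value, long[] acc) {
            // Returns a new accumulator with the value added and the count bumped.
            return new long[] {acc[0] + value, acc[1] + 1};
        }

        @Override
        public Double getResult(long[] acc) {
            // An empty accumulator yields 0.0 instead of dividing by zero.
            return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
        }

        @Override
        public long[] merge(long[] a, long[] b) {
            return new long[] {a[0] + b[0], a[1] + b[1]};
        }
    }

    public static void main(String[] args) {
        AverageSpeed avg = new AverageSpeed();
        long[] acc = avg.createAccumulator();
        // The four speeds entered for car 1 in the demo input.
        for (long speed : new long[] {100L, 102L, 101L, 102L}) {
            acc = avg.add(speed, acc);
        }
        System.out.println(avg.getResult(acc));
    }
}
```

A ReduceFunction could not express this, because it must return the same type it receives. The accumulator here carries a sum and a count, and only the final result is a Double.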
    