Background

During project development I ran into the following scenario: the data is grouped (keyed) according to business logic, and all subsequent calculations are performed per group. When the first record of a group arrives, a timer is registered to fire at the end of the day that record belongs to. Each subsequent timer fires 86400 seconds (one day) after the previous one.
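The timer schedule described above can be sketched in plain Java. This is only an illustration, not code from the project: the class name EndOfDayTimer, the helper endOfDayMillis, and the choice of UTC as the day boundary are all assumptions, and the sample timestamp is the one used in the test data later in this article.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical helper, not part of the original project.
class EndOfDayTimer {

    static final long ONE_DAY_MILLIS = 86400 * 1000L;

    // Returns the epoch-millisecond timestamp of the first instant of the next day
    // in the given zone, i.e. the moment the day containing epochMillis ends.
    static long endOfDayMillis(long epochMillis, ZoneId zone) {
        LocalDate day = Instant.ofEpochMilli(epochMillis).atZone(zone).toLocalDate();
        return day.plusDays(1).atStartOfDay(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long firstRecord = 1603788065000L;                              // sample record timestamp
        long firstTimer = endOfDayMillis(firstRecord, ZoneOffset.UTC);  // assumption: days are cut in UTC
        long secondTimer = firstTimer + ONE_DAY_MILLIS;                 // next firing, one day later
        System.out.println("first timer:  " + firstTimer);
        System.out.println("second timer: " + secondTimer);
    }
}
```

If the business day follows a zone with daylight saving time, a day is not always 86400 seconds long, so recomputing the boundary with endOfDayMillis may be preferable to adding a fixed interval.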

Data structure and test data

/**
 * Original data structure.
 *
 * @author DSH12138
 * @since 2020-11-02
 */

@Data
@NoArgsConstructor
@ToString
public class SourceDataScheme {

    private String plant;
    private Float values;
    private String device;
    private Long timestamp;

    public SourceDataScheme(String plant, Float values, String device, Long timestamp) {
        this.plant = plant;
        this.values = values;
        this.device = device;
        this.timestamp = timestamp;
    }
}

Test data:

SourceDataScheme(plant=MS003, values=1, device=DXL480, timestamp=1603788065000)
SourceDataScheme(plant=MS001, values=2, device=DGS301, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=3, device=DXL479, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=4, device=DXL478, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=5, device=DXL477, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=6, device=DXL476, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=7, device=DXL475, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=8, device=DXL474, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=9, device=DXL473, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=10, device=DXL472, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=11, device=DXL471, timestamp=1603788065000)
================================================================================
SourceDataScheme(plant=MS003, values=12, device=DXL480, timestamp=1603788066000)
SourceDataScheme(plant=MS001, values=13, device=DGS301, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=14, device=DXL479, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=15, device=DXL478, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=16, device=DXL477, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=17, device=DXL476, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=18, device=DXL475, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=19, device=DXL474, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=20, device=DXL473, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=21, device=DXL472, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=22, device=DXL471, timestamp=1603788066000)

Wrong way – use Boolean directly

The idea is to flag whether the incoming record is the first one for its group. Add a field to the KeyedProcessFunction with an initial value of true, and set it to false once the timer has been registered.

In the code below, the field Boolean firstValue = true is declared in the KeyedProcessFunction, and processElement handles it like this:

if (firstValue) {
    context.timerService().registerEventTimeTimer(sourceDataScheme.getTimestamp() + 10);
    System.out.println("Did the data" + sourceDataScheme + "Timing");
    firstValue = false;
}

See the complete code below:

import com.leayun.merak.po.source.SourceDataScheme;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;

/**
 * Tests the difference between ValueState<Boolean> and a plain Boolean in a KeyedProcessFunction.
 *
 * @author DSH12138
 */
public class KeyedHandle {

    public DataStream<SourceDataScheme> keyedProcess(DataStream<SourceDataScheme> sourceDataSchemeDataStream) {

        SingleOutputStreamOperator<SourceDataScheme> resultDataStream = sourceDataSchemeDataStream
        // Set up real-time watermark
        .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<SourceDataScheme>() {
            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(SourceDataScheme lastElement, long extractedTimestamp) {
                long newTimestamp = lastElement.getTimestamp();
                if (extractedTimestamp > newTimestamp) {
                    newTimestamp = extractedTimestamp;
                }
                return new Watermark(newTimestamp);
            }

            @Override
            public long extractTimestamp(SourceDataScheme element, long recordTimestamp) {
                long newTimestamp = element.getTimestamp();
                if (recordTimestamp > newTimestamp) {
                    newTimestamp = recordTimestamp;
                }
                return newTimestamp;
            }
        })
        .keyBy(new KeySelector<SourceDataScheme, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> getKey(SourceDataScheme sourceDataScheme) throws Exception {
                return new Tuple2<>(sourceDataScheme.getPlant(), sourceDataScheme.getDevice());
            }
        })
        .process(new KeyedProcessFunction<Tuple2<String, String>, SourceDataScheme, SourceDataScheme>() {
            // Plain field: one copy per parallel subtask, shared by every key routed to it
            Boolean firstValue = true;

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<SourceDataScheme> out) throws Exception {
                System.out.println("Timed trigger, current key =" + ctx.getCurrentKey().f0 + "," + ctx.getCurrentKey().f1);
                // Schedule the next timer one day (86400 s) later
                ctx.timerService().registerEventTimeTimer(timestamp + 86400 * 1000L);
            }

            @Override
            public void processElement(SourceDataScheme sourceDataScheme, Context context, Collector<SourceDataScheme> collector) throws Exception {
                if (firstValue) {
                    context.timerService().registerEventTimeTimer(sourceDataScheme.getTimestamp() + 10);
                    System.out.println("Did the data" + sourceDataScheme + "Timing");
                    firstValue = false;
                }
            }
        }).setParallelism(2);

        return resultDataStream;
    }
}

But when you test with the test data, the console output is as follows:

Did the dataSourceDataScheme(plant=MS003, values=1, device=DXL480, timestamp=1603788065000)Timing
Did the dataSourceDataScheme(plant=MS001, values=2, device=DGS301, timestamp=1603788065000)Timing
// Output after the rest of the data is entered
Timed trigger, current key =MS001,DGS301
Timed trigger, current key =MS003,DXL480

Only two groups got a timer. The parallelism is 2, so the keyed stream is processed by two subtasks, and the first two records happened to land on different subtasks. Boolean firstValue = true is an ordinary field of the function instance, so each subtask holds a single copy that is shared by every key routed to it. Once the first record on a subtask sets firstValue to false, all the other groups assigned to that subtask skip timer registration.
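The effect can be reproduced without Flink. The sketch below is a plain-Java simulation under stated assumptions: the class SharedFlagSimulation, its Subtask type, and the hash-based routing are hypothetical stand-ins (Flink's real key-to-subtask assignment works differently), and adding a key to a list stands in for registering a timer.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of the bug; no Flink classes are used.
class SharedFlagSimulation {

    // Stands in for one parallel instance of the process function.
    static class Subtask {
        boolean firstValue = true;                      // one flag for ALL keys on this subtask
        final List<String> timers = new ArrayList<>();

        void processElement(String key) {
            if (firstValue) {
                timers.add(key);                        // stands in for registerEventTimeTimer(...)
                firstValue = false;
            }
        }
    }

    // Routes each key to a subtask and returns the keys that got a timer.
    static List<String> run(List<String> keys, int parallelism) {
        Subtask[] subtasks = new Subtask[parallelism];
        for (int i = 0; i < parallelism; i++) {
            subtasks[i] = new Subtask();
        }
        for (String key : keys) {
            // Assumption: simple hash routing, used here only for illustration.
            subtasks[Math.floorMod(key.hashCode(), parallelism)].processElement(key);
        }
        List<String> registered = new ArrayList<>();
        for (Subtask subtask : subtasks) {
            registered.addAll(subtask.timers);
        }
        return registered;
    }

    public static void main(String[] args) {
        List<String> keys = List.of(
            "MS003,DXL480", "MS001,DGS301", "MS003,DXL479", "MS003,DXL478",
            "MS003,DXL477", "MS003,DXL476", "MS003,DXL475", "MS003,DXL474",
            "MS003,DXL473", "MS003,DXL472", "MS003,DXL471");
        List<String> registered = run(keys, 2);
        System.out.println(keys.size() + " keys, timers registered for: " + registered);
    }
}
```

However the keys are distributed, the number of registered timers can never exceed the number of subtasks, which is why 11 groups end up with at most 2 timers.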

Right way – use ValueState

Replace the field with a ValueState<Boolean>, which Flink scopes to the key currently being processed.

import com.leayun.merak.po.source.SourceDataScheme;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;

/**
 * Tests the difference between ValueState<Boolean> and a plain Boolean in a KeyedProcessFunction.
 *
 * @author DSH12138
 */
public class KeyedHandle {

    public DataStream<SourceDataScheme> keyedProcess(DataStream<SourceDataScheme> sourceDataSchemeDataStream) {

        SingleOutputStreamOperator<SourceDataScheme> resultDataStream = sourceDataSchemeDataStream
        // Set up real-time watermark
        .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<SourceDataScheme>() {
            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(SourceDataScheme lastElement, long extractedTimestamp) { 
                long newTimestamp = lastElement.getTimestamp();
                if (extractedTimestamp > newTimestamp) {
                    newTimestamp = extractedTimestamp;
                }
                return new Watermark(newTimestamp);
            }

            @Override
            public long extractTimestamp(SourceDataScheme element, long recordTimestamp) {
                long newTimestamp = element.getTimestamp();
                if (recordTimestamp > newTimestamp) {
                    newTimestamp = recordTimestamp;
                }
                return newTimestamp;
            }
        })
        .keyBy(new KeySelector<SourceDataScheme, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> getKey(SourceDataScheme sourceDataScheme) throws Exception {
                return new Tuple2<>(sourceDataScheme.getPlant(), sourceDataScheme.getDevice());
            }
        })
        .process(new KeyedProcessFunction<Tuple2<String, String>, SourceDataScheme, SourceDataScheme>() {
            // Keyed state: Flink keeps a separate value for each (plant, device) key
            ValueState<Boolean> firstState;

            @Override
            public void open(Configuration parameters) throws Exception {
                firstState = getRuntimeContext().getState(new ValueStateDescriptor<>("first-value", Boolean.class));
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<SourceDataScheme> out) throws Exception {
                System.out.println("Timed trigger, current key =" + ctx.getCurrentKey().f0 + "," + ctx.getCurrentKey().f1);
                ctx.timerService().registerEventTimeTimer(timestamp+86400*1000L);
            }

            @Override
            public void processElement(SourceDataScheme sourceDataScheme, Context context, Collector<SourceDataScheme> collector) throws Exception {
                // value() is null until this key's state has been updated for the first time
                if (firstState.value() == null) {
                    context.timerService().registerEventTimeTimer(sourceDataScheme.getTimestamp() + 10);
                    firstState.update(false);
                }
            }
        }).setParallelism(2);

        return resultDataStream;
    }
}

Conclusion

An ordinary field in a KeyedProcessFunction is tied to the operator's parallelism: each subtask keeps exactly one copy. Because several groups are processed by the same subtask, a change made while handling one group is visible to all the others, so later groups never perform the calculation intended for them.

ValueState in a KeyedProcessFunction is scoped to the current key: each group maintains its own state, and that state is also saved in Flink checkpoints.
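As a rough mental model of keyed state, the sketch below keeps one flag per key in a map and copies that map to imitate a checkpoint. It is a toy under stated assumptions, not how Flink implements its state backends: the class KeyedStateSimulation and its snapshot and restore methods are hypothetical, and only the flags are copied, whereas Flink also checkpoints the timers themselves.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of per-key state; no Flink classes are used.
class KeyedStateSimulation {

    // Stands in for the keyed state backend: one entry per key.
    private final Map<String, Boolean> firstState = new HashMap<>();
    final List<String> timers = new ArrayList<>();

    void processElement(String key) {
        if (firstState.get(key) == null) {      // mirrors firstState.value() == null
            timers.add(key);                    // stands in for registerEventTimeTimer(...)
            firstState.put(key, false);         // mirrors firstState.update(false)
        }
    }

    // Toy "checkpoint": a copy of the per-key flags that could be persisted.
    Map<String, Boolean> snapshot() {
        return new HashMap<>(firstState);
    }

    // Toy "restore": a fresh instance that resumes from a snapshot.
    static KeyedStateSimulation restore(Map<String, Boolean> snapshot) {
        KeyedStateSimulation restored = new KeyedStateSimulation();
        restored.firstState.putAll(snapshot);
        return restored;
    }

    public static void main(String[] args) {
        KeyedStateSimulation task = new KeyedStateSimulation();
        String[] batch = {"MS003,DXL480", "MS001,DGS301", "MS003,DXL479"};
        for (String key : batch) task.processElement(key);   // first batch: one timer per key
        for (String key : batch) task.processElement(key);   // second batch: no new timers
        System.out.println("timers after two batches: " + task.timers);

        KeyedStateSimulation restored = restore(task.snapshot());
        restored.processElement("MS003,DXL480");             // key already seen: no timer
        restored.processElement("MS003,DXL478");             // new key: timer registered
        System.out.println("timers after restore: " + restored.timers);
    }
}
```

Every key gets exactly one timer regardless of how many keys share an instance, and a restored instance still remembers which keys it has already seen.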