Please follow my personal blog to learn more

transform

Function: Convert the Soure data to the required data

Commonly used functions

map

The map operator is similar to python’s map operator, which translates data into data in a lambda expression. The Map operator in Flink is more general, specifying the transformation process with a new map() method. The format for converting one data type (input) to another data type (output) is as follows

dataStream.map(new Mapfunction<input,output>(){
	@Override
	map(input){xxx};
})
Copy the code

You can see it better by drawing it

The rectangle becomes an ellipse, but the color doesn’t change (the logic doesn’t change).

flatMap

FlatMap Flattening operator: Flatmap outputs multiple output types with an example string “hello,word” in the form of Tuple2(” hello “,1) and Tuple2(” word “,1)

inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = input.split(",");
                for (String word : words) {
                    collector.collect(new Tuple2<>(word, 1)); }}});Copy the code

filter

The filter operator filters the input data of the input type. True is left and false is filtered out as shown below

Keyby (grouping)

DataStream → KeyedStream: Logically splits a stream into disjointed partitions, each containing elements with the same key, implemented internally as a hash.

Use dataStream. Keyby (param)

Param: Data field subscripts start from 0 by default. You can also enter fields with IDS

Rolling Aggregation

  • The sum, sum
  • Max: Select the maximum value of each stream
  • Min: select the minimum value of each stream
  • Minby: Selects the minimum value for a field data in keyedStream
  • Maxby: Selects the maximum value for a field in keyedStream

Reduce (complex aggregation)

KeyedStream → DataStream: Merges the current element with the result of the last aggregation, producing a new value, and returns a stream containing the result of each aggregation, rather than just the final result of the last aggregation. Example: Compare the temperature of the last timestamp with the data from the sensor ID, and select the timestamp of the maximum temperature

import com.chengyuyang.apitest.SensorReading;
import com.chengyuyang.apitest.SourceFromCustom;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Transform_keyed_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The data source is the custom generated sensor data
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        // Select the timestamp of the maximum temperature by comparing the temperature of the last timestamp with the data from the sensor ID
        SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
                .reduce(new CustomReduceFunction());
        resultDataStream.print();
        env.execute();
    }

    public static class CustomReduceFunction implements ReduceFunction<SensorReading> {
        @Override
        public SensorReading reduce(SensorReading sensorReading, SensorReading input) throws Exception {
            String id = sensorReading.getId();
            Long timestamp = input.getTimestamp();
            // Select the maximum temperature according to the timestamp
            double temperature = Math.max(sensorReading.getTemperature(), input.getTemperature());
            return newSensorReading(id, timestamp, temperature); }}}Copy the code

Split and select

split

DataStream → SplitStream: Split a DataStream into two or more DataStream based on certain characteristics

select

SplitStream→DataStream: Obtain one or more DataStream from a SplitStream

Here is the following

case

According to the temperature of the sensor, with 60 degrees as the standard, greater than or equal to 60 degrees for high flow, other as low flow

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.chengyuyang.apitest.SensorReading;
import com.chengyuyang.apitest.SourceFromCustom;

public class Transform_Split_Select {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDatStream = env.addSource(new SourceFromCustom.CustomSource());
        // Split by temperature 60 standard split by temperature 60 standard
        SplitStream<SensorReading> splitStream = inputDatStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                Double temperature = sensorReading.getTemperature();
                if (temperature >= 60) {
                    return Lists.newArrayList("high");
                } else {
                    return Lists.newArrayList("low"); }}});//SplitStream→DataStream; //SplitStream→DataStream
        DataStream<SensorReading> high = splitStream.select("high");
        DataStream<SensorReading> low = splitStream.select("low");
        DataStream<SensorReading> all = splitStream.select("high"."low");

        high.print("high").setParallelism(1);
        low.print("low").setParallelism(1);
        all.print("all").setParallelism(1); env.execute(); }}Copy the code

The results are as follows

Connect and comap will

Connect (One Country, Two Systems)

DataStream,DataStream → ConnectedStreams: Connect two streams as ConnectedStreams, but keep their various data types unchanged. The two streams are independent of each other, and the input data types can be the same or different

Here is the following

comap coflatmap

ConnectedStreams → DataStream function is the same as map and flatMap, but because the two connect streams have different data types, they are processed by map and flatMap, and the final result can be different

case

Output normal temperature data for high temperature plus warning label according to split and SELECT cases

Part of the code

ConnectedStreams<Tuple2<String, Double>, SensorReading> connectDataStream = highDataStream.connect(lowDataStream);
        SingleOutputStreamOperator<Object> resultDataStream = connectDataStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> input) throws Exception {
                // Processing high temperature data
                return new Tuple3<>(input.f0, input.f1, "warnning");
            }

            @Override
            public Object map2(SensorReading input) throws Exception {
                // Process normal temperature data
                return newTuple3<>(input.getId(), input.getTimestamp(), input.getTemperature()); }});Copy the code

union

DataStream, DataStreamTwo or more DataStream are union operated to produce a new DataStream containing all DataStream elements.Note: If you union a DataStream with itself, you will see each element appear twice in the new DataStreamHere is the following

Connect is different from Union

  • The two flows before Union must be of the same type; Connect can be different
  • Connect can be adjusted to be the same or different in later coMap
  • Connect can operate only two streams, and Union can operate more than one

case

Union is performed according to split and select cases, and normal temperature data is output for high temperature with warning label

Part of the code

DataStream<SensorReading> unionDataStream = high.union(low);
        SingleOutputStreamOperator<Tuple3<String, Long, Object>> resultDataStream = unionDataStream.map(new MapFunction<SensorReading, Tuple3<String, Long, Object>>() {
            @Override
            public Tuple3<String, Long, Object> map(SensorReading input) throws Exception {
                if (input.getTemperature() >= 60) {
                    return new Tuple3<String, Long, Object>(input.getId(), input.getTimestamp(), "warnning");
                } else {
                    return newTuple3<String, Long, Object>(input.getId(), input.getTimestamp(), input.getTemperature()); }}});Copy the code

This article is reproduced on my personal blog. The Transform operator of Flink stream processing API is copyrighting CC 4.0 BY SA

Welcome to exchange and study

Personal blog

CSDN home page