1. Transformation Categories

Flink's Transformations convert one or more DataStreams into new DataStreams as needed. They fall into three categories:

  • DataStream Transformations: operations that transform the data in a stream;
  • Physical Partitioning: a low-level API that lets users define how data is partitioned;
  • Task chaining and resource groups: fine-grained control over task chains and resource groups.

The main APIs in each category are described below.

2. DataStream Transformations

2.1 Map [DataStream → DataStream]

Applies a conversion to every element of a DataStream:

// env is the job's StreamExecutionEnvironment
DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5);
integerDataStream.map((MapFunction<Integer, Integer>) value -> value * 2).print();
// Output: 2, 4, 6, 8, 10

2.2 FlatMap [DataStream → DataStream]

FlatMap is similar to Map, but each input element can be mapped to zero, one, or more output elements, as shown in the following example:

String string01 = "one one one two two";
String string02 = "third third third four";
DataStream<String> stringDataStream = env.fromElements(string01, string02);
stringDataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String s : value.split(" ")) {
            out.collect(s);
        }
    }
}).print();
// Each word is printed on its own line; the line breaks are omitted here to save space, as in later examples
// one one one two two third third third four

2.3 Filter [DataStream → DataStream]

Keeps only the elements that satisfy a condition:

env.fromElements(1, 2, 3, 4, 5).filter(x -> x > 3).print();
// Output: 4, 5

2.4 KeyBy and Reduce

  • KeyBy [DataStream → KeyedStream]: assigns records with the same key to the same partition.
  • Reduce [KeyedStream → DataStream]: combines each incoming element with the last reduced value for its key and emits the result.

The following example partitions the data by key and computes a rolling sum per key:

DataStream<Tuple2<String, Integer>> tuple2DataStream = env.fromElements(new Tuple2<>("a", 1),
                                                                        new Tuple2<>("a", 2),
                                                                        new Tuple2<>("b", 3),
                                                                        new Tuple2<>("b", 5));
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = tuple2DataStream.keyBy(0);
keyedStream.reduce((ReduceFunction<Tuple2<String, Integer>>) (value1, value2) ->
                   new Tuple2<>(value1.f0, value1.f1 + value2.f1)).print();

// Rolling sum per key, output:
// (a,1)
// (a,3)
// (b,3)
// (b,8)
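
To make the rolling behavior concrete, here is a minimal sketch in plain Java, with no Flink dependency, that models a keyed rolling reduce as a map from each key to its last reduced value. The class and method names (RollingReduceSketch, rollingSum) are hypothetical and exist only for this illustration; in a real job Flink keeps this per-key state itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyBy + reduce; not Flink code, all names are hypothetical.
final class RollingReduceSketch {

    // Emits one "(key,runningSum)" string per input record.
    static List<String> rollingSum(String[] keys, int[] values) {
        Map<String, Integer> lastReduced = new HashMap<>(); // last reduced value per key
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            // The first record of a key is stored as-is; later ones are combined with the stored value.
            int reduced = lastReduced.merge(keys[i], values[i], Integer::sum);
            out.add("(" + keys[i] + "," + reduced + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        // Same input as the example above.
        System.out.println(rollingSum(new String[]{"a", "a", "b", "b"}, new int[]{1, 2, 3, 5}));
        // prints [(a,1), (a,3), (b,3), (b,8)]
    }
}
```

Every input record produces one output record, which is why the example above prints four lines rather than one total per key.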

The KeyBy operation has the following two limitations:

  • When KeyBy is applied to a user-defined POJO type, that type must override the hashCode method;
  • KeyBy cannot be applied to array types.

2.5 Aggregations [KeyedStream → DataStream]

Aggregations are built-in operators that wrap common aggregation logic. The rolling sum computed with Reduce above can be rewritten with the sum operator:

tuple2DataStream.keyBy(0).sum(1).print();

In addition to sum, Flink provides the common aggregation operators min, max, minBy, and maxBy:

// Rolling minimum of the given field, which can be specified by index or by field name
keyedStream.min(0);
keyedStream.min("key");
// Rolling maximum of the given field
keyedStream.max(0);
keyedStream.max("key");
// Rolling minimum of the given field, returning the element that holds it
keyedStream.minBy(0);
keyedStream.minBy("key");
// Rolling maximum of the given field, returning the element that holds it
keyedStream.maxBy(0);
keyedStream.maxBy("key");
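
The difference between min and minBy is easy to miss, so the following plain-Java sketch (no Flink dependency) models both on records that share one key. The class, the method names, and the sample data are hypothetical. The sketch also assumes that min keeps the non-aggregated field of the first record; in practice, treat the other fields of a min or max result as unspecified and use minBy or maxBy when the whole element matters.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of min versus minBy; not Flink code, all names are hypothetical.
final class MinVersusMinBySketch {

    // Models min(score): only the score field is tracked as the minimum.
    // Assumption of this sketch: the name stays that of the first record.
    static List<String> rollingMin(String[] names, int[] scores) {
        List<String> out = new ArrayList<>();
        String name = names.length > 0 ? names[0] : null;
        int min = Integer.MAX_VALUE;
        for (int i = 0; i < names.length; i++) {
            min = Math.min(min, scores[i]);
            out.add("(" + name + "," + min + ")");
        }
        return out;
    }

    // Models minBy(score): emits the whole record that holds the minimum so far.
    static List<String> rollingMinBy(String[] names, int[] scores) {
        List<String> out = new ArrayList<>();
        int best = 0; // index of the record with the smallest score seen so far
        for (int i = 0; i < names.length; i++) {
            if (scores[i] < scores[best]) {
                best = i;
            }
            out.add("(" + names[best] + "," + scores[best] + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        String[] names = {"x", "y", "z"};
        int[] scores = {5, 3, 4};
        System.out.println(rollingMin(names, scores));   // prints [(x,5), (x,3), (x,3)]
        System.out.println(rollingMinBy(names, scores)); // prints [(x,5), (y,3), (y,3)]
    }
}
```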

2.6 Union [DataStream* → DataStream]

Union merges two or more DataStreams that have the same element type. A DataStream can also be unioned with itself, in which case each of its elements appears twice in the resulting stream:

DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 1),
                                                                            new Tuple2<>("a", 2));
DataStreamSource<Tuple2<String, Integer>> streamSource02 = env.fromElements(new Tuple2<>("b", 1),
                                                                            new Tuple2<>("b", 2));
streamSource01.union(streamSource02);
streamSource01.union(streamSource01, streamSource02);

2.7 Connect [DataStream,DataStream → ConnectedStreams]

The Connect operation connects two DataStreams, which may have different element types. The return type is ConnectedStreams, and the two connected streams can share state with each other. Because the element types may differ, ConnectedStreams must be converted back to a DataStream with a CoMap or CoFlatMap operation before further computation:

DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 3),
                                                                            new Tuple2<>("b", 5));
DataStreamSource<Integer> streamSource02 = env.fromElements(2, 3, 9);
// Use connect to connect
ConnectedStreams<Tuple2<String, Integer>, Integer> connect = streamSource01.connect(streamSource02);
connect.map(new CoMapFunction<Tuple2<String, Integer>, Integer, Integer>() {
    @Override
    public Integer map1(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }

    @Override
    public Integer map2(Integer value) throws Exception {
        return value;
    }
}).map(x -> x * 100).print();

// Output (the order may vary):
// 300 500 200 900 300

2.8 Split and Select

  • Split [DataStream → SplitStream]: splits a DataStream into multiple streams according to the given rules. This is a logical split only: Split tags each element with one or more names, but still returns a single SplitStream.
  • Select [SplitStream → DataStream]: retrieves the elements carrying a given tag from the logically split SplitStream.

Both operators were deprecated and then removed from the DataStream API in Flink 1.12, where side outputs take their place; the following example applies to versions that still provide them:

DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
// Tag each element as "even" or "odd"
SplitStream<Integer> split = streamSource.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        output.add(value % 2 == 0 ? "even" : "odd");
        return output;
    }
});
// Select the even elements
split.select("even").print();
// Output: 2, 4, 6, 8

2.9 Project [DataStream → DataStream]

Project extracts a specified subset of fields from the tuples in a stream, as shown in the following example:

DataStreamSource<Tuple3<String, Integer, String>> streamSource = env.fromElements(
                                                                         new Tuple3<>("li", 22, "2018-09-23"),
                                                                         new Tuple3<>("ming", 33, "2020-09-23"));
streamSource.project(0, 2).print();

// Output:
// (li,2018-09-23)
// (ming,2020-09-23)

3. Physical Partitioning

Physical partitioning is a low-level API provided by Flink. It lets users repartition data with built-in or custom partitioning rules, for example to keep data from becoming heavily skewed toward a few partitions. The commonly used partitioning rules are as follows:

3.1 Random partitioning [DataStream → DataStream]

Random partitioning distributes data randomly across all downstream partitions. It is applied with the shuffle method:

dataStream.shuffle();

3.2 Rebalancing [DataStream → DataStream]

Rebalancing distributes data across partitions in round-robin fashion, which is useful when the data is skewed and needs to be evened out:

dataStream.rebalance();
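
As a rough illustration of round-robin distribution, the following plain-Java sketch (no Flink dependency) assigns records to partitions in turn and counts how many each partition receives. The class and method names are hypothetical, and starting at partition 0 is a simplification of this sketch, not a statement about which partition Flink picks first.

```java
// Plain-Java model of round-robin partitioning; not Flink code, all names are hypothetical.
final class RoundRobinSketch {

    // Returns, for each record index, the partition that record is sent to.
    static int[] assign(int records, int partitions) {
        int[] target = new int[records];
        for (int i = 0; i < records; i++) {
            target[i] = i % partitions; // next partition in turn, wrapping around
        }
        return target;
    }

    // Counts how many records each partition received.
    static int[] countPerPartition(int[] target, int partitions) {
        int[] counts = new int[partitions];
        for (int partition : target) {
            counts[partition]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] target = assign(7, 3);
        System.out.println(java.util.Arrays.toString(target));                       // prints [0, 1, 2, 0, 1, 2, 0]
        System.out.println(java.util.Arrays.toString(countPerPartition(target, 3))); // prints [3, 2, 2]
    }
}
```

The partition sizes never differ by more than one record, regardless of how the input was distributed beforehand.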

3.3 Rescaling [DataStream → DataStream]

Rebalancing performs global load balancing, so data may be sent over the network to other nodes to even out the partitions. Rescaling is a lighter-weight variant: each upstream operator distributes elements round-robin to only a subset of the downstream operators, which can keep data transfers local instead of crossing the network, depending on how the job is deployed. It is applied with the rescale method:

dataStream.rescale();

As the name suggests, rescale redistributes data according to the ratio of upstream to downstream parallelism. If the upstream operation has a parallelism of 2 and the downstream operation has a parallelism of 6, one upstream instance distributes elements to three downstream instances and the other upstream instance distributes elements to the remaining three. Conversely, if the upstream operation has a parallelism of 6 and the downstream operation has a parallelism of 2, three upstream instances distribute elements to one downstream instance and the other three distribute elements to the other.
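
The two cases described above can be sketched in plain Java (no Flink dependency) as a function from an upstream instance to the downstream instances it feeds. The class and method names are hypothetical. The sketch assumes that one parallelism is a multiple of the other and that downstream instances are grouped contiguously; the text above only says that each upstream instance gets "three" or "one" downstream instance, not which ones.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the rescale fan-out and fan-in; not Flink code, all names are hypothetical.
final class RescaleSketch {

    // Returns the downstream instance indices that one upstream instance sends to.
    static List<Integer> targets(int upstreamIndex, int upstream, int downstream) {
        if (upstream % downstream != 0 && downstream % upstream != 0) {
            // Assumption of this sketch: only the evenly divisible cases are modeled.
            throw new IllegalArgumentException("one parallelism must be a multiple of the other");
        }
        List<Integer> result = new ArrayList<>();
        if (downstream >= upstream) {
            int fanOut = downstream / upstream; // downstream instances per upstream instance
            for (int i = 0; i < fanOut; i++) {
                result.add(upstreamIndex * fanOut + i);
            }
        } else {
            int fanIn = upstream / downstream; // upstream instances per downstream instance
            result.add(upstreamIndex / fanIn);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(targets(0, 2, 6)); // prints [0, 1, 2]
        System.out.println(targets(1, 2, 6)); // prints [3, 4, 5]
        System.out.println(targets(4, 6, 2)); // prints [1]
    }
}
```

Unlike rebalance, no upstream instance here talks to every downstream instance, which is what keeps the exchange cheaper.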

3.4 Broadcasting [DataStream → DataStream]

Broadcasting sends every element to all partitions. It is typically used to broadcast a small data set to every partition so that frequent cross-partition lookups are avoided. It is applied with the broadcast method:

dataStream.broadcast();

3.5 Custom partitioning [DataStream → DataStream]

Flink also lets users apply their own partitioning rules. To do so, implement the Partitioner interface to define the rule and specify the partitioning key, as shown in the following example:

DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromElements(new Tuple2<>("Hadoop", 1),
                new Tuple2<>("Spark", 1),
                new Tuple2<>("Flink-streaming", 2),
                new Tuple2<>("Flink-batch", 4),
                new Tuple2<>("Storm", 4),
                new Tuple2<>("HBase", 3));
streamSource.partitionCustom(new Partitioner<String>() {
    @Override
    public int partition(String key, int numPartitions) {
        // Send tuples whose first field contains "flink" to partition 0 and all others to partition 1
        return key.toLowerCase().contains("flink") ? 0 : 1;
    }
}, 0).print();

// Output:
// 1> (Flink-streaming,2)
// 1> (Flink-batch,4)
// 2> (Hadoop,1)
// 2> (Spark,1)
// 2> (Storm,4)
// 2> (HBase,3)

4. Task Chaining and Resource Groups

Task chaining and resource groups are also low-level APIs provided by Flink, used to control task chains and resource allocation. By default, Flink chains operations together where possible (for example, two adjacent map operations) so that they run in the same thread, which improves performance. Flink also lets users control this behavior themselves through the task chaining and resource group API:

4.1 startNewChain

startNewChain starts a new task chain beginning with the current operation. In the example below, a new chain starts at the first map, so the two map operations share the new chain, while the filter operation is in a different chain:

someStream.filter(...).map(...).startNewChain().map(...);

4.2 disableChaining

disableChaining prevents the current operation from being chained with any other operation, as shown in the following example:

someStream.map(...).disableChaining();

4.3 slotSharingGroup

A slot is a fixed subset of the resources owned by a TaskManager, and each subtask of an operation needs a slot to run in. Because different operations need different amounts of resources, Flink allows subtasks of different operations to be deployed into the same slot. slotSharingGroup sets the slot sharing group of an operation, and Flink places operations with the same slot sharing group into the same slot. The following is an example:

someStream.filter(...).slotSharingGroup("slotSharingGroupName");

References

Flink Operators: ci.apache.org/projects/fl…