1. Flink environment construction

1.1 Flink Version List:

archive.apache.org/dist/flink/

1.2 Install the latest version 1.12.2

wget https://archive.apache.org/dist/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.12.tgz

1.3 Decompressing installation

tar -xzf flink-1.12.2-bin-scala_2.12.tgz
./bin/start-cluster.sh

Check whether the installation was successful: jps -l | grep flink

Web UI page address: http://192.168.9.226:8081/#/overview

2. The wordCount example

2.1 Directory structure of springboot project:

2.2 Adding maven dependencies:

<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId>  </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> < version > 1.10.0 < / version > < / dependency > < the dependency > < groupId > org. Apache. Flink < / groupId > < artifactId > flink - streaming - java_2. 11 < / artifactId > < version > 1.10.0 < / version > < / dependency > < the dependency > <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> < artifactId > maven -- the compiler plugin < / artifactId > < version > 3.6.0 < / version > < configuration > < source > 1.8 < / source > <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> < artifactId > maven - surefire plugin < / artifactId > < version > 2.19 < / version > < configuration > < the skip > true < / skip > </configuration> </plugin> </plugins> </build>Copy the code

2.3 Example 1: Batch wordCount

import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; // Batch wordCount public class WordCountBatch {public static void main(String[] args) throws Exception{// Create an execution environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); String filePath = "F:\\ttWork\\flink-demo\\ SRC \\main\ resources\ hell.txt "; DataSet<String> inputDateSet = env.readTextFile(filePath); // DataSet<Tuple2<String, Integer>> sum = inputDatesett.flatMap (new MyFlatMap()).groupby (0) // Sum (1); // DataSet<Tuple2<String, Integer>> sum = inputDateSet. Sum.print (); Public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>>{ @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> Collector) throws Exception {// Use the blank word String[] words = s.split(" "); For (String word: words){collector.collect(new Tuple2<>(word, 1)); }}}}Copy the code

2.4 Example 2: Stream processing wordCount

import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; Public class WordCountStream {public static void main(String[] args) throws Exception {// Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Set the parallelism to env.setParallelism(4); String filePath = "F:\\ttWork\\flink-demo\\ SRC \\main\ resources\ hell.txt "; DataStream<String> inputDataStream = env.readTextFile(filePath); / / data flow conversion operations SingleOutputStreamOperator < Tuple2 < String, Integer>> sum = inputDataStream.flatMap(new MyFlatMap()) .keyBy(0) .sum(1); sum.print(); // Start task env.execute(); Public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> Collector) throws Exception {// Use the blank word String[] words = s.split(" "); For (String word: words){collector.collect(new Tuple2<>(word, 1)); }}}}Copy the code

2.5 Example 3: Socket flow processing wordCount

Socket Port input test data

import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; WordCount public class WordCountSocketStream {public static void main(String[] args) throws Exception { / / create the execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment. GetExecutionEnvironment (); // env.setParallelism(4); ParameterTool parameterTool = ParameterTool.fromArgs(args); String host = parameterTool.get("host"); int port = parameterTool.getInt("port"); // Read data from the socket text stream DataStream<String> inputDataStream = env.sockettextStream (host, port); / / data flow conversion operations SingleOutputStreamOperator < Tuple2 < String, Integer>> sum = inputDataStream.flatMap(new MyFlatMap()) .keyBy(0) .sum(1); sum.print(); // Start task env.execute(); Public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> Collector) throws Exception {// Use the blank word String[] words = s.split(" "); For (String word: words){collector.collect(new Tuple2<>(word, 1)); }}}}Copy the code

2.6 Submit Example 3 to the Flink server using the Flink Web UI

Enter the class to run, the socket address, and the port:

Check the execution of the Flink program on the Running Jobs page, such as the parallelism of each operator, the number of records received, etc.

View Flink’s log output on the Task Managers page: