Author: Chong Wu

In this article, we’ll teach you how to build your first Apache Flink (hereafter Flink) application from scratch.

Preparing the development environment

Flink can run on Linux, Mac OS X, or Windows. To develop Flink applications, you need Java 8.x and Maven installed on your local machine.

If Java 8 is installed, the following command prints version information similar to this:

$ java -version
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)

If Maven is installed, the following command prints version information similar to this:

$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"

In addition, we recommend IntelliJ IDEA (the free Community edition is sufficient) as the IDE for developing Flink applications. Eclipse can work, but it has known problems with mixed Scala and Java projects, so it’s not recommended. In the next section, we’ll show you how to create a Flink project and import it into IntelliJ IDEA.

Creating a Maven project

We will use the Flink Maven Archetype to create the project structure along with some initial default dependencies. In your working directory, run the following command to create the project:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.6.1 \
    -DgroupId=my-flink-project \
    -DartifactId=my-flink-project \
    -Dversion=0.1 \
    -Dpackage=myflink \
    -DinteractiveMode=false

You can change the groupId, artifactId, and package above to whatever you prefer. With the parameters above, Maven automatically creates the following project structure:

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── myflink
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties

The pom.xml file already contains the required Flink dependencies, and there are several skeleton sample programs under src/main/java. Next, we’ll start writing our first Flink program.

Writing the Flink program

Start IntelliJ IDEA, select “Import Project”, and select pom.xml in the root directory of my-flink-project. Follow the instructions to complete the project import.

Create the file SocketWindowWordCount.java under src/main/java/myflink:

package myflink;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

    }
}

It’s pretty basic right now, and we’ll fill it in step by step. Note that we will not show import statements here, because the IDE adds them automatically. I’ll show you the complete code at the end of this section, so if you want to skip the following steps, you can just paste the final complete code into the editor.

The first step of a Flink program is to create a StreamExecutionEnvironment. This is an entry class that can be used to set parameters, create data sources, and submit jobs. So let’s add it to main:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Next we will create a data source to read data from the socket on local port 9000:

DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

This creates a DataStream of type String. DataStream is the core API for stream processing in Flink, and it defines a number of common operations (filtering, transformation, aggregation, windowing, joining, and so on). In this example, we are interested in the number of times each word appears in a particular time window, such as a 5-second window. To do this, we first parse the String data into pairs of a word and a count (represented by Tuple2<String, Integer>), with the first field being the word and the second field being the count, which is initially set to 1. We implement a flatMap to do the parsing because a single line of data may contain multiple words.

DataStream<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                });
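To see what the flatMap body emits without starting Flink, here is a minimal plain-Java sketch of the same splitting logic. The class name WordPairs and the method toPairs are hypothetical names chosen for illustration, and Map.Entry stands in for Flink's Tuple2; nothing here is part of the Flink API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Flink.
final class WordPairs {

    // Mirrors the flatMap body: one input line yields one (word, 1) pair per token.
    static List<Map.Entry<String, Integer>> toPairs(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.split("\\s")) {
            pairs.add(new SimpleEntry<>(word, 1));
        }
        return pairs;
    }

    public static void main(String[] args) {
        // prints [hello=1, flink=1, hello=1]
        System.out.println(toPairs("hello flink hello"));
    }
}
```

One detail worth knowing: because the pattern is "\\s" rather than "\\s+", two consecutive spaces produce an empty-string token between them, which would then be counted as a word. Splitting on "\\s+" is one way to avoid that if it matters for your input.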

We then group the data stream by the word field (index 0). We can simply use the keyBy(int index) method to get a Tuple2<String, Integer> data stream keyed by the word. We can then specify the desired window on the stream and compute a result from the data in each window. In our example, we want to aggregate the word counts every 5 seconds, with each window counting from zero:

DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

The second call, .timeWindow(), specifies that we want a 5-second tumbling window. The third call specifies a sum aggregation for each window of each key, in our case over the count field (that is, the field at index 1). Every 5 seconds, the resulting data stream outputs the number of occurrences of each word during those 5 seconds.
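The idea of keying by word and summing inside a tumbling window can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: the class TumblingWindowSketch and its methods are hypothetical, the explicit timestamps stand in for the time each word arrives, and windows are assumed to be aligned to multiples of the window size. It does not reproduce how Flink implements windows internally.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of "key by word, 5-second tumbling window, sum"; not Flink code.
final class TumblingWindowSketch {

    static final long WINDOW_MILLIS = 5_000L;

    // Start of the window that contains the timestamp (non-negative timestamps assumed).
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // Sums a count of 1 per word, separately for each (window start, word) combination.
    static Map<String, Integer> countPerWindow(long[] timestamps, String[] words) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < words.length; i++) {
            String key = windowStart(timestamps[i]) + "/" + words[i];
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 4_999L, 5_000L, 7_500L};
        String[] words = {"flink", "flink", "flink", "java"};
        // prints {0/flink=2, 5000/flink=1, 5000/java=1}
        System.out.println(countPerWindow(timestamps, words));
    }
}
```

The third "flink" arrives at 5,000 ms, so it falls into the next window and its count starts again from 1. Tumbling windows do not overlap, so each record belongs to exactly one window.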

The last thing to do is print the data stream to the console and start executing:

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");

The final env.execute call is required to start the actual Flink job. All operator calls (such as creating the source, aggregating, and printing) only build a graph of operators internally. The job is submitted to a cluster or run on the local machine only when execute() is called.
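The "build first, run later" behaviour can be pictured with a toy plain-Java model. This is a deliberately simplified sketch, not Flink's actual mechanism: the class DeferredJobSketch and its methods step, execute, and log are hypothetical, and a real Flink job builds a graph of operators rather than a flat list of steps.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of deferred execution; all names are hypothetical and unrelated to Flink's API.
final class DeferredJobSketch {

    private final List<Runnable> plan = new ArrayList<>();
    private final List<String> log = new ArrayList<>();

    // Registers a step in the plan. Nothing is run at this point.
    DeferredJobSketch step(String name) {
        plan.add(() -> log.add(name));
        return this;
    }

    // Runs every registered step in order and returns the log of what ran.
    List<String> execute() {
        for (Runnable step : plan) {
            step.run();
        }
        return log;
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        DeferredJobSketch job = new DeferredJobSketch()
                .step("source")
                .step("window sum")
                .step("print");
        System.out.println("before execute: " + job.log()); // prints before execute: []
        System.out.println("after execute: " + job.execute()); // prints after execute: [source, window sum, print]
    }
}
```

Until execute() is called, the log stays empty even though three steps were declared, which mirrors why a Flink program that forgets env.execute() produces no output.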

Here’s the complete code, with some parts simplified (the code is also available on GitHub):

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // Create an execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read input data from a socket on local port 9000. If port 9000 is already in use, change it to another port
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

        // Parse the data, group it by word, apply the window, and aggregate
        DataStream<Tuple2<String, Integer>> windowCounts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        // Print the result to the console. Note that single-threaded printing is used, not multithreaded
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}

Running the program

To run the sample program, first start netcat in a terminal to provide the input stream:

nc -lk 9000

On Windows, you can install ncat from nmap.org/ncat/ and run:

ncat -lk 9000

Then run the main method of SocketWindowWordCount directly.

Just type words into the netcat console and you can see the frequency count for each word in the output console of SocketWindowWordCount. If you want to see a count greater than 1, type the same word repeatedly within 5 seconds.

For more information, please visit the Apache Flink Chinese community website