Recently I plan to study Flink and write Hello,World according to the official document. Getting started is relatively easy and does not require a complex installation environment or configuration. This article briefly introduces the use of Flink and the introduction.

feeling

  • Easy to build environment: Flink can be run and developed under Windows. For those who prefer to develop under Windows, the cost of building a VIRTUAL machine can be eliminated. And do not rely on other frameworks, the local environment is simple to build. This is crucial, and many people give up on setting up their learning framework. Reduce the cost of setting up the environment, so that beginners can avoid wasting too much energy. Hadoop was cumbersome to framework, and in the early days it only ran on Linux.
  • Detailed documentation: Flink’s official website is very detailed, which steps will be involved in the development process, as well as the operation path of each step, Flink’s official website is detailed. This includes importing Flink source code into IDEA, which solves a major pain point for anyone who wants to read the source code.
  • Chinese document: The official website of Flink already has a Chinese page, although there are few Chinese pages at present, it should be under translation. This shows that the Flink community attaches great importance to domestic developers.
  • Not relying on Hadoop: This is a good thing for a completely new framework that has no historical baggage. And it can be independently deployed and developed by those who learn it, without the need for a background in other frameworks.
  • Increasing attention: I searched Flink in wechat and found that most of the articles were written in 18 or 19 years, indicating that Flink’s attention is gradually increasing. Some big companies have also started to use Flink to build real-time data warehouses, such as Alibaba.

Flink is committed to providing developers with a convenient, easy-to-use programming framework. At the same time, the community is very focused on the documentation of detailed procedures and ease of use by developers.

The next step is to set up the Flink environment and run WordCount.

Run locally

Flink runs on Linux, Mac OS X, and Windows environments. I like developing under Windows, so I run Flink on Windows. The latest version of Flink (1.8.0) requires JDK version 1.8 or higher. Launching Flink locally is very easy, you download the Flink binary package, you need to select the version of Scala, and it doesn’t matter which version you choose if you don’t use Scala to develop Flink applications. FLink-1.8.0-bin-scala_2.11.tgz The startup steps are as follows:

cdFlink - 1.8.0 comes with# Decompressed directory
cd bin
start-cluster.bat # Start local FlinkCopy the code

When you start, you will find that two Windows for Java programs pop up. One is JobManager and the other is TaskManater. Visit the Flink Web page at http://localhost:8081, which is used to view the operating environment and resources, submit, and monitor Flink jobs.

WordCount

Get a feel for how to write a Flink application with simple WordCount. Flink already provides templates for generating Maven projects

# Maven project using Javamvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ - DarchetypeVersion = 1.8.0 comes with# Using The Scala Maven projectmvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-scala \ - DarchetypeVersion = 1.8.0 comes withCopy the code

If you do not want to generate a Maven project through the cli, you can create a Flink application template project in IDEA as follows. Java is used as an example

Click “Add Archetype…” on the page above. And then fill in the following content in the pop-up dialog box

Choose the Archetype we added to continue building the Maven project. Besides Maven projects, you can also create Gradle and Sbt projects.

To get the Flink application running quickly, we can copy the WordCount example from the official website directly to our own project. The Java code is as follows

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class FirstCase {
    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port = 9000;

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("192.168.29.132", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        returnnew WordWithCount(a.word, a.count + b.count); }}); //print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + ":"+ count; }}}Copy the code

I’m not familiar with the Flink programming model, but I can pretty much infer the meaning of each step from the above code. Since we’re getting started with Flink, there’s no need to worry too much about the code itself at first. Run the Demo first and slowly learn in depth. Now the statistics program is in place, but the data source is missing. The example on the website uses Netcat, which I have installed on Windows but find inconvenient to use. In Linux virtual machine installed a, so use with the official website consistent. My VM runs Centos 7 64-bit. The installation command is as follows

yum install nmap-ncat.x86_64Copy the code

Start Netcat to send data

nc -l 9000Copy the code

The next step is to start the Flink application to connect to the data source and do statistics. Before starting, you need to replace the IP and port in the following code with your own

DataStream<String> text = env.socketTextStream("192.168.29.132", port, "\n");Copy the code

There are two ways to start Flink applications. One is to run Java programs directly in IDEA. Another option is to use Maven to create a JAR package and submit it to the Flink cluster for execution. The second command is as follows

$FLINK_HOME\bin\flink run  $APP_HOMEJar FLINK_HOME is the directory of flink binary package. APP_HOME is the directory of maven project created aboveCopy the code

After starting the Flink application, we can enter text into Netcat and observe Flink statistics

$ nc -l 9000
a aCopy the code

We only sent one line saying “A A”. If you start the application in IDEA, you can see the output directly in the IDEA console. If you start the application in Flink Run, you need to view the output in the TaskManager window. The output is as follows

a : 2
a : 2
a : 2
a : 2
a : 2Copy the code

Why did it print 5 times? Take a look at our application and we have this sentence

.timeWindow(Time.seconds(5), Time.seconds(1))Copy the code

It represents that the Flink application processes the data window for 5s at a time, and after processing, the entire window slides forward for 1s. That is, the data processed each time is “last 5 seconds” data. Because there is only one record “A A” in the recent 5S data source, it is output 5 times.

So that’s WordCount for Java. Of course, we can also write in Scala, which is more concise and requires less code.

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {
  def main(args: Array[String]) : Unit = {
    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text = env.socketTextStream("192.168.29.132", 9000, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count") / /print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)
}Copy the code

It’s about half the code in Java. I feel that Scala is quite suitable for big data statistics code, although Scala has a high threshold. Scala programs run just like Java. If any of the following errors occur, you need to check to see if the import statement is not written correctly

Error:(29, 16) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
      .flatMap { w => w.split("\\s")}Copy the code

The solution

import org.apache.flink.streaming.api.scala._Copy the code

conclusion

So that’s a brief introduction to Flink. Continue to look at the Flink framework.

Welcome to the public account “Du Code”