What is WordCount?

WordCount is the "Hello, World" of the big data computing frameworks (Hadoop, Spark, Flink): the same role that a first program plays in a language such as Java or Python. It is a good fit for readers who are just getting familiar with the Flink job submission process.
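To make the goal concrete before any Flink is involved, here is what "counting words" means in plain Java (a minimal sketch; the class name `BatchWordCount` is ours, not part of the project, and the tokenization mirrors the `\W+` split used later in the Flink job):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BatchWordCount {

    // Count word occurrences in a fixed piece of text: the batch analogue
    // of the streaming job built later in this article.
    static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        // Lowercase and split on non-word characters, like the Flink Tokenizer below
        for (String token : text.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("To be, or not to be"));
        // {to=2, be=2, or=1, not=1}
    }
}
```

The streaming version does the same thing, except the input never ends, so the framework has to keep per-word state and emit updated counts as data arrives.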

Environment requirements
  1. JDK 1.8 (required)
~ $ java -version
java version "1.8.0_291"
Java(TM) SE Runtime Environment (build 1.8.0_291-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.291-b10, mixed mode)
  2. Flink 1.12.1 (required)
  • Install it using a tar package. The download address is as follows:

archive.apache.org/dist/flink/…

  • Decompress the installation package
~/flink-dev $ tar -zxvf flink-1.12.1-bin-scala_2.11.tgz
~/flink-dev $ ls -l
total 655424
drwxr-xr-x@ 13 it staff       416  1 10 08:46 flink-1.12.1
-rw-r--r--@  1 it staff 334271560  6 18 00:18 flink-1.12.1-bin-scala_2.11.tgz
  • View the directory structure

  • Configure environment variables
~/flink-dev/flink-1.12.1 $ pwd
/XXX/flink-dev/flink-1.12.1
~/flink-dev/flink-1.12.1 $ vi ~/.zshrc
# Add the following two lines to the file above.
# I use zsh here, so I edit ~/.zshrc; pick the file that matches your shell.
# macOS defaults to .bash_profile.
export FLINK_HOME=/XXX/flink-dev/flink-1.12.1
export PATH=$PATH:$FLINK_HOME/bin
Run the following command after saving:
source ~/.zshrc
  • Verify the version
~/flink-dev/flink-1.12.1 $ flink --version
Version: 1.12.1, Commit ID: dc404e2

If the preceding information is displayed, the installation is successful.

Start the cluster

As with most Java-based software, the startup scripts live in the bin directory.

  • Run the start cluster command
~/flink-dev/flink-1.12.1/bin $ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host MacBook-Pro.lan.
Starting taskexecutor daemon on host MacBook-Pro.lan.

  • Access the Flink Web UI (localhost:8081)

If the page loads after you open that address, the cluster has started successfully.

Write the WordCount project

With a Flink cluster running locally, the next step is to submit our task, which we write in Java against Flink's API.

  • Create a project

  • Copy the following configuration to the pom.xml file
<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.1</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.15</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
  • Coding

We tweak the classic example slightly: use netcat to stream data into a port, simulating an unbounded stream of data; let WordCount listen on that port, count the words, and write the running result to the log. The code is as follows:

package com.bruce.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCount {

    static Logger logger = LoggerFactory.getLogger(WordCount.class);


    /**
     * String --> Tuple2<String, Integer>
     *
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Parameter parsing
        if (args.length != 2) {
            logger.error("Usage: \n");
            logger.error("Please input host and port.");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // source
        DataStream<String> source = env.addSource(new SocketTextStreamFunction(host, port, "\n", 0)).name("Source");

        // transform
        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                source.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(value -> value.f0)
                        .sum(1).name("Transform");

        // sink = log
        // To show the effect, output the result directly to log
        counts.addSink(new WordCountSink()).name("Sink");

        // execute program
        env.execute("WordCount from socket by bruce.");
    }
}

The custom sink code:

package com.bruce.wordcount;


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountSink implements SinkFunction<Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(WordCountSink.class);

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        logger.info("{ Word: \"" + value.f0 + "\", Cnt: " + value.f1 + " }");
    }
}
  • Packaging

Go to the project directory and package it with mvn:

mvn clean package -Dmaven.test.skip=true

  • Start the listening port

Start the listening port before submitting the task; otherwise the job fails with a connection-refused error. Open another terminal and run:

nc -l 10000


Note that this window now blocks, waiting for input.
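If netcat is not installed, a rough stand-in can be written in a few lines of Java (a sketch under our own naming; `NcServer` and `pump` are not part of the tutorial project): it accepts one client on the given port and forwards stdin to it line by line, which is all the Flink socket source needs.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class NcServer {

    // Copy lines from `in` to `out`, flushing after each line so the
    // connected client sees every line as soon as it is entered.
    static void pump(BufferedReader in, PrintWriter out) throws IOException {
        String line;
        while ((line = in.readLine()) != null) {
            out.println(line);
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 10000;
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept(); // blocks until the Flink source connects
             BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
             PrintWriter toClient = new PrintWriter(client.getOutputStream())) {
            pump(stdin, toClient);
        }
    }
}
```

Like nc, this blocks the terminal: first on accept() until the job connects, then on stdin reads.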

  • Submit a task

Go to the following directory

~/flink-dev/flink-1.12.1/bin

Run the submit command (prefer an absolute path for the jar):

flink run -c com.bruce.wordcount.WordCount /XXX/IdeaProjects/FlinkPractice/target/FlinkPractice-1.0-SNAPSHOT.jar localhost 10000

  • View the task

A real-time stream processing job stays in the running state indefinitely, since there is an endless stream of data to process.

  • The input data

Type some words into the blocked nc window; each press of Enter sends one line to the job.

  • Viewing Task Logs

If the word counts appear in the Task Manager logs, the statistics were computed successfully.
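A note on what to expect in the log: `keyBy(value -> value.f0).sum(1)` emits an updated running count for every incoming record, not a single final total, so a repeated word shows up multiple times with a growing count. A plain-Java simulation of that behavior (`RunningCountDemo` is our illustrative name, not part of the project):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningCountDemo {

    // Simulate what keyBy + sum emits for a stream of (word, 1) pairs:
    // one updated running count per input record.
    static List<String> runningCounts(List<String> words) {
        Map<String, Integer> state = new HashMap<>(); // per-key state, like Flink's
        List<String> emitted = new ArrayList<>();
        for (String w : words) {
            int c = state.merge(w, 1, Integer::sum);
            emitted.add("(" + w + "," + c + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runningCounts(Arrays.asList("hello", "flink", "hello")));
        // [(hello,1), (flink,1), (hello,2)]
    }
}
```

So typing "hello flink" and then "hello" into the nc window should produce three log lines, the last one reporting hello with a count of 2.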

Appendix: The complete code has been posted on Github

Github.com/BruceWong96…

For more content, follow the WeChat official account: Programmer Big Devil King.