Preface

I went back over what I wrote about Spark, and there is not much more to add there as long as you have the basic concepts of Spark RDDs down, so I will move straight on to Flink and come back to Spark later if anything special needs covering.

Spark Streaming is, strictly speaking, a micro-batch, pseudo-real-time approach, whereas Flink processes each record as it arrives. Also, although Spark Streaming integrates with Kafka, we have to manage offsets manually, while Flink manages them for us automatically.
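For illustration only (this snippet is not part of the original article), here is a minimal sketch of consuming from Kafka in Flink, assuming the flink-connector-kafka_2.11 dependency and a hypothetical local broker; with checkpointing enabled, the consumer snapshots its offsets as part of Flink's state rather than relying on manual offset management:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaOffsetDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "demo-group");              // assumed consumer group

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // offsets are snapshotted with each checkpoint

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props));
        stream.print();
        env.execute("KafkaOffsetDemo");
    }
}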

And Flink's operators are much richer than Spark Streaming's, so Flink is the future, at least in my opinion.

Flink grew out of Stratosphere, a research project aimed at building the next generation of big data analytics platforms, and became an Apache incubator project on April 16, 2014.

For a real-time program we care about three things: data input, data processing, and data output. The figure below is from the official Flink 1.9 website.


The figure is clearly divided. On the input side there are two kinds of data sources: real-time Events, which belong to the real-time direction, and the Database, File System, and KV-Store systems at the bottom, which belong to the offline direction.

Flink can be deployed on K8s, YARN, Mesos, and so on. The output may go to an Application, to an event log such as Kafka, or back into the persistent storage systems mentioned above.

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink runs in all common cluster environments and performs computations at in-memory speed and at any scale.

Flink regards all data processing as stream processing, and data can be divided into bounded and unbounded.

  1. An unbounded stream has a defined start but no defined end. It produces data endlessly. Unbounded streams must be processed continuously, that is, immediately after ingestion. We cannot wait for all the data to arrive before processing it, because the input is infinite and will never be complete at any point in time. Processing unbounded data usually requires ingesting events in a particular order, such as the order in which they occurred, so that the completeness of the results can be reasoned about.

  2. A bounded stream has a defined start and a defined end. A bounded stream can be processed after all of its data has been ingested; since all the data can be sorted, ordered ingestion is not required. Bounded stream processing is usually referred to as batch processing.
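As a small sketch of the difference (a hypothetical example, not from the original text), the same DataStream API covers both cases: a socket source is unbounded and keeps running until the job is cancelled, while a fixed collection of elements is bounded and ends once it has been read:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedVsUnbounded {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Unbounded: records keep arriving until the job is cancelled
        // (assumes a socket server is running, e.g. nc -lk 9999)
        DataStream<String> unbounded = env.socketTextStream("localhost", 9999);

        // Bounded: a finite, known set of elements
        DataStream<String> bounded = env.fromElements("hello", "flink", "hello");

        bounded.print();
        env.execute("BoundedVsUnbounded");
    }
}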

In the beginning Spark only had RDDs; after Flink came out, Spark borrowed ideas from it. Flink, in turn, started from the real-time side, because at first it could not match Spark's offline (batch) processing. So the two projects have ended up learning from each other.

1.1.2 Deployment Mode in Multiple Scenarios

Apache Flink requires computing resources to execute applications. Flink integrates with all common cluster resource managers, such as Hadoop YARN, Apache Mesos, and Kubernetes, but it can also run as a standalone cluster.

Flink is designed to work well with each of these resource managers through resource-manager-specific deployment modes, so it can interact with each manager in the way that manager expects.

When a Flink application is deployed, Flink automatically determines the required resources from the application's configured parallelism and requests them from the resource manager. If a container fails, Flink replaces it by requesting new resources. All communication for submitting or controlling an application happens through REST calls, which simplifies Flink's integration with a variety of environments.
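A minimal sketch of how parallelism is configured (the values are hypothetical, not from the original article): it can be set for the whole job or overridden per operator, and Flink requests enough task slots from the resource manager to cover it.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // default parallelism for all operators in this job

        env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .setParallelism(2) // override for this single operator
                .print();

        env.execute("ParallelismDemo");
    }
}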

1.1.3 Running Applications at Any Scale

Flink is designed to run stateful streaming applications at any scale. Applications are parallelized into possibly thousands of tasks that are distributed across a cluster and executed concurrently, so an application can make use of virtually unlimited CPU, memory, disk, and network I/O.

Flink also makes it easy to maintain very large application state. Its asynchronous and incremental checkpointing algorithm minimizes the impact on processing latency while guaranteeing exactly-once state consistency. Flink can:

  1. Handle trillions of events per day
  2. Maintain several terabytes of application state
  3. Run on clusters of thousands of nodes

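A minimal configuration sketch (not from the original article; it assumes the flink-statebackend-rocksdb_2.11 dependency and a hypothetical HDFS path) showing how asynchronous, incremental checkpoints are switched on:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000); // take a checkpoint every 10 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // the second argument enables incremental checkpoints
        env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoints", true));
        // a source, transformations, a sink and env.execute() would follow in a real job
    }
}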

1.1.4 Make full use of memory performance

Stateful Flink programs are optimized for local state access. Task state is always kept in memory, and if it grows beyond the available memory it is stored in disk data structures that are efficient to access. Tasks therefore perform all computations against local (usually in-memory) state, which yields very low processing latency. Flink periodically and asynchronously checkpoints the local state to durable storage to guarantee state consistency in case of failures.
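As a rough illustration of local keyed state (a hypothetical example, not from the original article), a RichFlatMapFunction can keep a per-key counter in ValueState; reads and writes go to local (usually in-memory) state and are checkpointed asynchronously:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountPerWord extends RichFlatMapFunction<String, Tuple2<String, Long>> {
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String word, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = countState.value();              // local state read
        long updated = (current == null ? 0L : current) + 1L;
        countState.update(updated);                     // local state write
        out.collect(Tuple2.of(word, updated));
    }
}

It would be applied to a keyed stream, e.g. words.keyBy(w -> w).flatMap(new CountPerWord()).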

We will count, in real time, the number of words seen in the last 2 seconds, updated every second. IntelliJ IDEA is recommended for development.

1.3.1 Adding POM Dependencies

<properties>

    <flink.version>1.9.0</flink.version>

    <scala.version>2.11.8</scala.version>

</properties>



<dependencies>

    <dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-streaming-java_2.11</artifactId>

    <version>${flink.version}</version>

    </dependency>

</dependencies>


1.3.2 Java code development

We are using Java for development here; Scala would work too, but Java is what most enterprises use.

When typing the code, be careful not to accidentally import Scala packages.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCountJava {

    public static void main(String[] args) throws Exception {
        // Utility class provided by Flink to parse the passed-in parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("hostname");
        int port = parameterTool.getInt("port");

        // Step 1: Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Step 2: Get the data source
        DataStream<String> dataStream = env.socketTextStream(hostname, port);
        // Step 3: Perform the logical operations
        DataStream<WordCount> wordAndOneStream = dataStream.flatMap(new FlatMapFunction<String, WordCount>() {
            public void flatMap(String line, Collector<WordCount> out) {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(new WordCount(word, 1L));
                }
            }
        });

        // Similar to the window operator in Spark Streaming
        DataStream<WordCount> resultStream = wordAndOneStream.keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1)) // every 1 second, compute the latest 2 seconds
                .sum("count");
        // Step 4: Print the result
        resultStream.print();
        // Step 5: Start the task
        env.execute("WindowWordCountJava");
    }

    public static class WordCount {
        public String word;
        public long count;

        // Remember to keep the empty constructor (required for Flink POJOs)
        public WordCount() {
        }

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}


The Scala version

Also add Flink's Scala dependency, the Scala development dependencies, and the compiler plugins:

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
</dependency>


<properties>
    <flink.version>1.9.0</flink.version>
    <scala.version>2.11.8</scala.version>
</properties>

<dependencies>

    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-streaming-java_2.11</artifactId>

        <version>${flink.version}</version>

    </dependency>



    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-streaming-scala_2.11</artifactId>

        <version>${flink.version}</version>

    </dependency>

</dependencies>



<build>

    <pluginManagement>

        <plugins>

            <!-- Scala plugin -->

            <plugin>

                <groupId>net.alchim31.maven</groupId>

                <artifactId>scala-maven-plugin</artifactId>

                <version>3.2.2</version>

            </plugin>

            <!-- Maven compiler plugin -->

            <plugin>

                <groupId>org.apache.maven.plugins</groupId>

                <artifactId>maven-compiler-plugin</artifactId>

                <version>3.5.1</version>

                <configuration>

                    <source>1.8</source>

                    <target>1.8</target>

                </configuration>

            </plugin>

        </plugins>

    </pluginManagement>

    <plugins>

        <plugin>

            <groupId>net.alchim31.maven</groupId>

            <artifactId>scala-maven-plugin</artifactId>

            <executions>

                <execution>

                    <id>scala-compile-first</id>

                    <phase>process-resources</phase>

                    <goals>

                        <goal>add-source</goal>

                        <goal>compile</goal>

                    </goals>

                </execution>

                <execution>

                    <id>scala-test-compile</id>

                    <phase>process-test-resources</phase>

                    <goals>

                        <goal>testCompile</goal>

                    </goals>

                </execution>

            </executions>

        </plugin>



        <plugin>

            <groupId>org.apache.maven.plugins</groupId>

            <artifactId>maven-compiler-plugin</artifactId>

            <executions>

                <execution>

                    <phase>compile</phase>

                    <goals>

                        <goal>compile</goal>

                    </goals>

                </execution>

            </executions>

        </plugin>



        <plugin>

            <groupId>org.apache.maven.plugins</groupId>

            <artifactId>maven-shade-plugin</artifactId>

            <version>2.4.3</version>

            <executions>

                <execution>

                    <phase>package</phase>

                    <goals>

                        <goal>shade</goal>

                    </goals>

                    <configuration>

                        <filters>

                            <filter>

                                <artifact>*:*</artifact>

                                <excludes>

                                    <exclude>META-INF/*.SF</exclude>

                                    <exclude>META-INF/*.DSA</exclude>

                                    <exclude>META-INF/*.RSA</exclude>

                                </excludes>

                            </filter>

                        </filters>

                    </configuration>

                </execution>

            </executions>

        </plugin>

    </plugins>

</build>


The Scala version of the same logic:

import org.apache.flink.api.java.utils.ParameterTool

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.streaming.api.windowing.time.Time



/**
 * Sliding window
 * Every 1 second, count the data of the latest 2 seconds and print it to the console.
 */


object WindowWordCountScala {

  def main(args: Array[String]): Unit = {

    // Get parameters

    val hostname = ParameterTool.fromArgs(args).get("hostname")

    val port = ParameterTool.fromArgs(args).getInt("port")

    //TODO imports implicit conversions

    import org.apache.flink.api.scala._

    // Step 1: Obtain the execution environment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Step 2: Get the data source

    val textStream = env.socketTextStream(hostname,port)

    // Step 3: Data processing

    val wordCountStream = textStream.flatMap(line => line.split(","))

      .map((_, 1))

      .keyBy(0)

      .timeWindow(Time.seconds(2), Time.seconds(1))

      .sum(1)

    // Step 4: Data result processing

    wordCountStream.print()

    // Step 5: Start the program

    env.execute("WindowWordCountScala")

  }

}


1.4 Installation in Local Mode

  1. Install the JDK and configure JAVA_HOME. JDK 1.8 or higher is recommended

  2. The installation package download address: http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz

  3. Upload the installation package to the server
    Decompress the installation package: tar -zxvf flink-1.9.1-bin-scala_2.11.tgz
    Create a symlink to the extracted directory: ln -s flink-1.9.1 flink
    Configure the FLINK_HOME environment variable

  4. Start the service

    In local mode, no configuration is needed; just start the service directly
    cd $FLINK_HOME
    ./bin/start-cluster.sh    starts the service
    ./bin/stop-cluster.sh     stops the service

  5. Web UI: http://localhost:8081

1.5 Standalone mode Installation

1.5.1 Cluster Planning

Host name    JobManager    TaskManager
hadoop01     yes
hadoop02                   yes
hadoop03                   yes

1.5.2 Prerequisites

JDK 1.8 or higher, with JAVA_HOME configured

Passwordless SSH configured between the hosts

flink-1.9.1-bin-scala_2.11.tgz

1.5.3 Installation Procedure

  1. Modify conf/flink-conf.yaml

    jobmanager.rpc.address: hadoop01

  2. Modify the conf/slaves

    hadoop02

    hadoop03

  3. Copy to other nodes

    scp -rq /usr/local/flink-1.9.1 hadoop02:/usr/local
    scp -rq /usr/local/flink-1.9.1 hadoop03:/usr/local

  4. Start from the hadoop01 (JobManager) node

    start-cluster.sh

  5. Go to http://hadoop01:8081

1.5.4 Parameters to consider for Standalone mode

  1. jobmanager.heap.mb: available memory of the JobManager node
  2. taskmanager.heap.mb: available memory of each TaskManager node
  3. taskmanager.numberOfTaskSlots: number of slots (CPU) available on each machine
  4. parallelism.default: default parallelism of tasks
  5. taskmanager.tmp.dirs: temporary data storage directory of the TaskManager
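As a rough illustration (the values here are hypothetical), these keys are set in conf/flink-conf.yaml, for example:

    jobmanager.heap.mb: 1024
    taskmanager.heap.mb: 2048
    taskmanager.numberOfTaskSlots: 2
    parallelism.default: 2
    taskmanager.tmp.dirs: /tmp/flink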

1.6 Flink on Yarn Installation Mode

There are two modes for Flink on YARN:

1.6.1 The first method

Start a long-running Flink cluster on YARN and submit tasks to it. Resources are not released unless the Flink cluster is stopped, so this method can waste resources.

1.6.2 Second method

Start a separate Flink cluster on YARN each time a task is submitted (recommended). Resources are automatically released when the task completes.

1.7 Task submission in different modes

1.7.1 yarn-session.sh (create resources) + flink run (submit tasks)

This method is rarely used, so only a brief overview is given here.

Start a flink cluster that is always running



./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]



Attach to an existing Flink YARN session

./bin/yarn-session.sh -id application_1463870264508_0029

Run the task

./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/LICENSE -output hdfs://hadoop100:9000/wordcount-result.txt



Stop the task [Run the cancel command on the Webui or cli]


1.7.2 flink run -m yarn-cluster (create resources + submit a task)

Start the cluster and perform tasks



./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar


Here -yjm sets the memory of the YARN JobManager and -ytm sets the memory of the YARN TaskManager; the other related parameters can be looked up as needed.

Options for yarn-cluster mode:

 -d,--detached                  Run the task in detached mode
 -m,--jobmanager                Address of the JobManager (master) to connect to. Use this flag to connect to a different JobManager than the one specified in the configuration
 -yD <property=value>           Use the given dynamic property value
 -yd,--yarndetached             Run the task in detached mode (deprecated; use the non-YARN option instead)
 -yh,--yarnhelp                 Help for the YARN session CLI
 -yid,--yarnapplicationId       ID of the running YARN session to attach to
 -yj,--yarnjar                  Path to the Flink jar file
 -yjm,--yarnjobManagerMemory    Memory for the JobManager container, with optional unit (default: MB)
 -yn,--yarncontainer            Number of YARN containers to allocate (= number of TaskManagers)
 -ynm,--yarnname                Custom application name to display on YARN
 -yq,--yarnquery                Display available YARN resources (memory, queues)
 -yqu,--yarnqueue               Specify the YARN queue
 -ys,--yarnslots                Number of slots per TaskManager
 -yst,--yarnstreaming           Start Flink in streaming mode
 -yt,--yarnship                 Ship files in the specified directory (t for transfer)
 -ytm,--yarntaskManagerMemory   Memory per TaskManager container, with optional unit (default: MB)
 -yz,--yarnzookeeperNamespace   Namespace under which to create the ZooKeeper sub-paths in high-availability mode
 -ynl,--yarnnodeLabel           Specify the YARN node label for the YARN application


Note: The client must have the YARN_CONF_DIR, HADOOP_CONF_DIR, or HADOOP_HOME environment variable set so it can read the YARN and HDFS configuration; otherwise startup fails.

Finally

The next article starts with operators