1. Flink knowledge summary

1.1. Flink component stack

Apache Flink: stateful computations over data streams.

Component stack

Detailed introduction of each layer:

  • Physical deployment layer: Flink can be deployed locally, on a standalone cluster, on a cluster managed by YARN, or in the cloud. This layer covers Flink's deployment modes; currently Flink supports Local, Cluster (Standalone, YARN), Cloud (GCE/EC2), and Kubernetes. Through this layer Flink can run on different platforms, and users choose the deployment mode that fits their needs.
  • Runtime core layer: the Runtime layer provides all the core implementations that support Flink's computation and supplies the basic services used by the upper API layers; it is the core implementation layer of Flink's distributed computing framework. It supports distributed stream job execution, the mapping from JobGraph to ExecutionGraph, task scheduling, and so on. DataStream and DataSet programs are converted into a unified, executable set of Task operators, so that batch and streaming computation are handled by the same streaming engine.
  • API & Libraries layer: Flink primarily offers Scala and Java APIs, with Python support still maturing, covering DataStream, DataSet, Table, and SQL. As a distributed data processing framework, Flink provides interfaces for both stream computing and batch computing. Both offer rich high-level data processing operations such as map and flatMap, and a lower-level Process Function API is also provided, allowing users to directly manipulate low-level concepts such as state and time.
  • Extended libraries: Flink also includes CEP for complex event processing, the machine learning library FlinkML, the graph processing library Gelly, and more. The Table API here provides SQL support at the API level (a DSL), rather than textual SQL parsing and execution.

1.2. Flink cornerstone

Flink’s popularity can be attributed to its four most important cornerstones: Checkpoint, State, Time, and Window.

  • Checkpoint

    This is one of Flink’s most important features.

    Flink implements distributed consistent snapshots based on the Chandy-Lamport algorithm, thereby providing consistency semantics.

    The Chandy-Lamport algorithm was actually proposed back in 1985, but it was not widely adopted; Flink carried it forward.

    Spark has recently implemented Continuous Processing, which is designed to reduce processing latency. It also needs to provide this kind of consistency semantics, and it ultimately adopted the Chandy-Lamport algorithm as well; the algorithm has thus been recognized in the industry.

  • State

    Having provided consistency semantics, Flink also offers a simple and clear set of State APIs, including ValueState, ListState, MapState and, more recently, BroadcastState, so that users can manage state easily while programming. State accessed through these APIs automatically enjoys this consistency semantics.

  • Time

    In addition, Flink implements the Watermark mechanism, which supports event-time processing and tolerates late and out-of-order data.

  • Window

    In stream computing, a window is generally defined before computing on the stream, i.e. the computation is performed over some kind of window (see the sketch after this list). Flink provides a variety of windows out of the box, such as sliding windows, tumbling windows, session windows, and very flexible custom windows.
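To make these cornerstones concrete, here is a minimal sketch against the Flink 1.12 DataStream API used throughout this document: checkpointing is enabled, event-time timestamps and watermarks with a 5-second out-of-orderness bound are assigned, and word counts are computed per 10-second tumbling event-time window. The class name, input elements, and interval values are illustrative only.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class CornerstonesSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint: take a Chandy-Lamport style distributed snapshot every 5 seconds
        env.enableCheckpointing(5000);

        // Illustrative input: (word, eventTimestampInMillis)
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("itcast", 1000L),
                Tuple2.of("hadoop", 2000L),
                Tuple2.of("itcast", 11000L));

        events
                // Time: extract event-time timestamps and emit watermarks that tolerate
                // events arriving up to 5 seconds late / out of order
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event, timestamp) -> event.f1))
                .map(e -> Tuple2.of(e.f0, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // State: the keyed aggregation state below is managed by Flink and covered by checkpoints
                .keyBy(t -> t.f0)
                // Window: count per 10-second tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum(1)
                .print();

        env.execute("CornerstonesSketch");
    }
}

User-defined keyed state (ValueState, ListState, MapState, etc.) is obtained in rich functions via getRuntimeContext().getState(new ValueStateDescriptor<>(...)) and is covered by the same checkpoint mechanism.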

1.3. Flink installation and deployment

Flink supports multiple installation modes:

– Local – local single-machine mode, used for learning and testing

– Standalone – standalone cluster mode; Flink manages its own cluster, used for development and test environments

– Standalone-HA – standalone cluster high-availability mode; Flink manages its own cluster, used for development and test environments

– On Yarn – computing resources are managed by Hadoop YARN, used in production environments

1.3.1. Local mode

Principle:

  1. The Flink program is submitted by JobClient
  2. The JobClient submits the job to the JobManager
  3. The JobManager coordinates resource allocation and job execution. After resource allocation is complete, the task is submitted to the appropriate TaskManager
  4. The TaskManager starts a thread to execute the task and reports status changes, such as started, in progress, or completed, to the JobManager.
  5. When the job finishes, the result is sent back to the JobClient
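For local development and testing, the same effect can be achieved entirely in code by creating a local environment. A small sketch (the Web UI variant additionally requires the flink-runtime-web dependency; element values are illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalModeSketch {
    public static void main(String[] args) throws Exception {
        // Starts an embedded JobManager and TaskManager inside the local JVM
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // Alternative: local environment with the Flink Web UI (needs flink-runtime-web on the classpath)
        // StreamExecutionEnvironment env =
        //         StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.fromElements("itcast", "hadoop", "spark").print();
        env.execute("LocalModeSketch");
    }
}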

1.3.2. Standalone cluster mode

Principle:

  1. The Client submits the task to the JobManager
  2. The JobManager is responsible for requesting the resources required for the task to run and managing the tasks and resources
  3. JobManager assigns tasks to TaskManager for execution
  4. TaskManager regularly reports status to JobManager

1.3.3. Standalone-HA high-availability cluster mode

From the previous architecture, we can clearly see that the JobManager is an obvious SPOF (single point of failure). The JobManager is responsible for task scheduling and resource allocation; if it fails, the consequences are easy to imagine.

With the help of ZooKeeper, a Standalone Flink cluster can have multiple JobManagers alive at the same time, only one of which is in the working (leader) state while the others are in Standby. When the working JobManager disappears (for example, the machine goes down or the process crashes), ZooKeeper elects a new JobManager from the Standby instances to take over the Flink cluster.

1.3.4. Flink On Yarn mode

Principle:

Why use Flink On Yarn?

  1. Yarn resources can be used as required to improve cluster resource utilization
  2. A Yarn task has a priority, and the job is run based on the priority
  3. Based on the Yarn scheduling system, Failover of each role can be automatically handled (fault-tolerant)
    1. The JobManager and TaskManager processes are monitored by Yarn NodeManager
    2. If the JobManager process exits unexpectedly, Yarn ResourceManager reschedules the JobManager process on another machine
    3. If the TaskManager process exits unexpectedly, JobManager receives a message, applies for resources from Yarn ResourceManager again, and restarts The TaskManager

How does Flink interact with YARN?

  1. The Client uploads the JAR package and configuration file to the HDFS cluster

  2. The Client submits tasks to Yarn ResourceManager and applies for resources

  3. ResourceManager allocates Container resources and starts the ApplicationMaster; the AppMaster then loads the Flink jar package and configuration, builds the environment, and starts the JobManager

    1. JobManager and ApplicationMaster run on the same Container.

    2. Once they are successfully started, the AppMaster knows the address of the JobManager (its own machine).

    3. It generates a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager). The configuration file is also uploaded to HDFS.

    4. In addition, the AppMaster container also provides Flink’s Web services interface.

      All ports allocated by YARN are temporary ports, which allows users to run multiple Flink jobs in parallel

  4. The ApplicationMaster applies for worker resources from ResourceManager; the NodeManager loads the Flink jar package and configuration, builds the environment, and starts the TaskManager

  5. After starting, TaskManager sends heartbeat packets to JobManager and waits for JobManager to assign tasks to it

1.4. Flink introduction case

1.4.1. Programming model

A Flink application mainly consists of three parts: Source, Transformation, and Sink, as shown in the figure below:

1.4.2. Getting Started cases

Pom file


      
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lhw</groupId>
    <artifactId>flink_demo</artifactId>
    <version>1.0-SNAPSHOT</version>


    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <flink.version>1.12.0</flink.version>
    </properties>
    <dependencies>
        <!-- Scala dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.11</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Blink execution planner (default since Flink 1.11) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_2.12</artifactId> <version>${flink.version}</version> </dependency> -->

        <!-- Flink connectors -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-parquet_2.12</artifactId> <version>${flink.version}</version> </dependency> -->
        <!-- <dependency> <groupId>org.apache.avro</groupId> <artifactId> </artifactId> <version>1.9.2</version> </dependency> <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-avro</artifactId> <version>1.10.0</version> </dependency> -->


        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>flink-streaming-java_2.11</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>flink-runtime_2.11</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>flink-core</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>flink-java</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>hadoop-hdfs</artifactId>
                    <groupId>org.apache.hadoop</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
            <!-- <version>8.0.20</version> -->
        </dependency>

        <!-- High-performance asynchronous component: Vert.x -->
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-jdbc-client</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-redis-client</artifactId>
            <version>3.9.0</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>

        <!-- Reference: https://blog.csdn.net/f641385712/article/details/84109098 -->
        <!-- <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</artifactId> <version>4.4</version> </dependency> -->
        <!-- <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libfb303</artifactId> <version>0.9.3</version> <type>pom</type> <scope>provided</scope> </dependency> -->
        <!-- <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>28.2-jre</version> </dependency> -->

    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <plugins>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <!-- Plugin for compiling Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- Shade plugin for packaging (includes all dependencies in the jar) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Set the jar entry class (optional) -->
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

package com.lhw.flink.demo1_wordcount;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Requirement: use Flink to complete WordCount with the DataStream API.
 * Coding steps:
 *   1: Prepare the environment  -env
 *   2: Prepare the data         -source
 *   3: Process the data         -transformation
 *   4: Output the result        -sink
 *   5: Trigger execution        -execute
 */
public class DataStreamWordCount2 {
    public static void main(String[] args) throws Exception {
        // The new unified API supports both streaming and batch execution
        //1: Prepare the environment -env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // decide automatically
        //env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // streaming
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH);     // batch

        //2: Prepare the data -source
        DataStream<String> linesDS = env.fromElements("itcast hadoop spark", "itcast hadoop spark", "itcast hadoop", "itcast");

        //3: Process the data -transformation
        //3.1: split each line into words by spaces
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] arr = s.split(" ");
                for (String s1 : arr) {
                    collector.collect(s1);
                }
            }
        });

        //3.2: mark each word with a count of 1, i.e. (word, 1)
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });

        //3.3: group the data by key
        // The old style groups the tuple by the field with index 0, i.e. the word:
        //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);

        //3.4: aggregate the data in each group by value
        // 1 refers to the field with index 1 in the tuple
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);

        //4: Output the result -sink
        result.print();

        //5: Trigger execution -execute
        env.execute(); // DataStream jobs need to call execute()
    }
}

1.5. Flink principle

1.5.1. Flink role division

In actual production, Flink runs as a cluster, and two kinds of processes are involved at runtime.

  • JobManager:

    It acts as the cluster manager: scheduling tasks, coordinating checkpoints, coordinating failure recovery, collecting job status information, and managing the TaskManager slave nodes in the Flink cluster.

  • TaskManager:

    The worker that is actually responsible for performing computation; it executes a set of tasks of a Flink job. The TaskManager is also the administrator of its node, responsible for reporting server information such as memory, disk, and task running status to the JobManager.

  • Client:

    When a user submits a written Flink project, a client is created first and the job is submitted through it; this client is the Flink Client.

1.5.2. Flink execution process

Standalone

On Yarn

  1. The Client uploads the Flink Jar package and configuration to the HDFS
  2. The Client submits tasks to Yarn ResourceManager and applies for resources
  3. ResourceManager allocates Container resources and starts the ApplicationMaster; the AppMaster then loads the Flink jar package and configuration, builds the environment, and starts the JobManager
  4. The ApplicationMaster applies for worker resources from ResourceManager; the NodeManager loads the Flink jar package and configuration, builds the environment, and starts the TaskManager
  5. After starting, TaskManager sends heartbeat packets to JobManager and waits for JobManager to assign tasks to it

1.5.3. Flink Streaming Dataflow

Dataflow, Operator, Partition, SubTask, and Parallelism

  1. Dataflow: a Flink program is mapped to a dataflow model when it executes
  2. Operator: each operation in the dataflow model is called an Operator, divided into Source, Transform, and Sink
  3. Partition: the dataflow model is distributed and parallel; between 1 and n partitions are formed during execution
  4. SubTask: the tasks of multiple partitions can run in parallel, each running independently in its own thread as a SubTask
  5. Parallelism: the degree of parallelism, i.e. the number of subtasks/partitions that can actually execute at the same time (see the sketch below)
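As a small illustration of parallelism (the values are chosen arbitrarily), the degree of parallelism can be set both for the whole job and per operator:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job-level default: every operator runs with 2 parallel subtasks unless overridden
        env.setParallelism(2);

        env.fromElements("itcast", "hadoop", "spark")        // fromElements is a non-parallel source (1 subtask)
                .map(String::toUpperCase).setParallelism(4)  // operator-level override: 4 subtasks / partitions
                .print().setParallelism(1);                  // the sink runs as a single subtask

        env.execute("ParallelismSketch");
    }
}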

1.5.4. Operator data transfer modes

There are two modes when data is passed between two operators:

  1. One to One mode

    When data is passed between two operators in this mode, the number of partitions and the ordering of the data are preserved. As shown in the figure above, from Source[1] to Map[1] the partitioning of the Source is kept, as well as the order in which the partition's elements are processed. — Similar to narrow dependencies in Spark

  2. Redistributing mode

    This mode changes the number of partitions of the data; each operator subtask sends data to different target subtasks depending on the selected transformation. For example, keyBy() repartitions by hash code, broadcast() sends every record to all downstream subtasks, and rebalance() redistributes records round-robin (see the sketch below). — Similar to wide dependencies in Spark
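A hedged sketch of both transfer modes using standard DataStream repartitioning operators (the class name and input data are illustrative):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitioningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> stream = env.fromElements(
                Tuple2.of("itcast", 1), Tuple2.of("hadoop", 1), Tuple2.of("itcast", 1));

        // One-to-one: map keeps the partitioning and ordering of its input (forward)
        DataStream<Tuple2<String, Integer>> mapped =
                stream.map(t -> t).returns(stream.getType());

        // Redistributing: hash-partition by key, so equal keys land on the same subtask
        mapped.keyBy(t -> t.f0).sum(1).print();

        // Redistributing: round-robin repartitioning to balance load across subtasks
        mapped.rebalance().print();

        // Redistributing: every downstream subtask receives a copy of every record
        mapped.broadcast().print();

        env.execute("PartitioningSketch");
    }
}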

1.5.5. Operator Chain

When the client submits a task, it optimizes the operators: operators that can be merged are merged into one, and the merged operator is called an Operator Chain, which is essentially an execution chain. Each execution chain is executed by a separate thread (a SubTask) on the TaskManager.
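Chaining happens automatically, but it can also be influenced from the DataStream API. A minimal sketch, assuming the Flink 1.12 API (the operator contents are illustrative):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Disable chaining for the whole job (every operator becomes its own task):
        // env.disableOperatorChaining();

        env.fromElements("itcast hadoop", "spark", "")
                .map(String::toLowerCase)
                .filter(s -> !s.isEmpty()).startNewChain()  // begin a new operator chain at this filter
                .map(String::trim).disableChaining()        // never chain this map with its neighbours
                .print();

        env.execute("ChainingSketch");
    }
}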

1.5.6. TaskSlot And Slot Sharing

  • Task slot

Each TaskManager is a JVM process. To control how many tasks a TaskManager (worker) can accept, Flink uses Task Slots. The number of TaskSlots limits the number of worker threads that a TaskManager process can run concurrently. A TaskSlot is the smallest unit of resource allocation in a TaskManager, and the number of TaskSlots in a TaskManager indicates how many concurrent tasks it can support.

Flink divides the memory of the process into multiple slots. The following benefits can be obtained after the memory is divided into different slots:

– The number of TaskSlots determines the maximum number of subtasks that a TaskManager can execute concurrently

– Each TaskSlot has an exclusive memory space, so multiple different jobs can run in one TaskManager without affecting each other.

  • Slot Sharing

Flink allows subtasks to share slots even if they are subtasks of different tasks (phases), as long as they are from the same job.

For example, map, keyBy and sink in the lower left corner of the figure are executed in a TaskSlot to achieve the purpose of resource sharing.

Allowing slot sharing has two main benefits:

– Resource allocation is fairer. If there are idle slots, more tasks can be allocated to them.

– Task slot sharing improves resource utilization.
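Slot sharing can also be controlled explicitly: operators are placed in the "default" slot sharing group unless another group is set, and operators in different groups will not share slots. A hedged sketch (the group names and operator contents are arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("itcast", "hadoop", "spark")
                .map(String::toUpperCase).slotSharingGroup("etl") // placed in the "etl" sharing group
                .filter(s -> s.startsWith("I"))                   // inherits the group of its input ("etl")
                .print().slotSharingGroup("sink");                // the sink is isolated in its own slots

        env.execute("SlotSharingSketch");
    }
}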

Note:

Slot is a static concept that refers to the concurrent execution capability of TaskManager

Parallelism is a dynamic concept that refers to the concurrency capabilities that are actually used when a program is run

1.6. Flink runtime components

The Flink runtime architecture consists of four different components that work together when running stream processing applications:

  • JobManager: assigns tasks and schedules checkpoints (snapshots)
  • TaskManager: the main worker process that actually executes tasks
  • ResourceManager: manages and allocates resources
  • Dispatcher: provides the REST interface and Web UI for submitting applications

Because Flink is implemented in Java and Scala, all components run on the Java Virtual Machine (JVM). The responsibilities of each component are as follows:

  1. JobManager
    • The main process that controls the execution of an application, that is, each application is controlled by a different JobManager.
    • The JobManager first receives the application to execute, which consists of the JobGraph, the Logical Dataflow Graph, and the JAR package that contains all the classes, libraries, and other resources.
    • The JobManager transforms the JobGraph into a physical-level data flow graph called an “ExecutionGraph” that includes all tasks that can be executed concurrently.
    • JobManager asks ResourceManager for the resources necessary to perform the task, that is, slots on the TaskManager. Once it gets enough resources, it distributes the execution diagrams to the TaskManager where they are actually running. During runtime, the JobManager takes care of everything that requires central coordination, such as checkpoints.
  2. TaskManager
    • Work process in Flink. There are usually multiple TaskManagers running in Flink, each containing a certain number of slots. The number of slots limits the number of tasks TaskManager can perform.
    • Once started, TaskManager registers its slots with the Resource manager; Upon receiving an instruction from the resource manager, TaskManager provides one or more slots for the JobManager to call. The JobManager can then assign tasks to the slot to execute.
    • During execution, a TaskManager can exchange data with other TaskManagers running the same application.
  3. ResourceManager
    • Mainly responsible for managing TaskManager slots; a TaskManager slot is the unit of processing resources defined in Flink.
    • Flink provides different resource managers for different environments and resource management tools, such as YARN, Mesos, K8s, and standalone deployments.
    • When JobManager applies for slot resources, ResourceManager allocates the TaskManager with free slots to JobManager. If ResourceManager does not have enough slots to meet JobManager’s requests, ResourceManager can initiate a session to the resource providing platform to provide a container for starting the TaskManager process.
  4. Dispatcher
    • Running across jobs, it provides a REST interface for application submission.
    • When an application is submitted for execution, the Dispatcher starts and hands the application over to a JobManager.
    • The Dispatcher also launches a Web UI to easily display and monitor job execution information.
    • The Dispatcher may not be required in the architecture, depending on how the application commits.

1.7. Flink ExecutionGraph

The data flow graph directly mapped from a Flink program is the StreamGraph, also called the logical flow graph, because it represents a high-level view of the computational logic. To execute a stream program, Flink converts the logical flow graph into a physical data flow graph (also called the execution graph), which describes in detail how the program will be executed.

The execution diagram in Flink can be divided into four layers: StreamGraph -> JobGraph -> ExecutionGraph -> physical execution diagram.

The principle is introduced

  • The Flink executor automatically generates a DAG data flow diagram from the program code
  • The execution diagram in Flink can be divided into four layers: StreamGraph -> JobGraph -> ExecutionGraph -> physical execution diagram.
  • StreamGraph: Is the original graph generated from user code written through the Stream API. Represents the topology of the program.
  • JobGraph: The data structure that StreamGraph is optimized to generate JobGraph and submit to JobManager. The main optimization is to chain multiple qualified nodes together as a node, which can reduce the serialization/deserialization/transmission consumption required by data flowing between nodes.
  • ExecutionGraph: The JobManager generates an ExecutionGraph from the JobGraph. ExecutionGraph, a parallel version of JobGraph, is the core data structure of the scheduling layer.
  • Physical ExecutionGraph: JobManager schedules jobs according to the ExecutionGraph, and the “graph” formed by deploying tasks on each TaskManager is not a specific data structure.

Simple understanding:

StreamGraph: the initial logical flow of the program, i.e. the sequence of operators — generated on the Client

JobGraph: merges one-to-one operators into Operator Chains — generated on the Client

ExecutionGraph: the JobGraph is expanded into a parallel plan according to the parallelism set in the code and the requested resources — generated on the JobManager

Physical execution graph: the ExecutionGraph's parallel plan is deployed to concrete TaskManagers, and concrete SubTasks are placed into concrete TaskSlots to run.
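The StreamGraph built on the client can be inspected during development: StreamExecutionEnvironment.getExecutionPlan() returns a JSON description of the plan, which can be pasted into the Flink plan visualizer. A minimal sketch (this variant only prints the plan instead of executing the job; the pipeline itself is illustrative):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ExecutionPlanSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("itcast hadoop spark", "itcast hadoop")
                .flatMap((String line, Collector<String> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(word);
                    }
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();

        // JSON description of the StreamGraph (logical plan) built on the client
        System.out.println(env.getExecutionPlan());
    }
}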