Author: Sha Shengyang

Preface

This article is mainly aimed at readers who are new to Flink, or who know a little about Flink but have not yet used it in practice. We hope it helps you use Flink more smoothly and get started with development and debugging.

Course contents include:

  • Flink development environment deployment and configuration
  • Run the Flink application (single-machine Standalone mode, multi-machine Standalone cluster mode, and Yarn cluster mode)

I. Flink development environment deployment and configuration

Flink is an open source big data project with Java and Scala as development languages. The code is open source on GitHub, and Maven is used to compile and build the project. For most of you who use Flink, Java, Maven, and Git are essential tools, and a powerful IDE helps you read code faster, develop new features, and fix bugs. Due to space constraints, we won’t go into the installation details of each tool, but we will give you the necessary installation recommendations.

For the development and test environment, Mac OS, Linux, or Windows can be used. If you are running Windows 10, it is recommended to use the Windows Subsystem for Linux to compile and run Flink.

  • Java: Java 8 at minimum; Java 8u51 or later is recommended
  • Maven: Maven 3 must be used, and Maven 3.2.5 is recommended; Maven 3.3.x builds successfully but has some problems shading dependencies
  • Git: Flink's code repository is https://github.com/apache/flink

It is recommended to use a stable branch that has been released by the community, such as release-1.6 or release-1.7.

1. Compile Flink code

After we have configured the previous tools, compiling Flink is as simple as executing the following command:

mvn clean install -DskipTests
# or
mvn clean package -DskipTests 
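If you must build with Maven 3.3.x despite the shading issue noted in the tool list above, the workaround documented by the community is a two-step build, first the whole project and then flink-dist again:

mvn clean install -DskipTests
cd flink-dist
mvn clean install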

Common compilation parameters:

-Dfast                        # skip the QA plugins and JavaDoc generation
-Dhadoop.version=2.6.1        # specify the Hadoop version
--settings=${maven_file_path} # explicitly specify a Maven settings.xml configuration file
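For example, a typical fast local build combining these options might look like this (a sketch; ${maven_file_path} is a placeholder for your own settings file):

mvn clean install -DskipTests -Dfast --settings ${maven_file_path}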

After a successful build, you can see the following files in the flink-dist/target/ subdirectory of the current Flink code directory (different Flink branches produce different versions; here it is Flink 1.5.1):

There are three files to note:

  • flink-1.5.1.tar.gz: the binary release tarball
  • flink-1.5.1-bin/flink-1.5.1: the unpacked Flink binary directory
  • flink-dist_2.11-1.5.1.jar: the jar package containing Flink's core features

Note: users in mainland China may encounter a "Build Failure" during compilation (with MapR-related errors reported), which is generally caused by failures downloading MapR dependencies. Even with the recommended settings.xml configuration (in which the Aliyun Maven mirror proxies MapR-related dependencies specially), downloads can still fail, mainly because some MapR jar packages are large. When you encounter these problems, just retry. Before retrying, delete the corresponding directory in the local Maven repository according to the failure message; otherwise you have to wait for the Maven download to time out before it can start again.

2. Prepare the development environment

IntelliJ IDEA is recommended as the IDE for Flink development. Eclipse is not officially recommended, mainly because of incompatibilities between Eclipse's Scala IDE and Flink's Scala version.

If you need to do some Flink code development, you will need to configure Checkstyle according to the configuration files in the tools/maven/ directory of the Flink code base, because Flink enforces a code-style check at compile time; code that does not conform to the style may fail to compile.
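Assuming the standard maven-checkstyle-plugin setup in Flink's POM, a quick way to run only the style check locally before committing is (a sketch):

# run only the style check (rules live under tools/maven/)
mvn checkstyle:check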

II. Run the Flink application

1. Basic concepts

Running a Flink application is actually quite simple, but before running one it is worth understanding the components of the Flink runtime, because this affects how a Flink application is configured. Figure 1 shows a data processing program written with the DataStream API. In the DAG, operators that can be chained together are merged into a single Task, while operators that cannot be chained are split into separate Tasks; the Task is the smallest unit of resource scheduling in Flink.

Figure 1: Parallel Dataflows
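To make the chaining discussion concrete, below is a minimal DataStream WordCount sketch (a hypothetical example, not the source of the bundled WordCount.jar; API as in the Flink 1.5 to 1.7 line). flatMap can be chained with the source into one Task, while keyBy forces a repartitioning, so the operators before and after it end up in different Tasks:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("to be or not to be")
            // flatMap can be chained with the source into one Task
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String word : line.split("\\s+")) {
                    out.collect(Tuple2.of(word, 1));
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            // keyBy repartitions the stream, so the operator chain breaks here
            .keyBy(0)
            .sum(1)
            // print() writes to the TaskManager's stdout (.out file)
            .print();

        env.execute("WordCount");
    }
}

Packaged into a jar, such a program is submitted with "./bin/flink run" exactly like the bundled examples used below.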

As shown in Figure 2, the actual Flink runtime includes two types of processes:

  • JobManager (also called JobMaster): coordinates the distributed execution of Tasks, including scheduling Tasks, triggering Checkpoints, and recovering Tasks from a Checkpoint when a Job fails over.
  • TaskManager (also called Worker): executes the Tasks of the dataflow, including allocating memory buffers and transferring data streams.

Figure 2 Flink Runtime architecture diagram

As Figure 3 shows, the Task Slot is the smallest unit of resource allocation in a TaskManager, and the number of Task Slots in a TaskManager indicates how many Tasks it can process concurrently. Note that one Task Slot can execute multiple operators, which are generally operators that can be chained together.

Figure 3: Task Slots

2. Prepare the operating environment

  • Flink binary: download it directly from the Flink official website, or build it from the Flink source code
  • Install Java and configure the JAVA_HOME environment variable
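For example (a sketch; the JDK install path is hypothetical):

export JAVA_HOME=/opt/jdk1.8.0_51    # hypothetical install path
export PATH=${JAVA_HOME}/bin:${PATH}
java -version                        # should report 1.8.0_51 or later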

3. Run Flink in Standalone mode

(1) Basic startup process

The easiest way to run a Flink application is in Standalone mode.

Starting a cluster:

./bin/start-cluster.sh

Open http://127.0.0.1:8081/ to see Flink’s Web interface. Try submitting the Word Count task:

./bin/flink run examples/streaming/WordCount.jar

You can explore the information displayed on the Web interface for yourself. For example, we can look at the TaskManager stdout log to see the result of the Word Count calculation.
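If you prefer the command line, the same stdout output lands in the TaskManager's .out file under the log directory (a sketch; the exact file name depends on your user name, host name, and Flink version):

tail -f log/flink-*-taskexecutor-*.out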

We can also try specifying our own local file as input with the "--input" argument, and then execute:

./bin/flink run examples/streaming/WordCount.jar --input ${your_source_file}

Stopping a cluster:

./bin/stop-cluster.sh

(2) Common configurations

  • conf/slaves

conf/slaves is used to configure the TaskManager deployment. By default, only one TaskManager process is started. To add a TaskManager process, add another line containing localhost to the file.

You can also start a new TaskManager directly with the command "./bin/taskmanager.sh start":

./bin/taskmanager.sh start|start-foreground|stop|stop-all
  • conf/flink-conf.yaml

conf/flink-conf.yaml is used to configure the runtime parameters of the JobManager and TaskManager. The common configurations are:

# The heap size for the JobManager JVM
jobmanager.heap.mb: 1024

# The heap size for the TaskManager JVM
taskmanager.heap.mb: 1024

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 4

# the managed memory size for each task manager.
taskmanager.managed.memory.size: 256

Once the Standalone cluster has started, we can try to examine the running Flink processes. Running the jps command shows two Flink-related processes: JobManager and TaskManager. We can further use the ps command to look at the "-Xmx" and "-Xms" settings in the processes' startup parameters. We can then try modifying a few configurations in flink-conf.yaml, restarting the Standalone cluster, and seeing what changes.
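A sketch of that inspection (the main-class names shown are what the 1.5-line standalone scripts typically produce and may differ in other versions):

jps
# e.g.:
# 32425 StandaloneSessionClusterEntrypoint   <- JobManager
# 32712 TaskManagerRunner                    <- TaskManager

# show the heap flags of the TaskManager process
ps -ef | grep TaskManagerRunner | grep -oE '\-Xm[sx][0-9]+[kKmMgG]?'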

It should be added that on the open source Blink branch, the TaskManager memory calculation is more refined than in the current community version. The heap memory limit (-Xmx) of the TaskManager process is roughly calculated as follows:

TotalHeapMemory = taskmanager.heap.mb + taskmanager.managed.memory.size + taskmanager.process.heap.memory.mb (the last defaults to 128 MB)

The default memory configuration for the JobManager and TaskManager in Flink community release-1.7 is:

# The heap size for the JobManager JVM
jobmanager.heap.size: 1024m

# The heap size for the TaskManager JVM
taskmanager.heap.size: 1024m

Note that the "taskmanager.heap.size" configuration in Flink community release-1.7 actually refers not to the Java heap limit but to the total memory budget of the TaskManager process: the actual heap limit (-Xmx) is taskmanager.heap.size minus the memory reserved for network buffers. The reason is that the memory used by network buffers must be Direct memory, and therefore should not be counted against the heap limit.

(3) View and configure logs

The startup logs of the JobManager and TaskManager can be found in the log subdirectory of the Flink binary directory. In the log directory, the files whose names start with "flink-${user}-standalonesession-${id}-${hostname}" correspond to the output of the JobManager, of which there are three:

  • flink-${user}-standalonesession-${id}-${hostname}.log: the log output of the code
  • flink-${user}-standalonesession-${id}-${hostname}.out: the stdout output of the process
  • flink-${user}-standalonesession-${id}-${hostname}-gc.log: the GC log of the JVM

The files whose names start with "flink-${user}-taskexecutor-${id}-${hostname}" correspond to the output of the TaskManager and comprise the same three kinds of files as the JobManager output.

The log configuration files are in the conf subdirectory of the Flink binary directory, where:

  • log4j-cli.properties: the log configuration used by the Flink command line, such as the "flink run" command
  • log4j-yarn-session.properties: the log configuration used when starting a Yarn session with yarn-session.sh
  • log4j.properties: the log configuration used on the JobManager and TaskManager in both Standalone and Yarn modes

The three log4j.*properties files correspond one-to-one to three logback.*xml files; if you use Logback instead, configure the corresponding logback files:

  • log4j-cli.properties -> logback-console.xml
  • log4j-yarn-session.properties -> logback-yarn.xml
  • log4j.properties -> logback.xml

Note that in "flink-${user}-taskexecutor-${id}-${hostname}", the ${id} indicates the start order of this process among all processes of this role (JobManager or TaskManager) on this machine, starting from 0.

(4) Further exploration

Try repeating the "./bin/start-cluster.sh" command and then looking at the Web page (or running the jps command): what happens? Look at the startup script to find out why. Similarly, you can repeat the "./bin/stop-cluster.sh" command and see what happens after each execution.

4. Deploy the Flink Standalone cluster on multiple machines

Points to note before deployment:

  • Java is installed and the JAVA_HOME environment variable is configured on each machine
  • The Flink binary directory is deployed at the same path on each machine
  • If HDFS is used, the HADOOP_CONF_DIR environment variable also needs to be configured

Modify the conf/masters and conf/slaves configurations based on your cluster information.

Modify the conf/flink-conf.yaml configuration and ensure that the address is the same as the one in the masters file:

jobmanager.rpc.address: z05f06378.sqa.zth.tbsite.net

Make sure that the configuration files in the conf directory of the Flink binary directory are the same on all machines, especially the following three:

conf/masters
conf/slaves
conf/flink-conf.yaml

Then start the Flink cluster:

./bin/start-cluster.sh

Submit a WordCount job:

./bin/flink run examples/streaming/WordCount.jar

Upload the WordCount input file to HDFS:

hdfs dfs -copyFromLocal story /test_dir/input_dir/story

Submit the WordCount job reading from and writing to HDFS:

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

Increase the parallelism of the WordCount job (note that resubmitting with the same output file name will fail):

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output --parallelism 20

5. Standalone mode High Availability (HA) deployment and configuration

From the Flink runtime architecture diagram in Figure 2, we can see that the JobManager is the role most likely to make the whole system unavailable. If a TaskManager fails, as long as resources are sufficient, its Tasks only need to be rescheduled to other idle TaskSlots, after which the Job recovers from the last Checkpoint. But if only one JobManager is configured in the cluster, once that JobManager goes down, everything must wait for it to recover; if recovery takes a long time, the whole Job may fail.

Therefore, if Standalone mode is used for production services, High Availability (HA) must be deployed and configured, so that multiple JobManagers stand by at the same time and another JobManager can take over and keep serving.

Figure 4: Flink JobManager HA

Note:

  • If you want to use Flink Standalone HA mode, make sure it is based on Flink release-1.6.1 or later, as earlier community versions have a bug that prevents the leading JobManager from working properly in this mode.
  • The next experiment uses HDFS, so you will need to download a Flink binary package with Hadoop support.

(1) (Optional) Deploy ZooKeeper using the scripts shipped with Flink

Flink currently supports ZooKeeper-based HA. If no ZooKeeper cluster is deployed in your environment, Flink ships with scripts to start one. First modify the configuration file "conf/zoo.cfg": for each machine that will run a ZooKeeper server, add a line "server.X=addressX:peerPort:leaderPort", where X is the unique numeric ID of that ZooKeeper server.

# The port at which the clients will connect
clientPort=3181

server.1=z05f06378.sqa.zth.tbsite.net:4888:5888
server.2=z05c19426.sqa.zth.tbsite.net:4888:5888
server.3=z05f10219.sqa.zth.tbsite.net:4888:5888

Then start Zookeeper:

./bin/start-zookeeper-quorum.sh

Running the jps command shows that the ZooKeeper process has started.

To stop the ZooKeeper cluster:

./bin/stop-zookeeper-quorum.sh

(2) Modify the configuration of the Flink Standalone cluster

Change the conf/masters file and add a JobManager:

$cat conf/masters
z05f06378.sqa.zth.tbsite.net:8081
z05c19426.sqa.zth.tbsite.net:8081

The previously modified conf/slaves file remains unchanged:

$cat conf/slaves
z05f06378.sqa.zth.tbsite.net
z05c19426.sqa.zth.tbsite.net
z05f10219.sqa.zth.tbsite.net

Modify the conf/flink-conf.yaml file:

# Configure high-availability mode
high-availability: zookeeper

# Configure the ZooKeeper quorum (hostname and port depend on the actual ZooKeeper deployment)
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181

# (Optional) Set the root directory in ZooKeeper
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root

# (Optional) Equivalent to the namespace of the ZK node created for this standalone cluster
high-availability.cluster-id: /test_dir/test_standalone2

# JobManager metadata is stored in DFS; ZooKeeper mainly stores a pointer to the DFS path
high-availability.storageDir: hdfs:///test_dir/recovery2/

Note that both of the following configurations in conf/flink-conf.yaml are ineffective in HA mode (think about why: the leading JobManager is elected dynamically, and its RPC address is discovered through ZooKeeper rather than read from the configuration).

jobmanager.rpc.address
jobmanager.rpc.port

After the modification is complete, ensure that the configuration is synchronized to other machines.

Start the Zookeeper cluster:

./bin/start-zookeeper-quorum.sh

Start the Standalone cluster again (make sure the previous Standalone cluster has been stopped):

./bin/start-cluster.sh

Open the JobManager Web page on the two Master nodes respectively:

z05f06378.sqa.zth.tbsite.net:8081
z05c19426.sqa.zth.tbsite.net:8081

You can see that both pages eventually lead to the same address: the machine where the active JobManager runs; the other node is the standby JobManager. This completes the HA configuration in Standalone mode.

Next we can verify that HA works. Once we know which machine the active JobManager is on, we can kill the active JobManager process. For example, if the current active JobManager runs on z05c19426.sqa.zth.tbsite.net, just kill the JobManager process on that machine.

Then open the two links again: z05f06378.sqa.zth.tbsite.net:8081 and z05c19426.sqa.zth.tbsite.net:8081

You will find that the latter link can no longer be displayed while the former still can, indicating that an active/standby switchover has occurred. We then restart the previously active JobManager:

./bin/jobmanager.sh start z05c19426.sqa.zth.tbsite.net 8081

Open z05c19426.sqa.zth.tbsite.net:8081 again: the link now leads to the page on z05f06378.sqa.zth.tbsite.net:8081. The JobManager has completed failover recovery.

6. Run Flink jobs in Yarn mode

Figure 5: Flink on Yarn deployment flow

Compared with Standalone mode, running Flink jobs in Yarn mode has the following advantages:

  • Resources are used on demand to improve cluster resource utilization
  • Jobs have priorities and are run according to their priorities
  • Failover is taken care of by Yarn: both the JobManager process and the TaskManager processes are monitored by the Yarn NodeManager; if the JobManager process exits abnormally, the Yarn ResourceManager restarts it (see the HA configuration in section 7); if a TaskManager process exits abnormally, the JobManager receives the notification, applies to the Yarn ResourceManager for resources again, and restarts the TaskManager

(1) Start a long-running Flink cluster on Yarn (Session Cluster mode)

View command parameters:

./bin/yarn-session.sh -h

Create a Flink cluster in Yarn mode:

./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m

The parameters used are:

  • -n,--container: number of TaskManagers
  • -jm,--jobManagerMemory: memory for the JobManager container, with optional unit (default: MB)
  • -tm,--taskManagerMemory: memory per TaskManager container, with optional unit (default: MB)
  • -qu,--queue: specify the Yarn queue
  • -s,--slots: number of slots per TaskManager
  • -t,--ship: ship files in the specified directory (t for transfer)

Submit a Flink job to the Flink cluster:

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

This time the Flink job can be submitted to the corresponding Flink cluster even though no Yarn Application information is specified, because the /tmp/.yarn-properties-${user} file stores the cluster information from the last Yarn session created by this user. If the same user creates another Yarn session on the same machine, this file is overwritten.

  • If you delete /tmp/.yarn-properties-${user} or submit the job from another machine, can the job still be submitted to the expected Yarn session? You can configure the high-availability.cluster-id parameter so that the JobManager address and port are looked up from ZooKeeper when submitting the job.
  • If HA is not configured, how can the job be submitted to the Yarn session?

In this case, you must specify the Application ID on Yarn in the command that submits the Flink job, passing it with the -yid parameter:

./bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

We can see that the TaskManager is released shortly after each run and pulled up again the next time a job is submitted. If you want to extend the idle timeout of the TaskManager, you can configure the following parameter (in milliseconds) in conf/flink-conf.yaml:

slotmanager.taskmanager-timeout: 30000L         # deprecated, used in release-1.5
resourcemanager.taskmanager-timeout: 30000L

(2) Run a Single Flink job on Yarn (Job Cluster mode)

If you just want to run a single Flink Job and then quit, you can use the following command:

./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

Common configurations are as follows:

  • -yn,--yarncontainer: number of TaskManagers
  • -ys,--yarnslots: number of slots per TaskManager
  • -yqu,--yarnqueue: specify the Yarn queue

You can view the available parameters of run with the help command:

./bin/flink run -h

The output of "./bin/flink run -h" shows that the parameters prefixed with "-y" or "--yarn" under "Options for yarn-cluster mode" correspond to the parameters of the "./bin/yarn-session.sh -h" command and are semantically largely the same.

About the relationship between -n (Yarn Session mode), -yn (Yarn single job mode), and -p:

  • "-n" and "-yn" have no actual effect in the community versions (release-1.5 to release-1.7); resources are actually requested according to the "-p" parameter and are released when the job finishes
  • In the open source Blink version, "-n" (in Yarn Session mode) determines how many TaskManagers are started initially; afterwards, no new TaskManagers are requested even if the Job needs more slots
  • In the open source Blink version, "-yn" in Yarn single job mode indicates the initial number of TaskManagers, without setting an upper limit on their number (note that a job runs in single job mode only when the -yd parameter is added, for example "./bin/flink run -yd -m yarn-cluster xxx")

7. Configure High Availability in Yarn mode

First, ensure that the yarn-site.xml file used to start the Yarn cluster contains the following configuration, which is the cluster-level upper limit on ApplicationMaster restarts:

     <property>
      <name>yarn.resourcemanager.am.max-attempts</name>
      <value>100</value>
    </property>

Then, in the conf/flink-conf.yaml file, set the number of restart attempts for the JobManager of the Flink job:

yarn.application-attempts: 10     # 1 initial attempt + 9 retries

Finally, configure the ZooKeeper settings in the conf/flink-conf.yaml file; the method is similar to the Standalone HA configuration, as shown below.

# Configure high-availability mode
high-availability: zookeeper

# Configure the ZooKeeper quorum (hostname and port depend on the actual ZooKeeper deployment)
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181

# (Optional) Set the root directory in ZooKeeper
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root

# Delete this configuration
# high-availability.cluster-id: /test_dir/test_standalone2

# JobManager metadata is stored in DFS; ZooKeeper mainly stores a pointer to the DFS path
high-availability.storageDir: hdfs:///test_dir/recovery2/

Pay special attention: in Yarn (and Mesos) mode, if cluster-id is not configured, it is set to the Application ID on Yarn, which ensures uniqueness.
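For illustration, an explicit setting that reuses the Application ID from the earlier submission example would look like this (a hypothetical sketch; in Yarn mode this value is normally derived automatically):

high-availability.cluster-id: application_1548056325049_0048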