1. Flink Deployment Modes

1.1 Local Mode

1.1.1 Introduction
1) Supported platforms: Linux, Windows, or macOS. 2) Application scenario: verify the program in local mode during development.
1.1.2 Demonstration
1.1.2.1 Configure FLINK_HOME

[robin@node01 module]$ sudo vi /etc/profile

export FLINK_HOME=/opt/module/flink
export PATH=$PATH:$FLINK_HOME/bin

[robin@node01 module]$ source /etc/profile

1.1.2.2 Start the local Flink cluster
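The start command is not shown here. With the default (localhost) configuration, the local cluster is typically started with the start-cluster.sh script; since FLINK_HOME/bin was added to PATH in 1.1.2.1, a minimal sketch would be:

[robin@node01 ~]$ start-cluster.sh     # starts JobManager and TaskManager on the local machine with the default config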

1.1.2.3 View the web dashboard (job management UI)
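Assuming the default rest.port of 8081 is kept, the dashboard should be reachable in a browser at:

http://node01:8081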

1.1.2.4 Stop the local cluster
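A matching stop command, assuming the same installation layout:

[robin@node01 ~]$ stop-cluster.sh      # stops the local JobManager and TaskManager daemons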

1.2 Standalone Mode

1.2.1 Non-HA Mode (JobManager HA is not configured)
1.2.1.1 Configuration
Steps:
1) Perform the following configuration on node01: edit the masters, slaves, and flink-conf.yaml files in the conf directory.
2) Distribute the Flink configured on node01 to the other nodes (node02, node03).
3) Run the start-cluster.sh script to start the Flink distributed cluster.
4) Confirm through the web dashboard.

Step 1: Perform the following configuration on Node01

conf/masters

conf/slaves

conf/flink-conf.yaml

[robin@node01 ~]$ cat /opt/module/flink/conf/flink-conf.yaml
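The contents of the three files are not reproduced here; a minimal sketch of what they might look like for this non-HA layout, assuming node01 as the only JobManager and node02/node03 as TaskManagers (values consistent with the flink-conf.yaml echoed in the yarn-session log later), is:

# conf/masters
node01:8081

# conf/slaves
node02
node03

# conf/flink-conf.yaml (key entries only)
jobmanager.rpc.address: node01        # host of the JobManager
jobmanager.rpc.port: 6123             # RPC port of the JobManager
taskmanager.numberOfTaskSlots: 2      # slots per TaskManager
parallelism.default: 1                # default job parallelism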

Step 2: Distribute flink configured on Node01 to other nodes

Synchronization script

[robin@node01 ~]$ cat bin/xsync
#!/bin/bash
#1 Get the number of input parameters; exit if there are none
pcount=$#
if ((pcount==0)); then
  echo no args;
  exit;
fi
#2 Get the file name
p1=$1
fname=`basename $p1`
echo fname=$fname
#3 Get the absolute path of the parent directory
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir
#4 Get the current user name
user=`whoami`
#5 Loop over the target nodes and rsync the file to each of them
NODE_IPS=("node02" "node03")
for node_ip in ${NODE_IPS[@]}
do
  #echo $pdir/$fname $user@hadoop$host:$pdir
  echo --------------- ${node_ip} ----------------
  rsync -rvl $pdir/$fname $user@${node_ip}:$pdir
done

[robin@node01 module]$ xsync flink

Step 3: Run the start-cluster.sh script to start the Flink distributed cluster
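The start command and a quick process check might look as follows (the xcall.sh helper used later in this article runs jps on every node); the exact daemon list depends on what else is running on the nodes:

[robin@node01 module]$ start-cluster.sh
[robin@node01 module]$ xcall.sh jps    # expect StandaloneSessionClusterEntrypoint on node01 and TaskManagerRunner on node02/node03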

Step 4: Confirm through the web dashboard

1.2.1.2 Testing the availability of standalone non-HA mode
Test steps:
1. Package the project as a JAR package
2. Upload it to Linux
3. Submit and execute
4. Verify the result

Implementation

Step 1: Package the project as a JAR package
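The packaging step itself is not shown; assuming the project is a standard Maven project (the flink-learning-1.0-SNAPSHOT.jar name used below suggests so), it could be built with:

mvn clean package -DskipTests      # produces target/flink-learning-1.0-SNAPSHOT.jar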

Step 2: Upload the JAR package to Linux

[robin@node01 ~]$ mkdir flink-learn

Step 3: Submit and execute

The submission modes are as follows:
1. On the CLI
2. Via the web dashboard

Mode 1: CLI mode

[robin@node01 flink-learn]$ vim standalone-deploy.sh

flink run -c com.jd.bounded.hdfs3.BoundedFlowTest4 \
./flink-learning-1.0-SNAPSHOT.jar \
--input hdfs://node01:9000/flink/input \
--output hdfs://node01:9000/flink/output

Note: if the run fails with an error, the solution is to upload the downloaded JAR package to the flink/lib directory and sync it to the other machines. Note: stop the Flink distributed cluster and synchronize the JAR packages before restarting the Flink cluster.

The effect of running the script; the effect of running it again after the fix.

Mode 2: Submit via the web dashboard

Upload the JAR package, then view the result.

1.2.2 HA Mode (JobManager HA Configuration)
1.2.2.1 Introduction
Procedure:
1. Cluster planning:
   node01 --> JobManager (master role)
   node02 --> JobManager (slave role), TaskManager
   node03 --> TaskManager
2. Implementation roadmap:
   a. Modify the configuration files flink-conf.yaml, masters, and slaves
   b. Synchronize Flink on node01 to the other nodes (node02, node03)
   c. Start the Flink distributed cluster
   d. Verify that the Flink standalone HA mode takes effect
1.2.2.2 Implementation

Step 1: Modify the configuration files flink-conf.yaml, masters, and slaves

[robin@node01 conf]$ vim masters

node01:8081
node02:8081

[robin@node01 conf]$ vim flink-conf.yaml

jobmanager.rpc.address: node01
jobmanager.rpc.port: 6123
high-availability: zookeeper                                  # set the high-availability mode to ZooKeeper
high-availability.storageDir: hdfs://node01:9000/flink/ha/   # high-availability storage directory
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one                   # an ID for this cluster, used to store the cluster's high-availability data
high-availability.zookeeper.client.acl: open

The slaves file does not need to be modified; it is the same as before.

Step 2: Synchronize flink on Node01 to another node

Step 3: Start the Flink distributed cluster

[robin@node01 module]$ start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host node01.
Starting standalonesession daemon on host node02.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.
[robin@node01 module]$ xcall.sh jps
--- node01 ---
1825 DataNode
1639 NameNode
2729 QuorumPeerMain
3579 Jps
3500 StandaloneSessionClusterEntrypoint
2127 NodeManager
--- node02 ---
3651 Jps
1670 ResourceManager
1463 DataNode
3546 TaskManagerRunner
2476 QuorumPeerMain
1821 NodeManager
3006 StandaloneSessionClusterEntrypoint
--- node03 ---
1701 NodeManager
2773 TaskManagerRunner
2837 Jps
1430 DataNode
1580 SecondaryNameNode
2222 QuorumPeerMain

Step 4: Verify that the Flink standalone HA mode is in effect

1. View the corresponding zNode information of ZooKeeper
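The listing below appears to come from the ZooKeeper command-line client, which can be started, for example, with:

[robin@node01 module]$ zkCli.sh -server node01:2181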

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] 
[zk: localhost:2181(CONNECTED) 0] ls /
[flink, cluster, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, services, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 1] ls /flink/cluster_one
[jobgraphs, leader, leaderlatch]
[zk: localhost:2181(CONNECTED) 2] 

2. Verify the HA of the JobManager process

2.1 Method 1: View log files

2.1.1 View the JobManager process playing the Leader role on node01

2.1.2 View the JobManager process in StandBy state on node02
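One way to perform this check, assuming Flink's default log file naming under $FLINK_HOME/log, is to grep the standalonesession log on each node for leadership messages, e.g.:

[robin@node01 flink]$ grep -i leader log/flink-robin-standalonesession-*.log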

2.2 Method 2: Switch the process status

2.2.1 Introduction

Procedure:
1. Manually kill the JobManager on node01
2. Check whether node02 is automatically elected as the Leader and its state is Active
3. Restart the JobManager on node01 and check whether it is in StandBy state

2.2.2 Implementation

1. Manually kill JobManager on Node01

[robin@node01 module]$ jps
1825 DataNode
3826 Jps
1639 NameNode
2729 QuorumPeerMain
3674 ZooKeeperMain
3500 StandaloneSessionClusterEntrypoint
2127 NodeManager
[robin@node01 module]$ jobmanager.sh stop
Stopping standalonesession daemon (pid: 3500) on host node01.
[robin@node01 module]$ jps
1825 DataNode
1639 NameNode
2729 QuorumPeerMain
3674 ZooKeeperMain
2127 NodeManager
4239 Jps
2. Check whether node02 is automatically elected as the Leader and its state is Active

3. Restart JobManager on Node01 and check whether it is in StandBy state

[robin@node01 module]$ jps
1825 DataNode
1639 NameNode
2729 QuorumPeerMain
3674 ZooKeeperMain
4254 Jps
2127 NodeManager
[robin@node01 module]$ jobmanager.sh start
Starting standalonesession daemon on host node01.
[robin@node01 module]$ jps
1825 DataNode
4739 Jps
1639 NameNode
4680 StandaloneSessionClusterEntrypoint
2729 QuorumPeerMain
3674 ZooKeeperMain
2127 NodeManager
[robin@node01 module]$

The word "leadership" cannot be found in the node01 JobManager log, which indicates that the restarted JobManager is in StandBy state.

1.3 YARN Mode

1.3.1 Introduction
Two deployment modes are available:
Mode 1: start a long-running Flink cluster on YARN
Mode 2: run a single Flink job on YARN

CliFrontend is the entry class for all jobs. By parsing the passed parameters (JAR package, mainClass, etc.) it reads Flink's environment and configuration information, encapsulates it into a PackagedProgram, and finally submits it to the Flink cluster through the ClusterClient.
1.3.2 Demonstrations
1.3.2.1 YARN session

Characteristics

A yarn session initializes a Flink cluster inside YARN with the specified resources; all subsequently submitted tasks go to this cluster. The Flink cluster resides in the YARN cluster unless it is stopped manually.
YarnSessionClusterEntrypoint is Flink's ApplicationMaster on YARN and is also the JobManager, i.e.:
YarnSessionClusterEntrypoint = ApplicationMaster + JobManager
YarnJobClusterEntrypoint = the startup class of the ApplicationMaster process (per-job mode)
YarnTaskExecutorRunner receives the subtasks and runs as the TaskManager

Hands-on

Step 1: Start the Flink cluster and make it resident in YARN

yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d

-n (--container): the number of TaskManagers
-s (--slots): the number of slots per TaskManager
-jm: memory of the JobManager (in MB)
-tm: memory of each TaskManager (in MB)
-nm: the appName of the YARN application (the name shown on the YARN UI)
-d: run in the background (detached mode)

Effect:

[robin@node01 hadoop-2.7.2]$ yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test
2020-03-21 13:30:28,082 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, node01
2020-03-21 13:30:28,084 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2020-03-21 13:30:28,084 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2020-03-21 13:30:28,084 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2020-03-21 13:30:28,084 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 2
2020-03-21 13:30:28,084 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2020-03-21 13:30:28,085 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability, zookeeper
2020-03-21 13:30:28,085 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.storageDir, hdfs://node01:9000/flink/ha/
2020-03-21 13:30:28,085 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.quorum, node01:2181,node02:2181,node03:2181
2020-03-21 13:30:28,085 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.path.root, /flink
2020-03-21 13:30:28,085 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.cluster-id, /cluster_one
2020-03-21 13:30:28,086 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.client.acl, open
2020-03-21 13:30:28,086 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-03-21 13:30:28,574 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-03-21 13:30:28,707 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to robin (auth:SIMPLE)
2020-03-21 13:30:28,770 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at node02/192.168.56.103:8032
2020-03-21 13:30:28,881 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument n is deprecated in will be ignored.
2020-03-21 13:30:29,017 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2020-03-21 13:30:29,063 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=2}
2020-03-21 13:30:29,516 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/opt/module/flink/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2020-03-21 13:30:32,867 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1584768585865_0001
2020-03-21 13:30:33,027 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1584768585865_0001
2020-03-21 13:30:33,027 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2020-03-21 13:30:33,031 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2020-03-21 13:30:39,926 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
2020-03-21 13:30:39,961 INFO  org.apache.flink.runtime.blob.FileSystemBlobStore             - Creating highly available BLOB storage directory at hdfs://node01:9000/flink/ha///cluster_one/blob
2020-03-21 13:30:40,025 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  - Starting
2020-03-21 13:30:40,031 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
2020-03-21 13:30:40,031 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:host.name=node01
2020-03-21 13:30:40,031 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.version=1.8.0_144
2020-03-21 13:30:40,031 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.vendor=Oracle Corporation
2020-03-21 13:30:40,032 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.home=/opt/module/jdk1.8.0_144/jre
2020-03-21 13:30:40,032 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.class.path=/opt/module/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:/opt/module/flink/lib/flink-table_2.11-1.9.1.jar:/opt/module/flink/lib/flink-table-blink_2.11-1.9.1.jar:/opt/module/flink/lib/log4j-1.2.17.jar:/opt/module/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/module/flink/lib/flink-dist_2.11-1.9.1.jar::/opt/module/hadoop-2.7.2/etc/hadoop:
2020-03-21 13:30:40,032 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2020-03-21 13:30:40,032 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.io.tmpdir=/tmp
2020-03-21 13:30:40,032 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.compiler=<NA>
2020-03-21 13:30:40,032 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:os.name=Linux
2020-03-21 13:30:40,032 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:os.arch=amd64
2020-03-21 13:30:40,032 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:os.version=2.6.32-642.el6.x86_64
2020-03-21 13:30:40,032 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:user.name=robin
2020-03-21 13:30:40,032 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:user.home=/home/robin
2020-03-21 13:30:40,032 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:user.dir=/opt/module/hadoop-2.7.2
2020-03-21 13:30:40,033 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating client connection, connectString=node01:2181,node02:2181,node03:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@f9b7332
2020-03-21 13:30:40,047 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-9089771528471195129.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
2020-03-21 13:30:40,049 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server node03/192.168.56.104:2181
2020-03-21 13:30:40,050 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to node03/192.168.56.104:2181, initiating session
2020-03-21 13:30:40,057 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed
2020-03-21 13:30:40,064 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server node03/192.168.56.104:2181, sessionid = 0x370fb91feeb0001, negotiated timeout = 40000
2020-03-21 13:30:40,067 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: CONNECTED
2020-03-21 13:30:40,413 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
Flink JobManager is now running on node02:60724 with leader id 30e10b8a-f6c0-4500-99fe-5bc0b54c9080.
JobManager Web Interface: http://node02:43947

Check the process
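The process check itself is not shown; two possible ways to confirm the session is up are to run jps on every node and to list the YARN applications:

[robin@node01 module]$ xcall.sh jps              # a YarnSessionClusterEntrypoint process should appear on one of the nodes
[robin@node01 module]$ yarn application -list    # the application named 'test' should be RUNNING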

Step 2: Deploy flink job to YARN, run it, and verify the result

Deploy and start running

flink run -c com.jd.bounded.hdfs_hdfs.BoundedFlowTest \
-d ./flink-learning-1.0-SNAPSHOT.jar \
--input hdfs://node01:9000/flink/input \
--output hdfs://node01:9000/flink/output3/result.txt

After it is started, the Flink job shares the resources allocated by the yarn session and runs in that Flink distributed cluster.

[robin@node01 flink-learn]$ ./yarn-cluster-deploy.sh
2020-03-21 15:22:11,787 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-robin.
2020-03-21 15:22:12,166 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 4
2020-03-21 15:22:12,205 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at node02/192.168.56.103:8032
2020-03-21 15:22:12,342 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-03-21 15:22:12,345 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2020-03-21 15:22:12,412 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Found application JobManager host name 'node02' and port '45906' from supplied application id 'application_1584775089663_0001'
Starting execution of program
Job has been submitted with JobID 2ac1cf69095c62373bea23f9ce6561c7

Result of operation

1.3.2.2 Independent Flink job
  • Characteristics
Each submission creates a new Flink cluster, and tasks are independent of one another, which makes them easy to manage. The cluster created for a task disappears after the task completes.
YarnJobClusterEntrypoint = the startup class of the ApplicationMaster process
YarnTaskExecutorRunner is responsible for receiving the subtasks and running as the TaskManager
  • Instructions
Command to start a single Flink job on YARN:

flink run -m yarn-cluster -yn 2 -c com.jd.bounded.hdfs_hdfs.BoundedFlowTest \
-d ./flink-learning-1.0-SNAPSHOT.jar \
--input hdfs://node01:9000/flink/input \
--output hdfs://node01:9000/flink/output4/result.txt

Commonly used options:
-yn,  --yarncontainer   number of TaskManagers
-yqu, --yarnqueue       the YARN queue to use
-ys,  --yarnslots       number of slots per TaskManager

Steps:
1. Start the Hadoop distributed cluster
2. Run the shell script
3. Observe the processes and verify the result on HDFS
  • Hands-on

Write a shell script to submit the application

flink run -m yarn-cluster -yn 2 -c com.jd.bounded.hdfs_hdfs.BoundedFlowTest \
-d ./flink-learning-1.0-SNAPSHOT.jar \
--input hdfs://node01:9000/flink/input \
--output hdfs://node01:9000/flink/output4/result.txt

Execute the shell script on the client

[robin@node01 flink-learn]$ ./yarn-cluster-per-job-deploy.sh
2020-03-21 16:13:54,299 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-robin.
2020-03-21 16:13:54,675 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at node02/192.168.56.103:8032
2020-03-21 16:13:54,800 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-03-21 16:13:57,291 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument yn is deprecated in will be ignored.
2020-03-21 16:13:57,382 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2020-03-21 16:13:57,429 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=2}
2020-03-21 16:13:57,435 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/opt/module/flink/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2020-03-21 16:13:59,326 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1584778326281_0001
2020-03-21 16:13:59,584 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1584778326281_0001
2020-03-21 16:13:59,586 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2020-03-21 16:13:59,590 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2020-03-21 16:14:06,435 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
2020-03-21 16:14:06,435 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1584778326281_0001
Please also note that the temporary files of the YARN session in the home directory will not be removed.
Job has been submitted with JobID e0d7f27ef42d7472466b14b13099dc93
  • Verify the results

View the background management page of YARN

Check the result on HDFS

Check the result in HDFS
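For example, assuming the output path from the per-job run above, the result file can be inspected directly with the HDFS CLI:

[robin@node01 ~]$ hdfs dfs -cat hdfs://node01:9000/flink/output4/result.txt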