Preface

Many people have a question about MapReduce: once an MR program is written, how do its Map and Reduce tasks run in parallel across multiple nodes, and how is it decided which task runs on which node? These questions are actually answered by YARN, because the YARN framework supports far more than MR; it can run a wide variety of programs.

We know that running an application in a cluster consumes resources such as CPU, memory, and IO. In Hadoop 1.x, this resource allocation (the cluster resource management shown in the figure above) was done by MapReduce itself. In a large cluster running multiple applications, MapReduce tends to become a system bottleneck. Therefore, Hadoop 2.x introduces YARN, while HDFS for storage and MapReduce for computation remain unchanged.

Apache Hadoop YARN is a subproject of the Apache Software Foundation's Hadoop project. It was introduced in Hadoop 2.0 to separate resource management from the computation components. YARN was born because data stored in HDFS needs more interaction modes than just MapReduce. The Hadoop 2.0 YARN architecture supports additional processing frameworks instead of forcing everything through MapReduce.

When enterprise data is available in HDFS, it is important to have multiple ways of processing it. With Hadoop 2.0 and YARN, organizations can use streaming, interactive data processing, and other Hadoop-based applications.

Yarn

I. Yarn architecture

YARN is a classic master/slave structure, as shown in the following figure. Generally speaking, the YARN service consists of one ResourceManager (RM) and multiple NodeManagers (NM). ResourceManager is the master node, and NodeManager is the slave node.

You can run the jps command on the cluster nodes to check the processes: the node showing a ResourceManager process is the master node, and the nodes showing a NodeManager process are the slave nodes.
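A quick illustration of what this looks like (a hypothetical cluster where the HDFS and YARN daemons share nodes; process IDs and the exact daemon list will differ on your machines):

# On the master node
$ jps
3041 NameNode
3328 ResourceManager
4120 Jps

# On a slave node
$ jps
2756 DataNode
2901 NodeManager
3577 Jps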

1.1 Core Components

Component and role:

  • ResourceManager: the master; an independent process that manages, schedules, and allocates the resources of the whole cluster.
  • ApplicationMaster: the guardian and manager of an application; it monitors and manages all attempts of the application on the cluster nodes, and applies for and returns resources to the YARN ResourceManager.
  • NodeManager: an independent process running on each slave node; it reports the node's status (disk, memory, and CPU usage).
  • Container: the unit in which YARN allocates resources, encompassing memory and CPU; YARN allocates resources in units of Containers.

The ResourceManager centrally manages and schedules the resources on each NodeManager. When a user submits an application, an ApplicationMaster must be provided to trace and manage that application: it applies for resources from the ResourceManager and asks NodeManagers to start the tasks that occupy those resources. Because the ApplicationMasters of different applications are distributed on different nodes, they do not affect each other.

Each application submitted by the Client to ResourceManager must have an ApplicationMaster. After ResourceManager allocates resources, the ApplicationMaster runs in a Container on a Slave node. Tasks that do specific things also run in a Container associated with a Slave node.

1.2 How does Yarn Work

The basic concept of YARN is to split JobTracker/TaskTracker into the following entities:

  1. A global ResourceManager
  2. One ApplicationMaster per application
  3. One NodeManager per slave node
  4. One or more Containers per application, running on NodeManagers

The figure above is the general diagram of how applications run on YARN. To read it, look at the roles first: there are two clients, one master, and three slaves. The client establishes a connection with the master ResourceManager (compare the HDFS NameNode, the Kafka controller, and Kafka leader partitions; all big data frameworks really are alike 🤣). The ResourceManager then selects a NodeManager based on its calculations and contacts it; the message it sends contains the resource requirements for running the ApplicationMaster (how many CPUs, how much memory, and so on). After the NodeManager receives the message, it creates a local container, called a Container, in which the ApplicationMaster process runs.

After the ApplicationMaster starts, it communicates with the ResourceManager. The ResourceManager does its calculation and returns a message to the ApplicationMaster, which then contacts the other NodeManagers, because an application runs in multiple Containers. Each of those NodeManagers creates a Container to execute part of the application. These Containers communicate with the ApplicationMaster while the tasks execute, and when all tasks are complete, the ApplicationMaster destroys the Containers.

II. Description of core components

From here on the text gets rather official-sounding; frankly it reads like a dry pile of search-engine results.

2.1 ResourceManager

RM is a global resource manager. There is only one resource manager in a cluster. RM manages and allocates resources for the entire system, including processing client requests, starting and monitoring ApplicationMaster, monitoring NodeManager, and resource allocation and scheduling. It mainly consists of two components: the Scheduler and the Applications Manager (ASM).

The scheduler

The scheduler allocates the system's resources to the running applications subject to capacity and queue constraints, such as how many resources each queue is allocated and the maximum number of jobs it may execute. It is important to note that this scheduler is a "pure scheduler": it is not responsible for any application-specific work, such as monitoring or tracking application execution status, nor for restarting tasks that fail because of application errors or hardware faults. That work is done by the application-specific ApplicationMaster.

The scheduler allocates resources based only on the Resource requirements of each application. The Resource allocation unit is represented by an abstract concept called Resource Container (Container). Container is a dynamic Resource allocation unit. It encapsulates resources such as memory, CPU, disk, and network to limit the amount of resources used by each task.

Application manager

The Applications Manager is responsible for managing all applications in the system. This includes accepting job submissions, negotiating with the scheduler for the first Container in which to launch the application's ApplicationMaster, monitoring the ApplicationMaster's health, and restarting it in a new Container if it fails.

2.2 ApplicationMaster

An ApplicationMaster manages each instance of an application running in YARN. Job or application management is handled by the ApplicationMaster process, and YARN allows us to develop an ApplicationMaster for our own applications.

Function:

  • Splits the data;
  • Applies for resources for the application and further allocates them to the internal tasks;
  • Handles task monitoring and fault tolerance;
  • Coordinates resources from the ResourceManager and monitors task execution and resource usage through the NodeManager.

The communication between the ApplicationMaster and the ResourceManager is the core of a YARN application's journey from submission to execution, and it is the fundamental step by which YARN dynamically manages cluster resources. The ApplicationMasters of the various applications continually communicate with the ResourceManager to apply for, release, re-apply for, and release resources.

2.3 NodeManager

NodeManager is a slave service. It receives resource allocation requests from ResourceManager and allocates specific Containers to applications. In addition, it monitors and reports Container usage information to ResourceManager. Working with ResourceManager, NodeManager allocates resources in the Hadoop cluster.

In short, the NodeManager reports its node's resource usage (such as CPU and memory) and the running status of its Containers. You can also query what each NodeManager reports from the command line, as sketched below.
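For example, the yarn command-line tool can show what the NodeManagers are reporting to the ResourceManager; a small sketch (the node ID is a placeholder you would copy from the -list output):

# List the NodeManagers currently registered with the ResourceManager
$ yarn node -list
# Show the memory/vcore usage and container count reported by one node
$ yarn node -status <node-id>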

  • Receives and processes command requests from the ResourceManager and allocates Containers to application tasks;
  • Sends regular reports to the RM, which collects them from every NodeManager to track the health of the whole cluster, while each NodeManager also monitors its own health;
  • Handles requests from ApplicationMasters;
  • Manages the life cycle of every Container on its node;
  • Manages the logs on its node;
  • Runs auxiliary services used by applications on YARN, such as the shuffle service for MapReduce.

When a node is started, it registers with ResourceManager and tells ResourceManager how many resources it has available. During the running, NodeManager and ResourceManager work together to constantly update the information and ensure the optimal status of the cluster.

The NodeManager only manages its own Containers and knows nothing about the applications running in them; the component responsible for application-level information is the ApplicationMaster.
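Related to the log-management point above: assuming log aggregation is enabled (yarn.log-aggregation-enable set to true), the container logs collected by the NodeManagers for a finished application can be pulled with the yarn CLI. A sketch, with a placeholder application ID:

# Fetch the aggregated container logs of an application
$ yarn logs -applicationId application_1234567890123_0001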

2.4 Container

Container is a resource abstraction in YARN. It encapsulates multi-dimensional resources on a node, such as memory, CPU, disk, and network resources. When AM applies for resources from RM, the resources returned by RM are represented by Container. YARN assigns a Container to each task, and the task can use only the resources described in the Container.

The relationship between Containers and cluster nodes is as follows: a node can run multiple Containers, but a Container never spans nodes. Any job or application must run in one or more Containers. In the YARN framework, the ResourceManager only tells the ApplicationMaster which Containers it has been allocated; the ApplicationMaster still has to go to the NodeManager to launch each specific Container.

Note that a Container is a dynamic resource division unit that is dynamically generated based on application requirements. YARN supports only CPU and memory resources, and uses the lightweight resource isolation mechanism Cgroups to isolate resources.

Function:

  • An abstraction of the task's environment;
  • It describes a set of information;
  • The set of resources the task runs with (CPU, memory, IO, etc.);
  • The task's runtime environment.

2.5 Resource Request and Container

Reference link

Yarn is designed to allow our applications to use the entire cluster in a shared, secure, multi-tenant manner. Yarn must be aware of the entire cluster topology to ensure efficient cluster resource scheduling and data access.

To achieve these goals, the ResourceManager Scheduler defines some flexible protocols for application Resource requests to better schedule applications in the cluster. Therefore, Resource Request and Container are born.

An application expresses its resource requirements to its ApplicationMaster, which then sends them to the ResourceManager Scheduler as a resource-request. The Scheduler responds with Containers describing the resources allocated against the original resource-request.

Each ResourceRequest can be considered as a serializable Java object and contains the following fields:

<resource-name, priority, resource-requirement, number-of-containers>

  • resource-name: the name of the resource; at present this is the host or rack where the resource is located (VMs or more complex network topologies may be supported in the future);
  • priority: the priority of this request within the application (not across applications);
  • resource-requirement: the amount of resources required, currently memory and CPU;
  • number-of-containers: the number of Containers that satisfy the above requirements.

2.6 JobHistoryServer

Run the mr-jobhistory-daemon.sh start historyserver command to record the historical runs of jobs scheduled on YARN. No extra configuration is needed on the data nodes of the cluster. After a successful start, a JobHistoryServer process appears (check with the jps command, demonstrated below), and job details can be viewed on port 19888.
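If you want the history server addresses to be explicit rather than relying on defaults, they are commonly set in mapred-site.xml. A sketch (node01 is a placeholder host name; the ports shown are the defaults):

<!-- Optional JobHistoryServer addresses in mapred-site.xml; node01 is a placeholder -->
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>node01:10020</value>
</property>
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>node01:19888</value>
</property>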

2.6.1 What do we see on the YARN UI if we run a MapReduce program without starting JobHistoryServer?

Open the interface shown in the picture below and click the History link in the picture; the page will then redirect.

After clicking History, the page that follows is blank; this is because we did not start JobHistoryServer.

2.6.2 Run the mr-jobhistory-daemon.sh start historyserver command on the three machines to start JobHistoryServer one by one

OK, now that JobHistoryServer is started on the three nodes, let's run the WordCount program again (remember to delete the output directory first); a sketch of the commands is shown below.
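A sketch of the rerun, assuming the bundled example jar and the /input and /output HDFS paths (all of these are placeholders for whatever you actually used):

# Remove the old output directory, then rerun WordCount
$ hdfs dfs -rm -r /output
$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount /input /output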

Click the History link to go to a page that lists the map and reduce tasks under TaskType; the Total column shows how many map and reduce tasks the MapReduce program needed.

Click on the Map link in the TaskType column to see information about the Map task such as execution status, start time, and completion time.

We can use the same method to view the details of reduce job execution, which will not be described here.

From the above operations we can see that JobHistoryServer keeps a record of the historical information produced while jobs run, which is convenient for job analysis.

2.7 Timeline Server

The Timeline Server collects application history data, including data from third-party frameworks (such as Spark). According to the official website, it is an effective supplement to JobHistoryServer: JobHistoryServer can only record MapReduce job information, while the Timeline Server records not only the information produced while a job runs but also finer-grained details, such as which queue the job ran in and which user ran it.

In other words, JobHistoryServer can only record MapReduce applications; the Timeline Server is more powerful, but it does not replace JobHistoryServer. The two are complementary.

Website tutorial

III. Operation principle of YARN applications

3.1 How does Yarn Work?

The basic concept of YARN is to split JobTracker/TaskTracker into the following entities:

  1. A global ResourceManager
  2. One ApplicationMaster per application
  3. One NodeManager per slave node
  4. One or more Containers per application, running on NodeManagers

3.2 Yarn Application Submission Process

The Application execution process in Yarn can be summarized as follows:

  1. Application submission
  2. Launching the ApplicationMaster instance for the application
  3. The ApplicationMaster instance managing the execution of the application

Specific submission process:

  1. The client submits applications to ResourceManager and requests an ApplicationMaster instance

  2. ResourceManager finds a NodeManager that can run a Container and starts the ApplicationMaster instance in this Container

  3. The ApplicationMaster registers with the ResourceManager. After registration, the client can query the ResourceManager for details about the ApplicationMaster and then interact with its ApplicationMaster directly (during this communication, the application tells the ApplicationMaster what resources it needs).

  4. During normal operation, the ApplicationMaster sends resource-request messages to the ResourceManager following the resource-request protocol

  5. When a Container is successfully allocated, the ApplicationMaster starts it by sending a container-launch-specification to the NodeManager; this specification contains the information the Container needs to communicate with the ApplicationMaster

  6. The application code runs as a task in the launched Container and sends its progress, status, and other information to the ApplicationMaster using an application-specific protocol

  7. During the running of an application, the client submitting the application actively communicates with the ApplicationMaster to obtain the operating status and progress updates of the application. The communication protocol is also application-specific

  8. Once the application has finished and all related work is done, the ApplicationMaster unregisters from the ResourceManager and shuts down, and all the Containers it used are returned to the system.

In brief: Step 1: the client submits the application to the ResourceManager. Step 2: the ResourceManager applies for resources for the ApplicationMaster and communicates with a NodeManager to start the first Container, in which the ApplicationMaster runs. Step 3: the ApplicationMaster registers with the ResourceManager and keeps communicating with it to apply for resources for its internal tasks; once resources are obtained, it communicates with NodeManagers to start the tasks. Step 4: after all tasks finish, the ApplicationMaster logs out of the ResourceManager and the whole application is complete.

3.3 MapReduce on YARN

MapReduce works based on YARN:

We submit jar packages for MapReduce processing, so the whole operation process is divided into five steps:

1. The client submits the MapReduce job.
2. YARN's ResourceManager coordinates the allocation of cluster resources.
3. YARN's NodeManagers launch and monitor the compute containers on the cluster machines.
4. The MapReduce ApplicationMaster applies to the ResourceManager for resources and coordinates the tasks that run the MapReduce job; the tasks run in containers managed by the NodeManagers.
5. HDFS distributes the job configuration files and JAR packages among the nodes.
① Job initialization process

1. When the ResourceManager receives the submitApplication() call, it hands the request to the scheduler, which allocates a Container; the ResourceManager then launches the ApplicationMaster process in that Container, under the management of the corresponding NodeManager.

2. It is up to the ApplicationMaster to decide how to run the tasks. If the job is small, the ApplicationMaster will choose to run the tasks in its own JVM (a so-called uber task). How is "small" judged? A job is considered small when it has fewer than 10 mappers, only one reducer, and its input is smaller than one HDFS block (these thresholds can be changed via mapreduce.job.ubertask.maxmaps, mapreduce.job.ubertask.maxreduces, and mapreduce.job.ubertask.maxbytes; see the configuration sketch after this list).

3. Before any tasks run, the ApplicationMaster calls the setupJob() method of the OutputCommitter, which creates the output directory for the job.
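A sketch of where those uber-task thresholds live (mapred-site.xml). The enable switch is off by default and is shown turned on here purely for illustration; maxmaps and maxreduces are shown at their usual defaults:

<property>
    <name>mapreduce.job.ubertask.enable</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.job.ubertask.maxmaps</name>
    <value>9</value>
</property>
<property>
    <name>mapreduce.job.ubertask.maxreduces</name>
    <value>1</value>
</property>
<!-- mapreduce.job.ubertask.maxbytes defaults to the HDFS block size -->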

② Task allocation

1. The ApplicationMaster requests Containers for the map and reduce tasks from the ResourceManager (step 8 in the figure). Map tasks have a higher priority than reduce tasks; after the map tasks complete, their output is sorted and the reduce tasks start. (One point here: Container requests for reduces are not made until 5% of the map tasks have completed, as summarized below.)

2. Running tasks consumes memory and CPU. By default, each map and reduce task is allocated 1024 MB of memory and one virtual core (these can be changed via mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, mapreduce.map.cpu.vcores, and mapreduce.reduce.cpu.vcores; see the sketch below).
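A minimal sketch of overriding those per-task resources in mapred-site.xml (the numbers are purely illustrative, not recommendations):

<property>
    <name>mapreduce.map.memory.mb</name>
    <value>2048</value>
</property>
<property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>4096</value>
</property>
<property>
    <name>mapreduce.reduce.cpu.vcores</name>
    <value>2</value>
</property>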

③ Task execution

1. Once the ResourceManager has allocated a Container for a task, the ApplicationMaster tells the NodeManager to start the Container. The task is run by a Java application whose main class is YarnChild; before running the task it first localizes the JAR packages, configuration files, and distributed-cache files the task needs.

2. YarnChild runs in a dedicated JVM, so a crash or hang in any map or reduce task cannot bring down or hang the whole NodeManager.

3. Each task can perform its setup and commit actions in the same JVM as the task itself, and the processed data is first written to a temporary file. (See the data-flow diagram.)

④ Update job progress and status

MapReduce is a batch process that may take an hour, several hours, or even several days, so monitoring the job's running status is very important. Each job and each task has a status, including the job state (running, successfully completed, failed), the values of counters, and status/description messages (description messages are usually printed from the code).

So how does this information communicate with the client?

While a task executes, it keeps track of its progress as a percentage of completion. For map tasks this is the proportion of the input processed; for reduce tasks it is more complicated, but the system can still estimate the proportion completed. While a map or reduce task runs, the child process reports to the ApplicationMaster every three seconds, until the job is done.

3.4 Yarn Application Life Cycle

  • RM: Resource Manager
  • AM: Application Master
  • NM: Node Manager
  1. The Client submits applications to RM, including AM programs and commands for starting the AM.
  2. RM assigns the first container to AM and communicates with NM to start the application AM on the container.
  3. When an AM is started, it registers with RM to allow clients to obtain AM information from RM and communicate with the AM directly.
  4. The AM negotiates container resources for applications through the resource request protocol.
  5. If a container is successfully allocated, the AM asks the NM to start the application in the container. After starting, the application can communicate with the AM independently.
  6. The application executes in the container and reports to the AM.
  7. During application execution, the Client communicates with the AM to obtain application status.
  8. After the application finishes, the AM unregisters from the RM and shuts down, releasing its resources.

Apply for resources > Start appMaster > Apply for a Container to run the Task > Distribute the Task > Run the Task > End the Task > Reclaim the Container

IV. How to use YARN

4.1 Configuration File

<!-- $HADOOP_HOME/etc/hadoop/mapred-site.xml -->
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
<!-- $HADOOP_HOME/etc/hadoop/yarn-site.xml -->
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
</configuration>

4.2 Starting and stopping YARN

Start ResourceManager and NodeManager (RM and NM for short)

# Run on the master node
$HADOOP_HOME/sbin/start-yarn.sh

Stop RM and NM

# Run on the master node
$HADOOP_HOME/sbin/stop-yarn.sh

If RM is not started, start it separately

# If RM is not started, run on the master node
$HADOOP_HOME/sbin/yarn-daemon.sh start resourcemanager
# Similarly, it can be stopped separately
$HADOOP_HOME/sbin/yarn-daemon.sh stop resourcemanager

If NM is not started, it can be started separately

# If NM is not started, run on the corresponding node
$HADOOP_HOME/sbin/yarn-daemon.sh start nodemanager
# Similarly, it can be stopped separately
$HADOOP_HOME/sbin/yarn-daemon.sh stop nodemanager

V. YARN scheduler

Imagine your company has one Hadoop cluster, but project group A regularly produces periodic BI reports while project group B often runs ad-hoc jobs for temporary requirements. They will inevitably submit tasks at the same time. How should resources be allocated so that both get their work done? Do you run A first and then B, or run both at the same time?

If you are confused, learn more about yarn’s resource scheduler.

The scheduler is an important part of the YARN framework; with proper scheduling rules, multiple applications can work at the same time. The most primitive scheduling rule is FIFO, which decides which task runs first purely by submission time: the task submitted first is executed first. However, one large task may then monopolize the resources while everything else waits, or a pile of small tasks may occupy the resources while a large task cannot get enough and starves. So although FIFO is very simple, it cannot meet our needs.

Ideally, resource requests made by yarn applications should be fulfilled immediately. In reality, however, resources are limited, and on a busy cluster, an application often has to wait to get the resources it needs. The job of the YARN scheduler is to allocate resources to an application based on a defined policy. Scheduling is often a challenge, and there is no one “best” policy, which is why YARN offers multiple schedulers and configurable policies to choose from.

YARN scheduling can be viewed at two levels:

  • Level-1 scheduling (closer to the bottom layer, i.e. to the actual resources, and the layer that connects applications to the hardware): computing resource management (CPU and memory; complex computation consumes more CPU) and application lifecycle management.
  • Level-2 scheduling (the application's own code and algorithms): the computing model inside the application, which can support a variety of computing models.

5.1 The scheduler

There are three kinds of schedulers to choose from in YARN: the FIFO Scheduler, the Capacity Scheduler, and the Fair Scheduler.

5.2 FIFO Scheduler

FIFO Scheduler arranges applications into a queue according to the order of submission, which is a first-in, first-out queue. When allocating resources, the application at the top of the queue will be allocated resources first, and the next application will be allocated after the demand of the application at the top is satisfied, and so on.

The FIFO Scheduler is the simplest and easiest scheduler to understand and needs no configuration, but it is not suitable for shared clusters: a large application may consume all cluster resources and block everything else. In a shared cluster, the Capacity Scheduler or Fair Scheduler is preferred, since both allow large and small tasks to obtain a share of system resources when they are submitted.

The Yarn scheduler comparison diagram below shows the differences between these schedulers. As shown in the diagram, small tasks are blocked by large tasks in a FIFO scheduler.

5.3 Capacity Scheduler

The capacity scheduler is used by default

With the Capacity Scheduler, a dedicated queue can be set aside for running small tasks. However, reserving a dedicated queue for small tasks pre-allocates part of the cluster's resources, so large tasks finish later than they would under the FIFO scheduler.

How do I configure a capacity scheduler

The queue hierarchy is as follows:

root
├── prod
└── dev
    ├── hdp
    └── spark

On the active node, back up the capacity-scheduler.xml configuration file in $HADOOP_HOME/etc/hadoop/ to another directory

If the Hadoop cluster is started, stop the Hadoop cluster

Create a new capacity-scheduler.xml file in $HADOOP_HOME/etc/hadoop/; it reads as follows.

Copy the XML file to the same directory on the other nodes

Restarting the Hadoop Cluster

<?xml version="1.0" encoding="utf-8"?>

<configuration> 
  <property> 
    <name>yarn.scheduler.capacity.root.queues</name>  
    <value>prod,dev</value> 
  </property>  
  <property> 
    <name>yarn.scheduler.capacity.root.dev.queues</name>  
    <value>hdp,spark</value> 
  </property>  
  <property> 
    <name>yarn.scheduler.capacity.root.prod.capacity</name>  
    <value>40</value> 
  </property>  
  <property> 
    <name>yarn.scheduler.capacity.root.dev.capacity</name>  
    <value>60</value> 
  </property>  
  <property> 
    <name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>  
    <value>75</value> 
  </property>  
  <property> 
    <name>yarn.scheduler.capacity.root.dev.hdp.capacity</name>  
    <value>50</value> 
  </property>  
  <property> 
    <name>yarn.scheduler.capacity.root.dev.spark.capacity</name>  
    <value>50</value> 
  </property> 
</configuration>

Which queue you place your application in depends on the application itself.

For example, you can specify a queue by setting the property mapreduce.job.queuename. Take WordCount as an example; a sketch is shown below.
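A command-line sketch (jar path and HDFS paths are placeholders; passing -D this way works because the bundled WordCount driver parses generic options, and in your own driver you could equally set the property on the job configuration before submitting):

# Submit WordCount to the "prod" queue defined above
$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
    wordcount -D mapreduce.job.queuename=prod /input /output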

If the specified queue does not exist, an error occurs. If not specified, the “default” queue is used by default, as shown in the following figure

Package the program, submit the cluster to run, here will not demonstrate.

In addition, it is very simple to modify queue properties or add new queues: edit conf/capacity-scheduler.xml and then run

    $ yarn rmadmin -refreshQueues

Capacity Scheduler Reference

5.4 Fair Scheduler

Capacity Scheduler is used by default

To use the Fair Scheduler, configure yarn-site.xml and set the property yarn.resourcemanager.scheduler.class to org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler, as follows

<property>
	<name>yarn.resourcemanager.scheduler.class</name>
	<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

Note: all yarn-site.xml files in the cluster must also be updated synchronously
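For completeness, the Fair Scheduler's queues are usually described in an allocation file (fair-scheduler.xml by default, pointed to by yarn.scheduler.fair.allocation.file). A minimal sketch mirroring the prod/dev layout used earlier; the weights and nesting are illustrative only:

<?xml version="1.0"?>
<allocations>
  <queue name="prod">
    <weight>40</weight>
  </queue>
  <queue name="dev">
    <weight>60</weight>
    <queue name="hdp"/>
    <queue name="spark"/>
  </queue>
</allocations>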

With the Fair Scheduler we do not need to reserve system resources up front; it dynamically balances system resources among all running jobs. As shown in the figure below, when the first (big) job is submitted it is the only job running, so it gets all of the cluster's resources. When a second (small) job is submitted, the Fair Scheduler gives it half of the resources, so the two jobs share the cluster fairly.

Note that in the Fair Scheduler there is a delay between the second job's submission and it actually obtaining resources, because it has to wait for the first job to release some Containers. When the small job finishes, it releases its resources and the big job once again gets all of the system's resources. The end result is that the Fair Scheduler achieves high resource utilization while ensuring that small jobs finish on time.

PS: resource preemption is supported: set yarn.scheduler.fair.preemption to true in yarn-site.xml (a sketch follows).
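A minimal yarn-site.xml fragment for that switch (whether to enable preemption is a policy decision for your cluster, not a recommendation here):

<property>
    <name>yarn.scheduler.fair.preemption</name>
    <value>true</value>
</property>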

VI. YARN application states

On the YARN web UI, you can see that a YARN application goes through the following states:

NEW         ----- newly created
NEW_SAVING  ----- being saved
SUBMITTED   ----- submitted
ACCEPTED    ----- accepted by the scheduler
RUNNING     ----- running
FINISHED    ----- finished
FAILED      ----- failed
KILLED      ----- killed
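As a quick way to watch these state transitions from the command line (the application ID is a placeholder):

# List running applications with their current state
$ yarn application -list
# Include finished/failed/killed applications as well
$ yarn application -list -appStates FINISHED,FAILED,KILLED
# Full status report for a single application
$ yarn application -status application_1234567890123_0001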

Finally

This chapter covered YARN's application scenarios, core components, application scheduling process, and typical usage. It is important but mostly conceptual, so there was not much I could do while writing other than stack concepts; the few small hands-on operations in between are not difficult. If you are interested, you can use virtual machines to build a three-node cluster (2 GB each) and try it out.