1. The operating mechanism of MapReduce

You can run a MapReduce job with just one line of code: JobClient.runJob(conf). A great deal of processing detail hides behind this short line. This chapter reveals the steps Hadoop takes to run a job.

The whole process, shown in the following figure, consists of four independent entities:

  • Client: Submits MapReduce jobs
  • JobTracker: Coordinates the running of jobs. The JobTracker is a Java application whose main class is JobTracker.
  • TaskTracker: Runs the tasks that the job has been divided into. The TaskTracker is a Java application whose main class is TaskTracker.
  • A distributed filesystem (usually HDFS), used to share job files among the other entities.

2. Job submission

JobClient's runJob(conf) method is a convenient way to create a new JobClient instance and call its submitJob() method, as shown in the figure below. After the job is submitted, runJob() polls the job's progress once per second and reports the progress to the console if it has changed since the last report. When the job completes successfully, the job counters are displayed. If it fails, the error that caused the failure is logged to the console. The submitJob() method does the following:

  • Requests a new job ID from the JobTracker (by calling JobTracker's getNewJobId() method). See Step 2.
  • Checks the job's output specification. For example, if no output directory has been specified, or the output directory already exists, the job is not submitted and an error is thrown back to the MapReduce program.
  • Computes the input splits for the job. If the splits cannot be computed, for example because the input path does not exist, the job is not submitted and the error is returned to the MapReduce program.
  • Copies the resources needed to run the job (including the job JAR file, the configuration file, and the computed input splits) to the JobTracker's filesystem, in a directory named after the job ID. The job JAR is copied with a high replication factor (controlled by the mapred.submit.replication property, which defaults to 10), so there are plenty of copies across the cluster for TaskTrackers to access when they run the job's tasks. See Step 3.
  • Tells the JobTracker that the job is ready for execution (by calling JobTracker's submitJob() method). See Step 4.
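The client-side checks above can be sketched in plain Java. This is a simplified stand-in, not the Hadoop API: the class name, the in-memory "filesystem" list, and the split-size parameter are all illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the client-side submission checks (not the Hadoop API).
public class JobSubmission {
    // The job fails fast if no output directory is given or it already exists.
    static void checkOutputDir(List<String> existingDirs, String outputDir) {
        if (outputDir == null || existingDirs.contains(outputDir)) {
            throw new IllegalStateException(
                    "Output directory missing or already exists: " + outputDir);
        }
    }

    // One input split per splitSize bytes, mirroring how splits are computed
    // from the input before the job is handed to the JobTracker.
    static int computeSplits(long inputBytes, long splitSize) {
        if (inputBytes <= 0) {
            throw new IllegalArgumentException("Input path is empty or does not exist");
        }
        return (int) ((inputBytes + splitSize - 1) / splitSize);
    }

    public static void main(String[] args) {
        List<String> existingDirs = new ArrayList<>();
        existingDirs.add("/data/input");
        checkOutputDir(existingDirs, "/data/output"); // passes: directory not present
        System.out.println(computeSplits(200_000_000L, 64_000_000L)); // 4 splits
    }
}
```

Note that both checks happen before anything is sent to the JobTracker, which is why a missing input path or a pre-existing output directory fails the job immediately on the client.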

3. Job initialization

When the JobTracker receives a call to its submitJob() method, it places the call into an internal queue, from which the job scheduler picks it up and initializes it. Initialization involves creating an object to represent the running job, which encapsulates its tasks and bookkeeping information used to track the tasks' status and progress. (Step 5)

To create the list of tasks to run, the job scheduler first retrieves from the shared filesystem the input splits that JobClient has computed (Step 6). It then creates one map task for each split. The number of reduce tasks to create is determined by the mapred.reduce.tasks property of the JobConf, which is set using the setNumReduceTasks() method, and the scheduler simply creates that number of reduce tasks to run. Tasks are given IDs at this point.
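The task-list creation just described can be sketched as follows. The ID format mimics Hadoop's task_<jobid>_m_NNNNNN style, but this class and its method are illustrative, not part of Hadoop.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of task-list creation: one map task per input split, plus the
// configured number of reduce tasks, each assigned an ID at creation time.
public class TaskListBuilder {
    static List<String> buildTasks(String jobId, int numSplits, int numReduces) {
        List<String> tasks = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            tasks.add(String.format("task_%s_m_%06d", jobId, i)); // map tasks
        }
        for (int i = 0; i < numReduces; i++) {
            tasks.add(String.format("task_%s_r_%06d", jobId, i)); // reduce tasks
        }
        return tasks;
    }

    public static void main(String[] args) {
        // 3 splits and mapred.reduce.tasks = 1 yields 3 map tasks + 1 reduce task.
        System.out.println(buildTasks("0001", 3, 1));
    }
}
```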

4. Assignment of tasks

The TaskTracker runs a simple loop that periodically sends "heartbeat" calls to the JobTracker. Heartbeats tell the JobTracker that the TaskTracker is alive, and they also act as a message channel between the two. As part of the heartbeat, a TaskTracker indicates whether it is ready to run a new task; if it is, the JobTracker assigns it one, communicating the assignment in the heartbeat's return value (Step 7).

Before the JobTracker can choose a task for the TaskTracker, it must first choose the job the task belongs to. Once a job has been selected, the JobTracker can choose a task for it.

TaskTrackers have a fixed number of slots for map tasks and reduce tasks. For example, a TaskTracker might be able to run two map tasks and two reduce tasks simultaneously; the precise number depends on the number of cores and the amount of memory on the TaskTracker. By default, the scheduler fills empty map slots before reduce slots. Therefore, if the TaskTracker has at least one empty map slot, the JobTracker selects a map task for it; otherwise, it selects a reduce task.
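The default slot-filling rule can be sketched in a few lines (the class and method names are illustrative, not Hadoop's scheduler code):

```java
// Sketch of the default assignment rule: fill free map slots before reduce slots.
public class SlotScheduler {
    static String assign(int freeMapSlots, int freeReduceSlots) {
        if (freeMapSlots > 0) return "map";       // map slots always take priority
        if (freeReduceSlots > 0) return "reduce"; // only then consider reduce slots
        return "none";                            // nothing to assign this heartbeat
    }

    public static void main(String[] args) {
        System.out.println(assign(1, 2)); // map: a map slot is still free
        System.out.println(assign(0, 2)); // reduce: map slots are exhausted
    }
}
```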

To choose a reduce task, the JobTracker simply takes the next one from its list of yet-to-be-run reduce tasks, with no data locality considerations. For a map task, however, the JobTracker takes the TaskTracker's network location into account and picks a task whose input split is as close as possible to the TaskTracker. In the optimal case, the task is data-local: it runs on the same node that the input split resides on. Alternatively, the task may be rack-local: on the same rack as the input split, but not on the same node. Some tasks are neither data-local nor rack-local, and retrieve their data from a rack other than the one they run on. You can see the proportion of each type of task by looking at a job's counters.
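The locality preference for map tasks can be sketched as picking the best-ranked candidate; the enum and the candidate-list structure here are illustrative assumptions, not Hadoop data structures.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of locality preference: data-local beats rack-local beats off-rack.
public class LocalityPicker {
    enum Locality { DATA_LOCAL, RACK_LOCAL, OFF_RACK } // declared best-first

    // Candidate map tasks, each annotated with its locality relative to the
    // requesting TaskTracker; pick the best (lowest ordinal) available.
    static Locality pick(List<Locality> candidates) {
        return candidates.stream()
                .min((a, b) -> Integer.compare(a.ordinal(), b.ordinal()))
                .get();
    }

    public static void main(String[] args) {
        List<Locality> candidates = Arrays.asList(Locality.OFF_RACK, Locality.RACK_LOCAL);
        System.out.println(pick(candidates)); // RACK_LOCAL: best available here
    }
}
```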

5. Task execution

Now that the TaskTracker has been assigned a task, the next step is to run it. First, it localizes the job JAR by copying it from the shared filesystem to the TaskTracker's filesystem. It also copies any files the application needs from the distributed cache to the local disk. Second, it creates a local working directory for the task and un-JARs the contents of the JAR file into this directory. Third, it creates a TaskRunner instance to run the task.

TaskRunner launches a new JVM to run each task, so that any problems in the user-defined map and reduce functions (such as crashes or hangs) don't affect the TaskTracker. However, it is possible to reuse the JVM between tasks.

The child process communicates with its parent through the umbilical interface. It informs the parent of the task's progress every few seconds until the task is complete.
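The parent/child relationship can be sketched as a narrow callback interface that the child pushes updates through; the interface and method names below are illustrative, not Hadoop's actual umbilical protocol.

```java
// Sketch of the parent/child "umbilical" idea: the child task periodically
// pushes progress updates to its parent through a narrow interface.
public class Umbilical {
    interface TaskUmbilicalProtocol { // illustrative, not Hadoop's interface
        void statusUpdate(String taskId, float progress);
    }

    // The child-side loop: report each progress checkpoint to the parent.
    // In reality this happens every few seconds until the task completes.
    static String runChild(String taskId, float[] checkpoints) {
        StringBuilder log = new StringBuilder();
        TaskUmbilicalProtocol parent =
                (id, progress) -> log.append(id).append('=').append(progress).append(' ');
        for (float p : checkpoints) {
            parent.statusUpdate(taskId, p);
        }
        return log.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(runChild("attempt_0001_m_000000_0",
                new float[] {0.25f, 0.5f, 1.0f}));
    }
}
```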

Streaming and Pipes

Both Streaming and Pipes run special map and reduce tasks whose purpose is to launch the user-supplied executable and communicate with it.

In Streaming, the task uses standard input and output streams to communicate with the process (which may be written in any language). A Pipes task, by contrast, listens on a socket and passes the port number to the C++ process in its environment, so that on startup the C++ process can establish a persistent socket connection to its parent Java Pipes task.

In both cases, during execution of the task, the Java process passes input key/value pairs to the external process, which runs them through the user-defined map or reduce function and passes the output key/value pairs back to the Java process. From the TaskTracker's point of view, it is as if the child process ran the map or reduce code itself.

6. Progress and status updates

MapReduce jobs are long-running batch jobs, taking anything from several seconds to several hours to run. Because this is a significant length of time, it is important for users to be able to track the progress of their jobs. A job and each of its tasks have a status, which includes the state of the job or task (e.g., running, successfully completed, failed), the progress of maps and reduces, the values of the job's counters, and a status message or description (which may be set by user code). How is this status information communicated back to the client as it changes over the course of the job?

While a task is running, it keeps track of its progress (the proportion of the task completed). For a map task, this is the proportion of the input that has been processed. For reduce tasks, the situation is a little more complicated, but the system can still estimate the proportion of the reduce input processed. It does this by dividing the total progress into three parts, corresponding to the three phases of the shuffle. For example, if the task has run the reducer on half of its input, the task's progress is 5/6, since it has completed the copy and sort phases (1/3 each) and is halfway through the reduce phase (1/6).
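The reduce-side arithmetic above can be written out directly (a minimal sketch; the class and method names are illustrative):

```java
// Sketch of reduce-task progress: the copy, sort, and reduce phases each
// contribute one third of the overall task progress.
public class ReduceProgress {
    static double progress(double copyDone, double sortDone, double reduceDone) {
        return (copyDone + sortDone + reduceDone) / 3.0;
    }

    public static void main(String[] args) {
        // Copy and sort complete, reducer halfway through its input:
        System.out.println(progress(1.0, 1.0, 0.5)); // 5/6 = 0.8333...
    }
}
```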

The composition of progress in MapReduce

Progress is not always measurable, but it nevertheless tells Hadoop that a task is doing something. For example, a task writing output records can count this as progress, even though it cannot express it as a percentage of the total number of records to be written, since that figure may not be known even by the task producing the output.

Progress reporting is important, as it means Hadoop will not fail a task that is making progress. All of the following operations constitute progress:

  • Reading an input record (in a mapper or reducer)
  • Writing an output record (in a mapper or reducer)
  • Setting the status description on a Reporter (using Reporter's setStatus() method)
  • Incrementing a counter (using Reporter's incrCounter() method)
  • Calling Reporter's progress() method

Tasks also have a set of counters that count various events as the task runs, either built into the framework, such as the number of map output records written, or defined by the user.

If a task reports progress, it sets a flag to indicate that the status change should be sent to the TaskTracker. A separate thread checks this flag every three seconds and, if it is set, notifies the TaskTracker of the current task status. Meanwhile, the TaskTracker sends heartbeats to the JobTracker every five seconds (this is a minimum; the heartbeat interval is actually determined by the size of the cluster, with larger clusters using a longer interval), and the status of all the tasks run by the TaskTracker is sent to the JobTracker in the call. Counters are sent less often than every five seconds, because they use relatively high bandwidth.
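The interval scaling can be sketched as follows. Only the five-second floor comes from the text; the linear growth rule (one extra second per 100 TaskTrackers) is an illustrative assumption, not a documented constant.

```java
// Sketch: the heartbeat interval has a five-second floor and grows with
// cluster size. The one-second-per-100-trackers growth rule is illustrative.
public class Heartbeat {
    static int intervalSeconds(int clusterSize) {
        return Math.max(5, clusterSize / 100);
    }

    public static void main(String[] args) {
        System.out.println(intervalSeconds(50));   // small cluster: floor of 5s
        System.out.println(intervalSeconds(1000)); // larger cluster: 10s
    }
}
```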

The JobTracker combines these updates to produce a global view of the status of all running jobs and the tasks they contain. Finally, as mentioned earlier, the JobClient receives the latest status by polling the JobTracker every second. Clients can also use JobClient's getJob() method to obtain a RunningJob instance, which contains all of the status information for the job.

7. Job completion

When the JobTracker receives notification that the last task of a job has completed, it sets the job's status to "successful". Then, when the JobClient next polls for status, it learns that the job has completed successfully, so it prints a message to inform the user and returns from the runJob() method.

An HTTP job notification is also sent if JobTracker is set accordingly. Clients that want to receive a callback can do this via the job.end.notification.url property.

Finally, the JobTracker clears its working state for the job and instructs the TaskTrackers to do the same (deleting intermediate output, for example).

8. Failures

In the real world, user code has bugs, processes crash, and machines fail. One of the major benefits of using Hadoop is its ability to handle such failures and still allow the job to complete.

8.1 Task failure

Consider first the case of a child task failing. The most common situation is when user code in a map or reduce task throws a runtime exception. If this happens, the child JVM sends an error report to its parent TaskTracker before exiting. The error ultimately ends up in the user logs. The TaskTracker marks the task attempt as failed, freeing up a slot to run another task.

Another failure mode is the sudden exit of the child JVM, possibly due to a JVM bug (a software defect) exposed for some particular reason by the MapReduce user code. In this case, the TaskTracker notices that the process has exited and marks the attempt as failed.

Hanging tasks are handled differently. Once the TaskTracker notices that it has not received a progress update for a while, it marks the task as failed. The child JVM process is then automatically killed. The timeout after which a task is considered failed is normally 10 minutes, and it can be configured on a per-job (or per-cluster) basis by setting the mapred.task.timeout property to a value in milliseconds.

If the timeout is set to 0, the timeout check is disabled, so long-running tasks are never marked as failed. In this case, a hanging task will never release its slot, and over time the efficiency of the whole cluster degrades. Avoid this setting, and make sure every task reports its progress periodically.
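The hang-detection rule, including the disabled-when-zero case, can be sketched like this (illustrative names, not Hadoop code):

```java
// Sketch of hang detection: a task silent for longer than the timeout is
// failed; a timeout of 0 disables the check entirely.
public class TaskTimeout {
    static boolean shouldFail(long nowMillis, long lastProgressMillis, long timeoutMillis) {
        if (timeoutMillis == 0) return false; // timeout disabled: never fail
        return nowMillis - lastProgressMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L; // the default described above
        System.out.println(shouldFail(700_000L, 0L, tenMinutes)); // true: silent 700s
        System.out.println(shouldFail(700_000L, 0L, 0L));         // false: disabled
    }
}
```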

When the JobTracker learns that a task attempt has failed (via the TaskTracker's heartbeat), it reschedules execution of the task, trying to avoid rescheduling it on a TaskTracker where it has previously failed. Furthermore, if a task fails four times, it will not be retried again. This value is configurable: for map tasks, the maximum number of attempts is controlled by the mapred.map.max.attempts property, and for reduce tasks by the mapred.reduce.max.attempts property. By default, if any task fails four times (or whatever the maximum number of attempts is configured to be), the whole job fails.

For some applications, it is undesirable to abort the whole job when a few tasks fail, because the job's results may still be usable despite some failures. In this case, you can set the maximum percentage of tasks that are allowed to fail without triggering job failure. Map tasks and reduce tasks are controlled independently, via the mapred.max.map.failures.percent and mapred.max.reduce.failures.percent properties.
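The failure-tolerance check can be sketched in integer arithmetic (class and method names are illustrative):

```java
// Sketch: the job fails outright only once the fraction of failed tasks
// exceeds the configured tolerance (cf. mapred.max.map.failures.percent).
public class FailureTolerance {
    static boolean jobFails(int failedTasks, int totalTasks, int maxFailurePercent) {
        // Integer cross-multiplication avoids floating-point comparison.
        return failedTasks * 100 > totalTasks * maxFailurePercent;
    }

    public static void main(String[] args) {
        System.out.println(jobFails(3, 100, 5)); // false: 3% failed, 5% tolerated
        System.out.println(jobFails(6, 100, 5)); // true: tolerance exceeded
    }
}
```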

A task attempt may also be killed, which is different from failing. A task attempt may be killed because it is a speculative duplicate, or because the TaskTracker it was running on failed, in which case the JobTracker marks all of the task attempts running on it as killed. Killed task attempts do not count against the number of attempts to run the task (set by mapred.map.max.attempts and mapred.reduce.max.attempts), because it was not the task's fault that the attempt was killed.

Users may also kill or fail task attempts using the Web UI or the command-line interface (type hadoop job to see the options). Jobs can be killed through the same mechanisms.

8.2 TaskTracker failure

TaskTracker failure is another failure mode. If a TaskTracker crashes or runs very slowly, it stops sending heartbeats to the JobTracker (or sends them very infrequently). The JobTracker notices a TaskTracker that has stopped sending heartbeats (if it has not received one for 10 minutes, a value configured via the mapred.tasktracker.expiry.interval property, in milliseconds) and removes it from the pool of TaskTrackers eligible for task scheduling. For any incomplete job, the JobTracker re-runs map tasks that completed successfully on the failed TaskTracker, because their intermediate output (stored on the failed TaskTracker's local filesystem) can no longer be accessed by reduce tasks. Any tasks in progress are also rescheduled.

Even if a TaskTracker does not fail, it may still be blacklisted by the JobTracker. A TaskTracker is blacklisted if the number of tasks that have failed on it is significantly higher than the cluster's average task-failure rate. A blacklisted TaskTracker can be removed from the JobTracker's blacklist by restarting it.
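The blacklisting heuristic can be sketched as a threshold against the cluster average. The "significantly higher" factor used here (2x) is an illustrative assumption, not a documented Hadoop constant.

```java
// Sketch of blacklisting: a TaskTracker whose failure count is well above
// the cluster average gets blacklisted. The 2x factor is illustrative only.
public class Blacklist {
    static boolean shouldBlacklist(int trackerFailures, double clusterAvgFailures) {
        return trackerFailures > 2 * clusterAvgFailures;
    }

    public static void main(String[] args) {
        System.out.println(shouldBlacklist(9, 3.0)); // true: three times the average
        System.out.println(shouldBlacklist(4, 3.0)); // false: close to the average
    }
}
```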