Introduction to MapReduce

MapReduce is a programming framework for distributed computation, and the core framework on which users develop Hadoop-based data analysis applications.

The core function of MapReduce is to combine the business logic code written by the user with the framework's default components into a complete distributed program that runs concurrently on a Hadoop cluster.

Why MapReduce

  1. A single machine cannot process massive data because of its hardware resource limits
  2. Extending a stand-alone program to run distributed on a cluster greatly increases its complexity and development difficulty
  3. With the MapReduce framework, developers can focus most of their work on business logic and leave the complexity of distributed computing to the framework

Consider the WordCount requirement in a massive data scenario:

Standalone version: limited memory, limited disk, limited computing power

Distributed version: the following problems have to be solved

1. Distributed file storage (HDFS)

2. The computation logic must be divided into at least two stages (one stage runs independently and concurrently, the other aggregates the results).

3. How is the computation code distributed to the nodes?

4. How does the program allocate computing tasks (splits)?

5. How are the two stages started? How are they coordinated?

6. How is the whole run monitored? How are failures tolerated and tasks retried?

It can be seen that a lot of complicated work will be introduced when the program is expanded from stand-alone version to distributed version. To improve development efficiency, common functions in distributed programs can be encapsulated into a framework that allows developers to focus on business logic.
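As a rough illustration of the two-stage structure described above, the following sketch runs WordCount inside a single JVM. It does not use the Hadoop API: the class LocalWordCount and its method names are hypothetical, and the run() method is only a stand-in for the grouping that a framework would perform across machines.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class LocalWordCount {
    // Map stage: turn one line of text into (word, 1) pairs.
    // Each line can be processed independently of all others.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Reduce stage: sum all counts collected for one word.
    static int reduce(String word, List<Integer> counts) {
        int sum = 0;
        for (int c : counts) {
            sum += c;
        }
        return sum;
    }

    // Stand-in for the framework: run map on every line, group the
    // intermediate pairs by key, then run reduce once per key.
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> kv : map(line)) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }
}
```

Only map() and reduce() contain business logic here; everything in run() is the kind of work a framework takes over.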

The basic process of parallel processing of MapReduce

First, note the difference between Hadoop before and after version 2.0:

  • Before 2.0, there was only the MapReduce runtime framework, with two kinds of nodes: master and worker. The master did both resource scheduling and program scheduling, and the workers only took part in the computation.
  • From 2.0 onward, a YARN cluster was added. The master node of the YARN cluster (the ResourceManager) is responsible for resource scheduling, and one of the YARN slave nodes, chosen by the ResourceManager, performs program scheduling for the application, similar to part of what the master did before 2.0. Resource scheduling here means allocating the CPU and memory needed for processing and the disk resources needed to store data.

Starting from the User Program in the figure: the User Program links the MapReduce library and implements the basic Map and Reduce functions.

1. The MapReduce library divides the input files into M pieces (split0 to split4 on the left of the figure) and then uses fork to copy the user process to other machines in the cluster.

2. A Worker that is assigned a Map job reads the input data of the corresponding split. The Map job extracts key-value pairs from the input data, and the intermediate key-value pairs produced by the map() function are buffered in memory.

3. The buffered intermediate key-value pairs are periodically written to the local disk. Their locations are reported to the Master, which forwards this information to the Reduce Workers.

4. A Reduce Worker reads the intermediate key-value pairs of its assigned Reduce job, sorts them, and groups together the pairs that share the same key.

5. The Reduce Worker iterates over the sorted pairs and passes them to the reduce() function. The output of reduce() is appended to the output file of that partition.

MapReduce input and output

  • The Map/Reduce framework operates on key-value pairs. That is, the framework treats the input of a job as a set of key-value pairs and produces a set of key-value pairs as the output of the job. The two sets may be of different types.
  • The framework needs to serialize the key and value classes, so these classes must implement the Writable interface (an object that implements a serialization protocol; serialization serves persistent storage and network transport). In addition, so that the framework can sort, the key class must implement the WritableComparable interface.
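To make the serialization and sorting requirements concrete, here is a minimal sketch of a key type. It deliberately avoids the Hadoop dependency: SimpleWritable is a local stand-in that mirrors the two methods of Hadoop's Writable interface, and WordKey and roundTrip() are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in with the same two methods as Hadoop's Writable interface.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical key type: serializable for transport, comparable for sorting.
final class WordKey implements SimpleWritable, Comparable<WordKey> {
    private String word = "";

    WordKey() {}                         // no-arg constructor so an empty instance can be filled by readFields
    WordKey(String word) { this.word = word; }

    String get() { return word; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);              // serialize the single field
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();             // read fields back in the same order they were written
    }

    @Override
    public int compareTo(WordKey other) {
        return word.compareTo(other.word);
    }

    // Serialize a key to bytes and read it back into a fresh instance,
    // roughly what happens when a key crosses the network or hits disk.
    static WordKey roundTrip(WordKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            WordKey copy = new WordKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With Hadoop on the classpath, the same class would implement WritableComparable instead of the two local interfaces.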

The input and output types of a Map/Reduce job are as follows:

(input) <k1, v1> -> map -> <k2, v2>-> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

The actual MapReduce process

  • MapReduce can be seen as an embodiment of the divide-and-conquer approach: decompose a large problem into subproblems of the same type (preferably of the same size), solve the subproblems, and then merge their solutions into the solution of the large problem.
  • In MapReduce, the input is sliced and assigned to different tasks for processing, and the results are then merged into the final solution.
  • The actual processing flow of MapReduce can be read as Input -> Map -> Sort -> Combine -> Partition -> Reduce -> Output.

1. Input stage: data is passed to the Mapper in a given format. TextInputFormat, DBInputFormat, and SequenceFileInputFormat can be used, set with Job.setInputFormatClass; you can also customize how the input is split.

2. Map stage: each input (key, value) pair is processed, that is, map(k1, v1) -> list(k2, v2). The Mapper class is set with Job.setMapperClass.

3. Sort stage: the Mapper output is sorted. The ordering can be defined with a comparator, set with JobConf.setOutputKeyComparatorClass (Job.setSortComparatorClass in the newer API).

4. Combine stage: after sorting, results with the same key are merged. The combiner is set with Job.setCombinerClass, and you can write your own Combiner class.

5. Partition stage: the Mapper's intermediate results are divided by key into R parts (R being the number of Reduce tasks). HashPartitioner is used by default, computing (key.hashCode() & Integer.MAX_VALUE) % numPartitions. You can also supply your own partitioning function with Job.setPartitionerClass.

6. Reduce stage: the results of the Mapper stage are processed further. The Reducer class is set with Job.setReducerClass.

7. Output stage: the Reducer's output is written in the configured output format.
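The default hash partitioning rule quoted above is small enough to show directly. The sketch below reproduces only the arithmetic; HashPartitionSketch and partitionFor are hypothetical names, not part of Hadoop.

```java
final class HashPartitionSketch {
    // Masking with Integer.MAX_VALUE clears the sign bit, so the value
    // passed to % is never negative and the result lies in [0, numPartitions).
    // The parentheses matter: in Java, % binds more tightly than &.
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

Because the result depends only on the key's hash code, every record with the same key lands in the same partition and is therefore handled by the same Reduce task.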

MapReduce framework structure and core operating mechanism

1. Structure

MapReduce is a general framework for distributed programs of this kind. At run time, a complete MapReduce program answers the problems listed above with three kinds of process:

  • MRAppMaster: responsible for process scheduling and state coordination of the whole application (introduced with Hadoop 2.0)
  • MapTask: responsible for the entire data processing flow of the Map phase
  • ReduceTask: responsible for the entire data processing flow of the Reduce phase

1) When a MapReduce program starts, the MRAppMaster is started first. It calculates the required number of MapTask instances from the job's description information and then requests machines from the cluster to start that number of MapTask processes.

2) Once a MapTask process has started, it processes the data in its assigned split range. The main steps are:

  • Use the InputFormat specified by the client to obtain a RecordReader, which reads the data and forms the input KV pairs
  • Pass each input KV pair to the map() method defined by the client, run the logic, and collect the KV pairs output by map() into a buffer
  • Partition the KV pairs in the buffer and sort them by key, spilling them to disk files as the buffer fills

3) After it observes that all MapTask processes have completed, the MRAppMaster starts the number of ReduceTask processes specified by the client and tells each ReduceTask which data range (partition) it is to process.

4) Once a ReduceTask process has started, it uses the data locations reported by the MRAppMaster to fetch the MapTask output files from the machines where the MapTasks ran, and merges and sorts them again locally. It then calls the reduce() method defined by the client once for each group of KV pairs with the same key, collects the output KV pairs, and calls the OutputFormat specified by the client to write the result data to external storage.

The parallelism of MapTask determines the concurrency of task processing in the Map phase and therefore affects the processing speed of the whole job. So are more parallel MapTask instances always better? And how is the degree of parallelism determined?

The parallelism of a job's map phase is decided by the client when the job is submitted. The client's basic logic for planning it is as follows: the data to be processed is sliced logically, and each split is assigned one parallel MapTask instance. This logic, and the split planning description file it produces, are implemented by the getSplits() method of the FileInputFormat implementation class.
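The split planning can be sketched as two small calculations. This follows the logic that FileInputFormat is commonly described as using (split size clamped between a minimum and a maximum around the block size, with a 10% tolerance on the last split); the class SplitPlanSketch and its parameter names are illustrative assumptions, and the sketch ignores details such as non-splittable or empty files.

```java
final class SplitPlanSketch {
    // The final split may be up to 10% larger than splitSize
    // instead of producing a tiny extra split.
    static final double SPLIT_SLOP = 1.1;

    // Clamp the block size between the configured minimum and maximum.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Number of splits (and thus MapTasks) planned for one non-empty file.
    static int countSplits(long fileLength, long splitSize) {
        int splits = 0;
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining > 0) {
            splits++;                    // whatever is left becomes the last split
        }
        return splits;
    }
}
```

Under these assumptions, a 300 MB file with a 128 MB split size is planned as three splits, while a 130 MB file stays in a single split because it exceeds the split size by less than 10%.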

The parallelism of ReduceTask also affects the concurrency and efficiency of the whole job. Unlike the number of MapTasks, which is determined by the number of splits, the number of ReduceTasks can be set directly by hand:

// The default value is 1; here it is set manually to 4
job.setNumReduceTasks(4);

If the data is not evenly distributed, data skew may occur in the Reduce phase.

Note: the number of ReduceTasks should not be set arbitrarily; business logic requirements must also be taken into account. In some cases only one ReduceTask can be used, for example to compute a global summary result. Try not to run too many ReduceTasks: for most jobs, the number of reduces is best kept equal to or slightly below the number of reduce slots in the cluster.

Shuffle mechanism of MapReduce

1) Overview

How the data processed in the Map phase is transferred to the Reduce phase is the most critical process in the MapReduce framework; this process is called shuffle. Its core mechanisms are data partitioning, sorting, and caching. Specifically, it distributes the result data output by the MapTasks to the ReduceTasks, and during distribution the data is partitioned and sorted by key.

Partition: determines which reduce each piece of data goes to

Sort: sorts by key

Combiner: merges values locally
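The local merge performed by a combiner can be pictured with WordCount. In this sketch, which does not use the Hadoop API and whose names are hypothetical, the words emitted by a single map task are summed before anything is sent to the reduce side.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CombinerSketch {
    // Sum the counts of identical words within one map task's output.
    // Each input element stands for a (word, 1) pair; the result holds
    // one (word, partialCount) pair per distinct word, sorted by key.
    static Map<String, Integer> combine(List<String> mapOutputWords) {
        Map<String, Integer> combined = new TreeMap<>();
        for (String word : mapOutputWords) {
            combined.merge(word, 1, Integer::sum);
        }
        return combined;
    }
}
```

Four emitted pairs for the words a, b, a, a shrink to two pairs, (a, 3) and (b, 1), which is the reduction in transferred data that a combiner is meant to provide. This only works when the merge operation, like a sum, gives the same final result whether or not it is applied locally first.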

2) Detailed process

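1. The MapTask collects the KV pairs output by our map() method and puts them into an in-memory buffer.

2. The buffer is repeatedly spilled to local disk files; there may be multiple spill files.

3. The multiple spill files are merged into one larger spill file.

4. During both spilling and merging, the Partitioner is called to partition the data, and the data is sorted by key.

5. Each ReduceTask fetches the result data of its own partition from every MapTask machine, according to its partition number.

6. The ReduceTask thus holds result files for the same partition from different MapTasks, and merges them again (merge sort).

7. Once they are merged into one large file, the shuffle is over and the logical operation of the ReduceTask begins (taking one group of key-value pairs at a time from the file and calling the user-defined reduce() method).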

The size of the buffer used in the shuffle affects the efficiency of a MapReduce program: in principle, the larger the buffer, the fewer disk I/O operations and the faster the execution. The buffer size can be adjusted with the parameter io.sort.mb (default 100 MB; named mapreduce.task.io.sort.mb in Hadoop 2).
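The spill-and-merge pattern, and the reason a larger buffer means fewer disk files, can be sketched as follows. This is a simplified model rather than Hadoop's implementation: the buffer capacity is counted in records instead of megabytes, spilled runs are kept in memory instead of on disk, and partitioning is left out. SpillMergeSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

final class SpillMergeSketch {
    // Collect keys into a bounded buffer; whenever it fills, sort it and
    // "spill" it as one sorted run. bufferCapacity must be at least 1.
    static List<List<String>> spill(List<String> keys, int bufferCapacity) {
        List<List<String>> runs = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (String key : keys) {
            buffer.add(key);
            if (buffer.size() == bufferCapacity) {
                Collections.sort(buffer);
                runs.add(buffer);
                buffer = new ArrayList<>();
            }
        }
        if (!buffer.isEmpty()) {         // flush what is left at the end
            Collections.sort(buffer);
            runs.add(buffer);
        }
        return runs;
    }

    // Merge the sorted runs into one sorted list (k-way merge).
    static List<String> merge(List<List<String>> runs) {
        // Each queue entry is {runIndex, positionInRun}, ordered by the key it points at.
        PriorityQueue<int[]> heads = new PriorityQueue<int[]>(
            (a, b) -> runs.get(a[0]).get(a[1]).compareTo(runs.get(b[0]).get(b[1])));
        for (int i = 0; i < runs.size(); i++) {
            if (!runs.get(i).isEmpty()) {
                heads.add(new int[] {i, 0});
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] head = heads.poll();
            List<String> run = runs.get(head[0]);
            merged.add(run.get(head[1]));
            if (head[1] + 1 < run.size()) {
                heads.add(new int[] {head[0], head[1] + 1});
            }
        }
        return merged;
    }
}
```

With five keys and a capacity of two, spill() produces three runs; with a capacity of five or more it produces one, so nothing has to be merged afterwards.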

The running process of a Job

The execution process of a MapReduce job is as follows: job submission -> job initialization -> task assignment -> task execution -> update task execution progress and status -> job completion.

In the classic (pre-2.0) architecture, a complete MapReduce job flow involves four independent entities:

Client: writes the MapReduce program, configures the job, and submits it.

JobTracker: coordinates the running of the job; it initializes the job, assigns its tasks, and communicates with the TaskTrackers.

TaskTracker: runs the tasks of the job and stays in communication with the JobTracker.

HDFS: the distributed file system that holds the job's data and results.

1. Job submission

The runJob() method of JobClient creates a JobClient instance and then calls the submitJob() method to submit the job. The submission proceeds as follows:

1) Get a job ID from the JobTracker by calling the getNewJobId() method of the JobTracker object.

2) Check the job's paths. If the output path already exists, the job is not submitted (this preserves the results of a previous run).

3) Calculate the input splits of the job. If they cannot be calculated, for example because the input path does not exist, the job is not submitted and the error is returned to the MapReduce program.

4) Copy the resources needed to run the job (the job JAR file, the configuration file, and the calculated splits) to HDFS.

5) Tell the JobTracker that the job is ready to execute (the submitJob() method of the JobTracker object performs the actual submission).

2. Job initialization

  • When JobTracker receives a request submitted by a Job, it saves the Job in an internal queue and lets the Job Scheduler process it and initialize it.
  • Initialization involves creating a Job object that encapsulates the job's tasks and keeps track of their state and progress (step 5).
  • To create the list of Task objects to run, the Job Scheduler first retrieves from the file system the input splits computed by the JobClient (step 6), and then creates one Map Task for each split.

3. Task assignment

Communication and task assignment between TaskTracker and JobTracker are done through a heartbeat mechanism. The TaskTracker runs as a separate JVM and executes a simple loop that sends a heartbeat to the JobTracker at regular intervals, telling the JobTracker that the TaskTracker is alive and whether it is ready to run a new task. If there are tasks waiting to be assigned, the JobTracker assigns one to the TaskTracker.
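The heartbeat exchange can be modeled in a few lines. The sketch below is not the JobTracker's real interface: HeartbeatSketch, its methods, and the idea of passing a free-slot count and an explicit timestamp are assumptions made to keep the example self-contained and deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HeartbeatSketch {
    private final Deque<String> pendingTasks = new ArrayDeque<>();
    private final Map<String, Long> lastSeenMillis = new HashMap<>();

    // Queue a task that is waiting to be assigned.
    void addTask(String taskId) {
        pendingTasks.add(taskId);
    }

    // Called for each heartbeat from a tracker: record that the tracker is
    // alive, and answer with at most freeSlots pending tasks.
    List<String> heartbeat(String trackerName, int freeSlots, long nowMillis) {
        lastSeenMillis.put(trackerName, nowMillis);
        List<String> assigned = new ArrayList<>();
        while (assigned.size() < freeSlots && !pendingTasks.isEmpty()) {
            assigned.add(pendingTasks.poll());
        }
        return assigned;
    }

    // A tracker counts as alive if its last heartbeat is within the timeout.
    boolean isAlive(String trackerName, long nowMillis, long timeoutMillis) {
        Long seen = lastSeenMillis.get(trackerName);
        return seen != null && nowMillis - seen <= timeoutMillis;
    }
}
```

Task assignment rides on the heartbeat response, so the master never has to contact a tracker on its own initiative; a tracker that stops sending heartbeats simply ages past the timeout.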

4. Task execution

  • Once the TaskTracker has been assigned a new task, it runs the task locally. The first step is to localize the task (the data, configuration information, code, and so on needed to run it), that is, to copy these from HDFS to the local machine by calling localizeJob().
  • For Map or Reduce programs written with Streaming or Pipes, the Java side passes the key/value pairs to an external process, which runs the user-defined Map or Reduce and passes the resulting key/value pairs back to Java. The effect is as if a child process of the TaskTracker were running the Map and Reduce code.

5. Update the progress and status of the task

  • Progress and status are updated and maintained through heartbeat.
  • For a Map Task, the progress is the ratio of processed data to all input data.
  • For a Reduce Task, the situation is a little more complicated: progress consists of three parts (copying the intermediate result files, sorting, and the reduce calls), each of which accounts for 1/3.
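The two progress rules above amount to simple arithmetic, sketched here with hypothetical names (ProgressSketch, mapProgress, reduceProgress). Each argument of reduceProgress is assumed to be the completed fraction of one phase, between 0 and 1.

```java
final class ProgressSketch {
    // Map task: share of the input that has been processed so far.
    static double mapProgress(long bytesProcessed, long totalBytes) {
        return totalBytes == 0 ? 1.0 : (double) bytesProcessed / totalBytes;
    }

    // Reduce task: copy, sort, and reduce each contribute one third.
    static double reduceProgress(double copyFraction, double sortFraction, double reduceFraction) {
        return (copyFraction + sortFraction + reduceFraction) / 3.0;
    }
}
```

A Reduce Task that has finished copying and sorting and is halfway through its reduce calls therefore reports (1 + 1 + 0.5) / 3, about 83%.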

6. Job completion

When a Job completes, the JobTracker receives a job-complete notification and updates the Job's status to successful. Meanwhile, the JobClient, which polls for status, learns that the submitted Job has completed and displays this information to the user. Finally, the JobTracker cleans up and reclaims the resources associated with the Job and notifies the TaskTrackers to do the same (for example, deleting intermediate result files).