Even beginners can learn MapReduce programming

Revisiting Hadoop

We know that Hadoop has four core components:

  • HDFS
  • MapReduce
  • YARN
  • Common

HDFS: distributed storage system

MapReduce: distributed computing system

YARN: resource scheduling system of Hadoop

Common: the basic toolkit and RPC (remote procedure call) framework that supports the other three components

As a computing system, MapReduce has three basic ideas for large-scale data processing:

How to deal with big data: divide and conquer

For big data whose parts have no computational dependencies on each other, the most natural way to achieve parallelism is a divide-and-conquer strategy.

Move to the abstract model: Mapper and Reducer

Parallel computing approaches such as MPI lack a high-level parallel programming model. To overcome this shortcoming, MapReduce borrows ideas from the functional language Lisp and provides a high-level parallel programming abstraction built on Map and Reduce functions.

Rise to the architecture level: unify the framework to hide system-level details from the programmer

Parallel computing approaches such as MPI also lack the support of a unified computing framework, so programmers have to handle data storage, partitioning, distribution, result collection, error recovery and many other details themselves. To address this, MapReduce designs and provides a unified computing framework that hides most system-level processing details from the programmer.

Abstract description of Map and Reduce

MapReduce draws on the ideas of the functional programming language Lisp and defines the following two abstract programming interfaces, Map and Reduce, which are implemented by the user:

map: (k1; v1) -> [(k2; v2)]

Input: a key-value pair (k1; v1). Document data records (such as lines in a text file, or rows in a data table) are passed to the Map function in the form of key-value pairs. Processing: the Map function processes these key-value pairs and outputs a set of intermediate key-value pairs. Output: a set of intermediate data represented as key-value pairs [(k2; v2)].

reduce: (k2; [v2]) -> [(k3; v3)]

Input: the key-value pairs [(k2; v2)] output by Map are merged so that the different values under the same key are combined into a list [v2]; the input of Reduce is therefore (k2; [v2]). Processing: some form of collation or further processing of the incoming intermediate result lists, producing a final result of some form. Output: the final output [(k3; v3)].

Map and Reduce provide the programmer with a clear, abstract description of the operational interface.
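
To make the two signatures concrete, here is a minimal Java sketch of the abstraction. The interface and type names are made up for illustration and are not Hadoop's actual API:

import java.util.List;
import java.util.Map.Entry;

// map: (k1; v1) -> [(k2; v2)]
interface MapFunction<K1, V1, K2, V2> {
    List<Entry<K2, V2>> map(K1 key, V1 value);
}

// reduce: (k2; [v2]) -> [(k3; v3)]
interface ReduceFunction<K2, V2, K3, V3> {
    List<Entry<K3, V3>> reduce(K2 key, Iterable<V2> values);
}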

Summary

  • Each map function processes the partitioned data in parallel, producing different intermediate results from different input data
  • Each reduce also computes in parallel and is responsible for processing different intermediate result data sets
  • All map functions must be completed before reduce processing begins, so a synchronization barrier is required before the reduce phase; this barrier stage is also responsible for aggregating and shuffling the map's intermediate result data so that Reduce can compute the final result more efficiently
  • The final result is obtained by summarizing the output results of all reduce tasks (a concrete walk-through follows below)
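
A concrete walk-through with two hypothetical input lines, "hello world" and "hello hadoop", shows how these stages fit together for the WordCount job discussed below:

map("hello world")  -> (hello, 1), (world, 1)
map("hello hadoop") -> (hello, 1), (hadoop, 1)
shuffle/sort        -> (hadoop, [1]), (hello, [1, 1]), (world, [1])
reduce              -> (hadoop, 1), (hello, 2), (world, 1)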

MapReduce programming details

A simple WordCount program

Mapper

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * LongWritable  input key: the byte offset of the line (a long), not the line number
 * Text          input value: one line of text in the map stage (a String in Java terms)
 * Text          output key: a single word (a String in Java terms)
 * IntWritable   output value: the count emitted for each word (an int in Java terms)
 */
public class WorkCountMap
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * key      input key
     * value    input value
     * context  context object used to emit output key-value pairs
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString();
        String[] words = line.split("\t");          // split the line into words
        for (String word : words) {
            Text wordText = new Text(word);
            IntWritable outValue = new IntWritable(1);
            context.write(wordText, outValue);      // emit (word, 1)
        }
    }
}
  • Angle brackets denote Java generics, which restrict the input and output data types of the function
  • In the above code, note that the generics of the Mapper class are not basic Java types but the Hadoop data types Text and IntWritable. We can roughly equate these to the Java types String and int, as the short example below shows.
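
As a quick, standalone illustration of how the Hadoop types wrap their Java counterparts (the class name WritableDemo is made up for this example):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        Text word = new Text("hadoop");          // wraps a Java String
        IntWritable count = new IntWritable(10); // wraps a Java int
        String s = word.toString();              // back to a String
        int n = count.get();                     // back to an int
        System.out.println(s + " " + n);         // prints: hadoop 10
    }
}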

The generics of the Mapper class in the code are <LongWritable, Text, Text, IntWritable>, corresponding to <k1, v1, k2, v2> in the abstract description above. The second parameter of the map method is the line text content that we care about. The core code splits the line text by the delimiter, takes each word as the new key and the count 1 as the new value, and writes them to the context. Because a line contains multiple words, each word produces one output key-value pair.

Reducer

The input in the Reduce phase is the output in the Mapper phase

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Text         input key in the reduce stage: a word (a String in Java terms)
 * IntWritable  input value in the reduce stage: a count (an int in Java terms)
 * Text         output key in the reduce stage: a word (a String in Java terms)
 * IntWritable  output value: the total word frequency (an int in Java terms)
 */
public class WorkCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * key      input key
     * value    input values
     * context  context object used to emit output key-value pairs
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> value, Context context)
            throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable number : value) {
            sum += number.get();
        }
        // e.g. for the word "hadoop" this might emit (hadoop, 10)
        context.write(key, new IntWritable(sum));
    }
}

The main function

In Hadoop, a computing task is called a job. The main function is responsible for creating a Job object, setting the corresponding Mapper and Reducer classes for it, and specifying the input and output paths.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // The configuration for the task
        Configuration conf = new Configuration();
        // Command line arguments
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");       // Create the user-defined job
        job.setJarByClass(WordCount.class);                  // Set the jar that runs the task
        job.setMapperClass(WorkCountMap.class);              // Set the Mapper class
        job.setCombinerClass(WorkCountReduce.class);         // Set the Combiner class
        job.setReducerClass(WorkCountReduce.class);          // Set the Reducer class
        job.setOutputKeyClass(Text.class);                   // Set the key type of the job output
        job.setOutputValueClass(IntWritable.class);          // Set the value type of the job output
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    // Set the input path
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // Set the output path
        // Submit the job and wait for it to complete
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

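Assuming the three classes above are packaged into a jar (the jar name and HDFS paths below are only illustrative), the job can then be submitted to the cluster with the hadoop jar command:

hadoop jar wordcount.jar WordCount /user/hadoop/input /user/hadoop/output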

Finally, the core operating mechanism of MapReduce

Overview

A complete MapReduce program runs in distributed mode with two types of instance processes:

  1. MRAppMaster: responsible for process scheduling and status coordination of the entire program (this process runs on a YARN node)
  2. YarnChild: responsible for the entire data processing flow of the map phase
  3. YarnChild: responsible for the entire data processing flow of the reduce phase

The MapTask and ReduceTask processes in the two phases above are both YarnChild processes, but this does not mean that a MapTask and a ReduceTask run in the same YarnChild process (the YarnChild processes run on the nodes where the tasks are executed).

Running process of MapReduce

  1. When a MapReduce program starts, MRAppMaster is started first. Once running, MRAppMaster calculates the required number of MapTask instances based on the job description and then requests the cluster to start that number of MapTask processes.
  2. After a MapTask process starts, it processes the data in the slice assigned to it (a given offset range of a given file). The main steps are:
     a. Use the InputFormat specified by the client to obtain a RecordReader, read the data, and form the input KV pairs.
     b. Pass the input KV pairs to the user-defined map() method for the processing logic, and collect the KV pairs output by map() into a cache.
     c. Sort the KV pairs in the cache by key within each partition and keep spilling them to disk (when the cache memory is exceeded, the KV pairs are written to temporary disk files and finally merged into an output file; Reduce fetches this file and it is then deleted).
  3. After MRAppMaster detects that all MapTask processes have completed (in practice, some ReduceTasks are started to fetch data from finished MapTasks before all of them complete), it starts the number of ReduceTask processes specified by the client's parameters (see the sketch after this list for how the client sets this in the driver) and informs each ReduceTask process of the data range (data partition) it should handle.
  4. After a ReduceTask process starts, it fetches the output files of several MapTasks from the machines where those MapTasks ran, according to the locations provided by MRAppMaster, and re-merges and sorts them locally. It then groups the KVs that share the same key, calls the user-defined reduce() method for the processing logic, collects the output result KVs, and finally calls the OutputFormat specified by the client to write the result data to external storage.
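
Step 3 says the number of ReduceTask processes follows the parameters specified by the client. In the driver this is typically set with Job.setNumReduceTasks; here is a minimal sketch (the value 4 is arbitrary, and in the WordCount driver above the call would sit next to setReducerClass):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReduceParallelismSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setNumReduceTasks(4);   // request 4 ReduceTask processes (arbitrary example value)
        System.out.println("reduce tasks = " + job.getNumReduceTasks());
    }
}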

MapTask parallelism determination mechanism

The parallelism of MapTask determines the concurrency of task processing in the map phase, which in turn affects the processing speed of the whole job. The map-phase parallelism of a job is determined by the client when the job is submitted. The client's basic logic for planning the map-phase parallelism is: perform logical slicing of the data to be processed (that is, divide the data into multiple logical splits according to a specific slice size), assign one MapTask instance to each split, and produce a slice-plan description file. This is done by the getSplits() method of the FileInputFormat implementation class, which returns a List<InputSplit>; each InputSplit describes one logical slice, including its length and location information.
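
The slice size itself is typically derived from the HDFS block size together with configurable minimum and maximum values. Below is a minimal, self-contained sketch of that logic (mirroring FileInputFormat.computeSplitSize; the 128 MB block size and the example file size are illustrative only):

public class SplitSizeSketch {

    // Mirrors the split-size rule used by FileInputFormat when planning map parallelism
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // illustrative 128 MB HDFS block
        long minSize = 1L;                   // illustrative minimum split size
        long maxSize = Long.MAX_VALUE;       // illustrative maximum split size

        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        long fileSize = 300L * 1024 * 1024;  // illustrative 300 MB input file
        long splits = (fileSize + splitSize - 1) / splitSize;

        // With these defaults the split size equals the block size, so the
        // 300 MB file is planned as 3 splits -> 3 MapTask instances.
        System.out.println("split size = " + splitSize + ", splits = " + splits);
    }
}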