An Introduction to MapReduce

MapReduce is a computing framework. As a framework for doing computation, it takes an input, processes that input through its defined computing model, and produces an output, which is the desired result.

When a MapReduce computational task runs, it is divided into two phases: the Map phase and the Reduce phase, and each phase uses key/value pairs as its input and output. All we programmers need to do is define the functions for these two phases: the map function and the reduce function.
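To make the key/value contract concrete before looking at the real Hadoop code, here is a minimal, non-distributed sketch of the two phases in plain Java, assuming the task is word counting. The class and method names below (PhasesSketch, map, reduce) are illustrative only and are not part of Hadoop; in Hadoop the framework itself performs the grouping ("shuffle") between the two phases.

    import java.util.*;

    /**
     * A minimal, non-distributed sketch of the two phases for word counting.
     * The class and method names are illustrative only; in Hadoop the
     * framework performs the grouping ("shuffle") between the phases.
     */
    public class PhasesSketch {

        // Map phase: one input value (a line of text) -> zero or more (word, 1) pairs.
        static List<Map.Entry<String, Integer>> map(String line) {
            List<Map.Entry<String, Integer>> pairs = new ArrayList<Map.Entry<String, Integer>>();
            for (String word : line.split("\\s+")) {
                pairs.add(new AbstractMap.SimpleEntry<String, Integer>(word, 1));
            }
            return pairs;
        }

        // Reduce phase: one key plus all of its values -> a single (word, total) pair.
        static Map.Entry<String, Integer> reduce(String word, List<Integer> counts) {
            int sum = 0;
            for (int c : counts) {
                sum += c;
            }
            return new AbstractMap.SimpleEntry<String, Integer>(word, sum);
        }

        public static void main(String[] args) {
            // "Shuffle": group the intermediate pairs by key before reducing.
            Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
            for (String line : Arrays.asList("hello world", "hello mapreduce")) {
                for (Map.Entry<String, Integer> pair : map(line)) {
                    List<Integer> counts = grouped.get(pair.getKey());
                    if (counts == null) {
                        counts = new ArrayList<Integer>();
                        grouped.put(pair.getKey(), counts);
                    }
                    counts.add(pair.getValue());
                }
            }
            for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
                // Prints hello=2, mapreduce=1, world=1
                System.out.println(reduce(entry.getKey(), entry.getValue()));
            }
        }
    }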

A basic MapReduce example

To explain how MapReduce works, let's first look at the "Hello World" of MapReduce, the WordCount example, which ships with every version of the Hadoop distribution.

    package org.apache.hadoop.examples;

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    /**
     * Description: WordCount explained by York
     * @author Hadoop Dev Group
     */
    public class WordCount {

        /**
         * TokenizerMapper extends the generic Mapper class, the base class that
         * implements the Map function. Any class used as a Key must implement
         * the WritableComparable interface so that keys can be serialized and
         * compared with each other. (A Reporter can be used to report the
         * progress of the whole application, but it is not used in this example.)
         */
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {

            // IntWritable and Text are Hadoop classes that wrap Java data types.
            // They implement WritableComparable and can be serialized, which
            // facilitates data exchange in a distributed environment.
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            /**
             * void map(K1 key, V1 value, Context context)
             * Maps a single input k/v pair to intermediate k/v pairs. Output
             * pairs do not need to be of the same type as the input pair; an
             * input pair can map to zero or more output pairs. The Context
             * collects <k,v> pairs from the Mapper: context.write(k, v) adds
             * a (k, v) pair to the output. Programmers mainly write the map
             * and reduce functions.
             */
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }

        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {

            private IntWritable result = new IntWritable();

            // void reduce(Text key, Iterable<IntWritable> values, Context context)
            // Sums all of the counts collected for a given word.
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

        public static void main(String[] args) throws Exception {
            // Configuration: reads the MapReduce system configuration (HDFS and MapReduce).
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: wordcount <in> <out>");
                System.exit(2);
            }
            Job job = new Job(conf, "word count");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(TokenizerMapper.class);      // set the Mapper class for the job
            job.setCombinerClass(IntSumReducer.class);      // set the Combiner class for the job
            job.setReducerClass(IntSumReducer.class);       // set the Reducer class for the job
            job.setOutputKeyClass(Text.class);              // set the output key class for the job
            job.setOutputValueClass(IntWritable.class);     // set the output value class for the job
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    // set the input path for the job
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // set the output path for the job
            System.exit(job.waitForCompletion(true) ? 0 : 1);             // run the job
        }
    }

A line-by-line walkthrough of WordCount

  • For the map function:
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException { ... }

The first two parameters, the Object key and the Text value, are the input key and value. The third parameter, the Context context, collects the output key/value pairs: each intermediate pair is emitted with context.write(word, one). For an input line such as "hello world hello", map emits (hello, 1), (world, 1), (hello, 1).

  • For the reduce function:
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { ... }

The input to reduce is also a key/value pair, but its value is an iterator, Iterable<IntWritable> values. In other words, the input to reduce is a key together with the set of values that correspond to that key; for the word "hello" above, that would be (hello, [1, 1]). Reduce also has a Context, which works the same way as in map. As for the calculation logic itself, that is what the programmer has to code.

  • For the main method:

The first statement is:

    Configuration conf = new Configuration();

Before running a MapReduce program, you need to initialize the Configuration. This class reads the configuration information of the MapReduce system, including HDFS and MapReduce; that is, the configuration files used when installing Hadoop, such as core-site.xml, hdfs-site.xml, mapred-site.xml, and so on. Some newcomers don't understand why this step is needed, because they haven't thought through the MapReduce computing framework: when we develop a MapReduce program, we are only filling in the blanks, writing the actual business logic in the map and reduce functions, and the MapReduce framework does the rest of the work itself. But at the very least we have to tell it how to do that work, for example where HDFS is and where the JobTracker of MapReduce is, and this information lives in the configuration files under the conf directory.
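If needed, such values can also be set programmatically on the Configuration object instead of relying only on the XML files. A minimal sketch follows; the hostnames are placeholders, and the property names shown are the classic Hadoop 1.x ones (newer versions use different names), so treat this as an illustration rather than something to copy verbatim:

    // A minimal sketch: these values normally come from core-site.xml,
    // hdfs-site.xml and mapred-site.xml on the classpath; the hostnames and
    // the (classic Hadoop 1.x) property names below are placeholders.
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://namenode:9000");   // where HDFS lives
    conf.set("mapred.job.tracker", "jobtracker:9001");     // where the JobTracker lives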

The following code is:

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }

When you run the WordCount program, it must have exactly two arguments; if it doesn't, it exits with an error. The GenericOptionsParser class in the first line is used to interpret common Hadoop command-line options and set the Configuration object to the desired values. In practice we don't often use it directly in development; instead we write a class that implements the Tool interface and run the program with ToolRunner inside main, and ToolRunner calls GenericOptionsParser internally.
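For reference, a minimal sketch of that Tool/ToolRunner pattern; WordCountTool is a hypothetical class name, and the job-building code itself is elided:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    // A hypothetical driver that lets ToolRunner handle the generic Hadoop options.
    public class WordCountTool extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();  // already populated by ToolRunner
            // ... build and submit the Job here, as in the main() method above ...
            return 0;
        }

        public static void main(String[] args) throws Exception {
            // ToolRunner calls GenericOptionsParser internally before invoking run().
            System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
        }
    }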

The following code is:

    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

The first line builds a job. In the MapReduce framework, a MapReduce task is also called a MapReduce job, while the concrete map and reduce operations inside it are called tasks. Here we build a job with two parameters: the Configuration object and the name of the job.

The second line loads the program the programmer wrote, in our case the class named WordCount. A correction here: although we only need to implement the map and reduce functions when writing a MapReduce program, we actually implement three classes. The third class configures how MapReduce runs the map and reduce functions; more precisely, it builds a job that MapReduce can execute, such as the WordCount class.

The fourth line sets the Combiner class. This class is related to the MapReduce mechanism; in this example nothing breaks if the line is removed, but using it generally makes the job run more efficiently.
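As a rough illustration of what the Combiner does (the pairs below are just an example, not output from the program):

    Map output for a split containing "hello hello world", without a combiner:
        (hello, 1), (hello, 1), (world, 1)   -> three pairs sent to the reducers
    With IntSumReducer used as the combiner, the pairs are pre-summed on the map side:
        (hello, 2), (world, 1)               -> only two pairs cross the network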

The following code:

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

This defines the types of the output key/value, that is, the key/value types that will finally be stored in the result file on HDFS.
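For WordCount this is enough, because the map output and the final output use the same types (Text and IntWritable). As a side note, when the intermediate map-side types differ from the final ones, the job would also need the corresponding map-output setters; a minimal sketch, not required in this example:

    // Only needed when the map output types differ from the final output types.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);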

The final code is:

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);

The first line sets the input data path, the second line sets the output data path, and the last line exits normally if the job runs successfully.
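If the job succeeds, the result ends up in the output directory on HDFS, typically in files named like part-r-00000 with one tab-separated key/value pair per line. For a small input such as the two lines used in the earlier sketch, the content would look roughly like:

    hello       2
    mapreduce   1
    world       1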