
1. Overview of MapReduce

MapReduce is a programming framework for writing distributed computing programs, and it is the core framework for developing Hadoop-based data analysis applications.

Its core function is to combine the user's business logic code and built-in default components into a complete distributed computing program that runs concurrently on a Hadoop cluster.

2. Advantages and disadvantages of MapReduce

1. Advantages

  1. MapReduce is easy to program

    By simply implementing a few interfaces, you can build a complete distributed program, and that program can be distributed to run on a large number of inexpensive PC machines. In other words, writing a distributed program becomes as easy as writing a simple serial program. This is what has made MapReduce programming so popular.

  2. Good scalability

    When the existing computing resources are no longer sufficient, you can simply add more machines to expand the computing capacity.

  3. High fault tolerance

    MapReduce is designed to be deployed on inexpensive PCs, so it must be highly fault tolerant. For example, if one machine goes down, its computing tasks can be transferred to another node so that the job does not fail; this process requires no human intervention and is handled entirely inside Hadoop.

  4. Suitable for offline processing of massive data at the PB scale and above

    It can make thousands of servers in a cluster work concurrently, providing the capability to process massive amounts of data.

2. Disadvantages

  1. Not good at real-time computing

    MapReduce cannot return results in milliseconds or seconds like MySQL does.

  2. Not good at streaming

    The input data of streaming computing arrives dynamically, whereas the input data set of MapReduce is static and cannot change dynamically. This is determined by MapReduce's own design.

  3. Not good at DAG (directed acyclic graph) calculation

    In DAG workloads, multiple applications depend on one another: the input of one application is the output of another. MapReduce can run such workloads, but the output of every MapReduce job is written to disk, which causes a large amount of disk I/O and low performance.

3. Core ideas of MapReduce

  1. Distributed algorithms often need to be divided into at least two phases.
  2. In the first phase, the concurrent MapTask instances run completely in parallel and are independent of one another.
  3. In the second phase, the concurrent ReduceTask instances are also independent of one another, but their input data depends on the output of all the MapTask instances from the previous phase.
  4. The MapReduce programming model can contain only one Map phase and one Reduce phase. If the user's business logic is more complex, multiple MapReduce programs have to be run in sequence (see the sketch after this list).
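
A rough sketch of point 4: the output directory of the first job becomes the input directory of the second, so the intermediate result takes a round trip through the filesystem. This is only a minimal sketch with assumed paths (/input, /tmp/stage1-out, /output); a real program would also set each job's Mapper and Reducer classes.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class ChainedJobsSketch {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            // Job 1: in a real program, set its Mapper/Reducer classes here;
            // its result is written to an intermediate directory on disk
            Job job1 = Job.getInstance(conf, "stage-1");
            job1.setJarByClass(ChainedJobsSketch.class);
            FileInputFormat.setInputPaths(job1, new Path("/input"));
            FileOutputFormat.setOutputPath(job1, new Path("/tmp/stage1-out"));
            if (!job1.waitForCompletion(true)) {
                System.exit(1);
            }

            // Job 2: reads job 1's output back from disk -- this extra trip
            // through the filesystem is the disk I/O cost mentioned above
            Job job2 = Job.getInstance(conf, "stage-2");
            job2.setJarByClass(ChainedJobsSketch.class);
            FileInputFormat.setInputPaths(job2, new Path("/tmp/stage1-out"));
            FileOutputFormat.setOutputPath(job2, new Path("/output"));
            System.exit(job2.waitForCompletion(true) ? 0 : 1);
        }
    }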

4. MapReduce process

A complete MapReduce program runs in distributed mode with three types of instance processes:

  1. MrAppMaster: responsible for process scheduling and state coordination of the whole program.
  2. MapTask: responsible for the entire data processing flow of the Map phase.
  3. ReduceTask: responsible for the entire data processing flow of the Reduce phase.

5. Official WordCount source code

Decompiling the official WordCount source code with a decompiler shows that the example contains a Map class, a Reduce class, and a driver class, and that the data types it uses are serialization types encapsulated by Hadoop itself.

6. Common data serialization types

Java type    Hadoop Writable type
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable
Null NullWritable
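
A quick sketch of how these wrapper types are used in code (the values below are arbitrary examples):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;

    public class WritableTypesDemo {

        public static void main(String[] args) {
            // Wrap plain Java values in Hadoop's serializable Writable types
            IntWritable count = new IntWritable(2);
            LongWritable offset = new LongWritable(0L);
            Text word = new Text("hadoop");

            // Unwrap them back into plain Java values
            int c = count.get();
            long o = offset.get();
            String w = word.toString();

            // NullWritable is a singleton placeholder used when a key or value is not needed
            NullWritable nothing = NullWritable.get();

            System.out.println(w + " -> " + c + ", offset " + o + ", " + nothing);
        }
    }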

7. MapReduce programming specification

The program written by the user is divided into three parts: Mapper, Reducer, and Driver.

1. Mapper stage

    The user-defined Mapper must extend the Mapper parent class. Its input and output data are both in key-value (KV) pair form, its business logic is written in the map() method, and map() is called once for each input KV pair.

2. Reducer stage

    The user-defined Reducer must extend the Reducer parent class. Its input data type corresponds to the Mapper's output data type, its business logic is written in the reduce() method, and reduce() is called once for each group of KV pairs that share the same key.

3. Driver stage

    The Driver is equivalent to a client of the YARN cluster: it submits the whole program, packaged as a Job object that encapsulates the running parameters of the MapReduce program, to the YARN cluster.

8. WordCount case practice

1. Local testing

  1. Requirements

    Count the total number of occurrences of each word in a given text file and output the result.

    The input data

    Expected output data

    moe 2
    zoe 1
    hadoop 1
    xue 1
  2. Requirement analysis

    According to the MapReduce programming specification, write the Mapper, Reducer, and Driver classes respectively.

  3. Environment preparation

    • Create a Maven project named MapReduceDemo

    • Add the following dependencies to the pom.xml file

      <dependencies>
          <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-client</artifactId>
              <version>3.1.3</version>
          </dependency>
          <dependency>
              <groupId>junit</groupId>
              <artifactId>junit</artifactId>
              <version>4.12</version>
          </dependency>
          <dependency>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
              <version>1.7.30</version>
          </dependency>
      </dependencies>
    • In the src/main/resources directory of the project, create a new file named “log4j.properties” and fill it with the following content:

      log4j.rootLogger=INFO, stdout
      log4j.appender.stdout=org.apache.log4j.ConsoleAppender
      log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
      log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
      log4j.appender.logfile=org.apache.log4j.FileAppender
      log4j.appender.logfile.File=target/spring.log
      log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
      log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
    • Create a package named com.moe.mapreduce.wordcount

  4. Write a program

    • Write the Mapper class

      import java.io.IOException;

      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;

      public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

          private Text outK = new Text();
          private IntWritable outV = new IntWritable(1);

          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // 1 Get a line
              String line = value.toString();
              // 2 Split the line into words
              String[] words = line.split(" ");
              // 3 Loop over the words
              for (String word : words) {
                  // Wrap the word as the output key
                  outK.set(word);
                  // Write out the (word, 1) pair
                  context.write(outK, outV);
              }
          }
      }
    • Write the Reducer class

      import java.io.IOException;

      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;

      public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

          private IntWritable outV = new IntWritable();

          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              int sum = 0;
              // Accumulate the counts for this word
              for (IntWritable value : values) {
                  sum += value.get();
              }
              outV.set(sum);
              // Write out the (word, total count) pair
              context.write(key, outV);
          }
      }
    • Write the Driver class

      import java.io.IOException;

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

      public class WordCountDriver {

          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              // 1 Obtain the job
              Configuration conf = new Configuration();
              Job job = Job.getInstance(conf);
              // 2 Set the jar package path
              job.setJarByClass(WordCountDriver.class);
              // 3 Associate the Mapper and Reducer
              job.setMapperClass(WordCountMapper.class);
              job.setReducerClass(WordCountReducer.class);
              // 4 Set the KV types of the map output
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(IntWritable.class);
              // 5 Set the KV types of the final output
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              // 6 Set the input path and output path
              // (the output directory must not already exist, or the job will fail)
              FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputword"));
              FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output888"));
              // 7 Submit the job
              boolean result = job.waitForCompletion(true);
              System.exit(result ? 0 : 1);
          }
      }
  5. Local test

    The HADOOP_HOME environment variable and the Windows runtime dependencies need to be configured first.

    Then run the program in IDEA/Eclipse.

2. Submit to the cluster for testing

Testing on the cluster

  • To package the program into a jar with Maven, add the following packaging plugin configuration

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • Package the program into a jar

  • Rename the jar without dependencies to wc.jar and copy it to the /opt/module/hadoop-3.1.3 directory on the Hadoop cluster.
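
  • Note: the execution command below passes /input and /output as arguments, while the Driver above hardcodes local Windows paths. A minimal adjustment for the cluster run, assuming the paths are taken from the command line, is to change the two path lines in WordCountDriver to:

      // Read the input and output paths from the command-line arguments
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));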

  • Start the Hadoop cluster

    [moe@hadoop102 ~]$ start-dfs.sh
    [moe@hadoop103 ~]$ start-yarn.sh
  • Execute the WordCount program

    [moe@hadoop102 ~]$ hadoop jar wc.jar com.moe.mapreduce.wordcount.WordCountDriver /input /output