1. Word frequency statistics task requirements

Edit local txt files and fill them with words to serve as the input documents. Here we prepare two such files, wordfile1.txt and wordfile2.txt.

1.1 MapReduce programming method

Import the required Hadoop JARs into the project (in a typical Hadoop installation these sit under the share/hadoop directory):

  • All JARs in the mapreduce directory

  • All JARs in the common directory

  • All JARs in common/lib

  • All JARs in the yarn directory

  • All JARs in the hdfs directory

In HDFS, create the input and output folders (e.g. hdfs dfs -mkdir /input /output) and check that they were created. Then upload the newly created wordfile1.txt and wordfile2.txt to HDFS (e.g. hdfs dfs -put wordfile1.txt wordfile2.txt /input).

Check whether the upload succeeded, for example with hdfs dfs -ls /input!
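
These steps can also be done from Java instead of the hdfs shell. A minimal sketch using Hadoop's FileSystem API (the class name UploadWordFiles and the local file locations are assumptions):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class UploadWordFiles {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            FileSystem fs = FileSystem.get(conf);
            fs.mkdirs(new Path("/input")); // create the input folder
            // Upload the two local word files into /input.
            fs.copyFromLocalFile(new Path("wordfile1.txt"), new Path("/input/wordfile1.txt"));
            fs.copyFromLocalFile(new Path("wordfile2.txt"), new Path("/input/wordfile2.txt"));
            // List /input to check whether the upload succeeded.
            for (FileStatus status : fs.listStatus(new Path("/input"))) {
                System.out.println(status.getPath());
            }
            fs.close();
        }
    }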

1.1.1 Writing Map Processing Logic

In the Map phase, the text data in wordfile1.txt and wordfile2.txt is read in and handed to the map function as <key, value> pairs, where key is the byte offset of the current line and value is the content of that line. Once a <key, value> pair reaches the map function, the custom map logic processes the value and emits the result as new key-value pairs. This output serves as the intermediate result fed into the Reduce phase.

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public TokenizerMapper() {}

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line into tokens and emit (word, 1) for each token.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
        }
    }

1.1.2 Writing Reduce Processing Logic

The intermediate results from the Map phase are distributed to Reduce tasks after the Shuffle phase (partitioning, sorting, and merging). The input to this phase has the form <key, value-list>, for example <'Hadoop', <1,1>>. The reduce function simply sums the value-list of each key to obtain that word's frequency.

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public IntSumReducer() {}

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum all the counts collected for this word.
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            this.result.set(sum);
            context.write(key, this.result);
        }
    }

1.1.3 Writing the main method

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // Load the Hadoop configuration
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String[] otherArgs = new String[]{"/input/wordfile1.txt", "/input/wordfile2.txt", "/output/output"};
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        // The combiner pre-aggregates map output locally to cut shuffle traffic.
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Every argument except the last is an input path; the last is the output path.
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

2. Complete word frequency statistics program
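
For reference, here is the program assembled from the fragments in 1.1.1 through 1.1.3 into a single class, as a sketch (the package declaration is omitted, and the input/output paths are the hard-coded ones from the main method above):

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount {

        // Map: tokenize each line and emit (word, 1).
        public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
            private static final IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }

        // Reduce: sum the counts for each word.
        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();

            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            String[] otherArgs = new String[]{"/input/wordfile1.txt", "/input/wordfile2.txt", "/output/output"};
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }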

3. Package the program

3.1 Compile and package the word frequency statistics program from the command line
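
With the Hadoop JARs on the classpath, this is typically a two-step sequence, e.g. compile with javac -classpath "$(hadoop classpath)" WordCount.java, then package with jar cf WordCount.jar WordCount*.class (a sketch; the source and jar file names are assumptions). The hadoop classpath command prints the classpath of the local installation.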

3.2 Compile and package the word frequency statistics program with IDEA

4. Run the program

If you want to run WordCount.jar again, delete the output directory in HDFS first; otherwise the job aborts with an error that the output directory already exists.
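
For example, assuming the jar and paths from above: clear the old result with hdfs dfs -rm -r /output/output, then resubmit the job with hadoop jar WordCount.jar WordCount.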

5. Programming problem

5.1 Based on the attached data file flow_data.dat, write a program that meets the following requirement:

(Hint: use the mobile phone number as the key and wrap the upstream traffic and downstream traffic in a custom value type; the key-value pairs emitted in the map phase then become the input of the reduce phase, which accumulates the total upstream traffic, total downstream traffic, and overall total for each phone number.)

  • Define a structure

    public static class FlowBean implements Writable {
        private long upflow;   // upstream traffic
        private long downflow; // downstream traffic
        private long sumflow;  // total traffic

        public FlowBean() {}

        public FlowBean(long upflow, long downflow) {
            this.upflow = upflow;
            this.downflow = downflow;
            this.sumflow = upflow + downflow;
        }

        public long getUpflow() {
            return upflow;
        }

        public void setUpflow(long upflow) {
            this.upflow = upflow;
        }

        public long getDownflow() {
            return downflow;
        }

        public void setDownflow(long downflow) {
            this.downflow = downflow;
        }

        public long getSumflow() {
            return sumflow;
        }

        public void setSumflow(long sumflow) {
            this.sumflow = sumflow;
        }

        public void write(DataOutput output) throws IOException {
            output.writeLong(this.upflow);
            output.writeLong(this.downflow);
            output.writeLong(this.sumflow);
        }

        public void readFields(DataInput input) throws IOException {
            this.upflow = input.readLong();
            this.downflow = input.readLong();
            this.sumflow = input.readLong();
        }

        @Override
        public String toString() {
            return this.upflow + "\t" + this.downflow + "\t" + this.sumflow;
        }
    }
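
Two details of the Writable contract matter here: the no-argument constructor is required because Hadoop instantiates FlowBean by reflection during deserialization, and readFields must read the fields in exactly the order that write wrote them. The toString format also determines how each value is rendered in the job's text output.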
  • The main function
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PhoneCount {
    public PhoneCount() {}

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // Load the Hadoop configuration
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String[] otherArgs = new String[]{"/phone/input/flow_data.dat", "/phone/output4"};
        Job job = Job.getInstance(conf);
        job.setJarByClass(PhoneCount.class);
        job.setMapperClass(MapWritable.class);
        job.setReducerClass(ReduceWritable.class);
        // Declare the key/value types for both the map output and the final output.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        job.waitForCompletion(true);
    }
  • The map operation
    public static class MapWritable extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            System.out.println(line); // debug: echo each input line
            String[] fields = line.split("\t");
            // Skip the summary row; ordinary records have numeric traffic fields.
            if (!Objects.equals(fields[fields.length - 3], "Sum up")) {
                long upflow = Long.parseLong(fields[fields.length - 3]);
                long downflow = Long.parseLong(fields[fields.length - 2]);
                // fields[1] is the mobile phone number.
                context.write(new Text(fields[1]), new FlowBean(upflow, downflow));
            }
        }
    }
  • The reduce operation
    public static class ReduceWritable extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            // Define two counters for this user's upload and download traffic
            long sumupflow = 0;
            long sumdownflow = 0;
            // Accumulate the sums record by record
            for (FlowBean f : values) {
                sumupflow += f.getUpflow();
                sumdownflow += f.getDownflow();
            }
            // Output; the FlowBean constructor fills in the combined total
            context.write(key, new FlowBean(sumupflow, sumdownflow));
        }
    }
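
Each line of the final output is therefore the phone number followed by FlowBean.toString(): the total upstream traffic, the total downstream traffic, and their sum, separated by tabs.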

5.2 Additional question (optional)

Use MapReduce to implement a shortest-path algorithm. A path in a graph is a sequence of edges in which all vertices (except possibly the start and end) are distinct and no edge repeats. In highway transportation, the shortest path between a start point and an end point saves transportation cost and can greatly improve efficiency. Given the following path diagram:

Calculate the length of the shortest path between A and C programmatically.
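
The classic MapReduce approach is iterative parallel breadth-first search: each node record carries its best known distance from the start plus its adjacency list; the mapper proposes a relaxed distance to every neighbor, the reducer keeps the minimum, and the job is rerun until no distance changes. Below is a minimal sketch under assumed conventions: one input line per node in the form node<TAB>distance<TAB>neighbor:weight,neighbor:weight, with Long.MAX_VALUE meaning "not reached yet"; the class names are placeholders.

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class ShortestPath {

        public static class SPMapper extends Mapper<LongWritable, Text, Text, Text> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] parts = value.toString().split("\t");
                String node = parts[0];
                long dist = Long.parseLong(parts[1]);
                String adjacency = parts.length > 2 ? parts[2] : "";
                // Re-emit the node's own record so the graph structure survives the iteration.
                context.write(new Text(node), new Text("NODE\t" + dist + "\t" + adjacency));
                if (dist == Long.MAX_VALUE || adjacency.isEmpty()) {
                    return; // this node cannot relax any edges yet
                }
                // Propose a candidate distance to every neighbor (edge relaxation).
                for (String edge : adjacency.split(",")) {
                    String[] e = edge.split(":");
                    long candidate = dist + Long.parseLong(e[1]);
                    context.write(new Text(e[0]), new Text("DIST\t" + candidate));
                }
            }
        }

        public static class SPReducer extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                long best = Long.MAX_VALUE;
                String adjacency = "";
                for (Text v : values) {
                    String[] parts = v.toString().split("\t");
                    if (parts[0].equals("NODE") && parts.length > 2) {
                        adjacency = parts[2]; // recover the adjacency list
                    }
                    // Keep the smallest distance seen for this node.
                    best = Math.min(best, Long.parseLong(parts[1]));
                }
                // Write the node back in the input format, ready for the next iteration.
                context.write(key, new Text(best + "\t" + adjacency));
            }
        }
    }

A driver analogous to the WordCount main method submits the job. Seed A with distance 0 and every other node with Long.MAX_VALUE, rerun until the output stops changing, and the record for C then holds the length of the shortest path from A to C.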

Finally

Xiao Sheng Fan Yi, looking forward to your attention.