This is the seventh day of my participation in the August More text Challenge. For details, see:August is more challenging

1. Introduction of graphs

1.1 the origin of

In functional languages, map means to compute each element of a List, and reduce means to iterate over each element of a List.

Map and Reduce provide a framework for calculating the functions that are passed in.

  • In MapReduce, Map processes raw data, and each piece of data is unrelated to each other.
  • In the Reduce phase, data is organized by a key followed by a number of values. These values are correlated, at least under a key, thus conforming to the basic idea of map and Reduce in functional languages.
  • The concepts of “map” and “Reduce” and their main ideas are borrowed from functional programming languages, as well as features borrowed from vector programming languages. It makes it easier for programmers to run their programs on distributed systems without distributed parallel programming.

1.2 Introduction to the Model

  1. MapReduceA complex parallel computing process running on a large cluster is highly abstracted into two functions:MapandReduce
  2. Easy programming, do not need to master the details of distributed parallel programming, also can easily run their own program on the distributed system, to complete the calculation of massive data
  3. MapReduceUsing a “divide and conquer” strategy, a large data set stored in a distributed file system is split into many separate shards that can be processed by multiple Map tasks in parallel
  4. MapReduceOne idea is to “compute to data” rather than “data to computing,” because moving data requires a lot of overhead over the network
  5. MapReduceThe framework adoptsMaster/SlaveArchitecture, including oneMasterAnd a number ofSlave.MasterRunning on theJobTracker(ResourceManager on YARN),SlaveRunning on theTaskTracker(Nodemanager on YARN)
  6. The Hadoop framework is implemented in Java, but MapReduce applications need not be written in Java

1.3 MRv1 architecture

The MapReduce architecture consists of four parts: Client, JobTracker, TaskTracker, and Task

Node description:

  • Client

The MapReduce program written by the user is submitted to JobTracker through the Client. The user can view the job running status through some interfaces provided by the Client.

  • JobTracker

JobTracker monitors resources and schedules jobs; JobTracker monitors the health status of all taskTrackers and jobs. If a task fails, the JobTracker transfers the task to another node. The JobTracker tracks the progress of tasks, resource usage, and other information, and sends this information to the TaskScheduler, which selects the appropriate tasks to use the resources when they become idle.

  • TaskTracker

The TaskTracker periodically reports the resource usage and task running progress of the node to The JobTracker through the heartbeat. At the same time, the TaskTracker receives commands from the JobTracker and performs corresponding operations, such as starting a new task and killing a task. TaskTracker allocates the resources (such as CPU and memory) on a node by using slots. A Task has to get a slot before it has a chance to run, and the Hadoop scheduler’s job is to assign free slots on each TaskTracker to tasks. There are two types of slots, Map slot and Reduce slot, which are used by Map tasks and Reduce tasks respectively.

  • Task

Tasks are classified into Map tasks and Reduce tasks. Both tasks are started by the TaskTracker.

Structural defects:

  • Single point of failure exists
  • JobTracker “does all the work” and the task is too heavy (memory overhead is high when the task is too long, the upper limit is 4000 nodes)
  • Memory overflow is easy to occur (allocating resources only considers the number of MapReduce jobs, but does not consider CPU and memory)
  • Unreasonable resource allocation (mandatory allocation into slots, including Map slots and Reduce slots)

1.4 YARN

1.4.1 YARN Architecture

Architecture thoughtarchitectureResourceManager• Processing client requests • Starting/monitoring ApplicationMaster • Monitoring NodeManager • Resource allocation and schedulingNodeManager• Manage resources on a single node • Process commands from ResourceManger • Process commands from ApplicationMasterApplicationMaster• Request resources for applications and assign them to internal tasks • Task scheduling, monitoring, and fault tolerance

1.4.2 YARN workflow

Step 1: The user writes a client application program and submits the application program to YARNApplicationMasterProgram, startApplicationMasterStep 2:YARNIn theResourceManagerResponsible for receiving and processing requests from clients, assigning a container to the application, and starting one in that containerApplicationMasterStep 3:ApplicationMasterWhen it is created, it will first go toResourceManagerRegistration Step 4:ApplicationMasterUse the polling mode toResourceManagerApply for resources Step 5:ResourceManagerThe application is made in the form of containerApplicationMasterAllocating resources Step 6: Start tasks (running environment, scripts) in the container Step 7: Task directionApplicationMasterReport your status and progress Step 8: Once the application is running,ApplicationMastertoResourceManagerLog out of the application manager and close yourself

2. MapReduce workflow

There is no communication between different Map tasks; there is no exchange of information between different Reduce tasks; a user cannot explicitly send a message from one machine to another; all data exchange takes place through the MapReduce framework itself

example

3. Java Api essentials

  • Writable

Hadoop custom serialization interface. When passing objects or persisting objects between processes, objects need to be serialized into byte streams, and when converting byte streams received or read from disk into objects, they need to be deserialized. Map and Reduce keys and values are in the Writeable type. The key must also support the WritableComparable interface. The Java writable type is encapsulated as follows:

Java primitive Writable implementation
boolean BooleanWritable
byte ByteWritable
int ShortWritable
float FloatWritable
long LongWritable
double DoubleWritable
enum EnumWritable
Map MapWritable

(2) InputFormat Describes the format of input data. Provides two functions:

Our getSplits() will split the input data into splits based on a strategy to determine the number of Map tasks and their split values. CreateRecordReader () resolves a split into key-value pairs. FileInputFormat is all files as a data source InputFormat implementation base class, small files will not be fragmented, record read call subclass TextInputFormat implementation;

  • TextInputFormat Is the default processing class, processing plain text files, each line in the file as a record, line start offset is key, each line of text is value;
  • CombineFileInputFormatFor small file design, can merge small files;
  • KeyValueTextInputFormat Suitable for processing one row and two columns togethertabData as delimiters;
  • NLineInputFormat Control of eachsplit The number of lines in.

(3) the OutputFormat

Describes the format of the output data. Hadoop comes with various implementations of OutputFormat.

  • TextOutputFormat The default output format, with key and value separated by TAB;
  • SequenceFileOutputFormat, output key and value in SequenceFile format;
  • SequenceFileAsOutputFormat, outputs the key and value in the original binary format;
  • MapFileOutputFormat, write key and value to MapFile;
  • MultipleOutputFormatBy default, the Reducer generates one output. This format enables multiple outputs from a Reducer.

(4) Mapper/Reducer

It encapsulates the processing logic of the application program and is mainly implemented by the Map and Reduce methods.

(5) the Partitioner

Partition according to the map output key, getPartition() method returns the partition value, the default hash function

The number. The number of partitions is the same as the number of Reduce jobs for a job. The HashPartitioner is the default Partioner.

4. Experimental process

1, counting statistics class application imitate WordCount example, write “TelPubXxx” class to realize the statistics of the telephone information to dial the public service number. A text input file is given as follows, with a phone number in the first column and a public service number in the second, separated by a space. 13718855152 11216810117315 110 39451849 112 13718855153 110 13718855154 112 18610117315 114 18610117315 114 MapReduce Program execution after the output as follows, with “|” connection between the telephone number: 110 13718855153 16810117315 112 13718855154 | | 39451849 | | 18610117315 13718855152 114 18610117315The successful running

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TelPubZqc {
    public static class TelMap extends Mapper<Object.Text.Text.Text> {
        private Text pub = new Text();
        private Text tel = new Text();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            //Map (Key Value)
            String[] s=value.toString().split("");
            tel.set(s[0]);
            pub.set(s[1]); context.write(pub,tel); }}public static class TelReducer extends Reducer<Text.Text.Text.Text> {
        private Text result = new Text();
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder s= new StringBuilder();
            for (Text val : values) {
                if(s.toString().equals("")){
                    s.append(val.toString());
                }
                else s.append("|").append(val.toString());
            }
            result.set(String.valueOf(s));
            context.write(key, result);// Output the result}}public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();// Load the Hadoop configuration
        conf.set("fs.defaultFS"."hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input/input.txt"."output/outputTel"};
        if (otherArgs.length < 2) {
            System.err.println("Usage: PubTel 
      
        [
       
        ...]  
        
         "
        );
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");// Set the environment parameters
        job.setJarByClass(TelPubZqc.class);// Set the program main class
        job.setMapperClass(TelMap.class);// Set the user implemented Mapper class
        job.setCombinerClass(TelReducer.class);
        job.setReducerClass(TelReducer.class);// Set the Reducer class implemented by the user
        job.setOutputKeyClass(Text.class);// Set the output key type
        job.setOutputValueClass(Text.class); // Set the type of output value
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// Add the input file path
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// Set the output file path
        System.exit(job.waitForCompletion(true)?0 : 1); // Submit the job and wait for it to finish}}Copy the code

2. The Join application of two tables follows the example of single table association, and writes the “RelationXxx” class to realize multi-table association. Chinese text files are converted to UTF-8 encoding format. Otherwise, garbled characters will be displayed. Enter the score. TXT:

studentid classid score
s003001 fd3003 84
s003001 fd3004 90
s003002 fd2001 71
s002001 fd1001 66
s001001 fd1001 98
s001001 fd1002 60
Enter a major. TXT:
classid classname deptname
fd1001 Data mining Department of mathematics
fd2001 Electronic engineering Department of electronics
fd2002 Electronic technology Department of electronics
fd3001 Big data Computer science department
fd3002 Network engineering Computer science department
fd3003 Java application Computer science department
fd3004 Web front end Computer science department
Output results:
classid classname deptname
fd1001 Data mining Department of mathematics
fd1001 Data mining Department of mathematics
fd2001 Electronic engineering Department of electronics
fd3003 Java application Computer science department
fd3004 Web front end Computer science department

Transfer the required items to HDFS.

No error. View the results

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class RelationZqc {
    public static int time = 0;
    public static class RelationMap extends Mapper<Object.Text.Text.Text> {
        private Text classID = new Text();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String filename=((FileSplit)context.getInputSplit()).getPath().getName();
            String[] s = value.toString().split("");
            if(filename.equals("score.txt")){
                classID.set(s[1]);
                String val="1," + s[0] + "," + s[2];
                context.write(classID,new Text(val));
            }
            else if (filename.equals("major.txt")) {if(! s[0].equals("classid")){
                    classID.set(s[0]);
                    String val = "2," + s[1] + "," + s[2];
                    context.write(classID,newText(val)); }}}}public static class RelationReduce extends Reducer<Text.Text.Text.Text> {
        private Text result = new Text();
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String[][] studentTable=new String[10] [2];
            String[] data;
            String classID = "nil";
            if(time == 0){
                context.write(new Text("classid"), new Text("classname deptname studentid score"));
                time++;
            }
            int cnt = 0;
            for (Text val : values) {
                data = val.toString().split(",");
                if(data[0].equals("1")){
                    studentTable[cnt][0] = data[1];
                    studentTable[cnt][1] = data[2];
                    cnt = cnt + 1;
                }
                else if(data.length == 3 && data[0].equals("2")){
                    classID = data[1] + "" + data[2]; }}for(int i = 0; i < cnt; i++){
                if(classID.equals("nil")) continue;
                String s=classID+""+studentTable[i][0] +""+studentTable[i][1]; result.set(s); context.write(key, result); }}}public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();// Load the Hadoop configuration
        conf.set("fs.defaultFS"."hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input/score.txt"."input/major.txt"."output/outputRelationZqc"};
// String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Relation 
      
         [
        
         ...]  
         
          "
         );
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "RelationZqc");// Set the environment parameters
        job.setJarByClass(RelationZqc.class);// Set the program main class
        job.setMapperClass(RelationMap.class);// Set the user implemented Mapper class
        job.setReducerClass(RelationReduce.class);// Set the Reducer class implemented by the user
        job.setOutputKeyClass(Text.class);// Set the output key type
        job.setOutputValueClass(Text.class); // Set the type of output value
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// Add the input file path
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// Set the output file path
        System.exit(job.waitForCompletion(true)?0 : 1); // Submit the job and wait for it to finish}}Copy the code

3. Simple sorting application Write the MapReduce program “SortXxx” and input the contents of files sort1. TXT, sort2. TXT, and sort3. TXT. The program randomly generates several pieces of data and stores them in the HDFS. The data can be dates or numbers; The output consists of two columns of data, the first column is the original data from the input file, and the second column is the rank of that data.The successful running

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class SortZqc {
    public static class SortMap extends Mapper<Object.Text.IntWritable.IntWritable>{
        private static IntWritable data = new IntWritable();
        // Implement the map function
        public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
            String line=value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1)); }}public static class SortReduce extends Reducer<IntWritable.IntWritable.IntWritable.IntWritable>{
        IntWritable n = new IntWritable(1);  // Use n to represent order
        public void reduce(IntWritable key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
            for(IntWritable val:values){
                context.write(key,n);
                n = new IntWritable(n.get()+1); }}}public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();// Load the Hadoop configuration
        conf.set("fs.defaultFS"."hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input/sort1.txt"."input/sort2.txt"."input/sort3.txt"."output/outputSortZqc"};
        if (otherArgs.length < 2) {
            System.err.println("Usage: data sort 
      
        [
       
        ...]  
        
         "
        );
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "data sort");// Set the environment parameters
        job.setJarByClass(SortZqc.class);// Set the program main class
        job.setMapperClass(SortMap.class);// Set the user implemented Mapper class
        job.setCombinerClass(SortReduce.class);
        job.setReducerClass(SortReduce.class);// Set the Reducer class implemented by the user
        job.setOutputKeyClass(IntWritable.class);// Set the output key type
        job.setOutputValueClass(IntWritable.class); // Set the type of output value
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// Add the input file path
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// Set the output file path
        System.exit(job.waitForCompletion(true)?0 : 1); // Submit the job and wait for it to finish}}Copy the code

The last

Xiao Sheng Fan Yi, looking forward to your attention.