The knowledge points involved in Hadoop are shown in the following figure, which will be explained one by one in this paper:

This document is compiled by referring to Hadoop’s official website and many other materials. For neat typesetting and comfortable reading, blurred images and black and white images are redrawn into high-definition color images.

At present, Hadoop 2.x is the version most widely used in enterprises, so this article is based on Hadoop 2.x; new features introduced in Hadoop 3.x will be called out separately.

II. MapReduce

1. Introduction to MapReduce

The idea behind MapReduce is everywhere in everyday life, and almost everyone has been exposed to it in some form. Its core concept is "divide and conquer", which suits large-scale data-processing scenarios made up of many complex tasks. Even Google, whose paper popularized this style of distributed computing, applied the idea rather than inventing it.

  • Map is responsible for "splitting": breaking a complex task into several "simple tasks" that are processed in parallel. Splitting only works when the small tasks can be computed in parallel and have little dependence on each other.
  • Reduce is responsible for "combining": globally aggregating the results of the Map phase.
  • MapReduce runs on a YARN cluster, which consists of:
    1. ResourceManager
    2. NodeManager

Together, these two phases are the embodiment of the MapReduce idea.

Here’s another visual way to explain MapReduce:

We need to count all the books in the library. You count shelf one, I’ll count shelf two. This is Map. The more of us, the faster we count.

Now let’s get together and add up everyone’s statistics. This is Reduce.

1.1 Design concept of MapReduce

MapReduce is a programming framework for distributed computing. Its core function is to combine user-written business logic with built-in default components into a complete distributed program that runs concurrently on a Hadoop cluster.

Since it is a computing framework, its job is to take an input, process it through its own computation model, and produce an output.

MapReduce is a programming model that simplifies parallel computing and lowers the barrier to entry for developing parallel applications.

The idea of Hadoop MapReduce is embodied in the following three aspects:

  1. How to tackle Big data processing: Divide and conquer

For big data without computational dependence, the most natural way to achieve parallelism is to adopt a divide-and-conquer strategy. The first important problem of parallel computing is how to divide computing tasks or data so that subtasks or data blocks are computed simultaneously. Non-detachable tasks or interdependent data cannot be computed in parallel!

  2. Build abstract models: Map and Reduce

MapReduce draws on the ideas of functional languages to provide a high-level parallel programming abstraction model using Map and Reduce functions.

Map: applies some repeated processing to each element in a set of data;

Reduce: performs further collation on the intermediate results produced by Map.

MapReduce defines the following abstract programming interfaces, Map and Reduce, which are implemented by users:

map: (k1; v1) → [(k2; v2)]

reduce: (k2; [v2]) → [(k3; v3)]

Map and Reduce give programmers a clear, abstract description of the processing interfaces. From the two signatures above you can see that the data MapReduce works with is key-value pairs of the form <key, value>.
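Taking the WordCount example developed later in this article as a concrete illustration (the input key is the byte offset of the line), one call of each function behaves roughly like this:

map: (0; "hello world hello") → [(hello; 1), (world; 1), (hello; 1)]

reduce: (hello; [1, 1]) → [(hello; 2)]

reduce: (world; [1]) → [(world; 1)]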

  3. MapReduce framework

A complete MapReduce program runs in distributed mode with three types of instance processes:

  • MRAppMaster: responsible for process scheduling and state coordination of the whole program;
  • MapTask: responsible for the entire data processing flow of the Map stage;
  • ReduceTask: responsible for the entire data processing flow of the Reduce stage.

2. MapReduce programming specifications

MapReduce development consists of eight steps: two in the Map phase, four in the Shuffle phase, and two in the Reduce phase.

  1. The Map phase consists of two steps:
  • 1.1 Set the InputFormat class, split the input data into key-value pairs (K1 and V1), and pass them to step 2
  • 1.2 Write custom Map logic that converts the result of the first step into another key-value pair (K2 and V2) and outputs it
  2. The Shuffle phase consists of four steps:
  • 2.1 Partition the output key-value pairs
  • 2.2 Sort the data within each partition by key
  • 2.3 (Optional) Combine the data to reduce the amount copied over the network
  • 2.4 Group the data, putting all values of the same key into one collection
  3. The Reduce phase consists of two steps:
  • 3.1 Sort and merge the results of multiple Map tasks, and write a Reduce function with your own logic that processes the input key-value pairs and converts them into new key-value pairs (K3 and V3)
  • 3.2 Set the OutputFormat class to process and save the key-value data output by Reduce
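These eight steps map onto the pluggable classes that are registered on the Job object. The sketch below shows the correspondence; the My* class names are placeholders rather than classes defined in this article, and the concrete WordCount versions appear in section 4:

// Inside the run() method of a driver (see JobMain in section 4):
job.setInputFormatClass(TextInputFormat.class);           // 1.1 read the input as <K1,V1>
job.setMapperClass(MyMapper.class);                       // 1.2 custom map: <K1,V1> -> <K2,V2>
job.setPartitionerClass(MyPartitioner.class);             // 2.1 partition (HashPartitioner by default)
job.setSortComparatorClass(MyKeyComparator.class);        // 2.2 sort (the key's own ordering by default)
job.setCombinerClass(MyCombiner.class);                   // 2.3 optional combine
job.setGroupingComparatorClass(MyGroupComparator.class);  // 2.4 group values of the same key
job.setReducerClass(MyReducer.class);                     // 3.1 custom reduce: <K2,[V2]> -> <K3,V3>
job.setOutputFormatClass(TextOutputFormat.class);         // 3.2 write <K3,V3> out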

3. Introduction to the Mapper and Reducer classes

As described above, a MapReduce program is made up of eight steps, and each step corresponds to a class; the Job object assembles these classes into a task for submission. To simplify development, each step has a ready-made parent class that we can inherit from directly, which greatly reduces the difficulty of writing a MapReduce program and lets us implement functionality quickly.

In MapReduce programming, the two most important steps are our Mapper class and Reducer class

  1. Introduction to the Mapper class

In Hadoop 2.x, Mapper is a base class designed to be subclassed: we simply write our own Java class that inherits from Mapper and override some of its methods to implement the functionality we need. The four most important methods of the Mapper class are introduced below:

  1. setup method: the Mapper's initialization method; any object initialization work can be done here

  2. map method: called once for every record (by default, every line of text) that is read. This is the most important method: it is where each piece of data is processed

  3. cleanup method: called as soon as the MapTask finishes. It is used for cleanup work such as disconnecting connections and releasing resources

  4. run method: if finer control over the execution of the whole MapTask is needed, this method can be overridden to take precise control of the MapTask's entire processing loop

  2. Introduction to the Reducer class

Similarly, in Hadoop 2.x the Reducer class is also designed to be subclassed: we inherit from it and override its methods to implement our own logic. Its four most important methods mirror those of Mapper (a combined skeleton sketch follows this list):

  1. setup method: called right after the ReduceTask is initialized; object initialization work can be done here

  2. reduce method: called once for each key in the data sent over from the MapTasks. This is the most important method of the Reducer: it is where the data is processed

  3. cleanup method: called as soon as the whole ReduceTask finishes executing. It mainly performs cleanup work for the Reduce stage, such as disconnecting connections and releasing resources

  4. run method: if finer control over the execution of the whole ReduceTask is needed, this method can be overridden to take precise control of the ReduceTask's entire processing loop
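To make the lifecycle concrete, here is a minimal skeleton showing where the four methods sit in both classes. The types and class names are illustrative; the default run() implementation simply calls setup(), then map()/reduce() once per record or key, then cleanup().

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Skeleton Mapper: setup -> map (once per input record) -> cleanup
class SkeletonMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void setup(Context context) {
        // initialize objects, open connections, read configuration ...
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // process one input record and emit <K2, V2>
    }

    @Override
    protected void cleanup(Context context) {
        // release resources, close connections ...
    }
}

// Skeleton Reducer: setup -> reduce (once per key) -> cleanup
class SkeletonReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void setup(Context context) {
        // initialization work for the ReduceTask
    }

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // process all values of one key and emit <K3, V3>
    }

    @Override
    protected void cleanup(Context context) {
        // cleanup work for the ReduceTask
    }
}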

4. WordCount example

Requirement: count the total number of occurrences of each word in a given set of text files.

On the node01 server, run the following commands to prepare the data:

cd /export/servers
vim wordcount.txt

The file contents have the following format:

hello hello
world world
hadoop hadoop
hello world
hello flume
hadoop hive
hive kafka
flume storm
hive oozie

Upload the data file to HDFS

hdfs dfs -mkdir /wordcount/
hdfs dfs -put wordcount.txt /wordcount/
  • Define a Mapper class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Mapper program: it must inherit the Mapper class and declare four generic types.
/*
 * In Hadoop, Java types are wrapped as Writable types to improve transmission efficiency.
 * KEYIN   : K1  Long   ---- LongWritable
 * VALUEIN : V1  String ---- Text
 * KEYOUT  : K2  String ---- Text
 * VALUEOUT: V2  Long   ---- LongWritable
 */

public class MapTask extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * @param key     K1
     * @param value   V1
     * @param context the Context object that connects the Map and Reduce stages
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1. Obtain data in v1
        String val = value.toString();

        //2. Split the line into words
        String[] words = val.split(" ");

        Text text = new Text();
        LongWritable longWritable = new LongWritable(1);
        //3. Iterate through the loop and send it to reduce
        for (String word : words) {
            text.set(word);
            context.write(text, longWritable);
        }
    }
}
  • Define a Reducer class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * KEYIN   : K2 ----- Text
 * VALUEIN : V2 ----- LongWritable
 * KEYOUT  : K3 ----- Text
 * VALUEOUT: V3 ----- LongWritable
 */
public class ReducerTask extends Reducer<Text, LongWritable, Text, LongWritable> {


    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

        //1. Iterate through values for each value
        long  v3 = 0;
        for (LongWritable longWritable : values) {

            v3 += longWritable.get();  // accumulate the counts
        }

        //2. Output the result
        context.write(key, new LongWritable(v3));
    }
}
  • Define a main class that describes the job and submits the job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Task execution entry: group eight steps together
public class JobMain extends Configured implements Tool {
    // Write the eight assembly steps in the run method
    @Override
    public int run(String[] args) throws Exception {

        Job job = Job.getInstance(super.getConf(), "JobMain");
        // If the job is submitted to the cluster, one extra step is needed: specify the main (entry) class
        job.setJarByClass(JobMain.class);


        //1. Encapsulate the first step: read data
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount.txt"));

        //2. Encapsulation Step 2: Customize the map program
        job.setMapperClass(MapTask.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Steps 3 to 6 (partition, sort, combine, group) use the defaults here

        // step 7: Customize the Reduce program
        job.setReducerClass(ReducerTask.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Step 8: set the output; the output path is a directory and must not already exist
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/output"));

        //6) Submit tasks:
        boolean flag = job.waitForCompletion(true); // true on success, false on failure

        return flag ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        JobMain jobMain = new JobMain();
        int i = ToolRunner.run(configuration, jobMain, args); // the return value is the exit code
        System.exit(i);  // 0 indicates success; any other value indicates an error
    }
}

Note: once development is complete, the code can be packaged into a JAR and run on the server. In real work, programs are always packaged as JARs, with a main method developed as the program entry point, and then submitted to the cluster to run.

5. Running mode of MapReduce

  • Local operating mode
  1. The MapReduce program is submitted to the LocalJobRunner and runs locally as a single process
  2. The input data and output results can live either on the local file system or in HDFS
  3. How is local mode triggered? Simply run a program without the cluster configuration files; this amounts to setting mapreduce.framework.name=local and yarn.resourcemanager.hostname=local in the program's Configuration
  4. Local mode is very convenient for debugging business logic: just set a breakpoint in the IDE

[Local mode running code setting]

configuration.set("mapreduce.framework.name","local"); configuration.set("yarn.resourcemanager.hostname","local"); ----------- the above two do not need to be modified, if you want to test in the local directory, Can be modified HDFS Path -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- TextInputFormat. AddInputPath (job, new Path (" file:///D:\\wordcount\\input ")); TextOutputFormat.setOutputPath(job,new Path("file:///D:\\wordcount\\output"));Copy the code
  • Cluster operating mode
  1. Submit the MapReduce program to the YARN cluster and distribute it to many nodes for concurrent execution

  2. The processed data and output results should reside in the HDFS file system

  3. Implementation steps for committing a cluster:

    Package the program into a JAR, then submit it from any node in the cluster with the yarn command, for example:

    yarn jar hadoop_hdfs_operate-1.0-SNAPSHOT.jar cn.itcast.hdfs.JobMain

6. Detailed description of MapReduce operation mechanism

6.1 Working Mechanism of MapTask

The entire Map process is roughly as shown in the figure above.

Brief overview: the input file is logically divided into multiple splits, and a RecordReader reads the content line by line and hands each record to the user-implemented map method. After map processing, the output is handed to an OutputCollector, which partitions the result by key (hash partitioning by default) and writes it to a buffer. Each MapTask has an in-memory ring buffer that stores the map output; when the buffer is nearly full, its contents are spilled to disk as a temporary file. After the whole MapTask has finished, all temporary files it produced are merged into one final output file, which then waits for the ReduceTasks to pull its data.

The detailed steps

  1. The input-reading component InputFormat (TextInputFormat by default) uses the getSplits method to plan logical splits of the files in the input directory. Each split corresponds to one block by default, and one MapTask is started for each split
  2. After the input file has been split, a RecordReader object (LineRecordReader by default) reads it, using \n as the delimiter, one line at a time, and returns a <key, value> pair: the key is the offset of the first character of the line, the value is the text content of the line
  3. Each <key, value> returned by the RecordReader enters the user's own Mapper subclass; the map method is called once for each line the RecordReader reads
  4. When the Mapper logic finishes, each Mapper result is collected via context.write. Inside collect, the data is first partitioned, using HashPartitioner by default

MapReduce provides the Partitioner interface, which decides which ReduceTask should process a given pair of output data, based on the key (or value) and the number of Reducers. By default it hashes the key and takes the result modulo the number of Reducers; this default merely spreads the load evenly across Reducers. If you have special requirements, you can write a custom Partitioner and set it on the Job (a sketch follows after this list).

  5. Next, the data is written to memory, to a region known as the ring buffer, which collects Mapper results in batches to reduce the impact of disk IO. Both the key/value pair and its partition number are written to the buffer; the key and value are serialized into byte arrays before being written

The ring buffer is actually a byte array that stores the serialized key/value data and the metadata of each record, including its partition, the start position of the key, the start position of the value, and the length of the value. The "ring" structure is an abstraction.

The buffer has a limited size, 100MB by default. When the Mapper produces a large amount of output, the buffer may fill up, so under certain conditions its data must be temporarily written to disk and the buffer reused. This process of writing data from memory to disk is called a spill. Spilling is done by a separate thread and does not block the thread that writes Mapper results into the buffer, so the spill must not wait until the buffer is completely full: the buffer has a spill ratio, spill.percent, 0.8 by default. When the buffer reaches size * spill.percent = 100MB * 0.8 = 80MB, the spill thread starts, locks that 80MB of memory and spills it, while Mapper output continues to be written into the remaining 20MB; the two do not interfere with each other

  6. When the spill thread starts, it sorts the keys within that 80MB of data. Sorting is the default behavior of the MapReduce model, and here it is performed on the serialized bytes

If a Combiner has been configured for the Job, it is applied now: the values of key/value pairs with the same key are combined, reducing the amount of data spilled to disk. The Combiner optimizes the intermediate results of MapReduce and is applied multiple times throughout the model

In which scenarios can a Combiner be used? The Combiner's output becomes the Reducer's input, and the Combiner must never change the final result. It should only be used when the Reducer's input key/value types and output key/value types are identical and the final result is unaffected, such as summation or finding a maximum. Use the Combiner with care: used well, it improves Job efficiency; used badly, it corrupts the Reducer's final result (the sketch after this list shows how to enable one)

  7. Merge the spill files. Each spill produces a temporary file on disk (applying the Combiner first, if one is configured). If the Mapper output is very large, several such spills occur and several temporary files exist on disk. When all data has been processed, Merge combines the temporary files on disk into a single final file, together with an index file that records the offset of each Reducer's portion of the data
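Both of the pluggable points mentioned above can be customized. Below is a minimal sketch assuming the WordCount types (Text keys, LongWritable counts) from section 4; the class name WordCountPartitioner is illustrative and simply reproduces what the default HashPartitioner already does:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Custom partitioner: decides which ReduceTask receives a given <K2,V2> pair
public class WordCountPartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

In the driver it would be registered as follows; because word count is a pure sum, the existing ReducerTask can also double as the Combiner:

job.setPartitionerClass(WordCountPartitioner.class);
job.setNumReduceTasks(3);                 // number of partitions / ReduceTasks
job.setCombinerClass(ReducerTask.class);  // safe here because summation does not change the final result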

Some basic configuration options of the MapTask:

Configuration                    | Default value                  | Description
mapreduce.task.io.sort.mb        | 100                            | Size of the ring buffer (MB)
mapreduce.map.sort.spill.percent | 0.8                            | Spill threshold ratio
mapreduce.cluster.local.dir      | ${hadoop.tmp.dir}/mapred/local | Directory for spilled data
mapreduce.task.io.sort.factor    | 10                             | Number of spill files merged at one time

6.2 ReduceTask Working mechanism

The Reduce side is divided into three phases: copy, sort, and reduce, of which the first two are the most important. In the copy phase an eventFetcher obtains the list of completed maps, and Fetcher threads copy the data; during this process two merge threads, inMemoryMerger and onDiskMerger, are started to merge in-memory data to disk and to merge files already on disk. Once the data copy is finished, the copy phase ends and the sort phase begins; the sort phase mainly performs the finalMerge operation. The pure sort phase is followed by the reduce phase, in which the user-defined reduce function is invoked.

The detailed steps

  1. Copy phase: simply pulls data. The Reduce process starts some data-copy threads (Fetchers) that request the output files of each MapTask over HTTP.
  2. Merge phase: like the merge on the map side, except that it stores data copied from different map tasks. The copied data is first put into an in-memory buffer, whose size is configured more flexibly than the map side's. There are three forms of merge: memory to memory, memory to disk, and disk to disk. The first form is not enabled by default. When the amount of data in memory reaches a threshold, a memory-to-disk merge is started; like the map side, this is also a spill process, and if a Combiner is configured it is applied here too, producing many spill files on disk. The memory-to-disk merge runs until there is no more data coming from the map side, after which the third, disk-to-disk merge starts and produces the final file.
  3. Merge sort: after the scattered data has been merged into one large body of data, the merged data is sorted once more.
  4. The reduce method is called on the sorted key-value pairs: key-value pairs with equal keys are handed to one call of the reduce method, and each call produces zero or more key-value pairs.
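Several thresholds in the copy and merge process are tunable through the job Configuration. The property names below are the standard Hadoop 2.x ones as far as I can tell, shown with their usual defaults; treat the values as a starting point rather than a recommendation:

// In the driver's main(), before ToolRunner.run(...):
Configuration conf = new Configuration();
// Number of parallel copy (Fetcher) threads each ReduceTask uses to pull map output
conf.set("mapreduce.reduce.shuffle.parallelcopies", "5");
// Fraction of the reduce task's heap used to buffer map outputs during the copy phase
conf.set("mapreduce.reduce.shuffle.input.buffer.percent", "0.70");
// Usage ratio of that buffer at which the in-memory merge (spill to disk) is triggered
conf.set("mapreduce.reduce.shuffle.merge.percent", "0.66");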

6.3 the Shuffle process

How to transfer data processed in the Map phase to the Reduce phase is the most critical process in the MapReduce framework. This process is called shuffle

Shuffle is the core mechanism: it covers data partitioning, sorting, grouping, combining, merging, and so on.

Shuffle is the core of Mapreduce and is distributed in the Map and Reduce phases of Mapreduce. Generally, the process from Map output to Reduce data as input is called shuffle.

  1. Collect stage: the MapTask output is written to a ring buffer (100MB by default), which stores the key/value data together with partition information.
  2. Spill stage: when the amount of data in memory reaches a threshold, it is written to the local disk. The data is sorted before being written; if a Combiner is configured, data with the same partition number and key is also combined.
  3. Merge stage: all spilled temporary files are merged so that each MapTask ultimately produces only one intermediate data file.
  4. Copy stage: the ReduceTask starts Fetcher threads that copy its share of the data from the nodes where MapTasks have completed. The data is first held in a memory buffer and written to disk when the buffer reaches a threshold.
  5. Merge stage: while the ReduceTask copies data remotely, two background threads merge data files from memory and from the local disk.
  6. Sort stage: sorting is carried out while the data is merged. Since the MapTask stage has already partially sorted the data, the ReduceTask only needs to guarantee that the copied data is globally ordered.

The size of the buffer in Shuffle affects the execution efficiency of the MapReduce program. In principle, the larger the buffer is, the fewer I/O operations are performed on the disk, and the higher the execution speed is

The buffer size can be adjusted with the parameter mapreduce.task.io.sort.mb, which defaults to 100MB.
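For example, the map-side values from the table in section 6.1 could be raised in the driver before the Job is created. This is only a sketch; the right values depend on the memory available to each task:

// In the driver's main(), before ToolRunner.run(...):
Configuration configuration = new Configuration();
// Enlarge the map-side ring buffer from the 100MB default to 200MB
configuration.set("mapreduce.task.io.sort.mb", "200");
// Keep the default 80% spill threshold
configuration.set("mapreduce.map.sort.spill.percent", "0.8");
int exitCode = ToolRunner.run(configuration, new JobMain(), args);
System.exit(exitCode);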

7. JOIN is implemented on the Reduce end

7.1 requirements

If the amount of data is large and the data of the two tables is stored in HDFS as a file, the MapReduce program is required to perform the following SQL query operations

select  a.id,a.date,b.name,b.category_id,b.price from t_order a left join t_product b on a.pid = b.id
  • Product table

id    | pname        | category_id | price
P0001 | Xiaomi 5     | 1000        | 2000
P0002 | Smartisan T1 | 1000        | 3000

  • Order table

id   | date     | pid   | amount
1001 | 20150710 | P0001 | 2
1002 | 20150710 | P0002 | 3

7.2 Implementation Procedure

Use the join field as the map output key, so that rows from both tables that satisfy the join condition, tagged with the name of the file they came from, are sent to the same Reduce task and joined there.

  1. Define the OrderJoinBean
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderJoinBean implements Writable {

    private String id="";  / / order id
    private String date="";  // Order time
    private String pid="";  // Product id
    private String amount="";  // Order quantity
    private String name="";   // Product name
    private String categoryId=""; // Category id of the item
    private String price="";  // The price of the goods


    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public String getAmount() {
        return amount;
    }

    public void setAmount(String amount) {
        this.amount = amount;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(String categoryId) {
        this.categoryId = categoryId;
    }

    public String getPrice() {
        return price;
    }

    public void setPrice(String price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return id + "\t" + date + "\t" + pid + "\t" + amount + "\t" + name + "\t" + categoryId + "\t" + price;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(date);
        out.writeUTF(pid);
        out.writeUTF(amount);
        out.writeUTF(name);
        out.writeUTF(categoryId);
        out.writeUTF(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        date = in.readUTF();
        pid = in.readUTF();
        amount = in.readUTF();
        name = in.readUTF();
        categoryId = in.readUTF();
        price = in.readUTF();
    }
}
  2. Define the Mapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MapperJoinTask extends Mapper<LongWritable, Text, Text, OrderJoinBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get the file name from the file slice
        FileSplit fileSplit = (FileSplit) context.getInputSplit();

        String fileName = fileSplit.getPath().getName();

        //1. Get the data for each row
        String line = value.toString();

        //2. Cutting treatment
        String[] split = line.split(",");
        OrderJoinBean orderJoinBean = new OrderJoinBean();
        if(fileName.equals("orders.txt")) {// Order data
            orderJoinBean.setId(split[0]);
            orderJoinBean.setDate(split[1]);
            orderJoinBean.setPid(split[2]);
            orderJoinBean.setAmount(split[3]);
        }else{
            // Commodity data
            orderJoinBean.setPid(split[0]);
            orderJoinBean.setName(split[1]);
            orderJoinBean.setCategoryId(split[2]);
            orderJoinBean.setPrice(split[3]);
        }


        //3. Send it to reduceTask
        context.write(new Text(orderJoinBean.getPid()), orderJoinBean);
    }
}
  3. Define the Reducer
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ReducerJoinTask extends Reducer<Text, OrderJoinBean, Text, OrderJoinBean> {

    @Override
    protected void reduce(Text key, Iterable<OrderJoinBean> values, Context context) throws IOException, InterruptedException {

        //1. Traversal: The same key is sent to the same Reduce. The values of the same key form a set
        OrderJoinBean orderJoinBean  = new OrderJoinBean();
        for (OrderJoinBean value : values) {
            String id = value.getId();
            if(id.equals("")) {// Commodity data
                orderJoinBean.setPid(value.getPid());
                orderJoinBean.setName(value.getName());
                orderJoinBean.setCategoryId(value.getCategoryId());
                orderJoinBean.setPrice(value.getPrice());

            } else {
                // Order data
                orderJoinBean.setId(value.getId());
                orderJoinBean.setDate(value.getDate());
                orderJoinBean.setPid(value.getPid());
                orderJoinBean.setAmount(value.getAmount());
            }
        }

        //2. Output the joined record
        context.write(key, orderJoinBean);
    }
}
  4. Define the main class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobReduceJoinMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {

        // 1. Obtain the Job object
        Job job = Job.getInstance(super.getConf(), "jobReduceJoinMain");

        // 2. Set the input class and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("Join file:///D:\\reduce end \ \ input"));

        job.setMapperClass(MapperJoinTask.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrderJoinBean.class);


        job.setReducerClass(ReducerJoinTask.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(OrderJoinBean.class);


        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("D: \ \ the reduce the join \ \ out_put"));

        boolean b = job.waitForCompletion(true);


        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        JobReduceJoinMain jobReduceJoinMain = new JobReduceJoinMain();
        int i = ToolRunner.run(conf, jobReduceJoinMain, args);
        System.exit(i);
    }
}

Drawback: in this approach the join is performed in the Reduce phase, so the Reduce side bears most of the processing pressure while the Map nodes carry a low computing load; resource utilization is poor, and data skew is easily produced in the Reduce phase.

8. Implement JOIN on the Map side

8.1 Overview

This approach applies when one of the tables being joined is small.

Using the distributed cache, the small table can be distributed to every map node in advance; each map node then joins the rows it reads from the large table against the cached small table locally and outputs the final result. This greatly increases the parallelism of the join and speeds up processing.

8.2 Implementation Procedure

Load the small table inside the Mapper class in advance and perform the join there.

In a real scenario the corresponding solution is to load the small table (for example, a database table) once, up front.

  1. Define the Mapper
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapperTask extends Mapper<LongWritable, Text, Text, Text> {
    private Map<String,String> map = new HashMap<>();

    // Initialization method: called only once, when the MapTask starts
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
        URI fileURI = cacheFiles[0];

        FileSystem fs = FileSystem.get(fileURI, context.getConfiguration());

        FSDataInputStream inputStream = fs.open(new Path(fileURI));

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        String readLine = "";
        while ((readLine = bufferedReader.readLine()) != null) {
            // readLine holds one line of product data
            String[] split = readLine.split(",");

            String pid = split[0];

            map.put(pid, split[1] + "\t" + split[2] + "\t" + split[3]);
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //1. Read a row of data: orders
        String line = value.toString();

        //2. Split the line
        String[] split = line.split(",");

        String pid = split[2];

        //3. Get commodity information from map:
        String product = map.get(pid);

        //4. Output the joined record
        context.write(new Text(pid), new Text(split[0] + "\t" + split[1] + "\t" + product + "\t" + split[3]));
    }
}
  2. Define the main class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class JobMapperJoinMain extends Configured implements Tool{
    @Override
    public int run(String[] args) throws Exception {
        // Set the cache file. This must happen before the Job instance is created; setting it after the Job is created has no effect
        // The cached file path must point to HDFS, otherwise the cache does not work
        DistributedCache.addCacheFile(new URI("hdfs://node01:8020/cache/pdts.txt"),super.getConf());

        // 1. Obtain the Job object
        Job job = Job.getInstance(super.getConf(), "jobMapperJoinMain");

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("E: \ \ the intellectual class work \ \ \ \ Beijing sixth day big data 30 period \ \ data \ \ data \ \ map join \ \ map_join_iput"));

        job.setMapperClass(MapperTask.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("E: \ \ the intellectual class work \ \ \ \ Beijing sixth day big data 30 period \ \ data \ \ data \ \ map join \ \ out_put_map"));

        boolean b = job.waitForCompletion(true);

        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        JobMapperJoinMain jobMapperJoinMain = new JobMapperJoinMain();
        int i = ToolRunner.run(conf, jobMapperJoinMain, args);
        System.exit(i);
    }
}
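One note on the API used above: org.apache.hadoop.filecache.DistributedCache is marked deprecated in Hadoop 2.x. To the best of my knowledge the same effect can be obtained through the Job object itself, roughly as sketched below (same illustrative cache path as above, using the java.net.URI import already present):

// In the driver, after the Job instance has been created:
job.addCacheFile(new URI("hdfs://node01:8020/cache/pdts.txt"));

// In Mapper.setup(), the cached files can then be listed via the context:
URI[] cacheFiles = context.getCacheFiles();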

9. Social fan data analysis

9.1 Requirement Analysis

The following is the qq friend list data. Before the colon is a user, after the colon is all the friends of the user (the friend relationship in the data is one-way).

A:B,C,D,F,E,O
B:A,C,E,K
C:A,B,D,E,I 
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

Find out which people have common friends with each other, and who do they have common friends with?

[Solution]

Step 1
  map: reading the line A:B,C,D,F,E,O outputs <B,A> <C,A> <D,A> <F,A> <E,A> <O,A>;
       reading the line B:A,C,E,K outputs <A,B> <C,B> <E,B> <K,B>; and so on
  reduce: receives <C,A> <C,B> <C,E> <C,F> <C,G> ......
          outputs <A-B,C> <A-E,C> <A-F,C> <A-G,C> <B-E,C> <B-F,C> ...

Step 2
  map: reading the line <A-B,C> outputs <A-B,C> directly
  reduce: receives <A-B,C> <A-B,F> <A-B,G> ...
          outputs A-B  C,F,G,.....

9.2 Implementation Procedure

The first MapReduce code implementation

【Mapper class】

public class Step1Mapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
         //1: Split text data with a colon: V2 is to the left of the colon
        String[] split = value.toString().split(":");
        String userStr = split[0];

        //2: Split the string to the right of the colon with a comma. Each member is K2
        String[] split1 = split[1].split(",");
        for (String s : split1) {
            //3: write K2 and v2 into the context
            context.write(new Text(s), new Text(userStr));
        }
    }
}

【Reducer class】

public class Step1Reducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //1: iterate over the set and concatenate each element to get K3
        StringBuffer buffer = new StringBuffer();

        for (Text value : values) {
            buffer.append(value.toString()).append("-");
        }
        //2: the original K2 becomes V3
        //3: write K3 and V3 to the context
        context.write(new Text(buffer.toString()), key);
    }
}

JobMain:

public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //1: Obtains the Job object
        Job job = Job.getInstance(super.getConf(), "common_friends_step1_job");

        //2: Sets the job task
            // Step 1: Set the input class and input path
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\common_friends_step1_input"));

            // Step 2: Set the Mapper class and data type
            job.setMapperClass(Step1Mapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // Steps 3, 4, 5 and 6 (shuffle) use the defaults

            // Step 7: Set the Reducer class and data type
            job.setReducerClass(Step1Reducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Step 8: Set the output class and output path
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\common_friends_step1_out"));

        //3: Waits for the job task to end
        boolean bl = job.waitForCompletion(true);


        return bl ? 0: 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        // Start the job task
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}

The second MapReduce code implementation

【Mapper class】

public class Step2Mapper extends Mapper<LongWritable, Text, Text, Text> {
    /*
     K1    V1
     0     A-F-C-J-E-    B
     ----------------------------------
     K2    V2
     A-C   B
     A-E   B
     A-F   B
     C-E   B
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1: split the text data, the second part of the result can be obtained V2
        String[] split = value.toString().split("\t");
        String   friendStr =split[1];

        //2: Continue splitting the first part of the text data with the '-' delimiter to get the array
        String[] userArray = split[0].split("-");

        //3: Sort the array
        Arrays.sort(userArray);

        //4: double the elements in the array to get K2
        /*
          A-E-C  ----->  sorted: A C E
          pairs: A-C  A-E  C-E
         */
        for (int i = 0; i <userArray.length -1 ; i++) {
            for (int j = i+1; j  < userArray.length ; j++) {
                //5: write K2 and V2 into the context
                context.write(new Text(userArray[i] + "-" + userArray[j]), new Text(friendStr));
            }
        }
    }
}

【Reducer class】

public class Step2Reducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //1: the original K2 is reused directly as K3
        //2: traverses the set, splices the elements in the set, and obtains V3
        StringBuffer buffer = new StringBuffer();
        for (Text value : values) {
            buffer.append(value.toString()).append("-");
            
        }
        //3: write K3 and V3 to the context
        context.write(key, new Text(buffer.toString()));
    }
}

【 JobMain 】

public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //1: Obtains the Job object
        Job job = Job.getInstance(super.getConf(), "common_friends_step2_job");

        //2: Sets the job task
            // Step 1: Set the input class and input path
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path("file:///D:\\out\\common_friends_step1_out"));

            // Step 2: Set the Mapper class and data type
            job.setMapperClass(Step2Mapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // Steps 3, 4, 5 and 6 (shuffle) use the defaults

            // Step 7: Set the Reducer class and data type
            job.setReducerClass(Step2Reducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Step 8: Set the output class and output path
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\common_friends_step2_out"));

        //3: Waits for the job task to end
        boolean bl = job.waitForCompletion(true);
        return bl ? 0: 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Start the job task
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}

10. Building an inverted index

10.1 Requirement Analysis

Requirements: There is a large amount of text (documents, web pages) that needs to be indexed

Analysis of ideas:

The natural first step is to read the contents of each document, concatenate each word with the name of the document it comes from to form the key, and emit 1 as the value.

Map end data output:

hello-a.txt  1

hello-a.txt 1

hello-a.txt 1

Reduce end data output:

hello-a.txt 3

10.2 Code Implementation

public class IndexCreate extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(),new IndexCreate(),args);
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), IndexCreate.class.getSimpleName());
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("File:///D:\\ \\ \ index \\input"));
        job.setMapperClass(IndexCreateMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(IndexCreateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("File:///D:\\ inverted index \\outindex"));
        boolean bool = job.waitForCompletion(true);
        return bool?0:1;
    }
    public static class IndexCreateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        Text text = new Text();
        IntWritable v = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Get file slices
            FileSplit fileSplit  = (FileSplit) context.getInputSplit();
            // Get the file name from the file slice
            String name = fileSplit.getPath().getName();
            String line = value.toString();
            String[] split = line.split(" ");
            // Output word -- filename as key value is 1
            for (String word : split) {
               text.set(word+"--"+name); context.write(text,v); }}}public static class IndexCreateReducer extends Reducer<Text.IntWritable.Text.IntWritable>{
        IntWritable value = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable item : values) {
                count += item.get();
            }
            value.set(count);
            context.write(key, value);
        }
    }
}