Two files are provided.

information.txt (columns: hobby, job, id, tab-separated):

// hobby	job	id
Play games	Big data	1
Shopping	Full stack	2
Null	Java	3
Learning	Null	4

student.txt (columns: id, name, gender, tab-separated):

// id	name	gender
1	Zhang San	Female
2	Li Si	Male
3	Wang Wu	Female
4	Zhao Liu	Male

Topic requirements

information.txt is stored on HDFS.

student.txt is stored locally.

Read the HDFS data through the distributed cache.

Encapsulate the HDFS data and the local data together into one JavaBean object.

The bean must be assembled on the map side; on the reduce side, count how many of the bean's attributes are "Null" and write that count as the value, with the bean's toString() as the output key.

Set two partitions, split by gender.

Result before partitioning (run locally):

Result after partitioning (run on the cluster):

Code

pom.xml (if you need the Maven libraries, send me a private message):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.itcast</groupId>
    <artifactId>mapreduce</artifactId>
    <version>1.0-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
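A note on the build: because the maven-shade-plugin is bound to the package phase, a plain mvn package produces a self-contained (shaded) jar, which is what you submit to the cluster in the partitioned version below; minimizeJar just strips classes the jar never references.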

Before partitioning

Step 1: Upload the information.txt file to the cluster

package com.czxy.ittianmao.demo06;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class FileJoinMap extends Mapper<LongWritable, Text, Text, FileJoinBean> {
    String line = null;
    HashMap<String, String> map = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Read the cached information.txt from HDFS and index each line by its id column
        URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());

        FileSystem fs = FileSystem.get(cacheFiles[0], context.getConfiguration());

        FSDataInputStream open = fs.open(new Path(cacheFiles[0]));

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));
        while ((line = bufferedReader.readLine()) != null) {
            // The id is the third column of information.txt
            map.put(line.split("\t")[2], line);
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        FileJoinBean bean = new FileJoinBean();
        String[] student = value.toString().split("\t");

        // Join: look up the cached information record by the student's id
        String thisValue = map.get(student[0]);

        String[] orders = thisValue.split("\t");
        bean.setId(student[0]);
        bean.setName(student[1]);
        bean.setSex(student[2]);
        bean.setAihao(orders[0]);
        bean.setJob(orders[1]);

        context.write(new Text(student[0]), bean);
    }
}
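Because the small table (information.txt) is loaded into an in-memory HashMap during setup(), the join happens entirely on the map side. One caveat: DistributedCache is deprecated in the Hadoop 2.x mapreduce API. Below is a sketch of the same setup() on the newer API, assuming the driver registers the file with job.addCacheFile(...) instead of DistributedCache.addCacheFile(...):

    // Sketch only: the same cache read on the non-deprecated Hadoop 2.x API.
    // Assumes the driver called job.addCacheFile(new URI("hdfs://.../information.txt")).
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles(); // replaces DistributedCache.getCacheFiles(...)
        FileSystem fs = FileSystem.get(cacheFiles[0], context.getConfiguration());
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path(cacheFiles[0]))))) {
            String cached;
            while ((cached = reader.readLine()) != null) {
                map.put(cached.split("\t")[2], cached); // still keyed by the id column
            }
        }
    }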

FileJoinReduce

package com.czxy.ittianmao.demo06;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FileJoinReduce extends Reducer<Text, FileJoinBean, FileJoinBean, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<FileJoinBean> values, Context context) throws IOException, InterruptedException {

        int count = 0;
        for (FileJoinBean value : values) {
            // Count every attribute that is missing or holds the literal "Null"
            if (value.getName() == null || value.getName().equals("Null")) {
                count++;
            }
            if (value.getJob() == null || value.getJob().equals("Null")) {
                count++;
            }
            if (value.getAihao() == null || value.getAihao().equals("Null")) {
                count++;
            }
            if (value.getSex() == null || value.getSex().equals("Null")) {
                count++;
            }
            context.write(value, new LongWritable(count));
        }
    }
}
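The four if blocks repeat the same test on different fields. A compact equivalent is sketched below; the countNulls helper is illustrative, not part of the original code, and would let the loop body shrink to context.write(value, new LongWritable(countNulls(value))):

    // Hypothetical helper: count the attributes that are missing or the literal "Null".
    private static int countNulls(FileJoinBean b) {
        int count = 0;
        for (String field : new String[]{b.getName(), b.getJob(), b.getAihao(), b.getSex()}) {
            if (field == null || "Null".equals(field)) {
                count++;
            }
        }
        return count;
    }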

FileJoinBean

package com.czxy.ittianmao.demo06;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FileJoinBean implements Writable {
    private String id;
    private String name;
    private String sex;
    private String aihao;
    private String job;

    public FileJoinBean() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getAihao() {
        return aihao;
    }

    public void setAihao(String aihao) {
        this.aihao = aihao;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    @Override
    public String toString() {
        return "FileJoinBean{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", sex='" + sex + '\'' +
                ", aihao='" + aihao + '\'' +
                ", job='" + job + '\'' +
                '}';
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Appending "" serializes a null field as the string "null" instead of throwing an NPE
        out.writeUTF(id + "");
        out.writeUTF(name + "");
        out.writeUTF(sex + "");
        out.writeUTF(aihao + "");
        out.writeUTF(job + "");
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.name = in.readUTF();
        this.sex = in.readUTF();
        this.aihao = in.readUTF();
        this.job = in.readUTF();
    }
}
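Mismatched write()/readFields() pairs are the classic Writable bug: the fields must be read back in exactly the order they were written. A quick local round-trip check can catch this early; the class below is a sketch, not part of the assignment, and assumes it sits in the same package as FileJoinBean:

import java.io.*;

public class FileJoinBeanTest {
    public static void main(String[] args) throws IOException {
        FileJoinBean in = new FileJoinBean();
        in.setId("1");
        in.setName("Zhang San");
        in.setSex("Female");
        in.setAihao("Play games");
        in.setJob("Big data");

        // Serialize the bean to a byte array...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // ...and read it back into a fresh instance.
        FileJoinBean out = new FileJoinBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(out); // should print the same toString() as the original bean
    }
}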

FileJoinDriver

package com.czxy.ittianmao.demo06;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class FileJoinDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        // Register information.txt from HDFS in the distributed cache before the Job is created
        DistributedCache.addCacheFile(new URI("hdfs://192.168.100.100:8020/mapreduce/information.txt"), configuration);

        Job job = Job.getInstance(configuration);

        job.setJarByClass(FileJoinDriver.class);

        job.setInputFormatClass(TextInputFormat.class);

        // In this pre-partition version, student.txt is read from the local disk
        TextInputFormat.addInputPath(job, new Path("E:\\Chuanzhi College big data course\\First semester\\Homework\\MapReduce stage homework\\MapReduce comprehensive question\\student.txt"));

        job.setMapperClass(FileJoinMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FileJoinBean.class);

        job.setReducerClass(FileJoinReduce.class);
        job.setOutputKeyClass(FileJoinBean.class);
        job.setOutputValueClass(LongWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        TextOutputFormat.setOutputPath(job, new Path("E:\\Chuanzhi College big data course\\First semester\\Homework\\MapReduce stage homework\\MapReduce comprehensive question\\Join"));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new FileJoinDriver(), args);
    }
}
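One small refinement worth considering: main() ignores the int that ToolRunner.run returns, so the JVM exits 0 even when the job fails. A variant (sketch) that propagates the status:

    public static void main(String[] args) throws Exception {
        // Pass the job's return code (0 on success, 1 on failure) to the shell.
        int exitCode = ToolRunner.run(new Configuration(), new FileJoinDriver(), args);
        System.exit(exitCode);
    }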

Partitioned code

Step 1: Upload student.txt to HDFS as well

package com.czxy.ittianmao.demo06;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FilePartition extends Partitioner<Text, FileJoinBean> {

    @Override
    public int getPartition(Text text, FileJoinBean fileJoinBean, int numPartitions) {
        // Send males to partition 0 and everyone else to partition 1
        if (fileJoinBean.getSex().equals("Male")) {
            return 0;
        } else {
            return 1;
        }
    }
}
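With setNumReduceTasks(2) in the driver, partition 0 (males) is written to part-r-00000 and partition 1 to part-r-00001. Since getSex() could in principle return null on dirty input, here is a null-safe variant of the method (a sketch, not the original code):

    @Override
    public int getPartition(Text text, FileJoinBean fileJoinBean, int numPartitions) {
        // "Male".equals(sex) cannot throw even when sex is null;
        // such records simply fall into partition 1.
        return "Male".equals(fileJoinBean.getSex()) ? 0 : 1;
    }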

FileJoinDriver

package com.czxy.ittianmao.demo06;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class FileJoinDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Add information.txt to the distributed cache
        DistributedCache.addCacheFile(new URI("hdfs://192.168.100.100:8020/mapreduce/information.txt"), configuration);

        Job job = Job.getInstance(configuration);
        // Required when running on a cluster
        job.setJarByClass(FileJoinDriver.class);

        job.setInputFormatClass(TextInputFormat.class);

        // student.txt is now read from HDFS as well
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.100.100:8020/mapreduce/student.txt"));

        job.setMapperClass(FileJoinMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FileJoinBean.class);

        job.setReducerClass(FileJoinReduce.class);
        job.setOutputKeyClass(FileJoinBean.class);
        job.setOutputValueClass(LongWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("/mapreduce/MapJoin_OUT"));
        // The two lines below are what the partitioned version adds:
        // register the custom partitioner...
        job.setPartitionerClass(FilePartition.class);
        // ...and run two reduce tasks, one per partition
        job.setNumReduceTasks(2);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new FileJoinDriver(), args);
    }
}
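To run the partitioned version, build the shaded jar with mvn package and submit it to the cluster, for example with hadoop jar mapreduce-1.0-SNAPSHOT.jar com.czxy.ittianmao.demo06.FileJoinDriver (the jar name follows the artifactId and version in the pom above; adjust if yours differ). The output then appears under /mapreduce/MapJoin_OUT as part-r-00000 and part-r-00001, one file per gender partition.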