1. Overview




Anyone who has worked with an RDBMS is familiar with join operations and knows how carefully join SQL has to be written: a careless join can run for a long time and become a serious performance bottleneck. Joins implemented with the MapReduce framework in Hadoop are also time-consuming, and because of Hadoop's distributed design they have their own peculiarities. This article analyzes in detail several ways of implementing joins between tables in the MapReduce framework and illustrates them with practical examples encountered in real development work.

2. Implementation Principles




Reduce-side join. The reduce-side join is the most common way the MapReduce framework is used to join tables. The idea is as follows. On the map side, records read from the different tables (files) are tagged so their source can be told apart later: the join field is emitted as the key, and the rest of the record plus the newly added tag is emitted as the value. On the reduce side the main work is already half done, because the shuffle has grouped the records by the join key; within each group we only need to separate the records coming from the different files (using the tag added in the map stage) and finally take the Cartesian product of the two sets. The principle is very simple. Here is an example:
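To make this concrete, here is roughly what the tagged map output and one reduce-side group would look like for join key 1, using the sample data documented in the code below (the field layout is illustrative only):

    map output from tb_dim_city.dat:      key=1, value=[flag=0, secondPart=Changchun 1 901 1]
    map output from tb_user_profiles.dat: key=1, value=[flag=1, secondPart=1 2G 123]
                                          key=1, value=[flag=1, secondPart=3 3G 555]

    reduce group for key=1: left = {Changchun 1 901 1}, right = {1 2G 123, 3 3G 555}
    records emitted:        1  Changchun 1 901 1  1 2G 123
                            1  Changchun 1 901 1  3 3G 555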

(1) Define a custom value type:



package com.mr.reduceSizeJoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class CombineValues implements WritableComparable<CombineValues> {
    //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);
    private Text joinKey;    // join key
    private Text flag;       // file-source flag
    private Text secondPart; // everything except the join key

    public void setJoinKey(Text joinKey) {
        this.joinKey = joinKey;
    }
    public void setFlag(Text flag) {
        this.flag = flag;
    }
    public void setSecondPart(Text secondPart) {
        this.secondPart = secondPart;
    }
    public Text getFlag() {
        return flag;
    }
    public Text getSecondPart() {
        return secondPart;
    }
    public Text getJoinKey() {
        return joinKey;
    }

    public CombineValues() {
        this.joinKey = new Text();
        this.flag = new Text();
        this.secondPart = new Text();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        this.joinKey.write(out);
        this.flag.write(out);
        this.secondPart.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.joinKey.readFields(in);
        this.flag.readFields(in);
        this.secondPart.readFields(in);
    }

    @Override
    public int compareTo(CombineValues o) {
        return this.joinKey.compareTo(o.getJoinKey());
    }

    @Override
    public String toString() {
        return "[flag=" + this.flag.toString() + ",joinKey=" + this.joinKey.toString()
                + ",secondPart=" + this.secondPart.toString() + "]";
    }
}
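A note on the design: CombineValues is used here only as the map output value, while the join key itself is emitted separately as a Text key, so implementing plain Writable would already be enough for this class. WritableComparable (with compareTo delegating to joinKey) only becomes relevant if the type were also used as a key that has to be sorted during the shuffle.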
(2) Main code of the Mapper and Reducer:

package com.mr.reduceSizeJoin;

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author zengzhaozheng
 *
 * Usage: left outer join implemented as a reduce-side join.
 * The two files stand for two tables; the join condition is table1.id = table2.cityID.
 *
 * table1: tb_dim_city (id int, name string, orderid int, city_code, is_show)
 * Contents of tb_dim_city.dat, fields separated by "|":
 * id  name       orderid  city_code  is_show
 * 0   Other      9999     9999       0
 * 1   Changchun  1        901        1
 * 2   Jilin      2        902        1
 * 3   Siping     3        903        1
 * 4   Songyuan   4        904        1
 * 5   Tonghua    5        905        1
 * 6   Liaoyuan   6        906        1
 * 7   Baicheng   7        907        1
 * 8   Baishan    8        908        1
 * 9   Yanji      9        909        1
 * -------------------------------------------------------------
 * table2: tb_user_profiles (userID int, userName string, network string, flow double, cityID int)
 * Contents of tb_user_profiles.dat, fields separated by "|":
 * userID  network  flow  cityID
 * 1       2G       123   1
 * 2       3G       333   2
 * 3       3G       555   1
 * 4       2G       777   3
 * 5       3G       666   4
 * -------------------------------------------------------------
 * Expected result:
 * 1  Changchun  1  901  1  1  2G  123
 * 1  Changchun  1  901  1  3  3G  555
 * 2  Jilin      2  902  1  2  3G  333
 * 3  Siping     3  903  1  4  2G  777
 * 4  Songyuan   4  904  1  5  3G  666
 */
public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool {
    private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);

    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {
        private CombineValues combineValues = new CombineValues();
        private Text flag = new Text();
        private Text joinKey = new Text();
        private Text secondPart = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get the path of the file this split comes from
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            // Records from tb_dim_city.dat are tagged "0"
            if (pathName.endsWith("tb_dim_city.dat")) {
                String[] valueItems = value.toString().split("\\|");
                // Skip malformed records
                if (valueItems.length != 5) {
                    return;
                }
                flag.set("0");
                joinKey.set(valueItems[0]);
                secondPart.set(valueItems[1] + "\t" + valueItems[2] + "\t" + valueItems[3] + "\t" + valueItems[4]);
                combineValues.setFlag(flag);
                combineValues.setJoinKey(joinKey);
                combineValues.setSecondPart(secondPart);
                context.write(combineValues.getJoinKey(), combineValues);
            }
            // Records from tb_user_profiles.dat are tagged "1"
            else if (pathName.endsWith("tb_user_profiles.dat")) {
                String[] valueItems = value.toString().split("\\|");
                // Skip malformed records
                if (valueItems.length != 4) {
                    return;
                }
                flag.set("1");
                joinKey.set(valueItems[3]);
                secondPart.set(valueItems[0] + "\t" + valueItems[1] + "\t" + valueItems[2]);
                combineValues.setFlag(flag);
                combineValues.setJoinKey(joinKey);
                combineValues.setSecondPart(secondPart);
                context.write(combineValues.getJoinKey(), combineValues);
            }
        }
    }

    public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {
        // Records of the left table within one group
        private ArrayList<Text> leftTable = new ArrayList<Text>();
        // Records of the right table within one group
        private ArrayList<Text> rightTable = new ArrayList<Text>();
        private Text secondPar = null;
        private Text output = new Text();

        /**
         * reduce() is called once per group, i.e. once per join key.
         */
        @Override
        protected void reduce(Text key, Iterable<CombineValues> value, Context context)
                throws IOException, InterruptedException {
            leftTable.clear();
            rightTable.clear();
            /**
             * Separate the records of the group by source file.
             * A note on this approach: if a group contains too many elements, the
             * reduce phase may run out of memory (OOM). It pays to understand the
             * data distribution before tackling a distributed problem and to choose
             * the processing strategy accordingly; this helps prevent both OOM and
             * data skew.
             */
            for (CombineValues cv : value) {
                secondPar = new Text(cv.getSecondPart().toString());
                // left table: tb_dim_city
                if ("0".equals(cv.getFlag().toString().trim())) {
                    leftTable.add(secondPar);
                }
                // right table: tb_user_profiles
                else if ("1".equals(cv.getFlag().toString().trim())) {
                    rightTable.add(secondPar);
                }
            }
            logger.info("tb_dim_city:" + leftTable.toString());
            logger.info("tb_user_profiles:" + rightTable.toString());
            for (Text leftPart : leftTable) {
                for (Text rightPart : rightTable) {
                    output.set(leftPart + "\t" + rightPart);
                    context.write(key, output);
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();                          // Get the configuration object
        Job job = new Job(conf, "LeftOutJoinMR");
        job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // Set the map input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // Set the reduce output path
        job.setMapperClass(LeftOutJoinMapper.class);
        job.setReducerClass(LeftOutJoinReducer.class);
        job.setInputFormatClass(TextInputFormat.class);         // Set the input format
        job.setOutputFormatClass(TextOutputFormat.class);       // Use the default output format
        // Set the map output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineValues.class);
        // Set the reduce output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        try {
            int returnCode = ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(), args);
            System.exit(returnCode);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }
}
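Assuming the two classes are packaged into a jar (the jar name below is only a placeholder, and the paths are generic examples), the job would be launched in the usual ToolRunner fashion, with the directory containing both tb_dim_city.dat and tb_user_profiles.dat as the first argument and the output directory as the second:

    hadoop jar reduce-side-join.jar com.mr.reduceSizeJoin.ReduceSideJoin_LeftOuterJoin /path/to/input /path/to/output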

The details of the input, the output, and the processing itself are spelled out in the comments in the code, so what deserves analysis here are the shortcomings of the reduce-side join. The reason it exists at all is easy to see: because the input data is split, each map task processes only part of it and cannot see all the records that share a given join key, so records with the same join key have to be brought together by grouping on the reduce side before they can be joined. That is exactly what the reduce-side join does. Its drawback is that a large amount of data has to be transferred between the map and reduce sides (that is, during the shuffle phase), which makes it inefficient.
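One common way to take some pressure off the shuffle, shown here only as a minimal sketch and not as part of the original example, is to compress the intermediate map output before it is transferred. The property names below assume Hadoop 2.x (older releases used mapred.compress.map.output), and SnappyCodec requires the native Snappy libraries to be available:

    Configuration conf = getConf();
    // Compress intermediate map output to cut shuffle traffic (Hadoop 2.x property names assumed)
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setClass("mapreduce.map.output.compress.codec",
                  org.apache.hadoop.io.compress.SnappyCodec.class,
                  org.apache.hadoop.io.compress.CompressionCodec.class);
    Job job = new Job(conf, "LeftOutJoinMR");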