Thinking

When you think about global sorting, the first idea that comes to mind is: collect the data on the Map side, shuffle it to a single Reduce, and let that Reduce sort it. This is not much different from sorting on a single machine, because the MapReduce framework already sorts data by key by default (and if we want to sort by value, we can promote the value into the key). The catch is that this sort only applies within one partition, that is, within one Reduce task, so the cluster's parallelism is not used at all. How can we implement global sorting more elegantly?

Abstract

Sorting in Hadoop can be divided into partial sorting, global sorting, auxiliary (grouping) sorting, secondary sorting, and so on. This article focuses on how to achieve global sorting by key. There are three ways to do it:

  1. Use a single Reduce
  2. Use a custom Partitioner to distribute the data to several partitions by key range
  3. Use the framework's built-in TotalOrderPartitioner

Implementation

First prepare some input data: github.com/hulichao/bi…

/data/job/file.txt
2
32
654
32
15
756
65223

Global sorting with a single Reduce

Using a single Reduce to achieve global sorting requires no special handling at all. The mapper, reducer, and driver are implemented as follows:

package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class JobMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        IntWritable intWritable = new IntWritable(Integer.parseInt(value.toString()));
        context.write(intWritable, intWritable);
    }
}

package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class JobReducer extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    private int index = 0; // global sort counter

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(new IntWritable(++index), value);
    }
}

package com.hoult.mr.job;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("input-path output-path");
            System.exit(1);
        }

        Job job = Job.getInstance(getConf());
        job.setJarByClass(JobDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(JobMapper.class);
        job.setReducerClass(JobReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // Use a single reduce task so the whole data set is sorted globally
        job.setNumReduceTasks(1);
        job.setJobName("JobDriver");
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // int exitCode = ToolRunner.run(new JobDriver(), args);
        int exitCode = ToolRunner.run(new JobDriver(), new String[] {"data/job/", "data/job/output"});
        System.exit(exitCode);
    }
}
1	2
2	6
3	15
4	22
5	26
6	32
7	32
8	54
9	92
10	650
11	654
12	756
13	5956
14	65223

PS: the jobs above are launched with Hadoop's own ToolRunner utility. In the code that follows, parts that repeat the first example are omitted; only the differences are shown.

Global sorting with a custom Partitioner that distributes data to multiple partitions by key range

How can a custom Partitioner guarantee global order? We know that the default partitioner, HashPartitioner, assigns keys to reducers by hash, but we can just as easily write a partitioner that sends different key ranges to different reducers. For example, if the keys range from 1 to 100 million, reduce1 can handle 1-10 million, reduce2 can handle 10-20 million, and so on; concatenating the output files in partition order then yields a globally sorted result. Because the sample data here is small, it is split into 11 partitions: 0-1000, 1000-2000, ..., and everything from 10000 upward. Compared with the first approach there are only two changes: a custom Partitioner is registered and the number of reduce tasks is set accordingly.
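For contrast, the default HashPartitioner mentioned above assigns partitions roughly as in the following sketch (a simplified illustration, not code from this project); because partitions are chosen by hash rather than by key range, the output files of different reducers have no order relative to each other:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Simplified sketch of Hadoop's default hash partitioning, for illustration only
public class HashLikePartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        // adjacent keys land in unrelated partitions, so global order across files is lost
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}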

// Partitioner implementation
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class JobPartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int keyValue = Integer.parseInt(key.toString());
        // Keys in [1000*i, 1000*(i+1)) go to partition i; everything else falls through to partition 10
        for (int i = 0; i < 10; i++) {
            if (keyValue < 1000 * (i + 1) && keyValue >= 1000 * i) {
                System.out.println("key:" + keyValue + ", part:" + i);
                return i;
            }
        }
        return 10;
    }
}

// Changes in the driver: register the custom partitioner and set the number of reduce tasks
job.setPartitionerClass(JobPartitioner.class);
job.setNumReduceTasks(10);

Running the program produces 10 output files; each file is sorted internally, and the files themselves are ordered by key range.

part-r-00000
part-r-00001
part-r-00002
part-r-00003
part-r-00004
part-r-00005
part-r-00006
part-r-00007
part-r-00008
part-r-00009

Note: the number of partitions that actually receive data must be less than or equal to the number of reduce tasks, otherwise an Illegal partition error is thrown. Another drawback is that writing the partitioner with little knowledge of the data distribution can easily cause data skew and even OOM problems.

Global sorting with the framework's built-in TotalOrderPartitioner

The custom partitioner of the second approach can solve most of the skew problems, but without knowing the data distribution in advance you can only guess at split points, try them, and look at the results. The natural next step is to sample the data first and derive the partitions from the sample. Hadoop has already implemented this for us: the TotalOrderPartitioner class, combined with a data sampler, samples the keys, computes the best split points from the sample, and thus distributes keys evenly across the partitions.

Three samplers (nested in InputSampler) are available for use with the TotalOrderPartitioner:

  • SplitSampler: samples only the first records of each split; not suitable for data that is already sorted
  • RandomSampler: samples the whole data set at a configurable sampling rate
  • IntervalSampler: samples records from each split at fixed intervals; works very well on sorted data

Each sampler implements the method K[] getSample(InputFormat<K,V> inf, Job job). From the returned sample, split points are chosen (one fewer than the number of partitions) and written to a partition file; at runtime each record is sent to the partition whose key range contains its key.
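The following is a minimal sketch (not part of the original project) showing how each of the three samplers could be constructed with the InputSampler API; the class name and the numeric arguments are illustrative only:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;

/**
 * Illustrative sketch only: constructing each of the three samplers.
 * The numeric arguments are example values, not taken from the article.
 */
public class SamplerSketch {
    public static void writePartitions(Job job) throws Exception {
        // SplitSampler(numSamples, maxSplitsSampled): takes the first records of each split
        InputSampler.Sampler<Text, Text> splitSampler =
                new InputSampler.SplitSampler<>(100, 10);

        // RandomSampler(freq, numSamples, maxSplitsSampled): random sampling at the given rate
        InputSampler.Sampler<Text, Text> randomSampler =
                new InputSampler.RandomSampler<>(0.1, 100, 10);

        // IntervalSampler(freq, maxSplitsSampled): picks records at fixed intervals within each split
        InputSampler.Sampler<Text, Text> intervalSampler =
                new InputSampler.IntervalSampler<>(0.1, 10);

        // Whichever sampler is chosen, its sample determines the split points that are
        // written to the partition file later read by TotalOrderPartitioner.
        InputSampler.writePartitionFile(job, randomSampler);
    }
}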

Code implementation:

// The mapper and reducer types differ slightly from the first example
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalMapper extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("key:" + key.toString() + ", value:" + value.toString());
        context.write(key, new IntWritable(Integer.parseInt(key.toString())));
    }
}
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalReducer extends Reducer<Text, IntWritable, IntWritable, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(value, NullWritable.get());
    }
}
// The comparator
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom comparator to order the keys numerically
 * @author hulichao
 * @date 20-9-20
 **/
public class KeyComparator extends WritableComparator {
    protected KeyComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        int num1 = Integer.valueOf(w1.toString());
        int num2 = Integer.valueOf(w2.toString());
        return num1 - num2;
    }
}
package com.hoult.mr.job.totalsort;

// The driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Tell TotalOrderPartitioner not to assume the keys' natural order; use the configured sort comparator
        conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
        Job job = Job.getInstance(conf, "Total Driver");
        job.setJarByClass(TotalDriver.class);

        // Input path on HDFS; passed in as the first argument
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Output path; the MapReduce results are written here
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // Set the comparator used to order the keys; in this example it compares two keys numerically
        job.setSortComparatorClass(KeyComparator.class);
        job.setNumReduceTasks(10); // Set the number of reduce tasks

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // Path where the partition file is stored
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));
        // Sample the keys: RandomSampler(freq, numSamples, maxSplitsSampled)
        InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 3, 100);
        // Write the sampled split points to the partition file
        InputSampler.writePartitionFile(job, sampler);

        job.setMapperClass(TotalMapper.class);
        job.setReducerClass(TotalReducer.class);
        // Set the partition class.
        job.setPartitionerClass(TotalOrderPartitioner.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args)throws Exception{
        // int exitCode = ToolRunner.run(new TotalDriver(), new String[] {"data/job/input", "data/job/output", "data/job/partition", "data/job/partitio2"});
        int exitCode = ToolRunner.run(new TotalDriver(), args);
        System.exit(exitCode);
    }
}

The result is similar to the second implementation. Note that this is only valid when run on a cluster; local tests may report errors like the following:

2020-09-20 16:36:10,664 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
    at com.hoult.mr.job.totalsort.TotalDriver.run(TotalDriver.java:32)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
    at com.hoult.mr.job.totalsort.TotalDriver.main(TotalDriver.java:60)

Wu Xie is a beginner in the fields of backend development, big data, and artificial intelligence.