1. MapReduce data flow

In our previous code, the input path was read by FileInputFormat, and the flow was as follows:

(Figure: the MapReduce process, from input data to output data.)

The process can be divided into three phases: Map, Shuffle, and Reduce.

From a code perspective:

  • The MapTask phase consists of map (about 67% of reported progress) and sort (about 33%).
  • The ReduceTask phase consists of copy, sort, and reduce.

Therefore, the overall flow is map -> (sort -> copy -> sort) -> reduce; the intermediate stages in parentheses make up the Shuffle.
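As a reminder of where these phases live in code, here is a minimal WordCount-style Mapper and Reducer (a sketch with assumed class names, not necessarily the exact code from the previous article). The map() calls run in the Map phase; the framework's sort/copy/sort (Shuffle) happens between them and the reduce() calls:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map phase: map() is called once for each (offset, line) record
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text word = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String w : value.toString().split("\\s+")) {
            if (w.isEmpty()) continue; // split() can yield a leading empty token
            word.set(w);
            context.write(word, one); // sorted, copied and merged by Shuffle
        }
    }
}

// Reduce phase: reduce() is called once per key after Shuffle
class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}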

2. InputFormat

InputFormat is an abstract class with a number of subclasses (in an IDE such as IntelliJ IDEA, press Ctrl+H on the class to view its type hierarchy).

This superclass has two important abstract methods:

@Public
@Stable
public abstract class InputFormat<K, V> {

    // Logically splits the job's input into InputSplits; each split
    // is later assigned to one MapTask.
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

    // Creates the RecordReader that actually reads (key, value) records
    // out of a given split.
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

1. getSplits(): generates the input splits (slices).

2. createRecordReader(): creates the RecordReader object that is actually responsible for reading the data.
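To see how the two methods divide the work, here is a minimal sketch of a subclass (the name MyTextInputFormat is hypothetical): it inherits getSplits() from FileInputFormat and only supplies the RecordReader:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// getSplits() is inherited from FileInputFormat; this class only has to
// provide the RecordReader that actually reads records out of one split.
public class MyTextInputFormat extends FileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {
        // The framework calls initialize(split, context) on it before reading
        return new LineRecordReader();
    }
}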

2.1 FileInputFormat

FileInputFormat implements getSplits().

It does not implement createRecordReader(); that is left to its concrete subclasses.

isSplitable(): indicates whether the current input file can be split.

You need to master the concrete implementations in the subclasses TextInputFormat and CombineTextInputFormat.
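For example, a subclass can override isSplitable() to prevent its files from ever being split (a sketch with an assumed class name; returning false makes getSplits() emit one slice per file):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Hypothetical subclass: forces one slice per file
public class WholeFileTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Returning false makes getSplits() emit the whole file as a single slice
        return false;
    }
}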

2.2 Data blocks and data slicing

Data block: a block is the unit in which HDFS physically divides data for storage.

Data slicing: slicing divides the input only logically and does not re-slice the data stored on disk. The data an MR job computes over is logically divided into N slices (splits), and the slice is the unit in which MapReduce processes data.

1. By default, the slice size is the block size, which has the advantage that a MapTask never has to read its data across machines.

2. When slicing, the data set is not considered as a whole; each file is sliced separately.

3. The number of slices determines the parallelism of the Map stage. Running more MapTasks can improve the cluster's concurrent processing capacity, but more parallel tasks are not always better. We therefore need to look at MapReduce's slicing mechanism.

The slicing logic in getSplits():

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// getFormatMinSplitSize() returns 1; getMinSplitSize(job) reads the config item
// "mapreduce.input.fileinputformat.split.minsize", whose default is 0

long maxSize = getMaxSplitSize(job);
// reads "mapreduce.input.fileinputformat.split.maxsize";
// if not configured, this is Long.MAX_VALUE

long blockSize = file.getBlockSize();
// the block size of the file: the HDFS block size in a cluster environment,
// 32M (33554432 bytes) by default when running locally

long splitSize = computeSplitSize(blockSize, minSize, maxSize);
// ==> return Math.max(minSize, Math.min(maxSize, blockSize));

while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { ... }
// SPLIT_SLOP = 1.1: a new slice is cut only while the remainder exceeds
// 1.1 x splitSize, so a final remainder of up to 10% extra stays in the
// last slice instead of forming a tiny one
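Working through the formula with typical values (assuming a 128M HDFS block size and the default minSize/maxSize): splitSize = max(1, min(Long.MAX_VALUE, 128M)) = 128M. A 129M file then produces a single 129M slice, since 129/128 ≈ 1.01 ≤ 1.1, while a 260M file produces slices of 128M and 132M (260/128 ≈ 2.03 > 1.1, so a 128M slice is cut; the remaining 132M gives 132/128 ≈ 1.03 ≤ 1.1 and stays whole). To change the slice size, adjust the two config items in the driver (a sketch; the sizes are illustrative):

// Lower maxSize below the block size to get smaller slices:
// splitSize = max(1, min(64M, 128M)) = 64M
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
// Or raise minSize above the block size to get larger slices:
// splitSize = max(256M, min(Long.MAX_VALUE, 128M)) = 256M
// FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);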

Conclusion:

1. Each slice is processed by one MapTask, which means a MapReduce job launches as many MapTasks as there are slices.

2. The slice size is equal to the block size by default

3. When slicing, each file will be sliced separately rather than as a whole

4. More slices are not always better, nor are fewer; choose an appropriate number of slices for the actual situation.

2.3 TextInputFormat

TextInputFormat is the default FileInputFormat implementation class. It reads each record as one line. The key is the byte offset at which the line starts within the file, of type LongWritable; the value is the content of the line (of type Text), excluding any line terminators (newline or carriage return).

Slicing rules: uses the slicing rules of the parent class, FileInputFormat.

Reading data: LineRecordReader (reads the data line by line).
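For instance, a file containing the two lines below (assuming \n line endings):

hello world
hadoop mapreduce

is delivered to the mapper as the two records:

(0,  hello world)      // "hello world" plus its newline occupies bytes 0-11
(12, hadoop mapreduce)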

2.4 CombineTextInputFormat Slicing mechanism

Under the default slicing mechanism, a file is always at least one slice no matter how small it is, and each slice is handed to one MapTask. A large number of small files therefore produces a large number of MapTasks, which is extremely inefficient. CombineTextInputFormat exists to handle scenarios with too many small files.

1. Usage scenario: jobs with too many small files. It can logically group multiple small files into one slice, so that several small files are handed to a single MapTask for processing.

2. Setting the maximum virtual-storage slice size

// Maximum virtual-storage slice size; it is best to set this value
// according to the actual sizes of the small files
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4M

3. Slicing mechanism

The slice generation process consists of two steps: a virtual-storage step and a slice-forming step.

An example of slicing is as follows:
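As an illustration (a commonly used teaching example; the sizes are assumed), suppose setMaxInputSplitSize is 4M and the input directory contains four files of 1.7M, 5.1M, 3.4M, and 6.8M.

Virtual storage: each file is compared with the 4M maximum. A file no larger than 4M forms one virtual block; a file larger than 4M but no larger than 8M is divided into two equal virtual blocks; a larger file first has a 4M block cut off and the remainder is evaluated again. This yields virtual blocks of 1.7M, (2.55M, 2.55M), 3.4M, and (3.4M, 3.4M).

Slice forming: the virtual blocks are merged in order until a slice reaches at least 4M, producing three slices: (1.7M + 2.55M) = 4.25M, (2.55M + 3.4M) = 5.95M, and (3.4M + 3.4M) = 6.8M.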

To set the maximum virtual-storage slice size, do it in the driver:

// If no InputFormat is set, TextInputFormat.class is used by default
job.setInputFormatClass(CombineTextInputFormat.class);
// Set the maximum virtual-storage slice size to 4M
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);