Shuffle overview

Shuffle is the most time-consuming phase of a MapReduce job and a frequent interview question. It covers everything that happens after the map function and before the reduce function: the partitioning, sorting, spilling to disk and merging done by the map task, and the fetching of data over the network followed by sorting and merging done by the reduce task.

map task

The number of map tasks in a MapReduce job is determined by the number of input splits. If small-file merging is not enabled, at least one map task is started for each file. The figure shows four map tasks.

split

  1. In fact, the 128MB split size I marked in the figure is not strictly accurate. First, the split size is computed from minSplitSize, maxSplitSize and blockSize:

    // code of org.apache.hadoop.mapreduce.lib.input.FileInputFormat.computeSplitSize()
      protected long computeSplitSize(long blockSize, long minSize,
                                      long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
      }
  2. With the default configuration, minSplitSize is 1 and maxSplitSize is Long.MAX_VALUE, so by default the split size equals blockSize: 64MB in Hadoop 1.x and earlier, 128MB by default in Hadoop 2.x, and 256MB by default in Hadoop 3.x. If you need a different split size, the two bounds can be overridden in the driver, as in the sketch below.
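
     A minimal driver sketch, with the class name, job name and the 256MB value chosen purely for illustration, showing how the two bounds can be overridden so that computeSplitSize() returns something other than blockSize:

     import org.apache.hadoop.conf.Configuration;
     import org.apache.hadoop.mapreduce.Job;
     import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

     // Illustrative driver: raising minSplitSize above blockSize makes
     // computeSplitSize() return the larger value, i.e. fewer, bigger splits.
     public class SplitSizeDemo {
       public static void main(String[] args) throws Exception {
         Job job = Job.getInstance(new Configuration(), "split-size-demo");
         FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024); // 256MB lower bound
         FileInputFormat.setMaxInputSplitSize(job, Long.MAX_VALUE);     // no upper bound
         // ... set mapper, reducer, input/output paths, then submit the job
       }
     }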

  3. However, it should also be noted that this does not mean every split is exactly one blockSize. Besides the common case where the last split of a file is smaller than a block, the last split can also be larger than blockSize. The reason is the slop factor SPLIT_SLOP = 1.1 used when cutting splits: a new split is cut only while the remaining bytes are more than 1.1 times the split size, so whatever is left at the end (up to 10% more than splitSize) becomes the final split; see the worked example after the code below.

    // part of the code of org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits()
    
     private static final double SPLIT_SLOP = 1.1;   // 10% slop
    
      long bytesRemaining = length;
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
        splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                    blkLocations[blkIndex].getHosts(),
                    blkLocations[blkIndex].getCachedHosts()));
        bytesRemaining -= splitSize;
      }
    
      if (bytesRemaining != 0) {
        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
        splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                   blkLocations[blkIndex].getHosts(),
                   blkLocations[blkIndex].getCachedHosts()));
      }
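
     Worked example: with splitSize = blockSize = 128MB and a 260MB file, the first pass checks 260/128 ≈ 2.03 > 1.1 and cuts a 128MB split, leaving 132MB; the second pass checks 132/128 ≈ 1.03 ≤ 1.1, so the loop exits and the remaining 132MB becomes the last split, slightly larger than one block.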
  4. In addition, the data of a split is fed into the map method record by record through an iterator, that is, records enter memory one at a time rather than the whole split being loaded at once; a minimal mapper sketch follows:
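
     The word-count style mapper below is an illustration only (class and field names are mine, not from this article); it shows that each call to map() sees exactly one record of the split:

     import java.io.IOException;
     import org.apache.hadoop.io.IntWritable;
     import org.apache.hadoop.io.LongWritable;
     import org.apache.hadoop.io.Text;
     import org.apache.hadoop.mapreduce.Mapper;

     // Illustrative mapper: the framework calls map() once per record (one line
     // of the split), so only that single record is in scope per call.
     public class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
       private final Text word = new Text();
       private final IntWritable one = new IntWritable(1);

       @Override
       protected void map(LongWritable offset, Text line, Context context)
           throws IOException, InterruptedException {
         for (String token : line.toString().split("\\s+")) {
           if (!token.isEmpty()) {
             word.set(token);
             context.write(word, one); // emit (token, 1) for this one record
           }
         }
       }
     }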

  5. Another detail is that the map task of every split except the first discards the first line of its split, because the reader of the previous split always reads one extra line past its end (this is how a record that straddles a split boundary is kept intact):

    // code excerpt of org.apache.hadoop.mapred.LineRecordReader (comment and next() method)

    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    /** Read a line. */
      public synchronized boolean next(LongWritable key, Text value)
        throws IOException {
    
        // We always read one extra line, which lies outside the upper
        // split limit i.e. (end - 1)
        while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
          key.set(pos);
    
          int newSize = 0;
          if (pos == 0) {
            // discard the first line with no assignment to pos
            newSize = skipUtfByteOrderMark(value);
          } else {
            newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
            pos += newSize;
          }
    
          if (newSize == 0) {
            return false;
          }
          if (newSize < maxLineLength) {
            return true;
          }
          // ... rest of next() omitted

buffer

  1. As shown in the figure, the output of the map calculation is first written to an in-memory circular buffer (MapOutputBuffer). Its default size is 100MB (mapreduce.task.io.sort.mb), and when it is more than 80% full (mapreduce.map.sort.spill.percent) an asynchronous SpillThread starts spilling the buffered data to disk. For details see the source of org.apache.hadoop.mapred.MapTask#MapOutputBuffer.init();
  2. During a spill the data is partitioned and sorted. By default the partition is determined by hashing the key, and sorting keeps each partition internally ordered (map.sort.class can be set to a different sort implementation); the default partitioning behaviour is sketched after this list;
  3. The spilled small files are then merged into one large, partitioned and sorted file, which is later pulled by the reduce tasks.
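
For reference, the default hash partitioning mentioned in step 2 essentially works as in the sketch below (a custom partitioner can be plugged in with job.setPartitionerClass()); the class name here is illustrative:

    import org.apache.hadoop.mapreduce.Partitioner;

    // Sketch of the default hash partitioning: the key's hash code, forced
    // non-negative, modulo the number of reduce tasks picks the partition.
    public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
      @Override
      public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }

Because the partition index depends only on the key, all records with the same key end up at the same reduce task.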

reduce task

  1. One concept to be clear about is that reduce tasks run after map tasks rather than alongside them: the reduce phase can only start once all map tasks have finished.
  2. In MapReduce the number of reduce tasks defaults to 1 and is set via mapred.reduce.tasks (mapreduce.job.reduces in newer configurations); the figure above shows three reduce tasks, as in the driver sketch after this list;
  3. By default Hive derives the number of reduce tasks from the amount of data handled per reducer, the maximum allowed number of reducers (hive.exec.reducers.bytes.per.reducer and hive.exec.reducers.max) and the total input size; the number can also be fixed explicitly through a parameter.
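
A minimal driver sketch (class and job name are made up for the example) that fixes the number of reduce tasks at three, matching the figure:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    // Illustrative driver: setNumReduceTasks(3) has the same effect as
    // setting mapreduce.job.reduces=3 in the configuration.
    public class ReduceCountDemo {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "reduce-count-demo");
        job.setNumReduceTasks(3);
        // ... configure mapper, reducer and input/output paths, then submit
      }
    }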

fetch

  1. Data is not pushed from the map side to the reduce side; instead, each reduce task actively pulls (fetches) its own partition from the output of every map task. The fetch parallelism is tunable, as sketched after this list.
  2. The data pulled from the different map outputs must be merge-sorted on the reduce side so that, within each partition, records stay sorted and grouped by key.
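
A small tuning sketch, purely illustrative (class and job name are made up, and the value 10 is an arbitrary example): the number of parallel fetch threads a reduce task uses is controlled by mapreduce.reduce.shuffle.parallelcopies, which defaults to 5:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    // Illustrative only: raise the number of parallel fetchers on the reduce side.
    public class FetchTuningDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10); // default is 5
        Job job = Job.getInstance(conf, "fetch-tuning-demo");
        // ... the rest of the job setup
      }
    }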

reduce

  1. The number of times the map method runs in a MapReduce job is determined by the number of input records: the granularity of the map method is a single record, while the granularity of the reduce method is a group of records sharing the same key.
  2. Like map, reduce reads its data through iterators, so values also enter memory one by one. Internally, however, reduce uses two cooperating iterators to cleverly detect whether consecutive records belong to the same group; that detail will be explained later through source-code analysis. A minimal reducer sketch showing the per-group granularity follows.
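
The reducer below is an illustration only (class and field names are mine); it shows that reduce() is invoked once per key group and that the group's values arrive through an iterator:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Illustrative reducer: reduce() runs once per key group; the values are
    // consumed through an iterator, one at a time, not loaded all at once.
    public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      private final IntWritable total = new IntWritable();

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) { // iterate this key's group of values
          sum += v.get();
        }
        total.set(sum);
        context.write(key, total);
      }
    }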

conclusion

Shuffle is time-consuming because, across the steps described above, a large amount of sorting, spilling to disk, merging, and network transfer takes place.