MapTask (input-map-output)

MapTask input process

Source code analysis

First look at the entry to input, which is MapTask’s run method

  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    if (isMapTask()) {
      // If there are no reducers then there won't be any sort. Hence the map 
      // phase will govern the entire attempt's progress.
      // If there is no reducer, the whole attempt's progress goes to the map phase
      if (conf.getNumReduceTasks() == 0) { // No sorting is required
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be 
        // split between the map phase (67%) and the sort phase (33%).
        // If there are reducers, allocate 66.7% of the progress to map and 33.3% to sort
        mapPhase = getProgress().addPhase("map", 0.667f);
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
    TaskReporter reporter = startReporter(umbilical);
 
    boolean useNewApi = job.getUseNewMapper();
    // Perform context initialization and determine the OutputFormat class for the map output.
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    if (useNewApi) {
      // Hadoop 2.x starts the Mapper through the new API by default
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

In the run method, the task first checks whether any reducers are configured. If not, the whole attempt's progress is assigned to the map phase; if there are reducers, 66.7% of the progress goes to the map phase and 33.3% to the sort phase (partitioning and sorting by key). The initialize method then sets up the task context and determines the OutputFormat class for the map output. Finally, the runNewMapper method is called.
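
By contrast, a map-only job exercises the first branch. A minimal driver-side sketch (the class and method names below are illustrative, not from the Hadoop source):

import org.apache.hadoop.mapreduce.Job;

// Map-only job: with zero reduce tasks, run() registers a single "map" phase,
// no sort phase is added, and the output later goes through NewDirectOutputCollector.
public class MapOnlyJobSetup {
  public static void configure(Job job) {
    job.setNumReduceTasks(0); // skip partitioning, sorting, and spilling entirely
  }
}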

  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // make a task context so we can get the classes
    // Construct the TaskAttemptContext object taskContext
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                  getTaskID(),
                                                                  reporter);
    // make a mapper
    // Use reflection, driven by taskContext, to instantiate the Mapper class configured in the job.
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format
    // Likewise, use taskContext to instantiate the InputFormat class configured in the job.
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split
    // Create the split object used by this MapTask.
    // This method will fetch the file from HDFS according to the file block address corresponding to the split passed by the mapTask.
    // Then call seek to jump to the offset coordinate of the split file block.
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    // Construct the RecordReader, which reads records from the split and feeds them to the map
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    
    // get an output object
    if (job.getNumReduceTasks() == 0) {
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      // The NewOutputCollector object holds the collector (which sorts) and the partitioner
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

    try { // Core methods are usually inside try
      input.initialize(split, mapperContext);
      // mapper is an instance of our own Mapper class
      mapper.run(mapperContext);
      mapPhase.complete();
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      input.close();
      input = null;
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
  }

The method first constructs the TaskAttemptContext object taskContext, then uses it, via reflection, to instantiate the Mapper and InputFormat classes configured in the job. It then creates the split object with the getSplitDetails method. This method locates the file in HDFS according to the block address recorded for the split handed to this MapTask, and then calls inFile.seek(offset) to jump to the offset where the split's block begins.

Next the RecordReader is constructed; it reads records from the split, one at a time, and passes them to the map method for processing. In the NewTrackingRecordReader constructor you can see this.real = inputFormat.createRecordReader(split, taskContext); — the RecordReader named real is the object the map task actually uses to read the split.
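
Which InputFormat class taskContext.getInputFormatClass() returns is simply whatever the driver registered, TextInputFormat being the default. A minimal driver sketch (the class name, job name, and path are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class InputSetup {
  public static Job newJob() throws Exception {
    Job job = Job.getInstance(new Configuration(), "input-format-demo");
    // TextInputFormat is also what you get when nothing is set explicitly.
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/tmp/input")); // placeholder path
    return job;
  }
}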

Because inputFormat defaults to the TextInputFormat class, the RecordReader returned is a LineRecordReader. Let's look at its initialize method:

  public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    final FutureDataInputStreamBuilder builder =
        file.getFileSystem(job).openFile(file);
    FutureIOSupport.propagateOptions(builder, job,
        MRJobConfig.INPUT_FILE_OPTION_PREFIX,
        MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
    fileIn = FutureIOSupport.awaitFuture(builder.build()); // File-oriented input stream
    
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null != codec) {
      isCompressedInput = true;
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new CompressedSplitLineReader(cIn, job,
            this.recordDelimiterBytes);
        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn;
      } else {
        if (start != 0) {
          // So we have a split that is only part of a file stored using
          // a Compression codec that cannot be split.
          throw new IOException("Cannot seek in " +
              codec.getClass().getSimpleName() + " compressed stream");
        }

        in = new SplitLineReader(codec.createInputStream(fileIn,
            decompressor), job, this.recordDelimiterBytes);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start); // Each map does not read the whole file; it reads from its own split's offset
      in = new UncompressedSplitLineReader(
          fileIn, job, this.recordDelimiterBytes, split.getLength());
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method; this also settles the position where this split starts reading.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }

Here start is assigned the split's starting offset within the file, and fileIn, the file-oriented input stream, seeks to start before reading begins. Then comes the most interesting part of the method:

    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }

Because HDFS splits files into blocks by byte count, a line of data may be cut across two blocks, and the map computation must read the whole line. So the code checks whether start is non-zero, that is, whether this split is anything other than the first one. If it is not the first split, the first (possibly partial) line is discarded and the initial offset start is moved to the beginning of the next line. In short, every split except the first discards its first line, and every split except the last reads one extra line past its end. This compensates for HDFS cutting lines in half at block boundaries.
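
A toy simulation of this rule (plain Java, not Hadoop code; the file content and split size are made up) shows that every line is processed exactly once even when a line straddles a split boundary:

import java.nio.charset.StandardCharsets;

public class SplitBoundaryDemo {
  public static void main(String[] args) {
    byte[] file = "hello world\nfoo bar baz\nqux\n".getBytes(StandardCharsets.UTF_8);
    int splitSize = 15; // deliberately cuts "foo bar baz" in two

    for (int start = 0; start < file.length; start += splitSize) {
      int end = Math.min(start + splitSize, file.length);
      int pos = start;
      if (start != 0) {            // not the first split:
        pos = skipLine(file, pos); // discard the (possibly partial) first line
      }
      System.out.println("split [" + start + ", " + end + "):");
      // Keep going while the line START is still within [start, end]; this is what
      // lets every split except the last read one extra line past its own end.
      while (pos <= end && pos < file.length) {
        int next = skipLine(file, pos);
        System.out.println("  " + new String(file, pos, next - pos, StandardCharsets.UTF_8).trim());
        pos = next;
      }
    }
  }

  // Returns the offset just past the next '\n' (or the end of the data).
  private static int skipLine(byte[] data, int pos) {
    while (pos < data.length && data[pos] != '\n') {
      pos++;
    }
    return Math.min(pos + 1, data.length);
  }
}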

Once the LineRecordReader is initialized, mapper.run(mapperContext) is executed; mapper is an instance of our own Mapper class, and its run method looks like this:

  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      // Call LineRecordReader's nextKeyValue method
      // 1 reads a record in the data and assigns key, value. 2 returns a Boolean value to the caller declaring whether there is more data.

      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }

The context here wraps MapContextImpl, so context.nextKeyValue() ends up calling LineRecordReader's nextKeyValue method. That method reads one record from the data, assigns the key and value, and returns whether there is more data to read. getCurrentKey and getCurrentValue then return the current key and value, respectively.
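
To make the call chain concrete, here is a minimal word-count style Mapper (illustrative, not part of the MapTask source). Mapper.run() loops on context.nextKeyValue() and hands each record it reads to this map() method:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // key   = byte offset of the line in the file (set by LineRecordReader)
    // value = the content of the line
    for (String token : value.toString().split("\\s+")) {
      if (!token.isEmpty()) {
        word.set(token);
        context.write(word, ONE); // goes to NewOutputCollector.write -> collector.collect
      }
    }
  }
}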

MapTask output process

Source code analysis

Back in MapTask's runNewMapper method, continue with the branch that builds the output object.

    if (job.getNumReduceTasks() == 0) {
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      // The NewOutputCollector object holds the collector (which sorts) and the partitioner
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

This branch decides whether the output needs partitioning and sorting at all. Step into the NewOutputCollector constructor:

    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
      // Buffer-related logic: create the sorting collector
      collector = createSortingCollector(job, reporter);
      partitions = jobContext.getNumReduceTasks(); // As many partitions as there are reduce tasks
      // If there is more than one reduce task, obtain the partitioner via reflection; it can be user-defined.
      // If there is only one reduce task, always return 0, because everything will be pulled by that single reduce task.
      if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else { // partitions == 1
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1; // always returns 0
          }
        };
      }
    }

The NewOutputCollector object holds the collector (which also sorts) and the partitioner. Start with createSortingCollector, the method that creates the collector; by default it returns a MapOutputBuffer. The key method is its initializer, collector.init(context):

    public void init(MapOutputCollector.Context context ) throws IOException, ClassNotFoundException {
      job = context.getJobConf();
      reporter = context.getReporter();
      mapTask = context.getMapTask();
      mapOutputFile = mapTask.getMapOutputFile();
      sortPhase = mapTask.getSortPhase();
      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
      partitions = job.getNumReduceTasks();
      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

      //sanity checks
      // When map output records fill this fraction of the sort buffer, sorting and spilling to disk begin
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); // Default is 80% rather than 100% so the map is not blocked from writing to the buffer while a spill runs
      // The current buffer size
      final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
          MRJobConfig.DEFAULT_IO_SORT_MB);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\"," + spillper);
      }
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\"," + sortmb);
      }
      // Sort by default
      sorter = ReflectionUtils.newInstance(job.getClass(
                   MRJobConfig.MAP_SORT_CLASS, QuickSort.class,
                   IndexedSorter.class), job);
      // buffers and accounting
      int maxMemUsage = sortmb << 20;
      maxMemUsage -= maxMemUsage % METASIZE;
      kvbuffer = new byte[maxMemUsage];
      bufvoid = kvbuffer.length;
      kvmeta = ByteBuffer.wrap(kvbuffer)
         .order(ByteOrder.nativeOrder())
         .asIntBuffer();
      setEquator(0);
      bufstart = bufend = bufindex = equator;
      kvstart = kvend = kvindex;

      maxRec = kvmeta.capacity() / NMETA;
      softLimit = (int)(kvbuffer.length * spillper);
      bufferRemaining = softLimit;
      LOG.info(JobContext.IO_SORT_MB + ":" + sortmb);
      LOG.info("soft limit at " + softLimit);
      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
      LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

      // k/v serialization
      // The comparator
      comparator = job.getOutputKeyComparator();
      keyClass = (Class<K>)job.getMapOutputKeyClass();
      valClass = (Class<V>)job.getMapOutputValueClass();
      serializationFactory = new SerializationFactory(job);
      keySerializer = serializationFactory.getSerializer(keyClass);
      keySerializer.open(bb);
      valSerializer = serializationFactory.getSerializer(valClass);
      valSerializer.open(bb);

      // output counters
      mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
      mapOutputRecordCounter =
        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
      fileOutputByteCounter = reporter
          .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

      // compression
      if (job.getCompressMapOutput()) {
        Class<? extends CompressionCodec> codecClass =
          job.getMapOutputCompressorClass(DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, job);
      } else {
        codec = null;
      }

      // combiner
      // If the map phase produces many repeated keys, a small map-side reduce (combine) can compress them first
      final Counters.Counter combineInputCounter =
        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
      combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                             combineInputCounter,
                                             reporter, null);
      if (combinerRunner != null) {
        final Counters.Counter combineOutputCounter =
          reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
      } else {
        combineCollector = null;
      }
      spillInProgress = false;
      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
      spillThread.setDaemon(true);
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
      } finally {
        spillLock.unlock();
      }
      if (sortSpillException != null) {
        throw new IOException("Spill thread failed to initialize",
            sortSpillException);
      }
    }

Looking through the method, there are a few key points (a driver-side tuning sketch follows this list):

  • spillper: when map output fills this fraction of the sort buffer, partitioning, sorting, and spilling to disk kick in; the default is 80%;
  • sortmb: the size of the sort buffer;
  • sorter: the sorter used when a spill occurs; quick sort by default;
  • comparator: the comparator used while sorting; a user-defined sort comparator takes priority, otherwise the key's own comparator is used;
  • combinerRunner: no combining happens by default, it must be enabled explicitly; it performs a reduce-like merge on repeated keys in the map stage, and during the final merge it only runs if there are at least 3 spill files (minSpillsForCombine);
  • SpillThread: the sortAndSpill method in this thread performs the sorting and spilling.
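
A sketch of how these knobs can be tuned from the driver, assuming the Hadoop 2.x property names; MyKeyComparator and MyCombiner are hypothetical user classes:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MapSideTuning {
  public static Job buildJob() throws IOException {
    Configuration conf = new Configuration();
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f); // spillper
    conf.setInt("mapreduce.task.io.sort.mb", 200);            // sortmb: buffer size in MB
    conf.setInt("mapreduce.map.combine.minspills", 3);        // minSpillsForCombine

    Job job = Job.getInstance(conf, "map-side-tuning");
    job.setSortComparatorClass(MyKeyComparator.class);        // custom sort comparator (hypothetical class)
    job.setCombinerClass(MyCombiner.class);                   // map-side "mini reduce" (hypothetical class)
    return job;
  }
}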

So let’s go back to Mapper. The run method calls the map method.

  protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

The context here wraps MapContextImpl, but when we look for the write method we find that MapContextImpl does not define it, so we look in its parent class. The implementation turns out to be in TaskInputOutputContextImpl:

  public void write(KEYOUT key, VALUEOUT value ) throws IOException, InterruptedException {
    output.write(key, value);
  }

The output is the NewOutputCollector created earlier, and its write method is as follows:

    public void write(K key, V value) throws IOException, InterruptedException {
      // The map output enters the buffer as a (key, value, partition) triple
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions));
    }

The partition number is computed by getPartition (this is the default HashPartitioner):

  public int getPartition(K key, V value,
                          int numReduceTasks) {
    // key.hashCode() & Integer.MAX_VALUE yields a non-negative integer
    // % numReduceTasks sends equal keys to the same partition
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

The key, the value, and the partition number are eventually written from the map into the buffer.
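
For finer control over which reduce task receives which keys, a custom Partitioner can be registered with job.setPartitionerClass. A sketch (illustrative, not Hadoop code):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Groups keys by their first letter; NewOutputCollector only consults a custom
// partitioner when there is more than one reduce task.
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    String s = key.toString();
    char first = s.isEmpty() ? 'a' : Character.toLowerCase(s.charAt(0));
    // & Integer.MAX_VALUE mirrors HashPartitioner's way of keeping the value non-negative
    return (first & Integer.MAX_VALUE) % numPartitions;
  }
}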

Diagram the MapOutputBuffer process

The buffer is, at heart, a linear byte array. Once key/value pairs are stored in it, corresponding index records are needed so they can be located again. Each index record has a fixed size of 16 bytes and contains the following fields:

  • P: the partition number
  • KS: the start position of the key in the buffer
  • VS: the start position of the value in the buffer, which also gives the end position of the key
  • VL: the length of the value, from which the end position of the value can be calculated

In Hadoop 1.x, the index was stored in a separate byte array. Imagine keys and values that are tiny but extremely numerous: they occupy little space in the buffer, yet the index array fills up first, forcing a spill and wasting buffer space.

In Hadoop 2.x, the key/value data and the index share the same byte array: key/value bytes are written from left to right, while index records grow from right to left as needed. This eliminates the wasted buffer space.
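
Plugging concrete numbers into the sizing logic from init() above makes this layout tangible. A small arithmetic sketch, assuming mapreduce.task.io.sort.mb = 100 and the default spill fraction of 0.8:

public class BufferSizing {
  public static void main(String[] args) {
    final int NMETA = 4;             // ints per index record
    final int METASIZE = NMETA * 4;  // 16 bytes per index record, as described above

    int sortmb = 100;                // mapreduce.task.io.sort.mb
    float spillper = 0.8f;           // mapreduce.map.sort.spill.percent

    int maxMemUsage = sortmb << 20;                 // 104857600 bytes for kvbuffer
    maxMemUsage -= maxMemUsage % METASIZE;          // align to METASIZE (already aligned here)
    int softLimit = (int) (maxMemUsage * spillper); // 83886080: spilling starts beyond this point
    int maxRec = (maxMemUsage / 4) / NMETA;         // 6553600 index records at most

    System.out.println("kvbuffer = " + maxMemUsage + " bytes, softLimit = "
        + softLimit + " bytes, maxRec = " + maxRec);
  }
}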

When the buffer reaches the default 80% threshold, the space occupied by the current key/value data and indexes is locked, and SpillThread is started to quick-sort that 80% while the map keeps writing new data into the remaining space. The sort is a double sort: first by the partition P stored in the index, then by key within the same P, so the result is ordered by partition, with keys ordered inside each partition.
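
A toy model of that ordering (plain Java, not the Hadoop sorter), just to show the comparison rule:

import java.util.Arrays;
import java.util.Comparator;

public class DoubleSortDemo {
  static class Meta {
    final int partition;
    final String key;
    Meta(int partition, String key) { this.partition = partition; this.key = key; }
  }

  public static void main(String[] args) {
    Meta[] metas = {
        new Meta(1, "banana"), new Meta(0, "cherry"),
        new Meta(1, "apple"),  new Meta(0, "apricot")
    };
    Arrays.sort(metas, new Comparator<Meta>() {
      @Override
      public int compare(Meta a, Meta b) {
        if (a.partition != b.partition) {
          return Integer.compare(a.partition, b.partition); // primary: partition number
        }
        return a.key.compareTo(b.key);                      // secondary: key order within the partition
      }
    });
    for (Meta m : metas) {
      System.out.println(m.partition + "\t" + m.key); // (0, apricot), (0, cherry), (1, apple), (1, banana)
    }
  }
}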

Note that sorting normally means moving data around in memory, which is awkward here because keys and values vary in size. So only the index records are moved, since their size is fixed. As long as the spill follows the sorted indexes when writing to disk, the data in the spill file comes out ordered.

Next, look at how the map keeps writing data into the buffer while a spill is in progress.

The buffer is treated as a circular buffer: a new dividing point (the equator) is set in the free memory, and from that line indexes are appended to one side while key/value bytes are appended to the other.

Combiner (TODO: to be perfected)

The Combiner performs a reduce-style operation on the data ahead of time. It runs after the buffer is sorted and before the data is spilled to disk.

Finally, the map merges the many small spill files into one large file, so that Reduce does not have to do fragmented random reads and writes when it pulls the data.