Up front: this post briefly introduces the steps a ReduceTask runs, and focuses on how ReduceTask uses the iterator pattern to read data, avoiding OOM problems when processing big data with limited memory.

Source code analysis

Around the Reducer class we override, a ReduceTask runs in three steps (for reference, a minimal user Reducer sketch follows the list):

  • Shuffle: pull the records with the same key into one partition
  • Sort: MapTask has already sorted the data by key, so the reduce side does not need to sort again. The "sort" here is really a merge sort that brings identical keys together
  • Reduce: run the user's reduce computation
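For reference, here is a minimal word-count style sketch of such an overridden Reducer (class and variable names are illustrative, not taken from the source below):

  import java.io.IOException;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  // Minimal word-count style reducer: sums the values of each key group.
  public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : values) { // values are streamed, not held in memory
        sum += v.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }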

First, look at the run method of ReduceTask; all three steps above are reflected in it.

  public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

    if (isMapOrReduce()) {
      // reduceTask is divided into three stages: copy, group sorting, and reduce execution
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
    // start thread that will handle communication with parent
    TaskReporter reporter = startReporter(umbilical);
    
    boolean useNewApi = job.getUseNewReducer();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }
    
    // Initialize the codec
    codec = initCodec();
    RawKeyValueIterator rIter = null;
    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
    
    Class combinerClass = conf.getCombinerClass();
    CombineOutputCollector combineCollector = 
      (null != combinerClass) ? 
        new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

    Class<? extends ShuffleConsumerPlugin> clazz =
          job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
					
    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
    LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

    ShuffleConsumerPlugin.Context shuffleContext = 
      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                  super.lDirAlloc, reporter, codec, 
                  combinerClass, combineCollector, 
                  spilledRecordsCounter, reduceCombineInputCounter,
                  shuffledMapsCounter,
                  reduceShuffleBytes, failedShuffleCounter,
                  mergedMapOutputsCounter,
                  taskStatus, copyPhase, sortPhase, this,
                  mapOutputFile, localMapFiles);
    // Initialize shuffle
    shuffleConsumerPlugin.init(shuffleContext);
    // Perform a shuffle pull of the map record, which returns an iterator.
    rIter = shuffleConsumerPlugin.run();

    // free up the data structures
    mapOutputFilesOnDisk.clear();
    
    sortPhase.complete();                         // sort is complete
    setPhase(TaskStatus.Phase.REDUCE); 
    statusUpdate(umbilical);
    Class keyClass = job.getMapOutputKeyClass();
    Class valueClass = job.getMapOutputValueClass();
    // Get the group comparator
    RawComparator comparator = job.getOutputValueGroupingComparator();

    if (useNewApi) {
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }

    shuffleConsumerPlugin.close();
    done(umbilical, reporter);
  }

First, the shuffle step pulls the data, with the same keys landing in the same partition, and finally returns the iterator rIter. Because a big data set cannot be loaded into memory all at once, an iterator that reads records from disk into memory one at a time is the better fit.
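To make the memory behavior concrete, here is a minimal sketch of how such an iterator is consumed (IterationSketch and drain are hypothetical names; the interface methods are the same ones the wrapper shown later delegates to):

  import java.io.IOException;
  import org.apache.hadoop.io.DataInputBuffer;
  import org.apache.hadoop.mapred.RawKeyValueIterator;

  class IterationSketch {
    // Drains a RawKeyValueIterator one record at a time: only the current
    // record's serialized buffers need to be in memory.
    static void drain(RawKeyValueIterator rIter) throws IOException {
      while (rIter.next()) {                      // advance to the next record
        DataInputBuffer key = rIter.getKey();     // current serialized key
        DataInputBuffer value = rIter.getValue(); // current serialized value
        // deserialize and process here; earlier records are already released
      }
      rIter.close();
    }
  }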

Then the second step gets the grouping comparator: RawComparator comparator = job.getOutputValueGroupingComparator().

  public RawComparator getOutputValueGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }
    
    return ReflectionUtils.newInstance(theClass, this);
  }

  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    // getMapOutputKeyClass is guaranteed to use the key's own comparator
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }

The getOutputKeyComparator here was also encountered in MapTask. getOutputValueGroupingComparator first looks for a user-defined grouping comparator; if none is set it falls back to the sort comparator, and if a sort comparator is not set either, it uses the comparator registered for the key class itself.
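As a concrete illustration, here is a minimal sketch of a user-defined grouping comparator, assuming Text map output keys (the class name and the grouping rule are invented for this example):

  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.WritableComparable;
  import org.apache.hadoop.io.WritableComparator;

  public class FirstCharGroupingComparator extends WritableComparator {
    protected FirstCharGroupingComparator() {
      super(Text.class, true); // true: create key instances for deserialization
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      // Keys starting with the same character land in one reduce group;
      // the framework only ever checks whether this result == 0.
      String s1 = a.toString(), s2 = b.toString();
      char c1 = s1.isEmpty() ? '\0' : s1.charAt(0);
      char c2 = s2.isEmpty() ? '\0' : s2.charAt(0);
      return Character.compare(c1, c2);
    }
  }

Registering it via job.setGroupingComparatorClass(FirstCharGroupingComparator.class) stores it under JobContext.GROUP_COMPARATOR_CLASS, the very key that getOutputValueGroupingComparator reads above.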

The grouping comparator compares keys and is semantically boolean: do two keys belong to the same group? A sort comparator conventionally returns a negative number, 0, or a positive number, semantically less than, equal to, and greater than. In practice the grouping comparator also returns an int, and the caller simply compares it to 0. The following code will be encountered later.

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                currentRawKey.getLength(),
                                nextKey.getData(),
                                nextKey.getPosition(),
                                nextKey.getLength() - nextKey.getPosition()
                                    ) == 0;

Then, the third step executes the reduce method. Let's look at the runNewReducer method.

  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) throws IOException,InterruptedException, 
                              ClassNotFoundException {
    // wrap value iterator to report progress.
    final RawKeyValueIterator rawIter = rIter;
    rIter = new RawKeyValueIterator() {
      public void close() throws IOException {
        rawIter.close();
      }
      public DataInputBuffer getKey() throws IOException {
        return rawIter.getKey();
      }
      public Progress getProgress() {
        return rawIter.getProgress();
      }
      public DataInputBuffer getValue() throws IOException {
        return rawIter.getValue();
      }
      public boolean next() throws IOException {
        boolean ret = rawIter.next();
        reporter.setProgress(rawIter.getProgress().getProgress());
        return ret;
      }
    };
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
          getTaskID(), reporter);
    // make a reducer
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    // The RecordWriter used to write the reduce output records
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
    job.setBoolean("mapred.skip.on", isSkipping());
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    // Create reduce context and set some parameters of reduce runtime.
    org.apache.hadoop.mapreduce.Reducer.Context 
         reducerContext = createReduceContext(reducer, job, getTaskID(),
                                               rIter, reduceInputKeyCounter, 
                                               reduceInputValueCounter, 
                                               trackedRW,
                                               committer,
                                               reporter, comparator, keyClass,
                                               valueClass);
    try {
      reducer.run(reducerContext);
    } finally {
      trackedRW.close(reducerContext);
    }
  }

createReduceContext sets the reduce runtime parameters and wraps the iterator rIter into a context object through which the data can be read directly. That object is then passed to reducer.run(reducerContext).

  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if (iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
        }
      }
    } finally {
      cleanup(context);
    }
  }

Unlike the map side, where context.nextKeyValue() is used to check whether there is a next record, the Reducer's run method uses context.nextKey() to check whether there is a next group of data.

  public boolean nextKey() throws IOException, InterruptedException {
    while (hasMore && nextKeyIsSame) { // nextKeyIsSame defaults to false
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue();
    } else {
      return false;
    }
  }

It first checks whether there is more data and whether the next record still belongs to the current group of keys; if both hold, it loops, calling nextKeyValue() to skip over the rest of the current group. Either way, as long as there is data, the nextKeyValue method is eventually executed.

  public boolean nextKeyValue() throws IOException, InterruptedException {
    // Return false if there is no data
    if (!hasMore) {
      key = null;
      value = null;
      return false;
    }
    // Determine whether this is the first record of a new key group
    firstValue = !nextKeyIsSame;
    // get the key
    DataInputBuffer nextKey = input.getKey();
    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
                      nextKey.getLength() - nextKey.getPosition());
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
    key = keyDeserializer.deserialize(key);
    // get the value
    DataInputBuffer nextVal = input.getValue();
    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
        - nextVal.getPosition());
    value = valueDeserializer.deserialize(value);

    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
    currentValueLength = nextVal.getLength() - nextVal.getPosition();

    if (isMarked) {
      backupStore.write(nextKey, nextVal);
    }

    // After reading the current kv, continue to read the next record, return whether there is a next record
    hasMore = input.next();
    if (hasMore) {
      // Record the key of the next record
      nextKey = input.getKey();
      // Compare whether the next record is the same set of keys and update nextKeyIsSame
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                     currentRawKey.getLength(),
                                     nextKey.getData(),
                                     nextKey.getPosition(),
                                     nextKey.getLength() - nextKey.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false;
    }
    inputValueCounter.increment(1);
    return true;
  }

After reading the current key/value pair, this method checks whether there is a next record and, if so, whether the next record belongs to the same key group as the current one. It updates hasMore and nextKeyIsSame, which feed the loop condition back in nextKey.

Next, look at the reduce method in Reducer: its context.getValues() argument returns an Iterable, and the corresponding iterator is a ValueIterator.

  public Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
    return iterable;
  }

  protected class ValueIterable implements Iterable<VALUEIN> {
    private ValueIterator iterator = new ValueIterator();
    @Override
    public Iterator<VALUEIN> iterator() {
      return iterator;
    }
  }

  // The following hasNext/next methods are from the ValueIterator class:
    @Override
    public boolean hasNext() {
      try {
        if (inReset && backupStore.hasNext()) {
          return true;
        }
      } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException("hasNext failed", e);
      }
      return firstValue || nextKeyIsSame;
    }

    @Override
    public VALUEIN next() {
      if (inReset) {
        try {
          if (backupStore.hasNext()) {
            backupStore.next();
            DataInputBuffer next = backupStore.nextValue();
            buffer.reset(next.getData(), next.getPosition(), next.getLength()
                - next.getPosition());
            value = valueDeserializer.deserialize(value);
            return value;
          } else {
            inReset = false;
            backupStore.exitResetMode();
            if (clearMarkFlag) {
              clearMarkFlag = false;
              isMarked = false;
            }
          }
        } catch (IOException e) {
          e.printStackTrace();
          throw new RuntimeException("next value iterator failed", e); }}// if this is the first record, we don't need to advance
      if (firstValue) {
        firstValue = false;
        return value;
      }
      // if this isn't the first record and the next key is different, they
      // can't advance it here.
      if (!nextKeyIsSame) {
        throw new NoSuchElementException("iterate past last value");
      }
      // otherwise, go to the next key/value pair
      try {
        nextKeyValue();
        return value;
      } catch (IOException ie) {
        throw new RuntimeException("next value iterator failed", ie);
      } catch (InterruptedException ie) {
        // this is bad, but we can't modify the exception list of java.util
        throw new RuntimeException("next value iterator interrupted", ie); }}Copy the code

The hasNext method considers a next value to exist as long as this is the first record of its group (firstValue) or the next record belongs to the same group as the current one (nextKeyIsSame).

The next method returns value directly if this is the first record of the group. Otherwise it has to call the nextKeyValue method, that is, use ReduceContextImpl's input iterator to read the next record, and then return value.
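A small made-up trace ties nextKey, nextKeyValue, hasNext, and next together. Assume the sorted reduce input is (a,1) (a,2) (b,3):

  // nextKey()        -> nextKeyValue(): key=a, value=1, firstValue=true;
  //                     read-ahead sees (a,2), so nextKeyIsSame=true
  // values.next()    -> returns 1 directly (firstValue), firstValue=false
  // values.hasNext() -> true, because nextKeyIsSame is true
  // values.next()    -> nextKeyValue(): value=2; read-ahead sees (b,3),
  //                     so nextKeyIsSame=false
  // values.hasNext() -> false: group "a" is finished
  // nextKey()        -> nextKeyValue(): key=b, value=3, firstValue=true;
  //                     no more input, so hasMore=false, nextKeyIsSame=false
  // values.next()    -> returns 3; values.hasNext() -> false
  // nextKey()        -> hasMore is false, returns false; run() exits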

To summarize

A quick rundown of the ReduceTask workflow:

  • ReduceTask wraps the pulled data in an iterator
  • When the reduce method is called, values does not load the data into memory
  • The iterator's hasNext method checks whether this is the first record of a group, or whether the next record belongs to the same group (nextKeyIsSame)
  • The next method returns value directly for the first record of a group; otherwise it advances the real input iterator to read a record and updates nextKeyIsSame

As you can see, the iterator pattern is used throughout to avoid the OOM problems that a large data set would otherwise cause. And since MapTask has already sorted the data, the iterator can process all of it in a single linear pass.
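One practical consequence of this design is worth noting: with the default Writable serialization, deserialize refills the same value object each time, and hasNext is just firstValue || nextKeyIsSame, so the values Iterable is single-pass and its elements are reused. A sketch of the classic pitfall (a hypothetical reducer, for illustration only):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  public class CachingReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) {
      List<IntWritable> cached = new ArrayList<>();
      for (IntWritable v : values) {
        cached.add(v);   // BUG: 'v' is the same reused object on every iteration
        // Correct: cached.add(new IntWritable(v.get()));
      }
      // Iterating 'values' a second time would yield nothing: once the group
      // ends, hasNext() == (firstValue || nextKeyIsSame) == false.
    }
  }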