Source code analysis

Distributed computing seeks to:

  • Move computation to the data
  • Achieve parallelism by divide and conquer
  • Read data locally (data locality)

Client

No computation takes place on the client itself.

The client's job is to support moving computation to the data and parallelizing the computation.

Its most important task is to decouple storage from computation: the data to be processed is cut into slices (splits), and the number of splits determines Map parallelism (split count == number of MapTasks).

By default the number of splits equals the number of HDFS blocks. The purpose is to move the computation to the data: with blocks spread across several nodes and one map per block, there is no need to move large amounts of data; the framework only has to ship the job's Jar package to the nodes holding each block and run it there.

Users can also customize the split size:

When a split is larger than a block, part of the data has to be moved to the computation. Compared with the original single-machine Map, where all of the data would be pulled to one node, this is still essentially moving computation to the data.

When a split is smaller than a block, which is typically done for CPU-intensive computations, some data is moved toward the computation in exchange for extra parallelism.
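
As a rough illustration, the split size can be tuned from the job driver. The sketch below uses the standard FileInputFormat helpers; the class name and the concrete sizes are example choices, not values from this analysis.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "split-size-demo");

        // Split larger than a block (e.g. 256 MB vs. a 128 MB block):
        // fewer maps, some data is moved to the computation.
        FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);

        // Split smaller than a block (e.g. 64 MB): more maps,
        // useful for CPU-intensive jobs.
        // FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
    }
}
```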

MapTask

How does the framework cope when an HDFS block boundary cuts a word in half?

Suppose the text below is split so that the first part belongs to Block01 and the rest to Block02 (shown in blue and red in the original illustration).

Hello World!!!

Hello World!!!

Hello World!!!

The word "World" in the second line is cut in two by the block boundary, yet the Hadoop framework still produces a correct WordCount result. How does it achieve this?

Reading the source code reveals the answer. When MapReduce uses LineRecordReader, it checks whether the current split starts at offset zero (i.e. whether it is the first split of the file). If it does not, the reader discards everything up to the first newline, moving the offset to the beginning of the next full line in the block; in this example the offset lands at the beginning of the third line of the text. Meanwhile, the map working on the first block reads one line past its split boundary to finish the cut line, so the framework effectively moves the unprocessed data at the start of the second block to the first map for processing. This is how words cut apart by a block boundary are handled correctly.
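
A simplified sketch of this logic (field and method names abbreviated and paraphrased from Hadoop's LineRecordReader; not the literal source):

```java
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Simplified paraphrase of LineRecordReader's split-boundary handling.
// Compression, buffering, and error handling are omitted.
class SimpleLineRecordReader {
    private FSDataInputStream in;
    private long start, pos, end;
    private final LongWritable key = new LongWritable();
    private final Text value = new Text();

    void initialize(FSDataInputStream in, long start, long length) throws IOException {
        this.in = in;
        this.start = start;
        this.end = start + length;
        in.seek(start);
        if (start != 0) {
            // Not the first split: discard the (possibly partial) first line.
            // The previous split's reader finishes that line by reading one line past its own end.
            this.start += readLine(new Text());
        }
        this.pos = this.start;
    }

    boolean nextKeyValue() throws IOException {
        if (pos > end) {
            return false;                 // already past the split; the crossing line was read earlier
        }
        key.set(pos);                     // key = byte offset of the line in the file
        int consumed = readLine(value);   // value = line contents; may read across into the next block
        pos += consumed;
        return consumed != 0;
    }

    // Simplified stand-in for Hadoop's internal line reader: reads one
    // '\n'-terminated line into 'text' and returns the number of bytes consumed.
    private int readLine(Text text) throws IOException {
        text.clear();
        int consumed = 0;
        int b;
        while ((b = in.read()) != -1) {
            consumed++;
            if (b == '\n') {
                break;
            }
            text.append(new byte[] { (byte) b }, 0, 1);
        }
        return consumed;
    }
}
```

MapTask then drives the reader with a loop equivalent to `while (nextKeyValue()) { map(getCurrentKey(), getCurrentValue(), context); }` (see Mapper.run() in the Hadoop source).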


Input -> map -> output

  • Input (split + format): the InputFormat class we configure supplies the RecordReader object that is actually returned and used.

    TextInputFormat -> LineRecordReader

    Split: file, offset, length

    initialize():

    • in = fs.open(file); in.seek(offset)
    • Except for the first map, every map gives up the first line of its split and starts reading from the second line; in other words, the previous map reads one extra line to compensate for HDFS slicing.

    nextKeyValue():

    1. Reads one line of data, filling in the Key (the byte offset) and the Value (the line contents)
    2. Returns a boolean indicating whether a record was read

    getCurrentKey()

    getCurrentValue()

  • The output:

    • Determine whether a Reduce phase is required.

      • If yes, how many partitions (reducers) does the Reduce phase need? If there is more than one partition, the hash partitioner is used by default; a custom partitioner can also be supplied (see the partitioner sketch after this list).
      • If not, the map output is simply written straight out.
    • Writing means the writer puts (K, V, P) triples (key, value, partition) into the MapOutputBuffer.

    • MapOutputBuffer:

      • Write process:

        • After the map() method runs, the corresponding partition value is computed and the writer appends the record to the buffer as a (K, V, P) triple.

        • Buffer:

          1. The buffer stores the K and V data together with their metadata: fixed-size 16-byte index records holding the partition value, KeyStart, ValueStart and ValueLength. Because the index records all have the same size, the key of any record can be located randomly through the neatly laid-out index data.

          2. The "ring" buffer is a ring only logically; physically it is a linear block of memory.

            Principle:

            A boundary is drawn inside the buffer, with KV data stored on one side and index data on the other. When occupancy reaches 80%, the part that already holds data is locked, quick-sorted, and spilled to disk; at the same time a new boundary is drawn in the still-empty part of the ring and writing continues on both sides of it. As long as the spill finishes before the buffer fills up, the thread that writes the data is never blocked.

            Advantages: better use of the memory space and higher efficiency.

          3. Before spilling, the data is sorted first by partition value and then by Key. What gets swapped during the sort is the index data rather than the KV data: KV records vary in size, and records of different sizes cannot be swapped in place, whereas the index records are a constant 16 bytes and can be. During the spill, the metadata is read in sorted order and each index points to the memory location of its KV record, so the records are written out already sorted.

          4. Combiner optimization (reduces I/O): the Combiner is essentially a merge performed early on the Map side, after the sort and before the spill. When the spill files are later merge-sorted into one large file, the Combiner runs again if the number of spill files is at least minSpillsForCombine (default 3). A driver configuration sketch follows this list.

            Tip: why does each Map merge its many small spill files into one large file? Disk I/O is the performance bottleneck of MapReduce; merging the files avoids random disk reads and speeds up I/O.

          5. Note that the Combiner must not change the final result: combining the data any number of times has to produce the same answer (for example, summing counts is safe, but computing an average directly is not).

      • init():

        spillper: 0.8 — the buffer spill (overflow-write) threshold; the default is 80% (mapreduce.map.sort.spill.percent)

        sortmb: 100 — the buffer size in MB; the default is 100 MB (mapreduce.task.io.sort.mb)

        sorter: QuickSort — the sorting algorithm used on the buffer

        comparator: job.getOutputKeyComparator()

        • A user-defined comparator is used if one has been set
        • Otherwise the comparator of the Key type itself is used by default
    • Combiner: merges K-V records with the same Key ahead of time on the Map side.

      minSpillsForCombine = 3

    • SpillThread

      sortAndSpill() sorts the buffered data and spills it to disk.

      It then determines whether a merge is required and, if so, performs it.
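
For reference, a minimal custom Partitioner of the kind mentioned above might look like this. It is a sketch using the standard Hadoop API; the class name and the partitioning rule are invented for illustration.

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical example: route records to reducers by the first letter of the key,
// mimicking what the default HashPartitioner does with hashCode().
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.getLength() == 0) {
            return 0;
        }
        // Mask off the sign bit, as HashPartitioner does, to keep the result non-negative.
        return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
    }
}
```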
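
And a sketch of how the buffer and Combiner knobs discussed above can be set from the job driver. The property names are the standard Hadoop ones; the values shown simply restate the defaults, and IntSumReducer is Hadoop's stock summing reducer used here as an example Combiner.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class MapSideTuningExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("mapreduce.task.io.sort.mb", 100);            // sortmb: buffer size in MB
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f); // spillper: spill threshold

        Job job = Job.getInstance(conf, "map-side-tuning-demo");
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setPartitionerClass(FirstLetterPartitioner.class); // custom partitioner (see above)
        job.setCombinerClass(IntSumReducer.class);              // Combiner must not change the result
        job.setNumReduceTasks(4);                                // number of partitions / reducers
    }
}
```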

ReduceTask

input -> reduce -> output

rIter (input): the Reduce side pulls its partition of the data from every file generated on the Map side, merge-sorts it into one large file, and wraps it in an iterator (rIter).

When the reduce() method runs, all records with the same key are supplied through a "false" iterator called values.

The values iterator's hasNext() method determines whether there is another record with the same Key; internally it simply returns the nextKeyIsSame flag.

The next() method indirectly calls rIter's nextKeyValue() to pull out the current record, reads the following record, checks whether its Key equals the current Key, and updates nextKeyIsSame accordingly.
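
A minimal sketch of this "false iterator" pattern (simplified and renamed from Hadoop's ReduceContextImpl; RawRecordIterator stands in for rIter, the single merge-sorted stream):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Simplified illustration of the "false" values iterator; not the literal Hadoop code.
class GroupedValues<K, V> implements Iterator<V> {
    interface RawRecordIterator<K, V> {
        boolean nextKeyValue();   // advance to the next record; false at end of data
        K currentKey();
        V currentValue();
        boolean nextKeyIsSame();  // does the following record share the current key?
    }

    private final RawRecordIterator<K, V> rawIter;
    private boolean firstValue = true;   // the current record is already loaded for the first value

    GroupedValues(RawRecordIterator<K, V> rawIter) {
        this.rawIter = rawIter;
    }

    @Override
    public boolean hasNext() {
        // Another value exists only while the *next* record still has the same key.
        return firstValue || rawIter.nextKeyIsSame();
    }

    @Override
    public V next() {
        if (firstValue) {
            firstValue = false;          // first value was read when the group started
        } else if (rawIter.nextKeyIsSame()) {
            rawIter.nextKeyValue();      // indirectly advance the real iterator by one record
        } else {
            throw new NoSuchElementException("iterated past the last value for this key");
        }
        return rawIter.currentValue();
    }
}
```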

Grouping comparator: divides the K-V data in the big sorted file into groups, each of which is handled by one reduce() call. If the user does not specify a grouping comparator, grouping is done by Key by default.
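
If custom grouping is needed (for example in a secondary-sort job), a grouping comparator can be registered on the job. The sketch below is only illustrative: it assumes string keys of the hypothetical form "year-month" and groups them by the year prefix.

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical grouping comparator: keys like "2023-01" and "2023-07" compare equal,
// so one reduce() call sees a whole year's records.
public class YearGroupingComparator extends WritableComparator {
    public YearGroupingComparator() {
        super(Text.class, true); // create key instances so compare(WritableComparable, ...) works
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String yearA = a.toString().split("-")[0];
        String yearB = b.toString().split("-")[0];
        return yearA.compareTo(yearB);
    }

    // Registration in the driver (shown as a helper here for brevity):
    public static void register(Job job) {
        job.setGroupingComparatorClass(YearGroupingComparator.class);
    }
}
```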

Why use iterators?

To avoid OOM: the merged data file is too large to be loaded into memory all at once.

To reduce I/O cost: the same sorted file contains many different keys, and each key must be fed into its own reduce() call. Only the real iterator ever reads from disk; every time a new key's group needs to be processed, the false iterator simply drives the real iterator forward. The entire ReduceTask therefore needs only a single sequential pass over the data: one I/O stream owned by the true iterator, invoked indirectly through the false iterators.
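
As a concrete illustration of what the reduce() side sees, here is the standard WordCount-style reducer pattern: the values parameter is the false iterator described above, consumed in a single forward pass (the class is a generic example, not taken from this analysis).

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Generic WordCount-style reducer: 'values' is the "false" iterator, which pulls
// records one by one from the single sorted stream as this loop advances.
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) { // single forward pass; values cannot be re-iterated
            sum += count.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
```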