I didn’t pay too much attention to the concept of Shuffle when I was studying it, so I thought I was missing something. I read some posts and found out that Shuffle is a long process from the end of map() to the end of reduce()…. BB: In essence, the process contains many links, do not know why they are referred to as Shuffle……

1

See the meaning of the picture — directly above:

Points to be clear:

  • When a ring buffer triggers a write overrun, the sorting is done first by the partition of each key-value pair, and then by the key
  • Each write actually includes two files, one is the index file formed by the META data write, and the other is the sorted data file
  • A sort occurs during overwrite, which is the quicksort algorithm; In the final merge into a whole file, because the small files have been ordered, directly merge sort can be
  • The Reducer reads data from the final file file.out of the Mapper node, and only reads the data of the corresponding partition according to the index file

2. Mapper’s partition

The function Of partitioning is to divide the output results Of the Mapper stage into different regions and send them to different reducetasks, so the number Of Partitions and the parallelism Of Reduce are mutually determined, that is, Num Of Partitions = Num Of ReduceTask

Set the number of reducetAsks: Job. SetNumReducetAsks (n)

Partition mode:

  • Default: hash(key) % Num of ReduceTask — Modulo of key-value hash
  • Custom Partition Class: Defines a class that inherits the Partitioner class and overwrites the getPartition() method

Note:

  • When the number of reducetasks is not set, the default allocation of MR is only one ReduceTask, that is, no matter how many sections the custom partitioning method needs to divide, the final result will only enter this one Reducer and finally generate a result file.
  • If the number of reduceTasks is set to be greater than 1, but less than the number of custom partitions, running the MR program will report an error, because it will produce some situations where the result data of the Mapper is not fetched by the Reducer
  • If the number of partitions set is < the number of reducetasks, the program will execute normally, and the result file equal to the number of reducetasks will be generated at last, but there are empty files (number of reducetasks – number of partitions), because the Reducer of this part has not been input

3. The order of Mapper

  • Relatively simple, overwrite quick sort, merge sort when merge; The sorting criteria is lexicographical sorting of key
  • Customizable ordering (used when you customize Bean objects) requires that the custom Bean class implement the WriteableParable interface and override the CompareTo method, which is essentially the same as Comparable in Java

4. Read the Reducer



Points to be clear:

  • A reduceTask reads only the data within the same partition of each Mapper result
  • Copy to the Reducer node is first stored in memory, and if memory is insufficient, it is scrubbed to disk
  • After all the data is copied, Merge && Sort the memory and disk (if there is a write). This Sort is also by default lexicographical Sort based on the Key. The purpose is to arrange the KV pairs with the same Key in the data together to facilitate the call of reduce method. [Reduce method is used to encapsulate all values of the same key as Iterator iterator objects before a key is called. For a key, reduce method is called once.]

5. Reducer ordering

The sorting on the Reduce side is also applied to the scenarios that use custom Bean objects, and the Reducer judges the keys specified by us as the same keys through the custom sorting method (it will be judged as different keys in the default Reduce sorting).

Attention! Attention! Attention! There is a pit here!!

  • The custom sorting on the Reducer side requires a custom sorting class that inherits WriteableParator and overwrites the compare method, and associates our custom sorting class in the Driver
  • The custom ordering on the Mapper side requires that the Bean object inherit the WritableParable class and overwrite the method compareTo when we define the Bean object