This article mainly organizes and analyzes the introduction to Shuffle from the official Spark documentation, drawing on the reference materials listed at the end for additional understanding. The original English documentation is worth reading carefully, sentence by sentence.

1. Shuffle Operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's mechanism for re-distributing data so that it is grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

2. Background

To understand what happens during the shuffle, consider the example of the reduceByKey operation. The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple: the key and the result of executing a reduce function against all values associated with that key. The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to compute the result.
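As a concrete illustration (a minimal sketch, not from the original article), the classic word count uses reduceByKey to combine all values for each key, forcing values that start out on different partitions to be brought together:

import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("reduceByKey-demo").setMaster("local[2]"))

    // Records for the same key may start out in different partitions (3 here)
    val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), numSlices = 3)

    // reduceByKey must co-locate all values of a key before it can sum them,
    // which triggers a shuffle across the 3 input partitions.
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

    counts.collect().foreach(println) // e.g. (a,3), (b,2), (c,1)
    sc.stop()
  }
}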

In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task operates on a single partition. Thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation: it must read from all partitions to find all the values for all keys, and then bring the values together across partitions to compute the final result for each key. This is called the shuffle.

Although the set of elements in each partition of newly shuffled data is deterministic, and so is the ordering of the partitions themselves, the ordering of the elements within each partition is not. If predictably ordered data is desired following a shuffle, one of the following can be used (see the sketch after the list):

  • mapPartitions to sort each partition using, for example, .sorted
  • repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
  • sortBy to make a globally ordered RDD
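A hedged sketch of the three options above, assuming an existing SparkContext `sc` (as in spark-shell); the data and variable names are illustrative, not from the original:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3), ("a", 4)))

// 1) Sort each partition after the shuffle with mapPartitions
val sortedWithin = pairs.groupByKey().mapPartitions(_.toSeq.sortBy(_._1).iterator)

// 2) Repartition and sort by key in one pass (cheaper than repartition followed by a separate sort)
val repartitionedSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(4))

// 3) Produce a globally ordered RDD
val globallySorted = pairs.sortBy(_._1)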

Shuffle operations mainly fall into repartitioning, ByKey, and join operators:

Operations which can cause a shuffle include repartition operations like repartition and coalesce, 'ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

3. Performance Impact

The shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks: map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark's map and reduce operations.

Internally, results from individual map tasks are kept in memory until they no longer fit. Then they are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.

Certain shuffle operations can consume significant amounts of heap memory since they employ in-memory data structures to organize records before or after transferring them. Specifically, reduceByKey and aggregateByKey create these structures on the map side, while 'ByKey operations generate them on the reduce side. When data does not fit in memory, Spark spills these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.

Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are preserved until the corresponding RDDs are no longer used and are garbage collected. This is done so that the shuffle files do not need to be re-created if the lineage is re-computed. Garbage collection may happen only after a long period of time, if the application retains references to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may consume a large amount of disk space. The temporary storage directory is specified by the spark.local.dir configuration parameter when configuring the SparkContext.

Shuffle behavior can be tuned by adjusting a variety of configuration parameters; see the Shuffle Behavior section within the Spark Configuration Guide.
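For example, a few of the documented shuffle-related parameters can be set on a SparkConf; the values below are purely illustrative, not tuning recommendations:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("shuffle-tuning-demo")
  // Buffer size for shuffle file output streams (larger values reduce disk seeks)
  .set("spark.shuffle.file.buffer", "64k")
  // Maximum size of map output fetched simultaneously by each reduce task
  .set("spark.reducer.maxSizeInFlight", "96m")
  // Whether to compress map output files
  .set("spark.shuffle.compress", "true")
  // Directory (or comma-separated directories) used for shuffle and spill files
  .set("spark.local.dir", "/tmp/spark-scratch")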

4. Shuffle Introduction

Spark divides a job into multiple stages during DAG scheduling: the upstream stage performs the map work and the downstream stage performs the reduce work, so in essence Spark is a MapReduce-style computing framework. Shuffle is the bridge between Map and Reduce, connecting map output to reduce input. This process involves serialization and deserialization, cross-node network I/O, and disk read/write I/O. Understanding the Spark Shuffle principle helps optimize Spark applications.

5. Basic principles and features of Spark Shuffle

Similar to the MapReduce computing framework, Spark's shuffle implementation is roughly as shown in the following figure. In the DAG phase, stages are divided at shuffle boundaries. The upstream stage performs the map tasks; each map task partitions its result data according to the partitions of the downstream stage and writes it temporarily to disk, a process called shuffle write. The downstream stage performs the reduce tasks; each reduce task fetches the result data of its designated partition from all map tasks of the upstream stage over the network, a process called shuffle read, and finally executes the reduce business logic. Here's an example:

  • If there are 100 map tasks in the upstream stage and 1000 reduce tasks in the downstream stage, then each of the 100 map tasks produces 1000 pieces of data (one per downstream partition), and each of the 1000 reduce tasks pulls its corresponding piece of data from each of the 100 upstream map tasks. That is, the first reduce task pulls the first piece of result data from every map task, and so on.
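A back-of-the-envelope sketch of the example above (the numbers mirror the text; the key is hypothetical). Assuming the default hash-based partitioning, with M map tasks and R reduce tasks the shuffle produces roughly M × R blocks, and a HashPartitioner decides which of the R partitions a record falls into:

import org.apache.spark.HashPartitioner

val mapTasks = 100
val reduceTasks = 1000

// Each map task writes one block per reduce partition
val shuffleBlocks = mapTasks * reduceTasks // 100,000 blocks in total
val blocksPerReduce = mapTasks             // each reduce task fetches 100 blocks

// Which reduce partition a given key lands in:
val partitioner = new HashPartitioner(reduceTasks)
val partitionForKey = partitioner.getPartition("some-key") // an index in 0 .. 999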

In the map phase, in addition to the map business logic, there is the shuffle write process, which involves time-consuming operations such as serialization and disk I/O. In the reduce phase, in addition to the reduce business logic, there is the shuffle read process, which involves time-consuming operations such as network I/O and deserialization. The whole shuffle process is therefore extremely expensive. Spark has made many improvements to its shuffle implementation, refining it gradually over successive releases.

6. Explain the Shuffle principle

6.1 Mechanism of Shuffle

Overview: Shuffle describes the process by which data flows from the output of a map task to the input of a reduce task. In distributed mode, reduce tasks need to pull map task results from other nodes across the network, a process that consumes network resources as well as memory and disk I/O.

6.2 Shuffle Principle of MapReduce

  • Map Task operation

Each map task has a memory buffer (100 MB by default) that stores the map output. When the buffer is close to full, its contents are spilled to a temporary file on disk. After the entire map task completes, all the temporary files it generated are merged into a single final output file, which then waits for the reduce tasks to pull its data.

Spill: The process by which data is written from memory to disk is called spill. A spill is triggered when the buffer reaches a threshold percentage of its capacity (spill.percent, 0.8 by default). While the spill thread locks the used portion of the buffer and sorts the keys (serialized bytes), the map task can continue writing into the remaining memory. If a Combiner is configured in the client program, local aggregation is performed during the spill.

Merge: Each spill generates a temporary file, and these files are merged into a single file when the map task actually completes. This process is called merge.
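To make the buffer/spill/merge flow concrete, here is a purely illustrative toy model (this is not MapReduce's actual code; the class, capacities, and file handling are made up for the sketch):

import java.io.File
import scala.collection.mutable.ArrayBuffer

// Toy model of the map-side buffer: spill to a temp file when usage crosses the threshold.
class SpillBuffer(capacity: Int = 100, spillPercent: Double = 0.8) {
  private val buffer = ArrayBuffer.empty[(String, Int)]
  private val spills = ArrayBuffer.empty[File]

  def write(record: (String, Int)): Unit = {
    buffer += record
    if (buffer.size >= capacity * spillPercent) spill()
  }

  private def spill(): Unit = {
    // Sort by key before writing, as the spill thread does
    val sorted = buffer.sortBy(_._1)
    val file = File.createTempFile("spill-", ".tmp")
    // (serialization of `sorted` into `file` elided in this sketch)
    spills += file
    buffer.clear()
  }

  // When the map task finishes, all spill files would be merged into one output file
  def spillFiles: Seq[File] = spills.toSeq
}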

  • Reduce Task operations

When all map tasks on a TaskTracker are completed, the reduce tasks on the corresponding node start. In short, this stage continuously fetches the final output of each map task on that node and continuously merges the fetched data to form the input file for the reduce task.

Copy process: The reduce task starts several data copy threads (Fetchers) that fetch the map output files from the TaskTrackers over HTTP.

Merge process: The copied data is first placed in a memory buffer (sized relative to the JVM heap). When the buffer is insufficient, the data is spilled to disk, similar to the map-side spill (sort by default, combine optional); when multiple spill files accumulate, they are merged, similar to the map-side merge.

The key concepts in the MapReduce shuffle are:

Storage related: memory buffer, default buffer size, spill threshold

Main processes: spill, sort, combine, merge, copy (fetch)

Related parameters: default memory buffer size, JVM heap size, spill.percent

  • About sorting methods:

In the map stage, the key-value data is sorted with quicksort when it is spilled, and the spill files are combined with merge sort. In the reduce stage, the files fetched from the map side are combined with merge sort, and the final merge pass uses heap sort.

7. Shuffle Operators

7.1 Deduplication

def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
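As a usage note (hedged on the implementation detail): distinct shuffles because internally it is expressed through a ByKey aggregation, roughly equivalent to the manual version below (assuming a SparkContext `sc`):

val rdd = sc.parallelize(Seq(1, 2, 2, 3, 3, 3))

// Direct call
val unique = rdd.distinct()

// Roughly what distinct does internally: map to (value, null) pairs,
// reduce by key (which shuffles), then drop the dummy value.
val uniqueManually = rdd.map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)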

7.2 Aggregation

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def groupBy[K](f: T => K, p: Partitioner): RDD[(K, Iterable[T])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
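A small sketch contrasting these operators (illustrative data, assuming a SparkContext `sc`): computing a per-key average with combineByKey, which a plain reduceByKey cannot express as directly:

val scores = sc.parallelize(Seq(("math", 90), ("math", 70), ("english", 80)))

// Sum per key with reduceByKey
val sums = scores.reduceByKey(_ + _)

// (sum, count) per key with combineByKey, then the average
val avg = scores.combineByKey(
  (v: Int) => (v, 1),                                         // createCombiner
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),      // mergeValue (within a partition)
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners (across partitions)
).mapValues { case (sum, count) => sum.toDouble / count }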

7.3 Sorting

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
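Usage sketch (illustrative data, assuming a SparkContext `sc`):

val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))

// Global sort by key, descending, into 2 partitions
val byKeyDesc = pairs.sortByKey(ascending = false, numPartitions = 2)

// Sort any RDD by a derived key
val byValue = pairs.sortBy(_._2)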

7.4 Repartition

def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
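Usage sketch (illustrative, assuming a SparkContext `sc`): coalesce with shuffle = false narrows partitions without a shuffle, while repartition always shuffles:

val data = sc.parallelize(1 to 1000, numSlices = 8)

// Reduce to 2 partitions without a shuffle (partitions are merged locally)
val narrowed = data.coalesce(2)

// Increase (or rebalance) to 16 partitions; this always performs a shuffle
val rebalanced = data.repartition(16)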

7.5 Set or Table Operations

def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
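Usage sketch (illustrative data, assuming a SparkContext `sc`):

val users  = sc.parallelize(Seq((1, "alice"), (2, "bob")))
val orders = sc.parallelize(Seq((1, "book"), (1, "pen"), (3, "lamp")))

// Inner join: keeps only keys present in both RDDs
val joined = users.join(orders)        // RDD[(Int, (String, String))]

// Left outer join: keeps all keys from `users`; a missing right side becomes None
val left = users.leftOuterJoin(orders) // RDD[(Int, (String, Option[String]))]

// Pairs in `users` whose key does not appear in `orders`
val onlyUsers = users.subtractByKey(orders)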

8. Reference materials

Spark.apache.org/docs/latest…

www.slideshare.net/colorant/sp…

www.cnblogs.com/arachis/p/S…

Sharkdtu.com/posts/spark…