Shuffle tuning came up in a recent project. I collected some learning resources and turned them into these notes to share.

Question: What is a shuffle?

Answer: When a Spark job starts, the Driver process divides the job code we wrote into several stages. Each stage executes a portion of the code, and a batch of tasks is created for it; these tasks are then distributed to the Executor processes. After all tasks of a stage finish, a large number of files holding the stage's intermediate results are written to disk on each executor node, and the Driver schedules the next stage. The tasks of the next stage take the previous stage's output files as their input, and so on, until the program finishes and we get the final result. Spark divides stages at shuffle operators: whenever the code contains a shuffle operator (such as groupByKey, countByKey, reduceByKey, or join), a stage boundary is drawn there.
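To make the map-side/reduce-side split concrete, here is a minimal pure-Python sketch (an illustration only, not Spark's actual implementation) of what happens at a reduceByKey shuffle boundary: map tasks bucket their output by key, and each reduce task pulls one bucket from every map task and aggregates it.

```python
def map_side(records, num_reducers):
    """Each map task buckets its output by hash(key) % num_reducers."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in records:
        buckets[hash(key) % num_reducers].append((key, value))
    return buckets

def reduce_side(partition):
    """Each reduce task aggregates its partition in a HashMap-like dict."""
    agg = {}
    for key, value in partition:
        agg[key] = agg.get(key, 0) + value
    return agg

# Two "map tasks", three "reduce tasks":
map_outputs = [map_side([("a", 1), ("b", 2)], 3),
               map_side([("a", 3), ("c", 4)], 3)]
# Reduce task i pulls bucket i from every map task's output.
results = [reduce_side([kv for out in map_outputs for kv in out[i]])
           for i in range(3)]
merged = {k: v for part in results for k, v in part.items()}
# merged now holds the same sums reduceByKey would produce.
```

The stage boundary sits between map_side and reduce_side: the buckets are the intermediate files written to disk, and the reduce tasks of the next stage read them as input.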

Problem: Reduce OOM

Cause: The reduce task pulls data from the map side and aggregates it. The reduce side has an aggregation buffer (Executor memory * 0.2), and it can run short.

Solution: 1. Increase the proportion of memory used for reduce-side aggregation. 2. Increase Executor memory. 3. Reduce the amount of data each reduce task pulls at a time, e.g. set spark.reducer.maxSizeInFlight to 24m.
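As a hedged sketch, the three fixes above can be expressed as spark-submit arguments. The parameter names are real Spark configs; the values are illustrative, not recommendations:

```python
# Knobs for the reduce-side OOM fixes described above.
oom_fixes = {
    "spark.shuffle.memoryFraction": "0.3",   # 1. more reduce-side aggregation memory
    "spark.reducer.maxSizeInFlight": "24m",  # 3. pull less data per fetch
}

def submit_args(executor_memory, conf):
    """Build the spark-submit argument list for these settings."""
    args = ["--executor-memory", executor_memory]  # 2. bigger executors
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args

args = submit_args("8g", oom_fixes)
# e.g. spark-submit --executor-memory 8g --conf spark.reducer.maxSizeInFlight=24m ...
```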

Problem: Insufficient off-heap memory

Symptom: "shuffle file cannot find" or "executor lost" errors

Cause: 1. The Executor running the map task runs out of memory and dies, and the BlockManager in that executor goes down with it. As a result its ConnectionManager is unavailable, no connection can be established, and data cannot be pulled. 2. The executor has not died, but connecting to its BlockManager fails because the executor running the map task is stuck in a full GC. (The reduce task first obtains the shuffle file locations from the MapOutputTracker on the Driver, then connects to that executor's BlockManager to pull the data.)

Solution: 1. Increase Executor memory (i.e., on-heap memory); the off-heap memory requested along with it also grows: --executor-memory 5G. 2. Increase off-heap memory directly: --conf spark.yarn.executor.memoryOverhead=2048 (on YARN) or --conf spark.executor.memoryOverhead=2048m. By default, the off-heap allocation is 10% of Executor memory; when processing big data this is where problems appear, causing Spark jobs to crash repeatedly and fail to run. In that case raise this parameter to at least 1G (1024M), or even 2G or 4G.
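A small sketch of the off-heap sizing rule described above: per the Spark-on-YARN documentation, the default memoryOverhead is max(384 MB, 10% of executor memory), which is why explicitly raising it helps.

```python
# Default off-heap (memoryOverhead) sizing rule for Spark on YARN,
# per the running-on-YARN docs (values in MB).
def default_overhead_mb(executor_memory_mb):
    return max(384, int(executor_memory_mb * 0.10))

# A 5 GB executor gets only ~512 MB off-heap by default:
small = default_overhead_mb(5 * 1024)

# Explicit override, as in the solution above (illustrative values):
conf = ["--executor-memory", "5g",
        "--conf", "spark.yarn.executor.memoryOverhead=2048"]
```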

Problem: Reducing the number of shuffle files

With the unoptimized HashShuffleManager, each task in the first half of the shuffle (the map side) creates one file for every task in the second half (the reduce side). Each reduce task then has a memory buffer and aggregates the pulled values in a HashMap.
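A worked example of the file-count arithmetic this implies: the unoptimized HashShuffleManager creates (map tasks x reduce tasks) files in total, while consolidation (discussed under spark.shuffle.consolidateFiles below in its own section) brings that down to roughly (cores per executor x reduce tasks) per executor.

```python
# File-count arithmetic for the two HashShuffleManager modes.
def hash_shuffle_files(map_tasks, reduce_tasks):
    """Unoptimized: one file per (map task, reduce task) pair."""
    return map_tasks * reduce_tasks

def consolidated_files(executors, cores_per_executor, reduce_tasks):
    """With consolidate: one file group per core per executor."""
    return executors * cores_per_executor * reduce_tasks

# 1000 map tasks x 1000 reduce tasks = 1,000,000 files...
many = hash_shuffle_files(1000, 1000)
# ...versus 10 executors x 4 cores x 1000 reduce tasks = 40,000 files.
few = consolidated_files(10, 4, 1000)
```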

The default Shuffle flowchart

Related tuning parameters

spark.shuffle.file.buffer

Default value: 32k

Parameter description: This parameter sets the buffer size of the BufferedOutputStream used by shuffle write tasks. Data is written to the buffer before being written to the disk file; when the buffer is full, it is spilled to disk.

Tuning advice: If the job has sufficient memory resources, increase this parameter (e.g., to 64k) to reduce the number of times data is spilled to disk files during shuffle write. This reduces disk I/O and improves performance. In practice, tuning this parameter reasonably yields a performance improvement of 1% to 5%.



spark.reducer.maxSizeInFlight

Default value: 48m

Parameter Description: This parameter sets the size of the Shuffle Read task's buffer, which determines how much data can be pulled at a time.

Tuning advice: If the job has sufficient memory resources, increase this parameter (e.g., to 96m) to reduce the number of fetches, and therefore the number of network transfers, improving performance. In practice, tuning this parameter reasonably yields a performance improvement of 1% to 5%.



spark.shuffle.io.maxRetries

Default value: 3

Parameter description: When a shuffle read task tries to pull its own data from the node where a shuffle write task ran, and the pull fails because of a network fault, it retries automatically. This parameter sets the maximum number of retries; if the pull still fails after that many attempts, the job may fail.

Tuning advice: For jobs with particularly time-consuming shuffle operations, it is recommended to increase the maximum number of retries (e.g., to 60) to avoid pull failures caused by JVM full GC or network instability. In practice, adjusting this parameter greatly improves the stability of shuffles over very large data volumes (billions to tens of billions of records).



spark.shuffle.io.retryWait

Default value: 5s

Parameter description: The interval between successive retries when pulling data. The default is 5s.

Tuning suggestion: It is recommended to increase the interval (e.g., to 60s) to improve the stability of shuffle operations.



spark.shuffle.memoryFraction

Default value: 0.2

Parameter description: The proportion of Executor memory allocated to shuffle read tasks for aggregation. The default is 20%.

Tuning advice: This parameter is also covered under resource parameter tuning. If memory is sufficient and persistence operations are rarely used, it is advisable to increase this ratio to give shuffle read aggregation more memory and avoid frequent disk reads and writes caused by memory shortage. In practice, adjusting this parameter reasonably can improve performance by around 10%.



spark.shuffle.manager

Default value: sort

Parameter description: Sets the ShuffleManager type. From Spark 1.5 on, there are three options: hash, sort, and tungsten-sort. HashShuffleManager was the default before Spark 1.2; Spark 1.2 and later default to SortShuffleManager. Tungsten-sort is similar to sort, but uses the off-heap memory management of project Tungsten for more efficient memory usage.

Tuning suggestion: Since SortShuffleManager sorts data by default, keep the default SortShuffleManager if your business logic needs that sorting. If it does not, consider the bypass mechanism or the optimized HashShuffleManager to skip sorting and get better disk read/write performance. Note that tungsten-sort should be used with caution, as a few bugs have been found in it.



spark.shuffle.sort.bypassMergeThreshold

Default value: 200

Parameter description: When the ShuffleManager is SortShuffleManager and the number of shuffle read tasks is smaller than this threshold (default 200), the shuffle write process does not sort; instead it writes data the way the unoptimized HashShuffleManager does, except that all the temporary disk files produced by each task are finally merged into one file, with a separate index file.

Tuning suggestion: When using SortShuffleManager, if sorting really is not needed, it is recommended to set this parameter larger than the number of shuffle read tasks; the bypass mechanism is then enabled automatically and the map side does not sort, avoiding the sorting overhead. However, this mode still generates a large number of disk files during the write, so shuffle write performance still has room for improvement.



spark.shuffle.consolidateFiles

Default value: false

Parameter description: This parameter takes effect only when HashShuffleManager is used. If set to true, the consolidate mechanism is enabled, which heavily consolidates shuffle write output files. When there are many shuffle read tasks, this greatly reduces disk I/O overhead and improves performance.

Tuning suggestion: If you really do not need SortShuffleManager's sorting mechanism, then besides the bypass mechanism you can also set spark.shuffle.manager to hash manually, using HashShuffleManager with consolidate enabled at the same time. In practice its performance is found to be 10% to 30% higher than SortShuffleManager with bypass enabled.
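As a hedged summary, here are the tuned values suggested throughout the parameter sections collected into one place. All keys are real Spark configs for the 1.x line; the values are this article's examples, not universal recommendations, and the hash/consolidate pair applies only if you choose HashShuffleManager over the default:

```python
# Consolidated tuning profile from the sections above (illustrative values).
tuned = {
    "spark.shuffle.file.buffer": "64k",
    "spark.reducer.maxSizeInFlight": "96m",
    "spark.shuffle.io.maxRetries": "60",
    "spark.shuffle.io.retryWait": "60s",
    "spark.shuffle.memoryFraction": "0.3",
    "spark.shuffle.manager": "hash",         # only if sorting is not needed
    "spark.shuffle.consolidateFiles": "true" # only with HashShuffleManager
}
conf_flags = " ".join(f"--conf {k}={v}" for k, v in sorted(tuned.items()))
# Paste conf_flags into a spark-submit command line.
```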


Reference: https://www.jianshu.com/p/069c37aad295

WeChat official account: Big Data Diary

The account shares big-data articles and learning materials from time to time. Like-minded friends are welcome to exchange and learn together!