This series of blog posts summarizes and shares examples drawn from real business environments and provides practical guidance on Spark business applications. Stay tuned for the rest of the series. Copyright: this series of Spark business application posts belongs to the author (Qin Kaixin).

  • Qin Kaixin Technology Community – complete catalogue of the big data business combat series
  • Spark Business Application Combat – In-depth analysis of Spark data skew case tests and tuning guidelines
  • Spark Business Application Combat – In-depth analysis of Spark resource scheduling parameter tuning
  • Spark Business Application Combat – In-depth analysis of Spark Shuffle process parameter optimization

1 Optimizing Spark resource configuration

```bash
./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1g \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3
```
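For comparison, the same settings can also be applied programmatically. Below is a minimal sketch using SparkConf; the app name is a placeholder, and on YARN the executor count and sizing are normally fixed at submit time, so the spark-submit flags above remain the usual way to set them:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Programmatic equivalent of the --conf flags above.
val conf = new SparkConf()
  .setAppName("ShuffleTuningExample") // placeholder app name
  .set("spark.default.parallelism", "1000")
  .set("spark.storage.memoryFraction", "0.5")
  .set("spark.shuffle.memoryFraction", "0.3")

val sc = new SparkContext(conf)
```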

2 Tuning Spark Shuffle parameters

  • spark.shuffle.file.buffer
  • Default value: 32k
  • Parameter description: Sets the size of the BufferedOutputStream buffer used by each shuffle write task. Data is written to this buffer before being written to the disk file; when the buffer fills up, it is spilled to disk.
  • Tuning suggestion: If the job has sufficient memory, increase this value appropriately (for example, to 64k) to reduce the number of spills during shuffle write, and thus the number of disk I/O operations, improving performance. In practice, tuning this parameter properly improves performance by roughly 1%~5%. (A configuration sketch follows this list.)
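A minimal sketch of this adjustment, assuming ample executor memory (size values are strings with a unit suffix):

```scala
import org.apache.spark.SparkConf

// Double the shuffle write buffer from the 32k default to reduce
// the number of spills (and thus disk I/O) during shuffle write.
val conf = new SparkConf()
  .set("spark.shuffle.file.buffer", "64k")
```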

  • spark.reducer.maxSizeInFlight
  • Default value: 48m
  • Parameter description: Sets the size of the shuffle read task's buffer, which determines how much data can be pulled at a time.
  • Tuning suggestion: If the job has sufficient memory, increase this value appropriately (for example, to 96m) to reduce the number of pulls and hence the number of network transfers, improving performance. In practice, tuning this parameter properly improves performance by roughly 1%~5%. (See the sketch below.)
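A corresponding sketch, assuming memory headroom on the reduce side:

```scala
import org.apache.spark.SparkConf

// Double the shuffle read buffer so each reducer pulls more data
// per fetch, halving the number of network round trips.
val conf = new SparkConf()
  .set("spark.reducer.maxSizeInFlight", "96m")
```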

  • spark.shuffle.io.maxRetries
  • Default value: 3
  • Parameter description: When a shuffle read task attempts to pull its own data from the node where the corresponding shuffle write task ran, a pull that fails due to a network exception is automatically retried. This parameter sets the maximum number of retries. If the pull still fails after this many attempts, the job may fail.
  • Tuning suggestion: For jobs with particularly time-consuming shuffle operations, increase the maximum number of retries (say, to 60) to avoid pull failures caused by the JVM's full GC pauses or network instability. In practice, adjusting this parameter greatly improves the stability of shuffles over very large data volumes (billions to tens of billions of records). (A combined sketch covering both retry parameters follows the next item.)

  • spark.shuffle.io.retryWait
  • Default value: 5s
  • Parameter description: When a pull fails as described above and is retried, this parameter sets the wait interval between retries.
  • Tuning suggestion: Increase the interval (for example, to 60s) to improve the stability of shuffle operations. (See the sketch below.)
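A combined sketch for both retry parameters, using the values suggested above:

```scala
import org.apache.spark.SparkConf

// Tolerate long full-GC pauses and transient network errors on the
// shuffle write side before giving up on a fetch.
val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "60") // default: 3
  .set("spark.shuffle.io.retryWait", "60s") // default: 5s
// Worst case, a single fetch is retried for up to 60 * 60s = 1 hour
// before the stage is marked as failed.
```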

  • spark.shuffle.memoryFraction
  • Default value: 0.2
  • Parameter description: The proportion of executor memory allocated to shuffle read tasks for aggregation operations; the default is 20%.
  • Tuning suggestion: This parameter is also explained in the resource parameter tuning post. If memory is sufficient and persistence operations are rarely used, increase this ratio to give shuffle read aggregation more memory and avoid the frequent disk reads and writes caused by running out of it. In practice, tuning this parameter properly improves performance by about 10%. (See the sketch below.)
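A sketch of shifting memory from caching toward shuffle aggregation for a job with few persist()/cache() calls. Note that these fractions belong to the legacy memory manager (Spark 1.5 and earlier; from 1.6 on they apply only when spark.memory.useLegacyMode is enabled):

```scala
import org.apache.spark.SparkConf

// Give shuffle read aggregation more memory at the expense of the
// RDD cache, reducing spills to disk during aggregation.
val conf = new SparkConf()
  .set("spark.shuffle.memoryFraction", "0.3") // default: 0.2
  .set("spark.storage.memoryFraction", "0.4") // default: 0.6
```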

  • spark.shuffle.manager
  • Default value: sort
  • Parameter description: Sets the ShuffleManager type. Since Spark 1.5 there are three options: hash, sort, and tungsten-sort. HashShuffleManager was the default before Spark 1.2; Spark 1.2 and later default to SortShuffleManager. tungsten-sort is similar to sort but uses the off-heap memory management of the Tungsten project for more efficient memory usage.
  • Tuning suggestion: Since SortShuffleManager sorts data by default, use the default SortShuffleManager if your business logic needs this sorting mechanism. If your business logic does not need sorted data, consider the bypass mechanism or the optimized HashShuffleManager to avoid sorting and get better disk read and write performance. Note that tungsten-sort should be used with caution, as a few bugs have been found in it. (See the sketch below.)
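A sketch of selecting the manager explicitly (valid values in the Spark 1.5 era are hash, sort, and tungsten-sort):

```scala
import org.apache.spark.SparkConf

// Switch to HashShuffleManager when the job never relies on the
// sorted output of SortShuffleManager.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "hash")
```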

  • spark.shuffle.sort.bypassMergeThreshold
  • Default value: 200
  • Parameter description: When the ShuffleManager is SortShuffleManager and the number of shuffle read tasks is below this threshold (default 200), shuffle write does not sort; instead, data is written in the manner of the unoptimized HashShuffleManager, except that all the temporary disk files produced by each task are merged into one file and a separate index file is created.
  • Tuning suggestion: When using the sort manager, if you really do not need sorting, set this parameter to a value larger than the number of shuffle read tasks. The bypass mechanism is then enabled automatically and map-side data is not sorted, avoiding the performance overhead of sorting. Note, however, that this mode still generates a large number of disk files, so shuffle write performance remains to be improved. (See the sketch below.)
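A sketch of forcing the bypass path; the reduce-side task count here is a hypothetical figure for illustration:

```scala
import org.apache.spark.SparkConf

val numReduceTasks = 500 // hypothetical shuffle read task count

// Set the threshold above the actual task count so SortShuffleManager
// skips map-side sorting and takes the bypass path.
val conf = new SparkConf()
  .set("spark.shuffle.sort.bypassMergeThreshold",
       (numReduceTasks + 1).toString)
```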

  • spark.shuffle.consolidateFiles
  • Default value: false
  • Parameter description: Valid only when HashShuffleManager is used. If set to true, the consolidate mechanism is enabled, which heavily consolidates shuffle write output files. When there are many shuffle read tasks, this greatly reduces disk I/O overhead and improves performance.
  • Tuning suggestion: If you really do not need SortShuffleManager's sorting mechanism, then besides the bypass mechanism you can also try manually setting spark.shuffle.manager to hash to use HashShuffleManager with consolidate enabled. In practice it has been found that this combination performs 10%~30% better than SortShuffleManager with bypass enabled. (See the sketch below.)
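A sketch combining the two settings, as an alternative to sort shuffle when no ordering is needed:

```scala
import org.apache.spark.SparkConf

// Hash shuffle with file consolidation: avoids sorting entirely and
// merges write-side output files to keep disk I/O manageable.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "hash")
  .set("spark.shuffle.consolidateFiles", "true") // default: false
```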

3 Summary

Qin Kaixin in Shenzhen