When running computations with Spark, we often encounter jobs that fail with Out Of Memory (OOM) errors, and most of them occur during Shuffle. Which parts of Spark Shuffle use the most memory and are therefore most likely to cause OOM? Starting from this question, this article reviews Spark memory management and the memory usage of Shuffle, and then briefly analyzes the possible causes of OOM in Spark Shuffle.

1. Spark memory management and consumption model

Before analyzing the memory usage of Spark Shuffle, let's start with the following question: when a Spark task is assigned to run on an Executor, what is the general model by which Spark manages and consumes memory? (Note: since OOM occurs primarily on the Executor side, the following discussion focuses on memory management and usage on the Executor side.)

1. In Spark, MemoryConsumer is an abstract class representing a consumer that needs to use memory. It defines the methods for allocating memory, releasing it, and spilling in-memory data to disk, and concrete consumers extend MemoryConsumer to implement their specific behavior. Different types of consumers therefore exist during the execution of a Spark Task; for example, ExternalAppendOnlyMap and ExternalSorter are used in Spark Shuffle.

2. A MemoryConsumer delegates its memory requests and releases to the TaskMemoryManager. When a Spark Task is assigned to an Executor, a TaskMemoryManager is created for it. Before the TaskMemoryManager can allocate memory, it must first request it from the MemoryManager; only then does the TaskMemoryManager perform the actual allocation with the help of a MemoryAllocator. The MemoryManager centrally manages memory usage within the Executor: because every TaskMemoryManager goes through the MemoryManager before actually allocating memory, the MemoryManager has a global view of how memory is being used by the current process. The relationship between MemoryManager, TaskMemoryManager, and MemoryConsumer is shown in the figure below. In general, one MemoryManager corresponds to at least one TaskMemoryManager (their number is determined by the executor-cores setting), and one TaskMemoryManager corresponds to multiple MemoryConsumers, depending on the task.
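To make this allocation and spill flow concrete, here is a minimal, self-contained sketch of the pattern. The class names mirror the ones above, but every member here is a simplified invention for illustration, not Spark's real API:

```scala
// Toy model of the MemoryConsumer -> TaskMemoryManager -> MemoryManager flow.
// Class names mirror Spark's, but all members are simplified inventions.
class ToyMemoryManager(maxBytes: Long) {
  private var used = 0L
  // Grant as much as possible up to the global limit; return the bytes granted.
  def acquire(bytes: Long): Long = synchronized {
    val granted = math.min(bytes, maxBytes - used)
    used += granted
    granted
  }
  def release(bytes: Long): Unit = synchronized { used -= bytes }
}

class ToyTaskMemoryManager(global: ToyMemoryManager) {
  // Ask the global manager first; if it cannot grant everything,
  // tell the consumer to spill so memory can be freed.
  def acquireExecutionMemory(bytes: Long, consumer: ToyConsumer): Long = {
    val granted = global.acquire(bytes)
    if (granted < bytes) consumer.spill()
    granted
  }
  def releaseExecutionMemory(bytes: Long): Unit = global.release(bytes)
}

// A consumer (think of something like ExternalSorter) buffers records in memory
// and writes them out to disk when its memory request cannot be satisfied.
class ToyConsumer(tmm: ToyTaskMemoryManager) {
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
  private var held = 0L

  def insert(record: String): Unit = {
    val granted = tmm.acquireExecutionMemory(record.length.toLong, this)
    held += granted
    buffer += record
  }

  def spill(): Unit = {
    println(s"spilling ${buffer.size} records to disk")
    buffer.clear()
    tmm.releaseExecutionMemory(held)
    held = 0L
  }
}

// Demo: a tiny global budget forces the consumer to spill while inserting.
val consumer = new ToyConsumer(new ToyTaskMemoryManager(new ToyMemoryManager(maxBytes = 16)))
(1 to 5).foreach(i => consumer.insert("record-" + i))
```

In Spark itself, the corresponding pieces are MemoryConsumer, TaskMemoryManager, and the MemoryManager, with spilling triggered when an execution-memory request cannot be fully satisfied, as described above.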

[Figure: relationship between MemoryManager, TaskMemoryManager, and MemoryConsumer]

2. Spark Shuffle process

The Spark Shuffle process is divided into two phases: Shuffle Write and Shuffle Read. The Write phase generally involves sorting (the minimum requirement is sorting by partition), possibly combining and merging (when there are multiple spill files on disk), and eventually each Write task produces two files: a data file and an index file. The data file is organized by partition, that is, records belonging to the same partition are contiguous in the file, and the index file records the start and end offsets of each partition within the data file. In the Shuffle Read phase, the data of a given partition is fetched over the network from each Write task's node, that is, a contiguous region of each data file is read, and the fetched data is then sorted and merged to produce the result of the computation.
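As a small illustration of the data/index file layout just described (a simplified sketch, not Spark's actual index-file code; the helper names are made up for the example):

```scala
// Sketch: given the byte length of each partition's block in the data file,
// the index is just the cumulative offsets; partition i occupies
// bytes [offsets(i), offsets(i + 1)) of the data file.
def buildIndex(partitionLengths: Array[Long]): Array[Long] =
  partitionLengths.scanLeft(0L)(_ + _)

// A reader that wants partition `p` only needs the (start, end) byte pair.
def partitionRange(index: Array[Long], p: Int): (Long, Long) =
  (index(p), index(p + 1))

val index = buildIndex(Array(120L, 0L, 512L))   // three partitions
val (start, end) = partitionRange(index, 2)     // partition 2 -> bytes [120, 632)
```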

2.1 Analysis in Shuffle Write Phase

2.1.1 BypassMergeSortShuffleWriter analysis

Roughly speaking, BypassMergeSortShuffleWriter works as follows: it first creates a temporary file for each partition and writes each record to the file of its corresponding partition; at the end, all of the per-partition files are merged into one data file and an index file is created. Because this process involves no sorting and no combining (if a map-side combine is required, this implementation cannot be used), BypassMergeSortShuffleWriter generally does not use much memory.
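For reference, Spark chooses this writer only when no map-side combine is required and the number of reduce partitions does not exceed spark.shuffle.sort.bypassMergeThreshold (200 by default). A minimal sketch of adjusting that threshold, with an arbitrary illustrative value:

```scala
import org.apache.spark.sql.SparkSession

// Allow jobs with up to 400 reduce partitions (and no map-side combine)
// to still use the bypass writer, which avoids sorting and uses little memory.
val spark = SparkSession.builder()
  .appName("bypass-threshold-example")
  .config("spark.shuffle.sort.bypassMergeThreshold", "400")
  .getOrCreate()
```

Note that this writer keeps one open per-partition file and write buffer at a time, so raising the threshold too far trades the avoided sort for more open files and buffer overhead.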

2.1.2 SortShuffleWriter analysis

SortShuffleWriter is the most general implementation and the one most frequently used in practice. SortShuffleWriter delegates to ExternalSorter to insert, sort, merge, combine, and finally write the data and index files. ExternalSorter implements the MemoryConsumer interface mentioned earlier.

1. For data writes, records are inserted into a PartitionedAppendOnlyMap or a PartitionedPairBuffer, depending on whether a map-side combine is needed. Periodically, when the memory requested from the MemoryManager cannot be fully granted, or when the number of records exceeds the spark.shuffle.spill.numElementsForceSpillThreshold threshold (the default is Long.MaxValue, so it effectively never triggers), the in-memory data is spilled to a file. The PartitionedAppendOnlyMap or PartitionedPairBuffer is what consumes the memory.

2. Whether a PartitionedAppendOnlyMap or a PartitionedPairBuffer is used, the sorting algorithm is TimSort. The temporary extra space used by this algorithm is normally small, but in the worst case it is n/2, where n is the length of the array being sorted (see the TimSort implementation for details).

3. Because data is spilled to disk whenever there is not enough memory during insertion, before the final sorted result is written to the data file, the PartitionedAppendOnlyMap or PartitionedPairBuffer still in memory must be merged with the spill files already written to disk. The following figure shows the general process of this merge; a conceptual sketch follows the figure.

[Figure: merging the in-memory PartitionedAppendOnlyMap/PartitionedPairBuffer with the spill files on disk]

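As a conceptual sketch of that merge step (this is not ExternalSorter's actual code; it only shows the underlying idea of a k-way merge of sorted runs, where one run is the in-memory collection and the others are spill files):

```scala
import scala.collection.mutable

// K-way merge of several iterators that are each already sorted by key.
// In the real writer, one run is the sorted in-memory collection and the
// others are readers over the spill files; the merged output is written
// partition by partition into the final data file.
def mergeSortedRuns[K: Ordering, V](runs: Seq[Iterator[(K, V)]]): Iterator[(K, V)] = {
  val ord = implicitly[Ordering[K]]
  // Buffered iterators let us peek at each run's smallest remaining key;
  // reverse the ordering because PriorityQueue is a max-heap.
  val heap = mutable.PriorityQueue.empty[BufferedIterator[(K, V)]](
    Ordering.by((it: BufferedIterator[(K, V)]) => it.head._1)(ord.reverse))
  runs.map(_.buffered).filter(_.hasNext).foreach(heap.enqueue(_))

  new Iterator[(K, V)] {
    def hasNext: Boolean = heap.nonEmpty
    def next(): (K, V) = {
      val it = heap.dequeue()        // run with the smallest current key
      val kv = it.next()
      if (it.hasNext) heap.enqueue(it)
      kv
    }
  }
}

// Example: an "in-memory" run merged with two "spill file" runs.
val merged = mergeSortedRuns(Seq(
  Iterator(1 -> "a", 4 -> "d"),
  Iterator(2 -> "b", 5 -> "e"),
  Iterator(3 -> "c")))
merged.foreach(println)   // (1,a) (2,b) (3,c) (4,d) (5,e)
```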
2.1.3 UnsafeShuffleWriter

UnsafeShuffleWriter is an optimization of SortShuffleWriter and is broadly similar to it, so the overall flow is not repeated here. From a memory usage perspective, there are two main differences. On the one hand, the PartitionedAppendOnlyMap or PartitionedPairBuffer of SortShuffleWriter stores keys and values as their specific types, i.e. Java objects, which is deserialized data; in the ShuffleExternalSorter used by UnsafeShuffleWriter, data is stored serialized in memory Pages, with additional length information written alongside each record. In general, serialized data is much smaller than the same data in deserialized form. On the other hand, UnsafeShuffleWriter needs an additional array of records (a LongArray) that holds, for each record, its partition information and a pointer to the serialized data (an encoded Page number and offset). This storage overhead in UnsafeShuffleWriter is extra compared with SortShuffleWriter.
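To illustrate what "encoded Page number and offset" means, here is a toy sketch of packing a partition id, a page number, and an offset into a single 64-bit value; the bit widths below are chosen arbitrarily for the example and are not the exact layout Spark's PackedRecordPointer uses:

```scala
// Toy encoding: | 24-bit partition id | 13-bit page number | 27-bit offset in page |
// (Bit widths are illustrative only.)
def pack(partitionId: Int, pageNumber: Int, offsetInPage: Long): Long =
  (partitionId.toLong << 40) | (pageNumber.toLong << 27) | (offsetInPage & 0x7FFFFFFL)

def unpackPartition(p: Long): Int = (p >>> 40).toInt
def unpackPage(p: Long): Int      = ((p >>> 27) & 0x1FFF).toInt
def unpackOffset(p: Long): Long   = p & 0x7FFFFFFL

val ptr = pack(partitionId = 7, pageNumber = 3, offsetInPage = 12345L)
assert(unpackPartition(ptr) == 7 && unpackPage(ptr) == 3 && unpackOffset(ptr) == 12345L)
```

Because the partition id sits in the high bits, sorting an array of such packed longs orders records by partition without touching the serialized data itself, which is essentially why UnsafeShuffleWriter keeps this LongArray.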

2.2 Shuffle Read Phase Analysis

Spark Shuffle Read mainly goes through the stages of fetching data, deserializing the stream, updating metric statistics, aggregating if necessary, and sorting if required. The general process is shown in the figure below, followed by a small sketch of the pipeline.

[Figure: the general Shuffle Read process]

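Purely as an illustration of that pipeline (this is not Spark code; the real path goes through classes such as ShuffleBlockFetcherIterator, the serializer, and ExternalAppendOnlyMap/ExternalSorter), the read side can be thought of as a chain of iterator transformations:

```scala
// Illustrative only: models fetch -> deserialize -> metrics -> aggregate -> sort
// with plain Scala collections.
case class Record(key: Int, value: Int)

// "Fetched blocks": each element stands for the bytes of one fetched block.
val fetchedBlocks: Iterator[Array[Byte]] = Iterator(Array[Byte](1, 2), Array[Byte](3))

// Stand-in for deserializing one block's bytes into records.
def deserialize(block: Array[Byte]): Iterator[Record] =
  block.iterator.map(b => Record(key = b % 2, value = b.toInt))

var recordsRead = 0L
val records = fetchedBlocks
  .flatMap(deserialize)                // deserialization step
  .map { r => recordsRead += 1; r }    // metrics step

// Optional aggregation (reduce-side combine) and optional sort by key.
val aggregated = records.toSeq.groupBy(_.key).map { case (k, rs) => k -> rs.map(_.value).sum }
val sorted = aggregated.toSeq.sortBy(_._1)

println(sorted)        // e.g. List((0,2), (1,4))
println(recordsRead)   // 3
```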
3. Possibility analysis of Spark Shuffle OOM

Based on the memory-centric review above of Spark memory management and memory usage during Shuffle, the main points are as follows (a configuration example is given after the list):

1. Pay attention to the task concurrency of an Executor. Multiple tasks running at the same time share the Executor's memory, reducing the memory available to a single Task.

2. In both the Map and the Reduce phases, inserting data into memory, sorting, and merging all use a significant amount of memory. Because spilling is enabled, data skew should in theory not cause OOM here. However, the allocation and release of objects on the heap are managed by the JVM, and Spark estimates used memory by sampling, so the estimate can be inaccurate and data may not be spilled in time, resulting in OOM.

3. When the Reduce side fetches data, a single Block may be very large due to data skew. By default, enough memory is required to hold an entire Block, so data skew is very likely to cause OOM at this point. This can be addressed with the spark.maxRemoteBlockSizeFetchToMem parameter: once a block exceeds the configured threshold, it is automatically fetched to disk instead of memory, which avoids OOM caused by data skew. This was also verified in our production environment, where OOM occurrences dropped significantly after setting this parameter to a reasonable threshold.

4. After the Reduce side fetches the data, by default it checks the fetched stream for corruption (controlled by the spark.shuffle.detectCorrupt parameter). As the code comments note, because this step does not spill to disk, it is also quite likely to cause OOM. In our production environment we have also encountered OOM caused by this check.
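As a rough illustration of how the mitigations above might be configured, here is a minimal sketch; the parameter names are the ones discussed in this article, but the concrete values are arbitrary examples rather than recommendations, and availability and value formats vary by Spark version:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shuffle-oom-mitigation-example")
  // Fetch remote shuffle blocks larger than this threshold straight to disk
  // instead of buffering them in memory (helps with skewed, very large blocks).
  .config("spark.maxRemoteBlockSizeFetchToMem", "200m")
  // Controls corruption detection of fetched streams; it defaults to true and
  // can be set to false to skip the extra in-memory pass described above.
  .config("spark.shuffle.detectCorrupt", "true")
  // Force a spill after this many records regardless of memory estimates,
  // guarding against inaccurate size sampling.
  .config("spark.shuffle.spill.numElementsForceSpillThreshold", "10000000")
  .getOrCreate()
```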

4. Summary

This article has focused on memory usage: it reviewed the Spark Shuffle process in detail and analyzed the situations that may lead to OOM, together with some problems encountered in our production environment. It is mainly based on the author's understanding of the Spark source code and a summary of OOM cases met in actual production, so owing to the limits of experience some omissions or inaccuracies are inevitable. If you have any questions, please feel free to contact us.