
Today we will introduce Spark's memory model. Don't miss it!

Unlike the MapReduce engine, which frequently spills data to disk, Spark is a memory-based distributed computing engine. Its powerful built-in memory management mechanism ensures that data is processed in memory first, with disk storage as a fallback.

This article explains how Spark's memory management is implemented, covering:

  • Spark Memory Overview
  • Spark memory management mechanism
  • Memory allocation in Spark on Yarn mode

1 Spark Memory Overview

This section first walks through the basic execution flow of a Spark application:

  • The user submits an application on the Driver and initializes the running environment (SparkContext, etc.)
  • Based on the configuration, the Driver applies to the ResourceManager for resources (Executors and memory)
  • The ResourceManager selects suitable Worker nodes and creates Executor processes on them
  • Executors register with the Driver and wait for tasks to be assigned
  • The Driver completes SparkContext initialization, builds the DAG, and assigns TaskSets to Executors
  • Executors start threads to execute the tasks and return results

While a Spark job runs, both Driver and Executor processes are active. The Driver process runs the submitted application, applies for Executor resources, registers Executors, submits tasks, and coordinates and schedules them. The Executor processes execute the actual tasks on the worker nodes and return results to the Driver.

Spark performs data computation mainly inside the Executor process. The Executor handles persistent RDD storage and shuffle operations uniformly through Spark's memory management mechanism, and the tasks it runs share its memory. This article therefore focuses on Executor memory management.

Spark memory is divided into on-heap memory and off-heap memory. On-heap memory is based on the JVM memory model, while off-heap memory is allocated by calling the underlying JDK Unsafe API. Both types are exposed through the interfaces of Spark's memory management module:

def acquireStorageMemory(...): Boolean   // Acquire storage memory
def acquireExecutionMemory(...): Long    // Acquire execution memory
def releaseExecutionMemory(...): Unit    // Release execution memory
def releaseStorageMemory(...): Unit      // Release storage memory

1.1 Spark heap memory

The Executor, as a JVM process, manages memory internally based on the JVM memory model.

Spark encapsulates a unified memory management interface, MemoryManager, which applies for and releases memory for object instances by logically planning the JVM heap space, making the most of the memory while respecting Spark's operating mechanism.

A quick note on the JVM heap: simply put, the object instances and arrays a program creates, uses, and releases are allocated in a managed region of JVM memory known as the "JVM heap".

When Spark creates an object, the JVM allocates a piece of memory in the heap, initializes the object, and returns its reference. Spark keeps the reference and records the memory usage.

Executor memory is set with the --executor-memory option or the spark.executor.memory property. It is usually specified at submission time along with --executor-cores, and both are sent to the ResourceManager when applying for Executor resources.

A number of Executors are created on the Worker nodes, and each Executor's heap is allocated --executor-memory of memory. The Executor heap is shared by all its task threads, which exchange data in memory.

Spark heap memory is divided into Storage, Execution, and Other.

  • Storage: caches RDD data and broadcast variables
  • Execution: provides memory for the shuffle process
  • Other: provides memory for Spark internal objects and user-defined objects

Spark supports multiple memory management modes. In different management modes, the proportion of memory areas in the heap varies. For details, see Section 2.

1.2 Spark Off-Heap Memory

Spark 1.6 introduced off-heap memory alongside on-heap memory, further optimizing Spark's memory usage.

If you have Java programming experience, off-heap memory will feel familiar. Under the hood, it calls the C-based methods of the JDK Unsafe class, which use pointers to operate on memory directly: applying for, using, and releasing memory space.

After 2.x, Spark abandoned Tachyon (used in earlier versions) and manages off-heap memory with the JDK Unsafe API common in Java. This mode does not allocate memory inside the JVM; it operates directly on system memory, reducing the overhead of moving data in and out of the JVM, cutting GC costs, and enabling precise memory accounting.

Off-heap memory is disabled by default. To enable it, set spark.memory.offHeap.enabled to true and set the off-heap size with spark.memory.offHeap.size.
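For example, off-heap memory could be enabled at submission time like this (the application jar, master, and sizes below are illustrative placeholders, not values from this article):

```shell
# Enable 1 GB of off-heap memory per Executor (all sizes illustrative)
spark-submit \
  --master yarn \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=1g \
  --executor-memory 5g \
  my-app.jar
```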

The out-of-heap partition consists of Execution memory and Storage memory, which are shared by all task threads.

2 Spark memory management mechanism

As mentioned above, Spark in different modes has different proportions of in-heap and out-of-heap memory areas.

Before Spark 1.6, Spark used the Static Memory Manager mode: the split between Execution memory and Storage memory is static, preset by default system parameters.

After Spark 1.6, the memory management mode changed to Unified Memory Manager, which lets Storage and Execution use memory dynamically. The static mode is still retained and can be re-enabled with the spark.memory.useLegacyMode parameter.

2.1 Static Memory Manager

In the original memory management mode of Spark, the system allocates Storage and Execution memory space based on fixed memory configuration parameters by default. The configuration can be customized.

1. On-heap memory

The heap space is divided into Storage, Execution, and Other memory, 6:2:2 by default. The Storage region is sized by spark.storage.memoryFraction (default 0.6) and the Execution region by spark.shuffle.memoryFraction (default 0.2). The Other region stores user-defined data structures and Spark internal metadata, taking the remaining 20% of the heap.

Within the Storage region, 10% is set aside as reserved space to guard against OOM; this is governed by spark.storage.safetyFraction (default 0.9, i.e. 90% of the region is considered safe). That 90% is the available Storage memory the Executor uses to cache RDD data and broadcast variables. Inside the available Storage space there is also an Unroll area, which holds unroll data and takes up 20% of available Storage by default.

The Unroll process: before an RDD partition is cached in memory, its record object instances are stored in non-contiguous space in the Other region of the heap. During caching, the partition data in non-contiguous space is converted into a contiguous block and stored in the Storage region. This process is called Unroll.

In the Execution region, 20% is set aside as reserved space to guard against OOM; this is governed by spark.shuffle.safetyFraction (default 0.8, i.e. 80% of the region is considered safe). The remaining 80% is the available Execution memory, which caches intermediate data during the shuffle process.

The calculation formulas:

Available storage memory = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
Available execution memory = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
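As a quick sketch of the static-mode formulas above, here is the arithmetic for a hypothetical 10 GB executor heap with the default fractions (an illustration only, not Spark's actual code):

```python
# Static Memory Manager arithmetic with default fractions (illustrative)
system_max_memory = 10 * 1024       # assumed executor heap size in MB

storage_memory_fraction = 0.6       # spark.storage.memoryFraction (default 0.6)
storage_safety_fraction = 0.9       # spark.storage.safetyFraction (default 0.9)
shuffle_memory_fraction = 0.2       # spark.shuffle.memoryFraction (default 0.2)
shuffle_safety_fraction = 0.8       # spark.shuffle.safetyFraction (default 0.8)

# 10240 * 0.6 * 0.9 = 5529.6 MB usable for caching RDDs and broadcasts
available_storage = system_max_memory * storage_memory_fraction * storage_safety_fraction

# 10240 * 0.2 * 0.8 = 1638.4 MB usable for shuffle intermediate data
available_execution = system_max_memory * shuffle_memory_fraction * shuffle_safety_fraction
```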

2. Off-heap memory

Compared with on-heap memory, off-heap allocation is simpler. Off-heap memory defaults to 384 MB and is set by spark.yarn.executor.memoryOverhead. The overall off-heap space is divided into Storage and Execution parts, with the split determined by spark.memory.storageFraction, just like on-heap memory. Off-heap memory generally stores serialized binary data (byte streams); since it occupies contiguous storage space whose size can be calculated precisely, no reserved space is needed.

3. Summary

  • The implementation mechanism is simple and easy to understand
  • It is prone to memory imbalance: one of Storage or Execution fills up while the other still has free memory
  • Developers are required to fully understand the storage mechanism, which is inconvenient to tune

For more details, feel free to add my personal WeChat account: Youlong525

2.2 Unified Memory Manager

To solve problems such as memory imbalance in the Static Memory Manager, Spark adopted a new management mode, Unified Memory Manager, after 1.6. In the new mode, the old static allocation of Executor memory is removed: memory is allocated dynamically, and Storage and Execution share a unified memory region.

1. On-heap memory

The total heap memory is divided into Usable Memory and Reserved Memory. Reserved memory guards against exceptions such as OOM and is fixed at 300 MB by default. Usable memory is further divided into Unified Memory and Other memory, 6:4 by default.

Storage and Execution inside unified memory, as well as the Other region, have the same parameters and purposes as in static mode, except that dynamic occupancy is now enabled between Storage and Execution.

The dynamic occupancy mechanism works as follows:

  1. Execution and Storage each get an initial share of unified memory (default 0.5 each).
  2. If one side runs short while the other side has free memory, it can occupy the other side's free space.
  3. If both sides run short, data spills to disk.
  4. If Storage has occupied Execution's share and Execution needs it back, Storage evicts blocks to disk and returns the space.
  5. If Execution has occupied Storage's share, it does not have to return it.
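The occupancy rules above can be illustrated with a toy model. This is a deliberate simplification for intuition only, not the real UnifiedMemoryManager; the class, names, and numbers are all made up:

```python
# Toy model of unified memory with dynamic occupancy (illustrative only)
class ToyUnifiedMemory:
    def __init__(self, unified_mb, storage_fraction=0.5):
        self.total = float(unified_mb)
        self.storage_used = 0.0
        self.execution_used = 0.0
        # the initial split; either side may grow past it while the other is free
        self.storage_boundary = unified_mb * storage_fraction

    def free(self):
        return self.total - self.storage_used - self.execution_used

    def acquire_storage(self, mb):
        # Storage may use free Execution memory but never evicts Execution
        if self.free() >= mb:
            self.storage_used += mb
            return True
        return False

    def acquire_execution(self, mb):
        # Execution may evict Storage blocks that grew past the boundary
        if self.free() < mb:
            evictable = max(0.0, self.storage_used - self.storage_boundary)
            spilled = min(evictable, mb - self.free())
            self.storage_used -= spilled   # evicted Storage spills to disk
        if self.free() >= mb:
            self.execution_used += mb
            return True
        return False

mem = ToyUnifiedMemory(1000)                # 1000 MB of unified memory
mem.acquire_storage(800)                    # Storage borrows past its 500 MB half
ok = mem.acquire_execution(400)             # forces Storage to spill 200 MB
print(ok, mem.storage_used, mem.execution_used)   # True 600.0 400.0
```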

2. Off-heap memory

Off-heap allocation is consistent with the static mode: off-heap memory defaults to 384 MB. The dynamic occupancy mechanism is also enabled there, with a default initial ratio of 0.5.

The calculation formula:

// Unified memory = (systemMaxMemory - reservedMemory)
//                  * spark.memory.fraction
// Available storage memory = unified memory * spark.memory.storageFraction
// Available execution memory = unified memory * (1 - spark.memory.storageFraction)
// (with dynamic occupancy enabled, either side can borrow the other's free space)
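Plugging in numbers: for a hypothetical 10 GB executor heap with the defaults (300 MB reserved, spark.memory.fraction = 0.6), the shared unified region works out as follows (an illustration of the formula, not Spark source):

```python
# Unified Memory Manager arithmetic with defaults (illustrative)
system_max_memory = 10 * 1024   # assumed executor heap size in MB
reserved_memory = 300           # fixed reserved memory in MB
memory_fraction = 0.6           # spark.memory.fraction (default 0.6)

# (10240 - 300) * 0.6 = 5964 MB shared dynamically by Storage and Execution
unified = (system_max_memory - reserved_memory) * memory_fraction
print(round(unified))  # 5964
```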

3. Summary

  • Dynamic memory sharing improves memory utilization
  • Storage and Execution memory are managed centrally, which simplifies tuning and maintenance
  • Because Execution can occupy Storage memory, cached data may be evicted unexpectedly, causing recomputation and extra GC pressure

3 Memory allocation in Spark On Yarn mode

Thanks to Spark's robust memory management mechanism, Executors can efficiently handle in-memory RDD operations and data flow on their nodes. Yarn, the resource manager that grants Executor memory, must also ensure that memory is allocated as reasonably as possible during this process.

The basic process of Spark On Yarn is as follows:

    1. The Spark Driver submits the program and applies to Yarn for an Application
    2. Yarn receives the request and creates an AppMaster on a NodeManager node
    3. The AppMaster applies to the Yarn ResourceManager for resources (Containers)
    4. Suitable nodes are selected and Containers (Executor processes) are created on them
    5. The Driver then starts scheduling and running tasks

The Yarn Client and Yarn Cluster modes differ in some parts, but the basic process is similar. The memory configuration involved in the whole process is as follows (source default configuration):

var executorMemory = 1024                 // default Executor memory (MB)
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN = 384

// Executor off-heap (overhead) memory
val executorMemoryOverhead =
    sparkConf.getInt("spark.yarn.executor.memoryOverhead",
      math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt,
               MEMORY_OVERHEAD_MIN))

// Total memory allocated to the Executor
val executorMem = args.executorMemory + executorMemoryOverhead

Now suppose that when submitting a Spark application we set --executor-memory=5g:

spark-submit \
  --master yarn-cluster \
  --name test \
  --executor-memory 5g \
  --driver-memory 5g

According to the calculation formula in the source code:

memoryMem= args.executorMemory(5120) + executorMemoryOverhead(512) = 5632M

But the memory shown on the Yarn UI is not this number. Why? Because Yarn enables resource normalization by default.

1. Normalize Yarn resources

Yarn adjusts the amount of resources actually granted based on a minimum value, a maximum value, and a normalization factor, in order to regulate application resources properly.

  • Definition

If the resources requested by a program are not an integer multiple of this factor, they are rounded up to the smallest integer multiple of the factor.

Formula: ceil(a/b) * b, where a is the requested resource and b is the normalization factor
  • Related configuration

yarn.scheduler.minimum-allocation-mb: minimum memory that can be allocated, default 1024
yarn.scheduler.minimum-allocation-vcores: minimum number of CPU cores that can be allocated, default 1
yarn.scheduler.maximum-allocation-mb: maximum memory that can be allocated, default 8192
yarn.scheduler.maximum-allocation-vcores: maximum number of CPU cores that can be allocated, default 4

Returning to the earlier calculation: memoryMem came out to 5632 MB, which is not a multiple of the normalization factor (1024), so it must be recalculated:

memoryMem = ceil(5632 / 1024) * 1024 = 6144 MB
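Putting the overhead formula and Yarn's normalization together, the example above can be sketched in a few lines (illustrative; the function name is made up, but the constants mirror the defaults quoted earlier):

```python
import math

# Container size Yarn actually grants for a requested executor memory (MB)
def executor_container_mb(executor_memory_mb,
                          overhead_factor=0.10,    # MEMORY_OVERHEAD_FACTOR
                          overhead_min=384,        # MEMORY_OVERHEAD_MIN (MB)
                          min_allocation_mb=1024): # yarn.scheduler.minimum-allocation-mb
    # off-heap overhead: max(10% of executor memory, 384 MB)
    overhead = max(int(executor_memory_mb * overhead_factor), overhead_min)
    requested = executor_memory_mb + overhead      # 5120 + 512 = 5632 for 5 GB
    # normalization: round up to a multiple of the minimum allocation
    return math.ceil(requested / min_allocation_mb) * min_allocation_mb

print(executor_container_mb(5120))  # 6144
```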

2. Memory allocation difference of the Driver in Yarn mode

Applications can be submitted in Yarn Client or Yarn Cluster mode, and Executor and Driver memory are allocated differently between the two. In Yarn, the ApplicationMaster runs inside a Container.

In Client mode, the ApplicationMaster Container gets 1 GB of memory and 1 CPU core by default, and the Driver runs in the client process with exactly the memory it was configured with. In Cluster mode, the Driver runs inside the ApplicationMaster Container, whose size is set by the driver-memory and driver CPU configuration and is subject to Yarn's allocation rules.

  1. Cluster mode (driver-memory 5g): after normalization with ceil(a/b)*b, the Driver's Container gets 6144 MB
  2. Client mode (driver-memory 5g): the Driver simply uses the configured 5120 MB

3. Summary

As a distributed resource manager, Apache Yarn has its own memory management and optimization mechanisms. When Spark is deployed on Yarn, the memory handling of both systems must be considered together. This is one of the most commonly overlooked points in production.

Final words

Spark's memory management mechanism is a core topic in both Spark internals and tuning. Starting from the Static Memory Manager and Unified Memory Manager modes, this article explained in plain terms how Spark's computing model implements memory management, and closed with memory allocation in Spark on Yarn mode.

For more good articles, follow my official account: Big Data Arsenal