Spark Kernel Overview

The Spark kernel refers to Spark's core operating mechanisms: how its core components work, how tasks are scheduled, how memory is managed, and how Spark's core functions operate.

Review of Spark’s core components

Driver

The Spark Driver is the process that runs the main method of a Spark application and executes the user's code. During the execution of a Spark job, the Driver is responsible for:

1. Converting the user program into jobs;

2. Scheduling tasks among Executors;

3. Tracking Executor execution;

4. Displaying the application's running status through the web UI.

Executor

A Spark Executor is a JVM process that runs the individual tasks of a Spark job; Executors run independently of one another. Executors are started together with the Spark application and exist for its whole life cycle. If an Executor fails or crashes, the Spark application can keep running: the tasks from the faulty Executor are rescheduled on other Executors.

Executor has two core functions:

1. Run the tasks that make up the Spark application and return the results to the Driver process.

2. Provide in-memory storage, through each Executor's BlockManager, for RDDs that the user program caches. Because cached RDDs live directly inside the Executor process, tasks can use the cached data at run time to speed up computation.

Overview of Spark's general running process

The preceding figure shows Spark's general running process. Whatever mode Spark is deployed in, the Driver process starts once the task is submitted. The Driver registers the application with the cluster manager, which allocates and launches Executors according to the task's configuration. When all the resources the Driver needs are available, the Driver starts executing the main function. Spark evaluates queries lazily: only when an action operator is executed does the Driver start computing backwards through the RDD lineage. Stages are divided at wide dependencies, and each stage corresponds to one TaskSet, which contains multiple tasks. Following the data-locality principle, the tasks are assigned to specific Executors for execution. While the tasks run, the Executors communicate with the Driver to report task status.
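The lazy, backward evaluation described above can be illustrated with a tiny pure-Scala model. `MiniRdd`, `source` and the string log are hypothetical names, not Spark's RDD API; the sketch only shows that transformations merely record work, and that an action makes evaluation start from the last RDD and walk back toward the source.

```scala
import scala.collection.mutable

// Hypothetical mini "RDD" used only to illustrate lazy evaluation; it is not Spark's RDD class.
object LazyModel {
  final class MiniRdd[A](compute: () => Seq[A], log: mutable.Buffer[String]) {
    // Transformations only wrap the parent's computation; nothing runs here.
    def map[B](f: A => B): MiniRdd[B] =
      new MiniRdd[B](() => { log += "map"; compute().map(f) }, log)

    def filter(p: A => Boolean): MiniRdd[A] =
      new MiniRdd[A](() => { log += "filter"; compute().filter(p) }, log)

    // Action: evaluation starts at this (last) RDD and walks back to the source.
    def collect(): Seq[A] = compute()
  }

  def source[A](data: Seq[A], log: mutable.Buffer[String]): MiniRdd[A] =
    new MiniRdd[A](() => { log += "source"; data }, log)
}
```

Building `LazyModel.source(1 to 5, log).map(_ * 2).filter(_ > 4)` leaves the log empty; calling `collect()` then logs `filter`, `map`, `source` in that order, mirroring how the Driver traces back from the action.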

2. Spark deployment mode

Spark supports three cluster managers:

Standalone: Spark's native, simple cluster manager. It provides a complete set of services and can be deployed on a cluster on its own, without relying on any other resource management system.

Apache Mesos: a powerful distributed resource management framework on which many different frameworks can be deployed, including YARN;

Hadoop YARN: a unified resource management mechanism that can run multiple computing frameworks, such as MapReduce and Storm. Depending on where the Driver runs in the cluster, YARN deployments are divided into yarn-client and yarn-cluster modes.

In addition to these general-purpose cluster managers, Spark also provides some simple deployment modes that make it easy for users to test and learn. Because Hadoop YARN is the cluster manager used in most production environments, this article focuses on deploying Spark in Hadoop YARN mode.

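The running mode of Spark depends on the master URL passed to SparkContext (for example through the MASTER environment variable). Some modes also require auxiliary programming interfaces. The currently supported master URLs include:

local: run Spark locally with one worker thread.

local[K]: run Spark locally with K worker threads.

local[*]: run Spark locally with as many worker threads as the machine has logical cores.

spark://HOST:PORT: connect to the given Standalone cluster Master.

mesos://HOST:PORT: connect to the given Mesos cluster.

yarn: connect to a YARN cluster, whose location is found from the HADOOP_CONF_DIR or YARN_CONF_DIR variable.

When a user submits a task to Spark, the following two parameters together determine how Spark runs:

--master MASTER_URL: determines which cluster the Spark task is submitted to for processing.

--deploy-mode DEPLOY_MODE: determines where the Driver runs. The value can be client or cluster.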
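A minimal sketch, assuming only the common master URL forms: the hypothetical `DeployModel` helpers map a `--master` value to a cluster manager and a `--deploy-mode` value to where the Driver runs. This is not how spark-submit actually parses its arguments; it only restates the two rules in code.

```scala
// Hypothetical helpers restating the --master / --deploy-mode rules; not Spark's own parsing code.
object DeployModel {
  // Which cluster manager a --master URL selects (only a subset of the URL forms).
  def clusterManager(master: String): String =
    if (master == "yarn") "YARN"
    else if (master.startsWith("spark://")) "Standalone"
    else if (master.startsWith("mesos://")) "Mesos"
    else if (master == "local" || master.startsWith("local[")) "Local"
    else throw new IllegalArgumentException(s"Unrecognized master URL: $master")

  // Where the Driver process runs for a given --deploy-mode.
  def driverLocation(deployMode: String): String = deployMode match {
    case "client"  => "machine that ran spark-submit"
    case "cluster" => "a node inside the cluster"
    case other     => throw new IllegalArgumentException(s"Unknown deploy mode: $other")
  }
}
```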

Standalone mode running mechanism

A Standalone cluster has four important components:

(1) Driver: the process that runs the Spark application; the application's main program is executed by the Driver process.

(2) Master: a process that schedules and allocates resources and monitors the cluster.

(3) Worker: a process that runs on a server in the cluster. A Worker has two main responsibilities: storing one or more partitions of an RDD in its own memory, and starting other processes and threads (Executors) that process and compute RDD partitions in parallel.

(4) Executor: a process; one Worker can run multiple Executors. An Executor starts multiple threads (tasks) to compute RDD partitions in parallel, that is, to run the operators defined on the RDD, such as map, flatMap, and reduce.

Standalone Client mode

1. In Standalone Client mode, the Driver runs on the local machine where the task is submitted.

2. The Driver registers the application with the Master. Based on the resource requirements in the submit script, the Master finds all Workers with enough resources to start at least one Executor, and then allocates Executors among these Workers.

3. After starting, each Executor on a Worker registers back (reverse-registers) with the Driver.

4. After all Executors have registered, the Driver starts executing the main function.

5. When an action operator is executed, the job is divided into stages;

6. Tasksets are generated for each stage and then distributed to each Executor for execution.

Standalone Cluster mode

1. In Standalone Cluster mode, after the task is submitted, the Master will find a Worker to start the Driver process.

2. After the Driver starts, it registers the application program with the Master.

3. Based on the resource requirements in the submit script, the Master finds all Workers with enough resources to start at least one Executor, and then allocates Executors among these Workers.

4. After starting, each Executor on a Worker reverse-registers with the Driver.

5. After all Executors have registered, the Driver starts executing the main function.

6. When an action operator is executed, the job is divided into stages, and a TaskSet is generated for each stage.

7. The tasks are then distributed to Executors for execution.

Note that in Standalone mode (both client and cluster), when the Master receives the Driver's request to register a Spark application, it checks the remaining resources it manages and finds all Workers that can start at least one Executor. It then distributes Executors among these Workers, considering only whether each Worker has enough resources, until all the Executors required by the current application have been allocated. Once the Executors have reverse-registered, the Driver starts executing main.
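The allocation rule above can be sketched as a small pure-Scala model. It assumes a round-robin, spread-out placement (similar in spirit to the Standalone Master's default `spark.deploy.spreadOut=true`); `Worker` and `allocate` are hypothetical names, and the model ignores everything except free cores and memory, as the text describes.

```scala
import scala.collection.mutable

// Simplified model of a Standalone Master spreading Executors over Workers.
// Worker, allocate and the round-robin order are illustrative assumptions, not Spark's code.
object StandaloneAllocation {
  final case class Worker(id: String, freeCores: Int, freeMemMb: Int)

  /** Returns workerId -> number of Executors placed on it. */
  def allocate(workers: Seq[Worker], executors: Int, coresPerExec: Int, memPerExecMb: Int): Map[String, Int] = {
    val cores = mutable.Map.from(workers.map(w => w.id -> w.freeCores))
    val mem = mutable.Map.from(workers.map(w => w.id -> w.freeMemMb))
    val placed = mutable.Map.empty[String, Int]
    // Only resources matter: a Worker is usable while it can host one more Executor.
    def fits(id: String): Boolean = cores(id) >= coresPerExec && mem(id) >= memPerExecMb

    var remaining = executors
    var usable = workers.map(_.id).filter(fits)
    while (remaining > 0 && usable.nonEmpty) {
      // One pass places at most one new Executor on each usable Worker (spread out).
      for (id <- usable) {
        if (remaining > 0 && fits(id)) {
          cores(id) -= coresPerExec
          mem(id) -= memPerExecMb
          placed(id) = placed.getOrElse(id, 0) + 1
          remaining -= 1
        }
      }
      usable = usable.filter(fits)
    }
    placed.toMap
  }
}
```

With Workers of 8, 4 and 1 free cores and 4 Executors of 2 cores each, the model places two Executors on each of the first two Workers and skips the third, which cannot host even one.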

YARN mode running mechanism

YARN Client mode

1. In YARN Client mode, the Driver runs on the local machine where the task is submitted.

2. After the Driver starts, it communicates with ResourceManager to apply for starting ApplicationMaster.

3. The ResourceManager allocates a Container and starts the ApplicationMaster on a suitable NodeManager. In this mode the ApplicationMaster acts only as an ExecutorLauncher: it just applies to the ResourceManager for Executor resources.

4. After receiving the ApplicationMaster's request, the ResourceManager allocates Containers, and the ApplicationMaster starts Executor processes on the specified NodeManagers.

5. The Executor process reversely registers with the Driver after starting.

6. After all Executor registrations are complete, the Driver starts executing main.

7. When the Action operator is executed, a job is triggered and stages are divided according to the wide dependencies.

8. A TaskSet is generated for each stage, and its tasks are distributed to Executors for execution.

YARN Cluster mode

1. In YARN Cluster mode, after a task is submitted, the client communicates with the ResourceManager to apply for starting the ApplicationMaster.

2. ResourceManager allocates the Container and starts ApplicationMaster on the appropriate NodeManager. (The ApplicationMaster is the Driver)

3. After the Driver starts, it applies to the ResourceManager for Executor resources. After receiving the ApplicationMaster's request, the ResourceManager allocates Containers, and Executor processes are then started on suitable NodeManagers;

4. The Executor process reversely registers with the Driver after starting.

5. After all Executor registrations are complete, the Driver starts executing the main function.

6. When the Action operator is executed, a job is triggered and stages are divided according to the wide dependencies.

7. Tasksets are generated for each stage and then distributed to each Executor for execution.

Spark communication architecture

Spark communication architecture overview

Spark 2.x uses Netty as its internal communication component. Spark's new Netty-based RPC framework borrows from Akka's design and is based on the Actor model, as shown below:

Each component in the Spark communication framework (Client, Master, and Worker) can be regarded as an independent entity, and the entities communicate with one another through messages. The relationships between the components are as follows:

Each Endpoint (Client/Master/Worker) has one Inbox and N Outboxes (N >= 1). N depends on how many other Endpoints the current Endpoint communicates with: there is one Outbox for each Endpoint it sends messages to. Messages received by an Endpoint are written to its Inbox; outgoing messages are written to an Outbox and then sent to the Inbox of the target Endpoint.

Spark communication architecture analysis

The Spark communication architecture is shown as follows:

  1. RpcEndpoint: each node (Client, Master, and Worker) is called an RPC endpoint. Each node implements the RpcEndpoint interface and defines its own messages and processing logic according to its role. To send (ask) a message, an endpoint calls the Dispatcher;

  2. RpcEnv: the RPC context. The environment in which each RPC endpoint runs is called its RpcEnv;

  3. Dispatcher: the message dispatcher. It routes each message that an RPC endpoint needs to send, or that it receives from a remote RPC endpoint, to the corresponding Inbox or Outbox. If the recipient is the endpoint itself, the message is put into the Inbox; otherwise it is put into the Outbox;

  4. Inbox: the inbound message box. Each local RpcEndpoint has one Inbox. Every time the Dispatcher stores a message in an Inbox, it adds the corresponding EndpointData to its internal ReceiverQueue. In addition, when the Dispatcher is created, it starts a separate thread that polls the ReceiverQueue and consumes Inbox messages;

  5. RpcEndpointRef: a reference to a remote RpcEndpoint. To send a message to a specific RpcEndpoint, we usually first obtain a reference to it and then send the message through that reference;

  6. OutBox: the outbound message box. For the current RpcEndpoint, each target RpcEndpoint has its own OutBox, so sending messages to several target RpcEndpoints means there are several OutBoxes. Once a message is put into an OutBox, it is sent out through a TransportClient. Putting a message into an OutBox and sending it happen in the same thread;

  7. RpcAddress: the address of a remote RpcEndpointRef, consisting of host and port;

  8. TransportClient: the Netty communication client. Each OutBox corresponds to one TransportClient, which keeps polling the OutBox and sends requests to the remote TransportServer according to the recipient information of the OutBox messages;

  9. TransportServer: the Netty communication server. Each RpcEndpoint corresponds to one TransportServer. After receiving remote messages, it calls the Dispatcher to route them to the corresponding Inbox or Outbox.
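The Inbox/Outbox routing in the list above can be sketched as a single-threaded model. `Endpoint`, `Message`, `Dispatcher` and `flush` here are hypothetical stand-ins, not Spark's RPC classes: the model keeps only the rule "own messages go to the Inbox, others to one Outbox per target" and uses `flush` in place of a TransportClient.

```scala
import scala.collection.mutable

// Single-threaded model of Inbox/Outbox routing.
// Endpoint, Message, Dispatcher and flush are hypothetical names, not Spark's RPC classes.
object RpcModel {
  final case class Message(from: String, to: String, body: String)

  final class Endpoint(val name: String) {
    val inbox: mutable.Queue[Message] = mutable.Queue.empty[Message]
    // One Outbox per remote target endpoint, created on first use.
    val outboxes: mutable.Map[String, mutable.Queue[Message]] = mutable.Map.empty
  }

  final class Dispatcher(local: Endpoint) {
    // Messages for the local endpoint go to its Inbox; everything else to the target's Outbox.
    def post(msg: Message): Unit =
      if (msg.to == local.name) local.inbox.enqueue(msg)
      else local.outboxes.getOrElseUpdate(msg.to, mutable.Queue.empty[Message]).enqueue(msg)
  }

  // Stand-in for a TransportClient: drain each Outbox into the remote endpoint's Inbox.
  def flush(src: Endpoint, remotes: Map[String, Endpoint]): Unit =
    for ((target, box) <- src.outboxes; remote <- remotes.get(target))
      while (box.nonEmpty) remote.inbox.enqueue(box.dequeue())
}
```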

Based on the above analysis, a high-level view of the Spark communication architecture is shown below:

SparkContext analysis

In Spark, SparkContext communicates with the cluster, applies for resources, and allocates and monitors tasks. After the Executors on the Worker nodes finish running their tasks, the Driver is also responsible for closing the SparkContext.

SparkContext is often used to represent the Driver.

SparkContext is the user's only entry point to the Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on the cluster.

SparkContext is also one of the most important objects in a Spark application: it is the core of scheduling for the whole application (excluding resource scheduling).

The core function of SparkContext is to initialize the core components required by the Spark application, including the DAGScheduler, TaskScheduler, and SchedulerBackend. It is also responsible for registering the Spark program with the Cluster Manager.

In actual code, we first create a SparkConf instance and customize its properties. We then pass the SparkConf as the only constructor argument of the SparkContext class to create the SparkContext instance. SparkContext initializes the DAGScheduler, TaskScheduler, and SchedulerBackend during instantiation. When an RDD action operator triggers a Job, SparkContext calls the DAGScheduler to divide the Job into stages based on wide dependencies. The TaskScheduler schedules the tasks of each Stage, and the SchedulerBackend applies for and manages the computing resources (Executors) that the cluster allocates to the current application.

If we compare the Spark Application to a car, SparkContext is the engine of the car, and SparkConf is the configuration parameters of the engine.

The following figure shows the interaction between ApplicationMaster, Driver, and Executor internal modules during task scheduling in Spark-on-YARN mode:

When the Driver initializes SparkContext, it initializes the DAGScheduler, TaskScheduler, SchedulerBackend, and HeartbeatReceiver, and starts the SchedulerBackend and HeartbeatReceiver. The SchedulerBackend applies for resources through the ApplicationMaster and continually takes suitable tasks from the TaskScheduler and sends them to Executors for execution. The HeartbeatReceiver receives Executor heartbeats, monitors Executor health, and notifies the TaskScheduler.

5. Spark task scheduling mechanism

In production environments, Spark clusters are usually deployed in yarn-cluster mode, so the kernel analysis that follows assumes yarn-cluster mode by default.

Spark task submission process

Spark yarn-cluster Task submission process

The sequence diagram below illustrates the complete flow of a Spark application from submission to execution:

1. Submit the Spark application. The Client submits the application to the ResourceManager and checks whether enough resources are available to meet its requirements. It prepares the launch context of the ApplicationMaster, hands it to the ResourceManager, and periodically monitors the application's status.

2. When the target resource queue has available resources, the ResourceManager starts the ApplicationMaster on a NodeManager. The ApplicationMaster starts the Driver as a separate background thread, connects to the Driver through local RPC, applies to the ResourceManager for Container resources, and runs the Executor processes (one Executor per Container). When the ResourceManager returns Container resources, the ApplicationMaster starts an Executor in each corresponding Container.

3. The Driver thread initializes the SparkContext object and prepares the context needed to run. On one hand, it maintains the RPC connection with the ApplicationMaster and applies for resources through it; on the other hand, it starts scheduling tasks according to the user's business logic and delivers them to available idle Executors.

4. When the ResourceManager returns Container resources to the ApplicationMaster, the ApplicationMaster tries to start an Executor process in each corresponding Container. Each Executor reverse-registers with the Driver. Once registration succeeds, the Executor keeps a heartbeat connection with the Driver and waits for tasks to be distributed. When a distributed task finishes, the Executor reports the task status to the Driver.

As the sequence diagram above shows, the Client is only responsible for submitting the application and monitoring its status. Spark's task scheduling focuses on two aspects, resource application and task distribution, both of which are carried out among the ApplicationMaster, Driver, and Executors.

Overview of Spark task scheduling

Once the Driver is up, it prepares tasks according to the user program logic and distributes them gradually as Executor resources allow. Before explaining task scheduling in detail, let us first introduce some Spark concepts. A Spark application involves three concepts: Job, Stage, and Task:

A Job is bounded by an Action method. When an Action method is encountered, a Job is triggered.

A Stage is a subset of a Job, bounded by RDD wide dependencies (shuffles): a new Stage begins whenever a shuffle is encountered.

A Task is a subset of a Stage. The number of Tasks is determined by the Stage's parallelism, that is, its number of partitions: each partition gets one Task.

Spark task scheduling happens at two levels: Stage-level scheduling and Task-level scheduling. The overall scheduling process is as follows:

Spark RDDs form a lineage graph, the DAG, through transformation operations. An action on an RDD then triggers a Job and schedules it for execution. The DAGScheduler is responsible for Stage-level scheduling: it mainly divides a Job into Stages and packages each Stage into a TaskSet for the TaskScheduler to schedule. The TaskScheduler is responsible for Task-level scheduling: it sends the TaskSets received from the DAGScheduler to Executors according to the configured scheduling policy. During scheduling, the SchedulerBackend provides the available resources; it has multiple implementations that connect to different resource management systems.

Spark stage-level scheduling

Spark's task scheduling starts with splitting the DAG, which is mainly done by the DAGScheduler. When an action is encountered, it triggers the computation of a Job, which is handed to the DAGScheduler for submission. The following is the flow chart of the method calls involved in Job submission.

A Job is formed by the final RDD and the action method. SparkContext submits the Job to the DAGScheduler, which splits it into Stages based on the DAG formed by the RDD lineage. The splitting strategy is as follows: starting from the final RDD, the DAGScheduler backtracks through the dependencies and checks whether each parent dependency is wide; that is, Stages are divided at shuffle boundaries. RDDs connected by narrow dependencies are placed in the same Stage and can be computed in a pipelined fashion, as shown by the purple path in the figure above. There are two types of Stages. A ResultStage is the most downstream Stage of the DAG and is determined by the action method. A ShuffleMapStage prepares data for downstream Stages.

For example, a Job triggered by saveAsTextFile consists of RDD-3 and the saveAsTextFile method. Following the dependencies between RDDs, the DAGScheduler backtracks from RDD-3 toward RDD-0. RDD-3 depends on RDD-2 through a wide dependency, so a Stage boundary is drawn between RDD-2 and RDD-3, and RDD-3 is placed in the last Stage, the ResultStage. RDD-2 depends on RDD-1, and RDD-1 depends on RDD-0; both are narrow dependencies, so RDD-0, RDD-1, and RDD-2 are placed in the same Stage, a ShuffleMapStage. During actual execution, each data record goes through the transformations from RDD-0 to RDD-2 in one pass. In essence, this is a depth-first search.

Before a Stage is submitted, the scheduler checks whether its parent Stages have finished: the current Stage can be submitted only after its parents have completed, and a Stage with no parent Stage is submitted first. When a Stage is submitted, its Task information (partition information, methods, and so on) is serialized and packaged into a TaskSet that is sent to the TaskScheduler; each partition corresponds to one Task. In addition, the TaskScheduler monitors the running status of the Stage. Only when an Executor is lost or a Task fails with a fetch failure does the failed Stage need to be resubmitted to reschedule the failed Tasks; other kinds of Task failures are retried within the TaskScheduler's scheduling process.

The DAGScheduler's job is relatively simple: it splits the DAG into Stages, submits the Stages, and monitors the related state. The TaskScheduler is more complex and is explained in detail below.
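The backward, depth-first splitting described above can be sketched in plain Scala. `Rdd`, `Narrow`, `Wide` and `Stage` are hypothetical types, not Spark's classes, and the model does not de-duplicate a parent stage shared by several children (the real DAGScheduler keeps shuffle stages in a lookup by shuffle). It only shows that narrow dependencies keep RDDs in one stage, wide dependencies start a parent stage, and parents are submitted first.

```scala
import scala.collection.mutable

// Simplified model of backward, depth-first stage splitting.
// Rdd, Narrow, Wide and Stage are hypothetical types, not Spark's classes.
object StageSplitter {
  sealed trait Dep { def parent: Rdd }
  final case class Narrow(parent: Rdd) extends Dep
  final case class Wide(parent: Rdd) extends Dep
  final case class Rdd(id: Int, deps: List[Dep] = Nil)

  final case class Stage(rdds: Set[Int], parents: List[Stage])

  /** Builds the stage ending at `last`, recursing into parent stages at wide dependencies. */
  def buildStage(last: Rdd): Stage = {
    val members = mutable.Set.empty[Int]
    val parentStages = mutable.ListBuffer.empty[Stage]
    def visit(r: Rdd): Unit =
      if (members.add(r.id)) {
        r.deps.foreach {
          case Narrow(p) => visit(p)                      // same stage: pipelined
          case Wide(p)   => parentStages += buildStage(p) // shuffle boundary: new parent stage
        }
      }
    visit(last)
    Stage(members.toSet, parentStages.toList)
  }

  /** Parents first (post-order), so a stage is submitted only after its parents. */
  def submissionOrder(s: Stage): List[Set[Int]] =
    s.parents.flatMap(submissionOrder) :+ s.rdds
}
```

For the RDD-0 to RDD-3 example, the model yields a ShuffleMapStage holding RDDs 0, 1 and 2, followed by a ResultStage holding RDD 3.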

Spark Task-level scheduling

Task-level scheduling is done by the TaskScheduler. The DAGScheduler packages each Stage into a TaskSet and hands it to the TaskScheduler, which wraps the TaskSet in a TaskSetManager and adds it to the scheduling queue. The TaskSetManager structure is shown below.

The TaskSetManager is responsible for monitoring and managing Tasks in the same Stage. The TaskScheduler uses the TaskSetManager as a unit to schedule Tasks.

After the TaskScheduler is initialized, it starts the SchedulerBackend, which deals with the outside world: it receives Executor registrations and maintains Executor status. In other words, the SchedulerBackend manages the "food" (computing resources). After starting, it also periodically "asks" the TaskScheduler whether there are tasks to run, that is, it says "I have this much spare capacity; do you want it?" When the SchedulerBackend asks, the TaskScheduler selects a TaskSetManager from the scheduling queue according to the configured scheduling policy and runs its tasks. The general method call flow is as follows:

After the TaskSetManager is added to the rootPool scheduling pool, the reviveOffers method of the SchedulerBackend is called, which sends a ReviveOffers message to driverEndpoint. When driverEndpoint receives the message, it calls makeOffers to select the live Executors, that is, those that reverse-registered with the Driver when the application started, and wraps each of them in a WorkerOffer object. Once the WorkerOffers are ready, the TaskScheduler calls resourceOffers to assign tasks to Executors based on those resources.
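How free cores in WorkerOffers can turn into task assignments is sketched below. This is a hypothetical model (`OfferModel`, its `WorkerOffer` case class and `resourceOffers` only borrow Spark's names): it assumes each task needs `cpusPerTask` cores (compare Spark's `spark.task.cpus`, default 1) and hands out at most one task per offer per round, so tasks spread across Executors before any Executor is filled up.

```scala
// Hypothetical model of turning WorkerOffers into task assignments; it borrows Spark's
// names but is not TaskSchedulerImpl.resourceOffers.
object OfferModel {
  final case class WorkerOffer(executorId: String, cores: Int)

  /** executorId -> task ids assigned to it; tasks that do not fit stay pending for a later offer. */
  def resourceOffers(offers: Seq[WorkerOffer], tasks: Seq[Int], cpusPerTask: Int = 1): Map[String, List[Int]] = {
    val free = offers.map(_.cores).toArray
    val assigned = Array.fill(offers.size)(List.empty[Int])
    var pending = tasks.toList
    var launched = true
    // Each round gives every offer at most one task, until nothing more fits.
    while (pending.nonEmpty && launched) {
      launched = false
      for (i <- offers.indices) {
        if (pending.nonEmpty && free(i) >= cpusPerTask) {
          assigned(i) = assigned(i) :+ pending.head
          pending = pending.tail
          free(i) -= cpusPerTask
          launched = true
        }
      }
    }
    offers.map(_.executorId).zip(assigned.toList).toMap
  }
}
```

With offers of 2 and 1 free cores and five tasks, tasks 0 and 2 go to the first Executor, task 1 to the second, and tasks 3 and 4 wait for the next round of offers.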

6. Scheduling strategy

The TaskScheduler wraps each TaskSet received from the DAGScheduler in a TaskSetManager and puts it into the task queue. TaskSetManagers are then taken from the queue according to certain rules, and their tasks run on the Executors provided by the SchedulerBackend. Scheduling therefore works at the coarse granularity of TaskSetManagers. The scheduling queue hierarchy is shown in the following figure:

The TaskScheduler manages the task queue as a tree. The node type in the tree is Schedulable; leaf nodes are TaskSetManagers and non-leaf nodes are Pools. The following figure shows the inheritance relationship between them.

The TaskScheduler supports two scheduling policies: FIFO, the default, and FAIR. During initialization, the TaskScheduler instantiates rootPool, which is the root node of the tree and is of type Pool.

1. FIFO scheduling strategy

FIFO scheduling policy execution steps are as follows:

1) Compare the priorities of s1 and s2 (priority is an attribute of the Schedulable class; the smaller the value, the higher the priority). For a TaskSetManager, this priority is the ID of the Job it belongs to, so earlier Jobs come first.

2) If the two Schedulables have the same priority, compare the IDs of the Stages that s1 and s2 belong to (stageId is also a Schedulable attribute; the smaller the value, the higher the priority).

3) If the comparison result is less than 0, s1 is scheduled first; otherwise s2 is scheduled first.
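The FIFO rules above fit in one comparator. This sketch follows the shape of the comparison just described; `TaskSetInfo` is a hypothetical stand-in for a Schedulable that carries only a name, a priority and a stage ID.

```scala
// Hypothetical stand-in for a Schedulable; only the fields the FIFO rule reads.
object FifoScheduling {
  final case class TaskSetInfo(name: String, priority: Int, stageId: Int)

  /** True if s1 should be scheduled before s2. */
  def comparator(s1: TaskSetInfo, s2: TaskSetInfo): Boolean = {
    var res = Integer.compare(s1.priority, s2.priority)            // rule 1: smaller priority first
    if (res == 0) res = Integer.compare(s1.stageId, s2.stageId)    // rule 2: then smaller stage ID
    res < 0                                                        // rule 3
  }
}
```

Sorting with `sortWith(FifoScheduling.comparator)` puts the TaskSet with the lowest priority value first, and breaks ties by the lower stage ID.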

2. FAIR scheduling policy

The tree structure of FAIR scheduling policy is as follows:

In FAIR mode there is one rootPool with multiple sub-pools, and each sub-pool holds the TaskSetManagers waiting to be scheduled.

You can make a particular pool the parent pool of a TaskSetManager by setting the spark.scheduler.pool property in the job's Properties. If the root pool has no child pool with that name, a pool named after the property value is created; it becomes the parent of the TaskSetManager and a child of the root pool.

In FAIR mode, the sub-pools are sorted first and then the TaskSetManagers within each sub-pool. Because both Pool and TaskSetManager extend Schedulable, the same sorting algorithm is used for both.

Sorting is based on fair share. Each object being sorted has three attributes that enter the comparison: runningTasks (the number of running tasks), minShare, and weight.

Note that the minShare and weight values are specified in the fairscheduler.xml file, which the scheduling pool reads when it is constructed.

1) If A's runningTasks is greater than its minShare and B's runningTasks is less than its minShare, B is scheduled before A. (An object whose runningTasks is below its minShare runs first.)

2) If the runningTasks of both A and B are less than their respective minShare, compare the ratio of runningTasks to minShare (minShare usage). (The one with lower minShare usage runs first.)

3) If the runningTasks of both A and B are greater than their respective minShare, compare the ratio of runningTasks to weight (weight usage). (The one with lower weight usage runs first.)

4) If all of the above comparisons are equal, compare the names.

Overall, the comparison is controlled by the two parameters minShare and weight: objects with lower minShare usage and weight usage (that is, relatively fewer running tasks) run first.
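The four FAIR rules can be restated as a comparator. `Entity` is a hypothetical stand-in for a Pool or TaskSetManager; the sketch assumes, as a simplification, that an object whose runningTasks exactly equals its minShare is treated like one above it (compared by weight usage), and it guards the minShare ratio against a zero minShare.

```scala
// Restatement of the FAIR comparison rules; Entity is a hypothetical stand-in for a Schedulable.
object FairScheduling {
  final case class Entity(name: String, runningTasks: Int, minShare: Int, weight: Int)

  /** True if s1 should be scheduled before s2. */
  def comparator(s1: Entity, s2: Entity): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare // still below its minimum share
    val s2Needy = s2.runningTasks < s2.minShare
    if (s1Needy && !s2Needy) true               // rule 1
    else if (!s1Needy && s2Needy) false         // rule 1, mirrored
    else {
      val cmp =
        if (s1Needy && s2Needy) // rule 2: lower minShare usage first
          java.lang.Double.compare(
            s1.runningTasks.toDouble / math.max(s1.minShare, 1),
            s2.runningTasks.toDouble / math.max(s2.minShare, 1))
        else // rule 3: lower weight usage first
          java.lang.Double.compare(
            s1.runningTasks.toDouble / s1.weight,
            s2.runningTasks.toDouble / s2.weight)
      if (cmp != 0) cmp < 0 else s1.name < s2.name // rule 4: tie-break by name
    }
  }
}
```

For example, two pools still below their minShare are ordered by minShare usage and both come before pools that have already reached it, which are in turn ordered by runningTasks per unit of weight.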

After sorting in FAIR mode, all TaskSetManagers are put into an ArrayBuffer, from which they are taken in order and their tasks sent to Executors for execution.

Because a TaskSetManager encapsulates all tasks of a Stage and is responsible for managing and scheduling them, the next step is for the TaskSetManager to hand its tasks one by one to the TaskScheduler according to certain rules. The TaskScheduler then passes them to the SchedulerBackend, which runs them on Executors.

Locality scheduling

The DAGScheduler cuts Jobs into Stages and submits the tasks of a Stage by calling submitStage, which in turn calls submitMissingTasks. submitMissingTasks determines the preferredLocations of each task that needs to be computed by calling getPreferredLocations() to get the preferred locations of each partition. A task's preferred locations are those of the partition it computes, so every task submitted in a TaskSet has the same preferred locations as its corresponding partition. After a TaskSetManager is taken from the scheduling queue, it hands its tasks one by one to the TaskScheduler according to certain rules, and the TaskScheduler sends them to the SchedulerBackend for execution on Executors. Based on each task's preferred locations, the TaskSetManager determines the task's locality level. There are five locality levels, in descending order of priority:

PROCESS_LOCAL: the data is in the same JVM process (Executor) as the task.

NODE_LOCAL: the data is on the same node as the task, for example in HDFS on that node or in another Executor on the same node.

NO_PREF: the data has no locality preference and can be accessed equally quickly from anywhere.

RACK_LOCAL: the data is on a different node in the same rack.

ANY: the data is elsewhere on the network, outside the rack.

When scheduling tasks, Spark always starts from the highest locality level. If a task is to be launched at locality level X but none of the nodes at that level has free resources, the launch fails. Instead of lowering the locality level immediately, Spark retries the launch at level X for a certain period. Only after this time limit is exceeded does it downgrade and try the next locality level, and so on. By increasing the maximum wait time allowed for each level, the Executor that holds the data is more likely to free up resources during the wait, which can improve performance.
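The wait-then-downgrade behaviour can be sketched as follows, using Spark's five locality level names. The model is a simplification and its names (`DelayScheduling`, `tryLaunch`) are hypothetical: it assumes every level gets the same wait (compare Spark's `spark.locality.wait`, default 3s) and that a task may launch at any level at or above the loosest level currently allowed.

```scala
// Simplified delay-scheduling model; DelayScheduling and tryLaunch are hypothetical names.
object DelayScheduling {
  val levels: Vector[String] = Vector("PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY")

  /** Loosest level allowed after waiting `waitedMs` without launching (same wait per level). */
  def allowedIndex(waitedMs: Long, waitPerLevelMs: Long): Int =
    math.min((waitedMs / waitPerLevelMs).toInt, levels.size - 1)

  /** Most local allowed level that has a free executor; None means keep waiting at this level. */
  def tryLaunch(freeLevels: Set[String], waitedMs: Long, waitPerLevelMs: Long): Option[String] =
    levels.take(allowedIndex(waitedMs, waitPerLevelMs) + 1).find(l => freeLevels.contains(l))
}
```

If only rack-local Executors are free, the model keeps waiting through the PROCESS_LOCAL, NODE_LOCAL and NO_PREF windows and launches at RACK_LOCAL only after three waits have expired.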

Failure retry and blacklist mechanism

In addition to selecting suitable tasks to run, the TaskScheduler also monitors task execution. As mentioned earlier, the SchedulerBackend deals with the outside world. After a Task is submitted to an Executor, the Executor reports its execution status to the SchedulerBackend, which informs the TaskScheduler. The TaskScheduler finds the TaskSetManager that the Task belongs to and notifies it. If the Task's failure count does not exceed the maximum number of retries, the Task is put back into the pool of tasks to be scheduled; otherwise, the entire application fails. When recording a failure, the TaskSetManager also records the ExecutorId and Host on which the Task last failed. The blacklist mechanism uses this record to avoid scheduling the Task onto the node where it last failed, which provides some fault tolerance: the blacklist stores the ExecutorId and Host of the last failure together with a corresponding blacklisting period, during which the Task will not be scheduled on that node.
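A minimal sketch of the retry-plus-blacklist bookkeeping, assuming a fixed blacklisting period and a retry limit like Spark's `spark.task.maxFailures` (default 4). `TaskSetTracker`, `Attempt` and the method names are hypothetical, and the model blocks both the failed Executor and its whole host, as the paragraph above describes.

```scala
import scala.collection.mutable

// Hypothetical model of per-task retry counting and blacklisting; not Spark's TaskSetManager.
object RetryModel {
  final case class Attempt(taskId: Int, executorId: String, host: String, timeMs: Long)

  final class TaskSetTracker(maxFailures: Int, blacklistTimeoutMs: Long) {
    private val failures = mutable.Map.empty[Int, Int].withDefaultValue(0)
    // taskId -> (executorId, host, time until which that executor/host is blocked for the task)
    private val lastFailure = mutable.Map.empty[Int, (String, String, Long)]

    /** Records a failure; false means the task has used up its retries (the TaskSet is aborted). */
    def taskFailed(a: Attempt): Boolean = {
      failures(a.taskId) += 1
      lastFailure(a.taskId) = (a.executorId, a.host, a.timeMs + blacklistTimeoutMs)
      failures(a.taskId) < maxFailures
    }

    /** A task may not run on the executor or host where it last failed until the period expires. */
    def canRunOn(taskId: Int, executorId: String, host: String, nowMs: Long): Boolean =
      lastFailure.get(taskId) match {
        case Some((e, h, until)) if nowMs < until => e != executorId && h != host
        case _                                    => true
      }
  }
}
```

In use, after one failure on `exec-1`/`host-a` the task is steered to other hosts until the period expires, and the fourth failure (with `maxFailures = 4`) returns `false`.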

7. Spark Shuffle analysis

ShuffleMapStage and FinalStage

When stages are divided, the last stage is called the FinalStage, which is essentially a ResultStage object; all the preceding stages are ShuffleMapStages.

A ShuffleMapStage ends by writing shuffle files to disk.

A ResultStage basically corresponds to an action operator in the code: a function is applied to the data of each partition of the RDD, which marks the completion of a job.

Number of tasks in a shuffle

Determining the number of map-side tasks

The number of tasks in a shuffle is determined by the number of RDD partitions, and the number of RDD partitions is closely related to the spark.default.parallelism parameter.

In yarn-cluster mode, if spark.default.parallelism is not set manually, it is the total number of cores on all Executor nodes or 2, whichever is larger:

spark.default.parallelism = max(total number of cores used by all Executors, 2)

If it is set manually:

spark.default.parallelism = the configured value

There is one more important configuration:

spark.files.maxPartitionBytes = 128 MB (default): the maximum number of bytes to pack into a single partition when reading files.

It caps the amount of data one RDD partition holds when a file is read. For example, with the 128 MB default, a splittable 400 MB file needs at least four partitions (400 / 128, rounded up), not two.

When a Spark application runs and creates its SparkContext, two further values are derived from the spark.default.parallelism obtained above:

sc.defaultParallelism = spark.default.parallelism

sc.defaultMinPartitions = min(spark.default.parallelism,2)

When the above parameters are determined, the number of RDD partitions can be calculated.

(1) RDD generated by Scala collection parallelize

val rdd = sc.parallelize(1 to 10)

If the number of partitions is not specified when calling parallelize:

Number of RDD partitions = sc.defaultParallelism

(2) RDD generated by textFile from the local file system

val rdd = sc.textFile("path/file")

Number of RDD partitions = max(number of local file splits, sc.defaultMinPartitions)

(3) RDD generated from HDFS

Number of RDD partitions = max(number of blocks in the HDFS file, sc.defaultMinPartitions)

(4) Obtain data from HBase data tables and convert it to RDD

Number of RDD partitions = Number of Table regions

(5) DataFrame converted from JSON (or Parquet, etc.) files

Number of RDD partitions = Number of blocks stored in the file system

(6) Number of partitions when Spark Streaming reads Kafka messages

Receiver-based approach:

In the Receiver-based approach, Spark partitions and Kafka partitions are unrelated. Increasing the number of partitions per topic only adds threads for consuming topics within a single Receiver; it does not increase Spark's parallelism when processing the data.

Direct approach (DirectDStream):

Spark creates as many RDD partitions as Kafka partitions and reads data from Kafka in parallel. Therefore, there is a one-to-one mapping between Kafka partitions and RDD partitions.
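The rules in cases (1) to (6) can be restated as a small Scala model. The Source tags and function names are hypothetical. The receiver-based Kafka case is left out because it has no fixed mapping, and the file cases take the split or block count as given rather than reproducing Hadoop's split computation.

```scala
object PartitionRules {
  // sc.defaultMinPartitions = min(spark.default.parallelism, 2)
  def defaultMinPartitions(parallelism: Int): Int = math.min(parallelism, 2)

  // Hypothetical tags describing where an RDD's data comes from.
  sealed trait Source
  final case class Collection(numSlices: Option[Int]) extends Source
  final case class LocalFile(splits: Int) extends Source
  final case class HdfsFile(blocks: Int) extends Source
  final case class HBaseTable(regions: Int) extends Source
  final case class FileDataFrame(blocks: Int) extends Source
  final case class KafkaDirect(kafkaPartitions: Int) extends Source

  // `parallelism` stands for sc.defaultParallelism.
  def numPartitions(source: Source, parallelism: Int): Int = source match {
    case Collection(slices)    => slices.getOrElse(parallelism)
    case LocalFile(splits)     => math.max(splits, defaultMinPartitions(parallelism))
    case HdfsFile(blocks)      => math.max(blocks, defaultMinPartitions(parallelism))
    case HBaseTable(regions)   => regions
    case FileDataFrame(blocks) => blocks
    case KafkaDirect(n)        => n // one RDD partition per Kafka partition
  }
}
```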

Determining the number of tasks on the Reduce end

Data is aggregated on the Reduce end. Some aggregation operators, such as reduceByKey, let you specify the parallelism of the Reduce tasks manually. If it is not specified, the number of partitions of the last RDD on the Map end is used (the largest such count when several RDDs feed the shuffle), unless spark.default.parallelism is explicitly set, in which case that value is used.
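A minimal sketch of how the reduce-side task count is chosen. `reduceTasks` is a hypothetical helper, `explicit` stands for the numPartitions argument an operator such as reduceByKey accepts, and the case where an upstream RDD already carries a partitioner that Spark would reuse is deliberately ignored.

```scala
object ReducePartitions {
  // Explicit argument first, then a configured spark.default.parallelism,
  // otherwise the largest partition count among the upstream RDDs.
  def reduceTasks(explicit: Option[Int],
                  configuredDefault: Option[Int],
                  upstreamPartitions: Seq[Int]): Int =
    explicit
      .orElse(configuredDefault)
      .getOrElse(upstreamPartitions.max)
}
```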

Reading data on the Reduce end

According to the stage division, map tasks and reduce tasks are not in the same stage: map tasks are in a ShuffleMapStage and reduce tasks are in the ResultStage. Map tasks run first. How, then, does a reduce task that runs later know where to pull the data that the map tasks wrote to disk?

The data pulling process on the Reduce side is as follows:

1. After a map task finishes, its computation status and the locations of its small disk files are encapsulated in a MapStatus object. The MapOutputTrackerWorker in that executor process then sends the MapStatus object to the MapOutputTrackerMaster in the Driver process.

2. Before a reduce task starts, the MapOutputTrackerWorker in its process asks the MapOutputTrackerMaster in the Driver process for the locations of the small disk files.

3. Once all map tasks have completed, the MapOutputTrackerMaster in the Driver process knows the locations of all the small disk files, and it tells the MapOutputTrackerWorker where they are.

The BlockTransferService then pulls the data from the executor nodes. By default, five child threads are started, and the data pulled at one time cannot exceed 48 MB (spark.reducer.maxSizeInFlight). The reduce task fetches at most 48 MB at a time and stores the pulled data in a region that takes 20% of the Executor memory.
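A single-process sketch of steps 1 to 3, in which the map-output tracker is reduced to a lookup table. MapStatusSketch and TrackerMasterSketch are hypothetical stand-ins for MapStatus and MapOutputTrackerMaster. The network messages and the 48 MB fetch limit are not modeled.

```scala
import scala.collection.mutable

// Hypothetical stand-in for MapStatus: which executor holds a map task's
// output and how many bytes it wrote for each reduce task.
final case class MapStatusSketch(mapId: Int, executor: String, bytesPerReducer: Vector[Long])

// Hypothetical stand-in for the Driver-side MapOutputTrackerMaster.
final class TrackerMasterSketch(numMapTasks: Int) {
  private val statuses = mutable.Map.empty[Int, MapStatusSketch]

  // Step 1: a finished map task's status is registered with the master.
  def registerMapOutput(status: MapStatusSketch): Unit =
    statuses(status.mapId) = status

  // Steps 2 and 3: a reduce task asks where its input lives. The answer is
  // only complete once every map task has reported, so None means "not yet".
  def locationsFor(reduceId: Int): Option[Seq[(String, Long)]] =
    if (statuses.size < numMapTasks) None
    else Some(statuses.values.toSeq.sortBy(_.mapId)
      .map(s => s.executor -> s.bytesPerReducer(reduceId)))
}
```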

HashShuffle parsing

The following discussion assumes that each Executor has one CPU core.

1. Unoptimized HashShuffleManager

In the shuffle write stage, each task "partitions" the data it has processed by key, so that a shuffle operator such as reduceByKey can run in the next stage once this stage finishes. Partitioning means applying a hash algorithm to the key so that records with the same key are written to the same disk file, and each disk file belongs to exactly one task of the downstream stage. Before data is written to disk, it is first written to an in-memory buffer; when the buffer is full, the data is spilled to the disk file.
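To make "the same key goes to the same file" concrete, here is a minimal sketch of hash-based routing, modeled on the non-negative modulo that Spark's HashPartitioner applies to key.hashCode. The object name is hypothetical.

```scala
object HashPartitionSketch {
  // Maps a key to a downstream task index in [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0 // null keys all go to the first partition
    case k =>
      val raw = k.hashCode % numPartitions
      // hashCode may be negative; shift the remainder into range.
      if (raw < 0) raw + numPartitions else raw
  }
}
```

Because the result depends only on the key, every map task sends a given key to the same downstream file, which is what lets one reduce task see all values for that key.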

Each task of the current stage creates as many disk files as there are tasks in the next stage. For example, if the next stage has 100 tasks in total, each task of the current stage creates 100 disk files. If the current stage has 50 tasks spread over 10 executors, with each executor running 5 tasks, then each executor creates 500 disk files and all executors together create 5,000. The number of disk files produced by an unoptimized shuffle write is therefore staggering.
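The arithmetic above can be written as a tiny helper. The object name is hypothetical, and the formula simply restates the text: one file per map task per downstream task.

```scala
object UnoptimizedHashShuffle {
  // Every map task writes one file for each downstream (reduce) task.
  def filesPerExecutor(tasksPerExecutor: Int, downstreamTasks: Int): Int =
    tasksPerExecutor * downstreamTasks

  def totalFiles(upstreamTasks: Int, downstreamTasks: Int): Int =
    upstreamTasks * downstreamTasks
}
```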

The shuffle read phase usually happens at the beginning of a stage. Each task of this stage pulls, over the network, all records with the keys it is responsible for from the previous stage's results on every node, and then performs operations such as aggregation or join by key. Because during shuffle write each map task creates one disk file for each reduce task of the downstream stage, during shuffle read each reduce task only needs to pull its own disk file from every node where the upstream map tasks ran.

Shuffle read aggregates data while pulling it. Each shuffle read task has its own buffer and can pull only as much data as fits in that buffer at a time; it then aggregates that data through an in-memory Map. After aggregating one batch, it pulls the next batch into the buffer and aggregates it, and so on, until all the data has been pulled and the final result is obtained.
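A minimal sketch of aggregating while pulling. It assumes the buffer is measured in records rather than bytes and that the aggregation is a per-key sum, as in reduceByKey(_ + _). The object and method names are hypothetical.

```scala
import scala.collection.mutable

object ShuffleReadSketch {
  // Pulls (key, value) records in buffer-sized batches and folds each batch
  // into a running map, so the full input never has to be held at once.
  // Returns the aggregated map and the number of batches pulled.
  def aggregate(records: Iterator[(String, Int)], bufferSize: Int): (Map[String, Int], Int) = {
    val acc = mutable.HashMap.empty[String, Int]
    var batches = 0
    records.grouped(bufferSize).foreach { batch =>
      batches += 1
      batch.foreach { case (k, v) => acc.update(k, acc.getOrElse(k, 0) + v) }
    }
    (acc.toMap, batches)
  }
}
```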

The working principle of HashShuffleManager without optimization is shown below:

2. Optimized HashShuffleManager

To optimize HashShuffleManager, we can set the parameter spark.shuffle.consolidateFiles. It defaults to false; setting it to true enables the optimization. Generally speaking, if HashShuffleManager is used, turning this option on is recommended.

After the consolidate mechanism is enabled, a task no longer creates one disk file per downstream task during shuffle write. Instead, the concept of a shuffleFileGroup appears: each shuffleFileGroup corresponds to a batch of disk files, as many as there are tasks in the downstream stage. An Executor can run as many tasks in parallel as it has CPU cores, and each task in the first parallel batch creates a shuffleFileGroup and writes its data to the corresponding disk files.

When an Executor's CPU core finishes one batch of tasks and moves on to the next, the next batch reuses the existing shuffleFileGroup, including its disk files; that is, those tasks write into the existing disk files rather than new ones. Consolidation therefore lets different tasks reuse the same batch of disk files, effectively merging the disk files of multiple tasks, which greatly reduces the number of disk files and improves shuffle write performance.

Suppose, as before, that the second stage has 100 tasks and the first stage has 50 tasks, with 10 executors (each with 1 CPU core) running 5 tasks each. The unoptimized HashShuffleManager would create 500 disk files per Executor and 5,000 in total. With the optimization, the number of disk files per Executor is the number of CPU cores times the number of tasks in the next stage, so each Executor creates 100 disk files and all executors together create 1,000.
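The consolidated count can be sketched the same way. The object name is hypothetical; the formula is the one stated in the text, cores times downstream tasks, because later task batches reuse each core's shuffleFileGroup.

```scala
object ConsolidatedHashShuffle {
  // One shuffleFileGroup per CPU core, each holding one file per downstream task.
  def filesPerExecutor(coresPerExecutor: Int, downstreamTasks: Int): Int =
    coresPerExecutor * downstreamTasks

  def totalFiles(executors: Int, coresPerExecutor: Int, downstreamTasks: Int): Int =
    executors * filesPerExecutor(coresPerExecutor, downstreamTasks)
}
```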

The working principle of the optimized HashShuffleManager is shown in the figure below:

SortShuffle parsing

SortShuffleManager has two running mechanisms: the ordinary running mechanism and the bypass running mechanism. The bypass mechanism is enabled when the number of shuffle read tasks is less than or equal to the value of spark.shuffle.sort.bypassMergeThreshold (200 by default).

1. Ordinary running mechanism

In this mode, data is first written into an in-memory data structure, and which structure is used depends on the shuffle operator. For an aggregating shuffle operator such as reduceByKey, a Map structure is used, and records are aggregated through the Map as they are written into memory. For an ordinary shuffle operator such as join, an Array structure is used and records are written into memory directly. After each record is written into the in-memory data structure, Spark checks whether a critical threshold has been reached. If it has, Spark tries to spill the data in the in-memory structure to disk and then clears the structure.

Before spilling to a disk file, the data already in the in-memory structure is sorted by key. After sorting, the data is written to the disk file in batches of 10,000 records by default. Writing is done through Java's BufferedOutputStream, which first buffers data in memory and writes it to the disk file only when the buffer is full; this reduces the number of disk I/O operations and improves performance.

While a task writes all of its data into the in-memory structure, multiple spills occur, producing multiple temporary files. At the end, all temporary disk files are merged; in this merge step, the data of every temporary file is read and written into one final disk file. Because each task ends up with only one disk file, which holds all the data it prepared for every downstream task, the task also writes a separate index file that records the start and end offsets of each downstream task's data within that file.
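A minimal sketch of what such an index can hold, assuming it stores cumulative byte offsets, one more than the number of downstream tasks. This mirrors the layout of Spark's sort-shuffle index files, but the object and method names are hypothetical.

```scala
object IndexFileSketch {
  // Given the byte length of each downstream task's segment inside the
  // single merged data file, returns numPartitions + 1 cumulative offsets.
  def offsets(partitionLengths: Seq[Long]): Vector[Long] =
    partitionLengths.scanLeft(0L)(_ + _).toVector

  // Segment for reduce task i spans [offsets(i), offsets(i + 1)).
  def segment(offsets: Vector[Long], reduceId: Int): (Long, Long) =
    (offsets(reduceId), offsets(reduceId + 1))
}
```

An empty segment (length 0) simply produces two equal offsets, so a reduce task can tell it has nothing to fetch without reading the data file.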

Because of this merge step, SortShuffleManager greatly reduces the number of files. For example, if the first stage has 50 tasks on 10 executors, each running 5 tasks, and the second stage has 100 tasks, each task ends up with only one disk file, so there are only 5 disk files per Executor and 50 disk files across all executors.

The working principle of SortShuffleManager with the ordinary running mechanism is shown below:

2. Bypass running mechanism

Triggering conditions of the bypass mechanism are as follows:

(1) The number of shuffle read tasks (reduce partitions) is less than or equal to the value of spark.shuffle.sort.bypassMergeThreshold.

(2) The shuffle operator is not an aggregating operator (there is no map-side aggregation).
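The two trigger conditions can be sketched as a single predicate. This assumes the conditions above are the whole check. In Spark, "not an aggregating operator" corresponds to the shuffle dependency having no map-side combine. The helper name is hypothetical.

```scala
object BypassCheck {
  // Bypass applies when there is no map-side aggregation and the number of
  // reduce partitions does not exceed the threshold (200 by default).
  def shouldBypass(mapSideCombine: Boolean, reducePartitions: Int,
                   bypassMergeThreshold: Int = 200): Boolean =
    !mapSideCombine && reducePartitions <= bypassMergeThreshold
}
```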

In this case, each task creates one temporary disk file per downstream task, hashes each record's key, and writes the record into the disk file selected by that hash. As before, data is first written to an in-memory buffer and spilled to the disk file when the buffer is full. Finally, all temporary disk files are merged into a single disk file, and a single index file is created.

The disk-write process here is exactly the same as in the unoptimized HashShuffleManager, so a staggering number of disk files is created along the way; the difference is that they are merged into one file at the end. The small number of final disk files makes shuffle read perform better than with the unoptimized HashShuffleManager.

This mechanism differs from the ordinary SortShuffleManager mechanism in two ways: first, the disk-write mechanism is different; second, it does not sort. Its biggest advantage is therefore that no sorting is needed during shuffle write, which saves that performance overhead.

The working principle of SortShuffleManager with the bypass running mechanism is shown below:

Spark Memory management

When a Spark application runs, the Spark cluster starts two kinds of JVM processes: the Driver and the Executors. The Driver is the main control process: it creates the Spark context, submits Spark jobs, converts each job into computing tasks, and coordinates task scheduling among the Executor processes. The Executors run the concrete computing tasks on the worker nodes, return the results to the Driver, and provide storage for persisted RDDs.

In-heap and off-heap memory planning

As a JVM process, the Executor's memory management is built on top of the JVM's memory management, and Spark plans the JVM's on-heap space in finer detail to make full use of memory. Spark also introduces off-heap memory, which allocates space directly in the system memory of the worker node, further optimizing memory usage.

On-heap memory is managed by the JVM, whereas off-heap memory is requested from and released to the operating system directly.

1. In-heap memory

The size of the heap memory is set by the --executor-memory launch parameter or the spark.executor.memory configuration when the Spark application starts. Concurrent tasks running within an Executor share the JVM heap memory. Memory these tasks use to cache RDD data and broadcast data is planned as Storage memory, and memory they use to execute shuffles is planned as Execution memory. The remaining space is not planned: object instances inside Spark and in user-defined Spark applications occupy it. How much space each of the three parts gets depends on the memory management mode.

Spark's management of heap memory is only a logical plan: the memory used by object instances is allocated and released by the JVM, and Spark can only record that memory after it is allocated and before it is released. The allocation process is as follows:

1. Spark creates an object instance in the code.

2. The JVM allocates space from memory in the heap, creates objects and returns object references;

3. Spark saves a reference to the object and records the memory occupied by the object.

The process for releasing memory is as follows:

1. Spark records the memory released by the object and deletes its reference.

2. Wait for the JVM's garbage collector to free the heap memory occupied by the object.
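The bookkeeping steps above can be sketched as a ledger that only records sizes. It is a hypothetical model (LogicalMemoryLedger is not a Spark class) meant to show why recorded and actual free memory can diverge: releasing only drops the record, while the JVM frees the object whenever garbage collection happens to run.

```scala
import scala.collection.mutable

// Hypothetical logical accounting: the object already exists on the heap;
// the ledger only decides whether recording it would exceed the limit.
final class LogicalMemoryLedger(limit: Long) {
  private val tracked = mutable.Map.empty[String, Long]

  def recordedUsage: Long = tracked.values.sum
  def recordedFree: Long = limit - recordedUsage

  // Allocation step 3: keep the reference and record its (estimated) size.
  def recordAllocation(id: String, estimatedBytes: Long): Boolean =
    if (recordedUsage + estimatedBytes > limit) false
    else { tracked(id) = estimatedBytes; true }

  // Release step 1: forget the record; the JVM reclaims the object later.
  def recordRelease(id: String): Unit = tracked.remove(id)
}
```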

JVM objects can be stored in serialized form. Serialization converts an object into a binary byte stream; in essence, it turns the non-contiguous space of linked storage into contiguous or block storage. Accessing serialized data requires the reverse process, deserialization, which turns the byte stream back into objects. Serialization saves storage space but adds computational overhead when storing and reading.

For serialized objects, Spark can compute the memory size directly because they are byte streams. For non-serialized objects, the size is approximated by periodic sampling rather than computed each time a new data item is added. This lowers the time cost but can introduce large errors, so the actual memory at a given moment may be much larger than expected. In addition, object instances that Spark has marked as released may not yet have been reclaimed by the JVM, so the memory actually available can be smaller than what Spark has recorded. Spark therefore cannot track the available heap memory precisely, and so it cannot completely avoid OOM (Out of Memory) errors.

Although Spark cannot precisely control the allocation and release of heap memory, it plans and manages storage memory and execution memory independently, deciding whether to cache new RDDs in storage memory and whether to allocate execution memory to new tasks. To some extent this improves memory utilization and reduces the occurrence of exceptions.

2. Off-heap memory

To further optimize memory usage and improve sorting efficiency during Shuffle, Spark introduces off-heap memory, which allocates space directly in the system memory of the worker node to store serialized binary data.

Off-heap memory means that memory objects are allocated outside the Java virtual machine's heap, in memory managed directly by the operating system rather than by the virtual machine. This keeps the heap small and reduces the impact of garbage collection on the application.

Using the JDK Unsafe API (starting with Spark 2.0, off-heap storage memory is no longer based on Tachyon but uses the same implementation as off-heap execution memory), Spark can operate on off-heap memory directly, reducing unnecessary memory overhead as well as frequent GC scanning and collection, which improves processing performance. Off-heap memory can be allocated and released precisely because it is requested from the operating system directly rather than through the JVM; heap memory, by contrast, cannot be released precisely because the JVM does not specify an exact moment for cleaning it up. Moreover, the space occupied by serialized data can be calculated exactly, so off-heap memory is easier to manage and less error-prone than heap memory.

Off-heap memory is disabled by default. It can be enabled with the spark.memory.offHeap.enabled parameter, and its size is set by spark.memory.offHeap.size. Apart from having no "other" space, off-heap memory is divided the same way as heap memory, and all concurrently running tasks share its storage memory and execution memory.

(This part of memory is mainly used by the program's shared libraries, such as Perm Space, thread stacks, and some memory mappings, or by C-style allocated objects.)

Memory allocation

1. Static memory management

Under the static memory management mechanism that Spark originally used, the sizes of storage memory, execution memory, and other memory are fixed while a Spark application runs, but they can be configured before the application starts. The heap memory is allocated as follows:

As the figure shows, the available storage and execution memory in the heap are calculated as follows:

Available storage memory = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction

Available execution memory = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction

systemMaxMemory depends on the size of the current JVM heap, and the final available execution or storage memory is obtained by multiplying it by the respective memoryFraction and safetyFraction parameters. The two safetyFraction parameters logically reserve a safety area of 1 - safetyFraction, which reduces the risk of OOM when actual memory usage exceeds the preset range (as mentioned above, the sampling-based size estimate for non-serialized objects introduces errors). Note that this reserved safety area is only a logical plan; Spark uses it in the same way as "other memory".

Both storage memory and execution memory reserve such a safety area to guard against OOM, because the memory sizes Spark records for the heap are inaccurate.
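The two formulas as a small Scala sketch. The fraction values are assumed to be the legacy defaults of the static memory manager (spark.storage.memoryFraction = 0.6, spark.storage.safetyFraction = 0.9, spark.shuffle.memoryFraction = 0.2, spark.shuffle.safetyFraction = 0.8); the object and method names are hypothetical.

```scala
object StaticMemorySketch {
  // Assumed legacy defaults, one constant per configuration parameter.
  val StorageMemoryFraction = 0.6 // spark.storage.memoryFraction
  val StorageSafetyFraction = 0.9 // spark.storage.safetyFraction
  val ShuffleMemoryFraction = 0.2 // spark.shuffle.memoryFraction
  val ShuffleSafetyFraction = 0.8 // spark.shuffle.safetyFraction

  def storageMemory(systemMaxMemory: Long): Long =
    (systemMaxMemory * StorageMemoryFraction * StorageSafetyFraction).toLong

  def executionMemory(systemMaxMemory: Long): Long =
    (systemMaxMemory * ShuffleMemoryFraction * ShuffleSafetyFraction).toLong

  // The logical safety area left inside the storage region.
  def storageSafetyArea(systemMaxMemory: Long): Long =
    (systemMaxMemory * StorageMemoryFraction).toLong - storageMemory(systemMaxMemory)
}
```

With a 1000 MB heap this gives about 540 MB of storage memory and 160 MB of execution memory; the remaining 60 MB and 40 MB inside the two regions are their safety areas.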

Off-heap allocation is simpler, with only storage memory and execution memory. Their sizes are determined directly by the spark.memory.storageFraction parameter, and because the space occupied by off-heap memory can be calculated precisely, no safety area is needed.

The static memory management mechanism is relatively simple to implement, but if users are not familiar with Spark's storage mechanism, or do not configure it according to the specific data scale and computing tasks, it easily leads to a "half sea water, half flame" situation: one of storage memory and execution memory has a lot of space left while the other is full, so old content has to be evicted or removed to store new content. Because of the newer memory management mechanism, developers now rarely use this approach, and Spark retains its implementation for compatibility with applications written for older versions.

2. Unified memory management

The unified memory management mechanism introduced in Spark 1.6 differs from static memory management in that storage memory and execution memory share the same space and can dynamically occupy each other's free area. The on-heap memory structure of unified memory management is shown in the following figure:

The out-of-heap memory structure for unified memory management is shown below:

The most important optimization is the dynamic occupancy mechanism, whose rules are as follows:

1. Set the basic storage memory and execution memory regions (the spark.memory.storageFraction parameter). This setting determines the range of space each side owns.

2. If both sides are short of space, data is stored to disk. If one side is short of space while the other has free space, it can borrow the other side's space. (Being short of storage space means there is not enough room to hold a complete Block.)

3. When storage memory has occupied execution memory's space, execution can have the occupied part transferred to disk and the borrowed space "returned".

4. When execution memory has occupied storage memory's space, storage cannot ask for that space back, because doing so would be complicated: many factors in the Shuffle process would have to be taken into account.
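A simplified sketch of the occupancy rules, assuming a single pool of `total` bytes whose storage region is `total * storageFraction`. The class is hypothetical and ignores unroll memory, off-heap memory, and per-task limits. Storage only takes free memory (rules 2 and 4), while execution may also evict storage memory that exceeds the storage region (rule 3).

```scala
final class UnifiedPoolSketch(total: Long, storageFraction: Double = 0.5) {
  private val storageRegion: Long = (total * storageFraction).toLong
  private var storageUsed = 0L
  private var executionUsed = 0L

  private def free: Long = total - storageUsed - executionUsed
  def storage: Long = storageUsed
  def execution: Long = executionUsed

  // Storage may borrow idle execution memory but never forces execution
  // to give memory back; false means the block must go to disk instead.
  def acquireStorage(bytes: Long): Boolean =
    if (bytes <= free) { storageUsed += bytes; true } else false

  // Execution uses free memory first, then evicts cached blocks until
  // storage shrinks back to its own region. Returns the bytes granted.
  def acquireExecution(bytes: Long): Long = {
    val evictable = math.max(0L, storageUsed - storageRegion)
    val shortfall = math.max(0L, bytes - free)
    storageUsed -= math.min(evictable, shortfall) // evicted blocks spill or drop
    val granted = math.min(bytes, free)
    executionUsed += granted
    granted
  }
}
```

In the test scenario, storage first borrows 30 bytes beyond its 50-byte region; execution later reclaims that borrowed space, but it cannot push storage below its own region.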

The dynamic occupancy mechanism of unified memory management is shown in the following figure:

Spark uses the unified memory management mechanism to improve the utilization of on-heap and off-heap memory resources and to make Spark memory easier to maintain. However, if storage space is too large or too much data is cached, frequent full garbage collection can result and reduce task execution performance, because cached RDD data usually stays in memory for a long time. To get the best performance from Spark, developers therefore need to understand how storage memory and execution memory are managed and implemented.

Storage Memory Management

1. RDD Persistence mechanism

The Resilient Distributed Dataset (RDD) is Spark's most fundamental data abstraction: a read-only collection of partitioned records. An RDD can only be created from a dataset in stable physical storage, or generated from an existing RDD by a Transformation. A transformed RDD depends on the original RDD, forming a Lineage, and through this lineage Spark guarantees that every RDD can be recovered. All transformations are lazy: only when an Action that returns a result to the Driver occurs does Spark create tasks to read the RDD and actually trigger execution of the transformations.

When a task starts and reads a partition, it first checks whether the partition has been persisted; if not, it checks the Checkpoint or recomputes the partition from its lineage. So if you want to perform multiple actions on an RDD, you can call persist or cache in the first action to persist or cache the RDD in memory or on disk, speeding up subsequent actions.

In fact, the cache method persists RDD to memory using the default MEMORY_ONLY storage level, so caching is a special kind of persistence. The design of in-heap and off-heap storage memory allows unified planning and management of the memory used for caching RDD.

Spark's Storage module is responsible for RDD persistence and decouples RDDs from physical storage. It manages the data generated during Spark computation and encapsulates the functions for accessing data in memory or on disk, locally or remotely. The Storage modules on the Driver and the Executors form a master-slave architecture: the BlockManager on the Driver is the Master, and the BlockManagers on the Executors are Slaves.

The Storage module logically uses the Block as its basic storage unit. Each Partition of an RDD corresponds to exactly one Block after processing (the BlockId has the format rdd_RDD-ID_PARTITION-ID). The Master on the Driver manages and maintains the metadata of all Blocks in the Spark application, while the Slaves on the Executors carry out the Master's commands, such as adding or removing an RDD, and report Block status updates to the Master.

For RDD persistence, Spark defines seven different storage levels, such as MEMORY_ONLY and MEMORY_AND_DISK. A storage level is a combination of the following five variables:

class StorageLevel private(
    private var _useDisk: Boolean,      // disk
    private var _useMemory: Boolean,    // on-heap memory
    private var _useOffHeap: Boolean,   // off-heap memory
    private var _deserialized: Boolean, // whether stored deserialized
    private var _replication: Int = 1)  // number of replicas

The seven storage levels in Spark are as follows:

Analyzing this data structure shows that a storage level defines how an RDD's Partition (also called a Block) is stored along three dimensions:

(1) Storage location: disk, on-heap memory, or off-heap memory. MEMORY_AND_DISK, for example, keeps data in on-heap memory and stores the partitions that do not fit in memory on disk. OFF_HEAP stores data only in off-heap memory; currently, data stored off-heap cannot be stored in other locations at the same time.

(2) Storage form: whether the Block is kept in non-serialized form after being cached in storage memory. For example, MEMORY_ONLY stores it non-serialized and OFF_HEAP stores it serialized.

(3) Number of replicas: if the number of replicas is greater than 1, copies must be backed up remotely to other nodes. For example, DISK_ONLY_2 requires one remote backup copy.
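The three dimensions can be read straight off the five flags. The following is a hypothetical stand-in for StorageLevel (Level and Levels are not Spark classes); the flag values for the five levels shown follow the predefined Spark constants of the same names.

```scala
// Hypothetical stand-in for StorageLevel, reduced to the five flags.
final case class Level(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
                       deserialized: Boolean, replication: Int = 1) {
  // Dimension (1): locations this level is allowed to use.
  def locations: List[String] =
    List(useDisk -> "disk", (useMemory && !useOffHeap) -> "on-heap", useOffHeap -> "off-heap")
      .collect { case (true, name) => name }

  // Dimension (3): more than one replica means a remote copy is needed.
  def needsRemoteCopy: Boolean = replication > 1
}

object Levels {
  val MEMORY_ONLY     = Level(useDisk = false, useMemory = true, useOffHeap = false, deserialized = true)
  val MEMORY_ONLY_SER = Level(useDisk = false, useMemory = true, useOffHeap = false, deserialized = false)
  val MEMORY_AND_DISK = Level(useDisk = true, useMemory = true, useOffHeap = false, deserialized = true)
  val DISK_ONLY       = Level(useDisk = true, useMemory = false, useOffHeap = false, deserialized = false)
  val DISK_ONLY_2     = Level(useDisk = true, useMemory = false, useOffHeap = false, deserialized = false, replication = 2)
}
```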

2. Caching process of RDD

Before an RDD is cached in storage memory, the data in a Partition is generally accessed through an Iterator, Scala's way of traversing a dataset. The Iterator yields each serialized or non-serialized data item (Record) in the Partition. The object instances of these Records logically occupy the "other" portion of the JVM heap memory, and different Records of the same Partition are not stored contiguously.

After the RDD is cached in storage memory, the Partition becomes a Block, and its Records occupy a contiguous region of on-heap or off-heap storage memory. Spark calls this process of converting a Partition from non-contiguous to contiguous storage "Unroll".

A Block is stored in serialized or non-serialized form depending on the RDD's storage level. A non-serialized Block is represented by the DeserializedMemoryEntry data structure, which stores all object instances in an array; a serialized Block is represented by SerializedMemoryEntry, which stores binary data in byte buffers (ByteBuffer). Each Executor's Storage module uses a LinkedHashMap to manage all Block object instances in on-heap and off-heap storage memory, and adding entries to or removing them from this LinkedHashMap indirectly records the acquisition and release of memory.

Because there is no guarantee that storage memory can hold all the data in the Iterator at once, the current computing task must request enough Unroll space from the MemoryManager to occupy temporarily. If there is not enough Unroll space, the Unroll fails; it can continue once enough space becomes available.

For serialized Partitions, the required Unroll space can be computed by simple accumulation and requested in one go.

For non-serialized Partitions, space is requested repeatedly while traversing the Records: each time a Record is read, the required Unroll space is estimated by sampling and requested. When space runs out, the process can be interrupted and the occupied Unroll space released.

If the Unroll succeeds, the Unroll space occupied by the current Partition is converted to the normal storage space of the cache RDD, as shown in the following figure.
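A minimal sketch of unrolling a non-serialized partition. It assumes an exact per-record size function instead of Spark's sampling-based estimate, and reserves space one record at a time rather than in periodic chunks. UnrollSketch and Budget are hypothetical names.

```scala
object UnrollSketch {
  // Hypothetical stand-in for the unroll memory a MemoryManager hands out.
  final class Budget(var available: Long) {
    def reserve(bytes: Long): Boolean =
      if (bytes <= available) { available -= bytes; true } else false
    def release(bytes: Long): Unit = available += bytes
  }

  // Copies records into a contiguous block, reserving each record's size
  // first. On failure every reserved byte is released and None is returned;
  // on success the reserved space becomes the cached block's storage space.
  def unroll[T](records: Iterator[T], budget: Budget)(sizeOf: T => Long): Option[Vector[T]] = {
    val block = Vector.newBuilder[T]
    var reserved = 0L
    while (records.hasNext) {
      val record = records.next()
      val needed = sizeOf(record)
      if (!budget.reserve(needed)) {
        budget.release(reserved)
        return None
      }
      reserved += needed
      block += record
    }
    Some(block.result())
  }
}
```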

In static memory management, Spark sets aside a fixed-size Unroll space in storage memory. In unified memory management, Spark does not distinguish Unroll space; when storage space is insufficient, it is handled by the dynamic occupancy mechanism.

3. Eviction and dropping to disk

Because all computing tasks in the same Executor share a limited amount of storage memory, when a new Block needs to be cached but there is not enough free space and none can be borrowed dynamically, old Blocks in the LinkedHashMap are evicted. If the storage level of an evicted Block requires storing it to disk, the Block is dropped to disk; otherwise it is deleted directly.

The eviction rules are as follows:

The old Block to be evicted must have the same MemoryMode as the new Block, that is, both are in off-heap memory or both are in on-heap memory.

The old and new Blocks cannot belong to the same RDD, to avoid cyclic eviction.

The RDD that the old Block belongs to must not be in the middle of being read, to avoid consistency problems.

Blocks in the LinkedHashMap are traversed in least-recently-used (LRU) order until enough space is freed for the new Block. LRU ordering is a feature of LinkedHashMap.
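A sketch of the eviction loop that uses a java.util.LinkedHashMap in access order as the LRU structure. MemoryStoreSketch and CachedBlock are hypothetical names. The "RDD currently being read" rule is not modeled, and evicted ids are simply returned so that a caller could drop them to disk when their storage level allows it.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.ListBuffer

// Hypothetical block metadata: owning RDD, memory mode, and size in bytes.
final case class CachedBlock(rddId: Int, offHeap: Boolean, size: Long)

final class MemoryStoreSketch(capacity: Long) {
  // accessOrder = true: iteration runs from least to most recently used.
  private val entries = new JLinkedHashMap[String, CachedBlock](16, 0.75f, true)
  private var used = 0L

  def get(id: String): Option[CachedBlock] = Option(entries.get(id))

  // Returns the evicted ids, or None if enough space cannot be freed.
  def put(id: String, block: CachedBlock): Option[List[String]] = {
    val victims = ListBuffer.empty[String]
    var freeable = 0L
    val it = entries.entrySet().iterator()
    while (capacity - used + freeable < block.size && it.hasNext) {
      val e = it.next()
      val old = e.getValue
      // Same memory mode, different RDD (avoids cyclic eviction).
      if (old.offHeap == block.offHeap && old.rddId != block.rddId) {
        victims += e.getKey
        freeable += old.size
      }
    }
    if (capacity - used + freeable < block.size) None // evict nothing
    else {
      victims.foreach { k => used -= entries.remove(k).size }
      entries.put(id, block)
      used += block.size
      Some(victims.toList)
    }
  }
}
```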

Dropping to disk is relatively simple. If the storage level has _useDisk set to true, Spark checks _deserialized to see whether the Block is in non-serialized form; if it is, the Block is serialized first. The data is then stored on disk, and the Block's information in the Storage module is updated.

Execution memory management

Shuffle is a process of repartitioning RDD data based on certain rules. The Write and Read phases of Shuffle use the execution memory.

Shuffle Write

On the Map side, ExternalSorter performs external sorting; while the data is held in memory, it mainly occupies on-heap execution space.

Shuffle Read

(1) When data is aggregated on the Reduce side, it is handed to an Aggregator for processing, which occupies on-heap execution space while the data is held in memory.

(2) If the final results need to be sorted, the data will be handed over to ExternalSorter for processing again, occupying the execution space in the heap.

In both ExternalSorter and Aggregator, Spark stores data in on-heap execution memory using a hash table called AppendOnlyMap. During Shuffle, however, not all data can fit in the hash table. The memory occupied by the hash table is estimated by periodic sampling; when it grows so large that no more execution memory can be obtained from the MemoryManager, Spark writes the entire contents of the hash table to a disk file. This process is called Spill, and the files spilled to disk are eventually merged.
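A minimal sketch of the spill-and-merge pattern. It assumes a key-count aggregation and a limit expressed as a number of distinct keys instead of sampled bytes. SpillingCounter is hypothetical and keeps its "spill files" as sorted in-memory sequences so that the example stays self-contained.

```scala
import scala.collection.mutable

final class SpillingCounter(maxEntries: Int) {
  private var current = mutable.HashMap.empty[String, Long]
  private val spills = mutable.ListBuffer.empty[Vector[(String, Long)]]

  // Aggregates in memory; when the map grows past the limit, spill it.
  def insert(key: String): Unit = {
    current.update(key, current.getOrElse(key, 0L) + 1L)
    if (current.size > maxEntries) spill()
  }

  // Writes the whole map out (sorted by key) and starts a fresh one.
  private def spill(): Unit = {
    spills += current.toVector.sortBy(_._1)
    current = mutable.HashMap.empty[String, Long]
  }

  def spillCount: Int = spills.size

  // Final merge of every spill with the in-memory remainder.
  def result(): Map[String, Long] =
    (spills.flatten ++ current).groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
}
```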

Spark manages storage memory and execution memory in different ways. For storage memory, Spark uses a LinkedHashMap to centrally manage all Blocks, which are converted from the Partitions of the RDDs that need to be cached. For execution memory, Spark uses AppendOnlyMap to store data during Shuffle, and Tungsten sort even abstracts memory into pages, opening up a new JVM memory management mechanism.

9. Analysis of Spark core components

BlockManager Data storage and management mechanism

The BlockManager is a component of Spark responsible for data storage and management. All data of drivers and executors is managed by the corresponding BlockManager.

The Driver hosts the BlockManagerMaster, which maintains the metadata of the data managed by the BlockManager on each node. For example, when blocks are added, deleted, or modified, the corresponding metadata changes are maintained there.

Each node has a BlockManager. After each BlockManager is created, the first thing to do is to register with the BlockManagerMaster. At this time, the BlockManagerMaster creates the corresponding BlockManagerInfo.

The relationship between BlockManagerMaster and BlockManager is similar to that between NameNode and DataNode. The BlockManagerMaster maintains the metadata of the data each BlockManager manages, and when a BlockManager adds, deletes, or modifies blocks, the metadata in the BlockManagerMaster changes accordingly. Likewise, the NameNode maintains DataNode metadata and updates it when data in a DataNode changes.

Each node has a BlockManager, which has three very important components:

DiskStore: reads and writes data on disk.

MemoryStore: Reads and writes memory data.

BlockTransferService: establishes the connection between BlockManager and BlockManager of other remote nodes, and reads and writes data from BlockManager of other remote nodes.


When BlockManager is used for write operations, for example, some intermediate data in the RDD running process, or if persist() is manually specified, the data will be written to memory first. If the memory size is insufficient, the data will be written to disk using our own algorithm. In addition, if persist() specifies replica, the BlockTransferService will be used to replicate a copy of the data to the BlockManager of another node.

When you use BlockManager to read, for example, shuffleRead, if you can read from the local, use DisStore or MemoryStore to read from the local, but there is no local data. Then the BlockTransferService is used to establish a connection with the BlockManager that has data, and then the BlockTransferService is used to read data from the remote BlockManager. For example, in the shuffle Read operation, the data to be pulled may not be available locally. In this case, the BlockManager of the remote node with data is asked to pull the required data.

Whenever a block is added, deleted, or modified through a BlockManager, the block's BlockStatus must be reported to the BlockManagerMaster. The BlockManagerMaster then adds, deletes, or updates that BlockStatus inside the BlockManagerInfo of the corresponding BlockManager, which keeps the metadata up to date.
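A minimal sketch of this bookkeeping, assuming a single in-process master. The names BlockStatus, BlockManagerInfo, and update_block_info echo the concepts above, and Spark's BlockManagerMaster does expose an updateBlockInfo method. These Python classes are still simplified, hypothetical stand-ins, not Spark's implementation.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class BlockStatus:
    storage_level: str  # a plain label such as "MEMORY_ONLY" in this sketch
    mem_size: int
    disk_size: int


@dataclass
class BlockManagerInfo:
    manager_id: str
    blocks: Dict[str, BlockStatus] = field(default_factory=dict)


class ToyBlockManagerMaster:
    def __init__(self):
        self.infos: Dict[str, BlockManagerInfo] = {}

    def register(self, manager_id: str) -> None:
        # The first thing a new BlockManager does: get its own BlockManagerInfo.
        self.infos[manager_id] = BlockManagerInfo(manager_id)

    def update_block_info(self, manager_id: str, block_id: str,
                          status: Optional[BlockStatus]) -> None:
        # Reported on every add/modify; status=None means the block was removed.
        info = self.infos[manager_id]
        if status is None:
            info.blocks.pop(block_id, None)
        else:
            info.blocks[block_id] = status

    def locations(self, block_id: str) -> List[str]:
        return sorted(m for m, info in self.infos.items() if block_id in info.blocks)


master = ToyBlockManagerMaster()
master.register("exec-1")
master.register("exec-2")
master.update_block_info("exec-1", "rdd_2_0", BlockStatus("MEMORY_ONLY", 1024, 0))
master.update_block_info("exec-2", "rdd_2_0", BlockStatus("MEMORY_ONLY", 1024, 0))
master.update_block_info("exec-1", "rdd_2_0", None)  # block dropped on exec-1

print(master.locations("rdd_2_0"))  # ['exec-2']
```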

Underlying implementation of Spark shared variables

Shared variables are a very important feature of Spark.

By default, when a function passed to an operator uses an external variable, a copy of that variable is shipped to each task, and each task can operate only on its own copy. This approach does not work when multiple tasks need to share a variable.
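This copy-per-task behavior can be reproduced in plain Python. The sketch assumes, for illustration only, that shipping a closure to a task can be modeled by pickling and unpickling the captured variable. `run_tasks` and `count_elements` are hypothetical helpers, not Spark functions.

```python
import pickle


def run_tasks(func, partitions, captured):
    """Run func once per partition, giving each task its own copy of `captured`."""
    results = []
    for part in partitions:
        # Serializing and deserializing mimics shipping the closure to a task.
        task_copy = pickle.loads(pickle.dumps(captured))
        results.append(func(part, task_copy))
    return results


counter = {"n": 0}  # the Driver-side external variable


def count_elements(part, state):
    state["n"] += len(part)  # updates only this task's private copy
    return state["n"]


print(run_tasks(count_elements, [[1, 2], [3, 4, 5]], counter))  # [2, 3]
print(counter["n"])  # 0: the Driver's variable is unchanged
```

Each task starts from its own fresh copy, so the second task does not see the first task's update, and the Driver's variable is never touched.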

Spark provides two kinds of shared variables: broadcast variables and accumulators. A broadcast variable is copied only once per node, that is, one copy per Executor. This mainly helps performance by reducing network transfer and memory consumption. A broadcast variable is read-only: tasks share it but cannot modify it. An accumulator, by contrast, allows multiple tasks to update a single variable.

Broadcast variable

Broadcast variables allow programmers to keep a read-only variable cached on each Executor instead of shipping a copy of it with each task.

Each task keeps its own copy of the external variables it uses. When many tasks on one Executor use the same external variable, these copies consume a lot of Executor memory. Large external variables can therefore be wrapped as broadcast variables. Each Executor then keeps only one copy, shared by all tasks on that Executor instead of one copy per task, which reduces the memory footprint of Spark tasks.

Figure: Using external variables

Figure: Using broadcast variables
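A quick back-of-the-envelope calculation shows why this matters. The numbers below are made-up assumptions: a 100 MB variable, 10 Executors, and 4 tasks running at once per Executor. The function only counts the copies held in memory at the same time.

```python
def copies_in_memory(num_executors, concurrent_tasks_per_executor, broadcast):
    """Count copies of one external variable held in memory at the same time."""
    if broadcast:
        return num_executors  # one copy per Executor, shared by its tasks
    return num_executors * concurrent_tasks_per_executor  # one copy per running task


size_mb = 100             # hypothetical size of the external variable
executors, slots = 10, 4  # hypothetical cluster shape

plain_mb = copies_in_memory(executors, slots, broadcast=False) * size_mb
broadcast_mb = copies_in_memory(executors, slots, broadcast=True) * size_mb
print(plain_mb, broadcast_mb)  # 4000 1000
```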

The broadcast variable provided by Spark is read-only, and each Executor holds only one copy of it rather than one copy per task. It therefore reduces both the network traffic needed to ship the variable to the nodes and the memory it consumes on them. Spark also distributes broadcast variables with an efficient broadcast algorithm to further reduce communication costs.

A broadcast variable is created by calling the broadcast() method of SparkContext on a value. Inside an operator's function, each Executor then keeps only one copy of the broadcast variable, and each task reads it through the broadcast variable's value method.

When a task starts running, the Executor does not fetch the broadcast variable up front. Only when the task executes code that uses the broadcast variable does it request the variable from the Executor's memory, as shown in the following figure:

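If the Executor does not hold the variable yet, it pulls it from the Driver through its BlockManager, caches it, and provides it to the task. With Spark's default TorrentBroadcast implementation, pieces of the variable can also be pulled from other Executors that already hold them. This step is shown in the following figure: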

Broadcasting large variables is a basic, commonly used Spark optimization. It improves task performance by reducing memory usage and network transfer.
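The lazy, once-per-Executor fetch can be modeled in a few lines. This is a hypothetical sketch, not Spark's TorrentBroadcast. ToyDriver, ToyBroadcast, and ToyExecutor are illustrative names, and a dictionary stands in for the Executor's BlockManager. In real Spark code, the corresponding steps are calling `sc.broadcast(v)` and reading `.value` on the returned handle.

```python
class ToyDriver:
    def __init__(self):
        self._values = {}
        self.fetches = 0  # how many times Executors pulled a broadcast value

    def broadcast(self, value):
        bid = len(self._values)
        self._values[bid] = value
        return ToyBroadcast(bid)

    def fetch(self, bid):
        self.fetches += 1
        return self._values[bid]


class ToyBroadcast:
    """Handle shipped with tasks: only an id, not the value itself."""

    def __init__(self, bid):
        self.id = bid


class ToyExecutor:
    def __init__(self, driver):
        self.driver = driver
        self.cache = {}  # stands in for the Executor's BlockManager

    def value(self, handle):
        # Fetched lazily, the first time any task on this Executor reads it.
        if handle.id not in self.cache:
            self.cache[handle.id] = self.driver.fetch(handle.id)
        return self.cache[handle.id]


driver = ToyDriver()
lookup = driver.broadcast({"a": 1, "b": 2})
executors = [ToyExecutor(driver), ToyExecutor(driver)]

# Three tasks on each Executor read the broadcast value.
results = [ex.value(lookup)["b"] for ex in executors for _ in range(3)]
print(results, driver.fetches)  # [2, 2, 2, 2, 2, 2] 2
```

Six task reads cause only two fetches from the Driver, one per Executor. This is the saving described above.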

Accumulator

An accumulator is a variable that is only "added" to through an associative and commutative operation, so it can be efficiently supported in parallel. Accumulators can be used to implement counters (as in MapReduce) or sums.

Tasks running on the cluster add to their own copies of the accumulator and send these updates back to the Driver, where they are merged into the accumulator's value. Only the Driver can read that value. Named accumulators are also displayed in the Spark UI, which is created when the SparkContext is created.

Spark provides accumulators so that tasks on multiple nodes can share a variable. An accumulator supports only accumulation, but it lets multiple tasks update the same variable in parallel. Tasks can only add to an accumulator and cannot read its value; only the Driver program can read it.
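The following minimal sketch models this contract under simplifying assumptions. Each task gets its own zero-initialized copy that exposes only add(), and the Driver merges each task's partial result when the task finishes. DriverAccumulator, TaskAccumulator, and run_task are hypothetical names. In Spark itself, the analogous objects come from `sc.longAccumulator(...)` in Scala or `sc.accumulator(0)` in PySpark.

```python
class DriverAccumulator:
    """Driver-side accumulator: the only place where the value can be read."""

    def __init__(self, zero=0):
        self.zero = zero
        self.value = zero

    def task_copy(self):
        return TaskAccumulator(self.zero)

    def merge(self, task_acc):
        # Called when a task finishes and reports its partial result.
        self.value += task_acc._partial


class TaskAccumulator:
    """Task-side copy: exposes add() but deliberately no readable value."""

    def __init__(self, zero):
        self._partial = zero

    def add(self, v):
        self._partial += v


def run_task(partition, acc):
    for x in partition:
        if x < 0:
            acc.add(1)  # count "bad" records


bad_records = DriverAccumulator(0)
for part in [[1, -2, 3], [-4, -5], [6]]:
    task_acc = bad_records.task_copy()  # each task starts from zero
    run_task(part, task_acc)
    bad_records.merge(task_acc)         # Driver merges on task completion

print(bad_records.value)  # 3
```

Addition is associative and commutative, so the order in which the Driver merges task results does not change the final value. This is what makes parallel updates safe.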

Accumulator’s underlying principle is as follows:
