Blog address: joey771.cn/2018/10/25/…

How Spark works is a common question in big data job interviews. When I was first asked it, I was a little confused: how should I answer such a broad question? Should I describe Spark's architecture, or its underlying call details? Later, after searching some materials and reading a few books, I gained a better understanding of the question. The interviewer most likely wants you to walk through Spark's execution process, that is, the steps a Spark program goes through from submission to completion. If you also include some details from the underlying Spark source code, you will leave the impression that you do not merely use Spark but also understand some of its internals.

This section describes how Spark operates.

After the user submits a job with spark-submit, a driver process is started. The driver applies to the cluster manager (YARN, Mesos, or Spark's standalone master) for the resources the job needs, such as cores and memory (these can be set through spark-submit parameters), and the cluster manager starts executors on the worker nodes accordingly. Once the resources have been obtained, the driver begins scheduling and executing the job code we wrote. The job is handed to the DAGScheduler, which splits it into multiple stages according to the RDD dependencies: a stage boundary is drawn wherever a wide dependency (shuffle) occurs, so each stage contains as long a chain of consecutive narrow dependencies as possible. Each stage generates a TaskSet, which is submitted to the low-level scheduler, the TaskScheduler, which in turn sends the tasks to executors in the cluster for execution. Tasks are created per data partition: each partition becomes one task. This cycle repeats until all the code in the driver program has been executed and all the data has been computed.
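To make this concrete, here is a minimal word-count style application (a sketch: the application name, input path, and resource values are placeholders). The transformations only build the RDD lineage; the single action at the end is what actually triggers job submission through the chain just described. Resource settings such as executor memory and cores can equally be supplied as spark-submit parameters.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MinimalJob {
  def main(args: Array[String]): Unit = {
    // These settings could also be passed on the spark-submit command line.
    val conf = new SparkConf()
      .setAppName("minimal-job")           // placeholder application name
      .set("spark.executor.memory", "2g")  // memory requested per executor
      .set("spark.executor.cores", "2")    // cores requested per executor

    val sc = new SparkContext(conf)

    // Transformations only build the lineage; nothing is computed yet.
    val counts = sc.textFile("hdfs:///tmp/input.txt") // placeholder path
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // The action triggers the job: the driver hands it to the DAGScheduler,
    // which splits it into stages and submits TaskSets to the TaskScheduler.
    counts.collect().foreach(println)

    sc.stop()
  }
}
```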

This simplified execution flow is also shown in the figure below:

Figure 1 Spark running process

SparkContext

SparkContext is the most important object in the Spark driver. Initializing a SparkContext prepares the running environment of a Spark application: the SparkContext communicates with the cluster, applies to the cluster manager for resources, and allocates and monitors tasks.

The relationship between the driver and the workers is shown in the following figure: the driver distributes tasks to the workers, and the workers return the processed results to the driver.

Figure 2 driver architecture

The core function of SparkContext is to initialize the components a Spark application needs to run, including the high-level scheduler DAGScheduler, the low-level scheduler TaskScheduler, and the SchedulerBackend. It is also responsible for registering the Spark program with the cluster manager (for example, the standalone Master). RDDs in Spark are created through the SparkContext, for example with APIs such as sparkContext.textFile() and sparkContext.parallelize(). The application for computing resources from the cluster manager mentioned in the running process above is also made by objects created by SparkContext. Now let's look at SparkContext from the source-code point of view and at the components it creates. Inside the SparkContext class there is code that creates these components.
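The exact code differs between Spark versions; paraphrased from Spark 2.x (not copied verbatim), the relevant part of the initialization looks roughly like this:

```scala
// Simplified excerpt from SparkContext's initialization (Spark 2.x, paraphrased).
// createTaskScheduler returns the SchedulerBackend and TaskScheduler that match
// the master URL; the DAGScheduler is then created on top of them.
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)

// The TaskScheduler is started only after the DAGScheduler holds a reference
// to it, so that scheduling events have somewhere to go.
_taskScheduler.start()
```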

DAGScheduler

DAGScheduler is the high-level scheduler. It divides the RDDs of a DAG into different stages, builds the parent-child relationships between these stages, and finally splits each stage into multiple tasks by partition, which are submitted to the low-level scheduler TaskScheduler as a TaskSet. Stages are divided according to whether the RDD dependency chain contains a wide dependency. A wide dependency means that one partition of the parent RDD is consumed by multiple partitions of the child RDD; put simply, the out-degree of a parent partition is greater than 1. A narrow dependency means that each partition of the parent RDD is consumed by at most one partition of the child RDD, i.e., the out-degree of every parent partition is 1. Each stage contains as many consecutive narrow dependencies as possible, turning the chain of narrowly dependent operators into a single pipeline. This reduces RDD reads and writes between operators, unlike MapReduce, where each job consists of only one Map phase and one Reduce phase, and the Map of the next job can start only after the previous Reduce has finished. No shuffle happens inside a pipeline, so executing those operators together is more efficient; shuffle only occurs between stages (and shuffle is itself a point interviewers often probe). The DAGScheduler also records materialization actions, such as which RDDs are stored on disk, and seeks an optimal schedule for tasks, for example by considering data locality within a stage. It also monitors failures caused by lost shuffle output on a node; if a stage is found to have failed for that reason, it may be resubmitted.
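For example, in the small program below (a sketch with made-up data), map and filter are narrow dependencies and are pipelined into a single stage, while reduceByKey introduces a wide dependency, so the DAGScheduler cuts the DAG there and a shuffle happens between the two resulting stages.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object StageSplitExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("stage-split").setMaster("local[2]"))

    val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))

    // Narrow dependencies: map and filter are pipelined into one stage and
    // run partition by partition without any shuffle in between.
    val pairs = words.map(w => (w, 1)).filter(_._1 != "c")

    // Wide dependency: reduceByKey needs a shuffle, so a new stage begins here.
    val counts = pairs.reduceByKey(_ + _)

    // The action triggers the job: two stages, the second waiting on the
    // shuffle output of the first.
    counts.collect().foreach(println)

    sc.stop()
  }
}
```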

The specific call process of DAGScheduler

The DAGScheduler starts its work when a job is submitted. In Spark, job submission is triggered by an RDD action. When an action is invoked, the RDD's action method calls runJob on its SparkContext, and after several overloaded calls DAGScheduler.runJob is eventually invoked. Within the DAGScheduler class, runJob is the entry point for submitting a job: it calls submitJob, which returns a JobWaiter used to wait for the result of the job scheduling, and then the relevant result information is logged depending on whether the job succeeded or failed.


The submitJob method obtains a jobId, verifies that the partitions exist, and posts a JobSubmitted object (a case class) to eventProcessLoop. The JobSubmitted object encapsulates the jobId, the final RDD, the function to apply to the RDD, and the partitions that need to be computed. eventProcessLoop contains an eventThread, which is a daemon thread: events sent to the loop via post are put into its eventQueue, a blocking queue, and the daemon thread takes events from this queue and calls onReceive (implemented by eventProcessLoop), which in turn calls doOnReceive and handles each event differently depending on its type. So when DAGScheduler calls submitJob, it in effect sends a message to itself for processing (eventProcessLoop is an object inside DAGScheduler), and submitJob returns immediately rather than blocking while the event is handled. In fact, other components also send messages to the DAGScheduler, and handling everything through this single daemon thread unifies the two cases: all messages are processed in one place with consistent business logic, which also gives good extensibility. eventProcessLoop can actually handle many kinds of messages, not only JobSubmitted; the following event handlers can be seen in the source code (a minimal sketch of this event-loop pattern follows the list):

  1. JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  2. MapStageSubmitted(jobId, dependency, callSite, listener, properties)
  3. StageCancelled(stageId, reason)
  4. JobCancelled(jobId, reason)
  5. JobGroupCancelled(groupId)
  6. AllJobsCancelled
  7. ExecutorAdded(execId, host)
  8. ExecutorLost(execId, reason)
  9. WorkerRemoved(workerId, host, message)
  10. BeginEvent(task, taskInfo)
  11. SpeculativeTaskSubmitted(task)
  12. GettingResultEvent(taskInfo)
  13. CompletionEvent(task, reason, result, accumUpdates, taskInfo)
  14. TaskSetFailed(taskSet, reason, exception)
  15. ResubmitFailedStages
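This event-loop pattern is easy to reproduce outside Spark. The sketch below is modeled on org.apache.spark.util.EventLoop but is not the real implementation; it only shows the combination the text describes: a blocking queue, a single daemon event thread, and a post method that returns immediately.

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

// Minimal sketch of the event-loop pattern behind DAGSchedulerEventProcessLoop.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true) // the real eventThread is also a daemon thread
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks until an event is posted
          onReceive(event)              // all events are handled on this one thread
        }
      } catch {
        case _: InterruptedException => // stop() was called; just exit the loop
      }
    }
  }

  /** Callers post events and return immediately; they never block on handling. */
  def post(event: E): Unit = eventQueue.put(event)

  def start(): Unit = eventThread.start()

  def stop(): Unit = { stopped.set(true); eventThread.interrupt() }

  /** Subclasses dispatch on the event type here, as doOnReceive does. */
  protected def onReceive(event: E): Unit
}
```

In these terms, DAGScheduler.submitJob simply posts a JobSubmitted event to such a loop and returns; the daemon thread later picks the event up and calls handleJobSubmitted.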

The JobSubmitted event leads to DAGScheduler's handleJobSubmitted method being called. This method is the starting point of stage construction: it creates the last stage of the job, the ResultStage, while all the other stages are ShuffleMapStages. The ResultStage is created by the createResultStage function, in which the getOrCreateParentStages method obtains or creates the list of parent stages for a given RDD. The source code of this method is fairly simple.
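Paraphrased from the Spark 2.x DAGScheduler (details may differ slightly between versions), it reads roughly as follows:

```scala
// Paraphrased from DAGScheduler (Spark 2.x): for every direct shuffle dependency
// of the given RDD, get or create the corresponding ShuffleMapStage.
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
```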

It calls getShuffleDependencies, which returns the shuffle dependencies that are direct parents of the given RDD.

getShuffleDependencies uses three main data structures: two HashSets, parents and visited, and a stack, waitingForVisit. The code first pushes the incoming RDD onto waitingForVisit; the use of a stack here tells us the traversal is a depth-first search. visited records the nodes already visited so that none is visited twice. The method then examines the dependencies of each RDD it visits: a shuffle dependency is added to parents, while for a narrow dependency the parent RDD is pushed onto waitingForVisit so that the depth-first traversal continues through it. In the end the method returns parents, i.e., the shuffle dependencies that form the boundaries between stages. For each shuffle dependency, getOrCreateShuffleMapStage is then called to obtain a ShuffleMapStage; these stages are kept in the shuffleIdToMapStage HashMap. If a ShuffleMapStage for the dependency already exists in that map, it is returned; if not, createShuffleMapStage is called to create it, and during creation getMissingAncestorShuffleDependencies is called to search for ancestor shuffle dependencies so that their stages are created as well. After the stages are created, handleJobSubmitted calls submitStage to submit the finalStage. submitStage recursively submits parent stages first: the missing parent stages are obtained by getMissingParentStages and sorted by stage id, and stages with smaller ids are submitted first.
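For reference, getShuffleDependencies itself, paraphrased from the Spark 2.x source, shows the three data structures and the depth-first traversal described above:

```scala
// Paraphrased from DAGScheduler (Spark 2.x): returns the shuffle dependencies
// that are direct parents of the given RDD, using an explicit stack for DFS.
private def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep                // wide dependency: a stage boundary
        case dependency =>
          waitingForVisit.push(dependency.rdd) // narrow dependency: keep walking
      }
    }
  }
  parents
}
```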

A concrete example

The figure below shows the transformation graph of five RDDs. Suppose an action (such as collect) is finally triggered on RDD E; the way the DAGScheduler generates stages is explained in detail below according to the relationships in the figure (a code sketch that builds a graph of this shape follows the list).

  • RDD.collect calls the sparkContext.runJob method, which calls DAGScheduler.runJob and then submitJob, which wraps the submission into a JobSubmitted event for processing; this eventually leads to handleJobSubmitted, where createResultStage is called.
  • createResultStage creates a ResultStage based on the jobId (the RDD inside the ResultStage is the RDD on which the action was triggered, the finalRDD). It calls getOrCreateParentStages to create all the parent stages and returns them as parents: List[Stage], which are passed to the ResultStage constructor to instantiate it. In the diagram, RDD E triggers createResultStage, obtains Stage1 and Stage2 through getOrCreateParentStages, and then creates its own Stage3.
  • The getShuffleDependencies call inside getOrCreateParentStages returns the set of RDDs that RDD E directly shuffle-depends on, namely RDD B and RDD D. getOrCreateShuffleMapStage is then called for each of them. Since neither has a parent stage of its own, getMissingAncestorShuffleDependencies returns nothing for both, so two ShuffleMapStages are created. Finally, these two stages become the parent stages of Stage3, and Stage3 is created.
  • Then submitStage in handleJobSubmitted is called to submit the stages. During submission, parent stages are submitted first, in order of increasing stage id. A later stage depends on the earlier ones and is computed only after the stages it depends on have finished.
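The exact operators in the figure are not important; the sketch below (made-up data and variable names) builds an RDD graph of the same shape, in which rddE has shuffle dependencies on both rddB and rddD, so collecting rddE produces two ShuffleMapStages plus a final ResultStage.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FiveRddExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("five-rdds").setMaster("local[2]"))

    val rddA = sc.parallelize(Seq(1, 2, 3, 4))
    val rddB = rddA.map(x => (x % 2, x))        // narrow: A and B share a stage

    val rddC = sc.parallelize(Seq(1, 2, 3, 4))
    val rddD = rddC.map(x => (x % 2, x * 10))   // narrow: C and D share a stage

    // join creates shuffle dependencies of E on both B and D, so the
    // DAGScheduler builds two ShuffleMapStages (for B's and D's lineages)
    // and a ResultStage containing E.
    val rddE = rddB.join(rddD)

    rddE.collect().foreach(println)             // the action that submits the job
    sc.stop()
  }
}
```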

SchedulerBackend and TaskScheduler

As mentioned above, TaskScheduler and SchedulerBackend are just traits. The specific implementation class of TaskScheduler is TaskSchedulerImpl, while the subclasses of SchedulerBackend include:

  1. LocalSchedulerBackend
  2. StandaloneSchedulerBackend
  3. CoarseGrainedSchedulerBackend
  4. MesosCoarseGrainedSchedulerBackend
  5. YarnSchedulerBackend

Different SchedulerBackends correspond to different Spark deployment modes. Different master arguments passed to createTaskScheduler produce different SchedulerBackends: Spark generates the appropriate SchedulerBackend based on the master string, using the strategy pattern to create different SchedulerBackend subclasses for different needs. In local mode a LocalSchedulerBackend is created; in standalone cluster mode a StandaloneSchedulerBackend is created. StandaloneSchedulerBackend has an important start method: it first calls the start method of its parent class and then builds a Command object whose mainClass is org.apache.spark.executor.CoarseGrainedExecutorBackend. This class is very important: when we run a Spark application, we can see JVM processes named CoarseGrainedExecutorBackend on the worker nodes, and these processes can be understood as the executor processes. The Master sends commands to the Workers to start the executor processes, and the main class loaded at their entry point is CoarseGrainedExecutorBackend; inside this process an Executor is started, which executes tasks concurrently through a thread pool, calling each task's run method. The start method also creates another very important object, StandaloneAppClient, and calls its start method, in which a ClientEndpoint object is created; this is an RpcEndpoint that registers the application with the Master.
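As a toy illustration of that dispatch (the class names in the strings are real Spark classes, but the function itself is only an illustration of matching on the master URL, not Spark's actual code):

```scala
// Illustrative only: shows how the master string determines which family of
// SchedulerBackend createTaskScheduler ends up instantiating.
def backendFor(master: String): String = master match {
  case "local"                       => "LocalSchedulerBackend (single thread)"
  case m if m.startsWith("local[")   => "LocalSchedulerBackend (multiple threads)"
  case m if m.startsWith("spark://") => "StandaloneSchedulerBackend"
  case m if m.startsWith("yarn")     => "YarnSchedulerBackend"
  case m if m.startsWith("mesos://") => "MesosCoarseGrainedSchedulerBackend"
  case other                         => s"unsupported master URL: $other"
}

// backendFor("spark://host:7077") returns "StandaloneSchedulerBackend"
```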

The SchedulerBackend is actually managed by the TaskScheduler: in createTaskScheduler, the initialize method of the TaskScheduler is called with the SchedulerBackend as a parameter, binding the two together.

The initialize method also creates a Pool that defines the scheduling mode for resources. The default mode is FIFO, and FAIR is also supported. In FIFO mode, the task set submitted first is executed first and later ones must wait for the earlier ones. FAIR mode supports grouping tasks into scheduling pools; different pools can have different weights, and tasks are executed according to those weights.
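Switching to FAIR scheduling and assigning jobs to a pool is done through configuration; a minimal sketch (the pool name is a placeholder, and pool weights come from the fair scheduler allocation file):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")
  .set("spark.scheduler.mode", "FAIR")   // the default is FIFO

val sc = new SparkContext(conf)

// Jobs submitted from this thread are placed into the named scheduling pool;
// the pool's weight and minShare are defined in fairscheduler.xml.
sc.setLocalProperty("spark.scheduler.pool", "myPool") // placeholder pool name
```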

The core task of the TaskScheduler is to submit TaskSets to the cluster for execution and report the results. As described above, the DAGScheduler divides a job into a series of stages, each stage is encapsulated into a TaskSet, and these TaskSets are submitted in order to the low-level scheduler TaskScheduler for execution. The TaskSet is handed over in DAGScheduler's submitMissingTasks method, which calls TaskScheduler.submitTasks. The TaskScheduler initializes a TaskSetManager for each TaskSet to manage its life cycle. When the TaskScheduler obtains executor compute resources on a worker node, the TaskSetManager sends individual tasks to that executor for execution. If a task fails during execution, the TaskSetManager notifies the DAGScheduler to end the task and adds the failed task back to the queue to be recomputed; the default number of retries is 4, and the logic is handled in TaskSetManager.handleFailedTask. After a task finishes, the TaskSetManager sends the result back to the DAGScheduler for further processing.
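The retry limit mentioned above is configurable; a minimal sketch (the value shown is the default):

```scala
import org.apache.spark.SparkConf

// spark.task.maxFailures is the number of times a single task may fail before
// the whole job is aborted; 4 is the default referred to above.
val conf = new SparkConf()
  .setAppName("retry-config-demo")
  .set("spark.task.maxFailures", "4")
```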

The implementation class of TaskScheduler is TaskSchedulerImpl, and its submitTasks method is an important one.

This method creates a TaskSetManager and uses a HashMap to associate the stage's id with the TaskSetManager. It then calls SchedulableBuilder.addTaskSetManager to add the newly created TaskSetManager; the SchedulableBuilder determines the scheduling order of TaskSetManagers and decides on which ExecutorBackend each task runs. Finally, submitTasks calls the backend's reviveOffers; the backend is usually a CoarseGrainedSchedulerBackend, a subclass of SchedulerBackend, whose reviveOffers method calls driverEndpoint.send to send a ReviveOffers message to the DriverEndpoint, triggering the underlying resource scheduling. When the receive method of the DriverEndpoint matches the ReviveOffers message, the makeOffers method is called.
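Paraphrased from CoarseGrainedSchedulerBackend in Spark 2.x (details vary between versions), makeOffers looks roughly like this:

```scala
// Paraphrased from CoarseGrainedSchedulerBackend.DriverEndpoint (Spark 2.x):
// build a WorkerOffer for every alive executor, hand the offers to the
// TaskSchedulerImpl, then launch whatever tasks come back.
private def makeOffers(): Unit = {
  // Filter out executors that are in the process of being killed.
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
```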

This method takes the active executors and, based on them, generates all the workOffers available for computation. A WorkerOffer is created with the executor's id, host, and number of free cores; the available memory information has already been taken into account elsewhere. makeOffers then calls the scheduler's resourceOffers method, which allocates resources over these workOffers and distributes tasks as evenly as possible across the workOffers (executors). I had a question here: are tasks distributed to the executors in a fixed sequential order? That is, with 100 tasks and 5 executors, would executor 0 always get the tasks whose id satisfies taskId % 5 == 0 and executor 1 always get those with taskId % 5 == 1, and so on? Reading the source shows that Random.shuffle(offers) is called, i.e. the order of the workOffers (executors) in the Seq is shuffled, precisely to avoid always placing tasks on the same group of worker nodes. In the subsequent resourceOfferSingleTaskSet method we can clearly see that tasks are then assigned in the order of the (shuffled) workerOffers Seq: a simple for loop over the workerOffers reads each offer's available cores and hands the available resources to the TaskSetManager to schedule the tasks of the corresponding TaskSet.
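The core of resourceOfferSingleTaskSet, paraphrased from the Spark 2.x TaskSchedulerImpl (error handling and some bookkeeping trimmed), looks roughly like this:

```scala
// Paraphrased from TaskSchedulerImpl (Spark 2.x): walk the (already shuffled)
// offers in order and, whenever an executor still has CPUS_PER_TASK free cores,
// ask the TaskSetManager for a task that can run on it.
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]): Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      // resourceOffer returns a task whose locality fits this executor, if any.
      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
        tasks(i) += task
        taskIdToTaskSetManager(task.taskId) = taskSet
        taskIdToExecutorId(task.taskId) = execId
        availableCpus(i) -= CPUS_PER_TASK
        launchedTask = true
      }
    }
  }
  launchedTask
}
```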

The shuffledOffers passed in here are exactly the workerOffers that were shuffled earlier, so tasks are assigned to the workerOffers in that randomized order. Task assignment also takes data locality into account: the workerOffer, i.e. the corresponding executor resource, is passed into the TaskSetManager's resourceOffer method, which returns the TaskDescription of the task to compute and tries hard to give each executor tasks whose data is local to it. The locality levels, from best to worst, are PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, and ANY. For some tasks the data will always sit on a particular node, so those tasks will always be assigned to an executor on that node, and the effect of shuffling the workerOffers may not look particularly obvious, because such tasks keep being computed on the same nodes. The DAGScheduler also takes locality into account, but it does so at the data level, which can be determined from the RDD itself; the TaskScheduler considers it from the perspective of the actual computation, a more concrete, lower-level scheduling decision that satisfies both data locality and computation locality.
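How long the TaskScheduler waits for a free executor at a better locality level before falling back to a worse one is controlled by configuration; a minimal sketch (the value shown is the default):

```scala
import org.apache.spark.SparkConf

// spark.locality.wait is how long the scheduler waits to launch a task at a
// given locality level (e.g. NODE_LOCAL) before degrading to the next level.
val conf = new SparkConf()
  .setAppName("locality-wait-demo")
  .set("spark.locality.wait", "3s")   // 3s is the default
```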

In the resourceOfferSingleTaskSet method there is a variable CPUS_PER_TASK. I used to assume that a task is always executed by a single CPU core, but this variable actually comes from the configuration parameter spark.task.cpus: when it is set to 2, each task is allocated 2 cores. From Stack Overflow we learn that this parameter exists to meet the needs of some special tasks: a task may be multi-threaded, or may start extra threads for other interactive operations, and this setting ensures that when tasks behave as specified, their total demand on cores does not exceed the available resources. It is not mandatory, though: if a task starts more threads than spark.task.cpus specifies, nothing breaks, but it may cause resource contention and hurt efficiency.
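Setting the parameter is straightforward; for instance, to reserve two cores for each task that is expected to run an extra thread of its own (a sketch, with the value chosen purely for illustration):

```scala
import org.apache.spark.SparkConf

// spark.task.cpus reserves this many cores for every task; the default is 1.
val conf = new SparkConf()
  .setAppName("multi-core-task-demo")
  .set("spark.task.cpus", "2")
```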

The TaskSets being offered resources are in fact processed in a certain order: TaskSchedulerImpl.resourceOffers calls rootPool.getSortedTaskSetQueue to obtain the TaskSets sorted according to a rule and then traverses them. The rule is the FIFO or FAIR mode mentioned earlier, and it determines the computation priority of the TaskSet belonging to each stage. The resourceOffers function also marks each alive slave at the beginning, recording its hostname and whether new executors have been added; it is possible that some executors died and were restarted, and such new executors must be added to the record of computing resources available for TaskSet computation.

After the tasks have been assigned resources, the resulting TaskDescriptions are passed to CoarseGrainedSchedulerBackend's launchTasks, which sends each task to the corresponding ExecutorBackend for execution.

If a task's serialized size exceeds maxRpcMessageSize (128M by default), the task is aborted; otherwise launchTasks looks up the executorData by the executorId recorded in the TaskDescription, subtracts the number of cores needed to run the task from that executor's freeCores, and uses the executorEndpoint's send method to send a LaunchTask message to the ExecutorBackend of the specified executor for execution. LaunchTask is a case class that holds the serialized task.
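Paraphrased from CoarseGrainedSchedulerBackend in Spark 2.x (the real abort message and some bookkeeping are trimmed), the launching step looks roughly like this:

```scala
// Paraphrased from CoarseGrainedSchedulerBackend.DriverEndpoint (Spark 2.x):
// serialize each TaskDescription, check it against the RPC size limit, then
// deduct cores from the chosen executor and send it a LaunchTask message.
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit() >= maxRpcMessageSize) {
      // Too large to ship over RPC: ask the TaskSetManager to abort the TaskSet.
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        taskSetMgr.abort("Serialized task exceeds maxRpcMessageSize")
      }
    } else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorEndpoint.send(
        LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
```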

References:

  • Spark Big Data Business Trilogy / Jialin Wang, Zhihua Duan, Yang Xia. Beijing: Tsinghua University Press, 2018
  • Stackoverflow.com/questions/3…