1. Preface

From the previous article we already know the important role in Spark Driver startup, namely DriverWrapper

Let’s go back to the cluster submission process

  1. After the task is submitted, the Master (resource manager) finds a Worker (node) to start the Driver process, and the Driver registers the application with the Master
  2. The Master allocates resources on the corresponding Workers according to the resource configuration in the submit script
  3. All Executors are started on those Workers, and each Executor registers back with the Driver (reverse registration)
  4. The Driver starts executing the main function; when an Action operator is executed, it divides the job into stages, each stage generates a TaskSet, and the tasks are then distributed to the Executors

From the submission process we know that, so far, we have only started the Driver; the subsequent registration steps have not happened yet. We will analyze the rest of the process from the perspective of the SparkContext source code

2. Prepare SparkContext Process 01

Starting the Driver means that our code begins to execute. Readers who have written Spark programs will be familiar with the following two lines of code that create a SparkContext. Now we will trace the source code of SparkContext creation and see what services Spark prepares for us inside SparkContext

val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)

Only one SparkContext may be active per JVM. In other words, a Spark program has only one SparkContext
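As a minimal sketch (not from the source walkthrough above): SparkContext.getOrCreate() can be used to avoid accidentally constructing a second context in the same JVM.

// A minimal sketch: getOrCreate() returns the already-active SparkContext in this JVM
// instead of constructing a second one
val sc1 = SparkContext.getOrCreate(new SparkConf().setAppName("wc").setMaster("local[*]"))
val sc2 = SparkContext.getOrCreate()
assert(sc1 eq sc2) // same instance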

Now if we look at the properties of SparkContext, we can see that it encapsulates quite a number of services, including a core service called SparkEnv that runs through the entire computing framework. The DAGScheduler, the service responsible for Stage division that you may have seen in other Spark articles, is also wrapped in SparkContext

Let’s pick a few key points here

// SparkEnv: the core service of the computing layer, running through the entire application
private var _env: SparkEnv = _
// Internally holds the DriverEndpoint and ClientEndpoint
private var _schedulerBackend: SchedulerBackend = _
// The TaskScheduler holds a reference to the SchedulerBackend
private var _taskScheduler: TaskScheduler = _
// DAGScheduler: the core class for Stage division
@volatile private var _dagScheduler: DAGScheduler = _

3. Prepare SparkContext Process 02

SchedulerBackend and TaskScheduler are initialized at line 492 of SparkContext: the two properties are assigned from sched and ts, which are returned by the SparkContext.createTaskScheduler() method. Then _taskScheduler.start() is called to start the TaskScheduler service
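For readers who want to see it in one place, the relevant lines of SparkContext look roughly like this (a simplified sketch of the Spark 2.x source, with surrounding details omitted):

// Simplified sketch of SparkContext initialization around the createTaskScheduler() call
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
// start the TaskScheduler after the DAGScheduler has taken its reference to it
_taskScheduler.start()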

Below we look at the SparkContext.createTaskScheduler() method. Because the code is quite long, I have abbreviated it here. Continuing from the previous article, Spark source code parsing 03 - Submit submission process and Driver startup process, we are using cluster mode, and the submitted URL is spark://127.0.0.1, which matches the fourth case in the screenshot
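For reference, the pattern that spark://127.0.0.1 matches is roughly the following (a sketch; in recent Spark versions it is defined in the SparkMasterRegex helper inside SparkContext.scala):

// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """spark://(.*)""".r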

Let’s simplify the code flow

val scheduler = new TaskSchedulerImpl(sc)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)

Entering the scheduler.initialize(backend) method, we see that the backend property of the TaskSchedulerImpl is assigned the StandaloneSchedulerBackend
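What TaskSchedulerImpl.initialize() does, as a simplified sketch (scheduling-pool details abbreviated; treat this as an approximation of the Spark 2.x source):

def initialize(backend: SchedulerBackend) {
  // the TaskSchedulerImpl now holds the StandaloneSchedulerBackend
  this.backend = backend
  // build the scheduling pools according to the scheduling mode (FIFO / FAIR)
  schedulableBuilder = schedulingMode match {
    case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
    case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
    case _ => throw new IllegalArgumentException(s"Unsupported scheduling mode: $schedulingMode")
  }
  schedulableBuilder.buildPools()
}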

From the returned Tuple2 we know that ts is assigned the TaskSchedulerImpl and sched is assigned the StandaloneSchedulerBackend, which internally holds a reference to the TaskSchedulerImpl. Let's summarize the properties we know so far in a figure

4. Prepare SparkContext Process 03

Having finished with SparkContext.createTaskScheduler(), we continue with _taskScheduler.start(), which starts the TaskScheduler service. We can ignore the details here; we just need to know that the start() method mainly calls the backend.start() method. The backend here is the StandaloneSchedulerBackend we passed in via TaskScheduler.initialize(), that is: TaskScheduler.start() actually invokes the StandaloneSchedulerBackend.start() method
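A simplified sketch of TaskSchedulerImpl.start() (speculation handling abbreviated):

override def start() {
  // backend is the StandaloneSchedulerBackend passed in via initialize()
  backend.start()
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    // schedule a periodic check for speculative tasks (details omitted)
  }
}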

Entering the StandaloneSchedulerBackend.start() method, it first calls super.start()

We can see from the screenshot that the parent class of StandaloneSchedulerBackend is CoarseGrainedSchedulerBackend, that is: StandaloneSchedulerBackend.start() actually calls the parent class's CoarseGrainedSchedulerBackend.start() method
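The class declaration makes the inheritance explicit (copied in simplified form from the Spark 2.x source):

private[spark] class StandaloneSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
  with StandaloneAppClientListener
  with Logging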

Now we enter the CoarseGrainedSchedulerBackend.start() method. Here there is a familiar method, rpcEnv.setupEndpoint(): the DriverEndpoint is injected into the RpcEnv environment, and the onStart() of DriverEndpoint is executed by a background thread

// In CoarseGrainedSchedulerBackend.start():
driverEndpoint = createDriverEndpointRef(properties)

// createDriverEndpointRef() registers the DriverEndpoint with the RpcEnv:
rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
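As for what DriverEndpoint.onStart() actually does, roughly (a sketch of the Spark 2.x source): it periodically sends ReviveOffers to itself so that delay scheduling keeps working.

override def onStart() {
  // Periodically revive offers to allow delay scheduling to work
  val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
  reviveThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(ReviveOffers))
    }
  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}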

As readers of the resource layer article know, an Endpoint is an endpoint of RPC communication; whoever holds a role's Endpoint (reference) can communicate with that role
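For reference, the RpcEndpoint contract looks roughly like this (simplified from org.apache.spark.rpc.RpcEndpoint, where these methods have default implementations; the documented life-cycle is constructor -> onStart -> receive* -> onStop):

private[spark] trait RpcEndpoint {
  val rpcEnv: RpcEnv
  final def self: RpcEndpointRef = rpcEnv.endpointRef(this)
  // receive handles messages delivered via send(); receiveAndReply handles ask()
  def receive: PartialFunction[Any, Unit]
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]
  def onStart(): Unit
  def onStop(): Unit
}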

Let’s patch up the SparkContext property graph above

5. Prepare SparkContext Process 04

We now return to StandaloneSchedulerBackend.start(). Looking at line 114 of the StandaloneSchedulerBackend class, the logic is simplified below

// Prepare DriverEndpoint
super.start()

// Encapsulate the required parameters
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
  args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)

// Encapsulate application information
val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
  webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)

// Assign the client
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)

// Prepare ClientEndpoint
client.start()

We have already looked at the super.start() method, so let's move on. When the StandaloneAppClient is built, command is passed in; here we just need to remember that org.apache.spark.executor.CoarseGrainedExecutorBackend is handed to the StandaloneAppClient
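For reference, the Command that carries this main class is just a small case class (from org.apache.spark.deploy; the Worker will later use mainClass to launch the executor process):

private[spark] case class Command(
    mainClass: String,                 // here: org.apache.spark.executor.CoarseGrainedExecutorBackend
    arguments: Seq[String],
    environment: Map[String, String],
    classPathEntries: Seq[String],
    libraryPathEntries: Seq[String],
    javaOpts: Seq[String])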

Continuing with the client.start() method, at line 276 of the StandaloneAppClient class the ClientEndpoint is injected into the RpcEnv environment, and ClientEndpoint's onStart() is executed by a background thread

rpcEnv.setupEndpoint("AppClient".new ClientEndpoint(rpcEnv))

Let’s patch up the SparkContext property graph above

6. Prepare SparkContext Process 05

StandaloneAppClient holds a ClientEndpoint and its onStart() is invoked by a background thread. Line 86 of StandaloneAppClient calls the registerWithMaster() method
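ClientEndpoint.onStart() itself is very short; a simplified sketch of the Spark 2.x source:

override def onStart(): Unit = {
  try {
    registerWithMaster(1)   // first registration attempt; retries are scheduled internally
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      stop()
  }
}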

In the registerWithMaster() method, the tryRegisterAllMasters() method is called

After entering tryRegisterAllMasters(), at lines 106 and 107 of the code we see that it obtains a reference to the Master and sends the RegisterApplication message to the Master. Isn't this exactly consistent with our submission process: the Driver starts and then registers the application with the Master?

// Get the master endpoint reference
val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
// Send a RegisterApplication message to the master to register the application
masterRef.send(RegisterApplication(appDescription, self))

So now we are done with preparing SparkContext. During its preparation, SparkContext prepares the StandaloneSchedulerBackend in order to build communication with the Master and send the registration message (RegisterApplication) to it. We also know that SparkContext encapsulates many properties, such as ShuffleManager, DAGScheduler, etc

7. Summary

When the Driver runs our main method, SparkContext is prepared. During this process, SparkContext also prepares the communication environment and sends a RegisterApplication message to the Master. This also matches the submission process: the step in which the Driver, after it starts, registers the application with the Master.