A Spark job is submitted by running spark-submit, which invokes the spark-submit script in the bin directory. Opening that script, we find:

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

So spark-submit delegates to the bin/spark-class script, passing org.apache.spark.deploy.SparkSubmit as the class to run. The spark-class script ends with:

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

Let’s add a line that prints the command before it is executed:

CMD=("${CMD[@]:0:$LAST}")
echo "${CMD[@]}"
exec "${CMD[@]}"

We submit a calculation:

spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--total-executor-cores 2 \
/home/hadoop/examples/jars/spark-examples_2.12-3.0.0.jar

The printed command:

java -cp ...... -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode client --conf spark.driver.memory=512m --class MySpark --executor-memory 512m --total-executor-cores 2 /home/hadoop/examples/jars/spark-examples_2.12-3.0.0.jar

Java starts a SparkSubmit process that executes the SparkSubmit main method.

override def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit() {
      self =>

      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        new SparkSubmitArguments(args) {
          override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

          override protected def logWarning(msg: => String): Unit = self.logWarning(msg)

          override protected def logError(msg: => String): Unit = self.logError(msg)
        }
      }

      override protected def logInfo(msg: => String): Unit = printMessage(msg)

      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

      override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")

      override def doSubmit(args: Array[String]): Unit = {
        try {
          super.doSubmit(args)
        } catch {
          case e: SparkUserAppException =>
            exitFn(e.exitCode)
        }
      }

    }

    submit.doSubmit(args)
  }

The main method creates the SparkSubmit object and executes the doSubmit method.

def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}

The doSubmit method first parses the arguments:

val appArgs = parseArguments(args)
protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
    new SparkSubmitArguments(args)
  }

Open the SparkSubmitArguments class:

// Set parameters from command line arguments
  parse(args.asJava)

First, the options are parsed from the command-line arguments. A regular expression recognizes options written in the --name=value form:

Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
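As a rough illustration of what this pattern does with a single argument, here is a minimal sketch. The object name EqOptSketch and the helper split are hypothetical; only the regular expression itself is taken from the source above.

```scala
import java.util.regex.Pattern

object EqOptSketch {
  // Same regular expression as in the parser.
  private val eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)")

  // Hypothetical helper: Some((name, value)) for "--name=value", None otherwise.
  def split(arg: String): Option[(String, String)] = {
    val m = eqSeparatedOpt.matcher(arg)
    if (m.matches()) Some((m.group(1), m.group(2))) else None
  }

  def main(args: Array[String]): Unit = {
    println(split("--master=yarn")) // name and value are split at the first '='
    println(split("--master"))      // no '=' here, so the pattern does not match
  }
}
```

Because the name group is `[^=]+`, only the first `=` separates name from value, so a value such as spark.driver.memory=512m survives intact.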

Each recognized option is then passed to the handle method:

override protected def handle(opt: String, value: String): Boolean = {
  opt match {
    case NAME =>
      name = value

    case MASTER =>
      master = value

    case CLASS =>
      mainClass = value

    case DEPLOY_MODE =>
      if (value != "client" && value != "cluster") {
        error("--deploy-mode must be either \"client\" or \"cluster\"")
      }
      deployMode = value

    case NUM_EXECUTORS =>
      numExecutors = value

    case TOTAL_EXECUTOR_CORES =>
      totalExecutorCores = value

    case EXECUTOR_CORES =>
      executorCores = value

    case EXECUTOR_MEMORY =>
      executorMemory = value

    case DRIVER_MEMORY =>
      driverMemory = value

    case DRIVER_CORES =>
      driverCores = value
    ......

The branch

case MASTER =>
        master = value

corresponds to the option constant

protected final String MASTER = "--master"; // command line: --master yarn

The branch

case CLASS =>
        mainClass = value

corresponds to the option constant

protected final String CLASS = "--class"; // command line: --class org.apache.spark.examples.SparkPi

The branch

case DEPLOY_MODE =>
  if (value != "client" && value != "cluster") {
    error("--deploy-mode must be either \"client\" or \"cluster\"")
  }
  deployMode = value

corresponds to the option constant

protected final String DEPLOY_MODE = "--deploy-mode"; // command line: --deploy-mode client

The branch

case DRIVER_MEMORY =>
  driverMemory = value

corresponds to the option constant

protected final String DRIVER_MEMORY = "--driver-memory"; // command line: --driver-memory 512m

The branch

case EXECUTOR_MEMORY =>
  executorMemory = value

corresponds to the option constant

protected final String EXECUTOR_MEMORY = "--executor-memory"; // command line: --executor-memory 512m

The branch

case TOTAL_EXECUTOR_CORES =>
  totalExecutorCores = value

corresponds to the option constant

protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores"; // command line: --total-executor-cores 2

In this way, all of the options passed to spark-submit have been parsed.
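The option-by-option matching can be sketched in a few lines. This is a simplified illustration, not Spark's parser: the Parsed case class, the parse function, and the choice of only four options are assumptions made for the example.

```scala
object HandleSketch {
  // Hypothetical, trimmed-down stand-in for the fields of SparkSubmitArguments.
  final case class Parsed(
      master: Option[String] = None,
      deployMode: Option[String] = None,
      mainClass: Option[String] = None,
      driverMemory: Option[String] = None)

  // Walk the arguments as "--option value" pairs and record the ones we know.
  @annotation.tailrec
  def parse(args: List[String], acc: Parsed = Parsed()): Parsed = args match {
    case "--master" :: value :: tail =>
      parse(tail, acc.copy(master = Some(value)))
    case "--deploy-mode" :: value :: tail =>
      require(value == "client" || value == "cluster",
        "--deploy-mode must be either \"client\" or \"cluster\"")
      parse(tail, acc.copy(deployMode = Some(value)))
    case "--class" :: value :: tail =>
      parse(tail, acc.copy(mainClass = Some(value)))
    case "--driver-memory" :: value :: tail =>
      parse(tail, acc.copy(driverMemory = Some(value)))
    case _ :: tail =>
      parse(tail, acc) // anything this sketch does not know is skipped
    case Nil =>
      acc
  }

  def main(args: Array[String]): Unit =
    println(parse(List("--master", "yarn", "--deploy-mode", "client")))
}
```

Unlike the real handle method, which mutates fields on the arguments object, the sketch returns an immutable value; that is only a stylistic choice to keep the example easy to test.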

The SparkSubmitArguments class also has a field called action:

var action: SparkSubmitAction = null

It is initialized to null; when no other action is requested on the command line, it falls back to SUBMIT, so doSubmit takes the first branch:

appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
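The null-then-default behavior and the dispatch can be sketched as follows. The names ActionSketch, resolve and dispatch are hypothetical, and the string results merely stand in for the real method calls.

```scala
object ActionSketch {
  sealed trait Action
  case object Submit extends Action
  case object Kill extends Action
  case object RequestStatus extends Action
  case object PrintVersion extends Action

  // A null action (nothing requested on the command line) falls back to Submit.
  def resolve(action: Action): Action = Option(action).getOrElse(Submit)

  // Dispatch on the resolved action; the strings name the method that would run.
  def dispatch(action: Action): String = resolve(action) match {
    case Submit => "submit"
    case Kill => "kill"
    case RequestStatus => "requestStatus"
    case PrintVersion => "printVersion"
  }

  def main(args: Array[String]): Unit =
    println(dispatch(null))
}
```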

Next, let’s drill down into the submit method:

private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(args, uninitLog)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          } else {
            throw e
          }
      }
    } else {
      runMain(args, uninitLog)
    }
  }

  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      logInfo("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        logWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args, false)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}

The submit method first checks whether this is a standalone cluster submission. We are submitting to a YARN cluster, so it goes straight to the doRunMain() method. doRunMain checks whether a proxy user is set; if so, it runs the main program as that user, otherwise it calls runMain directly.

Now look at runMain(). It contains an important line that prepares the submission environment:

 val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

It obtains the class loader,

val loader = getSubmitClassLoader(sparkConf)

and loads the main class by reflection:

mainClass = Utils.classForName(childMainClass)

It then checks whether mainClass is a subtype of SparkApplication. If so, an instance is created through the no-argument constructor. If not, the class is wrapped in a new JavaMainApplication(mainClass).

val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
  mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
  new JavaMainApplication(mainClass)
}
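The same "instantiate it if it implements the interface, otherwise wrap it" decision can be tried out with JDK classes only. In this sketch java.lang.Runnable plays the role of SparkApplication and the hypothetical Wrapper class plays the role of JavaMainApplication; neither substitution comes from Spark.

```scala
object AppWrapSketch {
  // Stand-in for JavaMainApplication: wraps a class that is not itself runnable.
  final class Wrapper(val wrapped: Class[_]) extends Runnable {
    override def run(): Unit = println(s"would invoke main of ${wrapped.getName}")
  }

  // Runnable stands in for SparkApplication here.
  def instantiate(klass: Class[_]): Runnable =
    if (classOf[Runnable].isAssignableFrom(klass)) {
      // The class already implements the interface: call its no-argument constructor.
      klass.getConstructor().newInstance().asInstanceOf[Runnable]
    } else {
      // Otherwise adapt it through the wrapper.
      new Wrapper(klass)
    }

  def main(args: Array[String]): Unit = {
    instantiate(classOf[Thread]).run() // Thread implements Runnable
    instantiate(classOf[String]).run() // String does not, so it gets wrapped
  }
}
```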

Call the SparkApplication’s start method.

try {
  app.start(childArgs.toArray, sparkConf)
} catch {
  case t: Throwable =>
    throw findCause(t)
}

Let’s drill down into the prepareSubmitEnvironment method. It defines four return values:

// Return values
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sparkConf = args.toSparkConf()
    var childMainClass = ""

childMainClass is declared with var, so let’s find where it is assigned:

 if (isYarnCluster) {
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
 private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
    "org.apache.spark.deploy.yarn.YarnClusterApplication"

In YARN cluster mode, childMainClass is set to org.apache.spark.deploy.yarn.YarnClusterApplication:

private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)

    new Client(new ClientArguments(args), conf, null).run()
  }
}  

Next, look into the Client class:

private[spark] class Client(
    val args: ClientArguments,
    val sparkConf: SparkConf,
    val rpcEnv: RpcEnv)
  extends Logging {

  import Client._
  import YarnSparkHadoopUtil._

  private val yarnClient = YarnClient.createYarnClient
  private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

  private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"

  private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
  private var appMaster: ApplicationMaster = _
  private var stagingDirPath: Path = _

The Client class creates a YARN client:

private val yarnClient = YarnClient.createYarnClient



Looking into YarnClient.createYarnClient:

public static YarnClient createYarnClient() {
    YarnClient client = new YarnClientImpl();
    return client;
}

createYarnClient creates a YarnClientImpl object. Looking into YarnClientImpl:

public class YarnClientImpl extends YarnClient {

  private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);

  protected ApplicationClientProtocol rmClient;
  ......

rmClient is the client used to communicate with the ResourceManager.

Now go back to YarnClusterApplication, which calls the Client’s run method:

def run(): Unit = {
    this.appId = submitApplication()
......

The submitApplication() method submits the application and returns the appId, the application ID that identifies it across YARN. The application’s status, reports, and so on can later be looked up with this ID.

Looking into submitApplication():

def submitApplication(): ApplicationId = {
  ResourceRequestHelper.validateResources(sparkConf)

  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()

    // The app staging dir based on the STAGING_DIR configuration if configured
    // otherwise based on the users home directory.
    val appStagingBaseDir = sparkConf.get(STAGING_DIR)
      .map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
      .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
    stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)
    ......

The code

try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

This initializes and starts the YARN client, which establishes the connection to YARN.

// Get a new application from our RM
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      appId = newAppResponse.getApplicationId()

This requests a new application from the ResourceManager and reads the application ID from the response.

// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)

The above code creates the container launch context and the application submission context.

Finally, the application is submitted to the ResourceManager:

// Finally, submit and monitor the application
logInfo(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext)
launcherBackend.setAppId(appId.toString)
reportLauncherState(SparkAppHandle.State.SUBMITTED)



So submitting the application means submitting appContext, which is created by

val appContext = createApplicationSubmissionContext(newApp, containerContext)

The container launch context is created before the application submission context:

val containerContext = createContainerLaunchContext(newAppResponse)

Looking into createContainerLaunchContext:

val amClass =
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }
if (args.primaryRFile != null &&
    (args.primaryRFile.endsWith(".R") || args.primaryRFile.endsWith(".r"))) {
  args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
}
val userArgs = args.userArgs.flatMap { arg =>
  Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
val amArgs =
  Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
  Seq("--properties-file",
    buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
  Seq("--dist-cache-conf",
    buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))

// Command for the ApplicationMaster
// The appMaster is responsible for starting this command
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++ amArgs ++
  Seq(
    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)

We are running in cluster mode, so amClass is org.apache.spark.deploy.yarn.ApplicationMaster. It is combined with the other arguments into amArgs, and the resulting commands sequence, which starts a JVM, is set on the container.
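How such a command line is pieced together from sequences can be sketched as follows. The helper buildCommand and its parameters are hypothetical, and the literal strings {{JAVA_HOME}} and <LOG_DIR> are used here as plain placeholders for values that YARN expands when the container starts.

```scala
object AmCommandSketch {
  // Hypothetical helper: assemble a java command line for the AM container.
  def buildCommand(
      isClusterMode: Boolean,
      javaOpts: Seq[String],
      userArgs: Seq[String]): Seq[String] = {
    // The main class depends on the deploy mode.
    val amClass =
      if (isClusterMode) "org.apache.spark.deploy.yarn.ApplicationMaster"
      else "org.apache.spark.deploy.yarn.ExecutorLauncher"
    // Each user argument is passed through as "--arg <value>".
    val amArgs = Seq(amClass) ++ userArgs.flatMap(a => Seq("--arg", a))
    // Placeholders only: the real values are filled in on the node.
    Seq("{{JAVA_HOME}}/bin/java", "-server") ++ javaOpts ++ amArgs ++
      Seq("1>", "<LOG_DIR>/stdout", "2>", "<LOG_DIR>/stderr")
  }

  def main(args: Array[String]): Unit =
    println(buildCommand(isClusterMode = true, Seq("-Xmx512m"), Seq("100")).mkString(" "))
}
```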

// send the acl settings into YARN to control who has access via YARN interfaces
    val securityManager = new SecurityManager(sparkConf)
    amContainer.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
    setupSecurityToken(amContainer)
    amContainer

Finally, we return the amContainer directly.

The ResourceManager has a NodeManager start a container, and inside that container the command “java org.apache.spark.deploy.yarn.ApplicationMaster” is executed, which starts the ApplicationMaster process.

Next, find the main method of org.apache.spark.deploy.yarn.ApplicationMaster:

def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  val sparkConf = new SparkConf()
  if (amArgs.propertiesFile != null) {
    Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
      sparkConf.set(k, v)
    }
  }
  ......

The main method first creates a new object

val amArgs = new ApplicationMasterArguments(args)

This object encapsulates the command-line arguments as an ApplicationMasterArguments instance.

val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

sparkConf and yarnConf are passed in to create the ApplicationMaster object.

Drilling down into ApplicationMaster, we see that it creates a YarnRMClient object:

private val client = new YarnRMClient()

Continuing into YarnRMClient:

private[spark] class YarnRMClient extends Logging {

  private var amClient: AMRMClient[ContainerRequest] = _
  private var uiHistoryAddress: String = _
  private var registered: Boolean = false

The AMRMClient is the ApplicationMaster client that connects to a ResourceManager.

Further down in the org.apache.spark.deploy.yarn.ApplicationMaster class:

ugi.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = System.exit(master.run())
    })

This is where the master is going to execute the run method, so let’s drill down into the run method.

......
if (isClusterMode) {
  runDriver()
} else {
  runExecutorLauncher()
}
......

In cluster mode runDriver() is called; otherwise runExecutorLauncher() is called. We are in cluster mode, so runDriver() runs:

private def runDriver(): Unit = {
    addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
    userClassThread = startUserApplication()

It starts the user application thread,

val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
try {
  val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
    Duration(totalWaitTime, TimeUnit.MILLISECONDS))
  if (sc != null) {
    val rpcEnv = sc.env.rpcEnv
    ......

It then waits for the SparkContext object. First, let’s go into startUserApplication():

val mainMethod = userClassLoader.loadClass(args.userClass)
      .getMethod("main", classOf[Array[String]])

So a class loader is used to load the user class and look up its main method. args.userClass is set when the arguments are parsed:

case ("--class") :: value :: tail =>
          userClass = value
          args = tail

userClass comes from the --class argument. We passed “--class org.apache.spark.examples.SparkPi”, so that class is loaded here and its “main” method is looked up.

            sparkContextPromise.tryFailure(e.getCause())
        } finally {
          // Notify the thread waiting for the SparkContext, in case the application did not
          // instantiate one. This will do nothing when the user code instantiates a SparkContext
          // (with the correct master), or when the user code throws an exception (due to the
          // tryFailure above).
          sparkContextPromise.trySuccess(null)

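Invoking the user’s main method is what initializes the SparkContext, and sparkContextPromise is how the outcome reaches the thread that is waiting for it.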
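The hand-off between the user thread and the waiting thread can be sketched with the standard library’s Promise. The Context case class and the runAndAwait function are hypothetical stand-ins; the sketch only shows the promise/await pattern, not what the ApplicationMaster does in full.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object PromiseHandoffSketch {
  // Hypothetical stand-in for SparkContext.
  final case class Context(appName: String)

  // Start a "Driver" thread that fulfils the promise, then block until it does.
  def runAndAwait(appName: String, timeout: FiniteDuration): Context = {
    val contextPromise = Promise[Context]()
    val userThread = new Thread(new Runnable {
      override def run(): Unit = {
        // User code would create its context here and publish it.
        contextPromise.trySuccess(Context(appName))
        ()
      }
    })
    userThread.setName("Driver")
    userThread.start()
    // The calling thread waits here, with an upper bound on the wait time.
    Await.result(contextPromise.future, timeout)
  }

  def main(args: Array[String]): Unit =
    println(runAndAwait("SparkPi", 5.seconds))
}
```

Using trySuccess rather than success means a second attempt to complete the promise is simply ignored, which is why the finally block in the snippet above can call trySuccess(null) unconditionally.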

    userThread.setContextClassLoader(userClassLoader)
    userThread.setName("Driver")
    userThread.start()
    userThread

The thread is then started under the name “Driver”.



Let’s go back to runDriver. Once the thread has started and the SparkContext object has been initialized successfully, execution continues:

val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
try {
  val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
    Duration(totalWaitTime, TimeUnit.MILLISECONDS))
  if (sc != null) {
    val rpcEnv = sc.env.rpcEnv

    val userConf = sc.getConf
    val host = userConf.get(DRIVER_HOST_ADDRESS)
    val port = userConf.get(DRIVER_PORT)
    // Register the ApplicationMaster with the RM
    registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

    val driverRef = rpcEnv.setupEndpointRef(
      RpcAddress(host, port),
      YarnSchedulerBackend.ENDPOINT_NAME)
    createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)

createAllocator creates the allocator, so let’s drill down into the code:

 allocator = client.createAllocator(
      yarnConf,
      _sparkConf,
      appAttemptId,
      driverUrl,
      driverRef,
      securityMgr,
      localResources)

    // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
    // that when the driver sends an initial executor request (e.g. after an AM restart),
    // the allocator is ready to service requests.
    rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

    allocator.allocateResources()

The code creates the allocator and then asks it to allocate resources.

Looking into allocateResources:

val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()

This obtains the containers that have been allocated.

if (allocatedContainers.size > 0) {
      logDebug(("Allocated containers: %d. Current executor count: %d. " +
        "Launching executor count: %d. Cluster resources: %s.")
        .format(
          allocatedContainers.size,
          runningExecutors.size,
          numExecutorsStarting.get,
          allocateResponse.getAvailableResources))

      handleAllocatedContainers(allocatedContainers.asScala)
    }

If at least one container was allocated, the containers are handed to handleAllocatedContainers:

def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

    // Match incoming requests by host
    val remainingAfterHostMatches = new ArrayBuffer[Container]
    for (allocatedContainer <- allocatedContainers) {
      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
        containersToUse, remainingAfterHostMatches)
    }

    // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
    // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
    // a separate thread to perform the operation.
    val remainingAfterRackMatches = new ArrayBuffer[Container]
    if (remainingAfterHostMatches.nonEmpty) {
      var exception: Option[Throwable] = None
      val thread = new Thread("spark-rack-resolver") {
        override def run(): Unit = {
          try {
            for (allocatedContainer <- remainingAfterHostMatches) {
              val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
              matchContainerToRequest(allocatedContainer, rack, containersToUse,
                remainingAfterRackMatches)
            }
          } catch {
            case e: Throwable =>
              exception = Some(e)
          }
        }
      }
      thread.setDaemon(true)
      thread.start()

      try {
        thread.join()
      } catch {
        case e: InterruptedException =>
          thread.interrupt()
          throw e
      }

      if (exception.isDefined) {
        throw exception.get
      }
    }

    // Assign remaining that are neither node-local nor rack-local
    val remainingAfterOffRackMatches = new ArrayBuffer[Container]
    for (allocatedContainer <- remainingAfterRackMatches) {
      matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
        remainingAfterOffRackMatches)
    }

    if (remainingAfterOffRackMatches.nonEmpty) {
      logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
        s"allocated to us")
      for (container <- remainingAfterOffRackMatches) {
        internalReleaseContainer(container)
      }
    }

    runAllocatedContainers(containersToUse)

The code matches the allocated containers against the pending requests, first by host, then by rack, and finally by any host, and releases the containers that match nothing. When it is done, it executes

runAllocatedContainers(containersToUse)
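The three-pass matching (host, then rack, then anywhere) can be sketched with a much simpler model. Everything here is an assumption made for illustration: the Container case class, the representation of pending requests as a count per location key, and the use of "*" as the key for "any host".

```scala
import scala.collection.mutable.ArrayBuffer

object LocalityMatchSketch {
  final case class Container(id: Int, host: String, rack: String)

  // Returns (containers to use, containers to release).
  def assign(
      containers: Seq[Container],
      pending: Map[String, Int]): (Seq[Container], Seq[Container]) = {
    var remaining = pending
    val used = ArrayBuffer.empty[Container]

    // Consume one pending request for this location key, if there is one.
    def take(key: String): Boolean = remaining.get(key) match {
      case Some(n) if n > 0 =>
        remaining = remaining.updated(key, n - 1)
        true
      case _ => false
    }

    // One matching pass: matched containers go to `used`, the rest are returned.
    def pass(in: Seq[Container], key: Container => String): Seq[Container] = {
      val left = ArrayBuffer.empty[Container]
      for (c <- in) if (take(key(c))) used += c else left += c
      left.toList
    }

    val afterHost = pass(containers, _.host)
    val afterRack = pass(afterHost, _.rack)
    val unused = pass(afterRack, _ => "*")
    (used.toList, unused)
  }

  def main(args: Array[String]): Unit = {
    val cs = Seq(Container(1, "h1", "r1"), Container(2, "h2", "r1"))
    println(assign(cs, Map("h1" -> 1)))
  }
}
```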

Looking into runAllocatedContainers:

private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
    for (container <- containersToUse) {
      executorIdCounter += 1
      val executorHostname = container.getNodeId.getHost
      val containerId = container.getId
      val executorId = executorIdCounter.toString
      assert(container.getResource.getMemory >= resource.getMemory)
      logInfo(s"Launching container $containerId on host $executorHostname " +
        s"for executor with ID $executorId")

      def updateInternalState(): Unit = synchronized {
        runningExecutors.add(executorId)
        numExecutorsStarting.decrementAndGet()
        executorIdToContainer(executorId) = container
        containerIdToExecutorId(container.getId) = executorId

        val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
          new HashSet[ContainerId])
        containerSet += containerId
        allocatedContainerToHostMap.put(containerId, executorHostname)
      }

      if (runningExecutors.size() < targetNumExecutors) {
        numExecutorsStarting.incrementAndGet()
        if (launchContainers) {
          launcherPool.execute(() => {
            try {
              new ExecutorRunnable(
                Some(container),
                conf,
                sparkConf,
                driverUrl,
                executorId,
                executorHostname,
                executorMemory,
                executorCores,
                appAttemptId.getApplicationId.toString,
                securityMgr,
                localResources,
                ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
              ).run()

The code iterates over the containers to use and checks whether the number of running executors is below the target number. If it is, an executor is launched in the container. launcherPool is a thread pool; it executes an ExecutorRunnable, so let’s look at its run method:

def run(): Unit = {
    logDebug("Starting Executor Container")
    nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()
    startContainer()
  }

The run method creates an NMClient, initializes and starts it, and finally starts the container. Looking into startContainer:

val commands = prepareCommand()

ctx.setCommands(commands.asJava)

// Send the start request to the ContainerManager
try {
  nmClient.startContainer(container.get, ctx)
} catch {
  case ex: Exception =>
    throw new SparkException(s"Exception while starting container ${container.get.getId}" +
      s" on host $hostname", ex)
}

It first prepares the command and the launch context, and then asks the NodeManager to start the container. Looking into prepareCommand():

val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++
      Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId,
        "--resourceProfileId", resourceProfileId.toString) ++
      userClassPath ++
      Seq(
        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

This command starts a process whose main class is “org.apache.spark.executor.YarnCoarseGrainedExecutorBackend”. In other words, starting an Executor means starting YarnCoarseGrainedExecutorBackend.

Looking into YarnCoarseGrainedExecutorBackend:

def main(args: Array[String]): Unit = {
    val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
      new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
        arguments.resourcesFileOpt, resourceProfile)
    }
    val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
      this.getClass.getCanonicalName.stripSuffix("$"))
    CoarseGrainedExecutorBackend.run(backendArgs, createFn)
    System.exit(0)
  }

main defines a function that creates the YarnCoarseGrainedExecutorBackend object and then calls the run method. Inside run:

 var driver: RpcEndpointRef = null
      val nTries = 3
      for (i <- 0 until nTries if driver == null) {
        try {
          driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
        } catch {
          case e: Throwable => if (i == nTries - 1) {
            throw e
          }
        }
      }

Internally, the run method connects to the driver, trying up to nTries times.
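The bounded retry loop above can be isolated into a small helper. This is a sketch under the assumption that only the last failure should be rethrown; the name withRetries and its signature are hypothetical.

```scala
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Call `connect` up to nTries times; rethrow the failure of the last attempt.
  def withRetries[A](nTries: Int)(connect: Int => A): A = {
    require(nTries > 0, "nTries must be positive")
    var result: Option[A] = None
    // The guard stops the loop as soon as one attempt has succeeded.
    for (i <- 0 until nTries if result.isEmpty) {
      Try(connect(i)) match {
        case Success(value) => result = Some(value)
        case Failure(e) => if (i == nTries - 1) throw e
      }
    }
    result.get
  }

  def main(args: Array[String]): Unit = {
    val ref = withRetries(3) { attempt =>
      if (attempt < 2) throw new RuntimeException("driver not reachable yet")
      else "driver-ref"
    }
    println(ref)
  }
}
```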

 val driverConf = new SparkConf()
      for ((key, value) <- props) {
        // this is required for SSL in standalone mode
        if (SparkConf.isExecutorStartupConf(key)) {
          driverConf.setIfMissing(key, value)
        } else {
          driverConf.set(key, value)
        }
      }

      cfg.hadoopDelegationCreds.foreach { tokens =>
        SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
      }

      driverConf.set(EXECUTOR_ID, arguments.executorId)
      val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
        arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

This code creates the executor’s environment, env.

env.rpcEnv.setupEndpoint("Executor",
        backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
      arguments.workerUrl.foreach { url =>
        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
      }
      env.rpcEnv.awaitTermination()

This code registers an RPC endpoint named “Executor” with the environment’s RpcEnv.



Looking into setupEndpoint:

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    dispatcher.registerRpcEndpoint(name, endpoint)
  }

This is where the RPC endpoint is registered. Looking into registerRpcEndpoint:

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.containsKey(name)) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }

    // This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
    // active when registering, and endpointRef must be put into endpointRefs before onStart is
    // called.
    endpointRefs.put(endpoint, endpointRef)

    var messageLoop: MessageLoop = null
    try {
      messageLoop = endpoint match {
        case e: IsolatedRpcEndpoint =>
          new DedicatedMessageLoop(name, e, this)
        case _ =>
          sharedLoop.register(name, endpoint)
          sharedLoop
      }
      endpoints.put(name, messageLoop)
    } catch {
      case NonFatal(e) =>
        endpointRefs.remove(endpoint)
        throw e
    }
  }
  endpointRef
}

The code declares a message loop and chooses it by pattern matching on the endpoint: an IsolatedRpcEndpoint gets its own new DedicatedMessageLoop(name, e, this), and any other endpoint is registered with the shared loop.
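The "dedicated loop for isolated endpoints, shared loop for everyone else" choice, together with the duplicate-name check, can be sketched as below. All types here (Endpoint, IsolatedEndpoint, Loop, Registry) are hypothetical simplifications and carry none of the real message-handling logic.

```scala
import scala.collection.mutable

object RegistrySketch {
  trait Endpoint
  trait IsolatedEndpoint extends Endpoint

  sealed trait Loop
  final case class DedicatedLoop(name: String) extends Loop
  case object SharedLoop extends Loop

  final class Registry {
    private val endpoints = mutable.Map.empty[String, Loop]

    // Register an endpoint under a unique name and pick its message loop.
    def register(name: String, endpoint: Endpoint): Loop = synchronized {
      if (endpoints.contains(name)) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      val loop: Loop = endpoint match {
        case _: IsolatedEndpoint => DedicatedLoop(name) // gets its own loop
        case _ => SharedLoop                            // shares the common loop
      }
      endpoints.put(name, loop)
      loop
    }
  }

  def main(args: Array[String]): Unit = {
    val registry = new Registry
    println(registry.register("Executor", new IsolatedEndpoint {}))
  }
}
```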

Looking into DedicatedMessageLoop:

private class DedicatedMessageLoop(
    name: String,
    endpoint: IsolatedRpcEndpoint,
    dispatcher: Dispatcher)
  extends MessageLoop(dispatcher) {

  private val inbox = new Inbox(name, endpoint)

  override protected val threadpool = if (endpoint.threadCount() > 1) {
    ThreadUtils.newDaemonCachedThreadPool(s"dispatcher-$name", endpoint.threadCount())
  } else {
    ThreadUtils.newDaemonSingleThreadExecutor(s"dispatcher-$name")
  }

It creates an Inbox and a thread pool.



The Inbox holds the messages waiting to be processed by an endpoint:

private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
  extends Logging {

  inbox =>  // Give this an alias so we can use it more clearly in closures.

  @GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()

  /** True if the inbox (and its associated endpoint) is stopped. */
  @GuardedBy("this")
  private var stopped = false

  /** Allow multiple threads to process messages at the same time. */
  @GuardedBy("this")
  private var enableConcurrent = false

  /** The number of threads processing messages for this inbox. */
  @GuardedBy("this")
  private var numActiveThreads = 0

  // OnStart should be the first message to process
  inbox.synchronized {
    messages.add(OnStart)
  }

A communication endpoint has a life cycle:

constructor -> onStart -> receive* -> onStop
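The life cycle can be sketched with a toy inbox. This is not Spark's Inbox; RecordingEndpoint, ToyInbox and the message names are hypothetical and only show why putting the start message into the queue at construction time guarantees that onStart runs before any receive.

```scala
import scala.collection.mutable

sealed trait InboxMsg
case object Start extends InboxMsg
final case class Payload(body: String) extends InboxMsg
case object Stop extends InboxMsg

// A toy endpoint that records which life-cycle callbacks ran, in order.
class RecordingEndpoint {
  val calls = mutable.ArrayBuffer.empty[String]
  def onStart(): Unit = { calls += "onStart"; () }
  def receive(body: String): Unit = { calls += s"receive($body)"; () }
  def onStop(): Unit = { calls += "onStop"; () }
}

class ToyInbox(endpoint: RecordingEndpoint) {
  private val messages = mutable.Queue.empty[InboxMsg]

  // Enqueue Start at construction time so it is always processed first.
  messages.enqueue(Start)

  def post(msg: InboxMsg): Unit = synchronized { messages.enqueue(msg); () }

  // Drain the queue on the calling thread, dispatching each message in order.
  def process(): Unit = synchronized {
    while (messages.nonEmpty) {
      messages.dequeue() match {
        case Start         => endpoint.onStart()
        case Payload(body) => endpoint.receive(body)
        case Stop          => endpoint.onStop()
      }
    }
  }
}
```

Posting Payload("hello") and then Stop before calling process() leaves the endpoint with the calls onStart, receive(hello), onStop, in that order.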

The current node has an inbox, and when the inbox is created it puts an OnStart message into its own queue. When that message is processed, CoarseGrainedExecutorBackend runs the following onStart code:

override def onStart(): Unit = {
    logInfo("Connecting to driver: " + driverUrl)
    try {
      _resources = parseOrFindResources(resourcesFileOpt)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
        extractAttributes, _resources, resourceProfile.id))
    }(ThreadUtils.sameThread).onComplete {
      case Success(_) =>
        self.send(RegisteredExecutor)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

The code connects to the driver and sends it a request (ask) to register the executor (RegisterExecutor), that is, to register with the AppMaster. The request is received by the SparkContext of the driver in the ApplicationMaster.
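The shape of this call (look up a reference asynchronously, ask it a question, then react to success or failure) can be sketched with plain Scala futures. AskSketch, Ref, lookup and register are hypothetical names for this illustration, and the duplicate-ID rule is just a stand-in for whatever the driver decides; Spark's own RpcEndpointRef is not used here.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object AskSketch {
  // Hypothetical stand-in for an endpoint reference that answers asks.
  trait Ref { def ask(executorId: String): Future[Boolean] }

  // Hypothetical lookup: resolving the driver reference is itself asynchronous.
  def lookup(url: String, known: Set[String])(implicit ec: ExecutionContext): Future[Ref] =
    Future {
      new Ref {
        def ask(executorId: String): Future[Boolean] =
          if (known.contains(executorId)) {
            Future.failed(new IllegalStateException(s"Duplicate executor ID: $executorId"))
          } else {
            Future.successful(true)
          }
      }
    }

  // Chain lookup and ask with flatMap, then turn the outcome into a follow-up action.
  def register(url: String, executorId: String, known: Set[String])(
      implicit ec: ExecutionContext): Future[String] =
    lookup(url, known).flatMap(_.ask(executorId)).transform {
      case Success(_) => Success("send RegisteredExecutor to self")
      case Failure(e) => Success(s"exit: ${e.getMessage}")
    }
}
```

A failure in either step (the lookup or the ask) ends up in the same Failure branch, which is why a single onComplete is enough in the original code.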

SparkContext has a _schedulerBackend field of type SchedulerBackend:

private var _schedulerBackend: SchedulerBackend = _

private[spark] trait SchedulerBackend {
  private val appId = "spark-application-" + System.currentTimeMillis

  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def defaultParallelism(): Int

Open CoarseGrainedSchedulerBackend, an implementation of SchedulerBackend. Its inner DriverEndpoint is a communication endpoint.

Its onStart():

override def onStart(): Unit = {
      // Periodically revive offers to allow delay scheduling to work
      val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)

      reviveThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
        Option(self).foreach(_.send(ReviveOffers))
      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    }
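The periodic revive can be sketched with a plain ScheduledExecutorService. ReviveSketch and runTicks are hypothetical names, and the latch exists only so the sketch can stop itself after a few ticks. The try/catch mirrors the role of Utils.tryLogNonFatalError: with scheduleAtFixedRate, an exception escaping the task suppresses all later runs.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.util.control.NonFatal

object ReviveSketch {
  // Runs `tick` every `intervalMs` milliseconds until it has fired `times` times,
  // then shuts the scheduler down. Returns true if all ticks fired before the timeout.
  def runTicks(times: Int, intervalMs: Long)(tick: () => Unit): Boolean = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val remaining = new CountDownLatch(times)
    val task = new Runnable {
      def run(): Unit = {
        // Swallow non-fatal errors so one bad tick does not cancel the schedule.
        try {
          tick()
        } catch {
          case NonFatal(_) => ()
        } finally {
          remaining.countDown()
        }
      }
    }
    scheduler.scheduleAtFixedRate(task, 0, intervalMs, TimeUnit.MILLISECONDS)
    try {
      remaining.await(times * intervalMs + 2000, TimeUnit.MILLISECONDS)
    } finally {
      scheduler.shutdownNow()
    }
  }
}
```

In the original code the tick is simply sending ReviveOffers to the endpoint itself, so the real work happens later in receive.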

Its receive:

override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data, resources) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              resources.foreach { case (k, v) =>
                executorInfo.resourcesInfo.get(k).foreach { r =>
                  r.release(v.addresses)
                }
              }
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")
          }
        }

      case ReviveOffers =>
        makeOffers()
......

And its receiveAndReply:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
          attributes, resources, resourceProfileId) =>
        if (executorDataMap.contains(executorId)) {
          context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
        } else if (scheduler.nodeBlacklist.contains(hostname) ||
            isBlacklisted(executorId, hostname)) {
          // If the cluster manager gives us an executor on a blacklisted node (because it
          // already started allocating those resources before we informed it of our blacklist,
          // or if it ignored our blacklist), then we reject that executor immediately.
          logInfo(s"Rejecting $executorId as it has been blacklisted.")
          context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
        } else {
......

The registration request is handled by the case RegisterExecutor branch of this code.

          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
          addressToExecutorId(executorAddress) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val resourcesInfo = resources.map{ case (k, v) =>
            (v.name,
             new ExecutorResourceInfo(v.name, v.addresses,
               // tell the executor it can schedule resources up to numParts times,
               // as configured by the user, or set to 1 as that is the default (1 task/resource)
               taskResourceNumParts.getOrElse(v.name, 1)))
          }

On receiving the message, the driver updates its bookkeeping: it adds the executor's cores to totalCoreCount and increments totalRegisteredExecutors by one. It then posts a listener event and replies:

 listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          // Note: some tests expect the reply to come after we put the executor in the map
          context.reply(true)
        }

This replies true, meaning the registration succeeded. Back in CoarseGrainedExecutorBackend, the success branch sends a message to itself to signal that registration succeeded:


 }(ThreadUtils.sameThread).onComplete {
      case Success(_) =>
        self.send(RegisteredExecutor)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

It sends itself the message with self.send(RegisteredExecutor), and then handles it in its own receive:

override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
          resources = _resources)
        driver.get.send(LaunchedExecutor(executorId))
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

The object created here is the actual Executor:

executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
  resources = _resources)

After the executor has been created successfully, the message LaunchedExecutor is sent to the driver:

driver.get.send(LaunchedExecutor(executorId))

CoarseGrainedSchedulerBackend receives it:

 case LaunchedExecutor(executorId) =>
        executorDataMap.get(executorId).foreach { data =>
          data.freeCores = data.totalCores
        }
        makeOffers(executorId)
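The driver side of the whole exchange can be summarized in a toy model. HandshakeSketch, ToyDriver, Register and Launched are hypothetical names, and the choice to start freeCores at zero is an assumption of this sketch, made so that the effect of the launch message is visible.

```scala
import scala.collection.mutable

object HandshakeSketch {
  final case class Register(executorId: String, cores: Int)
  final case class Launched(executorId: String)

  // Per-executor state kept by the toy driver; freeCores stays 0 until launch (assumption).
  final class ExecutorData(val totalCores: Int) { var freeCores: Int = 0 }

  class ToyDriver {
    val executors = mutable.Map.empty[String, ExecutorData]
    var totalCoreCount = 0

    // Request/reply: Right(true) on success, Left(reason) for a rejected registration.
    def ask(msg: Register): Either[String, Boolean] =
      if (executors.contains(msg.executorId)) {
        Left(s"Duplicate executor ID: ${msg.executorId}")
      } else {
        executors(msg.executorId) = new ExecutorData(msg.cores)
        totalCoreCount += msg.cores
        Right(true)
      }

    // One-way message: once the executor reports it launched, all its cores become free.
    def send(msg: Launched): Unit =
      executors.get(msg.executorId).foreach { d => d.freeCores = d.totalCores }
  }
}
```

Registration uses ask because the executor needs an answer before it continues, while the launch notification is a plain send with no reply expected.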



At this point the Executor has been created and registered with the driver.