To understand how Spark works internally and how a Spark task is executed from start to finish, let's begin with the spark-submit shell script that submits the user program. The following analysis is based on Spark 2.1.1.

When submitting a Spark task, we usually write a script like the following, which points to the spark-submit script, sets some parameters, and runs it:

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

This script simply passes its parameters on to the spark-submit script for execution. Let's take a look at the spark-submit script:

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

The last line of the script calls exec to run "${SPARK_HOME}"/bin/spark-class with the class org.apache.spark.deploy.SparkSubmit, passing along all remaining arguments via "$@".

The spark-class script builds and executes the final command, whose entry point is org.apache.spark.deploy.SparkSubmit.

I. org.apache.spark.deploy.SparkSubmit

1. Main method

def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

As can be seen from the main method, it pattern-matches on the action parsed from the arguments to decide which operation to perform. In our case the action is SUBMIT, so the submit method is called.

2. Submit method

The submit method does two things. First, it uses the clusterManager and deployMode to determine which class's main method should be executed next. Second, it invokes that main method via reflection.
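
To make the reflection step concrete, here is a minimal, self-contained sketch of the same idea: pick a class name, look up its static main method, and invoke it with an array of arguments. HelloApp and its arguments are made up for illustration; in SparkSubmit the class name and arguments come from prepareSubmitEnvironment, and the actual invocation happens in runMain (shown in 2.2.1 below).

import java.lang.reflect.Modifier

// A made-up "child main class"; in SparkSubmit this could be e.g. org.apache.spark.deploy.Client
object HelloApp {
  def main(args: Array[String]): Unit =
    println(s"HelloApp invoked with: ${args.mkString(" ")}")
}

object ReflectiveMainSketch {
  def main(args: Array[String]): Unit = {
    val childMainClass = "HelloApp"            // decided by clusterManager and deployMode
    val childArgs      = Array("--foo", "bar") // arguments prepared for that class

    val mainClass  = Class.forName(childMainClass)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])

    // SparkSubmit performs the same check before invoking
    require(Modifier.isStatic(mainMethod.getModifiers),
      "The main method in the given main class must be static")

    // Static method, so the receiver is null; the String array is passed as the single argument
    mainMethod.invoke(null, childArgs)
  }
}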

2.1. The first step of the submit method

This part mainly prepares the class and parameters to be executed in the next step:

private def submit(args: SparkSubmitArguments): Unit = {
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
2.1.1. prepareSubmitEnvironment method

The prepareSubmitEnvironment method prepares the main method of the class to be executed next, together with its parameters. The following part sets the clusterManager and deploy mode according to the master and deploy-mode arguments:

private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
      : (Seq[String], Seq[String], Map[String, String], String) = {
    // Four arguments to return
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sysProps = new HashMap[String, String]()
    var childMainClass = ""

    // The clusterManager pattern is matched according to the master parameter configured in the script
    val clusterManager: Int = args.master match {
      case "yarn" => YARN
      case "yarn-client" | "yarn-cluster" =>
        printWarning(s"Master ${args.master} is deprecated since 2.0." +
          " Please use master \"yarn\" with specified deploy mode instead.")
        YARN
      case m if m.startsWith("spark") => STANDALONE
      case m if m.startsWith("mesos") => MESOS
      case m if m.startsWith("local") => LOCAL
      case _ =>
        printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
        -1
    }

    // Match the deploy mode according to the deployMode parameter configured in the script
    var deployMode: Int = args.deployMode match {
      case "client" | null => CLIENT
      case "cluster" => CLUSTER
      case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
    }

Since this analysis uses the standalone cluster deployment mode, the following shows how childMainClass and its arguments are set up in that mode:

    // How childMainClass and its arguments are configured in standalone cluster mode
    if (args.isStandaloneCluster) {
      // If useRest is specified, submit the Application in RestSubmissionClient mode
      if (args.useRest) {
        childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
        childArgs += (args.primaryResource, args.mainClass)
      } else {
        // Otherwise use Client to submit the application
        childMainClass = "org.apache.spark.deploy.Client"
        if (args.supervise) { childArgs += "--supervise" }
        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
        childArgs += "launch"
        childArgs += (args.master, args.primaryResource, args.mainClass)
      }
      if (args.childArgs != null) {
        childArgs ++= args.childArgs
      }
    }

In standalone cluster mode, there are two submission gateways:

1. The traditional RPC gateway, using org.apache.spark.deploy.Client as a wrapper;

2. The REST-based gateway introduced in Spark 1.3.

2.2. The second step of the submit method

With the parameters ready, what happens next is decided by our standalone cluster deployment mode:

    /*
     * In standalone cluster mode, there are two submission gateways:
     *   1. The traditional RPC gateway using org.apache.spark.deploy.Client as a wrapper
     *   2. The REST-based gateway introduced in Spark 1.3
     * The second is the default behavior as of Spark 1.3, but spark-submit will fail over to
     * the legacy gateway if the master endpoint turns out not to be a REST server.
     */
		if (args.isStandaloneCluster && args.useRest) {
      try {
        // scalastyle:off println
        printStream.println("Running Spark using the REST application submission protocol.")
        // scalastyle:on println
        doRunMain()
      } catch {
        // Fail over to use the legacy submission gateway
        case e: SubmitRestConnectionException =>
          printWarning(s"Master endpoint ${args.master} was not a REST server. " +
            "Falling back to legacy submission gateway instead.")
          args.useRest = false
          submit(args)
      }
    } else {
      // In other modes, call doRunMain directly
      doRunMain()
    }

doRunMain is then called, which internally calls runMain, so let's look at runMain directly.

2.2.1. runMain method

// This method uses reflection to execute the class and method prepared by the
// prepareSubmitEnvironment method above
private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
    // scalastyle:off println
    if (verbose) {
      printStream.println(s"Main class:\n$childMainClass")
      printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
      printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
      printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
      printStream.println("\n")
    }
    // scalastyle:on println

    // Choose the class loader: if spark.driver.userClassPathFirst is true, user jars take
    // precedence over Spark's own classes
    val loader =
      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    Thread.currentThread.setContextClassLoader(loader)

    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    for ((key, value) <- sysProps) {
      System.setProperty(key, value)
    }

    var mainClass: Class[_] = null

    try {
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        e.printStackTrace(printStream)
        if (childMainClass.contains("thriftserver")) {
          // scalastyle:off println
          printStream.println(s"Failed to load main class $childMainClass.")
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
          // scalastyle:on println
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
      case e: NoClassDefFoundError =>
        e.printStackTrace(printStream)
        if (e.getMessage.contains("org/apache/hadoop/hive")) {
          // scalastyle:off println
          printStream.println(s"Failed to load hive class.")
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
          // scalastyle:on println
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    }

    // SPARK-4170
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
      printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
    }

    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    @tailrec
    def findCause(t: Throwable): Throwable = t match {
      case e: UndeclaredThrowableException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: InvocationTargetException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: Throwable =>
        e
    }

    try {
      // Execute the prepared mainClass main method by reflection
      mainMethod.invoke(null, childArgs.toArray)
    } catch {
      case t: Throwable =>
        findCause(t) match {
          case SparkUserAppException(exitCode) =>
            System.exit(exitCode)

          case t: Throwable =>
            throw t
        }
    }
  }

We chose the standalone cluster mode for this analysis. From the prepareSubmitEnvironment method above, we know that childMainClass is org.apache.spark.deploy.Client, and from the code above, the next step is to pass the prepared arguments to the main method of that class.

So let's now look at org.apache.spark.deploy.Client.

II. org.apache.spark.deploy.Client

The Client is used to start and terminate the Driver program in the standalone cluster.

1. Main method

def main(args: Array[String]) {
    // scalastyle:off println
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    }
    // scalastyle:on println

    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)

    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

  // Create the RpcEnv
    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

  // Obtain the RpcEndpoint reference of the master node for RPC communication with the master
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
  
  // Register rpcEndpoint and call onStart
    rpcEnv.setupEndpoint("client".new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

  // Block until the RpcEnv terminates
    rpcEnv.awaitTermination()
  }

The main method does three things: it creates the RpcEnv, obtains a reference to the master's RpcEndpoint for RPC communication with the master, and registers the ClientEndpoint, which triggers ClientEndpoint's onStart method.
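
To see this registration-triggers-onStart lifecycle in isolation, here is a stripped-down sketch of an RpcEndpoint being registered and receiving a message. Spark's rpc package is private[spark], so the sketch assumes it is compiled inside the Spark source tree (hence the package declaration); EchoEndpoint, EchoEndpointDemo, and the endpoint names are made up for illustration.

package org.apache.spark.deploy

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}

// A made-up endpoint that just logs its lifecycle, mirroring the shape of ClientEndpoint
class EchoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  // Called once, right after setupEndpoint registers the endpoint
  override def onStart(): Unit = println("EchoEndpoint started")

  // Handles one-way messages delivered through RpcEndpointRef.send
  override def receive: PartialFunction[Any, Unit] = {
    case msg => println(s"received: $msg")
  }
}

object EchoEndpointDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val rpcEnv = RpcEnv.create("demo", "localhost", 0, conf, new SecurityManager(conf))
    // Registration triggers onStart, just like the "client" endpoint in Client.main
    val echoRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
    echoRef.send("hello")
    Thread.sleep(500) // crude wait so the dispatcher can deliver the message in this demo
    rpcEnv.shutdown()
    rpcEnv.awaitTermination()
  }
}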

III. org.apache.spark.deploy.ClientEndpoint

ClientEndpoint is a ThreadSafeRpcEndpoint; let's look at its onStart method.

1. onStart method

override def onStart(): Unit = {
    driverArgs.cmd match {
      case "launch" =>
        // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
        // truncate filesystem paths similar to what YARN does. For now, we just require
        // people call `addJar` assuming the jar is in the same directory.
        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

        val classPathConf = "spark.driver.extraClassPath"
        val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val libraryPathConf = "spark.driver.extraLibraryPath"
        val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val extraJavaOptsConf = "spark.driver.extraJavaOptions"
        val extraJavaOpts = sys.props.get(extraJavaOptsConf)
          .map(Utils.splitCommandString).getOrElse(Seq.empty)
        val sparkJavaOpts = Utils.sparkJavaOpts(conf)
        val javaOpts = sparkJavaOpts ++ extraJavaOpts
      
        // Wrap classPathEntries, libraryPathEntries, javaOpts, and driverArgs into a Command
        // The mainClass here is org.apache.spark.deploy.worker.DriverWrapper
        val command = new Command(mainClass,
          Seq("{{WORKER_URL}}"."{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
          sys.env, classPathEntries, libraryPathEntries, javaOpts)
				
        // Encapsulate driverArgs and the command information into a DriverDescription
        val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)
      
      	// Send a RequestSubmitDriver to the master to register the Driver
        ayncSendToMasterAndForwardReply[SubmitDriverResponse](
          RequestSubmitDriver(driverDescription))

      case "kill" =>
        val driverId = driverArgs.driverId
        ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
    }
  }

If the command is launch, the driver's extra Java options, classpath, and library path are collected and, together with the mainClass, wrapped into a Command object. The driver arguments and the Command are then packaged into a DriverDescription object, and ayncSendToMasterAndForwardReply is called to send the registration request.

2. ayncSendToMasterAndForwardReply method

private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
    for (masterEndpoint <- masterEndpoints) {
      masterEndpoint.ask[T](message).onComplete {
        case Success(v) => self.send(v)
        case Failure(e) =>
          logWarning(s"Error sending messages to master $masterEndpoint", e)
      }(forwardMessageExecutionContext)
    }
  }

This method asynchronously sends the message to each masterEndpoint and forwards any successful reply back to the ClientEndpoint itself.
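
The shape of this "ask every master asynchronously and forward the reply back to yourself" pattern can be sketched with plain Scala Futures. Everything below (Reply, askMaster, handleReply, the master URLs) is hypothetical and only illustrates the control flow; Spark itself uses RpcEndpointRef.ask and self.send as shown above.

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object AskAndForwardSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for the master's response (e.g. SubmitDriverResponse)
  case class Reply(masterUrl: String, accepted: Boolean)

  // Stand-in for masterEndpoint.ask[T](message): the request is answered asynchronously
  def askMaster(masterUrl: String, message: Any): Future[Reply] =
    Future(Reply(masterUrl, accepted = true))

  // Stand-in for self.send(v): the reply is handed back to the endpoint for later handling
  def handleReply(reply: Reply): Unit =
    println(s"Reply from ${reply.masterUrl}: accepted=${reply.accepted}")

  def main(args: Array[String]): Unit = {
    val masters = Seq("spark://master1:7077", "spark://master2:7077")
    // Like ayncSendToMasterAndForwardReply: ask every master, forward successful replies,
    // and only log failures
    masters.foreach { m =>
      askMaster(m, "RequestSubmitDriver").onComplete {
        case Success(reply) => handleReply(reply)
        case Failure(e)     => println(s"Error sending message to master $m: $e")
      }
    }
    Thread.sleep(500) // crude wait so the async callbacks can run in this demo
  }
}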

IV. Summary

At this point, the spark-submit side of task submission is complete; the next step is to wait for the master to return the driver registration result and start the driver.

Finally, take a look at the flowchart of the spark-submit process: