0 Foreword

Let's take a look at the Spark submission flow. spark-submit supports just two deploy modes, client mode and cluster mode; here we focus on cluster mode.

spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 2g \
--executor-cores 2 \
--class org.apache.spark.examples.SparkPi \
${SPARK_HOME}/examples/jars/spark-examples.jar


1 spark-submit.sh Submit script

#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

2 Parameter parsing and reflection-based invocation

2.0 Parameter parsing

The entry class is org.apache.spark.deploy.SparkSubmit; it uses SparkSubmitArguments to parse the incoming command-line parameters:

try {
  parse(args.asJava)
} catch {
  case e: IllegalArgumentException =>
    SparkSubmit.printErrorAndExit(e.getMessage())
}
// Populate `sparkProperties` map from properties file
mergeDefaultSparkProperties()
// Remove keys that don't start with "spark." from `sparkProperties`.
ignoreNonSparkProperties()
// Use `sparkProperties` map along with env vars to fill in any missing parameters
loadEnvironmentArguments()

validateArguments()
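For intuition, here is a minimal, self-contained sketch of the kind of --key value handling that happens during parsing (a hypothetical simplification for illustration, not the actual Spark code):

// Hypothetical simplification of option parsing: walk the argument list
// and map each "--flag value" pair to a field, as SparkSubmitArguments
// does for the real flag set.
object ArgParseSketch {
  def main(args: Array[String]): Unit = {
    var master: String = null
    var deployMode: String = null
    var driverMemory: String = null

    def parse(list: List[String]): Unit = list match {
      case "--master" :: value :: tail => master = value; parse(tail)
      case "--deploy-mode" :: value :: tail => deployMode = value; parse(tail)
      case "--driver-memory" :: value :: tail => driverMemory = value; parse(tail)
      case unknown :: _ =>
        throw new IllegalArgumentException(s"Unknown option: $unknown")
      case Nil => // done
    }

    parse(args.toList)
    // e.g. for "--master yarn --deploy-mode cluster --driver-memory 1g":
    println(s"master=$master, deployMode=$deployMode, driverMemory=$driverMemory")
  }
}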

The submit call is then made with the parsed parameters:

override def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}

Stepping into the submit method, the environment is resolved in prepareSubmitEnvironment, and the key output is the four values it returns:

val childArgs = new ArrayBuffer[String]()      // arguments for the child main class
val childClasspath = new ArrayBuffer[String]() // classpath entries for the child process
val sysProps = new HashMap[String, String]()   // Java system properties
var childMainClass = ""                        // the class to invoke via reflection

childMainClass is the most important of the four: it names the class that will actually submit the application to YARN.

We focus on cluster mode here. In YARN cluster mode, childMainClass is set to org.apache.spark.deploy.yarn.Client, the class that the subsequent reflection call will start:

// In cluster mode, childMainClass is org.apache.spark.deploy.yarn.Client
if (isYarnCluster) {
  childMainClass = "org.apache.spark.deploy.yarn.Client"
  if (args.isPython) {
    childArgs += ("--primary-py-file", args.primaryResource)
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  } else if (args.isR) {
    val mainFile = new Path(args.primaryResource).getName
    childArgs += ("--primary-r-file", mainFile)
    childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  } else {
    if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
      childArgs += ("--jar", args.primaryResource)
    }
    childArgs += ("--class", args.mainClass)
  }
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}
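To make this concrete: for the SparkPi submission at the top of the article, the non-Python, non-R branch applies, so childArgs ends up holding the jar and the main class. A minimal sketch of that assembly (hypothetical jar path; not the actual Spark code):

import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of the childArgs produced for the SparkPi submission
// above. The jar path is illustrative; yours depends on ${SPARK_HOME}.
object ChildArgsSketch {
  def main(args: Array[String]): Unit = {
    val childArgs = new ArrayBuffer[String]()
    childArgs ++= Seq("--jar", "/opt/spark/examples/jars/spark-examples.jar")
    childArgs ++= Seq("--class", "org.apache.spark.examples.SparkPi")
    println(childArgs.mkString(" "))
    // --jar /opt/spark/examples/jars/spark-examples.jar --class org.apache.spark.examples.SparkPi
  }
}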
The submit method itself calls prepareSubmitEnvironment and then hands the four values to runMain (via doRunMain, optionally as a proxy user):

private def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } // ... (exception handling and the non-proxy branch omitted)
    }
  }
  // ...
}
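A quick aside on the proxy-user branch: createProxyUser/doAs is Hadoop's standard impersonation pattern. A minimal sketch, assuming hadoop-common is on the classpath and the current user is allowed to impersonate the hypothetical user "alice":

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

// Minimal sketch of the proxy-user pattern used above: run a block of code
// under another user's identity. "alice" is a hypothetical user; real
// impersonation also requires hadoop.proxyuser.* settings on the cluster.
object ProxyUserDemo {
  def main(args: Array[String]): Unit = {
    val proxyUser = UserGroupInformation.createProxyUser(
      "alice", UserGroupInformation.getCurrentUser())
    proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = {
        // Anything here (runMain, in Spark's case) executes as alice.
        println(s"Running as: ${UserGroupInformation.getCurrentUser.getUserName}")
      }
    })
  }
}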

2.1 Entering the runMain method: the reflection call

Entering runMain, we find the code that does exactly what we described above: it uses reflection to invoke the main method of org.apache.spark.deploy.yarn.Client:

val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
  throw new IllegalStateException("The main method in the given main class must be static")
}

@tailrec
def findCause(t: Throwable): Throwable = t match {
  case e: UndeclaredThrowableException =>
    if (e.getCause() != null) findCause(e.getCause()) else e
  case e: InvocationTargetException =>
    if (e.getCause() != null) findCause(e.getCause()) else e
  case e: Throwable => e
}

try {
  mainMethod.invoke(null, childArgs.toArray)
} catch {
  case t: Throwable =>
    findCause(t) match {
      case SparkUserAppException(exitCode) => System.exit(exitCode)
      case t: Throwable => throw t
    }
}
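To see the mechanism in isolation, here is a minimal, runnable sketch of the same reflective call, using SparkPi as the target (assumes the spark-examples jar is on the classpath; the "10" argument is illustrative):

import java.lang.reflect.Modifier

// Minimal sketch of runMain's reflective call: load a class by name,
// look up its static main(Array[String]), and invoke it.
object ReflectInvokeDemo {
  def main(args: Array[String]): Unit = {
    val mainClass = Class.forName("org.apache.spark.examples.SparkPi")
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("main must be static")
    }
    // Receiver is null because the method is static; the array is the argv.
    mainMethod.invoke(null, Array("10"))
  }
}

In cluster mode this is exactly how org.apache.spark.deploy.yarn.Client's main method gets started, with the childArgs built earlier as its argv; from there the Client negotiates with YARN to launch the ApplicationMaster.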