Spark Source Code Analysis: the YARN Deployment Process (SparkSubmit)


1. Yarn Deployment Process (SparkSubmit)

1.1 The spark-submit script

Spark-submit: spark/bin/spark-submit

# $@: All arguments passed to the script
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

spark-class

build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

Source location: launcher/src/main/java/org/apache/spark/launcher/Main.java. This class is the command-line interface for launching Spark applications and is used internally by Spark's built-in scripts; spark-class reads the NUL-separated command that launcher.Main prints and then execs that command.

/** Command line interface for the Spark launcher. Used internally by Spark scripts. */
class Main {
	// Usage: Main [class] [class args]
	// This CLI works in two different modes
	// 1. "spark-submit": the class is org.apache.spark.deploy.SparkSubmit, and the SparkLauncher class is used to launch a Spark application.
	// 2. If "spark-class" is provided, an internal spark class is run.
	// This class works in tandem with the "bin/spark-class" script on Unix-like systems
	public static void main(String[] argsArray) throws Exception {
    	checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
    	List<String> args = new ArrayList<>(Arrays.asList(argsArray));
    	String className = args.remove(0);
    	// SparkSubmit
    	if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
			try {
				AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
				cmd = buildCommand(builder, env, printLaunchCommand);
			} catch (IllegalArgumentException e) {
				// ...
				MainClassOptionParser parser = new MainClassOptionParser();
				try {
				  parser.parse(args);
				} catch (Exception ignored) {
				  // Ignore parsing exceptions.
				}

				// ...
				help.add(parser.USAGE_ERROR);
				AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(help);
				// CMD builder is different
				cmd = buildCommand(builder, env, printLaunchCommand);
			}
		} else {
			AbstractCommandBuilder builder = new SparkClassCommandBuilder(className, args);
			// CMD Builder is different
			cmd = buildCommand(builder, env, printLaunchCommand);
		}
		// ...
	}
	// ...		

}
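
The launcher module also exposes a public API for starting applications from code instead of from the shell scripts. The sketch below is a minimal, illustrative example (the Spark home, jar path, main class and memory setting are placeholder assumptions, not taken from this article) of using org.apache.spark.launcher.SparkLauncher, which drives the same spark-submit / SparkSubmit path described here.

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LaunchFromCode {
  def main(args: Array[String]): Unit = {
    // All concrete values below are hypothetical placeholders.
    val handle: SparkAppHandle = new SparkLauncher()
      .setSparkHome("/opt/spark")                  // or rely on the SPARK_HOME environment variable
      .setAppResource("/path/to/my-app.jar")       // user jar (assumed path)
      .setMainClass("com.example.MyApp")           // user main class (assumed)
      .setMaster("yarn")
      .setDeployMode("cluster")
      .setConf(SparkLauncher.EXECUTOR_MEMORY, "2g")
      .startApplication()                          // runs spark-submit under the hood

    // The handle reports state transitions (CONNECTED, SUBMITTED, RUNNING, ...)
    println(handle.getState)
  }
}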

Next, look at the main method of SparkSubmit: object SparkSubmit extends CommandLineUtils with Logging

override def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit() {
      self =>
      // Encapsulate configuration parameters
      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        // Create the SparkSubmitArguments object
        new SparkSubmitArguments(args) {
          // ...
        }
      }
      // ...

      override def doSubmit(args: Array[String]): Unit = {
        try {
          // Execution continues here
          super.doSubmit(args)
        } catch {
          case e: SparkUserAppException =>
            exitFn(e.exitCode)
        }
      }

    }
    // Call doSubmit
    submit.doSubmit(args)
}
1.1.1 Encapsulating Arguments: new SparkSubmitArguments

As shown above, a SparkSubmitArguments object is created: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

/**
 * Parses and encapsulates arguments from the spark-submit script.
 * The env argument is used for testing.
 */
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
  extends SparkSubmitArgumentsParser with Logging {
  // ...
  // Set parameters from command line arguments
  // Parse the arguments passed from the command line
  parse(args.asJava)
  // ...
}  

The parse method is implemented in launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java:

/**
 * Parse a list of spark-submit command line options.
 * <p>
 * See SparkSubmitArguments.scala for a more formal description of available options.
 *
 * @throws IllegalArgumentException If an error is found during parsing.
 */
protected final void parse(List<String> args) {
  Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

  int idx = 0;
  // The outer layer is a for loop that iterates over all the arguments
  for (idx = 0; idx < args.size(); idx++) {
    String arg = args.get(idx);
    String value = null;

    Matcher m = eqSeparatedOpt.matcher(arg);
    if (m.matches()) {
      arg = m.group(1);
      value = m.group(2);
    }

    // Look for options with a value.
    String name = findCliOption(arg, opts);
    if (name != null) {
      if (value == null) {
        if (idx == args.size() - 1) {
          throw new IllegalArgumentException(
              String.format("Missing argument for option '%s'.", arg));
        }
        idx++;
        value = args.get(idx);
      }
      // Parse the command line input
      if (!handle(name, value)) {
        break;
      }
      continue;
    }

    // Look for a switch.
    name = findCliOption(arg, switches);
    if (name != null) {
      // Parse the command line input
      if (!handle(name, null)) {
        break;
      }
      continue;
    }

    if (!handleUnknown(arg)) {
      break;
    }
  }

  if (idx < args.size()) {
    idx++;
  }
  handleExtraArgs(args.subList(idx, args.size()));
}

The handle method does pattern matching and then assigns the corresponding fields: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala. This is an implementation of the template method pattern.

/** Fill in values by parsing user options. */
override protected def handle(opt: String, value: String): Boolean = {
  // Pattern matching and assignment
  opt match {
    case NAME =>
      name = value

    case MASTER =>
      master = value

    case CLASS =>
      mainClass = value

    case DEPLOY_MODE =>
      if (value != "client" && value != "cluster") {
        error("--deploy-mode must be either \"client\" or \"cluster\"")
      }
      deployMode = value

    // ...

    case _ =>
      error(s"Unexpected argument '$opt'.")
  }
  action != SparkSubmitAction.PRINT_VERSION
}
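
To make the template-method relationship concrete, here is a stripped-down sketch of the same idea (invented names, not Spark code): the abstract parser owns the parsing loop and the "--key=value" regex, while the subclass only overrides handle, exactly like SparkSubmitOptionParser and SparkSubmitArguments.

import java.util.regex.Pattern

// Template method: the parent fixes the parsing loop, the child decides what options mean.
abstract class SimpleOptionParser {
  private val eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)")

  final def parse(args: Seq[String]): Unit = {
    args.foreach { arg =>
      val m = eqSeparatedOpt.matcher(arg)
      if (m.matches()) {
        handle(m.group(1), m.group(2))   // "--name=demo" -> ("--name", "demo")
      } else {
        handle(arg, null)
      }
    }
  }

  protected def handle(opt: String, value: String): Boolean
}

class MyArguments extends SimpleOptionParser {
  var name: String = _
  var master: String = _

  override protected def handle(opt: String, value: String): Boolean = opt match {
    case "--name"   => name = value; true
    case "--master" => master = value; true
    case _          => false          // unknown option; the real parser reports an error
  }
}

Calling new MyArguments().parse(Seq("--name=demo", "--master=yarn")) fills the fields in the same way that SparkSubmitArguments fills name and master for the real spark-submit options.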
1.1.2 Executing the doSubmit method

super.doSubmit(args)

/**
 * Main gateway of launching a Spark application.
 *
 * This program handles setting up the classpath with relevant Spark dependencies and provides
 * a layer over the different cluster managers and deploy modes that Spark supports.
 */
private[spark] class SparkSubmit extends Logging {

  import DependencyUtils._
  import SparkSubmit._

  def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    val uninitLog = initializeLogIfNecessary(true, silent = true)

    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }
  // ...
}
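
Where does appArgs.action come from? It is set while SparkSubmitArguments parses the options: roughly, --kill and --status select KILL and REQUEST_STATUS, --version selects PRINT_VERSION, and everything else defaults to SUBMIT. The sketch below is a simplified illustration of that dispatch, not the real SparkSubmitArguments logic (which, for example, also records the submission id passed to --kill and --status).

// Condensed, hypothetical version of the action selection in SparkSubmitArguments.
object ActionSketch extends Enumeration {
  val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value

  def actionFor(args: Seq[String]): Value =
    if (args.contains("--kill")) KILL
    else if (args.contains("--status")) REQUEST_STATUS
    else if (args.contains("--version")) PRINT_VERSION
    else SUBMIT
}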

The submit method defines and executes doRunMain, which in turn calls runMain.

/**
 * Submit the application using the provided parameters, ensuring to first wrap
 * in a doAs when --proxy-user is specified.
 */
@tailrec
private def submit(args: SparkSubmitArguments, uninitLog: Boolean) :Unit = {
  // doRunMain declares a method that is called later; either branch eventually calls runMain
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(args, uninitLog)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          } else {
            throw e
          }
      }
    } else {
      runMain(args, uninitLog)
    }
  }
  
  // In standalone cluster mode, there are two submission gateways:
  // (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  // (2) The new Rest-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      logInfo("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        logWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args, false)
    }
    // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}

The runMain method runs in two steps, as described in its comment:

  1. First, we prepare the launch environment by setting up the appropriate classpath, system properties, and application arguments for running the child main class based on the cluster manager and the deploy mode.
  2. Second, we use this launch environment to invoke the main method of the child main class.
/**
 * Run the main method of the child class using the submit arguments.
 *
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 *
 * Note that this main class will not be the one provided by the user if we're
 * running cluster deploy mode or python applications.
 */
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // 1. Prepare the submission environment
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  // ...
  try {
    // 2. Get the class by reflection
    // childMainClass is the class we typed on the command line.
    // org.apache.spark.deploy.yarn.YarnClusterApplication in yarn-cluster mode
    mainClass = Utils.classForName(childMainClass)
  } catch {
    // ...
  }
  
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    // Yarn-cluster actually goes here
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    // call the main method
    new JavaMainApplication(mainClass)
  }
  // ...
  try {
  	  // 3. Call the start method of SparkApplication
      app.start(childArgs.toArray, sparkConf)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}
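
The interesting point in runMain is that nothing YARN-specific is hard-coded: it loads whatever class prepareSubmitEnvironment chose and then treats it either as a SparkApplication or as a plain class with a static main. The following standalone sketch mirrors that dispatch with simplified, invented types (AppLike stands in for SparkApplication; it is not the real trait).

// Minimal sketch of the reflection-based dispatch in runMain (names are stand-ins).
object ReflectiveDispatch {
  trait AppLike {
    def start(args: Array[String]): Unit
  }

  def loadApp(childMainClass: String): AppLike = {
    val mainClass = Class.forName(childMainClass)        // Utils.classForName in Spark
    if (classOf[AppLike].isAssignableFrom(mainClass)) {
      // e.g. org.apache.spark.deploy.yarn.YarnClusterApplication in yarn-cluster mode
      mainClass.getConstructor().newInstance().asInstanceOf[AppLike]
    } else {
      // Otherwise wrap the class and invoke its static main(String[]) via reflection,
      // which is what JavaMainApplication does.
      new AppLike {
        override def start(args: Array[String]): Unit =
          mainClass.getMethod("main", classOf[Array[String]]).invoke(null, args)
      }
    }
  }
}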

Preparing the submission environment: prepareSubmitEnvironment

/**
 * Prepare the environment for submitting an application.
 *
 * @param args the parsed SparkSubmitArguments used for environment preparation.
 * @param conf the Hadoop Configuration, this argument will only be set in unit test.
 * @return a 4-tuple:
 *         (1) the arguments for the child process,
 *         (2) a list of classpath entries for the child,
 *         (3) a map of system properties, and
 *         (4) the main class for the child
 *
 * Exposed for testing.
 */
private[deploy] def prepareSubmitEnvironment(
      args: SparkSubmitArguments,
      conf: Option[HadoopConfiguration] = None)
      : (Seq[String], Seq[String], SparkConf, String) = {
    // Return value; childMainClass is the important one
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sparkConf = args.toSparkConf()
    // More important
    var childMainClass = ""
    // ...
}

The returned childMainClass is the main class to run, and it differs by deploy mode. Client mode:

// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
  childMainClass = args.mainClass
  if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
    childClasspath += localPrimaryResource
  }
  if (localJars != null) {
    childClasspath ++= localJars.split(",")
  }
}

Cluster mode: org.apache.spark.deploy.yarn.YarnClusterApplication

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
  // private[deploy] val YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
  // ...
}
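
Putting the two branches together, childMainClass is essentially a function of the deploy mode and cluster manager: client mode runs the user's class directly, yarn-cluster wraps it in YarnClusterApplication, and other cluster managers use their own wrapper classes in the same way. A rough, simplified sketch of that selection, covering only the two cases shown in this article:

// Simplified illustration of the childMainClass selection; other managers are analogous.
object ChildMainClassSketch {
  def childMainClassFor(deployMode: String, isYarnCluster: Boolean, userMainClass: String): String = {
    if (deployMode == "client") {
      userMainClass                                           // run the user's class directly
    } else if (isYarnCluster) {
      "org.apache.spark.deploy.yarn.YarnClusterApplication"   // YARN_CLUSTER_SUBMIT_CLASS
    } else {
      userMainClass                                           // placeholder; the real code picks a manager-specific wrapper
    }
  }
}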

JavaMainApplication is implemented by loading the class through reflection and executing its main method. runMain obtains the class via Utils.classForName(childMainClass); start then finds and calls main:

private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
  // Override the start method to find and call klass's main method
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // Find the main method
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    // Check whether it is static
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }
    // Call main
    mainMethod.invoke(null, args)
  }

}

YarnClusterApplication does not have a main method; instead it overrides the start method of SparkApplication and creates a new Client on which it calls run.

1.2 org.apache.spark.deploy.yarn.YarnClusterApplication

Source location: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

// Inheriting SparkApplication overrides the start method
private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)

    // 1. Encapsulate the configuration parameter new ClientArguments(args)
    // 2. Create a new Client
    // 3. Run.run()
    new Client(new ClientArguments(args), conf, null).run()
  }

}
1.2.1 Encapsulating Configuration Parameters: new ClientArguments(args)

This step simply encapsulates the configuration parameters.

/** Command-line parser for the driver client. */
private[deploy] class ClientArguments(args: Array[String]) {
  import ClientArguments._

  var cmd: String = "" // 'launch' or 'kill'
  // ...
}
1.2.2 Creating a New Client

The Client object holds a yarnClient field created via YarnClient.createYarnClient:

private[spark] class Client(
    val args: ClientArguments,
    val sparkConf: SparkConf,
    val rpcEnv: RpcEnv)
  extends Logging {

  import Client._
  import YarnSparkHadoopUtil._

  // Create the YarnClient
  private val yarnClient = YarnClient.createYarnClient
  // ...
}  

A simple static factory: org.apache.hadoop.yarn.client.api.YarnClient

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class YarnClient extends AbstractService {

  /** Create a new instance of YarnClient. */
  @Public
  public static YarnClient createYarnClient() {
  	// YarnClient client implementation
    YarnClient client = new YarnClientImpl();
    return client;
  }
  // ...
}

org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: the key field is the rmClient reference.

@Private
@Unstable
public class YarnClientImpl extends YarnClient {

  private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);

  // ResourceManager client
  protected ApplicationClientProtocol rmClient;
  // ...
}
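
YarnClient follows the standard Hadoop service lifecycle: it must be init-ed with a configuration and start-ed before it can talk to the ResourceManager, which is exactly what Client.submitApplication does below. As a point of reference, here is a minimal standalone sketch of that lifecycle outside of Spark, assuming a plain YarnConfiguration picked up from the local Hadoop config.

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object YarnClientLifecycleSketch {
  def main(args: Array[String]): Unit = {
    val conf = new YarnConfiguration()

    // Same lifecycle Spark's Client drives: create -> init -> start
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(conf)
    yarnClient.start()

    // Ask the RM for a new application id (step 2 of submitApplication below)
    val newApp = yarnClient.createApplication()
    val appId = newApp.getNewApplicationResponse.getApplicationId
    println(s"Got application id: $appId")

    yarnClient.stop()
  }
}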
1.2.3 Calling client.run

It submits the Spark application to the ResourceManager.

/**
 * Submit an application to the ResourceManager.
 * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
 * reporting the application's status until the application has exited for any reason.
 * Otherwise, the client process will exit after submission.
 * If the application finishes with a failed, killed, or undefined status,
 * throw an appropriate SparkException.
 */
def run(): Unit = {
	// Submit the application and record the returned application id
	this.appId = submitApplication()
	// ...
}

submitApplication() submits the application that runs the ApplicationMaster for the current Spark application.

/**
 * Submit an application running our ApplicationMaster to the ResourceManager.
 *
 * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
 * creating applications and setting up the application submission context. This was not
 * available in the alpha API.
 */
def submitApplication(): ApplicationId = {
	ResourceRequestHelper.validateResources(sparkConf)

	var appId: ApplicationId = null
	try {
	  // 1. Initialize and start the YARN client
	  launcherBackend.connect()
	  // Initialize the YARN client
	  yarnClient.init(hadoopConf)
	  // Start the YARN client
	  yarnClient.start()

	  logInfo("Requesting a new application from cluster with %d NodeManagers"
	    .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

	  // Get a new application from our RM
	  // 2. Request RM to create a YARN application and obtain the appId
	  val newApp = yarnClient.createApplication()
	  val newAppResponse = newApp.getNewApplicationResponse()
	  appId = newAppResponse.getApplicationId()

	  // ...

	  // Set up the appropriate contexts to launch our AM
	  // 3. Create context for ApplicationMaster
	  val containerContext = createContainerLaunchContext(newAppResponse)
	  val appContext = createApplicationSubmissionContext(newApp, containerContext)

	  // Finally, submit and monitor the application
	  // 4. Submit and monitor the application running status
	  logInfo(s"Submitting application $appId to ResourceManager")
	  yarnClient.submitApplication(appContext)
	  launcherBackend.setAppId(appId.toString)
	  reportLauncherState(SparkAppHandle.State.SUBMITTED)
	  // Return the appId
	  appId
	} catch {
	  case e: Throwable =>
	    if (stagingDirPath != null) {
	      cleanupStagingDir()
	    }
	    throw e
	}
}

Create the container launch context: containerContext = createContainerLaunchContext(newAppResponse)

/**
 * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
 * This sets up the launch environment, java options, and the command for launching the AM.
 */
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
	 // JVM parameters, etc
	 // ...
    // Here is the key - ApplicationMaster class
    val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }

    // ...

    // Concatenate the ApplicationMaster startup arguments
	val amArgs =
      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
      Seq("--properties-file",
        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
      Seq("--dist-cache-conf",
        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))

	// Command used to start the ApplicationMaster (AM)
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
    amContainer.setCommands(printableCommands.asJava)
    
    // ...  
    // send the acl settings into YARN to control who has access via YARN interfaces
    val securityManager = new SecurityManager(sparkConf)
    amContainer.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
    setupSecurityToken(amContainer)
    // Return amContainer
    amContainer
}
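
For orientation, when the container starts, the commands sequence assembled above expands into a single java command line on the NodeManager. The value below is an illustrative reconstruction only (memory, class names, file names and the conf-file path are assumptions, and the exact options vary by configuration and Spark version), not output copied from a real cluster.

// Illustrative reconstruction of the AM launch command for cluster mode; all values are examples.
object AmCommandSketch {
  val amLaunchCommand: String = Seq(
    "{{JAVA_HOME}}/bin/java", "-server",
    "-Xmx1024m",                                        // driver memory in cluster mode, for example
    "org.apache.spark.deploy.yarn.ApplicationMaster",   // amClass chosen above for cluster mode
    "--class", "com.example.MyApp",                     // hypothetical user class
    "--jar", "my-app.jar",                              // hypothetical user jar
    "--properties-file", "__spark_conf__/__spark_conf__.properties",  // localized conf (names approximate)
    "1>", "<LOG_DIR>/stdout",
    "2>", "<LOG_DIR>/stderr"
  ).mkString(" ")
}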

ContainerLaunchContext is an abstract class containing ContainerId, Resource, User, Security Tokens, LocalResource, Environment Variables, commands to start containers, and more.

/**
 * {@code ContainerLaunchContext} represents all of the information
 * needed by the {@code NodeManager} to launch a container.
 * <p>
 * It includes details such as:
 * <ul>
 *   <li>{ContainerId} of the container.</li>
 *   <li>{Resource} allocated to the container.</li>
 *   <li>User to whom the container is allocated.</li>
 *   <li>Security tokens (if security is enabled).</li>
 *   <li>{LocalResource} necessary for running the container such
 *       as binaries, jar, shared-objects, side-files etc.</li>
 *   <li>Optional, application-specific binary service data.</li>
 *   <li>Environment variables for the launched process.</li>
 *   <li>Command to launch the container.</li>
 * </ul>
 *
 * @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest)
 */
@Public
@Stable
public abstract class ContainerLaunchContext { // ... }
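
Outside of Spark, the same object is normally built through the YARN records API. Below is a minimal, hedged sketch of constructing a ContainerLaunchContext and setting a launch command; the command string is a placeholder, and in Spark's case the Client sets the java/ApplicationMaster command shown earlier instead.

import java.util.Collections

import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.hadoop.yarn.util.Records

object ContainerContextSketch {
  def buildContext(): ContainerLaunchContext = {
    // Records.newRecord is the usual way to instantiate YARN record types
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])

    // Placeholder command; a real AM container runs the java command assembled above
    ctx.setCommands(Collections.singletonList("echo hello-from-container"))

    // Environment variables and local resources are attached the same way:
    // ctx.setEnvironment(...), ctx.setLocalResources(...)
    ctx
  }
}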