0 Overview

A Spark application involves three scheduling concepts: Job, Stage, and Task.

1. A Job is bounded by an Action: each invocation of an Action method triggers one Job.

2. A Stage is a subset of a Job, bounded by RDD wide dependencies (Shuffles): each Shuffle adds one Stage boundary.

3. A Task is a subset of a Stage, measured by parallelism (the number of partitions): the number of partitions of the Stage's final RDD is the number of Tasks. Spark task scheduling is therefore performed at two levels: stage-level and task-level.
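To make these three concepts concrete, here is a minimal sketch; it assumes a spark-shell session, so a SparkContext named sc already exists, and the data is illustrative.

// Assumes an existing SparkContext `sc` (e.g. spark-shell).
val rdd = sc.parallelize(1 to 100, 4)   // RDD with exactly 4 partitions
  .map(_ * 2)                           // narrow dependency, stays in the same Stage

rdd.count()                             // Action -> triggers exactly 1 Job
// No shuffle is involved, so this Job has a single Stage,
// and that Stage runs 4 Tasks, one per partition.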

Spark builds the RDD lineage (dependency) graph, i.e. the DAG, through Transformation operations on RDDs. Actions are then invoked to trigger Jobs and schedule their execution. Two schedulers are created during execution: DAGScheduler and TaskScheduler. DAGScheduler is responsible for stage-level scheduling: it mainly divides a Job into several Stages and packages each Stage into a TaskSet for the TaskScheduler to schedule. TaskScheduler is responsible for task-level scheduling: it dispatches the TaskSets sent by DAGScheduler to Executors according to the specified scheduling policy. During scheduling, SchedulerBackend provides the available resources; SchedulerBackend has multiple implementations that connect to different resource management systems.

When the Driver initializes SparkContext, it creates DAGScheduler, TaskScheduler, SchedulerBackend, and HeartbeatReceiver, and starts SchedulerBackend and HeartbeatReceiver. SchedulerBackend applies for resources (through the ApplicationMaster in YARN mode) and continually fetches suitable tasks from the TaskScheduler to send to Executors for execution. HeartbeatReceiver receives each Executor's heartbeat, monitors Executor health, and notifies the TaskScheduler. The main members of SparkContext are:

private var _conf: SparkConf = _
private var _eventLogDir: Option[URI] = None
private var _eventLogCodec: Option[String] = None
private var _env: SparkEnv = _
private var _jobProgressListener: JobProgressListener = _
private var _statusTracker: SparkStatusTracker = _
private var _progressBar: Option[ConsoleProgressBar] = None
private var _ui: Option[SparkUI] = None
private var _hadoopConfiguration: Configuration = _
private var _executorMemory: Int = _
private var _schedulerBackend: SchedulerBackend = _
private var _taskScheduler: TaskScheduler = _
private var _heartbeatReceiver: RpcEndpointRef = _
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
private var _applicationAttemptId: Option[String] = None
private var _eventLogger: Option[EventLoggingListener] = None
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
private var _cleaner: Option[ContextCleaner] = None
private var _listenerBusStarted: Boolean = false
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _

Important members

SparkConf: the configuration object.

SparkEnv: the runtime environment object, which holds the communication environment among other runtime services.

SchedulerBackend: the communication backend, used to request resources from the cluster manager and communicate with Executors.

TaskScheduler: the task-level scheduler, which assigns Tasks to Executors.

DAGScheduler: the stage-level scheduler, which divides a Job into Stages.
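For reference, the SparkConf that these members are built from comes straight from the application code. A minimal sketch, with purely illustrative settings (any master URL and app name would do):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative settings only.
val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[4]")
  .set("spark.executor.memory", "2g")  // surfaces as _executorMemory above

val sc = new SparkContext(conf)        // runs the initialization described above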

1 Division of RDD

public static void main(String[] args) throws Exception {
  if (args.length < 1) {
    System.err.println("Usage: JavaWordCount <file>");
    System.exit(1);
  }

  SparkSession spark = SparkSession
    .builder()
    .appName("JavaWordCount")
    .getOrCreate();

  JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();

  JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());

  JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));

  JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

  List<Tuple2<String, Integer>> output = counts.collect();
  for (Tuple2<?, ?> tuple : output) {
    System.out.println(tuple._1() + ": " + tuple._2());
  }
  spark.stop();
}

Take WordCount as an example. If we step into the textFile source, we can see that it actually calls the map function:

def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

Stepping into the map function:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

You can see that map constructs a MapPartitionsRDD, so it returns a MapPartitionsRDD object. Similarly, if we click into the flatMap function, we can see that it also returns a MapPartitionsRDD. Let's look at what MapPartitionsRDD is:

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner =
    if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  override def clearDependencies() {
    super.clearDependencies()
    prev = null
  }
}

We can see that it extends the abstract RDD class via RDD[U](prev), and if you look into that constructor you will find OneToOneDependency, which describes the dependency between the two RDDs:

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

This is what we call a narrow dependency.

Let's continue with reduceByKey and follow it all the way in:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

Inside combineByKeyWithClassTag an Aggregator is created, and we find that it constructs a ShuffledRDD:

new ShuffledRDD[K, V, C](self, partitioner)
    .setSerializer(serializer)
    .setAggregator(aggregator)
    .setMapSideCombine(mapSideCombine)
}

Continuing into ShuffledRDD, we find its getDependencies method:

override def getDependencies: Seq[Dependency[_]] = {
  val serializer = userSpecifiedSerializer.getOrElse {
    val serializerManager = SparkEnv.get.serializerManager
    if (mapSideCombine) {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
    } else {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
    }
  }
  List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}

You can see ShuffleDependency here, which is the wide dependency, as opposed to the narrow dependency above.
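You can also observe both kinds of dependency from user code without reading the source. A small sketch, assuming a spark-shell session (so sc exists) and an illustrative input path:

// Assumes an existing SparkContext `sc`; the input path is illustrative.
val words = sc.textFile("input.txt")
  .flatMap(_.split(" "))            // MapPartitionsRDD, OneToOneDependency (narrow)
  .map(w => (w, 1))                 // MapPartitionsRDD, OneToOneDependency (narrow)

val counts = words.reduceByKey(_ + _)  // ShuffledRDD, ShuffleDependency (wide)

println(words.dependencies)   // List(OneToOneDependency)
println(counts.dependencies)  // List(ShuffleDependency)
println(counts.toDebugString) // the lineage printout marks the shuffle boundary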

2 Division of stages

If we click into collect, we can see that it calls the runJob function:

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

Following runJob all the way down, we reach:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

The call dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) hands the Job over to what we call the DAG scheduler.

Let’s click inside

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

You can see the submitJob function:

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}

Within submitJob, the key call is:

eventProcessLoop.post(JobSubmitted(
  jobId, rdd, func2, partitions.toArray, callSite, waiter,
  SerializationUtils.clone(properties)))

The key is this post call; clicking into it, we see:

def post(event: E): Unit = {
  eventQueue.put(event)
}

post simply puts the event into eventQueue. What we should focus on next is the event loop's daemon thread, which takes events out of this queue and calls onReceive:

private val eventThread = new Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    try {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          case NonFatal(e) =>
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
        }
      }
    } catch {
      case ie: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(e) => logError("Unexpected error in " + name, e)
    }
  }
}

If we click into the onReceive method, we can see that it calls doOnReceive, which pattern-matches on the event type; the JobSubmitted event we just posted matches the first case:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId, reason) =>
    dagScheduler.handleStageCancellation(stageId, reason)

  case JobCancelled(jobId, reason) =>
    dagScheduler.handleJobCancellation(jobId, reason)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId, reason) =>
    val filesLost = reason match {
      case SlaveLost(_, true) => true
      case _ => false
    }
    dagScheduler.handleExecutorLost(execId, filesLost)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}

Let’s go to handleJobSubmitted, where the core method is createResultStage

private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

Clicking into createResultStage, you can see that it first obtains the parent stages; the important method here is getOrCreateParentStages. Its core code:

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

getShuffleDependencies finds the shuffle dependencies of the immediately preceding layer:

private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}

Here we see the important method getOrCreateShuffleMapStage, which creates a ShuffleMapStage for each shuffle dependency. So the createResultStage we just saw ultimately produces a ResultStage, and that ResultStage's parent stages are these ShuffleMapStages.

2.0 Summary

Spark's task scheduling starts with cutting the DAG, which is mainly done by DAGScheduler. When an Action is encountered, it triggers the computation of a Job and hands it to DAGScheduler for submission. The Job submission and stage-division process is as follows:


  1. A Job is encapsulated by the final RDD and the Action method;

  2. SparkContext submits the Job to DAGScheduler, which splits the Job into several Stages based on the DAG formed by the RDD lineage. The division strategy is: starting from the final RDD, dependencies are traced backwards, and whenever a wide dependency (Shuffle) is found a new Stage is cut, i.e. Stages are divided with Shuffle as the boundary. RDDs connected by narrow dependencies fall into the same Stage, where pipelined computation can be performed. There are two types of Stages: the ResultStage, which is the most downstream Stage of the DAG and is determined by the Action method, and ShuffleMapStages, which prepare data for their downstream Stages.
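A small sketch of how this plays out for a concrete job (spark-shell assumed, data illustrative): every shuffle adds a ShuffleMapStage, and the Action itself produces the single ResultStage.

// Illustrative data; assumes an existing SparkContext `sc`.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 4)

val result = pairs
  .reduceByKey(_ + _)                 // shuffle 1 -> ShuffleMapStage
  .map { case (k, v) => (v % 2, k) }  // narrow dependency, pipelined within its stage
  .groupByKey()                       // shuffle 2 -> another ShuffleMapStage

result.collect()                      // Action -> ResultStage (the most downstream stage)
// This single Job is divided into 2 ShuffleMapStages + 1 ResultStage.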

3 Division of tasks

Back in handleJobSubmitted, after the ResultStage is created, submitStage is called to submit it:

finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)

...

submitStage(finalStage)


getMissingParentStages determines whether a stage has missing parent stages. If it does, the parent stages are submitted recursively; otherwise submitMissingTasks(stage, jobId.get) submits the tasks of this stage:

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

Stepping into submitMissingTasks, you can see that it pattern-matches on the stage type to create different kinds of tasks. A key point is the call to partitionsToCompute.map; partitionsToCompute in turn calls stage.findMissingPartitions(), which has two implementation classes. Because what comes back is simply a list of partition ids, each Stage determines its number of Tasks by the number of partitions of its last RDD.

val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId)
      }

    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
      }
  }
}
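Before moving on, a user-level sketch of the rule stated above (the number of Tasks in a Stage equals the number of partitions of that Stage's last RDD); spark-shell assumed, numbers illustrative:

val rdd = sc.parallelize(1 to 100, 8)   // 8 partitions
println(rdd.getNumPartitions)           // 8
rdd.count()                             // one ResultStage running 8 ResultTasks

val reduced = rdd.map(i => (i % 10, i)).reduceByKey(_ + _, 3)  // 3 reduce partitions
reduced.count()
// ShuffleMapStage: 8 ShuffleMapTasks; ResultStage: 3 ResultTasks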

At the end of submitMissingTasks we find that the tasks are wrapped into a TaskSet and handed to taskScheduler.submitTasks:

taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

Let's look at this method's implementation in TaskSchedulerImpl:

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}

Going further in, we first have:

val manager = createTaskSetManager(taskSet, maxTaskFailures)

And then we come to:

schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

The key question is where schedulableBuilder comes from: it is set up when the TaskScheduler is initialized:

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}

There are two implementations: FIFOSchedulableBuilder and FairSchedulableBuilder.

Go to FIFOSchedulableBuilder first:

private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
  extends SchedulableBuilder with Logging {

  override def buildPools() {
    // nothing
  }

  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    rootPool.addSchedulable(manager)
  }
}

You can see there is a rootPool: the scheduling pool into which the TaskSetManagers are added.
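Which builder (and therefore which scheduling policy) is used is controlled by a plain configuration property. A hedged sketch of selecting the FAIR scheduler; the settings are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("FairSchedulingDemo")
  .setMaster("local[4]")
  .set("spark.scheduler.mode", "FAIR")  // default is FIFO
  // Optionally point at a pool definition file:
  // .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

val sc = new SparkContext(conf)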

Let’s move on

backend.reviveOffers()

This is an abstract method

Let's look at the cluster-mode implementation, CoarseGrainedSchedulerBackend:

override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}

Keep following the ReviveOffers message.

In DriverEndpoint's receive method there is:

case ReviveOffers =>
  makeOffers()

makeOffers gathers the free resources of the alive Executors, offers them to the TaskScheduler, and gets back the task descriptions (taskDescs) to launch:

// Make fake resource offers on all executors
private def makeOffers() {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map { case (id, executorData) =>
      new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs)
  }
}

Finally, launchTasks serializes each task and sends it to the corresponding Executor:

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")

      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
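The error branch above hints at a common remedy: if a task closure captures a large object, ship it as a broadcast variable instead, so it is not re-serialized into every task. A hedged sketch, where loadLookupTable and rdd are hypothetical placeholders:

// `loadLookupTable` and `rdd` are hypothetical, for illustration only.
val bigLookup: Map[String, Int] = loadLookupTable()
val bcLookup = sc.broadcast(bigLookup)   // shipped to each executor once

val resolved = rdd.map { key =>
  bcLookup.value.getOrElse(key, -1)      // read on the executor side
}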

3.0 Summary

Spark Tasks are scheduled by the TaskScheduler. DAGScheduler packages each Stage into a TaskSet and hands it to the TaskScheduler, which wraps the TaskSet in a TaskSetManager and adds it to the scheduling queue. The TaskSetManager structure is shown below. A TaskSetManager is responsible for monitoring and managing the Tasks of one Stage, and the TaskScheduler schedules Tasks with the TaskSetManager as the unit. As mentioned earlier, SchedulerBackend is started after the TaskScheduler is initialized; it deals with the outside world, receives Executor registration information, and maintains Executor state. SchedulerBackend is in charge of the resources: after it starts, it periodically asks the TaskScheduler whether there are tasks to run, in effect periodically offering its resources to the TaskScheduler. When SchedulerBackend "asks", the TaskScheduler selects a TaskSetManager from the scheduling queue according to the specified scheduling policy. The general method-call flow is as follows:

In the figure above, after the TaskSetManager is added to the rootPool scheduling pool, the reviveOffers method of SchedulerBackend is called to send a ReviveOffers message to driverEndpoint. After receiving the ReviveOffers message, DriverEndpoint calls the makeOffers method, filters out the alive Executors (all Executors registered with the Driver at that point), and wraps each Executor as a WorkerOffer object. Once the workerOffers are ready, the TaskScheduler's resourceOffers method is called to assign tasks to Executors based on those resources.