Spark stage retry leaves the stage unable to finish normally; it just keeps waiting

Spark version in production: 2.4.1

The job was eventually killed by the user.

https://github.com/apache/spa…

Spark UI symptoms

Stages tab

Driver log

Analysis of phenomenon 1: before the job was killed by the user, stage 16.1 on the Stages tab was stuck at task progress 577/685, while in the driver log the progress reported by the TaskSetManager for stage 16.1 reached 685/685 and then simply stopped.

Locating the logging code shows that this progress line is printed by TaskSetManager.handleSuccessfulTask, and the number before the "/685" is the tasksSuccessful counter:

if (!successful(index)) {
  tasksSuccessful += 1
  logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
    s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
    s" ($tasksSuccessful/$numTasks)")
  // Mark successful and stop if all the tasks have succeeded.
  successful(index) = true
  if (tasksSuccessful == numTasks) {
    isZombie = true
  }
} else {
  logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
    " because task " + index + " has already completed successfully")
}

The tasksSuccessful counter is maintained by the TaskSetManager and is incremented in two places: one is the path shown above; the other is markPartitionCompleted, which marks already-completed partitions as successful when the TaskSetManager is created, as shown below:

private[scheduler] def createTaskSetManager(
    taskSet: TaskSet,
    maxTaskFailures: Int): TaskSetManager = {
  // only create a BitSet once for a certain stage since we only remove
  // that stage when an active TaskSetManager succeed.
  stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new BitSet)
  val tsm = new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
  // TaskSet got submitted by DAGScheduler may have some already completed
  // tasks since DAGScheduler does not always know all the tasks that have
  // been completed by other tasksets when completing a stage, so we mark
  // those tasks as finished here to avoid launching duplicate tasks, while
  // holding the TaskSchedulerImpl lock.
  // See SPARK-25250 and `markPartitionCompletedInAllTaskSets()`
  stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
    finishedPartitions => finishedPartitions.foreach(tsm.markPartitionCompleted(_, None))
  }
  tsm
}

For a stage that fails with a fetch failure, the retried attempt reuses the finished-partition information recorded under the old stage id when its TaskSetManager is created, and marks those partitions as successful.
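
To make that reuse concrete, here is a minimal, self-contained sketch (the FinishedPartitionsReuse object, recordFinished helper and plain BitSet map are invented for illustration; this is not Spark's code) of a finished-partitions BitSet that is keyed by stage id and therefore survives across attempts:

import scala.collection.mutable

object FinishedPartitionsReuse {
  // stage id -> partitions already finished by any attempt of that stage
  val stageIdToFinishedPartitions = mutable.HashMap[Int, mutable.BitSet]()

  def recordFinished(stageId: Int, partition: Int): Unit =
    stageIdToFinishedPartitions.getOrElseUpdate(stageId, mutable.BitSet()) += partition

  def main(args: Array[String]): Unit = {
    // Attempt 16.0 finishes some partitions, including ones whose map output
    // lives on an executor that is later lost.
    Seq(0, 1, 2).foreach(recordFinished(16, _))

    // Attempt 16.1 is created later; because the map is keyed by stage id only,
    // it sees those partitions as already finished.
    println(stageIdToFinishedPartitions(16)) // BitSet(0, 1, 2)
  }
}

Because the key is the stage id rather than the stage attempt, attempt 16.1 inherits everything attempt 16.0 recorded, whether or not the underlying map output still exists.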

Analysis of phenomenon 2: in the driver log, the task-completion progress of stage 16.1 starts at 109/685.

INFO scheduler.DAGScheduler: Resubmitting ShuffleMapStage 14 (run at ThreadPoolExecutor.java:1149) and ShuffleMapStage 16 (run at ThreadPoolExecutor.java:1149) due to fetch failure

The overall analysis

After a quick pass over the useful log messages, I moved on to the overall analysis.

Question 1: the driver log shows progress reaching 685/685, so why is the next stage not triggered? And why does the Stage UI show only 577/685?

When a task completes, CoarseGrainedExecutorBackend sends a StatusUpdate message carrying the task's new state. CoarseGrainedSchedulerBackend receives the StatusUpdate and calls the corresponding statusUpdate method, which hands the completed task to the TaskResultGetter thread pool for asynchronous processing.

The asynchronous worker thread calls TaskSchedulerImpl.handleSuccessfulTask, which delegates to TaskSetManager.handleSuccessfulTask. That method prints the 685/685 progress line in the driver log and then calls the DAGScheduler's taskEnded method to post a CompletionEvent.
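
As a rough, self-contained sketch of this hand-off (the StatusUpdateFlowSketch object, its TaskState encoding and handler bodies are invented; this is not Spark source), the pattern is essentially a status message being pushed onto a dedicated thread pool whose worker then runs the success handler:

import java.util.concurrent.{Executors, TimeUnit}

object StatusUpdateFlowSketch {
  sealed trait TaskState
  case object Finished extends TaskState

  // Plays the role of Spark's "task-result-getter" thread pool.
  private val taskResultGetterPool = Executors.newFixedThreadPool(4)

  // Plays the role of TaskSchedulerImpl.statusUpdate: the scheduler backend
  // calls this when a StatusUpdate message arrives from an executor backend.
  def statusUpdate(taskId: Long, state: TaskState): Unit = state match {
    case Finished =>
      taskResultGetterPool.submit(new Runnable {
        // The successful task is processed off the RPC thread, asynchronously.
        override def run(): Unit = handleSuccessfulTask(taskId)
      })
  }

  // Plays the role of TaskSetManager.handleSuccessfulTask, which logs the
  // (tasksSuccessful/numTasks) progress and notifies the DAGScheduler.
  private def handleSuccessfulTask(taskId: Long): Unit =
    println(s"Finished task $taskId")

  def main(args: Array[String]): Unit = {
    statusUpdate(1L, Finished)
    taskResultGetterPool.shutdown()
    taskResultGetterPool.awaitTermination(5, TimeUnit.SECONDS)
  }
}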

The DAGScheduler receives the CompletionEvent and decides what to do next. The core logic is as follows:

val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
shuffleStage.pendingPartitions -= task.partitionId
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (executorFailureEpoch.contains(execId) && smt.epoch <= executorFailureEpoch(execId)) {
  logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
  // The epoch of the task is acceptable (i.e., the task was launched after the most
  // recent failure we're aware of for the executor), so mark the task's output as
  // available.
  mapOutputTracker.registerMapOutput(
    shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
}

if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
  markStageAsFinished(shuffleStage)
  logInfo("looking for newly runnable stages")
  logInfo("running: " + runningStages)
  logInfo("waiting: " + waitingStages)
  logInfo("failed: " + failedStages)

  // This call to increment the epoch may not be strictly necessary, but it is retained
  // for now in order to minimize the changes in behavior from an earlier version of the
  // code. This existing behavior of always incrementing the epoch following any
  // successful shuffle map stage completion may have benefits by causing unneeded
  // cached map outputs to be cleaned up earlier on executors. In the future we can
  // consider removing this call, but this will require some extra investigation.
  // See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
  mapOutputTracker.incrementEpoch()

  clearCacheLocs()

  if (!shuffleStage.isAvailable) {
    // Some tasks had failed; let's resubmit this shuffleStage.
    // TODO: Lower-level scheduler should also deal with this
    logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
      ") because some of its tasks had failed: " +
      shuffleStage.findMissingPartitions().mkString(", "))
    submitStage(shuffleStage)
  } else {
    markMapStageJobsAsFinished(shuffleStage)
    submitWaitingChildStages(shuffleStage)
  }
}

If the stage is still running and pendingPartitions is empty, execution enters the submitStage / submitWaitingChildStages branch; otherwise nothing happens. This is exactly why the job keeps waiting: pendingPartitions never becomes empty. So the question now is how pendingPartitions is populated.

From the way the DAGScheduler builds and distributes tasks, pendingPartitions is initialized with the missing partitions, i.e. 685 of them. To compute them, the stage calls MapOutputTrackerMaster's findMissingPartitions method. The code is as follows:

/**
 * Returns the sequence of partition ids that are missing (i.e. needs to be computed).
 */
def findMissingPartitions(): Seq[Int] = synchronized {
  val missing = (0 until numPartitions).filter(id => mapStatuses(id) == null)
  assert(missing.size == numPartitions - _numAvailableOutputs,
    s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
  missing
}

As you can see, this depends on the entries in mapStatuses: any partition whose MapStatus is null is treated as missing.

From the figure above, stage 16 had completed 423 tasks before the retry. The stage has 1000 partitions in total, and 1000 - 423 = 577, which does not match the 685 tasks to be run in the stage 16.1 retry. Since findMissingPartitions nevertheless returned 685, some entries in mapStatuses must have been cleared under abnormal conditions, such as an executor being lost: the map output stored on that executor is gone, so there is no reason to keep its location in mapStatuses.
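
The following self-contained toy example (FakeMapStatus and the partition counts are made up; this is not Spark code) shows how losing an executor turns already-registered outputs back into null entries, so findMissingPartitions counts them as missing again:

object MissingPartitionsDemo {
  // Stand-in for Spark's MapStatus, which records where a map output lives.
  case class FakeMapStatus(executorId: String)

  def main(args: Array[String]): Unit = {
    val numPartitions = 10
    val mapStatuses = Array.fill[FakeMapStatus](numPartitions)(null)

    // Partitions 0-6 finished; partitions 0-2 wrote their output on executor "2".
    (0 until 7).foreach { i => mapStatuses(i) = FakeMapStatus(if (i < 3) "2" else "1") }

    def findMissingPartitions(): Seq[Int] =
      (0 until numPartitions).filter(id => mapStatuses(id) == null)

    println(findMissingPartitions()) // Vector(7, 8, 9)

    // Losing executor "2" unregisters its outputs, i.e. the entries go back to
    // null, so those partitions count as missing when the stage is resubmitted.
    mapStatuses.indices.foreach { i =>
      if (mapStatuses(i) != null && mapStatuses(i).executorId == "2") mapStatuses(i) = null
    }
    println(findMissingPartitions()) // Vector(0, 1, 2, 7, 8, 9)
  }
}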

Note the message in the log:

21/06/26 01:28:54 INFO scheduler.DAGScheduler: Shuffle files lost for executor: 2 (epoch 6)
21/06/26 01:28:54 INFO scheduler.DAGScheduler: Resubmitting failed stages

if (fileLost &&
    (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch)) {
  shuffleFileLostEpoch(execId) = currentEpoch
  hostToUnregisterOutputs match {
    case Some(host) =>
      logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
      mapOutputTracker.removeOutputsOnHost(host)
    case None =>
      logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
      mapOutputTracker.removeOutputsOnExecutor(execId)
  }
}

From the code above, mapOutputTracker.removeOutputsOnExecutor(execId) removes this stage's MapStatus entries that lived on executor 2. The DAGScheduler therefore finds 685 missing partitions: of the 423 tasks completed by the first attempt, 108 had their output on executor 2, and adding the 577 partitions that had never completed gives 685. A TaskSet with these 685 tasks is built and handed to a newly created TaskSetManager.
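
A quick sanity check of that arithmetic, using the numbers taken from this job's UI and logs (the object name is just for the example):

object RetryTaskCountCheck {
  def main(args: Array[String]): Unit = {
    val totalPartitions   = 1000
    val finishedInStage16 = 423                                  // completed by attempt 16.0
    val lostWithExecutor2 = 108                                  // of those, output was on executor 2
    val neverFinished     = totalPartitions - finishedInStage16  // 577
    val missingForRetry   = neverFinished + lostWithExecutor2    // 685 tasks in attempt 16.1
    println(s"stage 16.1 task count = $missingForRetry")         // 685
  }
}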

Combined with the Stage UI showing only 577/685, this means that 108 of those tasks were never actually launched, and pendingPartitions keeps waiting for them to complete. Why they were never launched is what phenomenon 2 explains.

Now for the 109/685 in the log. As mentioned above, the 109 is the tasksSuccessful counter. It is initialized in createTaskSetManager (shown earlier), which calls markPartitionCompleted:

private[scheduler] def markPartitionCompleted(
    partitionId: Int,
    taskInfo: Option[TaskInfo]): Unit = {
  partitionToIndex.get(partitionId).foreach { index =>
    if (!successful(index)) {
      if (speculationEnabled && !isZombie) {
        taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) }
      }
      tasksSuccessful += 1
      successful(index) = true
      if (tasksSuccessful == numTasks) {
        if (!isZombie) {
          sched.stageIdToFinishedPartitions -= stageId
          isZombie = true
        }
      }
      maybeFinishTaskSet()
    }
  }
}

stageIdToFinishedPartitions still holds the partitions finished by the original stage 16 attempt, so the new TaskSetManager marks those entries in its successful array. Note that successful only records whether a task has reached a terminal state, not whether it actually succeeded. The 108 stale partitions are therefore marked successful(index) = true and tasksSuccessful is incremented 108 times, which is why the driver log starts at 109/685 once the first task of the retry finishes.

When TaskSchedulerImpl.resourceOffers hands out tasks, partitions already marked successful are skipped. Only 577 of the 685 tasks are ever assigned; the other 108 are never launched. Even after every launched task of the retried stage finishes, pendingPartitions still contains those 108 partitions, so the stage can never be marked finished and the job waits forever.
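
Putting it all together, here is a self-contained simulation (StuckStageSimulation and its data structures are invented stand-ins for the DAGScheduler's pendingPartitions and the TaskSetManager's successful array; which 108 partitions were lost is assumed for illustration) that reproduces the symptom: the log-side counter reaches 685/685 while 108 pending partitions remain:

import scala.collection.mutable

object StuckStageSimulation {
  def main(args: Array[String]): Unit = {
    val numTasks = 685
    val staleFinished = (0 until 108).toSet          // partitions whose output was on the lost executor

    val pendingPartitions = mutable.Set(0 until numTasks: _*)   // DAGScheduler side
    val successful = Array.fill(numTasks)(false)                // TaskSetManager side
    var tasksSuccessful = 0

    // markPartitionCompleted, driven by the stale pre-retry BitSet.
    staleFinished.foreach { p => successful(p) = true; tasksSuccessful += 1 }

    // resourceOffers never launches a task whose partition is already marked
    // successful; each launched task eventually finishes and updates both sides.
    (0 until numTasks).filterNot(p => successful(p)).foreach { p =>
      successful(p) = true
      tasksSuccessful += 1
      pendingPartitions -= p
    }

    println(s"tasksSuccessful = $tasksSuccessful/$numTasks")        // 685/685, as in the driver log
    println(s"pendingPartitions left = ${pendingPartitions.size}")  // 108, so the stage never finishes
  }
}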

Root cause summary

When the stage is retried, the DAGScheduler includes the partitions lost with the dead executor in the new TaskSet, as tasks that need to be rerun. But when the TaskSetManager is created, it reuses the finished-partitions BitSet recorded for the stage, which was never updated to drop the partitions on the lost executor. Those partitions are therefore marked successful again, the corresponding tasks are never launched, and the DAGScheduler waits forever.

Theoretical solution

  1. If only the fetch failed and the executor that produced the map output is still alive, the map task does not need to be rerun; it can be marked successful, but then the partition must also be removed from pendingPartitions, not merely marked successful. If the executor has been lost, the task must be rerun.
  2. Alternatively, do not pre-mark anything as finished, and simply launch every missing task that is submitted, regardless of why the stage failed.

Community solutions

On March 7, a commit addressing this was merged.

On April 14, that commit was reverted.

The discussion can be found at https://github.com/apache/spa… ; the second option was adopted.

The solution

The fix is on Spark's 2.4.x branch; it should be available starting from 2.4.2.