DAGSchedulerEventProcessLoop — DAGScheduler Event Bus

DAGSchedulerEventProcessLoop (dag-scheduler-event-loop) is an EventLoop with a single "business logic" thread for processing DAGSchedulerEvent events.

Note
The purpose of the DAGSchedulerEventProcessLoop is to have a separate thread to process events asynchronously and serially, i.e. one by one, and let DAGScheduler do its work on the main thread.
Table 1. DAGSchedulerEvents and Event Handlers (in alphabetical order)
DAGSchedulerEvent Event Handler Trigger

AllJobsCancelled

DAGScheduler was requested to cancel all running or waiting jobs.

BeginEvent

handleBeginEvent

TaskSetManager informs DAGScheduler that a task is starting (through taskStarted).

CompletionEvent

handleTaskCompletion

Posted to inform DAGScheduler that a task has completed (successfully or not).

CompletionEvent conveys the following information:

1. Completed Task instance (as task)

2. TaskEndReason (as reason)

3. Result of the task (as result)

4. Accumulator updates

5. TaskInfo

ExecutorAdded

handleExecutorAdded

DAGScheduler was informed (through executorAdded) that an executor was spun up on a host.

ExecutorLost

handleExecutorLost

Posted to notify DAGScheduler that an executor was lost.

ExecutorLost conveys the following information:

1. execId

2. ExecutorLossReason

NOTE: The input filesLost for handleExecutorLost is enabled when ExecutorLossReason is SlaveLost with workerLost enabled (it is disabled by default).

NOTE: handleExecutorLost is also called when DAGScheduler is informed that a task has failed due to FetchFailed exception.

GettingResultEvent

TaskSetManager informs DAGScheduler (through taskGettingResult) that a task has completed and results are being fetched remotely.

JobCancelled

handleJobCancellation

DAGScheduler was requested to cancel a job.

JobGroupCancelled

handleJobGroupCancelled

DAGScheduler was requested to cancel a job group.

JobSubmitted

handleJobSubmitted

Posted when DAGScheduler is requested to submit a job or run an approximate job.

JobSubmitted conveys the following information:

1. A job identifier (as jobId)

2. An RDD (as finalRDD)

3. The function to execute (as func: (TaskContext, Iterator[_]) ⇒ _)

4. The partitions to compute (as partitions)

5. A CallSite (as callSite)

6. The JobListener to inform about the status of the stage.

7. Properties of the execution

MapStageSubmitted

handleMapStageSubmitted

Posted to inform DAGScheduler that SparkContext submitted a MapStage for execution (through submitMapStage).

MapStageSubmitted conveys the following information:

1. A job identifier (as jobId)

2. The ShuffleDependency

3. A CallSite (as callSite)

4. The JobListener to inform about the status of the stage.

5. Properties of the execution

ResubmitFailedStages

resubmitFailedStages

DAGScheduler was informed that a task has failed due to FetchFailed exception.

StageCancelled

handleStageCancellation

DAGScheduler was requested to cancel a stage.

TaskSetFailed

handleTaskSetFailed

DAGScheduler was requested to cancel a TaskSet.

When created, DAGSchedulerEventProcessLoop gets the reference to the owning DAGScheduler that it uses to call event handler methods on.
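The routing from events to handlers described in Table 1 can be pictured as a pattern match over the event type. The following is only a sketch: SchedulerEvent, ToyScheduler and ToyEventProcessLoop are hypothetical stand-ins invented for illustration, and only three of the events are modelled.

```scala
// Hypothetical, trimmed-down stand-ins for the real event types and scheduler.
sealed trait SchedulerEvent
final case class JobCancelled(jobId: Int) extends SchedulerEvent
final case class StageCancelled(stageId: Int) extends SchedulerEvent
case object ResubmitFailedStages extends SchedulerEvent

// Records which handler was called so the dispatch can be observed.
class ToyScheduler {
  val calls = scala.collection.mutable.ArrayBuffer.empty[String]
  def handleJobCancellation(jobId: Int): Unit = calls += s"handleJobCancellation($jobId)"
  def handleStageCancellation(stageId: Int): Unit = calls += s"handleStageCancellation($stageId)"
  def resubmitFailedStages(): Unit = calls += "resubmitFailedStages()"
}

// The loop keeps a reference to its owning scheduler and routes each event to a handler.
class ToyEventProcessLoop(scheduler: ToyScheduler) {
  def onReceive(event: SchedulerEvent): Unit = event match {
    case JobCancelled(jobId)     => scheduler.handleJobCancellation(jobId)
    case StageCancelled(stageId) => scheduler.handleStageCancellation(stageId)
    case ResubmitFailedStages    => scheduler.resubmitFailedStages()
  }
}
```

Because the event trait is sealed, the compiler can warn when a new event type is added without a matching case.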

Note
DAGSchedulerEventProcessLoop uses java.util.concurrent.LinkedBlockingDeque blocking deque that grows indefinitely, i.e. up to Integer.MAX_VALUE events.
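
As an illustration of the queue-plus-single-thread design, here is a minimal sketch of an event loop built on LinkedBlockingDeque. ToyEventLoop and its method names are assumptions made for this example and are not Spark's EventLoop API.

```scala
import java.util.concurrent.LinkedBlockingDeque

// A toy single-threaded event loop; names are illustrative, not Spark's API.
class ToyEventLoop[E](name: String)(handler: E => Unit) {
  // No capacity argument: the deque holds up to Integer.MAX_VALUE elements.
  private val queue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped) handler(queue.take()) // blocks until an event arrives
      } catch {
        case _: InterruptedException => () // stop() interrupts a blocked take()
      }
  }

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event) // enqueue only; handling happens on the loop thread
  def stop(): Unit = { stopped = true; thread.interrupt() }
}
```

Posting only enqueues the event, so the caller is never blocked by event handling, while the single consumer thread guarantees that events are handled one by one in posting order.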

AllJobsCancelled Event and…​

Caution
FIXME

GettingResultEvent Event and handleGetTaskResult Handler

GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent

GettingResultEvent is a DAGSchedulerEvent that triggers handleGetTaskResult (on a separate thread).

Note
GettingResultEvent is posted to inform DAGScheduler (through taskGettingResult) that a task fetches results.

handleGetTaskResult Handler

handleGetTaskResult(taskInfo: TaskInfo): Unit

handleGetTaskResult merely posts SparkListenerTaskGettingResult (to LiveListenerBus Event Bus).

BeginEvent Event and handleBeginEvent Handler

BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent

BeginEvent is a DAGSchedulerEvent that triggers handleBeginEvent (on a separate thread).

Note
BeginEvent is posted to inform DAGScheduler (through taskStarted) that a TaskSetManager starts a task.

handleBeginEvent Handler

handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit

handleBeginEvent looks the stage of task up in stageIdToStage internal registry to compute the last attempt id (or -1 if not available) and posts SparkListenerTaskStart (to listenerBus event bus).

JobGroupCancelled Event and handleJobGroupCancelled Handler

JobGroupCancelled(groupId: String) extends DAGSchedulerEvent

JobGroupCancelled is a DAGSchedulerEvent that triggers handleJobGroupCancelled (on a separate thread).

Note
JobGroupCancelled is posted when DAGScheduler is informed (through cancelJobGroup) that SparkContext was requested to cancel a job group.

handleJobGroupCancelled Handler

handleJobGroupCancelled(groupId: String): Unit

handleJobGroupCancelled finds active jobs in a group and cancels them.

Internally, handleJobGroupCancelled computes all the active jobs (registered in the internal collection of active jobs) that have spark.jobGroup.id scheduling property set to groupId.

handleJobGroupCancelled then cancels every active job in the group one by one with the cancellation reason: "part of cancelled job group [groupId]".
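
The selection of jobs by job group can be sketched as a filter over the active jobs. ToyJob and the helper names below are hypothetical; only the spark.jobGroup.id property name and the reason text come from the description above.

```scala
import java.util.Properties

object JobGroupSketch {
  // Hypothetical stand-in for ActiveJob: only the fields this sketch needs.
  final case class ToyJob(jobId: Int, properties: Properties)

  val JobGroupIdKey = "spark.jobGroup.id"

  // Ids of the active jobs whose job group property equals groupId.
  // Jobs without properties (null) never match.
  def jobsInGroup(activeJobs: Seq[ToyJob], groupId: String): Seq[Int] =
    activeJobs
      .filter(job => Option(job.properties).exists(_.getProperty(JobGroupIdKey) == groupId))
      .map(_.jobId)

  // The cancellation reason quoted above.
  def reason(groupId: String): String = s"part of cancelled job group $groupId"
}
```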

Getting Notified that ShuffleDependency Was Submitted — handleMapStageSubmitted Handler

handleMapStageSubmitted(
  jobId: Int,
  dependency: ShuffleDependency[_, _, _],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties): Unit
Figure 1. MapStageSubmitted Event Handling

handleMapStageSubmitted finds or creates a new ShuffleMapStage for the input ShuffleDependency and jobId.

handleMapStageSubmitted creates an ActiveJob (with the input jobId, callSite, listener and properties, and the ShuffleMapStage).

Caution
FIXME Why is this clearing here so important?

You should see the following INFO messages in the logs:

INFO DAGScheduler: Got map stage job [id] ([callSite]) with [number] output partitions
INFO DAGScheduler: Final stage: [stage] ([name])
INFO DAGScheduler: Parents of final stage: [parents]
INFO DAGScheduler: Missing parents: [missingStages]

handleMapStageSubmitted registers the new job in jobIdToActiveJob and activeJobs internal registries, and with the final ShuffleMapStage.

Note
ShuffleMapStage can have multiple ActiveJobs registered.

Ultimately, handleMapStageSubmitted posts SparkListenerJobStart message to LiveListenerBus and submits the ShuffleMapStage.

If the ShuffleMapStage is already available, handleMapStageSubmitted marks the job as finished.

Note
DAGScheduler requests MapOutputTrackerMaster for statistics for ShuffleDependency that it uses for handleMapStageSubmitted.
Note
MapOutputTrackerMaster is passed in when DAGScheduler is created.

When handleMapStageSubmitted could not find or create a ShuffleMapStage, you should see the following WARN message in the logs.

WARN Creating new stage failed due to exception - job: [id]

handleMapStageSubmitted notifies listener about the job failure and exits.

Note
MapStageSubmitted event processing is very similar to JobSubmitted events.
Tip

The difference between handleMapStageSubmitted and handleJobSubmitted:

  • handleMapStageSubmitted has a ShuffleDependency among the input parameters while handleJobSubmitted has finalRDD, func, and partitions.

  • handleMapStageSubmitted initializes finalStage as getShuffleMapStage(dependency, jobId) while handleJobSubmitted initializes it as finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite).

  • handleMapStageSubmitted logs the INFO message Got map stage job %s (%s) with %d output partitions (using dependency.rdd.partitions.length) while handleJobSubmitted logs Got job %s (%s) with %d output partitions (using partitions.length).

  • FIXME: Could the above be cut to ActiveJob.numPartitions?

  • handleMapStageSubmitted adds a new job with finalStage.addActiveJob(job) while handleJobSubmitted sets with finalStage.setActiveJob(job).

  • handleMapStageSubmitted checks if the final stage has already finished, tells the listener and removes it using the code:

    if (finalStage.isAvailable) {
      markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
    }

TaskSetFailed Event and handleTaskSetFailed Handler

TaskSetFailed(
  taskSet: TaskSet,
  reason: String,
  exception: Option[Throwable])
extends DAGSchedulerEvent

TaskSetFailed is a DAGSchedulerEvent that triggers handleTaskSetFailed method.

Note
TaskSetFailed is posted when DAGScheduler is requested to cancel a TaskSet.

handleTaskSetFailed Handler

handleTaskSetFailed(
  taskSet: TaskSet,
  reason: String,
  exception: Option[Throwable]): Unit

handleTaskSetFailed looks the stage (of the input taskSet) up in the internal stageIdToStage registry and aborts it.

ResubmitFailedStages Event and resubmitFailedStages Handler

ResubmitFailedStages extends DAGSchedulerEvent

ResubmitFailedStages is a DAGSchedulerEvent that triggers resubmitFailedStages method.

Note
ResubmitFailedStages is posted for FetchFailed case in handleTaskCompletion.

resubmitFailedStages Handler

resubmitFailedStages(): Unit

resubmitFailedStages iterates over the internal collection of failed stages and submits them.

Note
resubmitFailedStages does nothing when there are no failed stages reported.

You should see the following INFO message in the logs:

INFO Resubmitting failed stages

resubmitFailedStages first clears the internal cache of RDD partition locations. It then makes a copy of the collection of failed stages and clears the collection itself so DAGScheduler can track failed stages afresh.

Note
At this point DAGScheduler has no failed stages reported.

The previously reported failed stages are sorted by their job ids in ascending order and resubmitted.
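
The copy, clear and sort steps can be sketched as follows. ToyStage is a hypothetical stand-in, and using a firstJobId field as the sort key is an assumption of this sketch; the text above only says that stages are ordered by job id.

```scala
import scala.collection.mutable

object ResubmitSketch {
  // Hypothetical stand-in for Stage; firstJobId is an assumed sort key.
  final case class ToyStage(id: Int, firstJobId: Int)

  val failedStages = mutable.HashSet.empty[ToyStage]

  // Returns the stages in the order they would be resubmitted.
  def resubmitFailedStages(): Seq[ToyStage] =
    if (failedStages.isEmpty) Seq.empty // nothing to do
    else {
      val snapshot = failedStages.toArray // copy first...
      failedStages.clear()                // ...so new failures are tracked afresh
      snapshot.sortBy(_.firstJobId).toSeq // lowest job id first
    }
}
```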

Getting Notified that Executor Is Lost — handleExecutorLost Handler

handleExecutorLost(
  execId: String,
  filesLost: Boolean,
  maybeEpoch: Option[Long] = None): Unit

handleExecutorLost checks whether the input optional maybeEpoch is defined and if not requests the current epoch from MapOutputTrackerMaster.

Note
MapOutputTrackerMaster is passed in (as mapOutputTracker) when DAGScheduler is created.
Caution
FIXME When is maybeEpoch passed in?
Figure 2. DAGScheduler.handleExecutorLost

Recurring ExecutorLost events lead to the following repeating DEBUG message in the logs:

DEBUG Additional executor lost message for [execId] (epoch [currentEpoch])
Note
handleExecutorLost handler uses DAGScheduler's failedEpoch and FIXME internal registries.

Otherwise, when the executor execId is not in the list of lost executors or the epoch recorded for its earlier failure is smaller than the current epoch (the input maybeEpoch, when defined), the loss of the executor is recorded in the failedEpoch internal registry.
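
One way to read the epoch check is sketched below. The trackerEpoch parameter is a hypothetical stand-in for the epoch that would be requested from MapOutputTrackerMaster, and the Boolean result exists only to make the two branches observable.

```scala
import scala.collection.mutable

object ExecutorLostSketch {
  val failedEpoch = mutable.HashMap.empty[String, Long]

  // Returns true when the loss is recorded, false when it is a repeated message.
  def recordExecutorLost(execId: String, maybeEpoch: Option[Long], trackerEpoch: Long): Boolean = {
    // Fall back to the tracker's epoch when the caller did not pass one.
    val currentEpoch = maybeEpoch.getOrElse(trackerEpoch)
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
      failedEpoch(execId) = currentEpoch
      true
    } else false
  }
}
```

In this reading, a second loss message for the same executor at the same (or an older) epoch takes the false branch, which corresponds to the repeating DEBUG message.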

Caution
FIXME Describe the case above in simpler non-technical words. Perhaps change the order, too.

You should see the following INFO message in the logs:

INFO Executor lost: [execId] (epoch [epoch])
Caution
FIXME Review what’s filesLost.

handleExecutorLost exits unless the ExecutorLost event was for a map output fetch operation (and the input filesLost is true) or external shuffle service is not used.

In such a case, you should see the following INFO message in the logs:

INFO Shuffle files lost for executor: [execId] (epoch [epoch])

handleExecutorLost walks over all ShuffleMapStages in DAGScheduler’s shuffleToMapStage internal registry and does the following (in order):

JobCancelled Event and handleJobCancellation Handler

JobCancelled(jobId: Int) extends DAGSchedulerEvent

JobCancelled is a DAGSchedulerEvent that triggers handleJobCancellation method (on a separate thread).

Note
JobCancelled is posted when DAGScheduler is requested to cancel a job.

handleJobCancellation Handler

handleJobCancellation(jobId: Int, reason: String = "")

handleJobCancellation first makes sure that the input jobId has been registered earlier (using jobIdToStageIds internal registry).

If the input jobId is not known to DAGScheduler, you should see the following DEBUG message in the logs:

DEBUG DAGScheduler: Trying to cancel unregistered job [jobId]

Otherwise, handleJobCancellation fails the active job and all independent stages (by looking up the active job using jobIdToActiveJob) with failure reason:

Job [jobId] cancelled [reason]

Getting Notified That Task Has Finished — handleTaskCompletion Handler

handleTaskCompletion(event: CompletionEvent): Unit
Figure 3. DAGScheduler and CompletionEvent
Note
CompletionEvent holds contextual information about the completed task.
Table 2. CompletionEvent Properties
Property Description

task

Completed Task instance for a stage, partition and stage attempt.

reason

TaskEndReason…​FIXME

result

Result of the task

accumUpdates

Accumulators with…​FIXME

taskInfo

TaskInfo

Note
TaskMetrics can be empty when the task has failed.

handleTaskCompletion announces task completion application-wide (by posting a SparkListenerTaskEnd to LiveListenerBus).

handleTaskCompletion looks the stage of the task up in the stageIdToStage internal registry and, if it is not found, simply exits.

handleTaskCompletion branches off per TaskEndReason (as event.reason).

Table 3. handleTaskCompletion Branches per TaskEndReason
TaskEndReason Description

Success

Acts according to the type of the task that completed, i.e. ShuffleMapTask and ResultTask.

Resubmitted

FetchFailed

ExceptionFailure

Updates accumulators (with partial values from the task).

ExecutorLostFailure

Does nothing

TaskCommitDenied

Does nothing

TaskKilled

Does nothing

TaskResultLost

Does nothing

UnknownReason

Does nothing

Handling Successful Task Completion

When a task has finished successfully (i.e. Success end reason), handleTaskCompletion marks the partition as no longer pending (i.e. the partition the task worked on is removed from pendingPartitions of the stage).

Note
A Stage tracks its own pending partitions using pendingPartitions property.

handleTaskCompletion branches off given the type of the task that completed, i.e. ShuffleMapTask and ResultTask.

Handling Successful ResultTask Completion

For ResultTask, the stage is assumed a ResultStage.

handleTaskCompletion finds the ActiveJob associated with the ResultStage.

Note
ResultStage tracks the optional ActiveJob as activeJob property. There could only be one active job for a ResultStage.

If there is no job for the ResultStage, you should see the following INFO message in the logs:

INFO DAGScheduler: Ignoring result from [task] because its job has finished

Otherwise, when the ResultStage has an ActiveJob, handleTaskCompletion checks the status of the partition output for the partition the ResultTask ran for.

Note
ActiveJob tracks task completions in finished property with flags for every partition in a stage. When the flag for a partition is enabled (i.e. true), it is assumed that the partition has been computed (and no results from any ResultTask are expected and hence simply ignored).
Caution
FIXME Describe why could a partition has more ResultTask running.

handleTaskCompletion ignores the CompletionEvent when the partition has already been marked as completed for the stage and simply exits.

handleTaskCompletion updates accumulators.

The partition for the ActiveJob (of the ResultStage) is marked as computed and the number of computed partitions is incremented.

Note
ActiveJob tracks what partitions have already been computed and their number.

If the ActiveJob has finished (when the number of partitions computed is exactly the number of partitions in a stage) handleTaskCompletion does the following (in order):

  1. Marks ResultStage computed.

  2. Cleans up after ActiveJob and independent stages.

  3. Announces the job completion application-wide (by posting a SparkListenerJobEnd to LiveListenerBus).
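
The per-partition bookkeeping that decides when a job has finished can be sketched as below. ToyActiveJob and the string results are hypothetical and exist only to make the three outcomes visible; the finished flags and the counter follow the notes above.

```scala
object ActiveJobSketch {
  // Hypothetical stand-in for ActiveJob's completion bookkeeping.
  final class ToyActiveJob(val numPartitions: Int) {
    val finished: Array[Boolean] = Array.fill(numPartitions)(false)
    var numFinished: Int = 0

    // Returns "ignored", "recorded" or "job finished" for a task result.
    def taskSucceeded(outputId: Int): String =
      if (finished(outputId)) "ignored" // a duplicate result for the partition
      else {
        finished(outputId) = true
        numFinished += 1
        if (numFinished == numPartitions) "job finished" else "recorded"
      }
  }
}
```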

Note
A task succeeded notification holds the output index and the result.

When the notification throws an exception (because it runs user code), handleTaskCompletion notifies JobListener about the failure (wrapping it inside a SparkDriverExecutionException exception).

Handling Successful ShuffleMapTask Completion

For ShuffleMapTask, the stage is assumed a ShuffleMapStage.

handleTaskCompletion updates accumulators.

The task’s result is assumed MapStatus that knows the executor where the task has finished.

You should see the following DEBUG message in the logs:

DEBUG DAGScheduler: ShuffleMapTask finished on [execId]

If the executor is registered in failedEpoch internal registry and the epoch of the completed task is not greater than that of the executor (as in failedEpoch registry), you should see the following INFO message in the logs:

INFO DAGScheduler: Ignoring possibly bogus [task] completion from executor [executorId]

Otherwise, handleTaskCompletion registers the MapStatus result for the partition with the stage (of the completed task).
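
The "possibly bogus" check can be expressed as a small predicate. This is a sketch under the assumption that failedEpoch maps executor ids to epochs; the object and parameter names are made up for illustration.

```scala
object BogusCompletionSketch {
  // True when a ShuffleMapTask result should be ignored: the executor is in
  // failedEpoch and the task's epoch is not greater than the recorded one.
  def isPossiblyBogus(failedEpoch: Map[String, Long], execId: String, taskEpoch: Long): Boolean =
    failedEpoch.get(execId).exists(taskEpoch <= _)
}
```

A result from an executor that was never recorded as lost, or from a task launched after the recorded failure epoch, passes the check and its MapStatus is registered.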

handleTaskCompletion does more processing only if the ShuffleMapStage is registered as still running (in runningStages internal registry) and the ShuffleMapStage stage has no pending partitions to compute.

The ShuffleMapStage is marked as finished.

You should see the following INFO messages in the logs:

INFO DAGScheduler: looking for newly runnable stages
INFO DAGScheduler: running: [runningStages]
INFO DAGScheduler: waiting: [waitingStages]
INFO DAGScheduler: failed: [failedStages]
Note
A ShuffleMapStage stage is ready (aka available) when all partitions have shuffle outputs, i.e. when their tasks have completed.

If however the ShuffleMapStage is not ready, you should see the following INFO message in the logs:

INFO DAGScheduler: Resubmitting [shuffleStage] ([shuffleStage.name]) because some of its tasks had failed: [missingPartitions]

In the end, handleTaskCompletion submits the ShuffleMapStage for execution.

TaskEndReason: Resubmitted

For Resubmitted case, you should see the following INFO message in the logs:

INFO Resubmitted [task], so marking it as still running

The task (by task.partitionId) is added to the collection of pending partitions of the stage (using stage.pendingPartitions).

Tip
A stage knows how many partitions are yet to be calculated. A task knows about the partition id for which it was launched.
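
The interplay between Success and Resubmitted on a stage's pending partitions can be sketched as follows. ToyStage, onSuccess and onResubmitted are hypothetical names; only pendingPartitions comes from the text.

```scala
import scala.collection.mutable

object PendingPartitionsSketch {
  // Hypothetical stand-in for a Stage that tracks partitions still to be computed.
  final class ToyStage(numPartitions: Int) {
    val pendingPartitions: mutable.HashSet[Int] =
      mutable.HashSet.empty[Int] ++= (0 until numPartitions)
  }

  // Success: the partition is no longer pending.
  def onSuccess(stage: ToyStage, partitionId: Int): Unit = stage.pendingPartitions -= partitionId

  // Resubmitted: the partition is marked as still running, i.e. pending again.
  def onResubmitted(stage: ToyStage, partitionId: Int): Unit = stage.pendingPartitions += partitionId
}
```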

Task Failed with FetchFailed Exception — TaskEndReason: FetchFailed

FetchFailed(
  bmAddress: BlockManagerId,
  shuffleId: Int,
  mapId: Int,
  reduceId: Int,
  message: String)
extends TaskFailedReason
Table 4. FetchFailed Properties
Name Description

bmAddress

BlockManagerId

shuffleId

Used when…​

mapId

Used when…​

reduceId

Used when…​

failureMessage

Used when…​

Note
A task knows about the id of the stage it belongs to.

When FetchFailed happens, stageIdToStage is used to access the failed stage using task.stageId (the task is available in the event passed to handleTaskCompletion(event: CompletionEvent)). shuffleToMapStage is used to access the map stage (using shuffleId).

If failedStage.latestInfo.attemptId != task.stageAttemptId, you should see the following INFO in the logs:

INFO Ignoring fetch failure from [task] as it's from [failedStage] attempt [task.stageAttemptId] and there is a more recent attempt for that stage (attempt ID [failedStage.latestInfo.attemptId]) running
Caution
FIXME What does failedStage.latestInfo.attemptId != task.stageAttemptId mean?

And the case finishes. Otherwise, the case continues.

If the failed stage is in runningStages, the following INFO message shows in the logs:

INFO Marking [failedStage] ([failedStage.name]) as failed due to a fetch failure from [mapStage] ([mapStage.name])

markStageAsFinished(failedStage, Some(failureMessage)) is called.

Caution
FIXME What does markStageAsFinished do?

If the failed stage is not in runningStages, the following DEBUG message shows in the logs:

DEBUG Received fetch failure from [task], but its from [failedStage] which is no longer running

When disallowStageRetryForTest is set, abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None) is called.

Caution
FIXME Describe disallowStageRetryForTest and abortStage.
[failedStage] ([name]) has failed the maximum allowable number of times: 4. Most recent failure reason: [failureMessage]

If there are no failed stages reported (DAGScheduler.failedStages is empty), the following INFO shows in the logs:

INFO Resubmitting [mapStage] ([mapStage.name]) and [failedStage] ([failedStage.name]) due to fetch failure

And the following code is executed:

// Post ResubmitFailedStages to the event loop once, after RESUBMIT_TIMEOUT milliseconds.
messageScheduler.schedule(
  new Runnable {
    override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
Caution
FIXME What does the above code do?
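
A self-contained way to experiment with the delayed post is sketched below, assuming messageScheduler behaves like a java.util.concurrent scheduled executor. The queue stands in for the event loop, and DelayedPostSketch and scheduleResubmit are names invented for this example.

```scala
import java.util.concurrent.{Executors, LinkedBlockingDeque, TimeUnit}

object DelayedPostSketch {
  case object ResubmitFailedStages

  // Schedules one delayed post of ResubmitFailedStages and returns the queue it lands in.
  def scheduleResubmit(delayMs: Long): LinkedBlockingDeque[AnyRef] = {
    val eventQueue = new LinkedBlockingDeque[AnyRef]()
    val messageScheduler = Executors.newSingleThreadScheduledExecutor()
    messageScheduler.schedule(
      new Runnable {
        override def run(): Unit = eventQueue.put(ResubmitFailedStages)
      }, delayMs, TimeUnit.MILLISECONDS)
    messageScheduler.shutdown() // already-scheduled delayed tasks still run by default
    eventQueue
  }
}
```

The delay presumably gives other fetch failures from the same lost executor time to arrive, so that the affected stages can be resubmitted together rather than one by one.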

In all cases, both the failed stage and the map stage are added to the internal registry of failed stages.

If mapId (in the FetchFailed object for the case) is provided, the map stage output is cleaned up (as it is broken) using mapStage.removeOutputLoc(mapId, bmAddress) and MapOutputTrackerMaster.unregisterMapOutput(shuffleId, mapId, bmAddress) methods.

Caution
FIXME What does mapStage.removeOutputLoc do?

If BlockManagerId (as bmAddress in the FetchFailed object) is defined, handleTaskCompletion notifies DAGScheduler that an executor was lost (with filesLost enabled and maybeEpoch from the Task that completed).

StageCancelled Event and handleStageCancellation Handler

StageCancelled(stageId: Int) extends DAGSchedulerEvent

StageCancelled is a DAGSchedulerEvent that triggers handleStageCancellation (on a separate thread).

handleStageCancellation Handler

handleStageCancellation(stageId: Int): Unit

handleStageCancellation checks if the input stageId was registered earlier (in the internal stageIdToStage registry) and, if it was, attempts to cancel the associated jobs (with "because Stage [stageId] was cancelled" cancellation reason).

Note
A stage tracks the jobs it belongs to using jobIds property.

If the stage stageId was not registered earlier, you should see the following INFO message in the logs:

INFO No active jobs to kill for Stage [stageId]
Note
handleStageCancellation is the result of executing SparkContext.cancelStage(stageId: Int) that is called from the web UI (controlled by spark.ui.killEnabled configuration property).
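
The two outcomes of handleStageCancellation can be sketched as a lookup that yields either the jobs to cancel or the log line. ToyStage and the Either-based return type are assumptions made for this example; the reason and log texts are the ones quoted above.

```scala
object StageCancelSketch {
  // Hypothetical stand-in for Stage: only the ids of the jobs it belongs to.
  final case class ToyStage(id: Int, jobIds: Set[Int])

  // Returns (jobId, reason) pairs to cancel, or a log line when the stage is unknown.
  def handleStageCancellation(
      stageIdToStage: Map[Int, ToyStage],
      stageId: Int): Either[String, Seq[(Int, String)]] =
    stageIdToStage.get(stageId) match {
      case Some(stage) =>
        Right(stage.jobIds.toSeq.sorted.map(_ -> s"because Stage $stageId was cancelled"))
      case None =>
        Left(s"No active jobs to kill for Stage $stageId")
    }
}
```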

handleJobSubmitted Handler

handleJobSubmitted(
  jobId: Int,
  finalRDD: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties)

handleJobSubmitted creates a new ResultStage (as finalStage in the picture below) given the input finalRDD, func, partitions, jobId and callSite.

Figure 4. DAGScheduler.handleJobSubmitted Method

handleJobSubmitted creates an ActiveJob (with the input jobId, callSite, listener, properties, and the ResultStage).

Caution
FIXME Why is this clearing here so important?

You should see the following INFO messages in the logs:

INFO DAGScheduler: Got job [id] ([callSite]) with [number] output partitions
INFO DAGScheduler: Final stage: [stage] ([name])
INFO DAGScheduler: Parents of final stage: [parents]
INFO DAGScheduler: Missing parents: [missingStages]

handleJobSubmitted then registers the new job in jobIdToActiveJob and activeJobs internal registries, and with the final ResultStage.

Note
ResultStage can only have one ActiveJob registered.

Ultimately, handleJobSubmitted posts SparkListenerJobStart message to LiveListenerBus and submits the stage.

ExecutorAdded Event and handleExecutorAdded Handler

ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent

ExecutorAdded is a DAGSchedulerEvent that triggers handleExecutorAdded method (on a separate thread).

Removing Executor From failedEpoch Registry — handleExecutorAdded Handler

handleExecutorAdded(execId: String, host: String)

handleExecutorAdded checks if the input execId executor was registered in failedEpoch and, if it was, removes it from the failedEpoch registry.

You should see the following INFO message in the logs:

INFO Host added was in lost list earlier: [host]
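
The removal from failedEpoch can be sketched as follows. The pre-populated registry and the Option result (carrying the log line) are assumptions made so that the behaviour can be observed in isolation.

```scala
import scala.collection.mutable

object ExecutorAddedSketch {
  // Pretend exec-1 was reported lost at epoch 3.
  val failedEpoch = mutable.HashMap("exec-1" -> 3L)

  // Returns the log line when the executor had been recorded as lost, None otherwise.
  def handleExecutorAdded(execId: String, host: String): Option[String] =
    if (failedEpoch.contains(execId)) {
      failedEpoch -= execId
      Some(s"Host added was in lost list earlier: $host")
    } else None
}
```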
