TaskSchedulerImpl — Default Spark Scheduler

TaskSchedulerImpl is the default TaskScheduler that uses a SchedulerBackend to schedule tasks (for execution on a cluster manager).

When a Spark application starts (and so an instance of SparkContext is created), TaskSchedulerImpl is created together with a SchedulerBackend and a DAGScheduler, and all of them are started soon after.

Figure 1. TaskSchedulerImpl and Other Services

Using the spark.scheduler.mode setting, you can select the scheduling policy.

TaskSchedulerImpl submits tasks using SchedulableBuilders.

TaskSchedulerImpl uses the spark.task.cpus configuration property (default: 1) for…​FIXME
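For example, a minimal sketch of setting both properties when creating a SparkContext (the values below are examples only):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("scheduler-demo")
  .setMaster("local[4]")
  .set("spark.scheduler.mode", "FAIR") // scheduling policy: FIFO (default) or FAIR
  .set("spark.task.cpus", "2")         // CPU cores to allocate per task (default: 1)
val sc = new SparkContext(conf)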

Table 1. TaskSchedulerImpl's Internal Properties (e.g. Registries, Counters and Flags)

| Name | Description |
| --- | --- |
| dagScheduler | Used when…​FIXME |
| executorIdToHost | Lookup table of hosts per executor. Used when…​FIXME |
| executorIdToRunningTaskIds | Lookup table of running tasks per executor. Used when…​FIXME |
| executorIdToTaskCount | Lookup table of the number of running tasks per executor. |
| executorsByHost | Collection of executors per host. |
| hasLaunchedTask | Flag…​FIXME. Used when…​FIXME |
| hostToExecutors | Lookup table of executors per host in a cluster. Used when…​FIXME |
| hostsByRack | Lookup table of hosts per rack. Used when…​FIXME |
| nextTaskId | The next task id, counting from 0. Used when TaskSchedulerImpl…​ |
| rootPool | Used when TaskSchedulerImpl…​ |
| schedulableBuilder | Created when TaskSchedulerImpl is requested to initialize; one of the two available builders: FIFOSchedulableBuilder (for FIFO scheduling mode) or FairSchedulableBuilder (for FAIR scheduling mode). Note: use the spark.scheduler.mode setting to select the scheduling policy. |
| schedulingMode | Used when TaskSchedulerImpl…​ |
| taskSetsByStageIdAndAttempt | Lookup table of TaskSets per stage and attempt ids. |
| taskIdToExecutorId | Lookup table of executors per task id. |
| taskIdToTaskSetManager | Registry of active TaskSetManagers per task id. |

Tip

Enable INFO or DEBUG logging levels for the org.apache.spark.scheduler.TaskSchedulerImpl logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.TaskSchedulerImpl=DEBUG

Refer to Logging.

TaskSchedulerImpl and SchedulerBackend

TaskSchedulerImpl is assigned a SchedulerBackend at initialization.

The lifecycle of the SchedulerBackend is tightly coupled to the lifecycle of the TaskSchedulerImpl:

  • At startup, TaskSchedulerImpl requests the SchedulerBackend to start, too

  • When stopped, so is the SchedulerBackend

TaskSchedulerImpl waits until the SchedulerBackend is ready before requesting it for the following:

Finding Unique Identifier of Spark Application — applicationId Method

applicationId(): String
Note
applicationId is part of TaskScheduler contract to find the Spark application’s id.

applicationId simply requests the SchedulerBackend for the Spark application's id.
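A minimal sketch of that forwarding (simplified from the Spark sources):

override def applicationId(): String = backend.applicationId()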

nodeBlacklist Method

Caution
FIXME

cleanupTaskState Method

Caution
FIXME

newTaskId Method

Caution
FIXME

getExecutorsAliveOnHost Method

Caution
FIXME

isExecutorAlive Method

Caution
FIXME

hasExecutorsAliveOnHost Method

Caution
FIXME

hasHostAliveOnRack Method

Caution
FIXME

executorLost Method

Caution
FIXME

mapOutputTracker

Caution
FIXME

starvationTimer

Caution
FIXME

executorHeartbeatReceived Method

executorHeartbeatReceived(
  execId: String,
  accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
  blockManagerId: BlockManagerId): Boolean

executorHeartbeatReceived is…​

Caution
FIXME
Note
executorHeartbeatReceived is part of the TaskScheduler Contract.

Cancelling All Tasks of Stage — cancelTasks Method

cancelTasks(stageId: Int, interruptThread: Boolean): Unit
Note
cancelTasks is part of TaskScheduler contract.

cancelTasks cancels all tasks submitted for execution in a stage stageId.

Note
cancelTasks is used exclusively when DAGScheduler cancels a stage.

handleSuccessfulTask Method

handleSuccessfulTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  taskResult: DirectTaskResult[_]): Unit

handleSuccessfulTask simply forwards the call to the input taskSetManager (passing tid and taskResult).

handleTaskGettingResult Method

handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit

handleTaskGettingResult simply forwards the call to the taskSetManager.

applicationAttemptId Method

applicationAttemptId(): Option[String]
Caution
FIXME

Tracking Racks per Hosts and Ports — getRackForHost Method

getRackForHost(value: String): Option[String]

getRackForHost reports the rack of a given host (and port). By default, it assumes that racks are unknown (i.e. the method returns None).

Note
It is overridden by the YARN-specific TaskScheduler, YarnScheduler.

getRackForHost is currently used in two places: when resourceOffers records new hosts (to update the hostsByRack registry) and when removeExecutor cleans up after a removed executor (to prune hostsByRack).
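For illustration, a minimal sketch of such an override, assuming access to Spark's private[spark] internals and a hypothetical rackByHost mapping (a real scheduler would query a topology service instead):

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl

class RackAwareTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  // Hypothetical static host-to-rack mapping
  private val rackByHost = Map("host-1" -> "rack-A", "host-2" -> "rack-B")

  override def getRackForHost(host: String): Option[String] = rackByHost.get(host)
}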

Creating TaskSchedulerImpl Instance

TaskSchedulerImpl takes the following when created:

TaskSchedulerImpl initializes the internal registries and counters.

Note
There is another TaskSchedulerImpl constructor that requires a SparkContext object only and sets maxTaskFailures to spark.task.maxFailures or, if not set, defaults to 4.

TaskSchedulerImpl sets schedulingMode to the value of spark.scheduler.mode setting (defaults to FIFO).

Note
schedulingMode is part of TaskScheduler Contract.

Failure to recognize the value of spark.scheduler.mode results in a SparkException:

Unrecognized spark.scheduler.mode: [schedulingModeConf]
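A simplified sketch of that parsing (mirroring the Spark sources; conf is assumed to be the SparkConf in scope):

import org.apache.spark.SparkException
import org.apache.spark.scheduler.SchedulingMode

val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
val schedulingMode =
  try {
    SchedulingMode.withName(schedulingModeConf.toUpperCase)
  } catch {
    case _: NoSuchElementException =>
      throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
  }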

Ultimately, TaskSchedulerImpl creates a TaskResultGetter.

Initializing — initialize Method

initialize(backend: SchedulerBackend): Unit

initialize initializes TaskSchedulerImpl.

Figure 2. TaskSchedulerImpl initialization

initialize saves the input SchedulerBackend.

initialize then sets the schedulable root Pool as an empty-named Pool (passing in the SchedulingMode, and initMinShare and initWeight as 0).

initialize sets the SchedulableBuilder (based on the SchedulingMode): FIFOSchedulableBuilder for FIFO and FairSchedulableBuilder for FAIR.
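A simplified sketch of that selection (based on the Spark 2.x sources):

schedulableBuilder = schedulingMode match {
  case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
  case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
  case _ => throw new IllegalArgumentException(s"Unsupported scheduling mode: $schedulingMode")
}
schedulableBuilder.buildPools()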

Caution
FIXME Why are rootPool and schedulableBuilder created only now? What do they need that it is not available when TaskSchedulerImpl is created?

Starting TaskSchedulerImpl — start Method

As part of initialization of a SparkContext, TaskSchedulerImpl is started (using start from the TaskScheduler Contract).

start(): Unit

start starts the scheduler backend.

Figure 3. Starting TaskSchedulerImpl in Spark Standalone

Handling Task Status Update — statusUpdate Method

statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer): Unit

statusUpdate finds TaskSetManager for the input tid task (in taskIdToTaskSetManager).

When state is LOST, statusUpdate…​FIXME

Note
TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode.

When state is one of the finished states, i.e. FINISHED, FAILED, KILLED or LOST, statusUpdate performs cleanupTaskState for the input tid.

If a task is in the LOST state, statusUpdate notifies the DAGScheduler that the executor was lost (with SlaveLost and the reason Task [tid] was lost, so marking the executor as lost as well.) and requests the SchedulerBackend to revive offers.

In case the TaskSetManager for tid could not be found (in taskIdToTaskSetManager registry), you should see the following ERROR message in the logs:

ERROR Ignoring update with state [state] for TID [tid] because its task set is gone (this is likely the result of receiving duplicate task finished status updates)

Any exception is caught and reported as ERROR message in the logs:

ERROR Exception in statusUpdate
Caution
FIXME image with scheduler backends calling TaskSchedulerImpl.statusUpdate.
Note

statusUpdate is used when:

  1. DriverEndpoint (of CoarseGrainedSchedulerBackend) is requested to handle a StatusUpdate message

  2. LocalEndpoint is requested to handle a StatusUpdate message

  3. MesosFineGrainedSchedulerBackend is requested to handle a task status update

task-scheduler-speculation Scheduled Executor Service — speculationScheduler Internal Attribute

speculationScheduler is a java.util.concurrent.ScheduledExecutorService with the name task-scheduler-speculation for speculative execution of tasks.

When TaskSchedulerImpl starts (in non-local run mode) with spark.speculation enabled, speculationScheduler schedules checkSpeculatableTasks to execute periodically every spark.speculation.interval, after an initial delay of spark.speculation.interval.

speculationScheduler is shut down when TaskSchedulerImpl stops.
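A self-contained sketch of the pattern, assuming a checkSpeculatableTasks() method in scope (Spark itself uses a daemon thread factory and reads the interval from spark.speculation.interval; 100 ms below is that default):

import java.util.concurrent.{Executors, TimeUnit}

val speculationScheduler = Executors.newSingleThreadScheduledExecutor()
val intervalMs = 100L  // stands in for spark.speculation.interval
speculationScheduler.scheduleWithFixedDelay(
  new Runnable { override def run(): Unit = checkSpeculatableTasks() },
  intervalMs, intervalMs, TimeUnit.MILLISECONDS)

// ...and when TaskSchedulerImpl stops:
speculationScheduler.shutdown()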

Checking for Speculatable Tasks — checkSpeculatableTasks Method

checkSpeculatableTasks(): Unit

checkSpeculatableTasks requests rootPool to check for speculatable tasks (i.e. tasks that have run for more than 100 ms) and, if there are any, requests the SchedulerBackend to revive offers.

Note
checkSpeculatableTasks is executed periodically as part of speculative execution of tasks.

Acceptable Number of Task Failures — maxTaskFailures Attribute

The acceptable number of task failures (maxTaskFailures) can be explicitly defined when creating a TaskSchedulerImpl instance or is based on the spark.task.maxFailures setting that defaults to 4 failures.

Note
It is exclusively used when submitting tasks through TaskSetManager.

Cleaning up After Removing Executor — removeExecutor Internal Method

removeExecutor(executorId: String, reason: ExecutorLossReason): Unit

removeExecutor removes the executorId executor from the following internal registries: executorIdToTaskCount, executorIdToHost, executorsByHost, and hostsByRack. If the affected hosts and racks are the last entries in executorsByHost and hostsByRack, respectively, they are removed from the registries.

Unless the reason is LossReasonPending, the executor is removed from the executorIdToHost registry and TaskSetManagers get notified.
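A self-contained sketch of that pruning for the host and rack registries (names follow this page; the rack would come from getRackForHost):

import scala.collection.mutable

val executorsByHost = mutable.HashMap.empty[String, mutable.HashSet[String]]
val hostsByRack = mutable.HashMap.empty[String, mutable.HashSet[String]]

def prune(execId: String, host: String, rack: Option[String]): Unit = {
  for (execs <- executorsByHost.get(host)) {
    execs -= execId
    if (execs.isEmpty) {
      executorsByHost -= host  // last executor on the host
      for (r <- rack; hosts <- hostsByRack.get(r)) {
        hosts -= host
        if (hosts.isEmpty) hostsByRack -= r  // last host in the rack
      }
    }
  }
}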

Note
The internal removeExecutor is called as part of statusUpdate and executorLost.

Handling Nearly-Completed SparkContext Initialization — postStartHook Callback

postStartHook(): Unit
Note
postStartHook is part of the TaskScheduler Contract to notify a task scheduler that the SparkContext (and hence the Spark application itself) is about to finish initialization.

Stopping TaskSchedulerImpl — stop Method

stop(): Unit

stop() stops all the internal services, i.e. task-scheduler-speculation executor service, SchedulerBackend, TaskResultGetter, and starvationTimer timer.

Finding Default Level of Parallelism — defaultParallelism Method

defaultParallelism(): Int
Note
defaultParallelism is part of TaskScheduler contract as a hint for sizing jobs.

defaultParallelism simply requests SchedulerBackend for the default level of parallelism.

Note
Default level of parallelism is a hint for sizing jobs that SparkContext uses to create RDDs with the right number of partitions when not specified explicitly.

Submitting Tasks (of TaskSet) for Execution — submitTasks Method

submitTasks(taskSet: TaskSet): Unit
Note
submitTasks is part of the TaskScheduler Contract to submit the tasks (of the given TaskSet) for execution.

In essence, submitTasks registers a new TaskSetManager (for the given TaskSet) and requests the SchedulerBackend to handle resource allocation offers (from the scheduling system).

Figure 4. TaskSchedulerImpl.submitTasks

Internally, submitTasks first prints out the following INFO message to the logs:

Adding task set [id] with [length] tasks

submitTasks then creates a TaskSetManager (for the given TaskSet and the acceptable number of task failures).

submitTasks registers (adds) the TaskSetManager per stage and stage attempt IDs (of the TaskSet) in the taskSetsByStageIdAndAttempt internal registry.

Note
taskSetsByStageIdAndAttempt internal registry tracks the TaskSetManagers (that represent TaskSets) per stage and stage attempts. In other words, there could be many TaskSetManagers for a single stage, each representing a unique stage attempt.
Note
Not only could a task be retried (cf. acceptable number of task failures), but also a single stage.

submitTasks makes sure that there is exactly one active TaskSetManager across all the managers for the stage. Otherwise, submitTasks throws an IllegalStateException:

more than one active taskSet for stage [stage]: [TaskSet ids]
Note
TaskSetManager is considered active when it is not a zombie.
Note
The schedulable pool can be a single flat linked queue (in FIFO scheduling mode) or a hierarchy of pools of Schedulables (in FAIR scheduling mode).

submitTasks schedules a starvation task to make sure that the requested resources (i.e. CPU and memory) are assigned to the Spark application for a non-local environment (the very first time the Spark application is started per hasReceivedTask flag).

Note
The very first time (hasReceivedTask flag is false) in cluster mode only (i.e. isLocal of the TaskSchedulerImpl is false), starvationTimer is scheduled to execute after spark.starvation.timeout to ensure that the requested resources, i.e. CPUs and memory, were assigned by a cluster manager.
Note
The hasReceivedTask internal flag becomes true the very first time submitTasks is executed, so the starvation timer is scheduled at most once.

In the end, submitTasks requests the SchedulerBackend to reviveOffers.

Tip
Use dag-scheduler-event-loop thread to step through the code in a debugger.

Scheduling Starvation Task

Every time the starvation timer fires and the hasLaunchedTask flag is false, the following WARN message is printed out to the logs:

WARN Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

Otherwise, when the hasLaunchedTask flag is true, the timer thread cancels itself.
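A self-contained sketch of the timer logic (the 15-second timeout stands in for spark.starvation.timeout; hasLaunchedTask follows this page):

import java.util.{Timer, TimerTask}

var hasLaunchedTask = false  // flipped once resourceOffers launches a task
val timeoutMs = 15000L       // stands in for spark.starvation.timeout
val starvationTimer = new Timer("task-starvation-timer", true)

starvationTimer.scheduleAtFixedRate(new TimerTask {
  override def run(): Unit =
    if (!hasLaunchedTask) {
      Console.err.println("WARN Initial job has not accepted any resources; " +
        "check your cluster UI to ensure that workers are registered " +
        "and have sufficient resources")
    } else {
      this.cancel()  // resources were assigned; stop warning
    }
}, timeoutMs, timeoutMs)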

Creating TaskSetManager — createTaskSetManager Method

createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager

createTaskSetManager creates a TaskSetManager (passing on the reference to TaskSchedulerImpl, the input taskSet and maxTaskFailures, and optional BlacklistTracker).

Note
createTaskSetManager uses the optional BlacklistTracker that is specified when TaskSchedulerImpl is created.
Note
createTaskSetManager is used exclusively when TaskSchedulerImpl submits tasks (for a given TaskSet).

Notifying TaskSetManager that Task Failed — handleFailedTask Method

handleFailedTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  taskState: TaskState,
  reason: TaskFailedReason): Unit
Note
handleFailedTask is called when TaskResultGetter deserializes a TaskFailedReason for a failed task.

taskSetFinished Method

taskSetFinished(manager: TaskSetManager): Unit

taskSetFinished looks up all the TaskSets for the stage id (in the taskSetsByStageIdAndAttempt registry) and removes the stage attempt from them, possibly removing the entire stage record from the taskSetsByStageIdAndAttempt registry completely (if there are no other attempts registered).
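A simplified sketch of that cleanup (based on the Spark sources; manager is the input TaskSetManager):

val stageId = manager.taskSet.stageId
taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
  attempts -= manager.taskSet.stageAttemptId
  if (attempts.isEmpty) {
    taskSetsByStageIdAndAttempt -= stageId  // no attempts left for the stage
  }
}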

Figure 5. TaskSchedulerImpl.taskSetFinished is called when all tasks are finished
Note
A TaskSetManager manages a TaskSet for a stage.

You should see the following INFO message in the logs:

Removed TaskSet [id], whose tasks have all completed, from pool [name]
Note
taskSetFinished method is called when TaskSetManager has received the results of all the tasks in a TaskSet.

Notifying DAGScheduler About New Executor — executorAdded Method

executorAdded(execId: String, host: String)
Caution
FIXME Image with a call from TaskSchedulerImpl to DAGScheduler, please.
Note
executorAdded uses DAGScheduler that was given when setDAGScheduler.

Waiting Until SchedulerBackend is Ready — waitBackendReady Internal Method

waitBackendReady(): Unit

waitBackendReady waits until the SchedulerBackend is ready. If it already is, waitBackendReady returns immediately. Otherwise, waitBackendReady keeps checking every 100 milliseconds (hardcoded) until the SchedulerBackend is ready or the SparkContext is stopped.
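A simplified sketch of the wait loop (based on the Spark sources):

private def waitBackendReady(): Unit = {
  if (backend.isReady) return
  while (!backend.isReady) {
    if (sc.stopped.get) {
      throw new IllegalStateException("Spark context stopped while waiting for backend")
    }
    synchronized { this.wait(100) }  // re-check every 100 ms (hardcoded)
  }
}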

Note
A SchedulerBackend is ready by default.

If the SparkContext happens to be stopped while waiting, waitBackendReady throws an IllegalStateException:

Spark context stopped while waiting for backend
Note
waitBackendReady is used exclusively when TaskSchedulerImpl is requested to handle a notification that SparkContext is about to be fully initialized.

Creating TaskDescriptions For Available Executor Resource Offers (with CPU Cores) — resourceOffers Method

resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]

resourceOffers takes the resources offers (as WorkerOffers) and generates a collection of tasks (as TaskDescription) to launch (given the resources available).

Note
WorkerOffer represents a resource offer with CPU cores free to use on an executor.
Figure 6. Processing Executor Resource Offers

Internally, resourceOffers first updates hostToExecutors and executorIdToHost lookup tables to record new hosts and executors (given the input offers).

Note
TaskSchedulerImpl uses resourceOffers to track active executors.
Caution
FIXME a picture with executorAdded call from TaskSchedulerImpl to DAGScheduler.

resourceOffers requests BlacklistTracker to applyBlacklistTimeout and filters out offers on blacklisted nodes and executors.

Note
resourceOffers uses the optional BlacklistTracker that was given when TaskSchedulerImpl was created.
Caution
FIXME Expand on blacklisting

resourceOffers then randomly shuffles offers (to evenly distribute tasks across executors and avoid over-utilizing some executors) and initializes the local data structures tasks and availableCpus (as shown in the figure below).
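A sketch of those structures, using the five WorkerOffers from Figure 7 below (TaskDescription comes from Spark's internals):

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

val offers = Seq(
  WorkerOffer("exec-1", "host-1", 4), WorkerOffer("exec-2", "host-1", 2),
  WorkerOffer("exec-3", "host-2", 0), WorkerOffer("exec-4", "host-2", 3),
  WorkerOffer("exec-5", "host-3", 2))
val shuffledOffers = Random.shuffle(offers)

// One task buffer per offer (sized by its free cores) and the free-core counters
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(_.cores).toArray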

Figure 7. Internal Structures of resourceOffers with 5 WorkerOffers (with 4, 2, 0, 3, 2 free cores)
Figure 8. TaskSchedulerImpl Requesting TaskSets (as TaskSetManagers) from Root Pool
Note

rootPool is configured when TaskSchedulerImpl is initialized.

TaskSetManager manages execution of the tasks in a single TaskSet that represents a single Stage.

For every TaskSetManager (in scheduling order), you should see the following DEBUG message in the logs:

parentName: [name], name: [name], runningTasks: [count]

If a new executor was added, resourceOffers notifies every TaskSetManager about the change (so they can recompute their locality preferences).

resourceOffers then takes every TaskSetManager (in scheduling order) and offers them each node in increasing order of locality levels (per TaskSetManager’s valid locality levels).

Note
A TaskSetManager computes locality levels of the tasks it manages.

For every TaskSetManager and the TaskSetManager's valid locality level, resourceOffers tries to find tasks to schedule (on executors) as long as the TaskSetManager manages to launch a task (given the locality level).

If resourceOffers did not manage to offer resources to a TaskSetManager so it could launch any task, resourceOffers requests the TaskSetManager to abort the TaskSet if completely blacklisted.

When resourceOffers managed to launch a task, the internal hasLaunchedTask flag gets enabled (that effectively means what the name says "there were executors and I managed to launch a task").

Note

resourceOffers is used when:

Finding Tasks from TaskSetManager to Schedule on Executors — resourceOfferSingleTaskSet Internal Method

resourceOfferSingleTaskSet(
  taskSet: TaskSetManager,
  maxLocality: TaskLocality,
  shuffledOffers: Seq[WorkerOffer],
  availableCpus: Array[Int],
  tasks: Seq[ArrayBuffer[TaskDescription]]): Boolean

resourceOfferSingleTaskSet takes every WorkerOffer (from the input shuffledOffers) and, only if the number of available CPU cores (in the input availableCpus) is at least spark.task.cpus, requests the TaskSetManager (the input taskSet) to find a Task to execute given the resource offer (the executor, the host, and the input maxLocality).

resourceOfferSingleTaskSet adds the task to the input tasks collection.

resourceOfferSingleTaskSet records the task id and TaskSetManager in the following registries: taskIdToTaskSetManager, taskIdToExecutorId, and executorIdToRunningTaskIds.

resourceOfferSingleTaskSet decreases the number of available CPU cores (in the input availableCpus for the WorkerOffer) by spark.task.cpus.

Note
resourceOfferSingleTaskSet makes sure that the number of available CPU cores (in the input availableCpus per WorkerOffer) is at least 0.
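A simplified sketch of the per-offer loop (CPUS_PER_TASK stands for spark.task.cpus; the other names follow this page):

var launchedTask = false
for (i <- shuffledOffers.indices) {
  val offer = shuffledOffers(i)
  if (availableCpus(i) >= CPUS_PER_TASK) {
    // TaskSetManager.resourceOffer returns Some(task) when a task fits the offer
    for (task <- taskSet.resourceOffer(offer.executorId, offer.host, maxLocality)) {
      tasks(i) += task
      taskIdToTaskSetManager(task.taskId) = taskSet
      taskIdToExecutorId(task.taskId) = offer.executorId
      availableCpus(i) -= CPUS_PER_TASK
      assert(availableCpus(i) >= 0)
      launchedTask = true
    }
  }
}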

If there is a TaskNotSerializableException, you should see the following ERROR in the logs:

ERROR Resource offer failed, task set [name] was not serializable

resourceOfferSingleTaskSet returns whether a task was launched or not.

Note
resourceOfferSingleTaskSet is used when TaskSchedulerImpl creates TaskDescriptions for available executor resource offers (with CPU cores).

TaskLocality — Task Locality Preference

TaskLocality represents a task locality preference and can be one of the following (from most localized to the widest):

  1. PROCESS_LOCAL

  2. NODE_LOCAL

  3. NO_PREF

  4. RACK_LOCAL

  5. ANY
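A self-contained sketch mirroring Spark's enumeration, where lower values are more localized:

object TaskLocality extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
}

assert(TaskLocality.PROCESS_LOCAL < TaskLocality.ANY)  // Value ordering encodes preference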

WorkerOffer — Free CPU Cores on Executor

WorkerOffer(executorId: String, host: String, cores: Int)

WorkerOffer represents a resource offer with free CPU cores available on an executorId executor on a host.
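For example, two hypothetical offers for executors with 4 and 2 free cores:

val offers = Seq(
  WorkerOffer("exec-1", "host-1", 4),
  WorkerOffer("exec-2", "host-2", 2))
val totalFreeCores = offers.map(_.cores).sum  // 6 cores to offer to TaskSetManagers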

workerRemoved Method

workerRemoved(workerId: String, host: String, message: String): Unit
Note
workerRemoved is part of the TaskScheduler Contract to…​FIXME.

workerRemoved prints out the following INFO message to the logs:

Handle removed worker [workerId]: [message]

In the end, workerRemoved simply requests the DAGScheduler to handle the removed worker.
