TaskScheduler Contract — Spark Schedulers

TaskScheduler is the abstraction of Spark schedulers that can submit tasks for execution in a Spark application (per scheduling policy).

Figure 1. TaskScheduler and SparkContext
Note
TaskScheduler works closely with DAGScheduler that submits sets of tasks for execution (for every stage in a Spark job).

TaskScheduler can track the executors available in a Spark application using executorHeartbeatReceived and executorLost interceptors (that inform about active and lost executors, respectively).
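For a one-glance view of the contract (summarized method by method in Table 1 below), here is a simplified Scala sketch of the trait. It is a paraphrase, not the exact source; most of the referenced types are private[spark], so it compiles only inside Spark's own source tree.

// Simplified sketch of org.apache.spark.scheduler.TaskScheduler (abstract methods only)
import org.apache.spark.scheduler.{DAGScheduler, ExecutorLossReason, Pool, TaskSet}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AccumulatorV2

trait TaskScheduler {
  def applicationAttemptId(): Option[String]
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
  def defaultParallelism(): Int
  def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId): Boolean
  def executorLost(executorId: String, reason: ExecutorLossReason): Unit
  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit
  def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean
  def rootPool: Pool
  def schedulingMode: SchedulingMode
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit
  def start(): Unit
  def stop(): Unit
  def submitTasks(taskSet: TaskSet): Unit
  def workerRemoved(workerId: String, host: String, message: String): Unit
}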

Table 1. TaskScheduler Contract (Abstract Methods Only)
Method Description

applicationAttemptId

applicationAttemptId(): Option[String]

Unique identifier of an (execution) attempt of the Spark application

Used exclusively when SparkContext is created

cancelTasks

cancelTasks(
  stageId: Int,
  interruptThread: Boolean): Unit

Cancels all the tasks of a given stage

Used exclusively when DAGScheduler is requested to failJobAndIndependentStages

defaultParallelism

defaultParallelism(): Int

Default level of parallelism

Used exclusively when SparkContext is requested for the default level of parallelism

executorHeartbeatReceived

executorHeartbeatReceived(
  execId: String,
  accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
  blockManagerId: BlockManagerId): Boolean

Handles heartbeats (with task metrics) from executors

Expected to return true when the executor with the given execId is managed by the TaskScheduler. Returning false indicates that the block manager (on the executor) should re-register.

Used exclusively when HeartbeatReceiver RPC endpoint is requested to handle a Heartbeat and task metrics from an executor

executorLost

executorLost(
  executorId: String,
  reason: ExecutorLossReason): Unit

Handles an executor lost event

Used when:

  • HeartbeatReceiver RPC endpoint is requested to expireDeadHosts

  • DriverEndpoint RPC endpoint is requested to remove (forget) and disable a malfunctioning executor (i.e. either lost or blacklisted for some reason)

  • Spark on Mesos' MesosFineGrainedSchedulerBackend is requested to recordSlaveLost

killAllTaskAttempts

killAllTaskAttempts(
  stageId: Int,
  interruptThread: Boolean,
  reason: String): Unit

Kills all the task attempts of a given stage

Used when:

killTaskAttempt

killTaskAttempt(
  taskId: Long,
  interruptThread: Boolean,
  reason: String): Boolean

Used exclusively when DAGScheduler is requested to killTaskAttempt

rootPool

rootPool: Pool

Top-level (root) schedulable pool

Used when:

schedulingMode

schedulingMode: SchedulingMode

Scheduling mode (FAIR, FIFO or NONE)

Used when:

setDAGScheduler

setDAGScheduler(dagScheduler: DAGScheduler): Unit

Associates a DAGScheduler

Used exclusively when DAGScheduler is created

start

start(): Unit

Starts the TaskScheduler

Used exclusively when SparkContext is created

stop

stop(): Unit

Stops the TaskScheduler

Used exclusively when DAGScheduler is requested to stop

submitTasks

submitTasks(taskSet: TaskSet): Unit

Submits the tasks (of the given TaskSet) for execution

Used exclusively when DAGScheduler is requested to submit missing tasks (of a stage)

workerRemoved

workerRemoved(
  workerId: String,
  host: String,
  message: String): Unit

Handles a removed worker event

Used exclusively when DriverEndpoint is requested to handle a RemoveWorker event
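Although TaskScheduler itself is an internal (private[spark]) abstraction, parts of the contract surface in SparkContext's public API: defaultParallelism as SparkContext.defaultParallelism, and killTaskAttempt as SparkContext.killTaskAttempt (which reaches the TaskScheduler through DAGScheduler). A small usage sketch; the task id 42 is made up for illustration, real task ids come from SparkListener events or the web UI.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("taskscheduler-demo"))

// Backed by TaskScheduler.defaultParallelism
val parallelism = sc.defaultParallelism

// Goes through DAGScheduler down to TaskScheduler.killTaskAttempt.
// 42 is a made-up task id for illustration only.
val killed: Boolean = sc.killTaskAttempt(42, interruptThread = true, reason = "demo kill")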

Table 2. TaskSchedulers (All Available Implementations)
TaskScheduler Description

TaskSchedulerImpl

Default Spark scheduler

YarnScheduler

TaskScheduler for client deploy mode in Spark on YARN

YarnClusterScheduler

TaskScheduler for cluster deploy mode in Spark on YARN

Post-Start Initialization — postStartHook Method

postStartHook(): Unit

postStartHook does nothing by default, but allows custom implementations to perform some additional post-start initialization.

Note

postStartHook is used when:

  • SparkContext is created (right before it is considered fully initialized)

  • Spark on YARN’s YarnClusterScheduler is requested to postStartHook
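A minimal sketch of the idea, with hypothetical names (SchedulerStub, markReady) that are not part of Spark's API: a scheduler can use postStartHook to block SparkContext creation until it is actually ready to accept tasks, which is essentially what TaskSchedulerImpl-style implementations use it for (waiting for their SchedulerBackend to become ready).

import java.util.concurrent.CountDownLatch

// Hypothetical, simplified example: block in postStartHook until the scheduler
// is ready, so user code does not run before tasks can actually be scheduled.
class SchedulerStub {
  private val ready = new CountDownLatch(1)

  // Would be called (hypothetically) by the backend once it can accept tasks.
  def markReady(): Unit = ready.countDown()

  // Mirrors TaskScheduler.postStartHook: SparkContext calls it right before it is
  // considered fully initialized, so blocking here delays initialization.
  def postStartHook(): Unit = ready.await()
}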

Unique Identifier of Spark Application — applicationId Method

applicationId(): String

applicationId is the unique identifier of the Spark application and defaults to spark-application-[currentTimeMillis].

Note
applicationId is used exclusively when SparkContext is created.
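In code, the default looks along these lines (a paraphrase of the trait's member, not the exact source):

// The trait's default; concrete schedulers such as TaskSchedulerImpl usually
// override it to return the cluster manager's application id (via their SchedulerBackend).
def applicationId(): String = "spark-application-" + System.currentTimeMillis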

TaskScheduler’s Lifecycle

A TaskScheduler is created while SparkContext is being created (by calling SparkContext.createTaskScheduler for a given master URL and deploy mode).

Figure 2. TaskScheduler uses SchedulerBackend to support different clusters
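The choice of the concrete TaskScheduler (and its SchedulerBackend) follows the master URL and deploy mode. A simplified, illustrative sketch of that decision; the helper describeScheduler is hypothetical and merely mimics the dispatch in SparkContext.createTaskScheduler.

// Hypothetical helper that mimics the master-URL dispatch in
// SparkContext.createTaskScheduler (greatly simplified).
def describeScheduler(master: String): String = master match {
  case "local"                       => "TaskSchedulerImpl + LocalSchedulerBackend (single thread)"
  case m if m.startsWith("local[")   => "TaskSchedulerImpl + LocalSchedulerBackend (N threads)"
  case m if m.startsWith("spark://") => "TaskSchedulerImpl + StandaloneSchedulerBackend"
  case _                             => "TaskScheduler/SchedulerBackend from an ExternalClusterManager (e.g. YARN)"
}

// describeScheduler("spark://host:7077")
// => "TaskSchedulerImpl + StandaloneSchedulerBackend"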

At this point in SparkContext’s lifecycle, the internal _taskScheduler points at the TaskScheduler (and it is "announced" by sending a blocking TaskSchedulerIsSet message to HeartbeatReceiver RPC endpoint).

The TaskScheduler is started right after the blocking TaskSchedulerIsSet message receives a response.

The application ID and the application's attempt ID are set at this point (SparkContext uses the application id to set the spark.app.id Spark property and to configure the SparkUI and the BlockManager).
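Once initialization has completed, the application id reported by the TaskScheduler is visible through the public API and the spark.app.id property, for example:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("appid-demo"))

sc.applicationId                // e.g. a "local-..." id in local mode (the format is cluster-manager-specific)
sc.getConf.get("spark.app.id")  // the same value, set by SparkContext during initialization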

Caution
FIXME The application id is described as "associated with the job" in TaskScheduler, but I think it is "associated with the application", as you can have many jobs per application.

Right before SparkContext is fully initialized, TaskScheduler.postStartHook is called.

The internal _taskScheduler is cleared (i.e. set to null) while SparkContext is being stopped.

Warning
FIXME If it is SparkContext that starts a TaskScheduler, shouldn't SparkContext stop it too? Why is it the way it is now?
