Task — Smallest Individual Unit of Execution

Task is the abstraction of the smallest individual unit of execution that computes a single RDD partition.

Table 1. Task Contract

| Method | Description |
|--------|-------------|
| runTask(context: TaskContext): T | Runs the task. Used exclusively when Task is requested to run. |

Task is created when DAGScheduler is requested to submit missing tasks of a stage.

Note
Task is a Scala abstract class and cannot be created directly. It is created indirectly through its concrete implementations.
Figure 1. Tasks Are Runtime Representation of RDD Partitions

Task is described by the following:

  • ID of the stage (Stage ID)

  • Stage (execution) attempt ID

  • ID of the partition (Partition ID)

  • Local properties

  • Serialized TaskMetrics (Array[Byte])

  • Optional ID of the job (default: None)

  • Optional ID of the Spark application (default: None)

  • Optional (execution) attempt ID of the Spark application (default: None)

  • isBarrier flag that says whether the task belongs to a barrier stage (default: false)
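Taken together, the properties above suggest the following shape for the Task class. This is a simplified, self-contained sketch with stand-in types (the TaskContext trait and TaskLocation case class here are minimal stubs), not Spark's actual definition:

```scala
import java.util.Properties

// Minimal stand-ins so the sketch compiles on its own (not Spark's types).
trait TaskContext
case class TaskLocation(host: String)

// Simplified sketch of the Task abstraction, following the list above.
abstract class Task[T](
    val stageId: Int,
    val stageAttemptId: Int,
    val partitionId: Int,
    val localProperties: Properties = new Properties,
    val serializedTaskMetrics: Array[Byte] = Array.empty,
    val jobId: Option[Int] = None,
    val appId: Option[String] = None,
    val appAttemptId: Option[String] = None,
    val isBarrier: Boolean = false) {

  // The sole method of the Task contract (see Table 1).
  def runTask(context: TaskContext): T

  // Preferred executors to run on; empty means "any executor".
  def preferredLocations: Seq[TaskLocation] = Nil
}
```

A concrete task then only needs to implement runTask, as ResultTask and ShuffleMapTask do in Spark.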

Task can be run (possibly on a preferred executor).

In other words, a task is a computation on the records in an RDD partition within a stage of a Spark job.

Note
In Scala, Task is actually Task[T], where T is the type of a task's result (i.e. the type of the value computed).
Table 2. Tasks

| Task | Description |
|------|-------------|
| ResultTask | Computes a ResultStage and gives the result back to the driver |
| ShuffleMapTask | Computes a ShuffleMapStage |

In most cases, the last stage of a Spark job consists of one or more ResultTasks, while earlier stages consist of ShuffleMapTasks.

Note
It is possible to have one or more ShuffleMapTasks as part of the last stage.

A task can only belong to one stage and operate on a single partition. All tasks in a stage must be completed before the stages that follow can start.

Tasks are spawned one by one, one per partition of a stage.

preferredLocations Method

preferredLocations: Seq[TaskLocation] = Nil

TaskLocations that represent preferred locations (executors) to execute the task on.

Empty by default, i.e. no task location preferences are defined, which means the task can be launched on any executor.

Note
Defined by the concrete tasks, i.e. ShuffleMapTask and ResultTask.
Note
preferredLocations is used exclusively when TaskSetManager is requested to register a task as pending execution and dequeueSpeculativeTask.

Running Task Thread — run Final Method

run(
  taskAttemptId: Long,
  attemptNumber: Int,
  metricsSystem: MetricsSystem): T

run creates a TaskContextImpl that in turn becomes the task’s TaskContext.

Note
run is a final method and so must not be overridden.

run checks the _killed flag and, if set, kills the task (with the interruptThread flag disabled).

run creates a Hadoop CallerContext and sets it.

Note
This is the moment when the concrete Task's runTask is executed.

In the end, run notifies TaskContextImpl that the task has completed (regardless of the final outcome — a success or a failure).

In case of any exceptions, run notifies TaskContextImpl that the task has failed. run requests MemoryStore to release unroll memory for this task (for both ON_HEAP and OFF_HEAP memory modes).

Note
run uses SparkEnv to access the current BlockManager that it uses to access MemoryStore.
Note
run is used exclusively when TaskRunner is requested to run (when Executor is requested to launch a task (on "Executor task launch worker" thread pool sometime in the future)).
  1. The Task instance has just been deserialized from taskBytes that were sent over the wire to an executor. localProperties and TaskMemoryManager are already assigned.
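The overall control flow of run can be sketched as follows. This is a simplified, self-contained sketch (SketchTask and its TaskContextImpl stand-in are not Spark's classes); it shows only the ordering described above: the kill check, runTask, the failure notification, and the unconditional completion notification:

```scala
// Stand-in for Spark's TaskContextImpl (simplified).
class TaskContextImpl {
  var failed = false
  var completed = false
  def markTaskFailed(t: Throwable): Unit = failed = true
  def markTaskCompleted(): Unit = completed = true
}

abstract class SketchTask[T] {
  @volatile protected var killed = false
  var context: TaskContextImpl = _

  def runTask(context: TaskContextImpl): T

  // run is final: create the context, refuse to start when already killed,
  // notify the context on failure, and always notify completion in the end.
  final def run(): T = {
    context = new TaskContextImpl
    if (killed) throw new InterruptedException("Task was killed")
    try {
      runTask(context)
    } catch {
      case t: Throwable =>
        context.markTaskFailed(t)
        throw t
    } finally {
      // Regardless of the final outcome, a success or a failure.
      context.markTaskCompleted()
    }
  }

  def kill(): Unit = killed = true
}
```

Note how the finally block guarantees the completion notification even when runTask throws, matching the behaviour described above.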

Task States

A task can be in one of the following states (as described by TaskState enumeration):

  • LAUNCHING

  • RUNNING when the task is being executed

  • FINISHED when the task successfully finished computing its (serialized) result

  • FAILED when the task fails, e.g. when a FetchFailedException, a CommitDeniedException or any other Throwable occurs

  • KILLED when an executor kills the task

  • LOST

States are the values of org.apache.spark.TaskState.

Note
Task status updates are sent from executors to the driver through ExecutorBackend.

A task is finished when it is in one of the FINISHED, FAILED, KILLED or LOST states.

LOST and FAILED states are considered failures.

Tip
Task states correspond to org.apache.mesos.Protos.TaskState.
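The states and the two groupings above can be modelled as a small enumeration. The following is a sketch modelled on org.apache.spark.TaskState (simplified; the helper names isFinished and isFailed are illustrative, not the actual Spark source):

```scala
// Sketch of the TaskState enumeration and its finished/failed predicates.
object TaskState extends Enumeration {
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value

  private val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)
  private val FAILED_STATES   = Set(FAILED, LOST)

  // A task is finished in any of FINISHED, FAILED, KILLED, LOST.
  def isFinished(state: Value): Boolean = FINISHED_STATES(state)

  // LOST and FAILED states are considered failures.
  def isFailed(state: Value): Boolean = FAILED_STATES(state)
}
```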

Collect Latest Values of (Internal and External) Accumulators — collectAccumulatorUpdates Method

collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulableInfo]

collectAccumulatorUpdates collects the latest values of internal and external accumulators from a task (and returns the values as a collection of AccumulableInfo).

Internally, collectAccumulatorUpdates takes TaskMetrics.

Note
collectAccumulatorUpdates uses TaskContextImpl to access the task’s TaskMetrics.

collectAccumulatorUpdates returns an empty collection when TaskContextImpl is not initialized.

Note
collectAccumulatorUpdates is used when TaskRunner runs a task (and sends a task’s final results back to the driver).
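The idea of collectAccumulatorUpdates, snapshotting the current accumulator values as AccumulableInfos (or returning nothing when the context is absent), can be sketched as follows. All types here are simplified stand-ins, not Spark's:

```scala
// Simplified stand-ins for accumulators and their reported values.
case class AccumulableInfo(id: Long, name: String, value: Long)

class Accumulator(val id: Long, val name: String) {
  private var current = 0L
  def add(v: Long): Unit = current += v
  def toInfo: AccumulableInfo = AccumulableInfo(id, name, current)
}

class SketchContext {
  var registered = Seq.empty[Accumulator]
}

class AccumTask {
  var context: SketchContext = _ // not initialized until the task runs

  // Mirrors collectAccumulatorUpdates: the latest accumulator values as
  // AccumulableInfos, or an empty collection when the context is not set.
  def collectAccumulatorUpdates(): Seq[AccumulableInfo] =
    if (context == null) Seq.empty
    else context.registered.map(_.toInfo)
}
```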

Killing Task — kill Method

kill(interruptThread: Boolean)

kill marks the task to be killed, i.e. it sets the internal _killed flag to true.

kill calls TaskContextImpl.markInterrupted when context is set.

If interruptThread is enabled and the internal taskThread is available, kill interrupts it.

Caution
FIXME When could context and interruptThread not be set?
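The kill flow above (set the flag, mark the context interrupted when it is available, and optionally interrupt the task's thread) can be sketched as follows, with a stand-in class rather than Spark's implementation:

```scala
// Sketch of Task.kill: both the context and taskThread may legitimately be
// unset, e.g. before run has started on an executor.
class KillableTask {
  @volatile var killed = false
  @volatile var taskThread: Thread = _

  def kill(interruptThread: Boolean): Unit = {
    killed = true
    // TaskContextImpl.markInterrupted would be invoked here when the
    // context is set (it is not before run has been called).
    if (interruptThread && taskThread != null) taskThread.interrupt()
  }
}
```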

Internal Properties

Table 3. Task’s Internal Properties (e.g. Registries, Counters and Flags)

| Name | Description |
|------|-------------|
| _executorDeserializeCpuTime | |
| _executorDeserializeTime | |
| _reasonIfKilled | |
| _killed | |
| context | TaskContext. Set to a BarrierTaskContext or a TaskContextImpl (when the isBarrier flag is enabled or disabled, respectively) when Task is requested to run |
| epoch | Task epoch. Starts as -1 and is set when TaskSetManager is created (to the epoch of the MapOutputTrackerMaster) |
| metrics | TaskMetrics. Created lazily from serializedTaskMetrics when Task is created |
| taskMemoryManager | TaskMemoryManager that manages the memory allocated by the task |
| taskThread | |
