ShuffleMapTask — Task to Compute MapStatus for ShuffleMapStage

ShuffleMapTask is a concrete Task that computes a MapStatus, i.e. writes the result of executing serialized task code over the records of an RDD partition to the shuffle system and returns information about the BlockManager and the estimated sizes of the result shuffle blocks.

ShuffleMapTask is created exclusively when DAGScheduler is requested to submit the missing tasks of a ShuffleMapStage.

Note

Spark uses broadcast variables to send (serialized) tasks to executors.

runTask expects the serialized task binary to be a tuple of an RDD and a ShuffleDependency.

Tip

Enable ALL logging level for org.apache.spark.scheduler.ShuffleMapTask logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.ShuffleMapTask=ALL

Refer to Logging.

Creating ShuffleMapTask Instance

ShuffleMapTask takes the following to be created (see the constructor sketch after this list):

  • Stage ID

  • Stage attempt ID

  • Broadcast variable with the serialized code to execute (Broadcast[Array[Byte]])

  • RDD partition

  • TaskLocations (Seq[TaskLocation])

  • Task-specific local properties

  • Serialized task metrics (Array[Byte])

  • Optional job ID (default: None)

  • Optional application ID (default: None)

  • Optional application attempt ID (default: None)

  • isBarrier flag (default: false)
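
Taken together, the parameters above map directly onto the constructor. The following is a sketch of its signature (modeled on the Spark 2.x sources; exact modifiers and the omitted superclass arguments may differ between versions):

package org.apache.spark.scheduler

import java.util.Properties

import org.apache.spark.Partition
import org.apache.spark.broadcast.Broadcast

// A sketch only: the real class extends Task[MapStatus] and passes
// most of these parameters on to the Task superclass constructor.
private[spark] class ShuffleMapTask(
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]], // serialized (RDD, ShuffleDependency) pair
    partition: Partition,               // the RDD partition to compute
    @transient private var locs: Seq[TaskLocation],
    localProperties: Properties,
    serializedTaskMetrics: Array[Byte],
    jobId: Option[Int] = None,
    appId: Option[String] = None,
    appAttemptId: Option[String] = None,
    isBarrier: Boolean = false)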

ShuffleMapTask calculates the preferredLocs internal attribute as the unique entries of the input locs if defined. Otherwise, preferredLocs is empty.

Note
preferredLocs and locs are transient values and so they are not sent over the wire with the task.
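
A minimal sketch of that calculation (after the Spark 2.x sources; the @transient marker is what keeps the value from being sent over the wire):

@transient private val preferredLocs: Seq[TaskLocation] = {
  // Deduplicate the given task locations; no locs means no placement preferences.
  if (locs == null) Nil else locs.toSet.toSeq
}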

ShuffleMapTask initializes the internal registries and counters.

Running Task — runTask Method

runTask(context: TaskContext): MapStatus
Note
runTask is part of Task contract to run a task.

runTask writes the result (records) of executing the serialized task code over the records of the RDD partition to the shuffle system and returns a MapStatus (with the BlockManager and the estimated sizes of the result shuffle blocks).

Internally, runTask requests the SparkEnv for a new instance of the closure serializer and requests it to deserialize the taskBinary (into a tuple of an RDD and a ShuffleDependency).
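
In code, the deserialization step looks roughly like this (a sketch after the Spark 2.x sources; rdd and dep are then used in the steps below):

import java.nio.ByteBuffer

import org.apache.spark.{ShuffleDependency, SparkEnv}
import org.apache.spark.rdd.RDD

val ser = SparkEnv.get.closureSerializer.newInstance()
// taskBinary is the broadcast variable with the serialized task code
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
  ByteBuffer.wrap(taskBinary.value),
  Thread.currentThread.getContextClassLoader)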

runTask measures the thread and CPU deserialization times.

runTask requests the SparkEnv for the ShuffleManager and requests it for a ShuffleWriter (for the ShuffleHandle, the RDD partition, and the TaskContext).

runTask then requests the RDD for the records (of the partition) that the ShuffleWriter is requested to write out (to the shuffle system).

In the end, runTask requests the ShuffleWriter to stop (with the success flag on) and returns the shuffle map output status.
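
Put together, the happy path of runTask could be sketched as follows (after the Spark 2.x sources, with rdd and dep from the deserialization step above):

import org.apache.spark.shuffle.ShuffleWriter

var writer: ShuffleWriter[Any, Any] = null
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
// Compute the partition's records and write them out to the shuffle system
writer.write(
  rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
// Stop the writer with the success flag on and return the shuffle map output status
writer.stop(success = true).get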

Note
This is the moment in Task's lifecycle (and its corresponding RDD) when an RDD partition is computed and in turn becomes a sequence of records (i.e. real data) on an executor.

In case of any exceptions, runTask requests the ShuffleWriter to stop (with the success flag off) and (re)throws the exception.

runTask may also print out the following DEBUG message to the logs when the ShuffleWriter could not be stopped.

DEBUG Could not stop writer
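
The failure path wraps the steps above in a try block, roughly as follows (a sketch after the Spark 2.x sources):

try {
  // ... the write-out steps from the sketch above ...
} catch {
  case e: Exception =>
    try {
      if (writer != null) {
        writer.stop(success = false) // the success flag off
      }
    } catch {
      case e: Exception =>
        log.debug("Could not stop writer", e)
    }
    throw e // rethrow the original exception
}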

preferredLocations Method

preferredLocations: Seq[TaskLocation]
Note
preferredLocations is part of the Task contract to return the preferred locations of a task.

preferredLocations simply returns the preferredLocs internal property.
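
Given that, the method is a one-liner (sketch after the Spark 2.x sources):

override def preferredLocations: Seq[TaskLocation] = preferredLocs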

Internal Properties

Table 1. ShuffleMapTask’s Internal Properties (e.g. Registries, Counters and Flags)
Name: preferredLocs

Description: TaskLocations that are the unique entries in the given locs. When locs is not defined, preferredLocs is empty and no task location preferences are defined.

Initialized when ShuffleMapTask is created.

Used exclusively when ShuffleMapTask is requested for the preferred locations.
