ShuffleMapTask — Task for ShuffleMapStage

ShuffleMapTask is a Task that computes a MapStatus, i.e. it writes the records computed for an RDD partition to the shuffle system and returns information about the BlockManager and the estimated sizes of the resulting shuffle blocks.

ShuffleMapTask is created exclusively when DAGScheduler submits missing tasks for a ShuffleMapStage.

Table 1. ShuffleMapTask’s Internal Registries and Counters
Name Description

preferredLocs

Collection of TaskLocations.

Holds the unique entries of locs. When locs is not defined, preferredLocs is empty and no task location preferences are defined.

Initialized when ShuffleMapTask is created.

Used exclusively when ShuffleMapTask is requested for preferred locations.

Spark uses broadcast variables to send (serialized) tasks to executors.

Creating ShuffleMapTask Instance

ShuffleMapTask takes the following when created:

  • stageId — the stage of the task

  • stageAttemptId — the stage’s attempt

  • taskBinary — the broadcast variable with the serialized task (as an array of bytes)

  • Partition

  • Collection of TaskLocations

  • localProperties — task-specific local properties

  • serializedTaskMetrics — the serialized FIXME (as an array of bytes)

  • jobId — optional ActiveJob id (default: undefined)

  • appId — optional application id (default: undefined)

  • appAttemptId — optional application attempt id (default: undefined)

When created, ShuffleMapTask computes the preferredLocs internal attribute: the unique entries of the input locs if defined, or an empty collection otherwise.

preferredLocs and locs are transient so they are not sent over the wire with the task.
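The rule for preferredLocs can be sketched roughly as follows. This is a minimal, self-contained illustration and not Spark's code: TaskLocation here is a simplified stand-in, PreferredLocsSketch is a hypothetical name, and "not defined" is assumed to mean a null locs.

```scala
// Simplified stand-in for Spark's TaskLocation (assumption: the real type is richer).
final case class TaskLocation(host: String)

// Hypothetical helper, for illustration only.
object PreferredLocsSketch {
  // Returns the unique entries of locs, or an empty collection
  // when locs is not defined (modelled here as null).
  def preferredLocs(locs: Seq[TaskLocation]): Seq[TaskLocation] =
    if (locs == null) Nil else locs.distinct
}
```

For example, PreferredLocsSketch.preferredLocs(Seq(TaskLocation("host1"), TaskLocation("host1"), TaskLocation("host2"))) keeps two entries, while a null input gives an empty collection.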

ShuffleMapTask initializes the internal registries and counters.

Writing Records (After Computing RDD Partition) to Shuffle System — runTask Method

runTask(context: TaskContext): MapStatus
runTask is part of Task contract to…​FIXME

runTask computes a MapStatus (which is the BlockManager and an estimated size of the result shuffle block) after the records of the Partition were written to the shuffle system.
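To make the shape of that result more concrete, here is a simplified model of what a MapStatus carries. The types below are hypothetical stand-ins rather than Spark's classes, and the sketch assumes one estimated size per reduce partition.

```scala
// Hypothetical, simplified stand-ins; not Spark's actual classes.
final case class BlockManagerId(executorId: String, host: String, port: Int)

// Models the two pieces of information described above:
// where the shuffle output lives and how large each block is estimated to be.
final case class SimpleMapStatus(location: BlockManagerId, estimatedSizes: Vector[Long]) {
  // Estimated size (in bytes) of the shuffle block for the given reduce partition.
  def getSizeForBlock(reduceId: Int): Long = estimatedSizes(reduceId)
}
```

With SimpleMapStatus(BlockManagerId("exec-1", "host1", 7337), Vector(1024L, 0L, 4096L)), a reducer reading partition 2 would expect roughly 4096 bytes from this map output.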

runTask measures the wall-clock time and the thread CPU time of deserialization (using the System clock and JMX, if supported) and stores them in the _executorDeserializeTime and _executorDeserializeCpuTime attributes.
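The measurement can be approximated with standard JDK calls. The following is only a sketch under assumptions: DeserializeTimingSketch and timed are hypothetical names, and the body stands in for the actual deserialization of the task.

```scala
import java.lang.management.ManagementFactory

// Hypothetical helper, for illustration only.
object DeserializeTimingSketch {
  // Runs body and returns (result, elapsed wall-clock millis, elapsed CPU nanos).
  // CPU time is reported as 0 when the JVM does not support per-thread CPU timing.
  def timed[A](body: => A): (A, Long, Long) = {
    val threadMXBean = ManagementFactory.getThreadMXBean()
    val cpuSupported = threadMXBean.isCurrentThreadCpuTimeSupported()
    val startWall = System.currentTimeMillis()
    val startCpu = if (cpuSupported) threadMXBean.getCurrentThreadCpuTime() else 0L
    val result = body
    val wallMillis = System.currentTimeMillis() - startWall
    val cpuNanos =
      if (cpuSupported) threadMXBean.getCurrentThreadCpuTime() - startCpu else 0L
    (result, wallMillis, cpuNanos)
  }
}
```

A call such as DeserializeTimingSketch.timed { expensiveDeserialization() } (where expensiveDeserialization is a placeholder) returns the value together with both timings, which a task could then record in its metrics.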

The taskBinary serialized task is given when ShuffleMapTask is created.
The partitionId partition is given when ShuffleMapTask is created.

runTask gets the records in the RDD partition (as an Iterator) and writes them (to the shuffle system).

This is the moment in a Task's lifecycle (and that of its corresponding RDD) when an RDD partition is computed and in turn becomes a sequence of records (i.e. real data) on an executor.

runTask stops the ShuffleWriter (with success flag enabled) and returns the MapStatus.

When writing the records fails, runTask stops the ShuffleWriter (with the success flag disabled) and re-throws the exception.

You may also see the following DEBUG message in the logs when the ShuffleWriter could not be stopped.

DEBUG Could not stop writer
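The write, stop and re-throw flow described above can be sketched as follows. This is a simplified model, not Spark's implementation: Writer is a hypothetical stand-in for a shuffle writer, R plays the role of MapStatus, and println replaces the logger.

```scala
// Hypothetical stand-in for a shuffle writer; R plays the role of MapStatus.
trait Writer[T, R] {
  def write(records: Iterator[T]): Unit
  def stop(success: Boolean): Option[R]
}

// Hypothetical helper, for illustration only.
object RunTaskSketch {
  def writeAll[T, R](writer: Writer[T, R], records: Iterator[T]): R =
    try {
      writer.write(records)
      // Success path: stop the writer with the success flag enabled
      // and return the resulting status.
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          // Failure path: stop the writer with the success flag disabled.
          writer.stop(success = false)
        } catch {
          case stopError: Exception =>
            // Stopping itself failed; report it and carry on to the re-throw.
            println(s"DEBUG Could not stop writer: ${stopError.getMessage}")
        }
        // The original exception is always re-thrown.
        throw e
    }
}
```

Note that a failure while stopping the writer is only logged, so the caller always sees the original write error rather than the secondary one.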

preferredLocations Method

preferredLocations: Seq[TaskLocation]
preferredLocations is part of Task contract to…​FIXME

preferredLocations simply returns preferredLocs internal property.
