ShuffledRowRDD

ShuffledRowRDD is an RDD of internal binary rows (i.e. RDD[InternalRow]) that is created when:

  • ShuffleExchangeExec physical operator is requested to create one

  • CollectLimitExec and TakeOrderedAndProjectExec physical operators are executed

ShuffledRowRDD takes the following to be created:

  • Spark Core ShuffleDependency[Int, InternalRow, InternalRow]

  • Optional partition start indices (Option[Array[Int]], default: None)

Note
The dependency property is mutable so it can be cleared.

ShuffledRowRDD takes optional partition start indices that determine the number of post-shuffle partitions. When not specified, the number of post-shuffle partitions is managed by the Partitioner of the input ShuffleDependency. Otherwise, when specified (when ExchangeCoordinator is requested to doEstimationIfNecessary), ShuffledRowRDD…​FIXME
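The relationship between partition start indices and post-shuffle partitions can be sketched in plain Scala (illustrative helper, not the Spark source): every start index opens a new post-shuffle partition that spans a contiguous range of pre-shuffle partitions.

```scala
// Sketch (not Spark source): startIndices(i) is the first pre-shuffle
// partition that belongs to post-shuffle partition i.
def postShuffleRanges(
    startIndices: Array[Int],
    numPreShufflePartitions: Int): Array[(Int, Int)] =
  startIndices.indices.map { i =>
    val start = startIndices(i)
    // A range ends where the next one starts (or at the last pre-shuffle partition).
    val end =
      if (i + 1 < startIndices.length) startIndices(i + 1)
      else numPreShufflePartitions
    (start, end) // end is exclusive
  }.toArray

// 5 pre-shuffle partitions coalesced into 2 post-shuffle partitions
postShuffleRanges(Array(0, 3), 5) // Array((0,3), (3,5))
```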

Note
Post-shuffle partition is…​FIXME
Note
ShuffledRowRDD is similar to Spark Core’s ShuffledRDD, with the difference being the type of the values to process, i.e. InternalRow and (K, C) key-value pairs, respectively.
Table 1. ShuffledRowRDD and RDD Contract

  • getDependencies: A single-element collection with the ShuffleDependency[Int, InternalRow, InternalRow].

  • partitioner: CoalescedPartitioner (with the Partitioner of the dependency)

  • getPreferredLocations: Getting Placement Preferences of Partition (see below)

  • compute: Computing Partition (see below)

numPreShufflePartitions Internal Property

numPreShufflePartitions: Int

numPreShufflePartitions is exactly the number of partitions of the Partitioner of the given ShuffleDependency.

Note
numPreShufflePartitions is used when ShuffledRowRDD is requested for the partitionStartIndices (with no optional partition indices given) and partitions.

partitionStartIndices Internal Property

partitionStartIndices: Array[Int]

partitionStartIndices is the partition start indices as given, if specified, or an array of numPreShufflePartitions consecutive indices starting from 0 (which means that every post-shuffle partition corresponds to exactly one pre-shuffle partition).
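The fallback described above can be sketched in plain Scala (illustrative, not the Spark source): with no explicit indices, post-shuffle partition i simply starts at pre-shuffle partition i.

```scala
// Sketch of the fallback: a one-to-one mapping between pre- and
// post-shuffle partitions.
def defaultStartIndices(numPreShufflePartitions: Int): Array[Int] =
  (0 until numPreShufflePartitions).toArray

defaultStartIndices(4) // Array(0, 1, 2, 3)
```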

Note
partitionStartIndices is used when ShuffledRowRDD is requested for the partitions and Partitioner.

part Internal Property

part: Partitioner

part is simply a CoalescedPartitioner (for the Partitioner of the given ShuffleDependency and the partitionStartIndices).
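CoalescedPartitioner is only marked FIXME below, but its core idea can be sketched: it assigns each parent (pre-shuffle) partition to the post-shuffle partition whose range contains it. The class and field names here are illustrative, not the Spark source.

```scala
// Illustrative sketch of a coalescing partitioner: parent partition p
// belongs to the last range whose start index is <= p.
class CoalescedPartitionerSketch(startIndices: Array[Int], parentPartitions: Int) {
  // parentToChild(p) = post-shuffle partition that owns parent partition p
  private val parentToChild: Array[Int] = {
    val result = new Array[Int](parentPartitions)
    var child = 0
    for (p <- 0 until parentPartitions) {
      if (child + 1 < startIndices.length && startIndices(child + 1) <= p) child += 1
      result(p) = child
    }
    result
  }

  def numPartitions: Int = startIndices.length
  def getPartition(parentPartitionId: Int): Int = parentToChild(parentPartitionId)
}
```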

Note
part is used when ShuffledRowRDD is requested for the Partitioner and the partitions.

Computing Partition (in TaskContext) — compute Method

compute(
  split: Partition,
  context: TaskContext): Iterator[InternalRow]
Note
compute is part of Spark Core’s RDD contract to compute a partition (in a TaskContext).

Internally, compute makes sure that the input split is a ShuffledRowRDDPartition. compute then requests the ShuffleManager for a ShuffleReader to read InternalRows for the input split partition.

Note
compute uses Spark Core’s SparkEnv to access the current ShuffleManager.
Note
compute uses ShuffleHandle (of the ShuffleDependency) with the pre-shuffle start and end partition offsets of the ShuffledRowRDDPartition.
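Put together, the steps above can be sketched as follows. This is a simplified outline of the internal flow, not the exact Spark source; the field names of ShuffledRowRDDPartition are assumptions based on the description above.

```scala
// Simplified sketch of the compute flow (not the exact Spark source).
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
  val partition = split.asInstanceOf[ShuffledRowRDDPartition]
  // Ask the ShuffleManager for a reader over the pre-shuffle partition range.
  val reader = SparkEnv.get.shuffleManager.getReader(
    dependency.shuffleHandle,
    partition.startPreShufflePartitionIndex,
    partition.endPreShufflePartitionIndex,
    context)
  // Drop the Int keys and keep the InternalRow values.
  reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
}
```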

Getting Placement Preferences of Partition — getPreferredLocations Method

getPreferredLocations(partition: Partition): Seq[String]
Note
getPreferredLocations is part of the RDD contract to specify placement preferences (aka preferred task locations), i.e. where tasks should be executed to be as close to the data as possible.

Internally, getPreferredLocations requests MapOutputTrackerMaster for the preferred locations of the single ShuffleDependency and the input partition.

Note
getPreferredLocations uses SparkEnv to access the current MapOutputTrackerMaster (that runs on the driver).
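The flow above can be sketched as follows (simplified outline, not the exact Spark source): the driver-side MapOutputTrackerMaster knows where the map output of the shuffle lives.

```scala
// Simplified sketch (not the exact Spark source): ask the driver-side
// MapOutputTrackerMaster for the hosts with most of the map output
// for this post-shuffle partition.
override def getPreferredLocations(partition: Partition): Seq[String] = {
  val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  tracker.getPreferredLocationsForShuffle(dependency, partition.index)
}
```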

CoalescedPartitioner

Caution
FIXME

ShuffledRowRDDPartition

Caution
FIXME

Clearing Dependencies — clearDependencies Method

clearDependencies(): Unit
Note
clearDependencies is part of the RDD contract to clear dependencies of the RDD (to enable the parent RDDs to be garbage collected).

clearDependencies simply requests the parent RDD to clearDependencies and then clears the given ShuffleDependency (i.e. sets it to null).
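Given the mutable dependency property mentioned at the top of the page, the two steps can be sketched as:

```scala
// Sketch: clear the parent's dependencies, then drop the reference to
// the shuffle dependency so it can be garbage collected.
override def clearDependencies(): Unit = {
  super.clearDependencies()
  dependency = null
}
```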
