ShuffledRDD is an RDD of key-value pairs that represents the shuffle step in an RDD lineage. It uses custom ShuffledRDDPartition partitions.

A ShuffledRDD is created for RDD transformations that trigger a data shuffle:

  1. coalesce transformation (with shuffle flag enabled).

  2. PairRDDFunctions's combineByKeyWithClassTag and partitionBy (when the parent RDD’s and specified Partitioners are different).

  3. OrderedRDDFunctions's sortByKey and repartitionAndSortWithinPartitions ordered operators.

scala> val rdd = sc.parallelize(0 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.getNumPartitions
res0: Int = 8

// ShuffledRDD and coalesce Example

scala> rdd.coalesce(numPartitions = 4, shuffle = true).toDebugString
res1: String =
(4) MapPartitionsRDD[4] at coalesce at <console>:27 []
 |  CoalescedRDD[3] at coalesce at <console>:27 []
 |  ShuffledRDD[2] at coalesce at <console>:27 []
 +-(8) MapPartitionsRDD[1] at coalesce at <console>:27 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:24 []

// ShuffledRDD and sortByKey Example

scala> val grouped = rdd.groupBy(_ % 2)
grouped: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[6] at groupBy at <console>:26

scala> grouped.sortByKey(numPartitions = 2).toDebugString
res2: String =
(2) ShuffledRDD[9] at sortByKey at <console>:29 []
 +-(8) ShuffledRDD[6] at groupBy at <console>:26 []
    +-(8) MapPartitionsRDD[5] at groupBy at <console>:26 []
       |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
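The sessions above do not cover partitionBy, so here is a minimal sketch of that case. It assumes Spark is on the classpath and that a local master is acceptable; the names `pairs`, `repartitioned` and `same` are illustrative only.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD

// Reuses the active SparkContext (e.g. in spark-shell) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("partitionBy-sketch"))

// A pair RDD with no Partitioner of its own.
val pairs = sc.parallelize(0 to 9).map(n => (n % 3, n))

// The Partitioners differ (None vs HashPartitioner(4)), so a ShuffledRDD is created.
val repartitioned = pairs.partitionBy(new HashPartitioner(4))
val createdShuffledRDD = repartitioned.isInstanceOf[ShuffledRDD[_, _, _]]

// The Partitioners are now equal, so partitionBy returns the same RDD (no new shuffle).
val same = repartitioned.partitionBy(new HashPartitioner(4))
val reusedRDD = same eq repartitioned
```

Both `createdShuffledRDD` and `reusedRDD` should be true, which matches the condition that a shuffle is only introduced when the Partitioners are different.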

ShuffledRDD takes a parent RDD and a Partitioner when created.

getDependencies returns a single-element collection of RDD dependencies with a ShuffleDependency (whose Serializer is chosen according to the map-side combine internal flag).
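As a rough illustration, a ShuffledRDD can also be created directly from a parent RDD and a Partitioner, and its dependencies inspected. This is a sketch that assumes Spark is on the classpath and a local master is acceptable; the names `parent` and `shuffled` are illustrative, not from the Spark sources.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD

// Reuses the active SparkContext (e.g. in spark-shell) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-sketch"))

// Parent RDD of (key, value) pairs; K = Int and V = Int.
val parent = sc.parallelize(0 to 9).map(n => (n % 3, n))

// No combining is configured here, so C is the same type as V.
val shuffled = new ShuffledRDD[Int, Int, Int](parent, new HashPartitioner(4))

// The Partitioner determines the number of partitions and their indices.
val numPartitions = shuffled.getNumPartitions
val indices = shuffled.partitions.map(_.index).toSeq

// The one and only dependency is a ShuffleDependency.
val deps = shuffled.dependencies
val isShuffleDep = deps.head.isInstanceOf[ShuffleDependency[_, _, _]]
```

In everyday code the transformations listed earlier create the ShuffledRDD for you; constructing it by hand is mostly useful for exploring its behaviour.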

Map-Side Combine mapSideCombine Internal Flag

mapSideCombine: Boolean

mapSideCombine internal flag is used to select the Serializer (for shuffling) when ShuffleDependency is created (which is the one and only Dependency of a ShuffledRDD).

mapSideCombine is only consulted when the optional userSpecifiedSerializer is not specified (which is the default).

If mapSideCombine is enabled (i.e. true), getDependencies finds the Serializer for the types K and C. Otherwise, getDependencies finds the Serializer for the types K and V.

The types K, C and V are specified when ShuffledRDD is created.
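The selection rule can be modelled in plain Scala without Spark. This is a simplified sketch, not Spark's own code: `SerializerFor` and `chooseSerializer` are hypothetical names that stand in for a Serializer and for the decision made in getDependencies.

```scala
import scala.reflect.{ClassTag, classTag}

// Hypothetical stand-in for a Serializer, tagged with the key and value types it handles.
final case class SerializerFor(keyTag: ClassTag[_], valueTag: ClassTag[_])

// Hypothetical helper that models the decision described above.
def chooseSerializer[K: ClassTag, V: ClassTag, C: ClassTag](
    userSpecifiedSerializer: Option[SerializerFor],
    mapSideCombine: Boolean): SerializerFor =
  userSpecifiedSerializer.getOrElse {
    if (mapSideCombine) SerializerFor(classTag[K], classTag[C]) // shuffled records are (K, C)
    else SerializerFor(classTag[K], classTag[V])                // shuffled records are (K, V)
  }

// K = String, V = Int, C = Long
val combined = chooseSerializer[String, Int, Long](None, mapSideCombine = true)
val plain = chooseSerializer[String, Int, Long](None, mapSideCombine = false)
```

Here `combined` is tagged with String and Long, while `plain` is tagged with String and Int. An explicitly given serializer would be returned unchanged regardless of the flag.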

mapSideCombine is disabled (i.e. false) when ShuffledRDD is created and can be set using the setMapSideCombine method.

The setMapSideCombine method is only used in the experimental PairRDDFunctions.combineByKeyWithClassTag transformation.

Computing Partition (in TaskContext) — compute Method

compute(split: Partition, context: TaskContext): Iterator[(K, C)]
compute is part of the RDD contract to compute a given partition in a TaskContext.

Internally, compute makes sure that the first (and only) dependency of the ShuffledRDD is a ShuffleDependency. It then requests the ShuffleManager for a ShuffleReader and uses it to read the key-value pairs (as Iterator[(K, C)]) for the split.

compute uses the index of the split as the startPartition and index + 1 as the endPartition when requesting the ShuffleReader, so every task reads exactly one shuffle (reduce) partition.
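How the partition index bounds the range a reader covers can be modelled in plain Scala. The sketch assumes that the split's index is used as startPartition and index + 1 as endPartition (as in the Spark 2.x sources). `shuffleOutput`, `read` and the simplified `compute` are hypothetical stand-ins, not the ShuffleManager or ShuffleReader API.

```scala
// Hypothetical in-memory stand-in for shuffle output: reduce partition id -> records.
val shuffleOutput: Map[Int, Seq[(String, Int)]] = Map(
  0 -> Seq("a" -> 1, "c" -> 3),
  1 -> Seq("b" -> 2))

// Hypothetical reader: returns the records of partitions in [startPartition, endPartition).
def read(startPartition: Int, endPartition: Int): Iterator[(String, Int)] =
  (startPartition until endPartition).iterator
    .flatMap(p => shuffleOutput.getOrElse(p, Seq.empty))

// Models compute: the half-open range covers exactly one reduce partition per split.
def compute(splitIndex: Int): Iterator[(String, Int)] =
  read(splitIndex, splitIndex + 1)

val firstPartition = compute(0).toList
val secondPartition = compute(1).toList
```

With this toy data, `firstPartition` holds the two records of partition 0 and `secondPartition` holds the single record of partition 1.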

Getting Placement Preferences of Partition — getPreferredLocations Method

getPreferredLocations(partition: Partition): Seq[String]
getPreferredLocations is part of the RDD contract to specify placement preferences (aka preferred task locations), i.e. where tasks should be executed to be as close to the data as possible.

Internally, getPreferredLocations requests MapOutputTrackerMaster for the preferred locations, i.e. BlockManagers with the most map outputs, for the input partition (of the one and only ShuffleDependency).

getPreferredLocations uses SparkEnv to access the current MapOutputTrackerMaster (which runs on the driver).
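To make "the most map outputs" concrete, here is a plain-Scala model of such a selection. It does not call MapOutputTrackerMaster; the function name `preferredHosts`, the host names and the 0.2 fraction threshold are assumptions made for illustration only.

```scala
// Hypothetical model: keep hosts that hold at least `fractionThreshold`
// of the total map output (in bytes) destined for one reduce partition.
def preferredHosts(bytesByHost: Map[String, Long], fractionThreshold: Double): Seq[String] = {
  val total = bytesByHost.values.sum
  if (total == 0L) Seq.empty
  else bytesByHost.toSeq.collect {
    case (host, bytes) if bytes.toDouble / total >= fractionThreshold => host
  }.sorted
}

// Illustrative sizes: host-a holds 80% of the output for this partition.
val hosts = preferredHosts(
  Map("host-a" -> 800L, "host-b" -> 150L, "host-c" -> 50L),
  fractionThreshold = 0.2)
```

In this example only host-a passes the threshold, so a task reading the partition would prefer to run there. An empty result means there is no placement preference.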


ShuffledRDDPartition gets an index when it is created (which is the index of the partition as calculated by the Partitioner of the ShuffledRDD).
