RDD shuffling

Read the official documentation about the topic Shuffle operations. It is still better than this page.

Shuffling is a process of redistributing data across partitions (aka repartitioning) that may or may not cause moving data across JVM processes or even over the wire (between executors on separate machines).

Shuffling is the process of data transfer between stages.

Avoid shuffling at all cost. Think about ways to leverage existing partitions. Leverage partial aggregation to reduce data transfer.

By default, shuffling doesn’t change the number of partitions, but their content.

  • Avoid groupByKey and use reduceByKey or combineByKey instead.

    • groupByKey shuffles all the data, which is slow.

    • reduceByKey shuffles only the results of sub-aggregations in each partition of the data.

Example - join

PairRDD offers join transformation that (quoting the official documentation):

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.

Let’s have a look at an example and see how it works under the covers:

scala> val kv = (0 to 5) zip Stream.continually(5)
kv: scala.collection.immutable.IndexedSeq[(Int, Int)] = Vector((0,5), (1,5), (2,5), (3,5), (4,5), (5,5))

scala> val kw  = (0 to 5) zip Stream.continually(10)
kw: scala.collection.immutable.IndexedSeq[(Int, Int)] = Vector((0,10), (1,10), (2,10), (3,10), (4,10), (5,10))

scala> val kvR = sc.parallelize(kv)
kvR: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:26

scala> val kwR = sc.parallelize(kw)
kwR: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:26

scala> val joined = kvR join kwR
joined: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[10] at join at <console>:32

scala> joined.toDebugString
res7: String =
(8) MapPartitionsRDD[10] at join at <console>:32 []
 |  MapPartitionsRDD[9] at join at <console>:32 []
 |  CoGroupedRDD[8] at join at <console>:32 []
 +-(8) ParallelCollectionRDD[3] at parallelize at <console>:26 []
 +-(8) ParallelCollectionRDD[4] at parallelize at <console>:26 []

It doesn’t look good when there is an "angle" between "nodes" in an operation graph. It appears before the join operation so shuffle is expected.

Here is how the job of executing joined.count looks in Web UI.

spark shuffle join webui.png
Figure 1. Executing joined.count

The screenshot of Web UI shows 3 stages with two parallelize to Shuffle Write and count to Shuffle Read. It means shuffling has indeed happened.

FIXME Just learnt about sc.range(0, 5) as a shorter version of sc.parallelize(0 to 5)

join operation is one of the cogroup operations that uses defaultPartitioner, i.e. walks through the RDD lineage graph (sorted by the number of partitions decreasing) and picks the partitioner with positive number of output partitions. Otherwise, it checks spark.default.parallelism property and if defined picks HashPartitioner with the default parallelism of the SchedulerBackend.

join is almost CoGroupedRDD.mapValues.

FIXME the default parallelism of scheduler backend

results matching ""

    No results matching ""