ShuffleManager — Pluggable Shuffle Systems

ShuffleManager is the pluggable mechanism for shuffle systems that track shuffle dependencies for ShuffleMapStages on the driver and executors.

Note
SortShuffleManager (short name: sort or tungsten-sort) is the one and only ShuffleManager in Spark 2.0.

The spark.shuffle.manager Spark property selects the shuffle manager to use (default: sort).

The driver and executors access their ShuffleManager instances through SparkEnv.

val shuffleManager = SparkEnv.get.shuffleManager

The driver registers shuffles with its shuffle manager, and executors (or tasks running locally in the driver) ask the shuffle manager for writers and readers to write and read shuffle data.
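To make this division of labour concrete, here is a minimal in-memory sketch of the flow: the driver side registers a shuffle and gets a handle back, map tasks write records bucketed by reduce partition, and reduce tasks read a range of reduce partitions from every map output. ToyHandle and ToyShuffleManager are hypothetical names, records are assumed to be (String, Int) pairs, and the bucketing rule (non-negative hash modulo the number of reduce partitions) is an assumption of the sketch. Real Spark moves shuffle data through files, BlockManager and the MapOutputTracker rather than through one shared map.

```scala
import scala.collection.mutable

// Hypothetical handle: identifies a shuffle and records its map/reduce fan-out.
final case class ToyHandle(shuffleId: Int, numMaps: Int, numReduces: Int)

// Hypothetical in-memory model of the register / write / read flow (not Spark's API).
class ToyShuffleManager {
  // (shuffleId, mapId, reduceId) -> records one map task wrote for one reduce partition
  private val blocks = mutable.Map.empty[(Int, Int, Int), Vector[(String, Int)]]

  // Driver side: register a shuffle and return the handle that tasks will carry
  def registerShuffle(shuffleId: Int, numMaps: Int, numReduces: Int): ToyHandle =
    ToyHandle(shuffleId, numMaps, numReduces)

  // Map side: bucket one map task's records by reduce partition (non-negative hash modulo)
  def write(handle: ToyHandle, mapId: Int, records: Seq[(String, Int)]): Unit =
    records
      .groupBy { case (key, _) => Math.floorMod(key.hashCode, handle.numReduces) }
      .foreach { case (reduceId, rs) =>
        blocks((handle.shuffleId, mapId, reduceId)) = rs.toVector
      }

  // Reduce side: collect reduce partitions [startPartition, endPartition) from every map task
  def read(handle: ToyHandle, startPartition: Int, endPartition: Int): Vector[(String, Int)] =
    (for {
      reduceId <- startPartition until endPartition
      mapId <- 0 until handle.numMaps
      record <- blocks.getOrElse((handle.shuffleId, mapId, reduceId), Vector.empty)
    } yield record).toVector
}
```

After registering a shuffle with 2 map tasks and 3 reduce partitions and writing from both map tasks, reading the range 0 until 3 returns every record, while reading a single reduce partition returns only the records whose keys hash to it, gathered from both map tasks.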

A shuffle service is network-addressable, i.e. it is available on a host and port.

There can be many shuffle services running simultaneously, and a driver registers with all of them when CoarseGrainedSchedulerBackend is used.

ShuffleManager Contract

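package org.apache.spark.shuffle

import org.apache.spark.{ShuffleDependency, TaskContext}

private[spark] trait ShuffleManager {
  // Registers a shuffle and returns a handle that is passed on to tasks
  def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  // Returns a writer for one map partition (used by map tasks on executors)
  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V]

  // Returns a reader for reduce partitions startPartition to endPartition - 1
  // (used by reduce tasks on executors)
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]

  // Removes the metadata of a shuffle; true if the removal succeeded
  def unregisterShuffle(shuffleId: Int): Boolean

  // Retrieves shuffle block data based on block coordinates
  def shuffleBlockResolver: ShuffleBlockResolver

  // Shuts this ShuffleManager down
  def stop(): Unit
}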
Note
ShuffleManager is a private[spark] contract.
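Because the contract is private[spark], registerShuffle is not called by user code. In Spark 2.x, a ShuffleDependency calls it while it is being constructed, passing a fresh shuffle id and the number of partitions of its parent RDD (the number of map tasks), and keeps the returned handle for the tasks that follow. The sketch below mimics only that self-registration pattern with hypothetical classes (SketchHandle, SketchShuffleManager, SketchIds and SketchShuffleDependency); it does not use any of Spark's types.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-ins (not Spark classes) mimicking a dependency that
// registers itself with the shuffle manager while it is being constructed.
final case class SketchHandle(shuffleId: Int, numMaps: Int)

class SketchShuffleManager {
  private var registered = Map.empty[Int, SketchHandle]

  def registerShuffle(shuffleId: Int, numMaps: Int): SketchHandle = {
    val handle = SketchHandle(shuffleId, numMaps)
    registered += shuffleId -> handle
    handle
  }

  def isRegistered(shuffleId: Int): Boolean = registered.contains(shuffleId)
}

// Source of fresh shuffle ids (a stand-in for a driver-side id counter)
object SketchIds {
  private val nextShuffleId = new AtomicInteger(0)
  def newShuffleId(): Int = nextShuffleId.getAndIncrement()
}

class SketchShuffleDependency(numParentPartitions: Int, manager: SketchShuffleManager) {
  // Every dependency gets its own shuffle id
  val shuffleId: Int = SketchIds.newShuffleId()
  // Registration is a side effect of construction; one map task per parent partition
  val shuffleHandle: SketchHandle = manager.registerShuffle(shuffleId, numParentPartitions)
}
```

The design point is that merely creating the dependency is enough for the shuffle to be known to the manager; nothing else has to remember to register it.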
Table 1. ShuffleManager Contract
Method Description

registerShuffle

Registers a shuffle and returns a ShuffleHandle for it. Executed when a ShuffleDependency is created (the dependency registers itself).

getWriter

Returns a ShuffleWriter for a map partition. Used when a ShuffleMapTask runs (to write the records of its partition).

getReader

Returns a ShuffleReader for a range of reduce partitions, from startPartition (inclusive) to endPartition (exclusive), to read the key-value records of a ShuffleDependency.

Used when:

  • CoGroupedRDD, ShuffledRDD, and SubtractedRDD are requested to compute a partition (for a ShuffleDependency)

  • Spark SQL’s ShuffledRowRDD is requested to compute a partition

unregisterShuffle

Removes the metadata of a shuffle from the ShuffleManager and returns whether the removal succeeded. Executed when ???.

shuffleBlockResolver

Returns a ShuffleBlockResolver that retrieves shuffle block data based on block coordinates.

Used when:

1. BlockManager requests the ShuffleBlockResolver to retrieve shuffle block data (for a ShuffleBlockId)

2. BlockManager requests the ShuffleBlockResolver for local shuffle block data as bytes

stop

Used when SparkEnv stops.
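The startPartition and endPartition arguments of getReader describe a half-open range. ShuffledRDD asks for exactly one reduce partition per task (its partition index to index + 1), while Spark SQL's ShuffledRowRDD can read several adjacent reduce partitions at once when post-shuffle partitions have been coalesced. The following sketch computes both kinds of ranges; ReaderRanges and its methods are hypothetical, and the start indices passed to coalesced are assumed to be sorted, distinct, to begin at 0 and to stay below the number of reduce partitions.

```scala
// Hypothetical helpers showing the half-open [startPartition, endPartition)
// ranges a reduce-side task could pass to getReader.
object ReaderRanges {
  // One reduce partition per task: partition index i reads [i, i + 1)
  def onePerPartition(numPartitions: Int): Seq[(Int, Int)] =
    (0 until numPartitions).map(i => (i, i + 1))

  // Adjacent reduce partitions coalesced into larger ranges. startIndices is
  // assumed sorted, distinct, starting at 0 and below numPartitions.
  def coalesced(startIndices: Seq[Int], numPartitions: Int): Seq[(Int, Int)] = {
    require(startIndices.headOption.contains(0), "the first range must start at 0")
    // Each range ends where the next one starts; the last one ends at numPartitions
    startIndices.zip(startIndices.drop(1) :+ numPartitions)
  }
}
```

Because the end is exclusive, consecutive ranges share a boundary without overlapping, so every reduce partition is read exactly once.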
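With sort-based shuffle, each map task writes a single data file plus a companion index file, and the resolver locates one reduce partition's bytes by reading two consecutive offsets from the index. The sketch below models only that offset arithmetic, assuming the index holds numPartitions + 1 cumulative byte offsets starting at 0. ToyShuffleBlockId and ToyIndexResolver are hypothetical; the block name only mirrors the shuffle_<shuffleId>_<mapId>_<reduceId> pattern of Spark's ShuffleBlockId, and no files are read.

```scala
// Hypothetical block coordinates: which shuffle, which map task, which reduce partition.
final case class ToyShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) {
  // Mirrors the naming pattern shuffle_<shuffleId>_<mapId>_<reduceId>
  def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
}

// Hypothetical index-based resolver: only the offset arithmetic, no I/O.
object ToyIndexResolver {
  // partitionLengths(i) = bytes one map task wrote for reduce partition i.
  // The index holds numPartitions + 1 cumulative offsets, starting at 0.
  def buildIndex(partitionLengths: Array[Long]): Array[Long] =
    partitionLengths.scanLeft(0L)(_ + _)

  // (offset, length) of one reduce partition's bytes inside the map task's data file
  def segment(index: Array[Long], blockId: ToyShuffleBlockId): (Long, Long) = {
    val start = index(blockId.reduceId)
    val end = index(blockId.reduceId + 1)
    (start, end - start)
  }
}
```

An empty reduce partition simply shows up as two equal consecutive offsets, i.e. a segment of length 0.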

Settings

Table 2. Spark Properties
Spark Property Default Value Description

spark.shuffle.manager

sort

The ShuffleManager of a Spark application.

You can use a short name or the fully-qualified class name of a custom implementation.

The predefined aliases, sort and tungsten-sort, both resolve to org.apache.spark.shuffle.sort.SortShuffleManager, the one and only built-in ShuffleManager.
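The lookup described in the table can be sketched as follows: the property value is first matched against the short aliases, and any other value is taken as a class name as-is. A plain Map stands in for SparkConf, ShuffleManagerSetting is a hypothetical name, and matching the aliases case-insensitively is an assumption of this sketch. It only resolves the class name and does not instantiate anything.

```scala
import java.util.Locale

// Hypothetical resolver for the spark.shuffle.manager setting (a Map stands in for SparkConf).
object ShuffleManagerSetting {
  private val SortShuffleManagerClass = "org.apache.spark.shuffle.sort.SortShuffleManager"

  // Both short names point at the same implementation
  private val shortNames = Map(
    "sort" -> SortShuffleManagerClass,
    "tungsten-sort" -> SortShuffleManagerClass)

  def resolve(conf: Map[String, String]): String = {
    val name = conf.getOrElse("spark.shuffle.manager", "sort")
    // Known alias -> its class name; anything else is assumed to be a class name already
    shortNames.getOrElse(name.toLowerCase(Locale.ROOT), name)
  }
}
```

In an application, the property is set like any other Spark property, for example with new SparkConf().set("spark.shuffle.manager", "sort") or with --conf spark.shuffle.manager=sort on the spark-submit command line.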

Further Reading or Watching

  1. (slides) Spark shuffle introduction by Raymond Liu (aka colorant).
