SortShuffleManager — The Default (And Only) Sort-Based Shuffle System

SortShuffleManager is the default (and only) ShuffleManager in Spark. It is available under the short names sort and tungsten-sort.

Use the spark.shuffle.manager Spark property to activate your own implementation of the ShuffleManager contract.
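To illustrate how a short name and a custom class name could both be accepted as values of spark.shuffle.manager, here is a minimal sketch in plain Scala. It is a simplified model, not Spark's code: the object ShuffleManagerNames and the method resolve are hypothetical names, and the case-insensitive matching of short names is an assumption.

```scala
import java.util.Locale

// Simplified model (not Spark's code): maps a spark.shuffle.manager value
// to the fully-qualified class name of the ShuffleManager to instantiate.
object ShuffleManagerNames {
  private val SortShuffleManagerClass =
    "org.apache.spark.shuffle.sort.SortShuffleManager"

  // Both short names point at the same (and only) built-in implementation.
  val shortNames: Map[String, String] = Map(
    "sort" -> SortShuffleManagerClass,
    "tungsten-sort" -> SortShuffleManagerClass)

  // Assumption: short names are matched case-insensitively; any other value
  // is treated as the class name of a custom ShuffleManager.
  def resolve(configured: String): String =
    shortNames.getOrElse(configured.toLowerCase(Locale.ROOT), configured)
}
```

With this model, resolve("sort") and resolve("tungsten-sort") give the same class name, while any other value is passed through unchanged as a custom class name.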

SortShuffleManager is created when SparkEnv is created (on the driver and executors at the very beginning of a Spark application’s lifecycle).

SortShuffleManager can encode up to (1 << 24) partition identifiers (i.e. 16777216).
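The arithmetic behind that limit can be checked with a short sketch. It is a simplified model, not Spark's code: the object PartitionIdLimit and its members are hypothetical names, and it assumes that partition identifiers start at 0, so the largest identifier is one less than the count.

```scala
// Simplified model (not Spark's code) of the partition identifier limit.
object PartitionIdLimit {
  // 1 << 24 distinct identifiers fit in 24 bits.
  val MaxPartitions: Int = 1 << 24            // 16777216
  val MaxPartitionId: Int = MaxPartitions - 1 // assumes identifiers start at 0

  // Hypothetical helper: can this partition id be encoded in 24 bits?
  def canEncode(partitionId: Int): Boolean =
    partitionId >= 0 && partitionId <= MaxPartitionId
}
```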


Enable ALL logging level for org.apache.spark.shuffle.sort.SortShuffleManager$ logger to see what happens inside.

Add a line to the logging configuration under conf/ that sets the level of that logger to ALL.

Refer to Logging.

Unregistering Shuffle — unregisterShuffle Method

unregisterShuffle(shuffleId: Int): Boolean
unregisterShuffle is part of the ShuffleManager Contract to unregister a shuffle.


Creating SortShuffleManager Instance

SortShuffleManager takes a SparkConf.

SortShuffleManager checks whether the spark.shuffle.spill Spark property is enabled. If it is not, you should see the following WARN message in the logs:

WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.
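The check can be pictured with the following minimal sketch. It is a simplified model, not Spark's code: a plain Map stands in for SparkConf, the names SpillSettingCheck and warningFor are hypothetical, and a missing key is assumed to mean that the property is enabled.

```scala
// Simplified model (not Spark's code) of the check made at creation time.
object SpillSettingCheck {
  val Warning: String =
    "spark.shuffle.spill was set to false, but this configuration is ignored " +
    "as of Spark 1.6+. Shuffle will continue to spill to disk when necessary."

  // `settings` stands in for SparkConf; a missing key is assumed to mean true.
  def warningFor(settings: Map[String, String]): Option[String] = {
    val spillEnabled =
      settings.getOrElse("spark.shuffle.spill", "true").toBoolean
    if (spillEnabled) None else Some(Warning)
  }
}
```

Note that the setting only triggers a warning in this model: spilling itself is not turned off.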

SortShuffleManager initializes the internal registries and counters.

Creating ShuffleHandle (For ShuffleDependency) — registerShuffle Method

registerShuffle[K, V, C](
  shuffleId: Int,
  numMaps: Int,
  dependency: ShuffleDependency[K, V, C]): ShuffleHandle
registerShuffle is part of ShuffleManager contract.
FIXME Copy the conditions

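registerShuffle returns a new ShuffleHandle that can be one of the following:

  1. BypassMergeSortShuffleHandle when shouldBypassMergeSort holds.

  2. SerializedShuffleHandle when canUseSerializedShuffle holds.

  3. BaseShuffleHandle otherwise.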
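The choice of a ShuffleHandle can be sketched as a two-step decision. This is a simplified model, not Spark's code: the object HandleSelection and the method select are hypothetical, the handles are reduced to plain markers, and the two flags stand for the results of shouldBypassMergeSort and canUseSerializedShuffle, with the bypass check assumed to take precedence.

```scala
// Simplified model (not Spark's code) of how a ShuffleHandle is selected.
object HandleSelection {
  sealed trait HandleKind
  case object BypassMergeSortShuffleHandle extends HandleKind
  case object SerializedShuffleHandle extends HandleKind
  case object BaseShuffleHandle extends HandleKind

  // The flags stand for the results of shouldBypassMergeSort and
  // canUseSerializedShuffle; the bypass check is assumed to come first.
  def select(
      shouldBypassMergeSort: Boolean,
      canUseSerializedShuffle: Boolean): HandleKind =
    if (shouldBypassMergeSort) BypassMergeSortShuffleHandle
    else if (canUseSerializedShuffle) SerializedShuffleHandle
    else BaseShuffleHandle
}
```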

Getting ShuffleWriter For ShuffleHandle — getWriter Method

getWriter[K, V](
  handle: ShuffleHandle,
  mapId: Int,
  context: TaskContext): ShuffleWriter[K, V]
getWriter is part of ShuffleManager contract.

Internally, getWriter makes sure that a ShuffleHandle is associated with its numMaps in numMapsForShuffle internal registry.
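One possible reading of "associated" is that the registry maps a shuffle id to the number of map tasks of that shuffle, and that an entry is added only if none exists yet. The sketch below models that reading. It is an assumption-laden illustration, not Spark's code: the object NumMapsRegistry and the methods associate and numMapsOf are hypothetical names.

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified model (not Spark's code) of the numMapsForShuffle registry.
object NumMapsRegistry {
  // shuffle id -> number of map tasks for that shuffle
  val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()

  // Assumption: the first registration wins; later calls leave it untouched.
  def associate(shuffleId: Int, numMaps: Int): Unit = {
    numMapsForShuffle.putIfAbsent(shuffleId, numMaps)
    ()
  }

  // Hypothetical lookup helper for inspecting the registry.
  def numMapsOf(shuffleId: Int): Option[Int] =
    if (numMapsForShuffle.containsKey(shuffleId))
      Some(numMapsForShuffle.get(shuffleId))
    else None
}
```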

FIXME Associated?! What’s that?
getWriter expects that the input handle is of type BaseShuffleHandle (despite the signature, which says that it can work with any ShuffleHandle). Moreover, in 2 out of 3 cases getWriter further expects that shuffleBlockResolver is the more specialized IndexShuffleBlockResolver.

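getWriter then returns a new ShuffleWriter for the input ShuffleHandle:

  1. UnsafeShuffleWriter for a SerializedShuffleHandle.

  2. BypassMergeSortShuffleWriter for a BypassMergeSortShuffleHandle.

  3. SortShuffleWriter for any other BaseShuffleHandle.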
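The mapping from handle type to writer can be sketched as a pattern match in which the more specialized handles are tried first. This is a simplified model, not Spark's code: the handle classes below are stripped-down stand-ins without type parameters, writers are represented by their class names only, and the object WriterSelection and the method writerFor are hypothetical.

```scala
// Simplified model (not Spark's code) of choosing a writer for a handle.
object WriterSelection {
  // Stand-ins for the handle hierarchy: both specialized handles are
  // subtypes of BaseShuffleHandle.
  class BaseShuffleHandle(val shuffleId: Int)
  class SerializedShuffleHandle(shuffleId: Int)
      extends BaseShuffleHandle(shuffleId)
  class BypassMergeSortShuffleHandle(shuffleId: Int)
      extends BaseShuffleHandle(shuffleId)

  // Specialized cases must come before the general one, or they would
  // never match.
  def writerFor(handle: BaseShuffleHandle): String = handle match {
    case _: SerializedShuffleHandle      => "UnsafeShuffleWriter"
    case _: BypassMergeSortShuffleHandle => "BypassMergeSortShuffleWriter"
    case _                               => "SortShuffleWriter"
  }
}
```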

Creating BlockStoreShuffleReader For ShuffleHandle And Reduce Partitions — getReader Method

getReader[K, C](
  handle: ShuffleHandle,
  startPartition: Int,
  endPartition: Int,
  context: TaskContext): ShuffleReader[K, C]
getReader is part of ShuffleManager Contract.

getReader returns a new BlockStoreShuffleReader passing all the input parameters on to it.

getReader assumes that the input ShuffleHandle is of type BaseShuffleHandle.

Stopping SortShuffleManager — stop Method

stop(): Unit
stop is part of ShuffleManager contract to stop the shuffle system.

stop stops IndexShuffleBlockResolver (available as shuffleBlockResolver internal reference).

Considering BypassMergeSortShuffleHandle for ShuffleHandle — shouldBypassMergeSort Method

shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean

shouldBypassMergeSort holds (i.e. is positive) only when both of the following hold:

  1. The input ShuffleDependency has mapSideCombine flag disabled (i.e. false).

  2. The number of partitions (of the Partitioner of the input ShuffleDependency) is at most spark.shuffle.sort.bypassMergeThreshold Spark property (which defaults to 200).

Otherwise, shouldBypassMergeSort does not hold (i.e. false). In particular, it never holds when mapSideCombine flag is enabled (in which case an aggregator is expected to be defined), since sorting cannot be bypassed when map-side aggregation is required.

shouldBypassMergeSort is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency).
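A minimal sketch of the decision follows, assuming the rule is: never bypass when map-side combine is requested, otherwise bypass when the number of partitions does not exceed the threshold. It is a simplified model, not Spark's code: the case class Dep is a hypothetical stand-in for the relevant parts of a ShuffleDependency, and the threshold is passed in directly instead of being read from SparkConf.

```scala
// Simplified model (not Spark's code) of the bypass decision.
object BypassCheck {
  // Hypothetical stand-in for the relevant parts of a ShuffleDependency.
  final case class Dep(
      mapSideCombine: Boolean,
      hasAggregator: Boolean,
      numPartitions: Int)

  // Default of spark.shuffle.sort.bypassMergeThreshold.
  val DefaultBypassMergeThreshold = 200

  def shouldBypassMergeSort(
      dep: Dep,
      threshold: Int = DefaultBypassMergeThreshold): Boolean =
    if (dep.mapSideCombine) {
      // Map-side combine needs an aggregator, and rules out the bypass.
      require(dep.hasAggregator, "Map-side combine without Aggregator specified!")
      false
    } else {
      dep.numPartitions <= threshold
    }
}
```

For example, a dependency without map-side combine and exactly 200 partitions qualifies with the default threshold, while one with 201 partitions does not.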

Considering SerializedShuffleHandle for ShuffleHandle — canUseSerializedShuffle Method

canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean

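canUseSerializedShuffle condition holds (i.e. is positive) when all of the following hold (checked in that order):

  1. The Serializer of the input ShuffleDependency supports relocation of serialized objects.

  2. No aggregator is defined for the input ShuffleDependency.

  3. The number of partitions is at most 16777216, i.e. (1 << 24).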

You should see the following DEBUG message in the logs when canUseSerializedShuffle holds:

DEBUG Can use serialized shuffle for shuffle [id]

Otherwise, canUseSerializedShuffle does not hold and you should see one of the following DEBUG messages:

DEBUG Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation

DEBUG SortShuffleManager: Can't use serialized shuffle for shuffle [id] because an aggregator is defined

DEBUG Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions
canUseSerializedShuffle is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency).
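The ordered checks and their DEBUG messages can be sketched as follows. It is a simplified model, not Spark's code: the case class Dep is a hypothetical stand-in for a ShuffleDependency, the method check returns the message instead of logging it, and a Left marks the case where canUseSerializedShuffle does not hold.

```scala
// Simplified model (not Spark's code) of the serialized-shuffle checks.
object SerializedShuffleCheck {
  // Hypothetical stand-in for the relevant parts of a ShuffleDependency.
  final case class Dep(
      shuffleId: Int,
      serializerName: String,
      supportsRelocation: Boolean,
      hasAggregator: Boolean,
      numPartitions: Int)

  val MaxPartitions: Int = 1 << 24 // 16777216

  // Left = cannot use serialized shuffle, Right = can; both carry the
  // DEBUG message text. The checks run in the order listed above.
  def check(dep: Dep): Either[String, String] = {
    val prefix = s"Can't use serialized shuffle for shuffle ${dep.shuffleId} because"
    if (!dep.supportsRelocation)
      Left(s"$prefix the serializer, ${dep.serializerName}, does not support object relocation")
    else if (dep.hasAggregator)
      Left(s"$prefix an aggregator is defined")
    else if (dep.numPartitions > MaxPartitions)
      Left(s"$prefix it has more than $MaxPartitions partitions")
    else
      Right(s"Can use serialized shuffle for shuffle ${dep.shuffleId}")
  }
}
```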


Table 1. Spark Properties
Spark Property | Default Value | Description

spark.shuffle.sort.bypassMergeThreshold | 200

The maximum number of reduce partitions at or below which SortShuffleManager avoids merge-sorting data, provided there is no map-side aggregation either.

spark.shuffle.spill | true

No longer in use.

When false the following WARN shows in the logs when SortShuffleManager is created:

WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.

Internal Properties

Table 2. SortShuffleManager’s Internal Properties (e.g. Registries, Counters and Flags)
Name | Description

shuffleBlockResolver

IndexShuffleBlockResolver that is created when SortShuffleManager is created and used throughout the lifetime of the owning SortShuffleManager.

shuffleBlockResolver is part of ShuffleManager contract.

Besides the uses due to the contract, shuffleBlockResolver is used in unregisterShuffle and stopped in stop.
