SortShuffleManager — The Default (And Only) Sort-Based Shuffle System

SortShuffleManager is the one and only ShuffleManager in Spark with the short name sort or tungsten-sort.

Note
You can use the spark.shuffle.manager Spark property to activate your own implementation of the ShuffleManager contract.
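For illustration, a custom ShuffleManager could be plugged in via spark-defaults.conf; the class name below is hypothetical:

```
# conf/spark-defaults.conf -- com.example.shuffle.MyShuffleManager is a made-up class name
spark.shuffle.manager  com.example.shuffle.MyShuffleManager
```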
Caution
FIXME The internal registries
Table 1. SortShuffleManager’s Internal Registries and Counters
Name Description

numMapsForShuffle

shuffleBlockResolver

IndexShuffleBlockResolver created when SortShuffleManager is created and used throughout the lifetime of the owning SortShuffleManager.

NOTE: shuffleBlockResolver is part of ShuffleManager contract.

Beyond the uses mandated by the contract, shuffleBlockResolver is also used in unregisterShuffle and is stopped in stop.

Tip

Enable DEBUG logging level for org.apache.spark.shuffle.sort.SortShuffleManager$ logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.SortShuffleManager$=DEBUG

Refer to Logging.

unregisterShuffle Method

Caution
FIXME

Creating SortShuffleManager Instance

SortShuffleManager takes a SparkConf.

SortShuffleManager makes sure that the spark.shuffle.spill Spark property is enabled. If not, you should see the following WARN message in the logs:

WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.

SortShuffleManager initializes the internal registries and counters.
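The constructor-time spark.shuffle.spill check can be sketched as follows. This is a standalone illustration, not Spark source; the class and field names are made up:

```scala
// Standalone sketch (not Spark code): how a constructor can warn when a
// deprecated flag is explicitly disabled, mirroring the spark.shuffle.spill check.
class SortShuffleManagerSketch(conf: Map[String, String]) {
  // Collects warnings instead of logging, to keep the sketch self-contained.
  val warnings = scala.collection.mutable.Buffer.empty[String]

  // spark.shuffle.spill defaults to true; only an explicit false triggers the WARN.
  if (!conf.getOrElse("spark.shuffle.spill", "true").toBoolean) {
    warnings += ("spark.shuffle.spill was set to false, but this configuration is " +
      "ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.")
  }
}
```

Note that the flag only produces the warning; shuffle behavior is unaffected either way.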

Note
SortShuffleManager is created when SparkEnv is created (on the driver and executors) which is at the very beginning of a Spark application’s lifecycle.

Creating ShuffleHandle (For ShuffleDependency) — registerShuffle Method

registerShuffle[K, V, C](
  shuffleId: Int,
  numMaps: Int,
  dependency: ShuffleDependency[K, V, C]): ShuffleHandle
Note
registerShuffle is part of ShuffleManager contract.
Caution
FIXME Copy the conditions

registerShuffle returns a new ShuffleHandle that can be one of the following (checked in that order):

  1. BypassMergeSortShuffleHandle when shouldBypassMergeSort holds.

  2. SerializedShuffleHandle when canUseSerializedShuffle holds.

  3. BaseShuffleHandle otherwise.
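The selection order matters: the bypass check wins over serialized (tungsten) mode, with BaseShuffleHandle as the fallback. A minimal sketch (not Spark source; the predicates are passed in as plain booleans standing in for shouldBypassMergeSort and canUseSerializedShuffle):

```scala
// Sketch of registerShuffle's three-way selection, returning the handle name.
// bypassMergeSort and serializedShuffle stand in for the two predicate methods.
def selectHandle(bypassMergeSort: Boolean, serializedShuffle: Boolean): String =
  if (bypassMergeSort) "BypassMergeSortShuffleHandle"
  else if (serializedShuffle) "SerializedShuffleHandle"
  else "BaseShuffleHandle"
```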

Selecting ShuffleWriter For ShuffleHandle — getWriter Method

getWriter[K, V](
  handle: ShuffleHandle,
  mapId: Int,
  context: TaskContext): ShuffleWriter[K, V]
Note
getWriter is part of ShuffleManager contract.

Internally, getWriter makes sure that a ShuffleHandle is associated with its numMaps in numMapsForShuffle internal registry.

Caution
FIXME Associated?! What’s that?
Note
getWriter expects that the input handle is of type BaseShuffleHandle (despite the signature that says that it can work with any ShuffleHandle). Moreover, getWriter further expects that in 2 (out of 3 cases) the input handle is a more specialized SerializedShuffleHandle or BypassMergeSortShuffleHandle.

getWriter then returns a new ShuffleWriter matching the type of the input ShuffleHandle:

  1. UnsafeShuffleWriter for a SerializedShuffleHandle.

  2. BypassMergeSortShuffleWriter for a BypassMergeSortShuffleHandle.

  3. SortShuffleWriter for a BaseShuffleHandle.
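The handle-to-writer mapping can be sketched as a pattern match over the handle types. This is a standalone sketch (not Spark source) that returns writer names instead of constructing actual writers:

```scala
// Sketch (not Spark code) of getWriter's handle-to-writer dispatch.
sealed trait Handle
case object SerializedShuffleHandle extends Handle
case object BypassMergeSortShuffleHandle extends Handle
case object BaseShuffleHandle extends Handle

def writerFor(handle: Handle): String = handle match {
  case SerializedShuffleHandle      => "UnsafeShuffleWriter"
  case BypassMergeSortShuffleHandle => "BypassMergeSortShuffleWriter"
  case BaseShuffleHandle            => "SortShuffleWriter"
}
```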

Creating BlockStoreShuffleReader For ShuffleHandle And Reduce Partitions — getReader Method

getReader[K, C](
  handle: ShuffleHandle,
  startPartition: Int,
  endPartition: Int,
  context: TaskContext): ShuffleReader[K, C]
Note
getReader is part of ShuffleManager contract.

getReader returns a new BlockStoreShuffleReader passing all the input parameters on to it.

Note
getReader assumes that the input ShuffleHandle is of type BaseShuffleHandle.

Stopping SortShuffleManager — stop Method

stop(): Unit
Note
stop is part of ShuffleManager contract.

stop stops IndexShuffleBlockResolver (available as shuffleBlockResolver internal reference).

Considering BypassMergeSortShuffleHandle for ShuffleHandle — shouldBypassMergeSort Method

shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean

shouldBypassMergeSort holds (i.e. is positive) when both of the following hold:

  1. The mapSideCombine flag of the input ShuffleDependency is disabled (i.e. false).

  2. The number of partitions (of the Partitioner of the input ShuffleDependency) is at most spark.shuffle.sort.bypassMergeThreshold Spark property (which defaults to 200).

Otherwise, shouldBypassMergeSort does not hold (i.e. false). In particular, when mapSideCombine is enabled, shouldBypassMergeSort requires that an aggregator is defined and always returns false.
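The decision can be sketched as a single boolean expression. This is a standalone sketch with plain parameters, not Spark source:

```scala
// Sketch (not Spark code) of the shouldBypassMergeSort decision: bypass
// merge-sort only when there is no map-side combine and the number of
// reduce partitions does not exceed the bypass-merge threshold.
def shouldBypassMergeSort(
    mapSideCombine: Boolean,
    numPartitions: Int,
    bypassMergeThreshold: Int = 200): Boolean =
  !mapSideCombine && numPartitions <= bypassMergeThreshold
```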

Note
shouldBypassMergeSort is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency).

Considering SerializedShuffleHandle for ShuffleHandle — canUseSerializedShuffle Method

canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean

canUseSerializedShuffle condition holds (i.e. is positive) when all of the following hold (checked in that order):

  1. The Serializer of the input ShuffleDependency supports object relocation.

  2. No Aggregator is defined for the input ShuffleDependency.

  3. The number of partitions (of the Partitioner of the input ShuffleDependency) does not exceed the maximum supported in serialized mode (i.e. 1 << 24 = 16777216).

You should see the following DEBUG message in the logs when canUseSerializedShuffle holds:

DEBUG Can use serialized shuffle for shuffle [id]

Otherwise, canUseSerializedShuffle does not hold and you should see one of the following DEBUG messages:

DEBUG Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation

DEBUG Can't use serialized shuffle for shuffle [id] because an aggregator is defined

DEBUG Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions
Note
canUseSerializedShuffle is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency).
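The three checks can be sketched as follows. This is a standalone sketch with plain parameters, not Spark source; the 24-bit partition-id limit reflects how serialized mode packs the partition id into a record pointer:

```scala
// Sketch (not Spark code) of the canUseSerializedShuffle decision.
def canUseSerializedShuffle(
    serializerSupportsRelocation: Boolean,
    aggregatorDefined: Boolean,
    numPartitions: Int): Boolean = {
  // Serialized mode encodes the partition id in 24 bits, hence the cap.
  val maxPartitions = 1 << 24
  serializerSupportsRelocation &&
    !aggregatorDefined &&
    numPartitions <= maxPartitions
}
```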

Settings

Table 2. Spark Properties
Spark Property Default Value Description

spark.shuffle.sort.bypassMergeThreshold

200

The maximum number of reduce partitions at or below which SortShuffleManager avoids merge-sorting data, provided there is no map-side aggregation.

spark.shuffle.spill

true

No longer in use.

When false the following WARN shows in the logs when SortShuffleManager is created:

WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.
