ShuffleExchangeExec Unary Physical Operator

ShuffleExchangeExec is an Exchange unary physical operator that is used to perform a shuffle.

ShuffleExchangeExec is created (possibly indirectly using apply factory) when:

ShuffleExchangeExec presents itself as Exchange in physical query plans.
// Uses Repartition logical operator
// ShuffleExchangeExec with RoundRobinPartitioning
val q1 = spark.range(6).repartition(2)
scala> q1.explain
== Physical Plan ==
Exchange RoundRobinPartitioning(2)
+- *Range (0, 6, step=1, splits=Some(8))

// Uses RepartitionByExpression logical operator
// ShuffleExchangeExec with HashPartitioning
val q2 = spark.range(6).repartition(2, 'id % 2)
scala> q2.explain
== Physical Plan ==
Exchange hashpartitioning((id#38L % 2), 2)
+- *Range (0, 6, step=1, splits=Some(8))

When requested for its nodeName, ShuffleExchangeExec returns Exchange. If the optional ExchangeCoordinator is defined, the name is followed by (coordinator id: [coordinator-hash-code]).
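The naming rule can be sketched without Spark as follows. This is a simplified model. It assumes, as in the Spark 2.x sources, that the coordinator hash code comes from System.identityHashCode. NodeNameSketch and its ExchangeCoordinator are hypothetical stand-ins, not Spark's classes.

```scala
object NodeNameSketch {
  // Hypothetical stand-in for Spark's ExchangeCoordinator (not the real class).
  final class ExchangeCoordinator

  // "Exchange", followed by the coordinator's identity hash code
  // only when a coordinator is defined.
  def nodeName(coordinator: Option[ExchangeCoordinator]): String = {
    val extraInfo = coordinator match {
      case Some(c) => s"(coordinator id: ${System.identityHashCode(c)})"
      case None    => ""
    }
    s"Exchange$extraInfo"
  }
}
```

With no coordinator, `NodeNameSketch.nodeName(None)` is plain `Exchange`, which matches the physical plans shown earlier.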

When requested for its outputPartitioning (the output data partitioning), ShuffleExchangeExec simply returns the given Partitioning.

When requested to prepare for execution, ShuffleExchangeExec registers itself with the optional ExchangeCoordinator if defined.
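The registration step can be modeled without Spark as follows. Coordinator and Exchange are hypothetical stand-ins. The coordinator here only records the operators registered with it, which is an assumption made for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

object DoPrepareSketch {
  // Hypothetical stand-in for ExchangeCoordinator: it only records
  // the exchange operators registered with it.
  final class Coordinator {
    private val exchanges = ArrayBuffer.empty[Exchange]
    def registerExchange(e: Exchange): Unit = synchronized { exchanges += e; () }
    def registered: List[Exchange] = synchronized { exchanges.toList }
  }

  // Hypothetical stand-in for ShuffleExchangeExec with an optional coordinator.
  final class Exchange(val coordinator: Option[Coordinator]) {
    // Registration happens when preparing for execution, not in the constructor.
    def doPrepare(): Unit = coordinator.foreach(_.registerExchange(this))
  }
}
```

A code comment in the Spark 2.x sources explains why registration waits until preparation. The physical plan may be transformed into new Exchange instances before execution, and registering in the constructor would leave the coordinator holding references to Exchanges that are no longer needed.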

Creating ShuffleExchangeExec Instance

ShuffleExchangeExec takes the following to be created:

the new Partitioning
the child physical operator (SparkPlan)
an optional ExchangeCoordinator

The optional ExchangeCoordinator is defined only for Adaptive Query Execution (when EnsureRequirements physical query optimization is executed).

Performance Metrics — metrics Method

Table 1. ShuffleExchangeExec’s Performance Metrics

Key | Name (in web UI) | Description
dataSize | data size |

Figure 1. ShuffleExchangeExec in web UI (Details for Query)

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute creates a new ShuffledRowRDD, or reuses the cached one if doExecute was executed before (ShuffleExchangeExec caches the ShuffledRowRDD for later reuse).

doExecute branches off per the optional ExchangeCoordinator.

The optional ExchangeCoordinator is available only when Adaptive Query Execution is enabled and the EnsureRequirements physical query optimization is requested to enforce the partition requirements (distribution and ordering) of a physical operator.

With an ExchangeCoordinator available, doExecute requests the ExchangeCoordinator for a ShuffledRowRDD.

Otherwise (with no ExchangeCoordinator available), doExecute prepares a ShuffleDependency and then creates a ShuffledRowRDD.

In the end, doExecute saves (caches) the result ShuffledRowRDD (as the cachedShuffleRDD internal registry).
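The caching and branching logic of doExecute can be modeled without Spark as follows. ShuffledResult stands in for ShuffledRowRDD, and the coordinator is modeled as a function that returns the post-shuffle result. Both are hypothetical simplifications. Spark keeps a nullable cachedShuffleRDD field, and the sketch uses an Option in its place.

```scala
object DoExecuteSketch {
  // Hypothetical stand-in for ShuffledRowRDD: it just records how it was built.
  final case class ShuffledResult(source: String)

  // Hypothetical model of the operator: the coordinator, when defined,
  // is a function producing the post-shuffle result for this exchange.
  final class ExchangeModel(coordinator: Option[() => ShuffledResult]) {
    private var cachedShuffleRDD: Option[ShuffledResult] = None
    private var dependenciesPrepared = 0

    def preparedDependencies: Int = dependenciesPrepared

    private def prepareShuffleDependency(): String = {
      dependenciesPrepared += 1
      "shuffle dependency"
    }

    private def preparePostShuffleRDD(dependency: String): ShuffledResult =
      ShuffledResult(dependency)

    def doExecute(): ShuffledResult = cachedShuffleRDD.getOrElse {
      val result = coordinator match {
        case Some(postShuffleRDD) => postShuffleRDD() // adaptive: ask the coordinator
        case None => preparePostShuffleRDD(prepareShuffleDependency())
      }
      cachedShuffleRDD = Some(result) // cache for later reuse
      result
    }
  }
}
```

Calling doExecute twice on the same ExchangeModel returns the same instance, and the shuffle dependency is prepared only once.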

preparePostShuffleRDD Method

preparePostShuffleRDD(
  shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
  specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD

preparePostShuffleRDD is used when:

Preparing ShuffleDependency — prepareShuffleDependency Internal Method

prepareShuffleDependency(): ShuffleDependency[Int, InternalRow, InternalRow] (1)
prepareShuffleDependency(
  rdd: RDD[InternalRow],
  outputAttributes: Seq[Attribute],
  newPartitioning: Partitioning,
  serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow]

1. Uses the child operator (for the rdd and outputAttributes) and the serializer

prepareShuffleDependency creates a Spark Core ShuffleDependency with an RDD[Product2[Int, InternalRow]] (where the Ints are the partition IDs of the InternalRow values) and the given Serializer (e.g. the Serializer of the ShuffleExchangeExec physical operator).
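The paragraph above describes a shuffle over rows paired with their target partition IDs. The pairing step can be sketched without Spark as follows. This is a hypothetical, simplified model: rows are Seq[Any], and both partitionings are stand-ins. It covers only round-robin and hash partitioning, and it is not the actual body of prepareShuffleDependency.

```scala
object PartitionIdSketch {
  // Hypothetical, simplified partitionings over rows modeled as Seq[Any].
  sealed trait Partitioning { def numPartitions: Int }
  final case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning
  final case class HashPartitioning(keyIndex: Int, numPartitions: Int) extends Partitioning

  // Pairs every row with the ID of the partition it should be shuffled to.
  // These are the (Int, row) pairs a shuffle dependency is built over.
  def withPartitionIds(
      rows: Iterator[Seq[Any]],
      partitioning: Partitioning,
      start: Int = 0): Iterator[(Int, Seq[Any])] =
    partitioning match {
      case RoundRobinPartitioning(n) =>
        var position = start
        rows.map { row =>
          val id = position % n // cycle through partitions 0 until n
          position += 1
          (id, row)
        }
      case HashPartitioning(keyIndex, n) =>
        // floorMod keeps the ID non-negative for negative hash codes.
        rows.map(row => (Math.floorMod(row(keyIndex).hashCode, n), row))
    }
}
```

Spark itself differs in two ways. Its round-robin partitioning starts at a pseudo-random position for each input partition, whereas the sketch takes an explicit `start`. Its HashPartitioning hashes the partitioning expressions with Murmur3 and takes a non-negative modulo, whereas the sketch uses `hashCode` with `Math.floorMod`.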

Internally, prepareShuffleDependency…FIXME

prepareShuffleDependency is used when:

prepareShuffleDependency Helper Method

prepareShuffleDependency(
  rdd: RDD[InternalRow],
  outputAttributes: Seq[Attribute],
  newPartitioning: Partitioning,
  serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow]

prepareShuffleDependency creates a ShuffleDependency.

prepareShuffleDependency is used when ShuffleExchangeExec prepares a ShuffleDependency (as part of…FIXME) and when the CollectLimitExec and TakeOrderedAndProjectExec physical operators are executed.

Preparing Physical Operator for Execution — doPrepare Method

doPrepare(): Unit
doPrepare is part of SparkPlan Contract to prepare a physical operator for execution.

Creating ShuffleExchangeExec Without ExchangeCoordinator — apply Utility

apply(
  newPartitioning: Partitioning,
  child: SparkPlan): ShuffleExchangeExec

apply creates a new ShuffleExchangeExec physical operator with no ExchangeCoordinator (i.e. None).
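The operator's shape and the two-argument factory can be modeled without Spark as follows. ShuffleExchangeModel, Plan and the other types are hypothetical stand-ins for ShuffleExchangeExec, SparkPlan, Partitioning and ExchangeCoordinator. The three constructor arguments follow the apply signature above plus the optional coordinator.

```scala
object ApplySketch {
  // Hypothetical stand-ins for Partitioning, SparkPlan and ExchangeCoordinator.
  sealed trait Partitioning
  final case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning
  final case class Plan(name: String)
  final class ExchangeCoordinator

  // Simplified model of the operator's three constructor arguments.
  final case class ShuffleExchangeModel(
      newPartitioning: Partitioning,
      child: Plan,
      coordinator: Option[ExchangeCoordinator]) {
    // The output partitioning is simply the Partitioning given at creation.
    def outputPartitioning: Partitioning = newPartitioning
  }

  object ShuffleExchangeModel {
    // Two-argument factory that leaves the coordinator undefined.
    def apply(newPartitioning: Partitioning, child: Plan): ShuffleExchangeModel =
      ShuffleExchangeModel(newPartitioning, child, None)
  }
}
```

The factory keeps call sites that never need adaptive execution free of the coordinator argument.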


apply is used when:

Internal Properties

Name | Description
cachedShuffleRDD | ShuffledRowRDD that is created when the ShuffleExchangeExec operator is executed (to generate RDD[InternalRow]) and reused (cached) if the operator is used by multiple plans
serializer | UnsafeRowSerializer (with as many fields as there are output schema attributes of the child physical operator, and the dataSize performance metric). Used exclusively in prepareShuffleDependency to create a ShuffleDependency

results matching ""

    No results matching ""