ShuffleExchangeExec Unary Physical Operator

ShuffleExchangeExec is a concrete Exchange unary physical operator that is used to perform a shuffle.

ShuffleExchangeExec corresponds to the Repartition (with shuffle enabled) and RepartitionByExpression logical operators at execution time (as resolved by the BasicOperators execution planning strategy).

Note
ShuffleExchangeExec presents itself as Exchange in physical query plans.
// Uses Repartition logical operator
// ShuffleExchangeExec with RoundRobinPartitioning
val q1 = spark.range(6).repartition(2)
scala> q1.explain
== Physical Plan ==
Exchange RoundRobinPartitioning(2)
+- *Range (0, 6, step=1, splits=Some(8))

// Uses RepartitionByExpression logical operator
// ShuffleExchangeExec with HashPartitioning
val q2 = spark.range(6).repartition(2, 'id % 2)
scala> q2.explain
== Physical Plan ==
Exchange hashpartitioning((id#38L % 2), 2)
+- *Range (0, 6, step=1, splits=Some(8))

ShuffleExchangeExec takes the following to be created:

- Partitioning
- Child physical operator
- Optional ExchangeCoordinator

The optional ExchangeCoordinator is defined only for Adaptive Query Execution (when EnsureRequirements physical query optimization is executed).

When requested for its nodeName, ShuffleExchangeExec returns Exchange, followed by (coordinator id: [coordinator-hash-code]) when the optional ExchangeCoordinator is defined.
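The naming rule can be modelled without Spark on the classpath. In the sketch below, Coordinator is a hypothetical stand-in for ExchangeCoordinator. It also assumes two things: the coordinator hash code is the JVM identity hash code (System.identityHashCode), and the suffix is appended without a separating space.

```scala
// Hypothetical stand-in for Spark's ExchangeCoordinator (no Spark dependency).
final class Coordinator

// Sketch of the nodeName rule: "Exchange", plus a coordinator suffix
// only when the optional coordinator is defined.
def nodeName(coordinator: Option[Coordinator]): String = {
  val extraInfo = coordinator match {
    // Assumption: the id is the JVM identity hash code of the coordinator.
    case Some(c) => s"(coordinator id: ${System.identityHashCode(c)})"
    case None    => ""
  }
  s"Exchange$extraInfo"
}
```

Without a coordinator, nodeName(None) gives just Exchange, which is the name seen in the explain output above.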

When requested for the output data partitioning, ShuffleExchangeExec simply returns the Partitioning it was created with.

When requested to prepare for execution, ShuffleExchangeExec registers itself with the optional ExchangeCoordinator if defined.
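The registration step can be sketched without Spark. CoordinatorModel and PreparedExchange are hypothetical stand-ins. The registerExchange name is assumed to mirror the method that ExchangeCoordinator exposes, so treat the whole model as an illustration rather than Spark's code.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for ExchangeCoordinator: it only records
// the exchanges that registered with it.
final class CoordinatorModel {
  val registered: ArrayBuffer[PreparedExchange] = ArrayBuffer.empty
  def registerExchange(e: PreparedExchange): Unit = registered += e
}

// Hypothetical stand-in for ShuffleExchangeExec, reduced to doPrepare.
final class PreparedExchange(coordinator: Option[CoordinatorModel]) {
  def doPrepare(): Unit = coordinator match {
    case Some(c) => c.registerExchange(this) // AQE: register with the coordinator
    case None    => ()                       // no coordinator: nothing to register
  }
}
```

An exchange created without a coordinator (no Adaptive Query Execution) leaves every coordinator's registry untouched.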

Performance Metrics

Table 1. ShuffleExchangeExec’s Performance Metrics

Key | Name (in web UI) | Description
dataSize | data size |

Figure 1. ShuffleExchangeExec in web UI (Details for Query)

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute creates a new ShuffledRowRDD or (re)uses the cached one if doExecute was executed before.

Note
ShuffleExchangeExec caches a ShuffledRowRDD for later reuse.

doExecute branches off based on whether the optional ExchangeCoordinator is defined.

Note
The optional ExchangeCoordinator is available only when Adaptive Query Execution is enabled and the EnsureRequirements physical query optimization is requested to enforce the partition requirements (distribution and ordering) of a physical operator.

If the ExchangeCoordinator is available, doExecute requests it for a ShuffledRowRDD.

Otherwise (when no ExchangeCoordinator is available), doExecute calls prepareShuffleDependency and then preparePostShuffleRDD with the resulting ShuffleDependency.

In the end, doExecute saves (caches) the result ShuffledRowRDD (as cachedShuffleRDD internal registry).
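The branching and caching described above fit into a small Spark-free model. ExchangeModel is hypothetical. Plain Strings stand in for the ShuffleDependency and the ShuffledRowRDD, and the optional coordinator is modelled as a function that returns the post-shuffle RDD.

```scala
// Hypothetical, Spark-free model of ShuffleExchangeExec.doExecute.
// Strings stand in for the ShuffleDependency and the ShuffledRowRDD;
// the optional coordinator is modelled as a function returning the RDD.
final class ExchangeModel(coordinator: Option[ExchangeModel => String]) {
  // Counts how often a shuffle dependency was prepared (to observe caching).
  var shuffleDependencyCalls = 0

  // Mirrors the cachedShuffleRDD internal registry (empty until first execution).
  private var cachedShuffleRDD: Option[String] = None

  private def prepareShuffleDependency(): String = {
    shuffleDependencyCalls += 1
    "ShuffleDependency"
  }

  private def preparePostShuffleRDD(dependency: String): String =
    s"ShuffledRowRDD($dependency)"

  def doExecute(): String = {
    if (cachedShuffleRDD.isEmpty) {
      val rdd = coordinator match {
        // Adaptive Query Execution: the coordinator produces the post-shuffle RDD.
        case Some(postShuffleRDD) => postShuffleRDD(this)
        // Otherwise: prepare the shuffle dependency, then the post-shuffle RDD.
        case None => preparePostShuffleRDD(prepareShuffleDependency())
      }
      cachedShuffleRDD = Some(rdd)
    }
    cachedShuffleRDD.get // later calls reuse the cached result
  }
}
```

Calling doExecute twice on the same instance prepares the shuffle dependency only once. This is the reuse that cachedShuffleRDD provides when the operator appears in multiple plans.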

preparePostShuffleRDD Method

preparePostShuffleRDD(
  shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
  specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD

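preparePostShuffleRDD…FIXME

Note

preparePostShuffleRDD is used when:

- ShuffleExchangeExec is requested to execute (doExecute) with no ExchangeCoordinator
- ExchangeCoordinator is requested for a post-shuffle ShuffledRowRDD (Adaptive Query Execution)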
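The specifiedPartitionStartIndices parameter in the signature is what allows pre-shuffle partitions to be coalesced on the post-shuffle side. The sketch below assumes the Spark 2.x semantics of ShuffledRowRDD:

- start index i begins post-shuffle partition i;
- the next start index ends it, and the last partition ends at the total number of pre-shuffle partitions;
- with None, every post-shuffle partition maps to exactly one pre-shuffle partition.

The postShufflePartitionRanges function is hypothetical.

```scala
// Hypothetical helper: computes the [start, end) range of pre-shuffle
// (reducer) partitions that each post-shuffle partition reads.
def postShufflePartitionRanges(
    numPreShufflePartitions: Int,
    specifiedPartitionStartIndices: Option[Array[Int]] = None): Seq[(Int, Int)] = {
  // Without start indices, post-shuffle partition i reads pre-shuffle partition i only.
  val startIndices = specifiedPartitionStartIndices.getOrElse(
    (0 until numPreShufflePartitions).toArray)
  startIndices.indices.map { i =>
    val start = startIndices(i)
    // A range ends where the next one starts; the last one ends at the total count.
    val end =
      if (i < startIndices.length - 1) startIndices(i + 1)
      else numPreShufflePartitions
    (start, end)
  }
}
```

For example, postShufflePartitionRanges(5, Some(Array(0, 2, 3))) gives (0,2), (2,3) and (3,5). That is, three post-shuffle partitions read from five pre-shuffle ones.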

prepareShuffleDependency Internal Method

prepareShuffleDependency(): ShuffleDependency[Int, InternalRow, InternalRow]

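prepareShuffleDependency…FIXME

Note

prepareShuffleDependency is used when:

- ShuffleExchangeExec is requested to execute (doExecute) with no ExchangeCoordinator
- ExchangeCoordinator is requested for a post-shuffle ShuffledRowRDD (Adaptive Query Execution)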

prepareShuffleDependency Helper Method

prepareShuffleDependency(
  rdd: RDD[InternalRow],
  outputAttributes: Seq[Attribute],
  newPartitioning: Partitioning,
  serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow]

prepareShuffleDependency creates a ShuffleDependency for the input RDD[InternalRow] per the given newPartitioning (using the given serializer).

Note
prepareShuffleDependency is used when ShuffleExchangeExec prepares a ShuffleDependency (as part of…FIXME), and when the CollectLimitExec and TakeOrderedAndProjectExec physical operators are executed.
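A ShuffleDependency shuffles rows keyed by the id of the reducer partition each row goes to. The sketch below models only that keying step and assumes Spark 2.x behaviour:

- round-robin starts at a pseudo-random position seeded from the map partition index and moves to the next partition for every row;
- hash partitioning takes a non-negative modulo of a key hash (Spark uses a Murmur3 hash here; hashCode is only a stand-in);
- a single partition always maps to 0.

Row, Partitioning, RoundRobin, Hash, Single and withPartitionIds are hypothetical names, and range partitioning is left out.

```scala
import scala.util.Random

// Hypothetical row model: a row is just a sequence of column values.
type Row = Seq[Any]

// Hypothetical, simplified stand-ins for Spark's Partitioning variants.
sealed trait Partitioning { def numPartitions: Int }
final case class RoundRobin(numPartitions: Int) extends Partitioning
final case class Hash(key: Row => Any, numPartitions: Int) extends Partitioning
case object Single extends Partitioning { val numPartitions = 1 }

// Pairs every row of one map-side partition with its target partition id.
def withPartitionIds(
    mapPartitionIndex: Int,
    rows: Iterator[Row],
    partitioning: Partitioning): Iterator[(Int, Row)] = partitioning match {
  case RoundRobin(n) =>
    // Pseudo-random start seeded by the map partition index, then consecutive partitions.
    var position = new Random(mapPartitionIndex).nextInt(n)
    rows.map { row => position += 1; (Math.floorMod(position, n), row) }
  case Hash(key, n) =>
    // Stand-in for Spark's Murmur3-based pmod(hash(keys), n).
    rows.map(row => (Math.floorMod(key(row).hashCode, n), row))
  case Single =>
    rows.map(row => (0, row))
}
```

With RoundRobin(2) and six rows, each of the two partitions receives three rows. This matches the even spread that repartition(2) is meant to give.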

Preparing Physical Operator for Execution — doPrepare Method

doPrepare(): Unit
Note
doPrepare is part of SparkPlan Contract to prepare a physical operator for execution.

Internal Properties

Table 2. ShuffleExchangeExec’s Internal Properties (e.g. Registries, Counters and Flags)

Name | Description
cachedShuffleRDD | ShuffledRowRDD that is created when ShuffleExchangeExec operator is executed (to generate RDD[InternalRow]) and reused (cached) if the operator is used by multiple plans
serializer | UnsafeRowSerializer with as many fields as there are output schema attributes of the child physical operator, and with the dataSize performance metric. Used exclusively in prepareShuffleDependency to create a ShuffleDependency
