ShuffleExchangeExec Unary Physical Operator

ShuffleExchangeExec is an Exchange unary physical operator that is used to perform a shuffle.

ShuffleExchangeExec is created (possibly indirectly using apply factory) when:

Note
ShuffleExchangeExec presents itself as Exchange in physical query plans.
// Uses Repartition logical operator
// ShuffleExchangeExec with RoundRobinPartitioning
val q1 = spark.range(6).repartition(2)
scala> q1.explain
== Physical Plan ==
Exchange RoundRobinPartitioning(2)
+- *Range (0, 6, step=1, splits=Some(8))

// Uses RepartitionByExpression logical operator
// ShuffleExchangeExec with HashPartitioning
val q2 = spark.range(6).repartition(2, 'id % 2)
scala> q2.explain
== Physical Plan ==
Exchange hashpartitioning((id#38L % 2), 2)
+- *Range (0, 6, step=1, splits=Some(8))

When requested for its nodeName, ShuffleExchangeExec returns Exchange, followed by (coordinator id: [coordinator-hash-code]) when the optional ExchangeCoordinator is defined.

When requested for its output data partitioning (outputPartitioning), ShuffleExchangeExec simply returns the Partitioning.

When requested to prepare for execution, ShuffleExchangeExec registers itself with the optional ExchangeCoordinator if defined.
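The three behaviours above (node name, output partitioning and registration in doPrepare) can be pictured with a small Spark-free model. This is a sketch only: Partitioning, Coordinator and ShuffleExchange are hypothetical stand-ins rather than Spark classes, and the exact nodeName string format is an assumption based on the description above.

```scala
// Hypothetical, Spark-free model of how an exchange operator derives its node name,
// output partitioning and doPrepare behaviour from an optional coordinator.
// None of these types are Spark classes.
object ExchangeNodeSketch {

  // Stand-in for Spark's Partitioning
  final case class Partitioning(description: String)

  // Stand-in for ExchangeCoordinator: it only records which exchanges registered with it
  final class Coordinator {
    private var registered = Vector.empty[ShuffleExchange]
    def registerExchange(exchange: ShuffleExchange): Unit = registered :+= exchange
    def registeredExchanges: Seq[ShuffleExchange] = registered
  }

  final class ShuffleExchange(val newPartitioning: Partitioning, val coordinator: Option[Coordinator]) {

    // "Exchange", followed by the coordinator's identity hash code when a coordinator is defined
    def nodeName: String = {
      val extraInfo = coordinator match {
        case Some(c) => s"(coordinator id: ${System.identityHashCode(c)})"
        case None    => ""
      }
      s"Exchange$extraInfo"
    }

    // The output partitioning is simply the requested Partitioning
    def outputPartitioning: Partitioning = newPartitioning

    // Register with the coordinator (if any) right before execution
    def doPrepare(): Unit = coordinator.foreach(_.registerExchange(this))
  }
}
```

Registering in doPrepare rather than at construction time means that operator instances created and discarded while the physical plan is being transformed never reach the coordinator in this model.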

Creating ShuffleExchangeExec Instance

ShuffleExchangeExec takes the following to be created: the Partitioning, the child physical operator (SparkPlan) and an optional ExchangeCoordinator.

The optional ExchangeCoordinator is defined only for Adaptive Query Execution (when EnsureRequirements physical query optimization is executed).

Performance Metrics — metrics Method

Table 1. ShuffleExchangeExec’s Performance Metrics

Key        Name (in web UI)   Description
dataSize   data size          Total size of the rows serialized for the shuffle (tracked by the serializer)

Figure 1. ShuffleExchangeExec in web UI (Details for Query)

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute creates a new ShuffledRowRDD or (re)uses the cached one if doExecute was executed before.

Note
ShuffleExchangeExec caches a ShuffledRowRDD for later reuse.

doExecute branches off per the optional ExchangeCoordinator.

Note
The optional ExchangeCoordinator is available only when Adaptive Query Execution is enabled (and EnsureRequirements physical query optimization is requested to enforce partition requirements (distribution and ordering) of a physical operator).

With an ExchangeCoordinator available, doExecute requests the ExchangeCoordinator for a ShuffledRowRDD.

Otherwise (with no ExchangeCoordinator available), doExecute prepares a ShuffleDependency and then creates a ShuffledRowRDD.

In the end, doExecute saves (caches) the resulting ShuffledRowRDD in the cachedShuffleRDD internal registry.
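The branching and caching in doExecute can be sketched without Spark by replacing RDDs with plain values. This is a hypothetical model: ShuffledRows stands in for ShuffledRowRDD, Coordinator for ExchangeCoordinator, and createShuffledRowRDD is an invented helper; only the control flow (build once, branch on the coordinator, reuse the cached result) follows the description above.

```scala
// Hypothetical model of doExecute: branch on the optional coordinator, then cache the result
// so repeated executions (e.g. when the operator is shared by multiple plans) reuse it.
object DoExecuteSketch {

  // Stand-in for ShuffledRowRDD
  final case class ShuffledRows(origin: String)

  // Stand-in for ExchangeCoordinator: hands out the post-shuffle rows for an exchange
  trait Coordinator {
    def postShuffleRDD(exchange: ShuffleExchange): ShuffledRows
  }

  final class ShuffleExchange(coordinator: Option[Coordinator]) {
    // Mirrors the cachedShuffleRDD internal registry (null until the first execution)
    private var cachedShuffleRDD: ShuffledRows = null

    // Counts how many times a result was actually built (for illustration only)
    var builds = 0

    // Hypothetical stand-ins for preparing a ShuffleDependency and wrapping it in a ShuffledRowRDD
    private def prepareShuffleDependency(): String = "shuffle-dependency"
    private def createShuffledRowRDD(dependency: String): ShuffledRows = ShuffledRows(dependency)

    def doExecute(): ShuffledRows = {
      if (cachedShuffleRDD == null) {
        builds += 1
        cachedShuffleRDD = coordinator match {
          case Some(c) => c.postShuffleRDD(this)                              // adaptive: ask the coordinator
          case None    => createShuffledRowRDD(prepareShuffleDependency())    // build the shuffle directly
        }
      }
      cachedShuffleRDD
    }
  }
}
```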

preparePostShuffleRDD Method

preparePostShuffleRDD(
  shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
  specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD

preparePostShuffleRDD…FIXME

Note

preparePostShuffleRDD is used when:

Preparing ShuffleDependency — prepareShuffleDependency Internal Method

prepareShuffleDependency(): ShuffleDependency[Int, InternalRow, InternalRow] (1)
prepareShuffleDependency(
  rdd: RDD[InternalRow],
  outputAttributes: Seq[Attribute],
  newPartitioning: Partitioning,
  serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow]

1. Uses the child operator (for the rdd and outputAttributes), the Partitioning and the serializer

prepareShuffleDependency creates a Spark Core ShuffleDependency with an RDD[Product2[Int, InternalRow]] (where each Int is the ID of the partition that the paired InternalRow value is assigned to) and the given Serializer (e.g. the Serializer of the ShuffleExchangeExec physical operator).
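The shape of that RDD[Product2[Int, InternalRow]] can be illustrated for a hash-style partitioning such as hashpartitioning((id#38L % 2), 2) in the q2 example. This is a simplified sketch under stated assumptions: rows are plain Scala values instead of InternalRows, and Scala's hashCode with a non-negative modulo stands in for the Murmur3-based hash Spark itself uses, so real partition IDs can differ.

```scala
// Simplified model of pairing each row with its target partition ID, as in the
// (Int, InternalRow) records fed into a ShuffleDependency.
// Assumption: hashCode + non-negative modulo replaces Spark's own hash expression.
object HashKeyingSketch {

  // Non-negative modulo, so negative hash codes still land in [0, numPartitions)
  def nonNegativeMod(x: Int, mod: Int): Int = Math.floorMod(x, mod)

  // Pair every row with the ID of the partition its key hashes to
  def keyByPartition[Row, Key](rows: Iterator[Row], numPartitions: Int)(partitionKey: Row => Key): Iterator[(Int, Row)] =
    rows.map(row => (nonNegativeMod(partitionKey(row).hashCode, numPartitions), row))
}
```

For example, HashKeyingSketch.keyByPartition(Iterator(0L, 1L, 2L, 3L, 4L, 5L), 2)(id => id % 2) pairs the even ids with partition 0 and the odd ids with partition 1 in this model.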

Internally, prepareShuffleDependency…FIXME

Note

prepareShuffleDependency is used when:

prepareShuffleDependency Helper Method

prepareShuffleDependency(
  rdd: RDD[InternalRow],
  outputAttributes: Seq[Attribute],
  newPartitioning: Partitioning,
  serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow]

prepareShuffleDependency creates a ShuffleDependency.
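For RoundRobinPartitioning (as in the q1 example), the partition IDs do not come from the row contents. The sketch below is a hypothetical model of round-robin assignment: each input partition starts at a pseudo-random output partition seeded by its own partition ID and then advances by one for every row. The use of scala.util.Random and the seeding scheme are assumptions for illustration; Spark's exact generator may differ.

```scala
import scala.util.Random

// Hypothetical model of round-robin partition assignment for one input partition.
object RoundRobinSketch {
  def assign[Row](inputPartitionId: Int, rows: Iterator[Row], numPartitions: Int): Iterator[(Int, Row)] = {
    // Start from a pseudo-random output partition so input partitions do not all begin at 0
    var position = new Random(inputPartitionId).nextInt(numPartitions)
    rows.map { row =>
      position += 1
      // Non-negative modulo keeps the ID in [0, numPartitions)
      (Math.floorMod(position, numPartitions), row)
    }
  }
}
```

Starting at a seeded random position spreads rows from different input partitions across the output partitions while keeping the assignment deterministic for a given input partition.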

Note
prepareShuffleDependency is used when ShuffleExchangeExec prepares a ShuffleDependency (as part of…FIXME) and when the CollectLimitExec and TakeOrderedAndProjectExec physical operators are executed.

Preparing Physical Operator for Execution — doPrepare Method

doPrepare(): Unit
Note
doPrepare is part of SparkPlan Contract to prepare a physical operator for execution.

doPrepare registers the ShuffleExchangeExec with the optional ExchangeCoordinator (if defined).

Creating ShuffleExchangeExec Without ExchangeCoordinator — apply Utility

apply(
  newPartitioning: Partitioning,
  child: SparkPlan): ShuffleExchangeExec

apply creates a new ShuffleExchangeExec physical operator with no ExchangeCoordinator (an empty Option).
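The factory pattern can be sketched as a case class with an extra two-argument apply on its companion object. Partitioning, Plan, Coordinator and ShuffleExchange below are hypothetical placeholders, not Spark classes; the sketch only shows how the two-argument apply fills in an empty coordinator.

```scala
// Hypothetical, Spark-free model of a companion apply that omits the coordinator.
object ApplySketch {
  final case class Partitioning(numPartitions: Int)
  final case class Plan(name: String)
  final class Coordinator

  final case class ShuffleExchange(
      newPartitioning: Partitioning,
      child: Plan,
      coordinator: Option[Coordinator])

  object ShuffleExchange {
    // Creates an exchange without a coordinator (i.e. not adaptive)
    def apply(newPartitioning: Partitioning, child: Plan): ShuffleExchange =
      ShuffleExchange(newPartitioning, child, Option.empty[Coordinator])
  }
}
```

Usage: ApplySketch.ShuffleExchange(ApplySketch.Partitioning(2), ApplySketch.Plan("Range")) yields an exchange whose coordinator is None, while the generated three-argument apply remains available for the adaptive case.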

Note

apply is used when:

Internal Properties

cachedShuffleRDD: ShuffledRowRDD that is created when the ShuffleExchangeExec operator is executed (to generate RDD[InternalRow]) and reused (cached) if the operator is used by multiple plans.

serializer: UnsafeRowSerializer created with the number of output schema attributes of the child physical operator (as the number of fields) and the dataSize performance metric. Used exclusively in prepareShuffleDependency to create a ShuffleDependency.
