// Uses Repartition logical operator
// ShuffleExchangeExec with RoundRobinPartitioning
val q1 = spark.range(6).repartition(2)
scala> q1.explain
== Physical Plan ==
Exchange RoundRobinPartitioning(2)
+- *Range (0, 6, step=1, splits=Some(8))
// Uses RepartitionByExpression logical operator
// ShuffleExchangeExec with HashPartitioning
val q2 = spark.range(6).repartition(2, 'id % 2)
scala> q2.explain
== Physical Plan ==
Exchange hashpartitioning((id#38L % 2), 2)
+- *Range (0, 6, step=1, splits=Some(8))
ShuffleExchangeExec Unary Physical Operator
ShuffleExchangeExec is an Exchange unary physical operator that is used to perform a shuffle.
ShuffleExchangeExec is created (possibly indirectly using the apply factory) when:

- BasicOperators execution planning strategy is executed and plans Repartition (with the shuffle flag enabled) and RepartitionByExpression logical operators
- EnsureRequirements physical query optimization is executed (and requested to enforce partition requirements), as shown in the sketch after this list
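For example, EnsureRequirements adds shuffles when a join requires its inputs to be hash-partitioned. The following sketch lowers spark.sql.autoBroadcastJoinThreshold to force a shuffle-based join; the exact plan (e.g. the number of shuffle partitions) depends on the Spark version and configuration.
// Force a sort-merge join so EnsureRequirements has to add shuffles
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val q3 = spark.range(6).join(spark.range(6), "id")
scala> q3.explain
// Expect Exchange hashpartitioning(id#...L, 200) under both sides of the SortMergeJoin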
Note: ShuffleExchangeExec presents itself as Exchange in physical query plans.
When requested for the nodeName, ShuffleExchangeExec returns the Exchange prefix, possibly followed by (coordinator id: [coordinator-hash-code]) when the optional ExchangeCoordinator is defined.
When requested for the output data partitioning requirements, ShuffleExchangeExec simply returns the Partitioning.
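The following snippet (illustrative only; the partitioning rendering varies across Spark versions) collects the Exchange nodes of q2's executed plan and prints their nodeName and outputPartitioning:
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
val exchanges = q2.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }
exchanges.foreach { e =>
  println(e.nodeName)            // Exchange
  println(e.outputPartitioning)  // hashpartitioning((id#38L % 2), 2)
}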
When requested to prepare for execution, ShuffleExchangeExec registers itself with the optional ExchangeCoordinator if defined.
Creating ShuffleExchangeExec Instance
ShuffleExchangeExec takes the following to be created:

- Partitioning
- Child physical operator
- Optional ExchangeCoordinator
The optional ExchangeCoordinator is defined only for Adaptive Query Execution (when EnsureRequirements physical query optimization is executed).
Performance Metrics — metrics Method
| Key | Name (in web UI) | Description |
|---|---|---|
| dataSize | data size | |
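The metric can be inspected programmatically once the query has run (a sketch; the set of metric keys may differ across Spark versions):
// Run the query so the shuffle metrics get populated
q1.collect()
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
q1.queryExecution.executedPlan
  .collect { case e: ShuffleExchangeExec => e }
  .foreach(_.metrics.foreach { case (key, metric) => println(s"$key = ${metric.value}") })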
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method
doExecute(): RDD[InternalRow]
Note: doExecute is part of the SparkPlan contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
doExecute creates a new ShuffledRowRDD or (re)uses the cached one if doExecute was executed before.
Note: ShuffleExchangeExec caches a ShuffledRowRDD for later reuse.
doExecute branches off based on whether the optional ExchangeCoordinator is defined.
Note: The optional ExchangeCoordinator is available only when Adaptive Query Execution is enabled (and EnsureRequirements physical query optimization is requested to enforce partition requirements (distribution and ordering) of a physical operator).
With an ExchangeCoordinator available, doExecute requests the ExchangeCoordinator for a ShuffledRowRDD.
Otherwise (with no ExchangeCoordinator available), doExecute prepares a ShuffleDependency and then creates a ShuffledRowRDD.
In the end, doExecute saves (caches) the result ShuffledRowRDD (as the cachedShuffleRDD internal registry).
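Put together, doExecute boils down to the following (a simplified sketch paraphrasing the Spark 2.x sources, not a drop-in implementation):
protected override def doExecute(): RDD[InternalRow] = {
  // Reuse the ShuffledRowRDD cached by an earlier execution, if any
  if (cachedShuffleRDD == null) {
    cachedShuffleRDD = coordinator match {
      case Some(exchangeCoordinator) =>
        // Adaptive Query Execution: the coordinator decides the post-shuffle RDD
        exchangeCoordinator.postShuffleRDD(this)
      case _ =>
        val shuffleDependency = prepareShuffleDependency()
        preparePostShuffleRDD(shuffleDependency)
    }
  }
  cachedShuffleRDD
}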
preparePostShuffleRDD Method
preparePostShuffleRDD(
shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD
preparePostShuffleRDD…FIXME
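A sketch based on the Spark 2.x sources (treat it as illustrative rather than authoritative): when partition start indices are specified (by an ExchangeCoordinator), the number of post-shuffle partitions no longer matches newPartitioning, so the partitioning is reset before the ShuffledRowRDD is created.
private[exchange] def preparePostShuffleRDD(
    shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
    specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
  if (specifiedPartitionStartIndices.isDefined) {
    assert(newPartitioning.isInstanceOf[HashPartitioning])
    // The coordinator may coalesce post-shuffle partitions, so the original
    // partitioning no longer describes the output
    newPartitioning = UnknownPartitioning(specifiedPartitionStartIndices.get.length)
  }
  new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
}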
Preparing ShuffleDependency — prepareShuffleDependency Internal Method
prepareShuffleDependency(): ShuffleDependency[Int, InternalRow, InternalRow] (1)
prepareShuffleDependency(
rdd: RDD[InternalRow],
outputAttributes: Seq[Attribute],
newPartitioning: Partitioning,
serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow]
1. Uses the child operator (for the rdd and outputAttributes) and the serializer
prepareShuffleDependency creates a Spark Core ShuffleDependency over an RDD[Product2[Int, InternalRow]] (where the Ints are the partition IDs of the InternalRow values) with the given Serializer (e.g. the serializer of the ShuffleExchangeExec physical operator).
Internally, prepareShuffleDependency…FIXME
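Conceptually, the resulting dependency looks like the following (an illustrative sketch, not the Spark implementation; IdentityPartitioner and buildShuffleDependency are made-up names for this example):
import org.apache.spark.{Partitioner, ShuffleDependency}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.catalyst.InternalRow

// Keys are already partition IDs, so the partitioner is an identity mapping
class IdentityPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

def buildShuffleDependency(
    rddWithPartitionIds: RDD[Product2[Int, InternalRow]],
    numPartitions: Int,
    serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow] =
  new ShuffleDependency[Int, InternalRow, InternalRow](
    rddWithPartitionIds,
    new IdentityPartitioner(numPartitions),
    serializer)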
prepareShuffleDependency Helper Method
prepareShuffleDependency(
rdd: RDD[InternalRow],
outputAttributes: Seq[Attribute],
newPartitioning: Partitioning,
serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow]
prepareShuffleDependency creates a ShuffleDependency.
Note: prepareShuffleDependency is used when ShuffleExchangeExec prepares a ShuffleDependency (as part of…FIXME), and when CollectLimitExec and TakeOrderedAndProjectExec physical operators are executed.
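For example, a sorted limit query plans a TakeOrderedAndProjectExec, which shuffles to a single partition via the prepareShuffleDependency helper without showing a separate Exchange node (the exact plan output depends on the Spark version):
val q4 = spark.range(6).orderBy('id).limit(2)
scala> q4.explain
// == Physical Plan ==
// TakeOrderedAndProject(limit=2, orderBy=[id#...L ASC NULLS FIRST], output=[id#...L])
// +- *Range (0, 6, step=1, splits=Some(8))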
Preparing Physical Operator for Execution — doPrepare Method
doPrepare(): Unit
Note: doPrepare is part of the SparkPlan contract to prepare a physical operator for execution.
doPrepare simply requests the optional ExchangeCoordinator (if defined) to register the ShuffleExchangeExec unary physical operator.
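A sketch of doPrepare (paraphrasing the Spark 2.x sources): registration only happens when an ExchangeCoordinator is defined, i.e. under Adaptive Query Execution.
protected override def doPrepare(): Unit = {
  // Register this Exchange with the coordinator (if any) before execution
  coordinator.foreach(_.registerExchange(this))
}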
Creating ShuffleExchangeExec Without ExchangeCoordinator — apply Utility
apply(
newPartitioning: Partitioning,
child: SparkPlan): ShuffleExchangeExec
apply creates a new ShuffleExchangeExec physical operator with an empty ExchangeCoordinator.
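The factory boils down to the following (a sketch assuming the three-argument constructor with an optional ExchangeCoordinator):
object ShuffleExchangeExec {
  def apply(
      newPartitioning: Partitioning,
      child: SparkPlan): ShuffleExchangeExec =
    ShuffleExchangeExec(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator])
}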
Internal Properties
| Name | Description |
|---|---|
| cachedShuffleRDD | ShuffledRowRDD that is created when doExecute is executed (and reused in subsequent executions) |
| serializer | Used exclusively in prepareShuffleDependency to create a ShuffleDependency |