// Uses Repartition logical operator
// ShuffleExchangeExec with RoundRobinPartitioning
scala> val q1 = spark.range(6).repartition(2)
scala> q1.explain
== Physical Plan ==
Exchange RoundRobinPartitioning(2)
+- *Range (0, 6, step=1, splits=Some(8))

// Uses RepartitionByExpression logical operator
// ShuffleExchangeExec with HashPartitioning
scala> val q2 = spark.range(6).repartition(2, 'id % 2)
scala> q2.explain
== Physical Plan ==
Exchange hashpartitioning((id#38L % 2), 2)
+- *Range (0, 6, step=1, splits=Some(8))
ShuffleExchangeExec Unary Physical Operator
ShuffleExchangeExec is an Exchange unary physical operator that is used to perform a shuffle.
ShuffleExchangeExec is created (possibly indirectly using the apply factory utility) when:
- BasicOperators execution planning strategy is executed and plans Repartition (with the shuffle flag enabled) and RepartitionByExpression logical operators
- EnsureRequirements physical query optimization is executed (and requested to enforce partition requirements)
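The planning decision in the first bullet can be sketched as a pattern match. This is a minimal model. Repartition, RepartitionByExpression and the two partitionings below are hypothetical stand-in types, not Spark's classes. It assumes non-empty partitioning expressions, as in q2 above. When the shuffle flag is disabled, Spark plans a CoalesceExec instead, so the model returns None in that case.

```scala
// Hypothetical stand-ins (not Spark's classes) that only mirror the planning decision
sealed trait Partitioning
final case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning
final case class HashPartitioning(expressions: Seq[String], numPartitions: Int) extends Partitioning

sealed trait LogicalOp
final case class Repartition(numPartitions: Int, shuffle: Boolean) extends LogicalOp
final case class RepartitionByExpression(expressions: Seq[String], numPartitions: Int) extends LogicalOp

// Returns the partitioning a ShuffleExchangeExec would be created with,
// or None when no shuffle exchange is planned (Repartition with shuffle disabled)
def planExchange(op: LogicalOp): Option[Partitioning] = op match {
  case Repartition(n, true)           => Some(RoundRobinPartitioning(n))
  case Repartition(_, false)          => None
  case RepartitionByExpression(es, n) => Some(HashPartitioning(es, n))
}

// Mirrors q1 and q2 from the examples
println(planExchange(Repartition(2, shuffle = true)))  // Some(RoundRobinPartitioning(2))
println(planExchange(RepartitionByExpression(Seq("id % 2"), 2)))
```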
Note: ShuffleExchangeExec presents itself as Exchange in physical query plans.
When requested for nodeName, ShuffleExchangeExec gives the Exchange prefix. It appends (coordinator id: [coordinator-hash-code]) only when the optional ExchangeCoordinator is defined.
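The nodeName rule can be sketched as follows. ExchangeCoordinator here is a hypothetical stand-in class. Two details are assumptions rather than confirmed behavior: that the hash code comes from System.identityHashCode, and that no space separates the prefix from the suffix.

```scala
// Hypothetical stand-in for ExchangeCoordinator; only its identity matters here
final class ExchangeCoordinator

// "Exchange", plus a coordinator id suffix only when a coordinator is defined
def nodeName(coordinator: Option[ExchangeCoordinator]): String = {
  val extraInfo = coordinator match {
    case Some(c) => s"(coordinator id: ${System.identityHashCode(c)})"
    case None    => ""
  }
  s"Exchange$extraInfo"
}

println(nodeName(None))                           // Exchange
println(nodeName(Some(new ExchangeCoordinator)))  // Exchange(coordinator id: <some hash code>)
```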
When requested for the output data partitioning, ShuffleExchangeExec simply returns its Partitioning.
When requested to prepare for execution, ShuffleExchangeExec registers itself with the optional ExchangeCoordinator if defined.
Creating ShuffleExchangeExec Instance
ShuffleExchangeExec takes the following to be created:
- Partitioning
- Child physical operator
- Optional ExchangeCoordinator
The optional ExchangeCoordinator is defined only for Adaptive Query Execution (when EnsureRequirements physical query optimization is executed).
Performance Metrics — metrics Method
Key | Name (in web UI) | Description |
---|---|---|
 | data size | |
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method
doExecute(): RDD[InternalRow]
Note: doExecute is part of the SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
doExecute creates a new ShuffledRowRDD, or reuses the cached one if doExecute was executed before.
Note: ShuffleExchangeExec caches a ShuffledRowRDD for later reuse.
doExecute branches off per the optional ExchangeCoordinator.
Note: The optional ExchangeCoordinator is available only when Adaptive Query Execution is enabled and the EnsureRequirements physical query optimization is requested to enforce the partition requirements (distribution and ordering) of a physical operator.
With an ExchangeCoordinator available, doExecute requests the ExchangeCoordinator for a ShuffledRowRDD.
Otherwise (with no ExchangeCoordinator available), doExecute prepares a ShuffleDependency and then creates a ShuffledRowRDD.
In the end, doExecute saves (caches) the resulting ShuffledRowRDD as the cachedShuffleRDD internal registry.
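The branching and caching in doExecute can be sketched in plain Scala. This is a simplified model, not Spark code. ShuffledRowRDD is a hypothetical labelled value, and the coordinator is modelled as a function that produces one. The two private helpers only stand in for prepareShuffleDependency and preparePostShuffleRDD.

```scala
// Hypothetical stand-in: a "ShuffledRowRDD" is just a labelled value here
final case class ShuffledRowRDD(origin: String)

final class ShuffleExchangeModel(coordinator: Option[() => ShuffledRowRDD]) {
  // Counts how many times a new ShuffledRowRDD was actually built
  var builds = 0
  // Mirrors the cachedShuffleRDD internal registry (null until the first execution)
  private var cachedShuffleRDD: ShuffledRowRDD = null

  // Stand-ins for prepareShuffleDependency and preparePostShuffleRDD
  private def prepareShuffleDependency(): String = "shuffle-dependency"
  private def preparePostShuffleRDD(dep: String): ShuffledRowRDD = ShuffledRowRDD(s"from $dep")

  def doExecute(): ShuffledRowRDD = {
    if (cachedShuffleRDD == null) {
      builds += 1
      cachedShuffleRDD = coordinator match {
        // With an ExchangeCoordinator: ask it for the post-shuffle RDD
        case Some(postShuffleRDD) => postShuffleRDD()
        // Without one: prepare a ShuffleDependency, then wrap it in a ShuffledRowRDD
        case None => preparePostShuffleRDD(prepareShuffleDependency())
      }
    }
    cachedShuffleRDD
  }
}

val plain = new ShuffleExchangeModel(coordinator = None)
val first = plain.doExecute()
val second = plain.doExecute()
println(first eq second)  // true: the cached ShuffledRowRDD is reused
println(plain.builds)     // 1
```

The null-initialized var follows the lazy, build-once behavior described above. A `lazy val` would express the same idea, but the var keeps the model closer to a registry that is filled in on first use.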
preparePostShuffleRDD Method
preparePostShuffleRDD(
shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD
preparePostShuffleRDD …FIXME
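The optional specifiedPartitionStartIndices argument is easier to follow with an example. The sketch below assumes the semantics of Spark 2.x's ShuffledRowRDD; it is not taken from this page, so treat it as an assumption. Each start index opens a post-shuffle partition that ends where the next one begins, and the last one ends at the number of pre-shuffle partitions. With None, there is one post-shuffle partition per pre-shuffle partition. The function postShuffleRanges is hypothetical.

```scala
// Computes the [start, end) range of pre-shuffle partitions read by each post-shuffle partition.
// startIndices = None means no coalescing: one post-shuffle partition per pre-shuffle one.
def postShuffleRanges(
    numPreShufflePartitions: Int,
    startIndices: Option[Array[Int]] = None): Seq[(Int, Int)] = {
  val starts = startIndices.getOrElse(Array.tabulate(numPreShufflePartitions)(i => i))
  starts.indices.map { i =>
    val start = starts(i)
    // A range ends where the next one starts; the last one ends at the total count
    val end = if (i < starts.length - 1) starts(i + 1) else numPreShufflePartitions
    (start, end)
  }
}

println(postShuffleRanges(4))                        // Vector((0,1), (1,2), (2,3), (3,4))
println(postShuffleRanges(8, Some(Array(0, 3, 5))))  // Vector((0,3), (3,5), (5,8))
```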
Preparing ShuffleDependency — prepareShuffleDependency Internal Method
prepareShuffleDependency(): ShuffleDependency[Int, InternalRow, InternalRow] (1)
prepareShuffleDependency(
rdd: RDD[InternalRow],
outputAttributes: Seq[Attribute],
newPartitioning: Partitioning,
serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow]
(1) Uses the child operator (for the rdd and outputAttributes) and the serializer
prepareShuffleDependency creates a Spark Core ShuffleDependency with two inputs. The first is an RDD[Product2[Int, InternalRow]], where the Ints are the partition IDs of the InternalRow values. The second is the given Serializer (e.g. the Serializer of the ShuffleExchangeExec physical operator).
Internally, prepareShuffleDependency …FIXME
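The sketch below shows how partition IDs can be paired with rows to form the RDD[Product2[Int, InternalRow]] described above, for the two partitionings in the opening examples. It is a simplified model under stated assumptions, and every function in it is hypothetical.

- **Round-robin:** the model assumes the Spark 2.x scheme. A running position starts somewhere in [0, numPartitions) and is bumped before each row, then taken non-negatively modulo the partition count. Spark picks a random start; the model takes a fixed `start` for determinism.
- **Hash:** Spark uses a Murmur3 hash of the partitioning expressions. This sketch swaps in Scala's `hashCode` purely for illustration.

```scala
// Non-negative modulo, so negative hash codes still map to a valid partition ID
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

// Round-robin: a running position that starts at `start` and is bumped before each row
// (the start is random in Spark; it is fixed here for determinism)
def roundRobinIds(numRows: Int, numPartitions: Int, start: Int): Seq[Int] = {
  var position = start
  (0 until numRows).map { _ =>
    position += 1
    nonNegativeMod(position, numPartitions)
  }
}

// Hash: partition ID = non-negative hash of the partitioning key modulo numPartitions.
// NOTE: Spark uses Murmur3 over the partitioning expressions; hashCode is only a stand-in.
def hashIds[K](keys: Seq[K], numPartitions: Int): Seq[Int] =
  keys.map(k => nonNegativeMod(k.hashCode, numPartitions))

// Pair each row with its partition ID (the (Int, row) pairs that feed the ShuffleDependency)
val rows = Seq(0L, 1L, 2L, 3L, 4L, 5L)
val pairs = roundRobinIds(rows.size, numPartitions = 2, start = 0).zip(rows)
println(pairs)  // Vector((1,0), (0,1), (1,2), (0,3), (1,4), (0,5))
```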
prepareShuffleDependency Helper Method
prepareShuffleDependency(
rdd: RDD[InternalRow],
outputAttributes: Seq[Attribute],
newPartitioning: Partitioning,
serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow]
prepareShuffleDependency creates a ShuffleDependency.
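The keys of the shuffled pairs are already partition IDs. The partitioner handed to the ShuffleDependency therefore only needs to return each key unchanged. The sketch below models such a pass-through partitioner. The name PartitionIdPassthrough follows Spark 2.x internals as recalled, so verify it against your Spark version. The Partitioner trait here is a self-contained stand-in that mirrors the two abstract members of Spark's org.apache.spark.Partitioner; it is not an import of the real class.

```scala
// Stand-in mirroring the two abstract members of Spark's org.apache.spark.Partitioner
trait Partitioner {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

// The keys are already partition IDs, so the partitioner hands them back unchanged
final class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

val passthrough = new PartitionIdPassthrough(2)
val pairs = Seq((1, "a"), (0, "b"), (1, "c"))
println(pairs.map { case (id, _) => passthrough.getPartition(id) })  // List(1, 0, 1)
```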
Note: prepareShuffleDependency is used when ShuffleExchangeExec prepares a ShuffleDependency (as part of…FIXME), and when the CollectLimitExec and TakeOrderedAndProjectExec physical operators are executed.
Preparing Physical Operator for Execution — doPrepare Method
doPrepare(): Unit
Note: doPrepare is part of the SparkPlan Contract to prepare a physical operator for execution.
doPrepare requests the optional ExchangeCoordinator, if defined, to register the ShuffleExchangeExec unary physical operator.
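The conditional registration in doPrepare can be sketched as an Option.foreach. ExchangeCoordinator below is a hypothetical stand-in that only records registrations. Its registerExchange name mirrors Spark 2.x, but the class is not Spark's. Exchanges are modelled as plain strings.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in: records which exchanges registered with it
final class ExchangeCoordinator {
  val registered = ListBuffer.empty[String]
  def registerExchange(exchange: String): Unit = registered += exchange
}

// Registration happens only when the optional coordinator is defined
def doPrepare(exchange: String, coordinator: Option[ExchangeCoordinator]): Unit =
  coordinator.foreach(_.registerExchange(exchange))

val coordinator = new ExchangeCoordinator
doPrepare("Exchange #1", Some(coordinator))
doPrepare("Exchange #2", None)   // no-op: nothing to register with
println(coordinator.registered)  // ListBuffer(Exchange #1)
```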
Creating ShuffleExchangeExec Without ExchangeCoordinator — apply Utility
apply(
newPartitioning: Partitioning,
child: SparkPlan): ShuffleExchangeExec
apply creates a new ShuffleExchangeExec physical operator with no ExchangeCoordinator (i.e. an empty Option).
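The two-argument apply factory can be sketched as an extra overload on a case class companion. Partitioning, SparkPlan and ExchangeCoordinator below are hypothetical stand-ins. The three-parameter constructor shape (partitioning, child, optional coordinator) is assumed from Spark 2.x.

```scala
// Hypothetical stand-ins for Spark's types, only to show the shape of the factory
final case class Partitioning(description: String)
final case class SparkPlan(name: String)
final class ExchangeCoordinator

// Constructor shape: partitioning, child, and an optional coordinator (Adaptive Query Execution only)
final case class ShuffleExchangeExec(
    newPartitioning: Partitioning,
    child: SparkPlan,
    coordinator: Option[ExchangeCoordinator])

object ShuffleExchangeExec {
  // Mirrors apply(newPartitioning, child): the coordinator is always left empty
  def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchangeExec =
    ShuffleExchangeExec(newPartitioning, child, coordinator = None)
}

val exchange = ShuffleExchangeExec(Partitioning("hashpartitioning(id % 2, 2)"), SparkPlan("Range"))
println(exchange.coordinator)  // None
```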
Internal Properties
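Name | Description |
---|---|
cachedShuffleRDD | ShuffledRowRDD that is created when doExecute is executed for the first time (and reused afterwards) |
serializer | Serializer used exclusively in prepareShuffleDependency to create a ShuffleDependency |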