ShuffleExchangeExec Unary Physical Operator

ShuffleExchangeExec is an Exchange unary physical operator that performs a shuffle.

ShuffleExchangeExec corresponds to Repartition (with shuffle enabled) and RepartitionByExpression logical operators (as resolved in BasicOperators execution planning strategy).

Note
ShuffleExchangeExec shows as Exchange in physical plans.
// Uses Repartition logical operator
// ShuffleExchangeExec with RoundRobinPartitioning
val q1 = spark.range(6).repartition(2)
scala> q1.explain
== Physical Plan ==
Exchange RoundRobinPartitioning(2)
+- *Range (0, 6, step=1, splits=Some(8))

// Uses RepartitionByExpression logical operator
// ShuffleExchangeExec with HashPartitioning
val q2 = spark.range(6).repartition(2, 'id % 2)
scala> q2.explain
== Physical Plan ==
Exchange hashpartitioning((id#38L % 2), 2)
+- *Range (0, 6, step=1, splits=Some(8))

When created, ShuffleExchangeExec takes a Partitioning, a single child physical operator and an optional ExchangeCoordinator.

Table 1. ShuffleExchangeExec’s Performance Metrics
Key | Name (in web UI) | Description
dataSize | data size |

Figure 1. ShuffleExchangeExec in web UI (Details for Query)

nodeName is Exchange, followed by (coordinator id: [coordinator-hash-code]) when the optional ExchangeCoordinator is defined.

outputPartitioning is the input Partitioning.
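To see these properties on a live operator, the following is a minimal sketch that pulls ShuffleExchangeExec out of an executed plan. It assumes a spark-shell session (so spark is in scope) on a Spark version that ships ShuffleExchangeExec in the org.apache.spark.sql.execution.exchange package. Turning off spark.sql.adaptive.enabled is an assumption made only so that the exchange stays directly visible in the plan on versions where adaptive execution would otherwise wrap it.

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Assumption: running inside spark-shell, so `spark` is already defined.
// Adaptive execution is disabled only to keep the exchange directly visible in the plan.
spark.conf.set("spark.sql.adaptive.enabled", "false")

val q = spark.range(6).repartition(2)

// Collect every ShuffleExchangeExec in the executed physical plan
val exchanges = q.queryExecution.executedPlan.collect {
  case e: ShuffleExchangeExec => e
}

val exchange = exchanges.head
println(exchange.nodeName)            // the name shown in physical plans
println(exchange.outputPartitioning)  // the Partitioning the operator was created with
```

The same pattern match works for the RepartitionByExpression query (q2 above), where outputPartitioning would be the HashPartitioning instead.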

While preparing execution (using doPrepare), ShuffleExchangeExec registers itself with the ExchangeCoordinator if available.

When executed (using doExecute), ShuffleExchangeExec computes a ShuffledRowRDD and caches it, so that later executions reuse it instead of repeating a possibly expensive computation.

Table 2. ShuffleExchangeExec’s Internal Registries and Counters
Name | Description
cachedShuffleRDD | ShuffledRowRDD that is cached after ShuffleExchangeExec has been executed.

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is part of the SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute creates a new ShuffledRowRDD or reuses the cached one.

doExecute branches on whether the optional ExchangeCoordinator is defined.

If an ExchangeCoordinator was specified, doExecute requests the ExchangeCoordinator for a ShuffledRowRDD.

Otherwise (with no ExchangeCoordinator specified), doExecute calls prepareShuffleDependency and then preparePostShuffleRDD.
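The cache-then-branch flow described above can be modelled in plain Scala. This is only a sketch of the control flow, not Spark's code: FakeShuffledRDD, FakeCoordinator, FakeShuffleExchange and the computations counter are hypothetical stand-ins introduced for illustration, and the string payload merely records which branch produced the result.

```scala
// Hypothetical stand-ins for illustration only; these are not Spark classes.
final case class FakeShuffledRDD(source: String)

trait FakeCoordinator {
  def postShuffleRDD(): FakeShuffledRDD
}

class FakeShuffleExchange(coordinator: Option[FakeCoordinator]) {
  // Plays the role of cachedShuffleRDD: empty until the first execution
  private var cachedShuffleRDD: Option[FakeShuffledRDD] = None

  // Counts how many times the result was actually computed
  var computations = 0

  def doExecute(): FakeShuffledRDD = cachedShuffleRDD.getOrElse {
    computations += 1
    val rdd = coordinator match {
      // A coordinator is defined: ask it for the post-shuffle result
      case Some(c) => c.postShuffleRDD()
      // No coordinator: build the dependency, then the post-shuffle result
      case None => FakeShuffledRDD("prepareShuffleDependency + preparePostShuffleRDD")
    }
    cachedShuffleRDD = Some(rdd)
    rdd
  }
}

val exchange = new FakeShuffleExchange(coordinator = None)
val first = exchange.doExecute()
val second = exchange.doExecute() // served from the cache, nothing is recomputed
```

Caching matters because the same exchange operator can be executed more than once, and each uncached execution would otherwise set up the shuffle again.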

preparePostShuffleRDD Method

Caution
FIXME

prepareShuffleDependency Internal Method

prepareShuffleDependency(): ShuffleDependency[Int, InternalRow, InternalRow]
Caution
FIXME

prepareShuffleDependency Helper Method

prepareShuffleDependency(
  rdd: RDD[InternalRow],
  outputAttributes: Seq[Attribute],
  newPartitioning: Partitioning,
  serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow]

prepareShuffleDependency creates a ShuffleDependency.

Note
prepareShuffleDependency is used when ShuffleExchangeExec prepares a ShuffleDependency (as part of…FIXME), and when CollectLimitExec and TakeOrderedAndProjectExec physical operators are executed.
