import org.apache.spark.sql.execution.exchange.EnsureRequirements

val q = ??? // FIXME
val sparkPlan = q.queryExecution.sparkPlan
val plan = EnsureRequirements(spark.sessionState.conf).apply(sparkPlan)
EnsureRequirements Physical Query Optimization
EnsureRequirements is a physical query optimization (aka physical query preparation rule or simply preparation rule) that QueryExecution uses to optimize the physical plan of a structured query by transforming the following physical operators (up the plan tree):
- Removes two adjacent ShuffleExchangeExec physical operators if the child partitioning scheme guarantees the parent's partitioning
- For other (non-ShuffleExchangeExec) physical operators, ensures partition distribution and ordering (possibly adding new physical operators, e.g. BroadcastExchangeExec and ShuffleExchangeExec for distribution or SortExec for sorting)
Technically, EnsureRequirements is just a Catalyst rule for transforming physical query plans, i.e. Rule[SparkPlan].
EnsureRequirements is part of the preparations batch of physical query plan rules and is executed when QueryExecution is requested for the optimized physical query plan (i.e. in the executedPlan phase of a query execution).
EnsureRequirements takes a SQLConf when created.
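As a minimal sketch of applying the rule by hand (a spark-shell session with the implicit spark SparkSession is assumed, and the two-column join is just an illustrative query, not one from the original text):

```scala
// Illustrative only: assumes a spark-shell session
import org.apache.spark.sql.execution.exchange.EnsureRequirements

val left  = spark.range(100).withColumnRenamed("id", "k")
val right = spark.range(100).withColumnRenamed("id", "k")
val q = left.join(right, "k")

// sparkPlan has no exchange or sort operators yet
val sparkPlan = q.queryExecution.sparkPlan

// EnsureRequirements inserts the missing operators, e.g. BroadcastExchangeExec
// for a broadcast hash join, or ShuffleExchangeExec and SortExec for a
// sort-merge join (which one you get depends on your session's settings)
val plan = EnsureRequirements(spark.sessionState.conf).apply(sparkPlan)
println(plan.numberedTreeString)
```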
Enforcing Partition Requirements (Distribution and Ordering) of Physical Operator — ensureDistributionAndOrdering Internal Method
ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan
Internally, ensureDistributionAndOrdering takes the following from the input physical operator:
- the required partition requirements for the children (requiredChildDistributions)
- the required sort ordering for every child (requiredChildOrderings)
- the child physical plans
Note: The number of partition requirements and sort orderings has to match the number and the order of the child physical plans.
ensureDistributionAndOrdering matches the operator’s required partition requirements of children (requiredChildDistributions) to the children’s output partitioning and (in that order):
- If the child satisfies the requested distribution, the child is left unchanged
- For BroadcastDistribution, the child becomes the child of a BroadcastExchangeExec unary operator (for broadcast hash joins)
- Any other pair of child and distribution leads to a ShuffleExchangeExec unary physical operator (with the proper partitioning for the distribution and with spark.sql.shuffle.partitions number of partitions, i.e. 200 by default)
Note: ShuffleExchangeExec can appear in the physical plan when the children's output partitioning cannot satisfy the physical operator's required child distribution.
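A quick way to see the last case in action (hedged: the exact plan shape depends on your Spark version and settings) is to lower spark.sql.shuffle.partitions and look for the Exchange node in an aggregation plan:

```scala
// Assumes a spark-shell session; the aggregation requires a clustered
// distribution that the child (a plain range) cannot satisfy, so
// EnsureRequirements inserts ShuffleExchangeExec with
// spark.sql.shuffle.partitions partitions
spark.conf.set("spark.sql.shuffle.partitions", "8")
val grouped = spark.range(100).groupBy("id").count()
grouped.explain
// Expect an "Exchange hashpartitioning(id#..., 8)" node in the output
```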
If the input operator has multiple children and specifies child output distributions, then the children’s output partitionings have to be compatible.
If the children’s output partitionings are not all compatible, then…FIXME
ensureDistributionAndOrdering adds an ExchangeCoordinator (only when adaptive query execution is enabled, which it is not by default).
Note: At this point in ensureDistributionAndOrdering the required child distributions are already handled.
ensureDistributionAndOrdering then matches the operator's required sort ordering of children (requiredChildOrderings) to the children's output ordering and, if they do not match, creates a SortExec unary physical operator as a new child.
In the end, ensureDistributionAndOrdering sets the new children for the input operator.
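To see SortExec being added (an illustrative spark-shell session; disabling broadcast joins forces a sort-merge join, whose required child ordering triggers the SortExec insertion):

```scala
// Assumes a spark-shell session
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") // force a sort-merge join
val l = spark.range(100).withColumnRenamed("id", "k")
val r = spark.range(100).withColumnRenamed("id", "k")
l.join(r, "k").explain
// SortMergeJoinExec requires its children sorted by the join key, so the plan
// shows Sort (SortExec) on top of Exchange (ShuffleExchangeExec) on both sides
```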
Note: ensureDistributionAndOrdering is used exclusively when EnsureRequirements is executed (i.e. applied to a physical plan).
Adding ExchangeCoordinator (Adaptive Query Execution) — withExchangeCoordinator Internal Method
withExchangeCoordinator(
children: Seq[SparkPlan],
requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan]
withExchangeCoordinator adds an ExchangeCoordinator to ShuffleExchangeExec operators if adaptive query execution is enabled (per the spark.sql.adaptive.enabled property) and the partitioning scheme of the ShuffleExchangeExec operators supports ExchangeCoordinator.
Note: spark.sql.adaptive.enabled property is disabled by default.
Internally, withExchangeCoordinator checks whether the input children operators support ExchangeCoordinator, which is when either of the following holds:

- There is at least one ShuffleExchangeExec operator and every child is either a ShuffleExchangeExec with HashPartitioning or has HashPartitioning as its output partitioning (even inside a PartitioningCollection)
- There are at least two children operators and the input requiredChildDistributions are all ClusteredDistribution
With adaptive query execution enabled (i.e. when the spark.sql.adaptive.enabled configuration property is true) and the operator supporting ExchangeCoordinator, withExchangeCoordinator creates an ExchangeCoordinator and:
- For every ShuffleExchangeExec, registers the ExchangeCoordinator
- Creates a HashPartitioning partitioning scheme with the default number of partitions to use when shuffling data for joins or aggregations (spark.sql.shuffle.partitions, 200 by default) and adds a new ShuffleExchangeExec to the final result (for the current physical operator)
Otherwise (when adaptive query execution is disabled or children do not support ExchangeCoordinator), withExchangeCoordinator returns the input children unchanged.
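To experiment with the adaptive path (hedged: the explain output shape is version-dependent; a Spark 2.x spark-shell session is assumed):

```scala
// Assumes a spark-shell session (Spark 2.x adaptive execution)
spark.conf.set("spark.sql.adaptive.enabled", "true")
val q = spark.range(100).groupBy("id").count()
q.explain
// With adaptive execution on, the Exchange operators are registered with an
// ExchangeCoordinator, which shows up as "Exchange(coordinator id: ...)" nodes
```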
Note: withExchangeCoordinator is used exclusively for enforcing partition requirements of a physical operator.