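import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...  // e.g. the session that spark-shell already provides as spark
import spark.implicits._       // enables toDF on local Seqs of tuples
import org.apache.spark.sql.execution.PlanSubqueries
// Instantiate the rule directly so it can be applied to a physical plan by hand
val planSubqueries = PlanSubqueries(spark)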
Seq(
  (0, 0),
  (1, 0),
  (2, 1)
).toDF("id", "gid").createOrReplaceTempView("t")
Seq(
  (0, 3),
  (1, 20)
).toDF("gid", "lvl").createOrReplaceTempView("v")
val sql = """
select * from t where gid > (select max(gid) from v)
"""
val q = spark.sql(sql)
val sparkPlan = q.queryExecution.sparkPlan
scala> println(sparkPlan.numberedTreeString)
00 Project [_1#49 AS id#52, _2#50 AS gid#53]
01 +- Filter (_2#50 > scalar-subquery#128 [])
02    :  +- Aggregate [max(gid#61) AS max(gid)#130]
03    :     +- LocalRelation [gid#61]
04    +- LocalTableScan [_1#49, _2#50]
val optimizedPlan = planSubqueries(sparkPlan)
scala> println(optimizedPlan.numberedTreeString)
00 Project [_1#49 AS id#52, _2#50 AS gid#53]
01 +- Filter (_2#50 > Subquery subquery128)
02    :  +- Subquery subquery128
03    :     +- *(2) HashAggregate(keys=[], functions=[max(gid#61)], output=[max(gid)#130])
04    :        +- Exchange SinglePartition
05    :           +- *(1) HashAggregate(keys=[], functions=[partial_max(gid#61)], output=[max#134])
06    :              +- LocalTableScan [gid#61]
07    +- LocalTableScan [_1#49, _2#50]
PlanSubqueries Physical Query Optimization
PlanSubqueries is a physical query optimization (aka physical query preparation rule or simply preparation rule) that plans ScalarSubquery (SubqueryExpression) expressions as ScalarSubquery (ExecSubqueryExpression) expressions. In the example above, scalar-subquery#128 in the Filter condition becomes Subquery subquery128.
PlanSubqueries is part of the preparations batch of physical query plan rules and is executed when QueryExecution is requested for the optimized physical query plan (i.e. in the executedPlan phase of a query execution).
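The way a batch of preparation rules runs can be pictured with a small, Spark-free sketch. It assumes the batch is applied as a left fold, so each rule receives the plan produced by the previous one. `Plan`, `Rule`, `PlanSubqueriesToy` and `EnsureRequirementsToy` are hypothetical stand-ins. The second one is only named after EnsureRequirements, another rule of that batch. None of them are Spark's SparkPlan or Rule[SparkPlan], and the real batch contains more rules.

```scala
// Hypothetical, Spark-free model of running a batch of preparation rules.
// Plan and Rule are illustrative stand-ins, not Spark's SparkPlan and Rule[SparkPlan].
final case class Plan(description: String)

trait Rule[T] {
  def ruleName: String
  def apply(plan: T): T
}

// Two toy "preparation rules" that only record that they ran.
object PlanSubqueriesToy extends Rule[Plan] {
  val ruleName = "PlanSubqueries"
  def apply(plan: Plan): Plan =
    plan.copy(description = plan.description + " -> subqueries planned")
}

object EnsureRequirementsToy extends Rule[Plan] {
  val ruleName = "EnsureRequirements"
  def apply(plan: Plan): Plan =
    plan.copy(description = plan.description + " -> exchanges added")
}

// The batch is an ordered sequence of rules.
val preparations: Seq[Rule[Plan]] = Seq(PlanSubqueriesToy, EnsureRequirementsToy)

// foldLeft threads the plan through every rule in order.
def prepareForExecution(plan: Plan): Plan =
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }

val executed = prepareForExecution(Plan("sparkPlan"))
println(executed.description) // sparkPlan -> subqueries planned -> exchanges added
```

Because the fold is ordered, a rule sees every change made by the rules before it. That ordering is why the position of a rule inside the batch matters.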
Technically, PlanSubqueries is just a Catalyst rule for transforming physical query plans, i.e. a Rule[SparkPlan].
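A hypothetical miniature shows what "a Catalyst rule for transforming physical query plans" amounts to. A rule is an object with an apply method from a tree to a tree, and it is typically written as a single bottom-up transformation. `Node`, `Scan`, `Filter`, `Rule` and `RenameScans` are illustrative names only. Catalyst's TreeNode offers much more than this, so treat the code as a sketch and not as Spark's API.

```scala
// Hypothetical, Spark-free model of a Catalyst-style rule over a tiny plan tree.
// Node, Scan, Filter and Rule are illustrative stand-ins, not Spark classes.
sealed trait Node {
  def children: Seq[Node]
  def withNewChildren(cs: Seq[Node]): Node

  // Rewrite the children first, then offer the rebuilt node to the rule;
  // nodes the partial function does not match are returned unchanged.
  def transformUp(rule: PartialFunction[Node, Node]): Node = {
    val rebuilt = withNewChildren(children.map(_.transformUp(rule)))
    rule.applyOrElse(rebuilt, identity[Node])
  }
}

final case class Scan(table: String) extends Node {
  def children: Seq[Node] = Nil
  def withNewChildren(cs: Seq[Node]): Node = this
}

final case class Filter(condition: String, child: Node) extends Node {
  def children: Seq[Node] = Seq(child)
  def withNewChildren(cs: Seq[Node]): Node = copy(child = cs.head)
}

// A rule maps a plan to a (possibly) new plan of the same type.
abstract class Rule[T] {
  def apply(plan: T): T
}

// A toy Rule[Node]: the whole rule is one tree transformation.
object RenameScans extends Rule[Node] {
  def apply(plan: Node): Node = plan.transformUp {
    case Scan(t) => Scan(t.toUpperCase)
  }
}

val before: Node = Filter("gid > 0", Scan("t"))
val after: Node = RenameScans(before)
println(after) // Filter(gid > 0,Scan(T))
```

The input tree is immutable, so the rule returns a new plan and leaves `before` untouched. Catalyst rules are expected to behave the same way.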
Applying PlanSubqueries Rule to Physical Plan (Executing PlanSubqueries): apply Method
apply(plan: SparkPlan): SparkPlan
Note: apply is part of the Rule Contract to apply a rule to a TreeNode, e.g. a physical plan.
For every ScalarSubquery (SubqueryExpression) expression in the input physical plan, apply does the following:

- Builds the optimized physical plan (aka executedPlan) of the subquery logical plan, i.e. creates a QueryExecution for the subquery logical plan and requests the optimized physical plan.
- Plans the scalar subquery, i.e. creates a ScalarSubquery (ExecSubqueryExpression) expression with a new SubqueryExec physical operator (with the name subquery[id] and the optimized physical plan) and the subquery's ExprId.
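The two steps of apply can be modelled without Spark as a rewrite over an expression tree. This sketch rests on several assumptions. Plans are plain strings. `executedPlanOf` is a hypothetical stand-in for creating a QueryExecution and requesting its executedPlan. `LogicalScalarSubquery` and `ExecScalarSubquery` are illustrative substitutes for the logical ScalarSubquery (SubqueryExpression) and the physical ScalarSubquery (ExecSubqueryExpression).

```scala
// Hypothetical, Spark-free model of the expression rewrite done by PlanSubqueries.apply.
// All types are illustrative stand-ins, not Spark classes.
final case class ExprId(id: Long)

sealed trait Expr
final case class Attr(name: String) extends Expr
final case class GreaterThan(left: Expr, right: Expr) extends Expr
// Plays the role of the logical ScalarSubquery (SubqueryExpression).
final case class LogicalScalarSubquery(logicalPlan: String, exprId: ExprId) extends Expr
// Plays the role of the SubqueryExec physical operator.
final case class SubqueryExec(name: String, executedPlan: String)
// Plays the role of the physical ScalarSubquery (ExecSubqueryExpression).
final case class ExecScalarSubquery(plan: SubqueryExec, exprId: ExprId) extends Expr

// Hypothetical stand-in for "create a QueryExecution and request its executedPlan".
def executedPlanOf(logicalPlan: String): String = s"executedPlan($logicalPlan)"

// Rewrites every logical scalar subquery found anywhere in the expression tree.
def planSubqueries(e: Expr): Expr = e match {
  case LogicalScalarSubquery(plan, exprId) =>
    // Step 1: build the optimized physical plan of the subquery's logical plan.
    val executedPlan = executedPlanOf(plan)
    // Step 2: wrap it in a subquery operator named subquery[id], keeping the ExprId.
    ExecScalarSubquery(SubqueryExec(s"subquery${exprId.id}", executedPlan), exprId)
  case GreaterThan(l, r) => GreaterThan(planSubqueries(l), planSubqueries(r))
  case other => other
}

val condition = GreaterThan(Attr("gid"), LogicalScalarSubquery("Aggregate max(gid)", ExprId(128)))
val planned = planSubqueries(condition)
println(planned)
```

Following the subquery[id] naming, ExprId 128 yields subquery128. This matches the Subquery subquery128 node in the example output.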