PlanSubqueries Physical Query Optimization

PlanSubqueries is a physical query optimization (aka physical query preparation rule or simply preparation rule) that plans ScalarSubquery (SubqueryExpression) expressions, i.e. replaces them with ScalarSubquery (ExecSubqueryExpression) expressions.

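import org.apache.spark.sql.SparkSession
// In spark-shell, getOrCreate returns the already-running session
val spark: SparkSession = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._ // enables toDF on local Scala collections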

import org.apache.spark.sql.execution.PlanSubqueries
val planSubqueries = PlanSubqueries(spark)

Seq(
  (0, 0),
  (1, 0),
  (2, 1)
).toDF("id", "gid").createOrReplaceTempView("t")

Seq(
  (0, 3),
  (1, 20)
).toDF("gid", "lvl").createOrReplaceTempView("v")

val sql = """
  select * from t where gid > (select max(gid) from v)
"""
val q = spark.sql(sql)

val sparkPlan = q.queryExecution.sparkPlan
scala> println(sparkPlan.numberedTreeString)
00 Project [_1#49 AS id#52, _2#50 AS gid#53]
01 +- Filter (_2#50 > scalar-subquery#128 [])
02    :  +- Aggregate [max(gid#61) AS max(gid)#130]
03    :     +- LocalRelation [gid#61]
04    +- LocalTableScan [_1#49, _2#50]

val optimizedPlan = planSubqueries(sparkPlan)
scala> println(optimizedPlan.numberedTreeString)
00 Project [_1#49 AS id#52, _2#50 AS gid#53]
01 +- Filter (_2#50 > Subquery subquery128)
02    :  +- Subquery subquery128
03    :     +- *(2) HashAggregate(keys=[], functions=[max(gid#61)], output=[max(gid)#130])
04    :        +- Exchange SinglePartition
05    :           +- *(1) HashAggregate(keys=[], functions=[partial_max(gid#61)], output=[max#134])
06    :              +- LocalTableScan [gid#61]
07    +- LocalTableScan [_1#49, _2#50]

PlanSubqueries is part of the preparations batch of physical query plan rules and is executed when QueryExecution is requested for the optimized physical query plan (i.e. in the executedPlan phase of a query execution).

Technically, PlanSubqueries is just a Catalyst rule for transforming physical query plans, i.e. Rule[SparkPlan].
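To make the idea of a Rule[SparkPlan] and a batch of preparation rules concrete, here is a toy model in plain Scala. Every type in it (Plan, Rule, RenameScans, AddRootProject, prepareForExecution) is hypothetical and only mirrors the shape of Catalyst's classes; it is a sketch, not Spark's API. The batch is applied with a left fold, one rule after another, which is how (as far as the Spark 2.x sources go) QueryExecution applies its preparations to the physical plan.

```scala
// Toy model (hypothetical types, not Spark's API) of Catalyst-style rules
// over physical plans and of applying a batch of such rules in order.
object ToyRules {
  // Minimal stand-in for SparkPlan: a tree of named operators.
  final case class Plan(name: String, children: List[Plan] = Nil) {
    // Bottom-up transform, loosely mirroring TreeNode.transformUp:
    // children are rewritten first, then f is tried on the new node.
    def transformUp(f: PartialFunction[Plan, Plan]): Plan = {
      val withNewChildren = copy(children = children.map(_.transformUp(f)))
      if (f.isDefinedAt(withNewChildren)) f(withNewChildren) else withNewChildren
    }
  }

  // Mirrors the shape of Catalyst's Rule[TreeType]: a Plan => Plan function.
  abstract class Rule {
    def apply(plan: Plan): Plan
  }

  // Hypothetical rule: rename every "Scan" operator anywhere in the tree.
  object RenameScans extends Rule {
    def apply(plan: Plan): Plan = plan.transformUp {
      case p @ Plan("Scan", _) => p.copy(name = "LocalTableScan")
    }
  }

  // Hypothetical rule: put a Project operator on top of the whole plan.
  object AddRootProject extends Rule {
    def apply(plan: Plan): Plan = Plan("Project", List(plan))
  }

  // Applies the rules one after another, each seeing the previous rule's output.
  def prepareForExecution(rules: Seq[Rule], plan: Plan): Plan =
    rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}
```

For example, `prepareForExecution(Seq(RenameScans, AddRootProject), Plan("Filter", List(Plan("Scan"))))` first renames the leaf and then wraps the result in a Project. An empty batch leaves the plan unchanged.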

Applying PlanSubqueries Rule to Physical Plan (Executing PlanSubqueries) — apply Method

apply(plan: SparkPlan): SparkPlan
Note: apply is part of the Rule Contract to apply a rule to a TreeNode, e.g. a physical plan.

For every ScalarSubquery (SubqueryExpression) expression in the input physical plan, apply does the following:

  1. Builds the optimized physical plan (aka executedPlan) of the subquery logical plan, i.e. creates a QueryExecution for the subquery logical plan and requests the optimized physical plan.

  2. Plans the scalar subquery, i.e. creates a ScalarSubquery (ExecSubqueryExpression) expression with a new SubqueryExec physical operator (with the name subquery[id] and the optimized physical plan) and the same ExprId as the original subquery expression.
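The two steps above can be sketched with a self-contained toy model. All types and functions below (Expr, LogicalScalarSubquery, ExecScalarSubquery, executedPlanOf, planSubqueries) are hypothetical stand-ins, not Spark's classes: plans are plain strings, and executedPlanOf merely stands in for creating a QueryExecution for the subquery's logical plan and requesting its executedPlan. Spark's own rule walks all expressions of the physical plan with a tree transform; here a hand-written recursive match plays that role.

```scala
// Toy model (all types hypothetical, not Spark's classes) of the two steps
// PlanSubqueries.apply performs for every scalar subquery expression.
object ToyPlanSubqueries {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr
  // Logical scalar subquery: a logical plan (here just a description) plus its id.
  final case class LogicalScalarSubquery(logicalPlan: String, exprId: Long) extends Expr
  // Physical subquery operator: a name plus the prepared physical plan.
  final case class SubqueryExec(name: String, executedPlan: String)
  // Physical scalar subquery expression wrapping a SubqueryExec.
  final case class ExecScalarSubquery(plan: SubqueryExec, exprId: Long) extends Expr

  // Stand-in for new QueryExecution(session, logicalPlan).executedPlan.
  def executedPlanOf(logicalPlan: String): String = s"Prepared($logicalPlan)"

  def planSubqueries(e: Expr): Expr = e match {
    case LogicalScalarSubquery(logicalPlan, id) =>
      // Step 1: build the optimized physical plan of the subquery.
      val executedPlan = executedPlanOf(logicalPlan)
      // Step 2: wrap it in a SubqueryExec named subquery<id>, keeping the same id.
      ExecScalarSubquery(SubqueryExec(s"subquery$id", executedPlan), id)
    case GreaterThan(l, r) => GreaterThan(planSubqueries(l), planSubqueries(r))
    case other => other // attributes and already-planned subqueries stay as-is
  }
}
```

Applied to a condition shaped like the Filter in the example above, `planSubqueries(GreaterThan(Attr("gid"), LogicalScalarSubquery("Aggregate max(gid)", 128L)))` yields a GreaterThan whose right side is an ExecScalarSubquery over a SubqueryExec named subquery128, matching the `Subquery subquery128` naming in the optimized plan output.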
