// FIXME: Use Catalyst DSL to create a logical plan
import org.apache.spark.sql.catalyst.dsl.plans._
val t1 = table("t1")
import org.apache.spark.sql.catalyst.dsl.expressions._
val logicalPlan = t1
  .select("a".attr, star())
  .groupBy(groupingExprs = "b".attr)(aggregateExprs = star())
scala> println(logicalPlan.numberedTreeString)
00 'Aggregate ['b], [*]
01 +- 'Project ['a, *]
02    +- 'UnresolvedRelation `t1`
// END FIXME
// Make the example repeatable
val table = "t1"
import org.apache.spark.sql.catalyst.TableIdentifier
spark.sessionState.catalog.dropTable(TableIdentifier(table), ignoreIfNotExists = true, purge = true)
Seq((0, "zero"), (1, "one")).toDF("id", "name").createOrReplaceTempView(table)
val query = spark.table("t1").select("id", "*").groupBy("id", "name").agg(col("*"))
val logicalPlan = query.queryExecution.logical
scala> println(logicalPlan.numberedTreeString)
00 'Aggregate [id#28, name#29], [id#28, name#29, *]
01 +- Project [id#28, id#28, name#29]
02    +- SubqueryAlias t1
03       +- Project [_1#25 AS id#28, _2#26 AS name#29]
04          +- LocalRelation [_1#25, _2#26]
import spark.sessionState.analyzer.ResolveReferences
val planWithRefsResolved = ResolveReferences(logicalPlan)
scala> println(planWithRefsResolved.numberedTreeString)
00 Aggregate [id#28, name#29], [id#28, name#29, id#28, id#28, name#29]
01 +- Project [id#28, id#28, name#29]
02    +- SubqueryAlias t1
03       +- Project [_1#25 AS id#28, _2#26 AS name#29]
04          +- LocalRelation [_1#25, _2#26]
ResolveReferences Logical Resolution Rule
ResolveReferences is a logical resolution rule that the logical query plan analyzer uses to resolve FIXME in a logical query plan, i.e.
- Resolves…FIXME
Technically, ResolveReferences is just a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].
ResolveReferences is part of the Resolution fixed-point batch of rules.
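To make "a rule in a fixed-point batch" concrete, here is a minimal sketch in plain Scala. It is a toy model, not Spark's code: the names Rule, runToFixedPoint and expandStar below are hypothetical stand-ins, and the "plan" is just a list of column names. The idea it illustrates is that every rule of the batch is applied in turn, and the whole batch is repeated until the plan no longer changes or an iteration limit is reached.

```scala
import scala.annotation.tailrec

// Toy model only: these are NOT Spark's classes.
object FixedPointSketch {
  // Hypothetical stand-in for Catalyst's Rule[TreeType]
  trait Rule[T] { def apply(plan: T): T }

  // Runs every rule in order, then repeats the whole batch until the plan
  // stops changing or maxIterations passes have been made (at least one pass).
  @tailrec
  def runToFixedPoint[T](rules: Seq[Rule[T]], plan: T, maxIterations: Int): T = {
    val next = rules.foldLeft(plan)((current, rule) => rule(current))
    if (next == plan || maxIterations <= 1) next
    else runToFixedPoint(rules, next, maxIterations - 1)
  }

  // Toy "plan": a list of column names in which "*" still has to be expanded
  val expandStar: Rule[List[String]] = new Rule[List[String]] {
    def apply(plan: List[String]): List[String] =
      plan.flatMap {
        case "*"    => List("id", "name")
        case column => List(column)
      }
  }

  def main(args: Array[String]): Unit =
    println(runToFixedPoint(Seq(expandStar), List("id", "*"), maxIterations = 10))
}
```

In this toy run the first pass expands the star and the second pass changes nothing, so the loop stops after two passes.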
Resolving Expressions of Logical Plan — resolve Internal Method
resolve(e: Expression, q: LogicalPlan): Expression
resolve resolves the input expression per type:
- UnresolvedAttribute expressions
- UnresolvedExtractValue expressions
- All other expressions
Note: resolve is used exclusively when ResolveReferences is requested to resolve reference expressions in a logical query plan.
Resolving Reference Expressions In Logical Query Plan (Applying ResolveReferences to Logical Plan) — apply Method
apply(plan: LogicalPlan): LogicalPlan
Note: apply is part of the Rule Contract to apply a rule to a logical plan.
apply resolves the following logical operators:
- Project logical operator with a Star expression to…FIXME
- Aggregate logical operator with a Star expression to…FIXME
- ScriptTransformation logical operator with a Star expression to…FIXME
- Generate logical operator with a Star expression to…FIXME
- Join logical operator with duplicateResolved…FIXME
- Intersect logical operator with duplicateResolved…FIXME
- Except logical operator with duplicateResolved…FIXME
- Sort logical operator unresolved with child operators resolved…FIXME
- Generate logical operator resolved…FIXME
- Generate logical operator unresolved…FIXME
In the end, apply resolves the expressions of the input logical operator.
apply skips logical operators that:
- Use UnresolvedDeserializer expressions
- Have child operators unresolved
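The second skip condition can be observed with the Catalyst DSL. The following is a sketch only, assuming a fresh Spark 2.x spark-shell session (so that spark is in scope and no earlier val named table shadows the DSL method). The name unresolvedChildPlan is made up for the example, and the result of the final comparison is deliberately left for you to check rather than stated here.

```scala
// Assumption: fresh spark-shell session (Spark 2.x) with `spark` in scope
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
import spark.sessionState.analyzer.ResolveReferences

// A Project with a Star expression on top of an UnresolvedRelation
val unresolvedChildPlan = table("t1").select(star())

// The child operator (UnresolvedRelation) has not been resolved yet
println(unresolvedChildPlan.childrenResolved)

// Apply the rule and compare the plan before and after
val afterRule = ResolveReferences(unresolvedChildPlan)
println(afterRule == unresolvedChildPlan)
```

If the operator is skipped as described above, the Star expression stays in the Project's list until the relation itself gets resolved by another rule.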
Expanding Star Expressions — buildExpandedProjectList Internal Method
buildExpandedProjectList(
exprs: Seq[NamedExpression],
child: LogicalPlan): Seq[NamedExpression]
buildExpandedProjectList expands (converts) Star expressions in the input named expressions recursively (down the expression tree) per expression:
- For a Star expression, buildExpandedProjectList requests it to expand given the input child logical plan
- For an UnresolvedAlias expression with a Star child expression, buildExpandedProjectList requests the Star to expand given the input child logical plan (as with a Star expression alone in the case above)
- For exprs with Star expressions down the expression tree, buildExpandedProjectList calls expandStarExpression, passing the input exprs and child
Note: buildExpandedProjectList is used when ResolveReferences is requested to resolve reference expressions (in Project and Aggregate operators with Star expressions).
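The first two cases above can be sketched with a toy expression model in plain Scala. Nothing here is Spark's API: Expr, Star, Column, UnresolvedAlias and expandProjectList are simplified, hypothetical stand-ins, and the child plan is reduced to the list of columns it outputs.

```scala
// Toy model only: simplified stand-ins, NOT Spark's classes.
object ProjectListSketch {
  sealed trait Expr
  case object Star extends Expr                              // stands in for a Star expression
  final case class Column(name: String) extends Expr         // a column of the child plan
  final case class UnresolvedAlias(child: Expr) extends Expr // an alias whose name is not known yet

  // Replaces top-level stars (bare or wrapped in UnresolvedAlias)
  // with the columns that the child plan outputs.
  def expandProjectList(exprs: Seq[Expr], childOutput: Seq[Expr]): Seq[Expr] =
    exprs.flatMap {
      case Star                  => childOutput
      case UnresolvedAlias(Star) => childOutput
      case other                 => Seq(other) // nested stars are left to a separate step
    }

  def main(args: Array[String]): Unit = {
    val childOutput = Seq(Column("id"), Column("name"))
    // Mirrors select("id", "*") from the example at the top of the page
    println(expandProjectList(Seq(Column("id"), Star), childOutput))
  }
}
```

The usage in main corresponds to the Project [id#28, id#28, name#29] operator shown earlier: one column named explicitly, followed by the two columns that the star expands to.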
expandStarExpression Method
expandStarExpression(expr: Expression, child: LogicalPlan): Expression
expandStarExpression expands (transforms) the following expressions in the input expr expression:
- For UnresolvedFunction expressions with Star child expressions, expandStarExpression requests the Star expressions to expand given the input child logical plan and the resolver.
// Using Catalyst DSL to create a logical plan with a function with Star child expression
import org.apache.spark.sql.catalyst.dsl.plans._
val t1 = table("t1")
import org.apache.spark.sql.catalyst.dsl.expressions._
val f1 = 'f1.function(star())
val plan = t1.select(f1)
scala> println(plan.numberedTreeString)
00 'Project [unresolvedalias('f1(*), None)]
01 +- 'UnresolvedRelation `t1`
// CAUTION: FIXME How to demo that the plan gets resolved using ResolveReferences.expandStarExpression?
- For CreateNamedStruct expressions with Star child expressions among the values, expandStarExpression…FIXME
- For CreateArray expressions with Star child expressions, expandStarExpression…FIXME
- For Murmur3Hash expressions with Star child expressions, expandStarExpression…FIXME
- For any other uses of Star expressions, expandStarExpression fails analysis with an AnalysisException:
Invalid usage of '*' in expression '[exprName]'
Note: expandStarExpression is used exclusively when ResolveReferences is requested to expand Star expressions (in Project and Aggregate operators).
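The function case and the failure case can be illustrated with another toy model in plain Scala. All names (Expr, Star, Column, Func, Other, expandNestedStars) are hypothetical, and IllegalArgumentException stands in for Spark's AnalysisException. The sketch covers only stars among function arguments and rejects them anywhere else; the CreateNamedStruct, CreateArray and Murmur3Hash cases are not modelled.

```scala
// Toy model only: simplified stand-ins, NOT Spark's classes.
object NestedStarSketch {
  sealed trait Expr
  case object Star extends Expr
  final case class Column(name: String) extends Expr
  // Stands in for an UnresolvedFunction with its arguments
  final case class Func(name: String, args: Seq[Expr]) extends Expr
  // Stands in for any expression in which '*' is not allowed
  final case class Other(name: String, children: Seq[Expr]) extends Expr

  def expandNestedStars(expr: Expr, childOutput: Seq[Expr]): Expr = expr match {
    case Func(name, args) =>
      // A star among the arguments is replaced by the child's columns
      Func(name, args.flatMap {
        case Star => childOutput
        case arg  => Seq(expandNestedStars(arg, childOutput))
      })
    case Other(name, children) if children.contains(Star) =>
      // IllegalArgumentException is a stand-in for AnalysisException
      throw new IllegalArgumentException(s"Invalid usage of '*' in expression '$name'")
    case Other(name, children) =>
      Other(name, children.map(expandNestedStars(_, childOutput)))
    case leaf =>
      leaf // columns, and a bare top-level Star that is handled elsewhere
  }

  def main(args: Array[String]): Unit = {
    val childOutput = Seq(Column("id"), Column("name"))
    println(expandNestedStars(Func("f1", Seq(Star)), childOutput))
  }
}
```

A function such as f1(*) therefore ends up with one argument per column of the child plan, while a star used as an operand of an arbitrary expression is reported as an error.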