// FIXME: Use Catalyst DSL to create a logical plan
import org.apache.spark.sql.catalyst.dsl.plans._
val t1 = table("t1")

import org.apache.spark.sql.catalyst.dsl.expressions._
val logicalPlan = t1
  .select("a".attr, star())
  .groupBy(groupingExprs = "b".attr)(aggregateExprs = star())

scala> println(logicalPlan.numberedTreeString)
00 'Aggregate ['b], [*]
01 +- 'Project ['a, *]
02    +- 'UnresolvedRelation `t1`
// END FIXME
// Make the example repeatable
val table = "t1"
import org.apache.spark.sql.catalyst.TableIdentifier
spark.sessionState.catalog.dropTable(TableIdentifier(table), ignoreIfNotExists = true, purge = true)

Seq((0, "zero"), (1, "one")).toDF("id", "name").createOrReplaceTempView(table)
val query = spark.table("t1").select("id", "*").groupBy("id", "name").agg(col("*"))
val logicalPlan = query.queryExecution.logical

scala> println(logicalPlan.numberedTreeString)
00 'Aggregate [id#28, name#29], [id#28, name#29, *]
01 +- Project [id#28, id#28, name#29]
02    +- SubqueryAlias t1
03       +- Project [_1#25 AS id#28, _2#26 AS name#29]
04          +- LocalRelation [_1#25, _2#26]
import spark.sessionState.analyzer.ResolveReferences
val planWithRefsResolved = ResolveReferences(logicalPlan)

scala> println(planWithRefsResolved.numberedTreeString)
00 Aggregate [id#28, name#29], [id#28, name#29, id#28, id#28, name#29]
01 +- Project [id#28, id#28, name#29]
02    +- SubqueryAlias t1
03       +- Project [_1#25 AS id#28, _2#26 AS name#29]
04          +- LocalRelation [_1#25, _2#26]
ResolveReferences Logical Resolution Rule

ResolveReferences is a logical resolution rule that the logical query plan analyzer uses to resolve FIXME in a logical query plan, i.e.

- Resolves…FIXME
Technically, ResolveReferences is just a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].

ResolveReferences is part of the Resolution fixed-point batch of rules.
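To make "a rule in a fixed-point batch" concrete, here is a toy model in plain Scala. It is a sketch, not Spark's code: `Rule`, `FixedPointBatch` and `ResolveOneName` are hypothetical names, loosely modeled on the idea that a fixed-point batch keeps re-applying its rules until the plan stops changing or an iteration limit is hit. A "plan" here is just a vector of column names where a leading quote marks an unresolved name.

```scala
// Hypothetical toy model, not Spark's Rule or RuleExecutor classes
abstract class Rule[T] {
  def apply(plan: T): T
}

// Re-applies all rules in order until a full pass changes nothing
// or maxIterations passes have run
case class FixedPointBatch[T](rules: Seq[Rule[T]], maxIterations: Int) {
  def execute(plan: T): T = {
    var current = plan
    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      val next = rules.foldLeft(current)((p, rule) => rule(p))
      changed = next != current
      current = next
      iteration += 1
    }
    current
  }
}

// Hypothetical rule: a leading quote marks an unresolved name,
// and each application resolves at most one name
object ResolveOneName extends Rule[Vector[String]] {
  def apply(plan: Vector[String]): Vector[String] = {
    val i = plan.indexWhere(_.startsWith("'"))
    if (i < 0) plan else plan.updated(i, plan(i).drop(1))
  }
}

val batch = FixedPointBatch[Vector[String]](Seq(ResolveOneName), maxIterations = 100)
println(batch.execute(Vector("'a", "'b", "c"))) // prints Vector(a, b, c)
```

Because the toy rule only makes partial progress per application, a single pass would not be enough; the fixed-point loop is what drives the plan to a fully resolved state.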
Resolving Expressions of Logical Plan: resolve Internal Method
resolve(e: Expression, q: LogicalPlan): Expression
resolve resolves the input expression per type:

- UnresolvedAttribute expressions
- UnresolvedExtractValue expressions
- All other expressions
Note: resolve is used exclusively when ResolveReferences is requested to resolve reference expressions in a logical query plan.
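The idea behind resolving an UnresolvedAttribute can be sketched with a toy model in plain Scala. Everything here is hypothetical (`Expr`, `AttributeRef`, `UnresolvedAttr`, `Add`, `resolver`), and it assumes, as a simplification, that an attribute is resolved by matching its name against the output attributes of the child operator, case-insensitively (as with Spark's default, case-insensitive analysis). An attribute that matches nothing is left unresolved for a later check to report.

```scala
// Hypothetical toy types, not Spark's Catalyst classes
trait Expr
case class AttributeRef(name: String, exprId: Int) extends Expr {
  override def toString: String = s"$name#$exprId"
}
case class UnresolvedAttr(name: String) extends Expr {
  override def toString: String = s"'$name"
}
case class Add(left: Expr, right: Expr) extends Expr

// Hypothetical case-insensitive resolver: do two names refer to the same column?
val resolver: (String, String) => Boolean = (a, b) => a.equalsIgnoreCase(b)

// Resolves unresolved attributes by name against the child's output,
// recursing into child expressions; unmatched attributes are left as-is
def resolve(e: Expr, childOutput: Seq[AttributeRef]): Expr = e match {
  case u @ UnresolvedAttr(name) =>
    childOutput.find(a => resolver(a.name, name)).getOrElse(u)
  case Add(l, r) => Add(resolve(l, childOutput), resolve(r, childOutput))
  case other => other
}

val output = Seq(AttributeRef("id", 28), AttributeRef("name", 29))
println(resolve(Add(UnresolvedAttr("ID"), UnresolvedAttr("missing")), output)) // prints Add(id#28,'missing)
```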
Resolving Reference Expressions In Logical Query Plan (Applying ResolveReferences to Logical Plan): apply Method
apply(plan: LogicalPlan): LogicalPlan
Note: apply is part of the Rule Contract to apply a rule to a logical plan.
apply resolves the following logical operators:

- Project logical operator with a Star expression to…FIXME
- Aggregate logical operator with a Star expression to…FIXME
- ScriptTransformation logical operator with a Star expression to…FIXME
- Generate logical operator with a Star expression to…FIXME
- Join logical operator that is not duplicateResolved…FIXME
- Intersect logical operator that is not duplicateResolved…FIXME
- Except logical operator that is not duplicateResolved…FIXME
- Sort logical operator unresolved with child operators resolved…FIXME
- Generate logical operator resolved…FIXME
- Generate logical operator unresolved…FIXME
In the end, apply resolves the expressions of the input logical operator.

apply skips logical operators that:

- Use UnresolvedDeserializer expressions
- Have child operators unresolved
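The "skip operators whose children are unresolved" behavior can be sketched with a toy bottom-up pass in plain Scala. This is a hypothetical model, not Spark's code: `Expr`, `Attr`, `UnresolvedAttr`, `Plan`, `Relation`, `Project` and `resolveReferences` are illustrative names, and only Project operators with attribute references are modeled.

```scala
// Hypothetical toy types, not Spark's Catalyst classes
trait Expr { def resolved: Boolean }
case class Attr(name: String, id: Int) extends Expr { def resolved: Boolean = true }
case class UnresolvedAttr(name: String) extends Expr { def resolved: Boolean = false }

trait Plan {
  def children: Seq[Plan]
  def expressions: Seq[Expr]
  def output: Seq[Attr]
  def childrenResolved: Boolean = children.forall(_.resolved)
  def resolved: Boolean = childrenResolved && expressions.forall(_.resolved)
}
case class Relation(output: Seq[Attr]) extends Plan {
  def children: Seq[Plan] = Nil
  def expressions: Seq[Expr] = Nil
}
case class Project(projectList: Seq[Expr], child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def expressions: Seq[Expr] = projectList
  def output: Seq[Attr] = projectList.collect { case a: Attr => a }
}

// Bottom-up pass: the child is processed first; if it is still unresolved,
// the operator is returned unchanged (skipped) instead of being resolved
def resolveReferences(plan: Plan): Plan = plan match {
  case Project(projectList, child) =>
    val newChild = resolveReferences(child)
    if (!newChild.resolved) Project(projectList, newChild)
    else Project(projectList.map {
      case u @ UnresolvedAttr(n) => newChild.output.find(_.name == n).getOrElse(u)
      case other => other
    }, newChild)
  case leaf => leaf
}

val rel = Relation(Seq(Attr("id", 0), Attr("name", 1)))
val plan = Project(Seq(UnresolvedAttr("id")), Project(Seq(UnresolvedAttr("oops")), rel))
println(resolveReferences(plan).resolved) // prints false: the inner Project cannot resolve 'oops
```

Skipping rather than failing fits a rule that runs inside a fixed-point batch: an operator left untouched in one iteration can be picked up again once its children become resolved.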
Expanding Star Expressions: buildExpandedProjectList Internal Method

buildExpandedProjectList(
  exprs: Seq[NamedExpression],
  child: LogicalPlan): Seq[NamedExpression]
buildExpandedProjectList expands (converts) Star expressions in the input named expressions recursively (down the expression tree) per expression:

- For a Star expression, buildExpandedProjectList requests it to expand given the input child logical plan
- For an UnresolvedAlias expression with a Star child expression, buildExpandedProjectList requests the Star to expand given the input child logical plan (similarly to a Star expression alone in the above case)
- For any other expression with Star expressions down the expression tree, buildExpandedProjectList calls expandStarExpression with that expression and the input child logical plan
- All other expressions are left unchanged
Note: buildExpandedProjectList is used when ResolveReferences is requested to resolve reference expressions (in Project and Aggregate operators with Star expressions).
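The per-expression dispatch above can be sketched as a `flatMap` over the project list, where one input expression may turn into several output expressions. This is a toy model in plain Scala with hypothetical types (`Expr`, `Attr`, `Star`, `UnresolvedAlias`, `Func`); the child plan is reduced to its output attributes, and the stand-in for expandStarExpression only handles Star arguments of a function.

```scala
// Hypothetical toy types, not Spark's Catalyst classes
trait Expr
case class Attr(name: String, id: Int) extends Expr
case object Star extends Expr
case class UnresolvedAlias(child: Expr) extends Expr
case class Func(name: String, args: Seq[Expr]) extends Expr

def containsStar(e: Expr): Boolean = e match {
  case Star => true
  case UnresolvedAlias(c) => containsStar(c)
  case Func(_, args) => args.exists(containsStar)
  case _ => false
}

// Simplified hypothetical stand-in for expandStarExpression:
// expands Star arguments of a function, leaves anything else as-is
def expandStarExpression(e: Expr, childOutput: Seq[Attr]): Expr = e match {
  case Func(n, args) => Func(n, args.flatMap { case Star => childOutput; case a => Seq(a) })
  case other => other
}

def buildExpandedProjectList(exprs: Seq[Expr], childOutput: Seq[Attr]): Seq[Expr] =
  exprs.flatMap {
    case Star => childOutput                   // a bare * becomes all child attributes
    case UnresolvedAlias(Star) => childOutput  // * wrapped in an unresolved alias
    case e if containsStar(e) => Seq(expandStarExpression(e, childOutput))
    case e => Seq(e)                           // no Star: kept unchanged
  }

// Mirrors select("id", "*") over a child with output [id#28, name#29]
println(buildExpandedProjectList(Seq(Attr("id", 28), Star), Seq(Attr("id", 28), Attr("name", 29))))
// prints List(Attr(id,28), Attr(id,28), Attr(name,29))
```

The result matches the shape of the Project [id#28, id#28, name#29] operator in the spark-shell example at the top of this page.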
expandStarExpression Method
expandStarExpression(expr: Expression, child: LogicalPlan): Expression
expandStarExpression expands (transforms) the following expressions in the input expr expression:

- For UnresolvedFunction expressions with Star child expressions, expandStarExpression requests the Star expressions to expand given the input child logical plan and the resolver.

  // Using Catalyst DSL to create a logical plan with a function with Star child expression
  import org.apache.spark.sql.catalyst.dsl.plans._
  val t1 = table("t1")

  import org.apache.spark.sql.catalyst.dsl.expressions._
  val f1 = 'f1.function(star())

  val plan = t1.select(f1)
  scala> println(plan.numberedTreeString)
  00 'Project [unresolvedalias('f1(*), None)]
  01 +- 'UnresolvedRelation `t1`

  // CAUTION: FIXME How to demo that the plan gets resolved using ResolveReferences.expandStarExpression?
- For CreateNamedStruct expressions with Star child expressions among the values, expandStarExpression…FIXME
- For CreateArray expressions with Star child expressions, expandStarExpression…FIXME
- For Murmur3Hash expressions with Star child expressions, expandStarExpression…FIXME
- For any other uses of Star expressions, expandStarExpression fails analysis with an AnalysisException:

  Invalid usage of '*' in expression '[exprName]'
Note: expandStarExpression is used exclusively when ResolveReferences is requested to expand Star expressions (in Project and Aggregate operators).
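The two fully described cases of expandStarExpression (a Star argument of a function is expanded, and a Star anywhere else fails analysis) can be sketched with a toy model in plain Scala. All names are hypothetical: `Expr`, `Attr`, `Star`, `UnresolvedFunction`, `Not` (standing in for "any other" expression) and `ToyAnalysisException` (standing in for AnalysisException). The bottom-up rewrite is a simplified version of a tree transformation, and the FIXME cases (CreateNamedStruct, CreateArray, Murmur3Hash) are deliberately not modeled.

```scala
// Hypothetical toy types, not Spark's Catalyst classes
trait Expr extends Product
case class Attr(name: String, id: Int) extends Expr
case object Star extends Expr
case class UnresolvedFunction(name: String, args: Seq[Expr]) extends Expr
case class Not(child: Expr) extends Expr

// Hypothetical stand-in for Spark's AnalysisException
class ToyAnalysisException(message: String) extends Exception(message)

def children(e: Expr): Seq[Expr] = e match {
  case UnresolvedFunction(_, args) => args
  case Not(c) => Seq(c)
  case _ => Nil
}

// Simplified bottom-up transformation: rewrite the children first, then the node itself
def expandStarExpression(expr: Expr, childOutput: Seq[Attr]): Expr = {
  val withNewChildren = expr match {
    case UnresolvedFunction(n, args) =>
      UnresolvedFunction(n, args.map(expandStarExpression(_, childOutput)))
    case Not(c) => Not(expandStarExpression(c, childOutput))
    case leaf => leaf
  }
  withNewChildren match {
    // A Star argument of a function expands to all output attributes of the child
    case UnresolvedFunction(n, args) if args.contains(Star) =>
      UnresolvedFunction(n, args.flatMap {
        case Star => childOutput
        case a => Seq(a)
      })
    // Any other expression with a Star child is rejected
    case o if children(o).contains(Star) =>
      throw new ToyAnalysisException(s"Invalid usage of '*' in expression '${o.productPrefix.toLowerCase}'")
    case o => o
  }
}

val output = Seq(Attr("id", 0), Attr("name", 1))
println(expandStarExpression(UnresolvedFunction("f1", Seq(Star)), output))
// prints UnresolvedFunction(f1,List(Attr(id,0), Attr(name,1)))
```

With this model, `expandStarExpression(Not(Star), output)` throws with the message `Invalid usage of '*' in expression 'not'`, the toy counterpart of the analysis error quoted above.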