// Use Catalyst DSL to create a logical plan
import org.apache.spark.sql.catalyst.dsl.plans._
val plan = table("t1").join(table("t2")).hint(name = "broadcast", "t1", "table2")
scala> println(plan.numberedTreeString)
00 'UnresolvedHint broadcast, [t1, table2]
01 +- 'Join Inner
02 :- 'UnresolvedRelation `t1`
03 +- 'UnresolvedRelation `t2`
import org.apache.spark.sql.catalyst.analysis.ResolveHints.ResolveBroadcastHints
val resolver = new ResolveBroadcastHints(spark.sessionState.conf)
val analyzedPlan = resolver(plan)
scala> println(analyzedPlan.numberedTreeString)
00 'Join Inner
01 :- 'ResolvedHint (broadcast)
02 : +- 'UnresolvedRelation `t1`
03 +- 'UnresolvedRelation `t2`
ResolveBroadcastHints Logical Resolution Rule — Resolving UnresolvedHint Operators with BROADCAST, BROADCASTJOIN and MAPJOIN Hint Names
ResolveBroadcastHints
is a logical resolution rule that the Spark Analyzer uses to resolve UnresolvedHint logical operators with BROADCAST
, BROADCASTJOIN
or MAPJOIN
hints (case-insensitive) to ResolvedHint operators.
Technically, ResolveBroadcastHints
is a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan]
.
ResolveBroadcastHints
is part of Hints fixed-point batch of rules (that is executed before any other rule).
ResolveBroadcastHints
takes a SQLConf when created.
Resolving UnresolvedHint with BROADCAST, BROADCASTJOIN or MAPJOIN Hint Names (Applying ResolveBroadcastHints to Logical Plan) — apply
Method
apply(plan: LogicalPlan): LogicalPlan
Note
|
apply is part of Rule Contract to apply a rule to a logical plan.
|
apply
transforms UnresolvedHint operators into ResolvedHint for the hint names as BROADCAST
, BROADCASTJOIN
or MAPJOIN
(case-insensitive).
For UnresolvedHints
with no parameters, apply
marks the entire child logical plan as eligible for broadcast, i.e. creates a ResolvedHint
with the child operator and HintInfo
with broadcast flag on.
For UnresolvedHints
with parameters defined, apply
considers the parameters the names of the tables to apply broadcast hint to.
Note
|
The table names can be of String or UnresolvedAttribute types.
|
apply
reports an AnalysisException
for the parameters that are not of String
or UnresolvedAttribute
types.
org.apache.spark.sql.AnalysisException: Broadcast hint parameter should be an identifier or string but was [unsupported] ([className]
// Use Catalyst DSL to create a logical plan
import org.apache.spark.sql.catalyst.dsl.plans._
// !!! IT WON'T WORK !!!
// 1 is not a table name or of type `UnresolvedAttribute`
val plan = table("t1").hint(name = "broadcast", 1)
scala> println(plan.numberedTreeString)
00 'UnresolvedHint broadcast, [1]
01 +- 'UnresolvedRelation `t1`
// Resolve hints
import org.apache.spark.sql.catalyst.analysis.ResolveHints
val broadcastHintResolver = new ResolveHints.ResolveBroadcastHints(spark.sessionState.conf)
scala> broadcastHintResolver(plan)
org.apache.spark.sql.AnalysisException: Broadcast hint parameter should be an identifier or string but was 1 (class java.lang.Integer;
at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1$$anonfun$applyOrElse$1.apply(ResolveHints.scala:98)
at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1$$anonfun$applyOrElse$1.apply(ResolveHints.scala:95)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1.applyOrElse(ResolveHints.scala:95)
at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1.applyOrElse(ResolveHints.scala:88)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints.apply(ResolveHints.scala:88)
... 51 elided
applyBroadcastHint
Internal Method
applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan
applyBroadcastHint
…FIXME
Note
|
applyBroadcastHint is used exclusively when ResolveBroadcastHints is requested to execute.
|