ResolveBroadcastHints Logical Resolution Rule — Resolving UnresolvedHint Operators with BROADCAST, BROADCASTJOIN and MAPJOIN Hint Names

ResolveBroadcastHints is a logical resolution rule that the Spark Analyzer uses to resolve UnresolvedHint logical operators with BROADCAST, BROADCASTJOIN or MAPJOIN hints (case-insensitive) to ResolvedHint operators.

Technically, ResolveBroadcastHints is a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].

ResolveBroadcastHints is part of the Hints fixed-point batch of rules, which is executed before any other batch of rules.
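
The following is a toy model in plain Scala (no Spark dependency) that shows what a fixed-point batch does. Every rule in the batch is applied in order, and the whole pass repeats until the plan stops changing or an iteration limit is reached. FixedPointModel, execute and the use of an Int as the "plan" are illustrative assumptions. This is a sketch, not Catalyst's RuleExecutor API.

```scala
// Toy model of a fixed-point batch (not Catalyst's RuleExecutor).
object FixedPointModel {
  // Mirrors the shape of Rule[LogicalPlan]: a plan-to-plan function
  type Rule[T] = T => T

  // Applies all rules in order, repeating until nothing changes
  // or maxIterations passes have run.
  def execute[T](plan: T, rules: Seq[Rule[T]], maxIterations: Int): T = {
    var current = plan
    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      val next = rules.foldLeft(current)((p, rule) => rule(p))
      changed = next != current
      current = next
      iteration += 1
    }
    current
  }
}
```

For example, a rule that subtracts 3 (floored at 0) starting from 10 stops at 0 once a pass no longer changes the value. With a limit of 2 passes it stops early at 4.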

ResolveBroadcastHints takes a SQLConf when created.

// Use Catalyst DSL to create a logical plan
import org.apache.spark.sql.catalyst.dsl.plans._
val plan = table("t1").join(table("t2")).hint(name = "broadcast", "t1", "table2")
scala> println(plan.numberedTreeString)
00 'UnresolvedHint broadcast, [t1, table2]
01 +- 'Join Inner
02    :- 'UnresolvedRelation `t1`
03    +- 'UnresolvedRelation `t2`

import org.apache.spark.sql.catalyst.analysis.ResolveHints.ResolveBroadcastHints
val resolver = new ResolveBroadcastHints(spark.sessionState.conf)
val analyzedPlan = resolver(plan)
scala> println(analyzedPlan.numberedTreeString)
00 'Join Inner
01 :- 'ResolvedHint (broadcast)
02 :  +- 'UnresolvedRelation `t1`
03 +- 'UnresolvedRelation `t2`

Resolving UnresolvedHint with BROADCAST, BROADCASTJOIN or MAPJOIN Hint Names (Applying ResolveBroadcastHints to Logical Plan) — apply Method

apply(plan: LogicalPlan): LogicalPlan
Note
apply is part of the Rule contract to apply a rule to a logical plan.

apply transforms UnresolvedHint operators whose hint name is BROADCAST, BROADCASTJOIN or MAPJOIN (case-insensitive) into ResolvedHint operators.
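
The case-insensitive name check can be sketched like this. It is a minimal model that assumes the check upper-cases the hint name and looks it up in a set of the three names listed above. HintNameModel and isBroadcastHint are hypothetical names, not Spark's API.

```scala
import java.util.Locale

// Hypothetical model of the hint-name check (not Spark's API).
object HintNameModel {
  // The three hint names this rule handles
  val BroadcastHintNames: Set[String] = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")

  // Locale.ROOT keeps upper-casing locale-independent
  // (e.g. "mapjoin" under a Turkish locale would otherwise get a dotted capital I).
  def isBroadcastHint(name: String): Boolean =
    BroadcastHintNames.contains(name.toUpperCase(Locale.ROOT))
}
```

With this model, "broadcast" and "MapJoin" match, while any other hint name (e.g. "coalesce") is left for other rules.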

For UnresolvedHints with no parameters, apply marks the entire child logical plan as eligible for broadcast, i.e. it creates a ResolvedHint with the child operator and a HintInfo with the broadcast flag on.

For UnresolvedHints with parameters, apply treats the parameters as the names of the tables to apply the broadcast hint to.
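
The two cases can be illustrated with a toy plan tree in plain Scala. In this sketch every class name is hypothetical: Hint stands in for UnresolvedHint and BroadcastHinted stands in for a ResolvedHint whose broadcast flag is on. The sketch skips the hint-name check and compares table names case-insensitively. In Spark the comparison is done by the analyzer's resolver, which depends on configuration.

```scala
// Toy model of the two cases (hypothetical classes, not Catalyst's).
object HintModel {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan
  // Stands in for ResolvedHint with the broadcast flag on
  final case class BroadcastHinted(child: Plan) extends Plan
  // Stands in for UnresolvedHint; params are already plain table names here
  final case class Hint(name: String, params: Seq[String], child: Plan) extends Plan

  def resolve(hint: Hint): Plan =
    if (hint.params.isEmpty) BroadcastHinted(hint.child) // no parameters: mark the whole child
    else markTables(hint.child, hint.params.toSet)       // parameters: mark matching tables only

  private def markTables(plan: Plan, names: Set[String]): Plan = plan match {
    case r @ Relation(n) if names.exists(_.equalsIgnoreCase(n)) => BroadcastHinted(r)
    case Join(l, r) => Join(markTables(l, names), markTables(r, names))
    case other => other
  }
}
```

Resolving a hint with the parameters t1 and table2 over a join of t1 and t2 marks only t1 and drops the hint node itself, which matches the REPL transcript earlier in this page. A hint with no parameters wraps the whole join.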

Note
The table names can be of String or UnresolvedAttribute types.

apply reports an AnalysisException for any parameter that is not a String or an UnresolvedAttribute:

org.apache.spark.sql.AnalysisException: Broadcast hint parameter should be an identifier or string but was [unsupported] ([className]

// Use Catalyst DSL to create a logical plan
import org.apache.spark.sql.catalyst.dsl.plans._
// !!! IT WON'T WORK !!!
// 1 is neither a table name (String) nor an `UnresolvedAttribute`
val plan = table("t1").hint(name = "broadcast", 1)

scala> println(plan.numberedTreeString)
00 'UnresolvedHint broadcast, [1]
01 +- 'UnresolvedRelation `t1`

// Resolve hints
import org.apache.spark.sql.catalyst.analysis.ResolveHints
val broadcastHintResolver = new ResolveHints.ResolveBroadcastHints(spark.sessionState.conf)
scala> broadcastHintResolver(plan)
org.apache.spark.sql.AnalysisException: Broadcast hint parameter should be an identifier or string but was 1 (class java.lang.Integer;
  at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1$$anonfun$applyOrElse$1.apply(ResolveHints.scala:98)
  at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1$$anonfun$applyOrElse$1.apply(ResolveHints.scala:95)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1.applyOrElse(ResolveHints.scala:95)
  at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1.applyOrElse(ResolveHints.scala:88)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints.apply(ResolveHints.scala:88)
  ... 51 elided
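
The parameter check can be modelled as one pattern match per hint parameter. This is a sketch under stated assumptions. UnresolvedAttr is a hypothetical stand-in for Catalyst's UnresolvedAttribute, and its name joins the name parts with dots without any quoting. The sketch also throws IllegalArgumentException rather than AnalysisException so that it runs without Spark. The message text follows the one shown above.

```scala
// Hypothetical model of the hint-parameter check (not Spark's code).
object HintParamModel {
  // Stand-in for UnresolvedAttribute: a possibly multi-part name
  final case class UnresolvedAttr(nameParts: Seq[String]) {
    // Simplification: dot-joined parts, no quoting of special characters
    def name: String = nameParts.mkString(".")
  }

  // Turns one hint parameter into a table name, rejecting anything else
  def tableName(param: Any): String = param match {
    case s: String         => s
    case u: UnresolvedAttr => u.name
    case unsupported =>
      throw new IllegalArgumentException(
        "Broadcast hint parameter should be an identifier or string " +
          s"but was $unsupported (${unsupported.getClass})")
  }
}
```

As in the transcript, passing 1 fails because the Int is boxed to java.lang.Integer, which is neither accepted type.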

applyBroadcastHint Internal Method

applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan

applyBroadcastHint…FIXME

Note
applyBroadcastHint is used exclusively when ResolveBroadcastHints is requested to execute.
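
The page leaves the details of applyBroadcastHint open. The following is one plausible shape for a function with the signature above, written against a toy plan tree. The stopping rules are assumptions: the traversal does not descend into subtrees that are already hinted or into aliased scopes. Name matching is case-insensitive here. All class names are hypothetical, and this is not Spark's implementation.

```scala
// Toy traversal model (hypothetical classes and assumed stopping rules).
object TraversalModel {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Alias(alias: String, child: Plan) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan
  // Stands in for ResolvedHint with the broadcast flag on
  final case class Broadcast(child: Plan) extends Plan

  def applyBroadcastHint(plan: Plan, toBroadcast: Set[String]): Plan = {
    def matches(name: String) = toBroadcast.exists(_.equalsIgnoreCase(name))
    plan match {
      case r: Relation if matches(r.name) => Broadcast(r)
      case a: Alias if matches(a.alias)   => Broadcast(a)
      // Assumption: stop at existing hints (no double wrapping) and at aliased scopes
      case _: Broadcast | _: Alias        => plan
      case Join(l, r) =>
        Join(applyBroadcastHint(l, toBroadcast), applyBroadcastHint(r, toBroadcast))
      case other => other
    }
  }
}
```

Under these assumptions, a hint naming the alias a marks the aliased subtree. A hint naming the table t1 hidden behind that alias leaves the plan unchanged.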
