CostBasedJoinReorder Logical Optimization — Join Reordering in Cost-Based Optimization

CostBasedJoinReorder is part of the Join Reorder once-executed batch in the standard batches of the Catalyst Optimizer.

CostBasedJoinReorder is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].

CostBasedJoinReorder applies the join optimizations on a logical plan with 2 or more consecutive inner or cross joins (possibly separated by Project operators) when spark.sql.cbo.enabled and spark.sql.cbo.joinReorder.enabled configuration properties are both enabled.

// Use shortcuts to read the values of the properties
scala> spark.sessionState.conf.cboEnabled
res0: Boolean = true

scala> spark.sessionState.conf.joinReorderEnabled
res1: Boolean = true
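
If either shortcut returns false, both properties can be switched on for the current session. The following is a minimal sketch that assumes a spark-shell session with spark in scope; it uses the property names mentioned above.

```scala
// Enable cost-based optimization and cost-based join reordering
// for the current SparkSession only
spark.conf.set("spark.sql.cbo.enabled", true)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", true)

// Read the values back through the same shortcuts
println(spark.sessionState.conf.cboEnabled)
println(spark.sessionState.conf.joinReorderEnabled)
```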

CostBasedJoinReorder uses the row count statistic, which is computed by the ANALYZE TABLE COMPUTE STATISTICS SQL command without the NOSCAN option.
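
One way to see the effect of NOSCAN is to read the row count back from the session catalog. The sketch below assumes a spark-shell session; the table name stats_demo and the helper rowCountOf are hypothetical and used only for illustration.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Hypothetical table used only for this sketch
val demoTable = "stats_demo"
spark.sql(s"DROP TABLE IF EXISTS $demoTable")
spark.range(10).write.saveAsTable(demoTable)

// Hypothetical helper: the row count recorded in the catalog, if any
def rowCountOf(table: String): Option[BigInt] =
  spark.sessionState.catalog
    .getTableMetadata(TableIdentifier(table))
    .stats
    .flatMap(_.rowCount)

// With NOSCAN only the size in bytes is computed
spark.sql(s"ANALYZE TABLE $demoTable COMPUTE STATISTICS NOSCAN")
println(rowCountOf(demoTable))

// Without NOSCAN the table is scanned and the row count is recorded
spark.sql(s"ANALYZE TABLE $demoTable COMPUTE STATISTICS")
println(rowCountOf(demoTable))
```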

// Create tables and compute their row count statistics
// There have to be at least 2 joins
// Make the example reproducible
val tableNames = Seq("t1", "t2", "tiny")
import org.apache.spark.sql.catalyst.TableIdentifier
val tableIds = tableNames.map(TableIdentifier.apply)
val sessionCatalog = spark.sessionState.catalog
tableIds.foreach { tableId =>
  sessionCatalog.dropTable(tableId, ignoreIfNotExists = true, purge = true)
}

val belowBroadcastJoinThreshold = spark.sessionState.conf.autoBroadcastJoinThreshold - 1
spark.range(belowBroadcastJoinThreshold).write.saveAsTable("t1")
// t2 is twice as big as t1
spark.range(2 * belowBroadcastJoinThreshold).write.saveAsTable("t2")
spark.range(5).write.saveAsTable("tiny")

// Compute row count statistics
tableNames.foreach { t =>
  sql(s"ANALYZE TABLE $t COMPUTE STATISTICS")
}

// Load the tables
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val tiny = spark.table("tiny")

// Example: Inner join with join condition
val q = t1.join(t2, Seq("id")).join(tiny, Seq("id"))
val plan = q.queryExecution.analyzed
scala> println(plan.numberedTreeString)
00 Project [id#51L]
01 +- Join Inner, (id#51L = id#57L)
02    :- Project [id#51L]
03    :  +- Join Inner, (id#51L = id#54L)
04    :     :- SubqueryAlias t1
05    :     :  +- Relation[id#51L] parquet
06    :     +- SubqueryAlias t2
07    :        +- Relation[id#54L] parquet
08    +- SubqueryAlias tiny
09       +- Relation[id#57L] parquet

// Eliminate SubqueryAlias logical operators as they are no longer needed
// and would only "confuse" CostBasedJoinReorder
// CostBasedJoinReorder cares about how deep Joins are and reorders consecutive joins only
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
val noAliasesPlan = EliminateSubqueryAliases(plan)
scala> println(noAliasesPlan.numberedTreeString)
00 Project [id#51L]
01 +- Join Inner, (id#51L = id#57L)
02    :- Project [id#51L]
03    :  +- Join Inner, (id#51L = id#54L)
04    :     :- Relation[id#51L] parquet
05    :     +- Relation[id#54L] parquet
06    +- Relation[id#57L] parquet

// Let's go pro and create a custom RuleExecutor (i.e. an Optimizer)
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder
object Optimize extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("EliminateSubqueryAliases", Once, EliminateSubqueryAliases) ::
    Batch("Join Reorder", Once, CostBasedJoinReorder) :: Nil
}

val joinsReordered = Optimize.execute(plan)
scala> println(joinsReordered.numberedTreeString)
00 Project [id#51L]
01 +- Join Inner, (id#51L = id#54L)
02    :- Project [id#51L]
03    :  +- Join Inner, (id#51L = id#57L)
04    :     :- Relation[id#51L] parquet
05    :     +- Relation[id#57L] parquet
06    +- Relation[id#54L] parquet

// Execute the plans
// Compare the plans as diagrams in web UI @ http://localhost:4040/SQL
// We'd have to use too many internals so let's turn CBO on and off
// Moreover, please remember that the query "phases" are cached
// That's why we copy and paste the entire query for execution
import org.apache.spark.sql.internal.SQLConf
val cc = SQLConf.get
cc.setConf(SQLConf.CBO_ENABLED, false)
val q = t1.join(t2, Seq("id")).join(tiny, Seq("id"))
q.collect.foreach(_ => ())

cc.setConf(SQLConf.CBO_ENABLED, true)
val q = t1.join(t2, Seq("id")).join(tiny, Seq("id"))
q.collect.foreach(_ => ())
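
As an alternative to comparing diagrams in the web UI, the optimized logical plans can be printed side by side. This is only a sketch: optimizedPlanWith is a hypothetical helper (not part of Spark), and it assumes the t1, t2 and tiny tables created above. It rebuilds the query on every call so that no cached query phase is reused.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical helper: toggles CBO and optimizes a freshly built query
def optimizedPlanWith(cboEnabled: Boolean): LogicalPlan = {
  spark.conf.set("spark.sql.cbo.enabled", cboEnabled)
  spark.table("t1")
    .join(spark.table("t2"), Seq("id"))
    .join(spark.table("tiny"), Seq("id"))
    .queryExecution
    .optimizedPlan
}

println(optimizedPlanWith(cboEnabled = false).numberedTreeString)
println(optimizedPlanWith(cboEnabled = true).numberedTreeString)
```
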
Caution

FIXME Examples of other join queries

  • Cross join with join condition

  • Project with attributes only and Inner join with join condition

  • Project with attributes only and Cross join with join condition

Tip

Enable DEBUG logging level for org.apache.spark.sql.catalyst.optimizer.JoinReorderDP logger to see the join reordering duration.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.catalyst.optimizer.JoinReorderDP=DEBUG

Refer to Logging.
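
In an interactive session the same logger can likely be adjusted without editing conf/log4j.properties. The sketch below assumes a Spark build that uses the log4j 1.x API (the same one that reads log4j.properties); newer builds based on Log4j 2 may need a different approach.

```scala
import org.apache.log4j.{Level, Logger}

// Assumption: log4j 1.x API is on the classpath (as with conf/log4j.properties)
Logger
  .getLogger("org.apache.spark.sql.catalyst.optimizer.JoinReorderDP")
  .setLevel(Level.DEBUG)
```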

Executing Rule — apply Method

apply(plan: LogicalPlan): LogicalPlan
Note
apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).

apply traverses the input logical plan down and tries to reorder the following logical operators:

  • Join for CROSS or INNER joins with a join condition

  • Project whose child is a Join as described above and whose project list consists of Attribute leaf expressions only
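
The two cases above can be approximated with a small plan inspection. This is a hedged sketch, not the rule's own code: isReorderableJoin and reorderCandidates are hypothetical helpers, and the type tests are used instead of constructor patterns because the arity of Join differs between Spark versions.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.InnerLike
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}

// Hypothetical helper: an inner or cross join that has a join condition
def isReorderableJoin(plan: LogicalPlan): Boolean = plan match {
  case j: Join => j.joinType.isInstanceOf[InnerLike] && j.condition.isDefined
  case _       => false
}

// Hypothetical helper: collects the operators that match either case
def reorderCandidates(plan: LogicalPlan): Seq[LogicalPlan] = plan.collect {
  case j: Join if isReorderableJoin(j) => j
  case p: Project
      if p.projectList.forall(_.isInstanceOf[Attribute]) && isReorderableJoin(p.child) => p
}
```

For instance, reorderCandidates(noAliasesPlan).foreach(c => println(c.nodeName)) would list the matching operators of the plan from the example earlier.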

Reordering Logical Plan with Join Operators — reorder Internal Method

reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan

reorder…​FIXME

Note
reorder is used exclusively when CostBasedJoinReorder is applied to a logical plan.

replaceWithOrderedJoin Internal Method

replaceWithOrderedJoin(plan: LogicalPlan): LogicalPlan

replaceWithOrderedJoin…​FIXME

Note
replaceWithOrderedJoin is used recursively and when CostBasedJoinReorder is reordering…​FIXME

Extracting Consecutive Join Operators — extractInnerJoins Internal Method

extractInnerJoins(plan: LogicalPlan): (Seq[LogicalPlan], Set[Expression])

extractInnerJoins finds consecutive Join logical operators (inner or cross) with join conditions, or Project logical operators whose child is such a Join and whose project list consists of Attribute leaf expressions only.

For Project operators, extractInnerJoins calls itself recursively with the Join operator inside.

In the end, extractInnerJoins gives the collection of logical plans under the consecutive Join logical operators (possibly separated by Project operators only) and their join conditions (for which And expressions have been split).

Note
extractInnerJoins is used recursively when CostBasedJoinReorder is reordering a logical plan.
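
The flattening idea can be illustrated on a toy model in plain Scala. Everything below is an assumption made for illustration: the Expr and Plan types are simplified stand-ins and not Spark's classes, join types are left out, and the toy Project always counts as "attributes only".

```scala
object JoinExtractionSketch {
  // Toy expressions: an equality between two column names, or a conjunction
  sealed trait Expr
  case class Eq(left: String, right: String) extends Expr
  case class And(left: Expr, right: Expr) extends Expr

  // Toy logical plans (simplified stand-ins for Catalyst operators)
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Join(left: Plan, right: Plan, condition: Option[Expr]) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan

  // Split a conjunction into its individual conjuncts
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Collect the plans under consecutive joins together with all join conditions
  def extractInnerJoins(plan: Plan): (Seq[Plan], Set[Expr]) = plan match {
    case Join(left, right, Some(cond)) =>
      val (leftPlans, leftConds) = extractInnerJoins(left)
      val (rightPlans, rightConds) = extractInnerJoins(right)
      (leftPlans ++ rightPlans, splitConjuncts(cond).toSet ++ leftConds ++ rightConds)
    case Project(_, j @ Join(_, _, Some(_))) =>
      // A Project over a join is looked through
      extractInnerJoins(j)
    case other =>
      // Anything else is a leaf item of the join
      (Seq(other), Set.empty)
  }

  val t1 = Relation("t1")
  val t2 = Relation("t2")
  val tiny = Relation("tiny")

  // Same shape as the analyzed plan of the three-table query
  val plan: Plan = Project(Seq("id"),
    Join(
      Project(Seq("id"), Join(t1, t2, Some(Eq("t1.id", "t2.id")))),
      tiny,
      Some(Eq("t1.id", "tiny.id"))))

  val (items, conditions) = extractInnerJoins(plan)
}
```

Here items holds the three relations in left-to-right order and conditions holds the two equality conditions.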
