CollapseCodegenStages Physical Query Optimization — Collapsing Physical Operators for Whole-Stage Java Code Generation (aka Whole-Stage CodeGen)

CollapseCodegenStages is a physical query optimization (aka physical query preparation rule or simply preparation rule) that collapses physical operators and generates Java source code for their execution.

When executed (with whole-stage code generation enabled), CollapseCodegenStages inserts WholeStageCodegenExec or InputAdapter physical operators into a physical plan. CollapseCodegenStages uses so-called control gates to decide whether a physical operator supports whole-stage Java code generation or not (and which physical operator to insert):

  1. Factors in physical operators with CodegenSupport only

  2. Enforces the supportCodegen custom requirements on a physical operator, i.e.

    1. supportCodegen flag turned on (true)

    2. No Catalyst expressions are CodegenFallback

    3. Output schema is neither wide nor deep and uses just enough fields (including nested fields)

    4. Children use output schema that is also neither wide nor deep

Note

spark.sql.codegen.maxFields Spark internal property controls the total number of fields in a schema that is acceptable for whole-stage code generation.

The number is 100 by default.
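
To make the "neither wide nor deep" requirement concrete, here is a minimal sketch of how the fields of a nested schema could be counted and compared against the limit. It is a toy model in plain Scala, not Spark's own code: FieldType, Atomic, Struct, Arr, MapOf and the two helper functions are hypothetical names introduced only for this illustration.

```scala
// Toy schema types for illustration only (NOT Spark's DataType hierarchy)
trait FieldType
case object Atomic extends FieldType                                   // e.g. a long or a string column
final case class Struct(fields: List[FieldType]) extends FieldType
final case class Arr(element: FieldType) extends FieldType
final case class MapOf(key: FieldType, value: FieldType) extends FieldType

// Counts the leaf fields of a schema, descending into nested types
def numOfNestedFields(t: FieldType): Int = t match {
  case Struct(fields) => fields.map(numOfNestedFields).sum
  case Arr(element)   => numOfNestedFields(element)
  case MapOf(k, v)    => numOfNestedFields(k) + numOfNestedFields(v)
  case _              => 1
}

// A schema fails the requirement when the count exceeds the configured limit
def isTooManyFields(schema: FieldType, maxFields: Int): Boolean =
  numOfNestedFields(schema) > maxFields

// Three top-level columns
val flat = Struct(List(Atomic, Atomic, Atomic))
// One column, one struct with two leaves, one map with a key and a value
val nested = Struct(List(Atomic, Struct(List(Atomic, Arr(Atomic))), MapOf(Atomic, Atomic)))

println(numOfNestedFields(flat))
println(numOfNestedFields(nested))
println(isTooManyFields(flat, maxFields = 2))
```

With the default limit of 100 both toy schemas pass; with a limit of 2 even the flat three-column schema is rejected.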

Technically, CollapseCodegenStages is just a Catalyst rule for transforming physical query plans, i.e. Rule[SparkPlan].

CollapseCodegenStages is part of the preparations batch of physical query plan rules and is executed when QueryExecution is requested for the optimized physical query plan (i.e. in the executedPlan phase of a query execution).

val q = spark.range(3).groupBy('id % 2 as "gid").count

// Let's see where and how many "stars" this query gets
scala> q.explain
== Physical Plan ==
*(2) HashAggregate(keys=[(id#0L % 2)#9L], functions=[count(1)])
+- Exchange hashpartitioning((id#0L % 2)#9L, 200)
   +- *(1) HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#9L], functions=[partial_count(1)])
      +- *(1) Range (0, 3, step=1, splits=8)

// There are two stage IDs: 1 and 2 (see the round brackets)
// Looks like Exchange physical operator does not support codegen
// Let's walk through the query execution phases and see it ourselves

// sparkPlan phase is just before CollapseCodegenStages physical optimization is applied
val sparkPlan = q.queryExecution.sparkPlan
scala> println(sparkPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#12L], functions=[count(1)], output=[gid#2L, count#5L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_count(1)], output=[(id#0L % 2)#12L, count#11L])
02    +- Range (0, 3, step=1, splits=8)

// Compare the above with the executedPlan phase
// which happens to be after CollapseCodegenStages physical optimization
scala> println(q.queryExecution.executedPlan.numberedTreeString)
00 *(2) HashAggregate(keys=[(id#0L % 2)#12L], functions=[count(1)], output=[gid#2L, count#5L])
01 +- Exchange hashpartitioning((id#0L % 2)#12L, 200)
02    +- *(1) HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_count(1)], output=[(id#0L % 2)#12L, count#11L])
03       +- *(1) Range (0, 3, step=1, splits=8)

// Let's apply the CollapseCodegenStages rule ourselves
import org.apache.spark.sql.execution.CollapseCodegenStages
val ccsRule = CollapseCodegenStages(spark.sessionState.conf)
scala> val planAfterCCS = ccsRule.apply(sparkPlan)
planAfterCCS: org.apache.spark.sql.execution.SparkPlan =
*(1) HashAggregate(keys=[(id#0L % 2)#12L], functions=[count(1)], output=[gid#2L, count#5L])
+- *(1) HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_count(1)], output=[(id#0L % 2)#12L, count#11L])
   +- *(1) Range (0, 3, step=1, splits=8)

// The stage IDs do not match those in the executedPlan output
// Looks like one or more rules are missing
// EnsureRequirements optimization rule?
// It is indeed executed before CollapseCodegenStages
import org.apache.spark.sql.execution.exchange.EnsureRequirements
val erRule = EnsureRequirements(spark.sessionState.conf)
val planAfterER = erRule.apply(sparkPlan)
scala> println(planAfterER.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#12L], functions=[count(1)], output=[gid#2L, count#5L])
01 +- Exchange hashpartitioning((id#0L % 2)#12L, 200)
02    +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_count(1)], output=[(id#0L % 2)#12L, count#11L])
03       +- Range (0, 3, step=1, splits=8)

// Time for CollapseCodegenStages
val planAfterCCS = ccsRule.apply(planAfterER)
scala> println(planAfterCCS.numberedTreeString)
00 *(2) HashAggregate(keys=[(id#0L % 2)#12L], functions=[count(1)], output=[gid#2L, count#5L])
01 +- Exchange hashpartitioning((id#0L % 2)#12L, 200)
02    +- *(1) HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_count(1)], output=[(id#0L % 2)#12L, count#11L])
03       +- *(1) Range (0, 3, step=1, splits=8)

assert(planAfterCCS == q.queryExecution.executedPlan, "Plan after ER and CCS rules should match the executedPlan plan")

// Bingo!
// The result plan matches the executedPlan plan

// HashAggregateExec and Range physical operators support codegen (both are CodegenSupport)
// - HashAggregateExec disables codegen for ImperativeAggregate aggregate functions
// ShuffleExchangeExec does not support codegen (is not a CodegenSupport)

// The top-level physical operator should be WholeStageCodegenExec
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = planAfterCCS.asInstanceOf[WholeStageCodegenExec]

// The single child operator should be HashAggregateExec
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hae = wsce.child.asInstanceOf[HashAggregateExec]

// Since ShuffleExchangeExec does not support codegen, the child of HashAggregateExec is InputAdapter
import org.apache.spark.sql.execution.InputAdapter
val ia = hae.child.asInstanceOf[InputAdapter]

// Only now can we get at ShuffleExchangeExec
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
val se = ia.child.asInstanceOf[ShuffleExchangeExec]

With spark.sql.codegen.wholeStage Spark internal property enabled (which is on by default), CollapseCodegenStages finds physical operators with CodegenSupport for which whole-stage codegen requirements hold and collapses them together into a WholeStageCodegenExec physical operator (possibly with InputAdapter operators in-between for physical operators with no support for Java code generation).
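
One way to see which operators ended up as roots of collapsed stages is to collect the WholeStageCodegenExec nodes from the executed plan. The following is a sketch under a few assumptions: a local SparkSession is acceptable, adaptive query execution is switched off so that executedPlan exposes the final operator tree directly, and the Spark version is recent enough (2.4 or later) for WholeStageCodegenExec to carry a codegenStageId.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.functions.col

// Assumption: a throwaway local session is fine for the experiment
val spark = SparkSession.builder().master("local[*]").appName("wsc-demo").getOrCreate()

// Assumption: keep adaptive query execution off so the plan is not wrapped
spark.conf.set("spark.sql.adaptive.enabled", "false")

val q = spark.range(3).groupBy((col("id") % 2).as("gid")).count()

// Every WholeStageCodegenExec node is the root of one collapsed codegen stage
val stages = q.queryExecution.executedPlan.collect { case w: WholeStageCodegenExec => w }
stages.foreach { w =>
  println(s"stage ${w.codegenStageId}: root operator ${w.child.nodeName}")
}

// The debug package can print the Java source generated per stage
import org.apache.spark.sql.execution.debug._
q.debugCodegen()
```

Operators that do not appear under any collected stage (such as the exchange between the two aggregates) are the ones executed without whole-stage code generation.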

Note
val q = spark.range(1).groupBy("id").count
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[id#16L], functions=[count(1)])
+- Exchange hashpartitioning(id#16L, 200)
   +- *HashAggregate(keys=[id#16L], functions=[partial_count(1)])
      +- *Range (0, 1, step=1, splits=8)

CollapseCodegenStages takes a SQLConf when created.

Note

You can disable CollapseCodegenStages (and so whole-stage Java code generation) by turning spark.sql.codegen.wholeStage Spark internal property off.

spark.sql.codegen.wholeStage property is enabled by default.

import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
scala> spark.conf.get(WHOLESTAGE_CODEGEN_ENABLED.key)
res0: String = true

Use SQLConf.wholeStageEnabled method to access the current value.

scala> spark.sessionState.conf.wholeStageEnabled
res1: Boolean = true
Tip
Import CollapseCodegenStages and apply the rule directly to a physical plan to learn how the rule works.
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
// Just a structured query with the explode Generator expression that supports codegen "partially"
// i.e. the Generate physical operator is a CodegenSupport but its supportCodegen flag is off
val q = spark.range(2)
  .filter($"id" === 0)
  .select(explode(lit(Array(0,1,2))) as "exploded")
  .join(spark.range(2))
  .where($"exploded" === $"id")
scala> q.show
+--------+---+
|exploded| id|
+--------+---+
|       0|  0|
|       1|  1|
+--------+---+

// the final physical plan (after CollapseCodegenStages applied and the other optimization rules)
scala> q.explain
== Physical Plan ==
*BroadcastHashJoin [cast(exploded#34 as bigint)], [id#37L], Inner, BuildRight
:- *Filter isnotnull(exploded#34)
:  +- Generate explode([0,1,2]), false, false, [exploded#34]
:     +- *Project
:        +- *Filter (id#29L = 0)
:           +- *Range (0, 2, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 2, step=1, splits=8)

// Control when CollapseCodegenStages is applied to a query plan
// Take sparkPlan that is a physical plan before optimizations, incl. CollapseCodegenStages
val plan = q.queryExecution.sparkPlan

// Is wholeStageEnabled enabled?
// It is by default
scala> println(spark.sessionState.conf.wholeStageEnabled)
true

import org.apache.spark.sql.execution.CollapseCodegenStages
val ccs = CollapseCodegenStages(conf = spark.sessionState.conf)

scala> ccs.ruleName
res0: String = org.apache.spark.sql.execution.CollapseCodegenStages

// Before CollapseCodegenStages
scala> println(plan.numberedTreeString)
00 BroadcastHashJoin [cast(exploded#34 as bigint)], [id#37L], Inner, BuildRight
01 :- Filter isnotnull(exploded#34)
02 :  +- Generate explode([0,1,2]), false, false, [exploded#34]
03 :     +- Project
04 :        +- Filter (id#29L = 0)
05 :           +- Range (0, 2, step=1, splits=8)
06 +- Range (0, 2, step=1, splits=8)

// After CollapseCodegenStages
// Note the stars (that WholeStageCodegenExec.generateTreeString gives)
val execPlan = ccs.apply(plan)
scala> println(execPlan.numberedTreeString)
00 *BroadcastHashJoin [cast(exploded#34 as bigint)], [id#37L], Inner, BuildRight
01 :- *Filter isnotnull(exploded#34)
02 :  +- Generate explode([0,1,2]), false, false, [exploded#34]
03 :     +- *Project
04 :        +- *Filter (id#29L = 0)
05 :           +- *Range (0, 2, step=1, splits=8)
06 +- *Range (0, 2, step=1, splits=8)

// The first star is from WholeStageCodegenExec physical operator
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsc = execPlan(0).asInstanceOf[WholeStageCodegenExec]
scala> println(wsc.numberedTreeString)
00 *BroadcastHashJoin [cast(exploded#34 as bigint)], [id#37L], Inner, BuildRight
01 :- *Filter isnotnull(exploded#34)
02 :  +- Generate explode([0,1,2]), false, false, [exploded#34]
03 :     +- *Project
04 :        +- *Filter (id#29L = 0)
05 :           +- *Range (0, 2, step=1, splits=8)
06 +- *Range (0, 2, step=1, splits=8)

// Let's disable wholeStage codegen
// CollapseCodegenStages becomes a noop
// It is as if no Spark optimizations were applied to the physical plan
// We're selective as we only disable whole-stage codegen
val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
newSpark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false)
scala> println(newSpark.sessionState.conf.wholeStageEnabled)
false

// Whole-stage codegen is disabled
// So regardless of whether you apply Spark optimizations or not
// Java code generation won't take place
val ccsWholeStageDisabled = CollapseCodegenStages(conf = newSpark.sessionState.conf)
val execPlan = ccsWholeStageDisabled.apply(plan)
// Note no stars in the output
scala> println(execPlan.numberedTreeString)
00 BroadcastHashJoin [cast(exploded#34 as bigint)], [id#37L], Inner, BuildRight
01 :- Filter isnotnull(exploded#34)
02 :  +- Generate explode([0,1,2]), false, false, [exploded#34]
03 :     +- Project
04 :        +- Filter (id#29L = 0)
05 :           +- Range (0, 2, step=1, splits=8)
06 +- Range (0, 2, step=1, splits=8)

Executing Rule (Inserting WholeStageCodegenExec or InputAdapter into Physical Query Plan for Whole-Stage Java Code Generation) — apply Method

apply(plan: SparkPlan): SparkPlan
Note
apply is part of the Rule Contract to apply a rule to a TreeNode (e.g. physical plan).

apply starts inserting WholeStageCodegenExec (with InputAdapter) into the input physical plan only when spark.sql.codegen.wholeStage Spark internal property is turned on.

Otherwise, apply does nothing at all (i.e. passes the input physical plan through unchanged).

Inserting WholeStageCodegenExec Physical Operator For Codegen Stages — insertWholeStageCodegen Internal Method

insertWholeStageCodegen(plan: SparkPlan): SparkPlan

Internally, insertWholeStageCodegen branches off per physical operator:

  1. For physical operators with a single output schema attribute of type ObjectType, insertWholeStageCodegen requests the operator for the child physical operators and tries to insertWholeStageCodegen on them only.

  2. For physical operators that support Java code generation and meet the additional requirements for codegen, insertWholeStageCodegen applies insertInputAdapter (to the operator), requests WholeStageCodegenId for the next stage ID (getNextStageId) and then uses both to return a new WholeStageCodegenExec physical operator.

  3. For any other physical operators, insertWholeStageCodegen requests the operator for the child physical operators and tries to insertWholeStageCodegen on them only.
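
The interplay between apply, insertWholeStageCodegen and insertInputAdapter can be sketched with a toy plan model in plain Scala. This is an illustration only, not Spark's implementation: Plan, Op, InputAdapter, WholeStage and Collapser are hypothetical stand-ins, a single supportsCodegen flag replaces the real requirement checks, and the ObjectType special case is left out.

```scala
// Toy plan nodes for illustration only (NOT Spark's SparkPlan classes)
trait Plan { def children: List[Plan] }
final case class Op(name: String, supportsCodegen: Boolean, children: List[Plan] = Nil) extends Plan
final case class InputAdapter(child: Plan) extends Plan { def children: List[Plan] = List(child) }
final case class WholeStage(id: Int, child: Plan) extends Plan { def children: List[Plan] = List(child) }

final class Collapser {
  private var lastStageId = 0
  private def nextStageId(): Int = { lastStageId += 1; lastStageId }

  // The rule is a no-op when whole-stage codegen is disabled
  def apply(plan: Plan, wholeStageEnabled: Boolean): Plan =
    if (wholeStageEnabled) insertWholeStageCodegen(plan) else plan

  // Outside a stage: open a new stage at every codegen-capable operator
  def insertWholeStageCodegen(plan: Plan): Plan = plan match {
    case op: Op if op.supportsCodegen =>
      val body = insertInputAdapter(op)   // nested stages get their IDs first
      WholeStage(nextStageId(), body)
    case op: Op =>
      op.copy(children = op.children.map(insertWholeStageCodegen))
    case other => other
  }

  // Inside a stage: keep codegen-capable operators and fence off the rest
  private def insertInputAdapter(plan: Plan): Plan = plan match {
    case op: Op if op.supportsCodegen =>
      op.copy(children = op.children.map(insertInputAdapter))
    case other =>
      InputAdapter(insertWholeStageCodegen(other))
  }
}

// Mirrors the shape of the aggregation query: aggregate, exchange, aggregate, range
val plan =
  Op("HashAggregate", supportsCodegen = true, List(
    Op("Exchange", supportsCodegen = false, List(
      Op("HashAggregate", supportsCodegen = true, List(
        Op("Range", supportsCodegen = true)))))))

val collapsed = new Collapser().apply(plan, wholeStageEnabled = true)
println(collapsed)
```

In this model the operator without codegen support ends up wrapped in an InputAdapter, and the operators below it start a stage of their own with a lower stage ID than the stage above, which is the same nesting the plans with *(1) and *(2) show.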

// FIXME: DEMO
// Step 1. The top-level physical operator is CodegenSupport with supportCodegen enabled
// Step 2. The top-level operator is CodegenSupport with supportCodegen disabled
// Step 3. The top-level operator is not CodegenSupport
// Step 4. "plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType]"
Note
insertWholeStageCodegen explicitly skips physical operators whose output schema is a single attribute of type ObjectType.
Note

insertWholeStageCodegen is used recursively when CollapseCodegenStages is executed (apply) and when it inserts InputAdapter physical operators (insertInputAdapter).

Inserting InputAdapter Unary Physical Operator — insertInputAdapter Internal Method

insertInputAdapter(plan: SparkPlan): SparkPlan

insertInputAdapter inserts an InputAdapter physical operator in a physical plan.

Caution
FIXME Examples for every case + screenshots from web UI
Note
insertInputAdapter is used exclusively when CollapseCodegenStages inserts a WholeStageCodegenExec physical operator (and then recursively down the physical plan).

Enforcing Whole-Stage CodeGen Requirements For Physical Operators — supportCodegen Internal Predicate

supportCodegen(plan: SparkPlan): Boolean

supportCodegen is positive (true) when the input physical operator is as follows:

  1. CodegenSupport and the supportCodegen flag is turned on

    Note
    supportCodegen flag is turned on by default.
  2. No Catalyst expressions are CodegenFallback (except LeafExpressions)

  3. Output schema is neither wide nor deep, i.e. uses just enough fields (including nested fields)

    Note
    spark.sql.codegen.maxFields Spark internal property defaults to 100.
  4. Children also have the output schema that is neither wide nor deep

Otherwise, supportCodegen is negative (false).

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
// both where and select operators support codegen
// the plan tree (with the operators and expressions) meets the requirements
// That's why the plan has WholeStageCodegenExec inserted
// That you can see as stars (*) in the output of explain
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)
scala> q.explain
== Physical Plan ==
*Project [_2#89 AS c0#93]
+- *Filter (_1#88 = 0)
   +- LocalTableScan [_1#88, _2#89, _3#90]

// CollapseCodegenStages is only used in QueryExecution.executedPlan
// Use sparkPlan then so we avoid CollapseCodegenStages
val plan = q.queryExecution.sparkPlan
import org.apache.spark.sql.execution.ProjectExec
val pe = plan.asInstanceOf[ProjectExec]

scala> pe.supportCodegen
res1: Boolean = true

scala> pe.schema.fields.size
res2: Int = 1

scala> pe.children.map(_.schema).map(_.size).sum
res3: Int = 3
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
// both where and select support codegen
// let's break the requirement of spark.sql.codegen.maxFields
val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_MAX_NUM_FIELDS
newSpark.sessionState.conf.setConf(WHOLESTAGE_MAX_NUM_FIELDS, 2)

scala> println(newSpark.sessionState.conf.wholeStageMaxNumFields)
2

import newSpark.implicits._
// the same query as above but created in SparkSession with WHOLESTAGE_MAX_NUM_FIELDS as 2
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)

// Note that there are no stars in the output of explain
// No WholeStageCodegenExec operator in the plan => whole-stage codegen disabled
scala> q.explain
== Physical Plan ==
Project [_2#122 AS c0#126]
+- Filter (_1#121 = 0)
   +- LocalTableScan [_1#121, _2#122, _3#123]
Note

supportCodegen is used when CollapseCodegenStages inserts InputAdapter and WholeStageCodegenExec physical operators (insertInputAdapter and insertWholeStageCodegen).

Enforcing Whole-Stage CodeGen Requirements For Catalyst Expressions — supportCodegen Internal Predicate

supportCodegen(e: Expression): Boolean

supportCodegen is positive (true) when the input Catalyst expression is one of the following (in the order of verification):

  1. LeafExpression

  2. Any expression that is not a CodegenFallback

Otherwise, supportCodegen is negative (false).
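
The expression-level check (no CodegenFallback expressions, except leaf expressions) can be pictured with the following sketch. It is a toy model in plain Scala rather than Catalyst code: Expr, LeafLike, Fallback and the case classes are hypothetical stand-ins for Expression, LeafExpression and CodegenFallback, and hasUnsupported is an assumed helper that walks an expression tree.

```scala
// Toy expression types for illustration only (NOT Catalyst's Expression classes)
trait Expr { def children: List[Expr] }
trait LeafLike extends Expr { def children: List[Expr] = Nil }   // stands in for LeafExpression
trait Fallback                                                   // stands in for CodegenFallback

final case class Attr(name: String) extends LeafLike
final case class FallbackLeaf(name: String) extends LeafLike with Fallback
final case class Call(name: String, children: List[Expr]) extends Expr
final case class FallbackCall(name: String, children: List[Expr]) extends Expr with Fallback

// Order matters: the leaf check runs first, so a leaf passes even if it is a fallback
def supportCodegen(e: Expr): Boolean = e match {
  case _: LeafLike => true
  case _: Fallback => false
  case _           => true
}

// An operator is disqualified if any node in any of its expression trees fails the check
def hasUnsupported(e: Expr): Boolean =
  !supportCodegen(e) || e.children.exists(hasUnsupported)

println(supportCodegen(FallbackLeaf("now")))
println(hasUnsupported(Call("upper", List(FallbackCall("f", List(Attr("c")))))))
```

Because the whole tree is searched, a single non-leaf fallback expression buried inside an otherwise supported expression is enough to fail the requirement.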

Note
supportCodegen (for Catalyst expressions) is used exclusively when CollapseCodegenStages physical optimization is requested to enforce whole-stage codegen requirements for a physical operator.
