Whole-Stage Code Generation (aka Whole-Stage CodeGen)

Review SPARK-12795 Whole stage codegen to learn about the work to support it.

Whole-Stage Code Generation (aka WholeStageCodegen or WholeStageCodegenExec) fuses multiple operators (as a subtree of plans that support codegen) together into a single Java function that is aimed at improving execution performance. It collapses a query into a single optimized function that eliminates virtual function calls and leverages CPU registers for intermediate data.

WholeStageCodegenExec case class works with a SparkPlan to produce a codegened pipeline. It is a unary node in SparkPlan with support for codegen.

Use Dataset.explain method to know the physical plan of a query and find out whether or not WholeStageCodegen is in use.
Consider using Debugging Query Execution facility to deep dive into whole stage codegen.
scala> spark.range(10).select('id as 'asId).where('id === 4).explain
== Physical Plan ==
:  +- Project [id#0L AS asId#3L]
:     +- Filter (id#0L = 4)
:        +- Range 0, 1, 8, 10, [id#0L]

SparkPlan plans with support for codegen extend CodegenSupport.

Whole stage codegen is used by some modern massively parallel processing (MPP) databases to archive great performance. See Efficiently Compiling Efficient Query Plans for Modern Hardware (PDF).

Whole stage codegen uses spark.sql.codegen.wholeStage setting to control…​FIXME

Janino is used to compile a Java source code into a Java class.

Before a query is executed, CollapseCodegenStages case class is used to find the plans that support codegen and collapse them together as WholeStageCodegen. It is part of the sequence of rules QueryExecution.preparations that will be applied in order to the physical plan before execution.

CodegenSupport Contract

CodegenSupport allows custom implementations to optionally disable codegen using supportCodegen predicate (that is enabled by default, i.e. true).

CodegenSupport assumes that custom implementations define:

  • doProduce(ctx: CodegenContext): String

Use debugCodegen method to review the CodegenSupport-generated code.

Codegen Operators

SparkPlan plans that support codegen extend CodegenSupport.

FIXME Where is RowDataSourceScanExec used?
  • BatchedDataSourceScanExec

  • ExpandExec

  • BaseLimitExec

  • SortExec

  • WholeStageCodegenExec and InputAdapter

  • TungstenAggregate

  • BroadcastHashJoinExec

  • SortMergeJoinExec


scala> spark.range(10).sample(false, 0.4).explain
== Physical Plan ==
:  +- Sample 0.0, 0.4, false, -7634498724724501829
:     +- Range 0, 1, 8, 10, [id#15L]


scala> spark.range(10).explain
== Physical Plan ==
:  +- Range 0, 1, 8, 10, [id#20L]


CollapseCodegenStages is a Rule[SparkPlan], i.e. a transformation of SparkPlan into another SparkPlan.

It searches for sub-plans (aka stages) that support codegen and collapse them together as a WholeStageCodegen.

Only CodegenSupport SparkPlans support codegen for which supportCodegen is enabled (true).

It is assumed that all Expression instances except CodegenFallback support codegen.

CollapseCodegenStages uses the internal setting spark.sql.codegen.maxFields (default: 200) to control the number of fields in input and output schemas before deactivating whole-stage codegen. It counts the fields included in complex types, i.e. StructType, MapType, ArrayType, UserDefinedType, and their combinations, recursively. See SPARK-14554.

It inserts InputAdapter leaf nodes in a SparkPlan recursively that is then used to generate code that consumes an RDD iterator of InternalRow.

BenchmarkWholeStageCodegen — Performance Benchmark

BenchmarkWholeStageCodegen class provides a benchmark to measure whole stage codegen performance.

You can execute it using the command:

build/sbt 'sql/testOnly *BenchmarkWholeStageCodegen'
You need to un-ignore tests in BenchmarkWholeStageCodegen by replacing ignore with test.
$ build/sbt 'sql/testOnly *BenchmarkWholeStageCodegen'
Running benchmark: range/limit/sum
  Running case: range/limit/sum codegen=false
22:55:23.028 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  Running case: range/limit/sum codegen=true

Java HotSpot(TM) 64-Bit Server VM 1.8.0_77-b03 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz

range/limit/sum:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
range/limit/sum codegen=false             376 /  433       1394.5           0.7       1.0X
range/limit/sum codegen=true              332 /  388       1581.3           0.6       1.1X

[info] - range/limit/sum (10 seconds, 74 milliseconds)

results matching ""

    No results matching ""