val q = spark.range(10).where('id === 4)
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#3L = 4)
+- *(1) Range (0, 10, step=1, splits=8)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...
WholeStageCodegenExec Unary Physical Operator for Java Code Generation
WholeStageCodegenExec is a unary physical operator that is one of the two physical operators that lay the foundation for the Whole-Stage Java Code Generation for a Codegened Execution Pipeline of a structured query.
|
Note
|
InputAdapter is the other physical operator for Codegened Execution Pipeline of a structured query. |
WholeStageCodegenExec itself supports the Java code generation and so when executed triggers code generation for the entire child physical plan subtree of a structured query.
|
Tip
|
Consider using Debugging Query Execution facility to deep dive into the whole-stage code generation. |
val q = spark.range(10).where('id === 4)
import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen()
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#0L = 4)
+- *(1) Range (0, 10, step=1, splits=8)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...
|
Tip
|
Use the following to enable comments in generated code.
|
val q = spark.range(10).where('id === 4)
import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen()
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#6L = 4)
+- *(1) Range (0, 10, step=1, splits=8)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
* Codegend pipeline for stage (id=1)
* *(1) Filter (id#6L = 4)
* +- *(1) Range (0, 10, step=1, splits=8)
*/
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...
WholeStageCodegenExec is created when:
-
CollapseCodegenStagesphysical query optimization is executed (with spark.sql.codegen.wholeStage configuration property enabled) -
FileSourceScanExecleaf physical operator is executed (with the supportsBatch flag enabled) -
InMemoryTableScanExecleaf physical operator is executed (with the supportsBatch flag enabled) -
DataSourceV2ScanExecleaf physical operator is executed (with the supportsBatch flag enabled)
|
Note
|
spark.sql.codegen.wholeStage property is enabled by default. |
WholeStageCodegenExec takes a single child physical operator (a physical subquery tree) and codegen stage ID when created.
|
Note
|
WholeStageCodegenExec requires that the single child physical operator supports Java code generation.
|
// RangeExec physical operator does support codegen
import org.apache.spark.sql.execution.RangeExec
import org.apache.spark.sql.catalyst.plans.logical.Range
val rangeExec = RangeExec(Range(start = 0, end = 1, step = 1, numSlices = 1))
import org.apache.spark.sql.execution.WholeStageCodegenExec
val rdd = WholeStageCodegenExec(rangeExec)(codegenStageId = 0).execute()
WholeStageCodegenExec marks the child physical operator with * (star) prefix and per-query codegen stage ID (in round brackets) in the text representation of a physical plan tree.
scala> println(plan.numberedTreeString)
00 *(1) Project [id#117L]
01 +- *(1) BroadcastHashJoin [id#117L], [cast(id#115 as bigint)], Inner, BuildRight
02 :- *(1) Range (0, 1, step=1, splits=8)
03 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
04 +- Generate explode(ids#112), false, [id#115]
05 +- LocalTableScan [ids#112]
|
Note
|
As WholeStageCodegenExec is created as a result of CollapseCodegenStages physical query optimization rule, it is only executed in executedPlan phase of a query execution (that you can only notice by the * star prefix in a plan output).
|
val q = spark.range(9)
// we need executedPlan with WholeStageCodegenExec physical operator "injected"
val plan = q.queryExecution.executedPlan
// Note the star prefix of Range that marks WholeStageCodegenExec
// As a matter of fact, there are two physical operators in play here
// i.e. WholeStageCodegenExec with Range as the child
scala> println(plan.numberedTreeString)
00 *Range (0, 9, step=1, splits=8)
// Let's unwrap Range physical operator
// and access the parent WholeStageCodegenExec
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = plan.asInstanceOf[WholeStageCodegenExec]
// Trigger code generation of the entire query plan tree
val (ctx, code) = wsce.doCodeGen
// CodeFormatter can pretty-print the code
import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
scala> println(CodeFormatter.format(code))
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
* Codegend pipeline for
* Range (0, 9, step=1, splits=8)
*/
/* 006 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
...
When executed, WholeStageCodegenExec gives pipelineTime performance metric.
| Key | Name (in web UI) | Description |
|---|---|---|
(empty) |
Time of how long the whole-stage codegend pipeline has been running (i.e. the elapsed time since the underlying BufferedRowIterator had been created and the internal rows were all consumed). |
|
Tip
|
Use explain operator to know the physical plan of a query and find out whether or not WholeStageCodegen is in use.
|
val q = spark.range(10).where('id === 4)
// Note the stars in the output that are for codegened operators
scala> q.explain
== Physical Plan ==
*Filter (id#0L = 4)
+- *Range (0, 10, step=1, splits=8)
|
Note
|
Physical plans that support code generation extend CodegenSupport. |
|
Tip
|
Enable Add the following line to
Refer to Logging. |
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method
doExecute(): RDD[InternalRow]
|
Note
|
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
|
doExecute generates the Java source code for the child physical plan subtree first and uses CodeGenerator to compile it right afterwards.
If compilation goes well, doExecute branches off per the number of input RDDs.
|
Note
|
doExecute only supports up to two input RDDs.
|
|
Caution
|
FIXME Finish the "success" path |
If the size of the generated codes is greater than spark.sql.codegen.hugeMethodLimit (which defaults to 65535), doExecute prints out the following INFO message:
Found too long generated codes and JIT optimization might not work: the bytecode size ([maxCodeSize]) is above the limit [spark.sql.codegen.hugeMethodLimit], and the whole-stage codegen was disabled for this plan (id=[codegenStageId]). To avoid this, you can raise the limit `spark.sql.codegen.hugeMethodLimit`:
[treeString]
In the end, doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]) and returns it.
|
Note
|
doExecute skips requesting the child physical operator to execute for FileSourceScanExec leaf physical operator with supportsBatch flag enabled (as FileSourceScanExec operator uses WholeStageCodegenExec operator when FileSourceScanExec).
|
If compilation fails and spark.sql.codegen.fallback configuration property is enabled, doExecute prints out the following WARN message to the logs, requests the child physical operator to execute and returns it.
Whole-stage codegen disabled for plan (id=[codegenStageId]):
[treeString]
Generating Java Source Code for Child Physical Plan Subtree — doCodeGen Method
doCodeGen(): (CodegenContext, CodeAndComment)
doCodeGen creates a new CodegenContext and requests the single child physical operator to generate a Java source code for produce code path (with the new CodegenContext and the WholeStageCodegenExec physical operator itself).
doCodeGen adds the new function under the name of processNext.
doCodeGen generates the class name.
doCodeGen generates the final Java source code of the following format:
public Object generate(Object[] references) {
return new [className](references);
}
/**
* Codegend pipeline for stage (id=[codegenStageId])
* [treeString]
*/
final class [className] extends BufferedRowIterator {
private Object[] references;
private scala.collection.Iterator[] inputs;
// ctx.declareMutableStates()
public [className](Object[] references) {
this.references = references;
}
public void init(int index, scala.collection.Iterator[] inputs) {
partitionIndex = index;
this.inputs = inputs;
// ctx.initMutableStates()
// ctx.initPartition()
}
// ctx.emitExtraCode()
// ctx.declareAddedFunctions()
}
|
Note
|
doCodeGen requires that the single child physical operator supports Java code generation.
|
doCodeGen cleans up the generated code (using CodeFormatter to stripExtraNewLines, stripOverlappingComments).
doCodeGen prints out the following DEBUG message to the logs:
DEBUG WholeStageCodegenExec:
[cleanedSource]
In the end, doCodeGen returns the CodegenContext and the Java source code (as a CodeAndComment).
|
Note
|
|
Generating Java Source Code for Consume Path in Whole-Stage Code Generation — doConsume Method
doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
|
Note
|
doConsume is part of CodegenSupport Contract to generate the Java source code for consume path in Whole-Stage Code Generation.
|
doConsume generates a Java source code that:
-
Takes (from the input
row) the code to evaluate a Catalyst expression on an inputInternalRow -
Takes (from the input
row) the term for a value of the result of the evaluation-
Adds
.copy()to the term if needCopyResult is turned on
-
-
Wraps the term inside
append()code block
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext()
import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode
val exprCode = ExprCode(code = "my_code", isNull = "false", value = "my_value")
// wsce defined above, i.e at the top of the page
val consumeCode = wsce.doConsume(ctx, input = Seq(), row = exprCode)
scala> println(consumeCode)
my_code
append(my_value);
Generating Class Name — generatedClassName Method
generatedClassName(): String
generatedClassName gives a class name per spark.sql.codegen.useIdInClassName configuration property:
-
GeneratedIteratorForCodegenStagewith the codegen stage ID when enabled (true) -
GeneratedIteratorwhen disabled (false)
|
Note
|
generatedClassName is used exclusively when WholeStageCodegenExec unary physical operator is requested to generate the Java source code for the child physical plan subtree.
|