val q = spark.range(10).where('id === 4)
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#3L = 4)
+- *(1) Range (0, 10, step=1, splits=8)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...
WholeStageCodegenExec Unary Physical Operator for Java Code Generation
WholeStageCodegenExec is a unary physical operator that is one of the two physical operators that lay the foundation for the Whole-Stage Java Code Generation for a Codegened Execution Pipeline of a structured query.
Note: InputAdapter is the other physical operator for a Codegened Execution Pipeline of a structured query.
WholeStageCodegenExec itself supports Java code generation, so when executed it triggers code generation for the entire child physical plan subtree of a structured query.
Tip: Consider using the Debugging Query Execution facility to deep dive into whole-stage code generation.
val q = spark.range(10).where('id === 4)
import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen()
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#0L = 4)
+- *(1) Range (0, 10, step=1, splits=8)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...
Tip: Use the following to enable comments in generated code.
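A minimal sketch of enabling the comments, assuming the internal spark.sql.codegen.comments property (which is read from the SparkEnv configuration rather than the session's SQL conf):
// Assumption: spark.sql.codegen.comments controls the "Codegend pipeline for ..."
// comments in the generated code; set it before running the query
org.apache.spark.SparkEnv.get.conf.set("spark.sql.codegen.comments", "true")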
val q = spark.range(10).where('id === 4)
import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen()
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#6L = 4)
+- *(1) Range (0, 10, step=1, splits=8)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
* Codegend pipeline for stage (id=1)
* *(1) Filter (id#6L = 4)
* +- *(1) Range (0, 10, step=1, splits=8)
*/
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...
WholeStageCodegenExec is created when:
- CollapseCodegenStages physical query optimization is executed (with spark.sql.codegen.wholeStage configuration property enabled)
- FileSourceScanExec leaf physical operator is executed (with the supportsBatch flag enabled)
- InMemoryTableScanExec leaf physical operator is executed (with the supportsBatch flag enabled)
- DataSourceV2ScanExec leaf physical operator is executed (with the supportsBatch flag enabled)
Note: spark.sql.codegen.wholeStage configuration property is enabled by default.
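A quick way to check (and, if needed, disable) the property for the current session:
// spark.sql.codegen.wholeStage is enabled by default
spark.conf.get("spark.sql.codegen.wholeStage")  // "true"
// Disabling it removes the star (*) markers from physical plans
spark.conf.set("spark.sql.codegen.wholeStage", false)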
WholeStageCodegenExec takes a single child physical operator (a physical subquery tree) and a codegen stage ID when created.
Note: WholeStageCodegenExec requires that the single child physical operator supports Java code generation.
// RangeExec physical operator does support codegen
import org.apache.spark.sql.execution.RangeExec
import org.apache.spark.sql.catalyst.plans.logical.Range
val rangeExec = RangeExec(Range(start = 0, end = 1, step = 1, numSlices = 1))
import org.apache.spark.sql.execution.WholeStageCodegenExec
val rdd = WholeStageCodegenExec(rangeExec)(codegenStageId = 0).execute()
WholeStageCodegenExec marks the child physical operator with a * (star) prefix and the per-query codegen stage ID (in round brackets) in the text representation of a physical plan tree.
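The plan below could come from a query along the following lines (a hypothetical reconstruction; the exact expression IDs and number of splits depend on the environment):
// Hypothetical query: a broadcast hash join of a range with an exploded local table
import org.apache.spark.sql.functions.explode
import spark.implicits._
val ids = Seq(Seq(0, 1, 2)).toDF("ids")
val q = spark.range(1).join(ids.select(explode('ids) as "id"), "id")
val plan = q.queryExecution.executedPlan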
scala> println(plan.numberedTreeString)
00 *(1) Project [id#117L]
01 +- *(1) BroadcastHashJoin [id#117L], [cast(id#115 as bigint)], Inner, BuildRight
02 :- *(1) Range (0, 1, step=1, splits=8)
03 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
04 +- Generate explode(ids#112), false, [id#115]
05 +- LocalTableScan [ids#112]
Note: As WholeStageCodegenExec is created as a result of the CollapseCodegenStages physical query optimization rule, it only appears in the executedPlan phase of a query execution (which you can notice by the * star prefix in the plan output).
val q = spark.range(9)
// we need executedPlan with WholeStageCodegenExec physical operator "injected"
val plan = q.queryExecution.executedPlan
// Note the star prefix of Range that marks WholeStageCodegenExec
// As a matter of fact, there are two physical operators in play here
// i.e. WholeStageCodegenExec with Range as the child
scala> println(plan.numberedTreeString)
00 *Range (0, 9, step=1, splits=8)
// The top-level operator is the WholeStageCodegenExec (with Range as its child)
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = plan.asInstanceOf[WholeStageCodegenExec]
// Trigger code generation of the entire query plan tree
val (ctx, code) = wsce.doCodeGen
// CodeFormatter can pretty-print the code
import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
scala> println(CodeFormatter.format(code))
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
* Codegend pipeline for
* Range (0, 9, step=1, splits=8)
*/
/* 006 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
...
When executed, WholeStageCodegenExec gives the pipelineTime performance metric.
Key | Name (in web UI) | Description
---|---|---
pipelineTime | (empty) | Time of how long the whole-stage codegend pipeline has been running (i.e. the elapsed time since the underlying BufferedRowIterator had been created and the internal rows were all consumed)
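As a hedged sketch (reusing the wsce value from the earlier example), the metric is available programmatically under the pipelineTime key:
// pipelineTime is the only metric that WholeStageCodegenExec registers
val pipelineTime = wsce.metrics("pipelineTime")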
Tip: Use the explain operator to know the physical plan of a query and find out whether or not WholeStageCodegen is in use.
val q = spark.range(10).where('id === 4)
// Note the stars in the output that are for codegened operators
scala> q.explain
== Physical Plan ==
*Filter (id#0L = 4)
+- *Range (0, 10, step=1, splits=8)
Note: Physical plans that support code generation extend CodegenSupport.
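A quick (hedged) way to list the operators of a physical plan that extend CodegenSupport, together with whether they actually support codegen:
import org.apache.spark.sql.execution.CodegenSupport
val q = spark.range(10).where('id === 4)
// sparkPlan is the physical plan before CollapseCodegenStages kicks in
q.queryExecution.sparkPlan.collect {
  case op: CodegenSupport => (op.nodeName, op.supportCodegen)
}.foreach(println)
// (Filter,true)
// (Range,true)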
Tip: Enable DEBUG logging level for the org.apache.spark.sql.execution.WholeStageCodegenExec logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.WholeStageCodegenExec=DEBUG
Refer to Logging.
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method
doExecute(): RDD[InternalRow]
Note: doExecute is part of the SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
doExecute generates the Java source code for the child physical plan subtree first and uses CodeGenerator to compile it right afterwards.
If compilation goes well, doExecute branches off per the number of input RDDs.
Note: doExecute only supports up to two input RDDs.
Caution: FIXME Finish the "success" path
If the size of the generated code is greater than spark.sql.codegen.hugeMethodLimit (which defaults to 65535), doExecute prints out the following INFO message to the logs:
Found too long generated codes and JIT optimization might not work: the bytecode size ([maxCodeSize]) is above the limit [spark.sql.codegen.hugeMethodLimit], and the whole-stage codegen was disabled for this plan (id=[codegenStageId]). To avoid this, you can raise the limit `spark.sql.codegen.hugeMethodLimit`:
[treeString]
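The limit is the internal spark.sql.codegen.hugeMethodLimit configuration property, so a hedged way to inspect or raise it for the current session is:
// Defaults to 65535, the largest bytecode size the JVM will JIT-compile per method
spark.conf.get("spark.sql.codegen.hugeMethodLimit")  // "65535"
// Raise the limit (as the message above suggests)
spark.conf.set("spark.sql.codegen.hugeMethodLimit", "131072")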
In the end, doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]) and returns it.
Note: doExecute skips requesting the child physical operator to execute for the FileSourceScanExec leaf physical operator with the supportsBatch flag enabled (as FileSourceScanExec uses a WholeStageCodegenExec operator internally in that case).
If compilation fails and the spark.sql.codegen.fallback configuration property is enabled, doExecute prints out the following WARN message to the logs, requests the child physical operator to execute, and returns the result.
Whole-stage codegen disabled for plan (id=[codegenStageId]):
[treeString]
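The fallback behaviour is controlled by the internal spark.sql.codegen.fallback configuration property, which is enabled by default:
// With fallback enabled, a compilation failure degrades to interpreted (non-codegen)
// execution of the child plan instead of failing the query
spark.conf.get("spark.sql.codegen.fallback")  // "true"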
Generating Java Source Code for Child Physical Plan Subtree — doCodeGen Method
doCodeGen(): (CodegenContext, CodeAndComment)
doCodeGen creates a new CodegenContext and requests the single child physical operator to generate a Java source code for the produce code path (with the new CodegenContext and the WholeStageCodegenExec physical operator itself).
doCodeGen adds the new function under the name of processNext.
doCodeGen generates the class name.
doCodeGen generates the final Java source code of the following format:
public Object generate(Object[] references) {
  return new [className](references);
}

/**
 * Codegend pipeline for stage (id=[codegenStageId])
 * [treeString]
 */
final class [className] extends BufferedRowIterator {

  private Object[] references;
  private scala.collection.Iterator[] inputs;
  // ctx.declareMutableStates()

  public [className](Object[] references) {
    this.references = references;
  }

  public void init(int index, scala.collection.Iterator[] inputs) {
    partitionIndex = index;
    this.inputs = inputs;
    // ctx.initMutableStates()
    // ctx.initPartition()
  }

  // ctx.emitExtraCode()

  // ctx.declareAddedFunctions()
}
Note: doCodeGen requires that the single child physical operator supports Java code generation.
doCodeGen cleans up the generated code (using CodeFormatter to stripExtraNewLines and stripOverlappingComments).
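A minimal sketch of that cleanup, assuming source holds the generated Java code (a String) and ctx is the CodegenContext from above:
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter}
// Strip redundant blank lines first, then merge overlapping comments
val cleanedSource = CodeFormatter.stripOverlappingComments(
  new CodeAndComment(CodeFormatter.stripExtraNewLines(source), ctx.getPlaceHolderToComments()))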
doCodeGen prints out the following DEBUG message to the logs:
DEBUG WholeStageCodegenExec:
[cleanedSource]
In the end, doCodeGen returns the CodegenContext and the Java source code (as a CodeAndComment).
Generating Java Source Code for Consume Path in Whole-Stage Code Generation — doConsume Method
doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
Note: doConsume is part of the CodegenSupport Contract to generate the Java source code for the consume path in Whole-Stage Code Generation.
doConsume generates a Java source code that:
- Takes (from the input row) the code to evaluate a Catalyst expression on an input InternalRow
- Takes (from the input row) the term for the value of the result of the evaluation
  - Adds .copy() to the term if needCopyResult is turned on
- Wraps the term inside an append() code block
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext()
import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode
val exprCode = ExprCode(code = "my_code", isNull = "false", value = "my_value")
// wsce defined above, i.e at the top of the page
val consumeCode = wsce.doConsume(ctx, input = Seq(), row = exprCode)
scala> println(consumeCode)
my_code
append(my_value);
Generating Class Name — generatedClassName Method
generatedClassName(): String
generatedClassName gives a class name per the spark.sql.codegen.useIdInClassName configuration property:
- GeneratedIteratorForCodegenStage with the codegen stage ID when enabled (true)
- GeneratedIterator when disabled (false)
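The property is internal and enabled by default, which is why the newer examples above show GeneratedIteratorForCodegenStage1 while the older one shows GeneratedIterator (a quick, hedged check):
spark.conf.get("spark.sql.codegen.useIdInClassName")  // "true"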
Note: generatedClassName is used exclusively when the WholeStageCodegenExec unary physical operator is requested to generate the Java source code for the child physical plan subtree.