CodegenSupport Contract — Physical Operators with Java Code Generation

CodegenSupport is the contract of physical operators that want to support Java code generation and participate in Whole-Stage Java Code Generation (Whole-Stage CodeGen).

package org.apache.spark.sql.execution

trait CodegenSupport extends SparkPlan {
  // only the required methods that have no implementation...
  def doProduce(ctx: CodegenContext): String
  def inputRDDs(): Seq[RDD[InternalRow]]
  // ...except the following that throws an UnsupportedOperationException by default
  def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
}
Method | Description
---|---
doConsume | Generates plain Java source code for the whole-stage "consume" path. Used exclusively when CodegenSupport is requested to generate the Java source code for the consume code path.
doProduce | Generates plain Java source code (as text) for the "produce" path in whole-stage Java code generation. Used exclusively when a physical operator is requested to generate the Java source code for the produce code path, i.e. Java code that reads the rows from the input RDDs and processes them to produce output rows that are then the input rows to the downstream physical operators.
inputRDDs | The input RDDs of the physical operator. Used when WholeStageCodegenExec physical operator is executed.
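The following is a rough sketch of how a physical operator can participate in the contract (the operator name is made up for illustration; this is not Spark source code): a unary operator implements doProduce by delegating to its child's produce code path and implements doConsume by forwarding the input columns to its own parent via consume.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, UnaryExecNode}

// Hypothetical pass-through operator (a sketch, not part of Spark)
case class PassThroughExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {

  override def output: Seq[Attribute] = child.output

  // The input RDDs of the whole-stage-codegen stage come from the child
  override def inputRDDs(): Seq[RDD[InternalRow]] =
    child.asInstanceOf[CodegenSupport].inputRDDs()

  // "Produce" path: ask the child to generate its produce code and register
  // this operator as the parent (so the child later calls back doConsume)
  override protected def doProduce(ctx: CodegenContext): String =
    child.asInstanceOf[CodegenSupport].produce(ctx, this)

  // "Consume" path: do nothing with the input columns and hand them over to
  // the downstream (parent) operator by generating its consume code
  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String =
    consume(ctx, input)

  // Non-codegen fallback
  override protected def doExecute(): RDD[InternalRow] = child.execute()
}
```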
CodegenSupport has the final methods that are used to generate the Java source code in different phases of Whole-Stage Java Code Generation.

Name | Description
---|---
consume | Code for consuming the generated columns or a row from a physical operator
produce | Code for the "produce" code path
CodegenSupport allows physical operators to disable Java code generation.

Tip: Use the debugCodegen or QueryExecution.debug.codegen methods to access the generated Java source code for a structured query.
variablePrefix is…FIXME

CodegenSupport uses a parent physical operator (with CodegenSupport) for…FIXME
val q = spark.range(1)
import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)
Generated code:
...
// The above is equivalent to the following method chain
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)
Generated code:
...
CodegenSupport is extended by the physical operators that support whole-stage Java code generation, e.g. FilterExec, ProjectExec, RangeExec, HashAggregateExec, BroadcastHashJoinExec, SortMergeJoinExec, InputAdapter and WholeStageCodegenExec.
supportCodegen Flag

supportCodegen: Boolean = true

supportCodegen flag is used to select between InputAdapter and WholeStageCodegenExec physical operators when CollapseCodegenStages is executed (and checks whether a physical operator meets the requirements of whole-stage Java code generation or not).

supportCodegen flag is turned on by default.
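An operator can opt out of whole-stage code generation by overriding the flag. Continuing the hypothetical PassThroughExec sketch above, the following override makes CollapseCodegenStages skip the operator when building a WholeStageCodegenExec stage (inserting an InputAdapter where needed):

```scala
// Hypothetical override in the PassThroughExec sketch above: opt out of
// whole-stage codegen so the operator is never compiled into a stage.
override def supportCodegen: Boolean = false
```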
Generating Java Source Code for Produce Code Path — produce Final Method

produce(ctx: CodegenContext, parent: CodegenSupport): String

produce generates the Java source code for the whole-stage-codegen produce code path for processing the rows from the input RDDs, i.e. Java code that reads the rows from the input RDDs, processes them to produce output rows that are then the input rows to the downstream physical operators.

Internally, produce prepares the physical operator for query execution and then generates the Java source code with the result of doProduce.

While generating the Java source code, produce annotates code blocks with PRODUCE markers that are simple descriptions of the physical operators in a structured query.
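For reference, the sketch below shows roughly how a produce-side implementation generates such code: a Java loop that pulls InternalRows from an input iterator and hands every row to the downstream operators via consume. It is a simplified, InputAdapter-like sketch, and the helper-method signatures (e.g. addMutableState) vary across Spark versions.

```scala
// Simplified, InputAdapter-like doProduce (a sketch, not Spark source):
// generate a Java loop over the first input iterator and give every row
// to the parent operators through the consume code path.
override protected def doProduce(ctx: CodegenContext): String = {
  val input = ctx.addMutableState(
    "scala.collection.Iterator", "input", v => s"$v = inputs[0];")
  val row = ctx.freshName("row")
  s"""
     |while ($input.hasNext() && !stopEarly()) {
     |  InternalRow $row = (InternalRow) $input.next();
     |  ${consume(ctx, null, row).trim}
     |  if (shouldStop()) return;
     |}
   """.stripMargin
}
```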
Tip: Enable spark.sql.codegen.comments Spark SQL property to have PRODUCE markers in the generated Java source code.
// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
select(explode('value) as "id").
join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*Range (0, 1, step=1, splits=8)
...
/* 080 */ protected void processNext() throws java.io.IOException {
/* 081 */ // PRODUCE: Range (0, 1, step=1, splits=8)
/* 082 */ // initialize Range
/* 083 */ if (!range_initRange) {
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
:- Generate explode(value#1), false, false, [id#6]
: +- LocalTableScan [value#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 1, step=1, splits=8)
...
/* 062 */ protected void processNext() throws java.io.IOException {
/* 063 */ // PRODUCE: Project [id#6]
/* 064 */ // PRODUCE: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 065 */ // PRODUCE: InputAdapter
/* 066 */ while (inputadapter_input.hasNext() && !stopEarly()) {
...
prepareRowVar Internal Method

prepareRowVar(ctx: CodegenContext, row: String, colVars: Seq[ExprCode]): ExprCode

prepareRowVar…FIXME

Note: prepareRowVar is used exclusively when CodegenSupport is requested to consume (and constructDoConsumeFunction with spark.sql.codegen.splitConsumeFuncByOperator enabled).
constructDoConsumeFunction Internal Method

constructDoConsumeFunction(
  ctx: CodegenContext,
  inputVars: Seq[ExprCode],
  row: String): String

constructDoConsumeFunction…FIXME

Note: constructDoConsumeFunction is used exclusively when CodegenSupport is requested to consume.
registerComment Method

registerComment(text: => String): String

registerComment…FIXME

Note: registerComment is used when…FIXME
metricTerm Method

metricTerm(ctx: CodegenContext, name: String): String

metricTerm…FIXME

Note: metricTerm is used when…FIXME
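Even though the internals are not described here, a typical use in a code-generating operator is to obtain the Java-side term of one of the operator's SQL metrics and bump it from the generated code. The fragment below is a sketch with a hypothetical numOutputRows metric (the generated metric updates look like the bhj_numOutputRows.add(1) call in the consume example later on):

```scala
// Sketch: inside doConsume of a hypothetical operator that defines a
// "numOutputRows" SQLMetric, generate code that increments the metric for
// every row handed over to the parent operator.
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
  val numOutputRows = metricTerm(ctx, "numOutputRows")
  s"""
     |$numOutputRows.add(1);
     |${consume(ctx, input)}
   """.stripMargin
}
```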
usedInputs Method

usedInputs: AttributeSet

usedInputs returns the expression references.

Note: Physical operators can mark it as empty to defer evaluation of attribute expressions until they are actually used (in the generated Java source code for the consume path).

Note: usedInputs is used exclusively when CodegenSupport is requested to generate the Java source code for the consume code path.
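A hypothetical operator that wants to defer evaluation of all of its input attributes would simply override the method with an empty attribute set (a fragment of such an operator, not Spark source):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeSet

// Sketch: defer evaluation of every input attribute until the generated
// consume-path code actually references it.
override def usedInputs: AttributeSet = AttributeSet.empty
```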
Generating Java Source Code to Consume Generated Columns or Row From Current Physical Operator — consume Final Method

consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String

Note: consume is a final method that cannot be overridden and is the foundation of the codegen support.
consume creates the ExprCodes for the input variables (aka inputVars):

- If outputVars is defined, consume makes sure that their number is exactly the length of the output and copies them. In other words, inputVars is exactly outputVars.
- If outputVars is not defined, consume makes sure that row is defined. consume sets currentVars of the CodegenContext to null while INPUT_ROW to the row. For every attribute in the output, consume creates a BoundReference and requests it to generate the code for expression evaluation.
consume creates a row variable.

consume sets the following in the CodegenContext:

- currentVars as the inputVars
- INPUT_ROW as null
- freshNamePrefix as the variablePrefix of the parent CodegenSupport operator

consume evaluateRequiredVariables (with the output, inputVars and usedInputs of the parent CodegenSupport operator) and creates the so-called evaluated.
consume creates a so-called consumeFunc by constructDoConsumeFunction when the following are all met:

- spark.sql.codegen.splitConsumeFuncByOperator internal configuration property is enabled
- usedInputs of the parent CodegenSupport operator contains all output attributes
- paramLength is correct (FIXME)
Otherwise, consume requests the parent CodegenSupport operator to doConsume.

In the end, consume gives the plain Java source code with the comment CONSUME: [parent]:

[evaluated]
[consumeFunc]
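To see the two sides of this hand-off together: when a child operator calls consume, it is the parent's doConsume that ultimately supplies the Java code operating on the consumed columns. A filter-like parent could be sketched as follows (the predicate and the operator are hypothetical, not Spark source):

```scala
// Sketch of a parent-side doConsume: evaluate a (hypothetical) predicate on
// the first input column and only forward the row further downstream via
// this operator's own consume call when the predicate holds.
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
  val keep = s"${input.head.value} > 0"
  s"""
     |if ($keep) {
     |  ${consume(ctx, input)}
     |}
   """.stripMargin
}
```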
Tip: Enable spark.sql.codegen.comments Spark SQL property to have CONSUME markers in the generated Java source code.
// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
select(explode('value) as "id").
join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
:- Generate explode(value#1), false, false, [id#6]
: +- LocalTableScan [value#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 1, step=1, splits=8)
...
/* 066 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 067 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 068 */ // CONSUME: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 069 */ // input[0, int, false]
/* 070 */ int inputadapter_value = inputadapter_row.getInt(0);
...
/* 079 */ // find matches from HashedRelation
/* 080 */ UnsafeRow bhj_matched = bhj_isNull ? null: (UnsafeRow)bhj_relation.getValue(bhj_value);
/* 081 */ if (bhj_matched != null) {
/* 082 */ {
/* 083 */ bhj_numOutputRows.add(1);
/* 084 */
/* 085 */ // CONSUME: Project [id#6]
/* 086 */ // CONSUME: WholeStageCodegen
/* 087 */ project_rowWriter.write(0, inputadapter_value);
/* 088 */ append(project_result);
/* 089 */
/* 090 */ }
/* 091 */ }
/* 092 */ if (shouldStop()) return;
...
parent Internal Variable Property

parent: CodegenSupport

parent starts empty (i.e. defaults to null) and is assigned a physical operator (with CodegenSupport) only when CodegenSupport is requested to generate the Java source code for the produce code path. The physical operator is passed in as an input argument for the produce code path.

Note: parent is used when…FIXME