package org.apache.spark.sql.execution
trait CodegenSupport extends SparkPlan {
  // only required properties (vals and methods) that have no implementation
  // the others follow
  def doProduce(ctx: CodegenContext): String
  def inputRDDs(): Seq[RDD[InternalRow]]
  // ...except the following that throws an UnsupportedOperationException by default
  def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
}
CodegenSupport Contract — Physical Operators with Java Code Generation
CodegenSupport is the contract of physical operators that want to support Java code generation and participate in the Whole-Stage Java Code Generation (Whole-Stage CodeGen).
| Method | Description | ||
|---|---|---|---|
  | 
Generating a plain Java source code for whole-stage "consume" path code generation Used exclusively when   | 
||
  | 
Generating a plain Java source code (as a text) for the "produce" path in whole-stage Java code generation. Used exclusively when a physical operator is requested to generate the Java source code for produce code path, i.e. a Java code that reads the rows from the input RDDs, processes them to produce output rows that are then the input rows to downstream physical operators.  | 
||
  | 
 Used when   | 
CodegenSupport has the final methods that are used to generate the Java source code in different phases of Whole-Stage Java Code Generation.
| Name | Description | 
|---|---|
Code for consuming generated columns or a row from a physical operator 
 | 
|
Code for "produce" code path 
 | 
CodegenSupport allows physical operators to disable Java code generation.
| 
 Tip 
 | 
Use debugCodegen or QueryExecution.debug.codegen methods to access the generated Java source code for a structured query. | 
variablePrefix is…FIXME
CodegenSupport uses a parent physical operator (with CodegenSupport) for…FIXME
val q = spark.range(1)
import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)
Generated code:
...
// The above is equivalent to the following method chain
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)
Generated code:
...
| CodegenSupport | Description | 
|---|---|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
|
  | 
 supportCodegen Flag
supportCodegen: Boolean = true
supportCodegen flag is to select between InputAdapter or WholeStageCodegenExec physical operators when CollapseCodegenStages is executed (and checks whether a physical operator meets the requirements of whole-stage Java code generation or not).
supportCodegen flag is turned on by default.
| 
 Note 
 | 
 
 
  | 
 Generating Java Source Code for Produce Code Path — produce Final Method
produce(ctx: CodegenContext, parent: CodegenSupport): String
produce generates the Java source code for whole-stage-codegen produce code path for processing the rows from the input RDDs, i.e. a Java code that reads the rows from the input RDDs, processes them to produce output rows that are then the input rows to downstream physical operators.
Internally, produce prepares a physical operator for query execution and then generates a Java source code with the result of doProduce.
While generating the Java source code, produce annotates code blocks with PRODUCE markers that are simple descriptions of the physical operators in a structured query.
| 
 Tip 
 | 
Enable spark.sql.codegen.comments Spark SQL property to have PRODUCE markers in the generated Java source code.
 | 
// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
  select(explode('value) as "id").
  join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*Range (0, 1, step=1, splits=8)
...
/* 080 */   protected void processNext() throws java.io.IOException {
/* 081 */     // PRODUCE: Range (0, 1, step=1, splits=8)
/* 082 */     // initialize Range
/* 083 */     if (!range_initRange) {
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
   :- Generate explode(value#1), false, false, [id#6]
   :  +- LocalTableScan [value#1]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 1, step=1, splits=8)
...
/* 062 */   protected void processNext() throws java.io.IOException {
/* 063 */     // PRODUCE: Project [id#6]
/* 064 */     // PRODUCE: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 065 */     // PRODUCE: InputAdapter
/* 066 */     while (inputadapter_input.hasNext() && !stopEarly()) {
...
| 
 Note 
 | 
 
 
  | 
 prepareRowVar Internal Method
prepareRowVar(ctx: CodegenContext, row: String, colVars: Seq[ExprCode]): ExprCode
prepareRowVar…FIXME
| 
 Note 
 | 
prepareRowVar is used exclusively when CodegenSupport is requested to consume (and constructDoConsumeFunction with spark.sql.codegen.splitConsumeFuncByOperator enabled).
 | 
 constructDoConsumeFunction Internal Method
constructDoConsumeFunction(
  ctx: CodegenContext,
  inputVars: Seq[ExprCode],
  row: String): String
constructDoConsumeFunction…FIXME
| 
 Note 
 | 
constructDoConsumeFunction is used exclusively when CodegenSupport is requested to consume.
 | 
 registerComment Method
registerComment(text: => String): String
registerComment…FIXME
| 
 Note 
 | 
registerComment is used when…FIXME
 | 
 metricTerm Method
metricTerm(ctx: CodegenContext, name: String): String
metricTerm…FIXME
| 
 Note 
 | 
metricTerm is used when…FIXME
 | 
 usedInputs Method
usedInputs: AttributeSet
usedInputs returns the expression references.
| 
 Note 
 | 
Physical operators can mark it as empty to defer evaluation of attribute expressions until they are actually used (in the generated Java source code for consume path). | 
| 
 Note 
 | 
usedInputs is used exclusively when CodegenSupport is requested to generate a Java source code for consume path.
 | 
 Generating Java Source Code to Consume Generated Columns or Row From Current Physical Operator — consume Final Method
consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String
| 
 Note 
 | 
consume is a final method that cannot be changed and is the foundation of codegen support.
 | 
consume creates the ExprCodes for the input variables (aka inputVars).
- 
If
outputVarsis defined,consumemakes sure that their number is exactly the length of the output and copies them. In other words,inputVarsis exactlyoutputVars. - 
If
outputVarsis not defined,consumemakes sure thatrowis defined.consumesets currentVars of theCodegenContexttonullwhile INPUT_ROW to therow. For every attribute in the output,consumecreates a BoundReference and requests it to generate code for expression evaluation. 
consume creates a row variable.
consume sets the following in the CodegenContext:
- 
currentVars as the
inputVars - 
INPUT_ROW as
null - 
freshNamePrefix as the variablePrefix of the parent CodegenSupport operator.
 
consume evaluateRequiredVariables (with the output, inputVars and usedInputs of the parent CodegenSupport operator) and creates so-called evaluated.
consume creates a so-called consumeFunc by constructDoConsumeFunction when the following are all met:
- 
spark.sql.codegen.splitConsumeFuncByOperator internal configuration property is enabled
 - 
usedInputs of the parent CodegenSupport operator contains all output attributes
 - 
paramLengthis correct (FIXME) 
Otherwise, consume requests the parent CodegenSupport operator to doConsume.
In the end, consume gives the plain Java source code with the comment CONSUME: [parent]:
[evaluated]
[consumeFunc]
| 
 Tip 
 | 
Enable spark.sql.codegen.comments Spark SQL property to have CONSUME markers in the generated Java source code.
 | 
// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
  select(explode('value) as "id").
  join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
   :- Generate explode(value#1), false, false, [id#6]
   :  +- LocalTableScan [value#1]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 1, step=1, splits=8)
...
/* 066 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 067 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 068 */       // CONSUME: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 069 */       // input[0, int, false]
/* 070 */       int inputadapter_value = inputadapter_row.getInt(0);
...
/* 079 */       // find matches from HashedRelation
/* 080 */       UnsafeRow bhj_matched = bhj_isNull ? null: (UnsafeRow)bhj_relation.getValue(bhj_value);
/* 081 */       if (bhj_matched != null) {
/* 082 */         {
/* 083 */           bhj_numOutputRows.add(1);
/* 084 */
/* 085 */           // CONSUME: Project [id#6]
/* 086 */           // CONSUME: WholeStageCodegen
/* 087 */           project_rowWriter.write(0, inputadapter_value);
/* 088 */           append(project_result);
/* 089 */
/* 090 */         }
/* 091 */       }
/* 092 */       if (shouldStop()) return;
...
| 
 Note 
 | 
 
 
  | 
 parent Internal Variable Property
parent: CodegenSupport
parent starts empty, (i.e. defaults to null value) and is assigned a physical operator (with CodegenContext) only when CodegenContext is requested to generate a Java source code for produce code path. The physical operator is passed in as an input argument for the produce code path.
| 
 Note 
 | 
parent is used when…FIXME
 |