CodegenSupport Contract — Physical Operators with Java Code Generation

CodegenSupport is the contract of physical operators that want to support Java code generation and participate in Whole-Stage Java Code Generation (Whole-Stage CodeGen).

package org.apache.spark.sql.execution

trait CodegenSupport extends SparkPlan {
  // only required properties (vals and methods) that have no implementation
  // the others follow
  def doProduce(ctx: CodegenContext): String
  def inputRDDs(): Seq[RDD[InternalRow]]

  // ...except the following that throws an UnsupportedOperationException by default
  def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
}
Table 1. (Subset of) CodegenSupport Contract
Method Description

doConsume

Generates plain Java source code for the "consume" path in whole-stage Java code generation.

Used exclusively when CodegenSupport is requested for the Java source code to consume the generated columns or a row from a physical operator.

doProduce

Generates plain Java source code (as text) for the "produce" path in whole-stage Java code generation.

Used exclusively when a physical operator is requested to generate the Java source code for the produce code path, i.e. Java code that reads the rows from the input RDDs and processes them to produce output rows that become the input rows of downstream physical operators.

inputRDDs

Input RDDs of a physical operator

Note
Only up to two input RDDs are supported.

Used when WholeStageCodegenExec unary physical operator is executed.

CodegenSupport has final methods that are used to generate the Java source code in different phases of Whole-Stage Java Code Generation.

Table 2. CodegenSupport's Final Methods
Name Description

consume

Code for consuming generated columns or a row from a physical operator

consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String

produce

Code for "produce" code path

produce(ctx: CodegenContext, parent: CodegenSupport): String
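
The interplay between the final produce and consume methods and the doProduce and doConsume hooks can be sketched with a toy model in plain Scala. This is not Spark's code: MiniCodegen, MiniRange, MiniFilter and MiniStage are hypothetical names, a row is reduced to a single variable name, and there is no CodegenContext or input RDD. It only illustrates the call pattern, assuming a leaf operator that owns the loop and parents that contribute code through doConsume.

```scala
object ContractSketch {
  // Hypothetical, much-reduced stand-in for CodegenSupport (not Spark's trait).
  trait MiniCodegen {
    // The operator that consumes our rows; assigned by produce.
    protected var parent: MiniCodegen = null

    // Final "produce" entry point: remember the consumer, then delegate.
    final def produce(parent: MiniCodegen): String = {
      this.parent = parent
      doProduce()
    }

    // Final "consume" entry point: hand the current row variable to the parent.
    final def consume(rowVar: String): String = parent.doConsume(rowVar)

    // Required: every operator says how it produces rows.
    protected def doProduce(): String

    // Optional: unsupported unless an operator overrides it.
    def doConsume(rowVar: String): String =
      throw new UnsupportedOperationException("doConsume")
  }

  // Leaf operator: owns the loop and asks its parent to consume each row.
  final class MiniRange(n: Int) extends MiniCodegen {
    protected def doProduce(): String = {
      val body = consume("i")
      s"for (long i = 0; i < $n; i++) {\n$body\n}"
    }
  }

  // Unary operator: the child produces, this operator filters on the way up.
  final class MiniFilter(child: MiniCodegen) extends MiniCodegen {
    protected def doProduce(): String = child.produce(this)
    override def doConsume(rowVar: String): String = {
      val rest = consume(rowVar)
      s"  if ($rowVar % 2 != 0) continue;\n$rest"
    }
  }

  // Root of the stage: starts the produce path and ends the consume path.
  final class MiniStage(child: MiniCodegen) extends MiniCodegen {
    protected def doProduce(): String = child.produce(this)
    override def doConsume(rowVar: String): String = s"  append($rowVar);"
    def generate(): String = produce(null)
  }

  val generated: String =
    new MiniStage(new MiniFilter(new MiniRange(3))).generate()
}
```

In this sketch ContractSketch.generated is a single Java-like loop in which the filter and the append call are inlined, which is the point of fusing operators into one stage.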

CodegenSupport allows physical operators to disable Java code generation (using the supportCodegen flag).

Tip
Use debugCodegen or QueryExecution.debug.codegen methods to access the generated Java source code for a structured query.

variablePrefix is…​FIXME

CodegenSupport uses a parent physical operator (with CodegenSupport) for…​FIXME

val q = spark.range(1)

import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)

Generated code:
...

// The above is equivalent to the following method chain
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)

Generated code:
...
Table 3. CodegenSupports
CodegenSupport Description

BaseLimitExec

BroadcastHashJoinExec

ColumnarBatchScan

DataSourceScanExec

DebugExec

DeserializeToObjectExec

ExpandExec

FilterExec

GenerateExec

HashAggregateExec

InputAdapter

MapElementsExec

ProjectExec

RangeExec

SampleExec

SerializeFromObjectExec

SortExec

SortMergeJoinExec

WholeStageCodegenExec

supportCodegen Flag

supportCodegen: Boolean = true

The supportCodegen flag is used to select between the InputAdapter and WholeStageCodegenExec physical operators when CollapseCodegenStages is executed (and checks whether a physical operator meets the requirements of whole-stage Java code generation).

supportCodegen flag is turned on by default.
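
How the flag could drive the choice between InputAdapter and WholeStageCodegenExec can be sketched in plain Scala. The types below (Op, Plain, InputAdapter, WholeStage) are hypothetical stand-ins and not Spark's classes. The sketch assumes the flag is the only requirement checked, whereas the real rule checks more than that.

```scala
object CollapseSketch {
  // Hypothetical input plan node: a name, the flag, and children.
  final case class Op(name: String, supportCodegen: Boolean, children: List[Op] = Nil)

  // Hypothetical output plan with the two wrapper operators made explicit.
  sealed trait Node
  final case class Plain(name: String, children: List[Node]) extends Node
  final case class InputAdapter(child: Node) extends Node
  final case class WholeStage(child: Node) extends Node

  // Outside a stage: open a new stage at the first operator with the flag on.
  def collapse(op: Op): Node =
    if (op.supportCodegen) WholeStage(insideStage(op))
    else Plain(op.name, op.children.map(collapse))

  // Inside a stage: keep fusing while the flag is on; otherwise cut the stage
  // with an InputAdapter and start looking for a new stage below it.
  private def insideStage(op: Op): Node =
    if (op.supportCodegen) Plain(op.name, op.children.map(insideStage))
    else InputAdapter(collapse(op))

  // One-line rendering, only used to inspect the result.
  def render(node: Node): String = node match {
    case Plain(name, Nil)      => name
    case Plain(name, children) => children.map(render).mkString(name + "(", ", ", ")")
    case InputAdapter(child)   => "InputAdapter(" + render(child) + ")"
    case WholeStage(child)     => "WholeStage(" + render(child) + ")"
  }

  // Project and Range have the flag on, the Exchange in between has it off.
  val example: String = render(collapse(
    Op("Project", supportCodegen = true, List(
      Op("Exchange", supportCodegen = false, List(
        Op("Range", supportCodegen = true)))))))
}
```

With these assumptions the operator with the flag turned off ends one stage (behind an InputAdapter) and a second stage starts below it.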

Note

supportCodegen is turned off in the following physical operators:

Generating Java Source Code for Produce Code Path — produce Final Method

produce(ctx: CodegenContext, parent: CodegenSupport): String

produce generates the Java source code for the whole-stage-codegen produce code path, i.e. Java code that reads the rows from the input RDDs and processes them to produce output rows that become the input rows of downstream physical operators.

Internally, produce prepares a physical operator for query execution and then generates Java source code with the result of doProduce.

While generating the Java source code, produce annotates code blocks with PRODUCE markers that are simple descriptions of the physical operators in a structured query.
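
The steps above (remember the parent, emit a PRODUCE marker, delegate to doProduce) can be sketched as follows. MiniContext, MiniOperator, Leaf and Unary are hypothetical names, the preparation for query execution is left out, and the comment switch is a plain constructor argument that stands in for the spark.sql.codegen.comments property.

```scala
object ProduceSketch {
  // Hypothetical stand-in for the comment-registering part of a codegen context.
  final class MiniContext(commentsEnabled: Boolean) {
    // By-name parameter: the text is only built when comments are enabled.
    def registerComment(text: => String): String =
      if (commentsEnabled) "// " + text + "\n" else ""
  }

  abstract class MiniOperator(val description: String) {
    // Starts as null and is assigned only by produce.
    protected var parent: MiniOperator = null

    // Final method: marker first, then whatever doProduce generates.
    final def produce(ctx: MiniContext, parent: MiniOperator): String = {
      this.parent = parent
      ctx.registerComment("PRODUCE: " + description) + doProduce(ctx)
    }

    protected def doProduce(ctx: MiniContext): String
  }

  // Leaf operator with a fixed body of generated code.
  final class Leaf(desc: String, body: String) extends MiniOperator(desc) {
    protected def doProduce(ctx: MiniContext): String = body
  }

  // Unary operator that asks its child to produce, passing itself as parent.
  final class Unary(desc: String, child: MiniOperator) extends MiniOperator(desc) {
    protected def doProduce(ctx: MiniContext): String = child.produce(ctx, this)
  }

  def generate(commentsEnabled: Boolean): String = {
    val plan = new Unary("Project [id#6]",
      new Leaf("InputAdapter", "while (input.hasNext()) { }"))
    plan.produce(new MiniContext(commentsEnabled), null)
  }
}
```

Calling ProduceSketch.generate(true) yields one PRODUCE line per operator, top-down, before the loop; with comments disabled only the loop remains.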

Tip
Enable the spark.sql.codegen.comments Spark SQL property to have PRODUCE markers in the generated Java source code.
// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
  select(explode('value) as "id").
  join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*Range (0, 1, step=1, splits=8)
...
/* 080 */   protected void processNext() throws java.io.IOException {
/* 081 */     // PRODUCE: Range (0, 1, step=1, splits=8)
/* 082 */     // initialize Range
/* 083 */     if (!range_initRange) {
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
   :- Generate explode(value#1), false, false, [id#6]
   :  +- LocalTableScan [value#1]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 1, step=1, splits=8)
...
/* 062 */   protected void processNext() throws java.io.IOException {
/* 063 */     // PRODUCE: Project [id#6]
/* 064 */     // PRODUCE: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 065 */     // PRODUCE: InputAdapter
/* 066 */     while (inputadapter_input.hasNext() && !stopEarly()) {
...
Note

produce is used when:

prepareRowVar Internal Method

prepareRowVar(ctx: CodegenContext, row: String, colVars: Seq[ExprCode]): ExprCode

prepareRowVar…​FIXME

Note
prepareRowVar is used exclusively when CodegenSupport is requested to consume (and constructDoConsumeFunction with spark.sql.codegen.splitConsumeFuncByOperator enabled).

constructDoConsumeFunction Internal Method

constructDoConsumeFunction(
  ctx: CodegenContext,
  inputVars: Seq[ExprCode],
  row: String): String

constructDoConsumeFunction…​FIXME

Note
constructDoConsumeFunction is used exclusively when CodegenSupport is requested to consume.

registerComment Method

registerComment(text: => String): String

registerComment…​FIXME

Note
registerComment is used when…​FIXME

metricTerm Method

metricTerm(ctx: CodegenContext, name: String): String

metricTerm…​FIXME

Note
metricTerm is used when…​FIXME

usedInputs Method

usedInputs: AttributeSet

usedInputs returns the references of the expressions of a physical operator (as an AttributeSet).

Note
Physical operators can mark it as empty to defer evaluation of attribute expressions until they are actually used (in the generated Java source code for consume path).
Note
usedInputs is used exclusively when CodegenSupport is requested to generate a Java source code for consume path.
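
The effect of usedInputs on deferred evaluation can be sketched with a small model. MiniExprCode and evaluateRequired are hypothetical names, and attributes are plain strings. The assumption is that a variable whose code is still non-empty has not been evaluated yet, and that only variables listed as used get their code emitted up front.

```scala
object UsedInputsSketch {
  // Hypothetical stand-in for ExprCode: `code` computes `value`;
  // empty code means the variable has already been evaluated.
  final class MiniExprCode(var code: String, val value: String)

  // Emits pending code only for the attributes in `required` and clears it,
  // so the same variable is never evaluated twice.
  def evaluateRequired(
      attributes: Seq[String],
      variables: Seq[MiniExprCode],
      required: Set[String]): String = {
    val out = new StringBuilder
    attributes.zip(variables).foreach { case (attr, ev) =>
      if (ev.code.nonEmpty && required.contains(attr)) {
        out.append(ev.code).append("\n")
        ev.code = ""
      }
    }
    out.toString
  }

  // Two columns; the first call needs only `a`, the second needs both.
  def demo(): (String, String) = {
    val attrs = Seq("a", "b")
    val vars = Seq(
      new MiniExprCode("int a = row.getInt(0);", "a"),
      new MiniExprCode("int b = row.getInt(1);", "b"))
    val first = evaluateRequired(attrs, vars, Set("a"))
    val second = evaluateRequired(attrs, vars, Set("a", "b"))
    (first, second)
  }
}
```

An operator that reports an empty set of used inputs gets no code emitted up front in this model, so each column is evaluated later, at the point where it is actually needed.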

Generating Java Source Code to Consume Generated Columns or Row From Current Physical Operator — consume Final Method

consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String
Note
consume is a final method that cannot be changed and is the foundation of codegen support.

consume creates the ExprCodes for the input variables (aka inputVars).

  • If outputVars is defined, consume makes sure that their number is exactly the length of the output and copies them. In other words, inputVars is exactly outputVars.

  • If outputVars is not defined, consume makes sure that row is defined. consume sets currentVars of the CodegenContext to null and INPUT_ROW to the row. For every attribute in the output, consume creates a BoundReference and requests it to generate code for expression evaluation.
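
The two cases above can be sketched as one function. MiniExprCode and the generated getInt calls are hypothetical: the sketch assumes every column is an int and stands in for what a BoundReference would generate. The checks use require, which is a choice made for this sketch only.

```scala
object InputVarsSketch {
  // Hypothetical stand-in for ExprCode.
  final case class MiniExprCode(code: String, value: String)

  // output: attribute names of the operator; outputVars or row may be null.
  def inputVars(
      output: Seq[String],
      outputVars: Seq[MiniExprCode],
      row: String): Seq[MiniExprCode] =
    if (outputVars != null) {
      // Case 1: variables are given, so their number must match the output.
      require(outputVars.length == output.length, "outputVars must match output")
      outputVars.map(_.copy())
    } else {
      // Case 2: only a row is given, so read every column from the row by ordinal.
      require(row != null, "outputVars and row cannot both be null")
      output.zipWithIndex.map { case (attr, ordinal) =>
        MiniExprCode(s"int ${attr} = ${row}.getInt(${ordinal});", attr)
      }
    }

  val fromRow: Seq[MiniExprCode] = inputVars(Seq("a", "b"), null, "inputadapter_row")
}
```

With only a row available, every output attribute gets its own variable whose code reads the column at the matching ordinal.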

consume sets the following in the CodegenContext:

consume then uses evaluateRequiredVariables (with the output, inputVars and usedInputs of the parent CodegenSupport operator) and stores the result as the so-called evaluated.

consume creates a so-called consumeFunc by constructDoConsumeFunction when the following are all met:

  1. spark.sql.codegen.splitConsumeFuncByOperator internal configuration property is enabled

  2. usedInputs of the parent CodegenSupport operator contains all output attributes

  3. paramLength is correct (FIXME)

Otherwise, consume requests the parent CodegenSupport operator to doConsume.

In the end, consume returns plain Java source code that starts with the comment CONSUME: [parent] and is followed by:

[evaluated]
[consumeFunc]
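
The choice between a separate consume function and inlined doConsume code, and the final layout, can be sketched as below. All names are hypothetical. paramLengthOk is an opaque flag for the third condition, which this document leaves open. The CONSUME comment is always emitted here for simplicity.

```scala
object ConsumeSketch {
  def consume(
      parentDescription: String,
      output: Seq[String],
      parentUsedInputs: Set[String],
      splitConsumeFuncByOperator: Boolean,
      paramLengthOk: Boolean,
      evaluated: String)(
      constructDoConsumeFunction: => String,
      doConsume: => String): String = {
    // All three conditions have to hold to go through a separate function.
    val useFunction =
      splitConsumeFuncByOperator &&
        output.forall(parentUsedInputs.contains) &&
        paramLengthOk
    // By-name arguments: only the chosen branch is ever evaluated.
    val consumeFunc = if (useFunction) constructDoConsumeFunction else doConsume
    "// CONSUME: " + parentDescription + "\n" + evaluated + "\n" + consumeFunc
  }

  // Hypothetical generated snippets, used only to show the two outcomes.
  def demo(split: Boolean, usedInputs: Set[String]): String =
    consume("Project [id#6]", Seq("id"), usedInputs, split,
      paramLengthOk = true, evaluated = "int value = row.getInt(0);")(
      constructDoConsumeFunction = "project_doConsume(value);",
      doConsume = "writer.write(0, value);")
}
```

In this sketch, turning the property off or leaving an output attribute out of the parent's used inputs is enough to fall back to the inlined doConsume code.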
Tip
Enable the spark.sql.codegen.comments Spark SQL property to have CONSUME markers in the generated Java source code.
// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
  select(explode('value) as "id").
  join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
   :- Generate explode(value#1), false, false, [id#6]
   :  +- LocalTableScan [value#1]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 1, step=1, splits=8)
...
/* 066 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 067 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 068 */       // CONSUME: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 069 */       // input[0, int, false]
/* 070 */       int inputadapter_value = inputadapter_row.getInt(0);
...
/* 079 */       // find matches from HashedRelation
/* 080 */       UnsafeRow bhj_matched = bhj_isNull ? null: (UnsafeRow)bhj_relation.getValue(bhj_value);
/* 081 */       if (bhj_matched != null) {
/* 082 */         {
/* 083 */           bhj_numOutputRows.add(1);
/* 084 */
/* 085 */           // CONSUME: Project [id#6]
/* 086 */           // CONSUME: WholeStageCodegen
/* 087 */           project_rowWriter.write(0, inputadapter_value);
/* 088 */           append(project_result);
/* 089 */
/* 090 */         }
/* 091 */       }
/* 092 */       if (shouldStop()) return;
...
Note

consume is used when:

parent Internal Variable Property

parent: CodegenSupport

parent starts empty (i.e. defaults to null) and is assigned a physical operator (with CodegenSupport) only when CodegenSupport is requested to generate Java source code for the produce code path. The physical operator is passed in as an input argument for the produce code path.

Note
parent is used when…​FIXME
