CodegenSupport — Physical Operators with Optional Java Code Generation

CodegenSupport is the contract in Spark SQL for physical operators that support Java code generation (aka codegen).

CodegenSupport allows physical operators to disable Java code generation.

Tip
Use debugCodegen (or QueryExecution.debug.codegen) method to access a CodegenSupport-generated Java source code.

variablePrefix is…​FIXME

val q = spark.range(1)

import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)

Generated code:
...

// The above is equivalent to the following method chain
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)

Generated code:
...

CodegenSupport Contract

package org.apache.spark.sql.execution

trait CodegenSupport extends SparkPlan {
  // only required methods that have no implementation
  def doProduce(ctx: CodegenContext): String
  def inputRDDs(): Seq[RDD[InternalRow]]

  // ...except the following that throws an UnsupportedOperationException by default
  def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
}
Table 1. (Subset of) CodegenSupport Contract (in alphabetical order)
Method Description

doConsume

Used exclusively when CodegenSupport is requested for the Java code to consume the generated columns or a row from a physical operator.

doProduce

Generating plain Java source code for whole-stage "produce" path code generation

Used exclusively when a physical operator is requested to generate the Java source code for produce code path, i.e. a Java code that reads the rows from the input RDDs, processes them to produce output rows that are then the input rows to downstream physical operators.

inputRDDs

Input RDDs of a physical operator

Note
Up to two input RDDs can be supported.

Used exclusively when WholeStageCodegenExec is executed.

Generating Java Source Code to Consume Generated Columns or Row From Current Physical Operator — consume Final Method

consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String

consume…​FIXME

Tip
Enable spark.sql.codegen.comments Spark SQL property to have CONSUME markers in the generated Java source code.
// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
  select(explode('value) as "id").
  join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
   :- Generate explode(value#1), false, false, [id#6]
   :  +- LocalTableScan [value#1]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 1, step=1, splits=8)
...
/* 066 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 067 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 068 */       // CONSUME: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 069 */       // input[0, int, false]
/* 070 */       int inputadapter_value = inputadapter_row.getInt(0);
...
/* 079 */       // find matches from HashedRelation
/* 080 */       UnsafeRow bhj_matched = bhj_isNull ? null: (UnsafeRow)bhj_relation.getValue(bhj_value);
/* 081 */       if (bhj_matched != null) {
/* 082 */         {
/* 083 */           bhj_numOutputRows.add(1);
/* 084 */
/* 085 */           // CONSUME: Project [id#6]
/* 086 */           // CONSUME: WholeStageCodegen
/* 087 */           project_rowWriter.write(0, inputadapter_value);
/* 088 */           append(project_result);
/* 089 */
/* 090 */         }
/* 091 */       }
/* 092 */       if (shouldStop()) return;
...

supportCodegen Flag

supportCodegen: Boolean = true

supportCodegen flag is to select between InputAdapter or WholeStageCodegenExec physical operators when CollapseCodegenStages is executed (and checks whether a physical operator meets the requirements of whole-stage Java code generation or not).

supportCodegen flag is turned on by default.

Note

supportCodegen is turned off in the following physical operators:

Generating Java Source Code for Produce Code Path — produce Final Method

produce(ctx: CodegenContext, parent: CodegenSupport): String

produce generates the Java source code for whole-stage-codegen produce code path for processing the rows from the input RDDs, i.e. a Java code that reads the rows from the input RDDs, processes them to produce output rows that are then the input rows to downstream physical operators.

Internally, produce prepares a physical operator for query execution and then generates a Java source code with the result of doProduce.

While generating the Java source code, produce annotates code blocks with PRODUCE markers that are simple descriptions of the physical operators in a structured query.

Tip
Enable spark.sql.codegen.comments Spark SQL property to have PRODUCE markers in the generated Java source code.
// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
  select(explode('value) as "id").
  join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*Range (0, 1, step=1, splits=8)
...
/* 080 */   protected void processNext() throws java.io.IOException {
/* 081 */     // PRODUCE: Range (0, 1, step=1, splits=8)
/* 082 */     // initialize Range
/* 083 */     if (!range_initRange) {
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
   :- Generate explode(value#1), false, false, [id#6]
   :  +- LocalTableScan [value#1]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 1, step=1, splits=8)
...
/* 062 */   protected void processNext() throws java.io.IOException {
/* 063 */     // PRODUCE: Project [id#6]
/* 064 */     // PRODUCE: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 065 */     // PRODUCE: InputAdapter
/* 066 */     while (inputadapter_input.hasNext() && !stopEarly()) {
...
Note
produce is used mainly when WholeStageCodegenExec is requested to generate the Java source code for a physical plan (i.e. a physical operator and its children).

results matching ""

    No results matching ""