WholeStageCodegenExec Unary Physical Operator for Java Code Generation

WholeStageCodegenExec is a unary physical operator that is one of the two physical operators that lay the foundation for the Whole-Stage Java Code Generation for a Codegened Execution Pipeline of a structured query.

Note
InputAdapter is the other physical operator for Codegened Execution Pipeline of a structured query.

WholeStageCodegenExec itself supports the Java code generation and so when executed triggers code generation for the entire child physical plan subtree of a structured query.

val q = spark.range(10).where('id === 4)
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#3L = 4)
+- *(1) Range (0, 10, step=1, splits=8)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...
Tip

Consider using Debugging Query Execution facility to deep dive into the whole-stage code generation.

val q = spark.range(10).where('id === 4)
import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen()
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#0L = 4)
+- *(1) Range (0, 10, step=1, splits=8)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...
Tip

Use the following to enable comments in generated code.

org.apache.spark.SparkEnv.get.conf.set("spark.sql.codegen.comments", "true")
val q = spark.range(10).where('id === 4)
import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen()
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#6L = 4)
+- *(1) Range (0, 10, step=1, splits=8)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
 * Codegend pipeline for stage (id=1)
 * *(1) Filter (id#6L = 4)
 * +- *(1) Range (0, 10, step=1, splits=8)
 */
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...

WholeStageCodegenExec is created when:

Note
spark.sql.codegen.wholeStage property is enabled by default.

WholeStageCodegenExec takes a single child physical operator (a physical subquery tree) and codegen stage ID when created.

Note
WholeStageCodegenExec requires that the single child physical operator supports Java code generation.
// RangeExec physical operator does support codegen
import org.apache.spark.sql.execution.RangeExec
import org.apache.spark.sql.catalyst.plans.logical.Range
val rangeExec = RangeExec(Range(start = 0, end = 1, step = 1, numSlices = 1))

import org.apache.spark.sql.execution.WholeStageCodegenExec
val rdd = WholeStageCodegenExec(rangeExec)(codegenStageId = 0).execute()

WholeStageCodegenExec marks the child physical operator with * (star) prefix and per-query codegen stage ID (in round brackets) in the text representation of a physical plan tree.

scala> println(plan.numberedTreeString)
00 *(1) Project [id#117L]
01 +- *(1) BroadcastHashJoin [id#117L], [cast(id#115 as bigint)], Inner, BuildRight
02    :- *(1) Range (0, 1, step=1, splits=8)
03    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
04       +- Generate explode(ids#112), false, [id#115]
05          +- LocalTableScan [ids#112]
Note
As WholeStageCodegenExec is created as a result of CollapseCodegenStages physical query optimization rule, it is only executed in executedPlan phase of a query execution (that you can only notice by the * star prefix in a plan output).
val q = spark.range(9)

// we need executedPlan with WholeStageCodegenExec physical operator "injected"
val plan = q.queryExecution.executedPlan

// Note the star prefix of Range that marks WholeStageCodegenExec
// As a matter of fact, there are two physical operators in play here
// i.e. WholeStageCodegenExec with Range as the child
scala> println(plan.numberedTreeString)
00 *Range (0, 9, step=1, splits=8)

// Let's unwrap Range physical operator
// and access the parent WholeStageCodegenExec
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = plan.asInstanceOf[WholeStageCodegenExec]

// Trigger code generation of the entire query plan tree
val (ctx, code) = wsce.doCodeGen

// CodeFormatter can pretty-print the code
import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
scala> println(CodeFormatter.format(code))
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
 * Codegend pipeline for
 * Range (0, 9, step=1, splits=8)
 */
/* 006 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
...

When executed, WholeStageCodegenExec gives pipelineTime performance metric.

Table 1. WholeStageCodegenExec’s Performance Metrics
Key Name (in web UI) Description

pipelineTime

(empty)

Time of how long the whole-stage codegend pipeline has been running (i.e. the elapsed time since the underlying BufferedRowIterator had been created and the internal rows were all consumed).

spark sql WholeStageCodegenExec webui.png
Figure 1. WholeStageCodegenExec in web UI (Details for Query)
Tip
Use explain operator to know the physical plan of a query and find out whether or not WholeStageCodegen is in use.
val q = spark.range(10).where('id === 4)
// Note the stars in the output that are for codegened operators
scala> q.explain
== Physical Plan ==
*Filter (id#0L = 4)
+- *Range (0, 10, step=1, splits=8)
Note
Physical plans that support code generation extend CodegenSupport.
Tip

Enable DEBUG logging level for org.apache.spark.sql.execution.WholeStageCodegenExec logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.WholeStageCodegenExec=DEBUG

Refer to Logging.

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute generates the Java source code for the child physical plan subtree first and uses CodeGenerator to compile it right afterwards.

If compilation goes well, doExecute branches off per the number of input RDDs.

Note
doExecute only supports up to two input RDDs.
Caution
FIXME Finish the "success" path

If the size of the generated codes is greater than spark.sql.codegen.hugeMethodLimit (which defaults to 65535), doExecute prints out the following INFO message:

Found too long generated codes and JIT optimization might not work: the bytecode size ([maxCodeSize]) is above the limit [spark.sql.codegen.hugeMethodLimit], and the whole-stage codegen was disabled for this plan (id=[codegenStageId]). To avoid this, you can raise the limit `spark.sql.codegen.hugeMethodLimit`:
[treeString]

In the end, doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]) and returns it.

Note
doExecute skips requesting the child physical operator to execute for FileSourceScanExec leaf physical operator with supportsBatch flag enabled (as FileSourceScanExec operator uses WholeStageCodegenExec operator when FileSourceScanExec).

If compilation fails and spark.sql.codegen.fallback configuration property is enabled, doExecute prints out the following WARN message to the logs, requests the child physical operator to execute and returns it.

Whole-stage codegen disabled for plan (id=[codegenStageId]):
 [treeString]

Generating Java Source Code for Child Physical Plan Subtree — doCodeGen Method

doCodeGen(): (CodegenContext, CodeAndComment)

doCodeGen creates a new CodegenContext and requests the single child physical operator to generate a Java source code for produce code path (with the new CodegenContext and the WholeStageCodegenExec physical operator itself).

doCodeGen adds the new function under the name of processNext.

doCodeGen generates the final Java source code of the following format:

public Object generate(Object[] references) {
  return new [className](references);
}

/**
 * Codegend pipeline for stage (id=[codegenStageId])
 * [treeString]
 */
final class [className] extends BufferedRowIterator {

  private Object[] references;
  private scala.collection.Iterator[] inputs;
  // ctx.declareMutableStates()

  public [className](Object[] references) {
    this.references = references;
  }

  public void init(int index, scala.collection.Iterator[] inputs) {
    partitionIndex = index;
    this.inputs = inputs;
    // ctx.initMutableStates()
    // ctx.initPartition()
  }

  // ctx.emitExtraCode()

  // ctx.declareAddedFunctions()
}
Note
doCodeGen requires that the single child physical operator supports Java code generation.

doCodeGen cleans up the generated code (using CodeFormatter to stripExtraNewLines, stripOverlappingComments).

doCodeGen prints out the following DEBUG message to the logs:

DEBUG WholeStageCodegenExec:
[cleanedSource]

In the end, doCodeGen returns the CodegenContext and the Java source code (as a CodeAndComment).

Note

doCodeGen is used when:

Generating Java Source Code for Consume Path in Whole-Stage Code Generation — doConsume Method

doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
Note
doConsume is part of CodegenSupport Contract to generate the Java source code for consume path in Whole-Stage Code Generation.

doConsume generates a Java source code that:

  1. Takes (from the input row) the code to evaluate a Catalyst expression on an input InternalRow

  2. Takes (from the input row) the term for a value of the result of the evaluation

    1. Adds .copy() to the term if needCopyResult is turned on

  3. Wraps the term inside append() code block

import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext()

import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode
val exprCode = ExprCode(code = "my_code", isNull = "false", value = "my_value")

// wsce defined above, i.e at the top of the page
val consumeCode = wsce.doConsume(ctx, input = Seq(), row = exprCode)
scala> println(consumeCode)
my_code
append(my_value);

Generating Class Name — generatedClassName Method

generatedClassName(): String

generatedClassName gives a class name per spark.sql.codegen.useIdInClassName configuration property:

  • GeneratedIteratorForCodegenStage with the codegen stage ID when enabled (true)

  • GeneratedIterator when disabled (false)

Note
generatedClassName is used exclusively when WholeStageCodegenExec unary physical operator is requested to generate the Java source code for the child physical plan subtree.

isTooManyFields Object Method

isTooManyFields(conf: SQLConf, dataType: DataType): Boolean

isTooManyFields…​FIXME

Note
isTooManyFields is used when…​FIXME

results matching ""

    No results matching ""