QueryExecution — Structured Query Execution Pipeline

QueryExecution is the execution pipeline (workflow) of a structured query.

QueryExecution is made up of execution stages (phases).

Figure 1. Query Execution — From SQL through Dataset to RDD
Note
When you execute an operator on a Dataset, it triggers query execution that produces the good ol' RDD of internal binary rows, i.e. RDD[InternalRow]. That RDD is Spark's execution plan; executing an RDD action on it gives the result of the structured query.

QueryExecution is available as the queryExecution attribute of any Dataset.

import org.apache.spark.sql.Dataset
val ds: Dataset[Long] = spark.range(5)
val queryExec = ds.queryExecution

QueryExecution is the result of executing a LogicalPlan in a SparkSession (and so you could create a Dataset from a logical operator or use the QueryExecution after executing a logical operator).

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
val plan: LogicalPlan = spark.range(5).queryExecution.logical
val qe = new QueryExecution(spark, plan)
Table 1. QueryExecution’s Properties (aka QueryExecution Phases / Structured Query Execution Pipeline)
Attribute / Phase Description

analyzed

Analyzed logical plan that has passed Analyzer's check rules.

Tip
Besides analyzed, you can use the Dataset.explain basic action (with the extended flag enabled) or SQL's EXPLAIN EXTENDED to see the analyzed logical plan of a structured query.
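
For example, with the extended flag enabled, Dataset.explain prints out the analyzed logical plan among the other plans:

val ds = spark.range(5)
// Prints the parsed, analyzed and optimized logical plans followed by the physical plan
ds.explain(extended = true)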

withCachedData

analyzed logical plan after CacheManager was requested to replace logical query segments with cached query plans.

withCachedData makes sure that the logical plan was analyzed and uses supported operations only.
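
Following the description above, a minimal sketch of the phase (per the Spark sources; simplified):

// withCachedData asserts first and then consults the CacheManager
lazy val withCachedData: LogicalPlan = {
  assertAnalyzed()
  assertSupported()
  sparkSession.sharedState.cacheManager.useCachedData(analyzed)
}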

optimizedPlan

Optimized logical plan that is the result of executing the logical query plan optimizer on the withCachedData logical plan.

sparkPlan

Physical plan (a SparkPlan) that the SparkPlanner generates from the optimizedPlan logical plan.

Note
sparkPlan is the first physical plan from the collection of all possible physical plans.
Note
It is guaranteed that Catalyst’s QueryPlanner (which SparkPlanner extends) will always generate at least one physical plan.

executedPlan

Optimized physical query plan that is in the final optimized "shape" and therefore ready for execution, i.e. the physical sparkPlan with physical preparation rules applied.

Note
Amongst the physical optimization rules that executedPlan phase triggers is the CollapseCodegenStages physical preparation rule that collapses physical operators that support code generation together as a WholeStageCodegenExec operator.
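
You can observe the rule at work yourself: for a simple query, sparkPlan is the bare physical operator while executedPlan is wrapped in a WholeStageCodegenExec.

import org.apache.spark.sql.execution.WholeStageCodegenExec
val qe = spark.range(5).queryExecution
// sparkPlan has no physical preparation rules applied yet...
assert(!qe.sparkPlan.isInstanceOf[WholeStageCodegenExec])
// ...while executedPlan has CollapseCodegenStages already applied
assert(qe.executedPlan.isInstanceOf[WholeStageCodegenExec])
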
Note

executedPlan physical plan is used when:

toRdd

toRdd: RDD[InternalRow]

Spark Core’s execution graph of a distributed computation (an RDD of internal binary rows) that results from executing the executedPlan physical plan.

The RDD is the top-level RDD of the DAG of RDDs (that represent physical operators).

Note

toRdd is a "boundary" between two Spark modules: Spark SQL and Spark Core.

After you have executed toRdd (directly or not), you basically "leave" Spark SQL’s Dataset world and "enter" Spark Core’s RDD space.

toRdd triggers a structured query execution (i.e. physical planning, but not execution of the plan) using SparkPlan.execute that recursively triggers execution of every child physical operator in the physical plan tree.
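
For example:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
// The RDD of internal binary rows behind the Dataset
val rdd: RDD[InternalRow] = spark.range(5).queryExecution.toRdd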

Note
You can use SparkSession.internalCreateDataFrame to apply a schema to an RDD[InternalRow].
Note
Use Dataset.rdd to access the rows as an RDD of the Dataset's Scala type, i.e. with the internal binary rows already deserialized.

You can access the lazy attributes as follows:

val dataset: Dataset[Long] = spark.range(5)
dataset.queryExecution.executedPlan
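
For example, the following walks through all the phases of a simple structured query:

val qe = spark.range(5).queryExecution
qe.analyzed       // analyzed logical plan
qe.withCachedData // analyzed logical plan with cached plans used
qe.optimizedPlan  // optimized logical plan
qe.sparkPlan      // physical plan
qe.executedPlan   // physical plan ready for execution
qe.toRdd          // RDD[InternalRow]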

QueryExecution uses the Catalyst Query Optimizer and Tungsten for better structured query performance.

Table 2. QueryExecution’s Properties
Name Description

planner

SparkPlanner

QueryExecution uses the input SparkSession to access the current SparkPlanner (through SessionState) when it is created. It then uses the planner to compute a physical plan (a SparkPlan) that is available as the sparkPlan attribute.

Note

A variant of QueryExecution that Spark Structured Streaming uses for query planning is IncrementalExecution.

Refer to IncrementalExecution — QueryExecution of Streaming Datasets in the Spark Structured Streaming gitbook.

Tip
Use the explain operator to see the logical and physical plans of a Dataset.
val ds = spark.range(5)
scala> ds.queryExecution
res17: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Analyzed Logical Plan ==
id: bigint
Range 0, 5, 1, 8, [id#39L]

== Optimized Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Physical Plan ==
WholeStageCodegen
:  +- Range 0, 1, 8, 5, [id#39L]
Note
QueryExecution belongs to org.apache.spark.sql.execution package.
Note
QueryExecution is a transient feature of a Dataset, i.e. it is not preserved across serializations.

Text Representation With Statistics — stringWithStats Method

stringWithStats: String

stringWithStats...FIXME

Note
stringWithStats is used exclusively when ExplainCommand logical command is executed (with cost flag enabled).

debug Object

Caution
FIXME

Building Complete Text Representation — completeString Internal Method

Caution
FIXME

Creating QueryExecution Instance

QueryExecution takes the following when created:

  • SparkSession

  • Logical plan (i.e. a LogicalPlan)

Physical Query Optimizations (Physical Plan Preparation Rules) — preparations Method

preparations: Seq[Rule[SparkPlan]]

preparations is the set of the physical query optimization rules that transform a physical query plan to be more efficient and optimized for execution (i.e. Rule[SparkPlan]).

The preparations physical query optimizations are applied sequentially (one by one) to a physical plan in the following order:
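
For reference, Spark 2.3 defines the sequence as follows (the exact rule set and order vary across Spark versions):

// QueryExecution.preparations as of Spark 2.3 (may differ in other versions)
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
  python.ExtractPythonUDFs,
  PlanSubqueries(sparkSession),
  EnsureRequirements(sparkSession.sessionState.conf),
  CollapseCodegenStages(sparkSession.sessionState.conf),
  ReuseExchange(sparkSession.sessionState.conf),
  ReuseSubquery(sparkSession.sessionState.conf))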

Note

preparations rules are used when:

  • QueryExecution is requested for the executedPlan physical plan (through prepareForExecution)

  • (Spark Structured Streaming) IncrementalExecution is requested for the physical optimization rules for streaming structured queries

Applying preparations Physical Query Optimization Rules to Physical Plan — prepareForExecution Method

prepareForExecution(plan: SparkPlan): SparkPlan

prepareForExecution takes physical preparation rules and applies them one by one to the input physical plan.
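
Internally, that is a simple left fold over the rules (per the Spark sources):

// Every rule transforms the physical plan produced by the previous rule
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}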

Note
prepareForExecution is used exclusively when QueryExecution is requested to prepare the physical plan for execution.

assertSupported Method

assertSupported(): Unit

assertSupported requests UnsupportedOperationChecker to checkForBatch when...FIXME
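
A sketch of assertSupported following the Spark sources (assuming the check is guarded by the spark.sql.streaming.unsupportedOperationCheck internal configuration):

// Checks the analyzed logical plan for operations unsupported in batch queries
def assertSupported(): Unit = {
  if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
    UnsupportedOperationChecker.checkForBatch(analyzed)
  }
}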

Note
assertSupported is used exclusively when QueryExecution is requested for withCachedData logical plan.

Creating Analyzed Logical Plan and Checking Correctness — assertAnalyzed Method

assertAnalyzed(): Unit

assertAnalyzed triggers initialization of analyzed (which is almost like executing it).

Note
assertAnalyzed executes analyzed by accessing it and throwing the result away. Since analyzed is a lazy value in Scala, it will then get initialized for the first time and stays so forever.

assertAnalyzed then requests Analyzer to validate analysis of the logical plan (i.e. analyzed).

Note

assertAnalyzed uses SparkSession to access the current SessionState that it then uses to access the Analyzer.

In Scala the access path looks as follows.

sparkSession.sessionState.analyzer

In case of any AnalysisException, assertAnalyzed creates a new AnalysisException (that holds the analyzed logical plan) and throws it instead, so the error reports the analyzed plan.
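
Putting it all together, assertAnalyzed is roughly the following (per the Spark sources; simplified):

def assertAnalyzed(): Unit = {
  try sparkSession.sessionState.analyzer.checkAnalysis(analyzed) catch {
    case e: AnalysisException =>
      // Re-create the exception so that it holds the analyzed logical plan
      val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
      ae.setStackTrace(e.getStackTrace)
      throw ae
  }
}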

Note

assertAnalyzed is used when:

Building Text Representation with Cost Stats — toStringWithStats Method

toStringWithStats: String

toStringWithStats is a mere alias for completeString with appendStats flag enabled.

Note
toStringWithStats is a custom toString with cost statistics.
// test dataset
val dataset = spark.range(20).limit(2)

// toStringWithStats in action - note Optimized Logical Plan section with Statistics
scala> dataset.queryExecution.toStringWithStats
res6: String =
== Parsed Logical Plan ==
GlobalLimit 2
+- LocalLimit 2
   +- Range (0, 20, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint
GlobalLimit 2
+- LocalLimit 2
   +- Range (0, 20, step=1, splits=Some(8))

== Optimized Logical Plan ==
GlobalLimit 2, Statistics(sizeInBytes=32.0 B, rowCount=2, isBroadcastable=false)
+- LocalLimit 2, Statistics(sizeInBytes=160.0 B, isBroadcastable=false)
   +- Range (0, 20, step=1, splits=Some(8)), Statistics(sizeInBytes=160.0 B, isBroadcastable=false)

== Physical Plan ==
CollectLimit 2
+- *Range (0, 20, step=1, splits=Some(8))
Note
toStringWithStats is used exclusively when ExplainCommand is executed (only when cost attribute is enabled).

Transforming SparkPlan Execution Result to Hive-Compatible Output Format — hiveResultString Method

hiveResultString(): Seq[String]

hiveResultString returns the result as a Hive-compatible output format.

scala> spark.range(5).queryExecution.hiveResultString
res0: Seq[String] = ArrayBuffer(0, 1, 2, 3, 4)

scala> spark.read.csv("people.csv").queryExecution.hiveResultString
res4: Seq[String] = ArrayBuffer(id	name	age, 0	Jacek	42)

Internally, hiveResultString transforms the SparkPlan as follows:

Table 3. hiveResultString’s SparkPlan Transformations (in execution order)
SparkPlan Description

ExecutedCommandExec for DescribeTableCommand

Executes DescribeTableCommand and transforms every Row to a Hive-compatible output format.

ExecutedCommandExec for ShowTablesCommand

Executes ShowTablesCommand and transforms the result to a collection of table names.

Any other SparkPlan

Executes SparkPlan and transforms the result to a Hive-compatible output format.

Note
hiveResultString is used exclusively when SparkSQLDriver (of ThriftServer) runs a command.

Extended Text Representation with Logical and Physical Plans — toString Method

toString: String
Note
toString is part of Java's Object contract to...FIXME.

toString is a mere alias for completeString with appendStats flag disabled.

Note
toString is on the "other" side of toStringWithStats which has appendStats flag enabled.

Simple (Basic) Text Representation — simpleString Method

simpleString: String

simpleString requests the optimized SparkPlan for the text representation (of all nodes in the query tree) with verbose flag turned off.

In the end, simpleString adds == Physical Plan == header to the text representation and redacts sensitive information.

import org.apache.spark.sql.{functions => f}
val q = spark.range(10).withColumn("rand", f.rand())
val output = q.queryExecution.simpleString

scala> println(output)
== Physical Plan ==
*(1) Project [id#5L, rand(6017561978775952851) AS rand#7]
+- *(1) Range (0, 10, step=1, splits=8)
Note

simpleString is used when:

  • ExplainCommand is executed

  • Spark Structured Streaming’s StreamingExplainCommand is executed

Redacting Sensitive Information — withRedaction Internal Method

withRedaction(message: String): String

withRedaction takes the value of spark.sql.redaction.string.regex configuration property (as the regular expression to point at sensitive information) and requests Spark Core’s Utils to redact sensitive information in the input message.
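
A sketch of withRedaction (per the description above; simplified):

// Redacts sensitive information using the regex from the configuration property
private def withRedaction(message: String): String = {
  Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)
}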

Note
Internally, Spark Core’s Utils.redact uses Scala’s Regex.replaceAllIn to replace all matches of the pattern with a replacement string.
Note
withRedaction is used when QueryExecution is requested for the simple, extended and with statistics text representations.
