Query Execution

QueryExecution is an integral part of a Dataset and represents the query execution that will eventually "produce" the data in a Dataset (i.e. when toRdd is called and an RDD[InternalRow] is computed).

QueryExecution is a transient feature of a Dataset, i.e. it is not preserved across serializations.

You can access the QueryExecution of a Dataset using the queryExecution attribute.

val ds: Dataset[Long] = ...
val queryExec = ds.queryExecution

QueryExecution is the result of executing a LogicalPlan in a SparkSession (and so you could create a Dataset from a logical operator or use the QueryExecution after executing a logical operator).

Table 1. QueryExecution Lazy Attributes

analyzed
  LogicalPlan that is the result of applying the Analyzer's rules to the LogicalPlan (of the QueryExecution).

withCachedData
  LogicalPlan that is the analyzed plan after being checked (for unsupported operations) and with cached data segments substituted in.

optimizedPlan
  LogicalPlan (of a structured query) that is the result of applying the session-owned Catalyst Query Optimizer to withCachedData.

sparkPlan
  SparkPlan that is the result of requesting SparkPlanner to plan the optimized logical query plan.

  NOTE: In fact, the resulting SparkPlan is the first Spark query plan from the collection of possible query plans from SparkPlanner.

executedPlan
  SparkPlan ready for execution, i.e. the sparkPlan with all the preparation rules applied.

toRdd
  RDD[InternalRow] that is the result of "executing" the physical plan, i.e. executedPlan.execute().

  TIP: InternalRow is the internal optimized binary row format.

You can access the lazy attributes as follows:

val dataset: Dataset[Long] = ...

dataset.queryExecution.analyzed
dataset.queryExecution.withCachedData
dataset.queryExecution.optimizedPlan
dataset.queryExecution.sparkPlan
dataset.queryExecution.executedPlan
dataset.queryExecution.toRdd

When created, QueryExecution uses the input SparkSession to access the current SparkPlanner (through SessionState, which could also be a HiveSessionState). It then uses the planner to compute a SparkPlan (the physical plan), which is available as the sparkPlan attribute.
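As a quick check, the pre- and post-preparation physical plans can be inspected side by side. A sketch that assumes a local SparkSession (in spark-shell, spark is already available and the builder lines are unnecessary):

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local SparkSession (spark-shell provides `spark` out of the box)
val spark = SparkSession.builder.master("local[*]").appName("qe-plans").getOrCreate()

val qe = spark.range(5).queryExecution
val physical = qe.sparkPlan    // the plan chosen from SparkPlanner's candidates
val prepared = qe.executedPlan // sparkPlan with the preparation rules applied

println(physical.treeString)
println(prepared.treeString)
```

Comparing the two tree strings shows what the preparation rules (e.g. whole-stage code generation) changed.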

A streaming variant of QueryExecution is IncrementalExecution.

Use explain operator to know about the logical and physical plans of a Dataset.
val ds = spark.range(5)
scala> ds.queryExecution
res17: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Analyzed Logical Plan ==
id: bigint
Range 0, 5, 1, 8, [id#39L]

== Optimized Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Physical Plan ==
WholeStageCodegen
:  +- Range 0, 1, 8, 5, [id#39L]
FIXME What’s planner? analyzed? Why do we need assertSupported?

QueryExecution belongs to org.apache.spark.sql.execution package.

hiveResultString Method

hiveResultString(): Seq[String]

hiveResultString returns the result as a Hive-compatible sequence of strings.

scala> spark.range(5).queryExecution.hiveResultString
res0: Seq[String] = ArrayBuffer(0, 1, 2, 3, 4)

scala> spark.read.csv("people.csv").queryExecution.hiveResultString
res4: Seq[String] = ArrayBuffer(id	name	age, 0	Jacek	42)

FIXME Describe what hiveResultString does internally and when it is executed.

Creating QueryExecution Instance

class QueryExecution(
  val sparkSession: SparkSession,
  val logical: LogicalPlan)

QueryExecution requires a SparkSession and a LogicalPlan.
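Although a QueryExecution is normally created for you, you can build one for an arbitrary LogicalPlan through SessionState.executePlan. A sketch, assuming Spark 2.2+ (where sessionState is public, though marked unstable) and a local SparkSession:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("qe-create").getOrCreate()

// Borrow the logical plan of an existing Dataset...
val logicalPlan = spark.range(5).queryExecution.logical

// ...and ask SessionState for a fresh QueryExecution over it
// (sessionState.executePlan is a public-but-unstable API since Spark 2.2)
val qe = spark.sessionState.executePlan(logicalPlan)

// Executing the physical plan yields the underlying RDD[InternalRow]
val rowCount = qe.toRdd.count
```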

Accessing SparkPlanner — planner Method

planner: SparkPlanner

planner returns the current SparkPlanner.

planner merely exposes the internal planner of the current SessionState.

preparations — SparkPlan Preparation Rules (to apply before Query Execution)

preparations is a sequence of SparkPlan optimization rules.

A SparkPlan optimization rule transforms a physical operator (aka SparkPlan) to another (possibly more efficient) SparkPlan.

The preparations collection is an intermediate phase of query execution that developers can use to introduce further optimizations.

The current list of SparkPlan transformations in preparations is as follows:

  1. ExtractPythonUDFs

  2. PlanSubqueries

  3. EnsureRequirements

  4. CollapseCodegenStages

  5. ReuseExchange

  6. ReuseSubquery

The transformation rules are applied in order to the physical plan before execution, i.e. they produce the final SparkPlan when the executedPlan lazy value is first accessed.
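Conceptually, applying preparations is a left fold of the rules over sparkPlan. A toy model in plain Scala, with made-up Plan and Rule types standing in for Spark's actual classes:

```scala
// Toy stand-ins for SparkPlan and Rule[SparkPlan] (not Spark's real types)
case class Plan(description: String)
type Rule = Plan => Plan

// Two pretend preparation rules; each rewrites the plan into a new one
val ensureRequirements: Rule =
  p => p.copy(description = p.description + " +EnsureRequirements")
val collapseCodegen: Rule =
  p => p.copy(description = p.description + " +CollapseCodegenStages")

val preparations: Seq[Rule] = Seq(ensureRequirements, collapseCodegen)

// Apply the rules sequentially: a foldLeft over the physical plan
val sparkPlan = Plan("Range")
val executedPlan = preparations.foldLeft(sparkPlan)((plan, rule) => rule(plan))
// executedPlan.description == "Range +EnsureRequirements +CollapseCodegenStages"
```

The fold makes the ordering guarantee explicit: each rule sees the plan as rewritten by all the rules before it.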


IncrementalExecution

IncrementalExecution is a custom QueryExecution with OutputMode, checkpointLocation, and currentBatchId.

It lives in org.apache.spark.sql.execution.streaming package.

FIXME What is stateStrategy?

Stateful operators in the query plan are numbered using operatorId that starts with 0.

IncrementalExecution adds one Rule[SparkPlan] called state to the preparations sequence of rules, as the first element.

FIXME What does IncrementalExecution do? Where is it used?
