SparkPlan Contract — Physical Operators in Physical Query Plan of Structured Query

SparkPlan is the contract of physical operators to build a physical query plan (aka query execution plan).

The SparkPlan contract requires that a concrete physical operator implement the doExecute method.

doExecute(): RDD[InternalRow]

doExecute allows a physical operator to describe a distributed computation (that is a runtime representation of the operator in particular and a structured query in general) as an RDD of internal binary rows, i.e. RDD[InternalRow], and thus execute.

Table 1. SparkPlan’s Extension Hooks
Name Description


doExecuteBroadcast[T](): broadcast.Broadcast[T]

By default throws an UnsupportedOperationException with the following message:

[nodeName] does not implement doExecuteBroadcast

Executed exclusively as part of executeBroadcast to return the result of a structured query as a broadcast variable.


doPrepare(): Unit

Prepares a physical operator for execution.

Executed exclusively as part of prepare and is supposed to set some state up before executing a query (e.g. BroadcastExchangeExec to broadcast a relation asynchronously or SubqueryExec to execute a child operator).


requiredChildDistribution: Seq[Distribution]

The partition requirements (aka child output distributions) of the input data, i.e. how the output of the child physical operators is split across partitions.

Defaults to an UnspecifiedDistribution for all of the child operators.

Used exclusively when EnsureRequirements physical query plan optimization is executed (and enforces partition requirements).


requiredChildOrdering: Seq[Seq[SortOrder]]

Specifies the required sort ordering for each partition requirement (i.e. for the output of every child operator).

Defaults to no sort ordering for all of the physical operator’s children.

Used exclusively when EnsureRequirements physical query plan optimization is executed (and enforces partition requirements).
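The defaults of requiredChildDistribution and requiredChildOrdering can be pictured with a small stand-alone model. This is only a sketch: Op, SortedInput and the simplified SortOrder below are hypothetical stand-ins, not Spark classes, and the only behavior taken from the text is "one unspecified distribution and no ordering per child".

```scala
object RequirementsSketch {
  // Hypothetical, simplified stand-ins for Spark's Distribution and SortOrder
  sealed trait Distribution
  case object UnspecifiedDistribution extends Distribution
  final case class SortOrder(column: String, ascending: Boolean = true)

  // Hypothetical operator: the defaults yield one entry per child
  class Op(val children: Seq[Op]) {
    def requiredChildDistribution: Seq[Distribution] =
      Seq.fill(children.size)(UnspecifiedDistribution)
    def requiredChildOrdering: Seq[Seq[SortOrder]] =
      Seq.fill(children.size)(Nil)
  }

  // Hypothetical operator that needs its single child sorted by "id"
  class SortedInput(child: Op) extends Op(Seq(child)) {
    override def requiredChildOrdering: Seq[Seq[SortOrder]] =
      Seq(Seq(SortOrder("id")))
  }
}
```

An operator with two children reports two unspecified distributions and two empty orderings, while SortedInput overrides only the ordering it cares about.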

SparkPlan is a recursive data structure in Spark SQL’s Catalyst tree manipulation framework and as such represents a single physical operator in a physical execution query plan as well as a physical execution query plan itself (i.e. a tree of physical operators in a query plan of a structured query).

Figure 1. Physical Plan of Structured Query (i.e. Tree of SparkPlans)

A structured query can be expressed using Spark SQL’s high-level Dataset API for Scala, Java, Python, R or good ol' SQL.

A SparkPlan physical operator is a Catalyst tree node that may have zero or more child physical operators.

A structured query is basically a single SparkPlan physical operator with child physical operators.
Spark SQL uses Catalyst tree manipulation framework to compose nodes to build a tree of (logical or physical) operators that, in this particular case, is composing SparkPlan physical operator nodes to build the physical execution plan tree of a structured query.
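To make the "single operator that is also a whole plan" idea concrete, here is a minimal sketch of a recursive operator tree. The types Op, Leaf, Unary and Binary are hypothetical and much simpler than Catalyst's tree nodes; they only illustrate that every node is itself the root of a (sub)plan.

```scala
object PlanTreeSketch {
  // Hypothetical, simplified tree node (not Catalyst's TreeNode API)
  sealed trait Op {
    def name: String
    def children: Seq[Op]

    // Number of operators in the (sub)plan rooted at this operator
    def size: Int = 1 + children.map(_.size).sum

    // Leaf operators of the (sub)plan, left to right
    def leaves: Seq[Op] =
      if (children.isEmpty) Seq(this) else children.flatMap(_.leaves)

    // Indented rendering with one operator per line
    def render(depth: Int = 0): String =
      (("  " * depth + name) +: children.map(_.render(depth + 1))).mkString("\n")
  }

  case class Leaf(name: String) extends Op {
    val children: Seq[Op] = Nil
  }
  case class Unary(name: String, child: Op) extends Op {
    val children: Seq[Op] = Seq(child)
  }
  case class Binary(name: String, left: Op, right: Op) extends Op {
    val children: Seq[Op] = Seq(left, right)
  }

  // A made-up plan: a join of a filtered scan and a plain scan
  val plan: Op = Binary("Join", Unary("Filter", Leaf("ScanA")), Leaf("ScanB"))
}
```

Calling PlanTreeSketch.plan.render() prints the join at the root with its two subtrees indented below it, which mirrors how a physical plan is usually displayed.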

The entry point to Physical Operator Execution Pipeline is execute.

When executed, SparkPlan executes the internal query implementation in a named scope (for visualization purposes, e.g. web UI) that triggers prepare of the children physical operators first followed by prepareSubqueries and finally doPrepare methods. After subqueries have finished, doExecute method is eventually triggered.
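The order of calls described above can be traced with a toy model. This is a sketch under stated assumptions: MiniPlan, Scan and Filter are hypothetical classes, the result is a plain Seq[Int] rather than an RDD, and the named scope is left out; only the call order (children's prepare, prepareSubqueries, doPrepare, waiting for subqueries, doExecute) follows the text.

```scala
import scala.collection.mutable.ArrayBuffer

object ExecutePipelineSketch {
  // Records the order in which the hooks fire (demonstration only)
  val log: ArrayBuffer[String] = ArrayBuffer.empty[String]
  private def record(event: String): Unit = { log += event; () }

  // Hypothetical, heavily simplified stand-in for SparkPlan
  abstract class MiniPlan(val name: String, val children: Seq[MiniPlan]) {
    private var prepared = false

    // Extension hooks that concrete operators may or must define
    protected def doPrepare(): Unit = record(s"$name.doPrepare")
    protected def doExecute(): Seq[Int]

    protected def prepareSubqueries(): Unit = record(s"$name.prepareSubqueries")
    protected def waitForSubqueries(): Unit = record(s"$name.waitForSubqueries")

    // Idempotent: the body runs at most once per operator
    final def prepare(): Unit = if (!prepared) {
      children.foreach(_.prepare())
      prepareSubqueries()
      doPrepare()
      prepared = true
    }

    final def execute(): Seq[Int] = {
      prepare()
      waitForSubqueries()
      record(s"$name.doExecute")
      doExecute()
    }
  }

  class Scan(data: Seq[Int]) extends MiniPlan("Scan", Nil) {
    protected def doExecute(): Seq[Int] = data
  }

  class Filter(child: MiniPlan) extends MiniPlan("Filter", Seq(child)) {
    // Executing the parent executes the child as part of doExecute
    protected def doExecute(): Seq[Int] = child.execute().filter(_ % 2 == 0)
  }
}
```

Executing new Filter(new Scan(Seq(1, 2, 3, 4))) prepares Scan before Filter, and Scan is not prepared a second time when Filter later executes it, because prepare is guarded by the prepared flag.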

Figure 2. SparkPlan’s Execution (execute Method)

The result of executing a SparkPlan is an RDD of internal binary rows, i.e. RDD[InternalRow].

Executing a structured query is simply a translation of the higher-level Dataset-based description to an RDD-based runtime representation that Spark will in the end execute (once a Dataset action is used).
FIXME Picture between Spark SQL’s Dataset ⇒ Spark Core’s RDD

SparkPlan has access to the owning SparkContext (from the Spark Core).


execute is called when QueryExecution is requested for the RDD that is Spark Core’s physical execution plan (as an RDD lineage). That triggers query execution (i.e. physical planning, but not execution of the plan) and could be considered execution of a structured query.

The "could" part above refers to the fact that the final execution of a structured query happens only when an RDD action is executed on the RDD of a structured query. Hence the need for Spark SQL’s high-level Dataset API, in which the Dataset operators simply execute an RDD action on the corresponding RDD. Easy, isn’t it?


Use explain operator to see the execution plan of a structured query.

val q = spark.range(5) // your query here
q.explain

You may also access the execution plan of a Dataset using its queryExecution property.

val q = spark.range(5) // your query here
q.queryExecution.sparkPlan

The SparkPlan contract assumes that concrete physical operators define doExecute method (with optional hooks like doPrepare) which is executed when the physical operator is executed.

Figure 3. SparkPlan.execute — Physical Operator Execution Pipeline

SparkPlan has the following final methods that prepare execution environment and pass calls to corresponding methods (that constitute SparkPlan Contract).

Table 2. SparkPlan’s Final Methods
Name Description


execute(): RDD[InternalRow]

"Executes" a physical operator (and its children) that triggers physical query planning and in the end generates an RDD of internal binary rows (i.e. RDD[InternalRow]).

Used mostly when QueryExecution is requested for the RDD-based runtime representation of a structured query (that describes a distributed computation using Spark Core’s RDD).

Internally, execute first prepares the physical operator for execution and eventually requests it to doExecute.

Executing doExecute in a named scope happens only after the operator is prepared for execution followed by waiting for any subqueries to finish.


executeQuery[T](query: => T): T

Executes a physical operator in a single RDD scope, i.e. all RDDs created during execution of the physical operator have the same scope.

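executeQuery executes the input query after the following methods (in order): prepare, then waitForSubqueries.


executeQuery is used when a physical operator is requested to execute or executeBroadcast, and when a CodegenSupport physical operator is requested to produce Java source code.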


prepare(): Unit

Prepares a physical operator for execution

prepare is used mainly when a physical operator is requested to execute a structured query

prepare is also used recursively for every child physical operator (down the physical plan) and when a physical operator is requested to prepare subqueries.

prepare is idempotent, i.e. can be called multiple times with no change to the final result. It uses the prepared internal flag to prepare the physical operator once only.

Internally, prepare calls prepare of its children before prepareSubqueries and doPrepare.


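executeBroadcast[T](): broadcast.Broadcast[T]

Returns the result of a structured query as a broadcast variable (by calling doExecuteBroadcast inside executeQuery).

Table 3. Physical Query Operators / Specialized SparkPlans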
Name Description


BinaryExecNode

Binary physical operator with two child physical operators, left and right


LeafExecNode

Leaf physical operator with no children

By default, the set of all attributes that are produced is exactly the set of attributes that are output.


UnaryExecNode

Unary physical operator with one child physical operator

The naming convention for physical operators in Spark’s source code is to have their names end with the Exec suffix, e.g. DebugExec or LocalTableScanExec. The suffix is removed when the operator is displayed, e.g. in web UI.

Table 4. SparkPlan’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description


prepared

Flag that ensures that prepare is executed only once.


subexpressionEliminationEnabled

Flag that controls whether the subexpression elimination optimization is enabled or not.

Used when the following physical operators are requested to execute (i.e. describe a distributed computation as an RDD of internal rows):

FIXME SparkPlan is Serializable. Why? Is this because Dataset.cache persists executed query plans?

Decoding Byte Arrays Back to UnsafeRows — decodeUnsafeRows Method


Compressing Partitions of UnsafeRows (to Byte Arrays) After Executing Physical Operator — getByteArrayRdd Internal Method

getByteArrayRdd(n: Int = -1): RDD[Array[Byte]]

resetMetrics Method

resetMetrics(): Unit

resetMetrics takes metrics and requests every one of them to reset.

resetMetrics is used when…​FIXME

prepareSubqueries Method


executeToIterator Method


executeCollectIterator Method

executeCollectIterator(): (Long, Iterator[InternalRow])


executeCollectIterator is used when…​FIXME

Preparing SparkPlan for Query Execution — executeQuery Final Method

executeQuery[T](query: => T): T

executeQuery executes the input query in a named scope (i.e. so that all RDDs created will have the same scope for visualization like web UI).

Internally, executeQuery calls prepare and waitForSubqueries followed by executing query.

executeQuery is executed as part of execute and executeBroadcast, and when a CodegenSupport-enabled physical operator produces Java source code.

Broadcasting Result of Structured Query — executeBroadcast Final Method

executeBroadcast[T](): broadcast.Broadcast[T]

executeBroadcast returns the result of a structured query as a broadcast variable.

Internally, executeBroadcast calls doExecuteBroadcast inside executeQuery.

executeBroadcast is called in BroadcastHashJoinExec, BroadcastNestedLoopJoinExec and ReusedExchangeExec physical operators.

Performance Metrics — metrics Method

metrics: Map[String, SQLMetric] = Map.empty

metrics returns the SQLMetrics by their names.

By default, metrics contains no SQLMetrics (i.e. Map.empty).
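The relationship between metrics and resetMetrics can be sketched without Spark. Everything below is hypothetical: Metric is a stand-in for SQLMetric reduced to a resettable counter, and Op and CountingOp are made-up operators; the only behavior taken from the text is the empty default and "take metrics and request them to reset".

```scala
object MetricsSketch {
  // Hypothetical stand-in for SQLMetric: a resettable counter
  final class Metric(val initValue: Long = 0L) {
    private var current = initValue
    def add(v: Long): Unit = { current += v }
    def value: Long = current
    def reset(): Unit = { current = initValue }
  }

  // Hypothetical operator with no metrics by default
  class Op {
    def metrics: Map[String, Metric] = Map.empty
    def resetMetrics(): Unit = metrics.valuesIterator.foreach(_.reset())
  }

  // An operator that registers a metric overrides metrics with a val,
  // so that the same Metric instance is returned on every access
  class CountingOp extends Op {
    override val metrics: Map[String, Metric] =
      Map("numOutputRows" -> new Metric())

    def produce(n: Int): Seq[Int] = {
      val rows = 1 to n
      metrics("numOutputRows").add(rows.size)
      rows
    }
  }
}
```

After produce(3) the numOutputRows counter reads 3, and resetMetrics brings it back to its initial value.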

metrics is used when…​FIXME

Taking First N UnsafeRows — executeTake Method

executeTake(n: Int): Array[InternalRow]

executeTake gives an array of up to n first internal rows.

Figure 4. SparkPlan’s executeTake takes 5 elements

Internally, executeTake gets an RDD of byte array of n unsafe rows and scans the RDD partitions one by one until n is reached or all partitions were processed.

executeTake runs Spark jobs that take all the elements from the requested number of partitions, starting from the 0th partition and growing the number of partitions to scan in every following job by at most the factor of the spark.sql.limit.scaleUpFactor property (with 2 as the minimum factor).

executeTake uses SparkContext.runJob to run a Spark job.

In the end, executeTake decodes the unsafe rows.

executeTake gives an empty collection when n is 0 (and no Spark job is executed).

executeTake may take and decode more unsafe rows than really needed since all unsafe rows from a partition are read (if the partition is included in the scan).

import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 10)

// 8 groups over 10 partitions
// only 7 partitions are with numbers
val nums = spark.
  range(start = 0, end = 20, step = 1, numPartitions = 4).
  repartition($"id" % 8)

import scala.collection.Iterator
val showElements = (it: Iterator[java.lang.Long]) => {
  val ns = it.toSeq
  import org.apache.spark.TaskContext
  val pid = TaskContext.get.partitionId
  println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}")
}
// ordered by partition id manually for demo purposes
scala> nums.foreachPartition(showElements)
[partition: 0][size: 2] 4 12
[partition: 1][size: 2] 7 15
[partition: 2][size: 0]
[partition: 3][size: 0]
[partition: 4][size: 0]
[partition: 5][size: 5] 0 6 8 14 16
[partition: 6][size: 0]
[partition: 7][size: 3] 3 11 19
[partition: 8][size: 5] 2 5 10 13 18
[partition: 9][size: 3] 1 9 17

scala> println(spark.sessionState.conf.limitScaleUpFactor)

// Think how many Spark jobs will the following queries run?
// Answers follow
scala> nums.take(13)
res0: Array[Long] = Array(4, 12, 7, 15, 0, 6, 8, 14, 16, 3, 11, 19, 2)

// The number of Spark jobs = 3

scala> nums.take(5)
res34: Array[Long] = Array(4, 12, 7, 15, 0)

// The number of Spark jobs = 4

scala> nums.take(3)
res38: Array[Long] = Array(4, 12, 7)

// The number of Spark jobs = 2
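
The job counts in the session above can be reproduced with a plain-Scala simulation of the partition scan. Treat it as a sketch: ExecuteTakeSketch is hypothetical, partitions are in-memory sequences instead of an RDD, the scale-up factor of 4 is assumed (the session does not show the printed value), and the formula that estimates how many partitions to try next is an assumption that is not given in the text.

```scala
import scala.collection.mutable.ArrayBuffer

object ExecuteTakeSketch {
  // Returns the first n rows and the number of simulated Spark jobs
  def take(
      partitions: IndexedSeq[Seq[Long]],
      n: Int,
      scaleUpFactor: Int = 4): (List[Long], Int) = {
    val buf = ArrayBuffer.empty[Long]
    var partsScanned = 0
    var jobs = 0
    // n == 0 never enters the loop, so no job is run
    while (buf.size < n && partsScanned < partitions.size) {
      // The first job scans a single partition (the 0th)
      var numPartsToTry = 1L
      if (partsScanned > 0) {
        val factor = math.max(scaleUpFactor, 2)
        if (buf.isEmpty) {
          numPartsToTry = partsScanned.toLong * factor
        } else {
          // Assumed estimate: interpolate from the rows found so far,
          // overestimating by 50%, and cap the growth by the factor
          val estimate =
            math.max((1.5 * n * partsScanned / buf.size).toInt - partsScanned, 1)
          numPartsToTry = math.min(estimate.toLong, partsScanned.toLong * factor)
        }
      }
      val end = math.min(partsScanned + numPartsToTry, partitions.size.toLong).toInt
      val range = partsScanned until end
      // One "job" reads all rows of every partition in the range
      range.foreach(p => buf ++= partitions(p))
      jobs += 1
      partsScanned += range.size
    }
    (buf.take(n).toList, jobs)
  }

  // The partition layout printed by foreachPartition above
  val demoPartitions: IndexedSeq[Seq[Long]] = Vector(
    Seq(4L, 12L), Seq(7L, 15L), Seq(), Seq(), Seq(),
    Seq(0L, 6L, 8L, 14L, 16L), Seq(), Seq(3L, 11L, 19L),
    Seq(2L, 5L, 10L, 13L, 18L), Seq(1L, 9L, 17L))
}
```

With demoPartitions, take(13), take(5) and take(3) need 3, 4 and 2 simulated jobs respectively. The empty partitions 2 to 4 are the reason take(5) needs more jobs than take(13): the estimate stays small while few rows have been found.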

executeTake is used when:

Executing Physical Operator and Collecting Results — executeCollect Method

executeCollect(): Array[InternalRow]

executeCollect executes the physical operator and compresses partitions of UnsafeRows as byte arrays (that yields an RDD[(Long, Array[Byte])], so no real Spark jobs may have been submitted at this point).

executeCollect runs a Spark job to collect the elements of the RDD and for every pair in the result (of a count and bytes per partition) decodes the byte arrays back to UnsafeRows and stores the decoded arrays together as the final Array[InternalRow].

executeCollect runs a Spark job using Spark Core’s RDD.collect operator.

executeCollect returns Array[InternalRow], i.e. keeps the internal representation of rows unchanged and does not convert rows to JVM types.
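
The compress-then-decode round trip can be illustrated with plain Java streams. This is a sketch only: CollectSketch is hypothetical, rows are Longs rather than UnsafeRows, partitions are in-memory sequences, and GZIP is an assumed stand-in for whatever compression codec Spark actually uses.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{DataInputStream, DataOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object CollectSketch {
  // Compresses one partition into a (row count, bytes) pair
  def encode(rows: Seq[Long]): (Long, Array[Byte]) = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(new GZIPOutputStream(bos))
    rows.foreach(r => out.writeLong(r))
    out.close() // finishes the GZIP stream so the bytes are complete
    (rows.size.toLong, bos.toByteArray)
  }

  // Decodes the bytes of one partition back to its rows
  def decode(count: Long, bytes: Array[Byte]): List[Long] = {
    val in = new DataInputStream(
      new GZIPInputStream(new ByteArrayInputStream(bytes)))
    try List.fill(count.toInt)(in.readLong()) finally in.close()
  }

  // Stand-in for "collect the pairs, then decode and concatenate"
  def collect(partitions: Seq[Seq[Long]]): Array[Long] =
    partitions
      .map(encode) // one (count, bytes) pair per partition
      .flatMap { case (count, bytes) => decode(count, bytes) }
      .toArray
}
```

The row count travels next to the bytes so that the decoder knows how many rows to read from each partition, including empty ones.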

executeCollect is used when:

executeCollectPublic Method

executeCollectPublic(): Array[Row]


executeCollectPublic is used when…​FIXME

newPredicate Method

newPredicate(expression: Expression, inputSchema: Seq[Attribute]): GenPredicate


newPredicate is used when…​FIXME

Waiting for Subqueries to Finish — waitForSubqueries Method

waitForSubqueries(): Unit

waitForSubqueries requests every ExecSubqueryExpression in runningSubqueries to updateResult.

waitForSubqueries is used exclusively when a physical operator is requested to prepare itself for query execution (when it is executed or requested to executeBroadcast).

Output Data Partitioning Requirements — outputPartitioning Method

outputPartitioning: Partitioning

outputPartitioning specifies the output data partitioning requirements, i.e. a hint for the Spark Physical Optimizer for the number of partitions the output of the physical operator should be split across.

outputPartitioning defaults to an UnknownPartitioning (with 0 partitions).


outputPartitioning is used when:

Output Data Ordering Requirements — outputOrdering Method

outputOrdering: Seq[SortOrder]

outputOrdering specifies the output data ordering requirements of the physical operator, i.e. a hint for the Spark Physical Optimizer for the sorting (ordering) of the data (within and across partitions).

outputOrdering defaults to no ordering (Nil).


outputOrdering is used when:
