SubqueryExec Unary Physical Operator

SubqueryExec is a unary physical operator (i.e. with one child physical operator) that…​FIXME

SubqueryExec uses relationFuture, which is initialized lazily and executed only once, when SubqueryExec is first requested to prepare for execution. Preparing simply triggers execution of the child operator asynchronously (i.e. on a separate thread). The result is collected soon after, when SubqueryExec is requested to execute collect, which makes SubqueryExec wait indefinitely for the child operator to finish.

Caution
FIXME When is doPrepare executed?

SubqueryExec is created exclusively when the PlanSubqueries preparation rule is executed (and transforms ScalarSubquery expressions in a physical plan).

scala> val q = sql("select (select max(id) from t1) tt from t1")
scala> q.explain
== Physical Plan ==
*Project [Subquery subquery32 AS tt#33L]
:  +- Subquery subquery32
:     +- *HashAggregate(keys=[], functions=[max(id#20L)])
:        +- Exchange SinglePartition
:           +- *HashAggregate(keys=[], functions=[partial_max(id#20L)])
:              +- *FileScan parquet default.t1[id#20L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- *FileScan parquet default.t1[] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

Table 1. SubqueryExec’s Performance Metrics

Key          Name (in web UI)      Description
collectTime  time to collect (ms)
dataSize     data size (bytes)

Figure 1. SubqueryExec in web UI (Details for Query)
Note
SubqueryExec physical operator is almost an exact copy of BroadcastExchangeExec physical operator.

Executing Child Operator Asynchronously — doPrepare Method

doPrepare(): Unit
Note
doPrepare is part of SparkPlan Contract to prepare a physical operator for execution.

doPrepare simply triggers initialization of the internal lazily-once-initialized relationFuture asynchronous computation.
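
The lazy, run-once behaviour described above can be illustrated with plain Scala, without any Spark dependency. This is a minimal sketch under stated assumptions, not Spark's implementation: the names LazyFutureSketch, Operator, childRuns and run are hypothetical, and rows are modelled as Array[Long] rather than Array[InternalRow].

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object LazyFutureSketch {
  // Hypothetical stand-in for a physical operator; not Spark's SubqueryExec.
  class Operator(child: () => Array[Long])(implicit ec: ExecutionContext) {
    // Counts how often the child computation runs (for demonstration only).
    val childRuns = new AtomicInteger(0)

    // A lazy val is initialized on first access and at most once,
    // so the Future (and the child computation) is started only once.
    private lazy val relationFuture: Future[Array[Long]] = Future {
      childRuns.incrementAndGet()
      child()
    }

    // Touching the lazy val is enough to start the asynchronous computation.
    def prepare(): Unit = { relationFuture; () }

    // Blocks with no timeout until the asynchronous computation has finished.
    def executeCollect(): Array[Long] = Await.result(relationFuture, Duration.Inf)
  }

  // Prepares once, collects twice, and reports how often the child ran.
  def run(): (Array[Long], Array[Long], Int) = {
    import ExecutionContext.Implicits.global
    val op = new Operator(() => Array(1L, 2L, 3L))
    op.prepare()
    val first = op.executeCollect()
    val second = op.executeCollect()
    (first, second, op.childRuns.get())
  }
}
```

Calling LazyFutureSketch.run() collects the same rows twice while the child computation runs a single time, because both calls wait on the same already-started Future.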

relationFuture Internal Lazily-Once-Initialized Property

relationFuture: Future[Array[InternalRow]]

When "materialized" (aka executed), relationFuture spawns a new thread of execution that requests SQLExecution to execute an action (with the current execution id) on the subquery daemon cached thread pool.

Note
relationFuture uses Scala’s scala.concurrent.Future, which schedules its body on an ExecutionContext (here, on a separate thread) as soon as it is instantiated.

The action requests the child physical operator to executeCollect and records the collectTime and dataSize SQL metrics.

In the end, relationFuture posts metric updates and returns the internal rows.
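
The way the two metrics could be measured around a collect is sketched below in plain Scala. It is an illustration only: CollectMetricsSketch, Collected and collectWithMetrics are hypothetical names, rows are modelled as byte arrays, and the values are returned in a case class, whereas Spark records them as SQL metrics.

```scala
object CollectMetricsSketch {
  // Hypothetical result holder for the collected rows and the two measurements.
  final case class Collected(
      rows: Array[Array[Byte]],
      collectTimeMs: Long,
      dataSizeBytes: Long)

  def collectWithMetrics(collect: () => Array[Array[Byte]]): Collected = {
    // Time the collect itself, converting nanoseconds to milliseconds.
    val start = System.nanoTime()
    val rows = collect()
    val collectTimeMs = (System.nanoTime() - start) / 1000000L

    // Data size is the sum of the sizes of all collected rows, in bytes.
    val dataSizeBytes = rows.map(_.length.toLong).sum

    Collected(rows, collectTimeMs, dataSizeBytes)
  }
}
```

For two rows of 8 and 16 bytes, collectWithMetrics reports a data size of 24 bytes and a non-negative collect time.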

Note
relationFuture is executed on a separate thread from a custom scala.concurrent.ExecutionContext (built from a cached java.util.concurrent.ThreadPoolExecutor with the prefix subquery and up to 16 threads).
Note
relationFuture is used when SubqueryExec is requested to prepare for execution (which triggers execution of the child operator) and to execute collect (which waits indefinitely until the child operator has finished).
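
A custom ExecutionContext of the kind mentioned above (cached daemon threads, the subquery prefix, up to 16 threads) can be approximated with the JDK alone. The sketch below is an assumption about how such a pool may be built and is not Spark's own code: SubqueryPoolSketch, daemonThreadFactory, newDaemonCachedPool and threadNameOfTask are hypothetical names, and the 60-second keep-alive is an illustrative default.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration.Duration

object SubqueryPoolSketch {
  // Hypothetical helper: creates daemon threads named "<prefix>-<n>".
  private def daemonThreadFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
      t.setDaemon(true) // daemon threads do not keep the JVM alive
      t
    }
  }

  // At most maxThreads threads; idle threads are released after keepAliveSeconds.
  def newDaemonCachedPool(
      prefix: String,
      maxThreads: Int,
      keepAliveSeconds: Long = 60L): ThreadPoolExecutor = {
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads, keepAliveSeconds, TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable](), daemonThreadFactory(prefix))
    pool.allowCoreThreadTimeOut(true)
    pool
  }

  // Runs a small task on the pool and returns the name of the thread that executed it.
  def threadNameOfTask(): String = {
    val pool = newDaemonCachedPool("subquery", 16)
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(pool)
    try Await.result(Future(Thread.currentThread().getName), Duration.Inf)
    finally pool.shutdown()
  }
}
```

A Future created with this ExecutionContext in implicit scope runs on one of the subquery-prefixed daemon threads rather than on the calling thread.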

Creating SubqueryExec Instance

SubqueryExec takes the following when created: the name of the subquery and the child physical operator.

Collecting Internal Rows of Executing SubqueryExec Operator — executeCollect Method

executeCollect(): Array[InternalRow]
Note
executeCollect is part of SparkPlan Contract to execute a physical operator and collect the results as a collection of internal rows.

executeCollect waits until relationFuture gives a result (as an Array[InternalRow]).
