val q = sql("select (select max(id) from t1) tt from t1")
scala> q.explain
== Physical Plan ==
*Project [Subquery subquery32 AS tt#33L]
:  +- Subquery subquery32
:     +- *HashAggregate(keys=[], functions=[max(id#20L)])
:        +- Exchange SinglePartition
:           +- *HashAggregate(keys=[], functions=[partial_max(id#20L)])
:              +- *FileScan parquet default.t1[id#20L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- *FileScan parquet default.t1[] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
SubqueryExec Unary Physical Operator
SubqueryExec is a unary physical operator (i.e. with one child physical operator) that…FIXME
SubqueryExec uses relationFuture, which is initialized lazily and executed only once, when SubqueryExec is first requested to prepare for execution. Preparation simply triggers execution of the child operator asynchronously (i.e. on a separate thread). The result is collected afterwards by executeCollect, which makes SubqueryExec wait indefinitely for the child operator to finish.
Caution: FIXME When is doPrepare executed?
SubqueryExec is created exclusively when the PlanSubqueries preparation rule is executed (and transforms ScalarSubquery expressions in a physical plan).
| Key | Name (in web UI) | Description |
|---|---|---|
| collectTime | time to collect (ms) | |
| dataSize | data size (bytes) | |
Note: The SubqueryExec physical operator is almost an exact copy of the BroadcastExchangeExec physical operator.
Executing Child Operator Asynchronously — doPrepare Method
doPrepare(): Unit
Note: doPrepare is part of the SparkPlan Contract to prepare a physical operator for execution.
doPrepare simply triggers initialization of the internal lazily-once-initialized relationFuture asynchronous computation.
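The lazy, once-only behaviour described above can be sketched in plain Scala without Spark. This is a minimal illustration, not Spark's source: the class name LazyOnceOperator, the childRuns counter and the use of Array[Int] in place of Array[InternalRow] are assumptions made for the example.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-in for a physical operator; not Spark's SubqueryExec.
class LazyOnceOperator(child: () => Array[Int])(implicit ec: ExecutionContext) {
  // Counts how many times the child computation was actually started.
  val childRuns = new AtomicInteger(0)

  // A lazy val is initialized on first access only, so the Future
  // (and the child computation it wraps) is created at most once.
  lazy val relationFuture: Future[Array[Int]] = Future {
    childRuns.incrementAndGet()
    child()
  }

  // Touching the lazy val is enough to start the asynchronous computation.
  def doPrepare(): Unit = {
    relationFuture
  }
}
```

Calling doPrepare several times on the same instance still starts the child computation once, because every later access to the lazy val returns the Future that already exists.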
relationFuture Internal Lazily-Once-Initialized Property
relationFuture: Future[Array[InternalRow]]
When "materialized" (aka executed), relationFuture spawns a new thread of execution that requests SQLExecution to execute an action (with the current execution id) on the subquery daemon cached thread pool.
Note: relationFuture is a Scala scala.concurrent.Future, which starts running on a thread of its execution context as soon as it is instantiated.
The action requests the child physical operator to executeCollect and records the collectTime and dataSize SQL metrics.
In the end, relationFuture posts metric updates and returns the internal rows.
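The two measurements can be illustrated with a small Spark-free sketch. Everything named here (CollectMetricsSketch, CollectStats, collectWithStats) is hypothetical, and rows are modelled as byte arrays only so that a size in bytes is easy to compute. Spark records these values as SQL metrics rather than returning them.

```scala
object CollectMetricsSketch {
  // Hypothetical result holder for the two measurements.
  final case class CollectStats(collectTimeMs: Long, dataSizeBytes: Long)

  // Runs `collect`, timing it and summing the sizes of the returned rows.
  def collectWithStats(
      collect: () => Array[Array[Byte]]): (Array[Array[Byte]], CollectStats) = {
    val start = System.nanoTime()
    val rows = collect()
    // Elapsed wall-clock time converted from nanoseconds to milliseconds.
    val collectTimeMs = (System.nanoTime() - start) / 1000000L
    // Total size of all rows in bytes.
    val dataSizeBytes = rows.map(_.length.toLong).sum
    (rows, CollectStats(collectTimeMs, dataSizeBytes))
  }
}
```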
Note: relationFuture is executed on a separate thread from a custom scala.concurrent.ExecutionContext (built from a cached java.util.concurrent.ThreadPoolExecutor with the prefix subquery and up to 16 threads).
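An execution context of this kind can be approximated with JDK classes alone. The sketch below is an assumption about how such a pool could be built, not a copy of Spark's code: the object name SubqueryPoolSketch, the helper daemonCachedPool, the thread naming scheme and the 60-second keep-alive are all choices made for the example.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext

object SubqueryPoolSketch {
  // Builds a pool of at most `maxThreads` daemon threads named "<prefix>-<n>".
  def daemonCachedPool(prefix: String, maxThreads: Int): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true) // daemon threads do not keep the JVM alive
        t
      }
    }
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads,              // core and maximum pool size
      60L, TimeUnit.SECONDS,               // idle threads are released after 60 seconds
      new LinkedBlockingQueue[Runnable](), // tasks queue up once all threads are busy
      factory)
    pool.allowCoreThreadTimeOut(true)      // lets the pool shrink back to zero threads
    pool
  }

  // Execution context backed by a "subquery" pool with up to 16 threads.
  val executionContext: ExecutionContext =
    ExecutionContext.fromExecutorService(daemonCachedPool("subquery", 16))
}
```

A Future created with SubqueryPoolSketch.executionContext runs on one of the subquery-prefixed threads, so the work never occupies the calling thread.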
Note: relationFuture is used when SubqueryExec is requested to prepare for execution (which triggers execution of the child operator) and to execute collect (which waits indefinitely until the child operator has finished).
Collecting Internal Rows of Executing SubqueryExec Operator — executeCollect Method
executeCollect(): Array[InternalRow]
Note: executeCollect is part of the SparkPlan Contract to execute a physical operator and collect the results as a collection of internal rows.
executeCollect waits indefinitely until relationFuture gives a result (as an Array[InternalRow]).
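Waiting without a timeout can be expressed with the Scala standard library as shown below. This is a sketch under assumptions: the object name BlockingCollectSketch and the generic element type are made up for the example, and Spark may route the wait through its own helper rather than calling Await directly.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

object BlockingCollectSketch {
  // Blocks the calling thread until `future` completes, with no timeout.
  // A failure inside the future is rethrown to the caller.
  def executeCollect[T](future: Future[Array[T]]): Array[T] =
    Await.result(future, Duration.Inf)
}
```

Because the timeout is Duration.Inf, a child computation that never finishes keeps the caller blocked forever, which matches the indefinite wait described above.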