val q = sql("select (select max(id) from t1) tt from t1")
scala> q.explain
== Physical Plan ==
*Project [Subquery subquery32 AS tt#33L]
: +- Subquery subquery32
: +- *HashAggregate(keys=[], functions=[max(id#20L)])
: +- Exchange SinglePartition
: +- *HashAggregate(keys=[], functions=[partial_max(id#20L)])
: +- *FileScan parquet default.t1[id#20L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- *FileScan parquet default.t1[] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
SubqueryExec Unary Physical Operator
SubqueryExec
is a unary physical operator (i.e. with one child physical operator) that…FIXME
SubqueryExec
uses relationFuture that is lazily and executed only once when SubqueryExec
is first requested to prepare execution that simply triggers execution of the child operator asynchronously (i.e. on a separate thread) and to collect the result soon after (that makes SubqueryExec
waiting indefinitely for the child operator to be finished).
Caution
|
FIXME When is doPrepare executed?
|
SubqueryExec
is created exclusively when PlanSubqueries
preparation rule is executed (and transforms ScalarSubquery
expressions in a physical plan).
Key | Name (in web UI) | Description |
---|---|---|
time to collect (ms) |
||
data size (bytes) |
Note
|
SubqueryExec physical operator is almost an exact copy of BroadcastExchangeExec physical operator.
|
Executing Child Operator Asynchronously — doPrepare
Method
doPrepare(): Unit
Note
|
doPrepare is part of SparkPlan Contract to prepare a physical operator for execution.
|
doPrepare
simply triggers initialization of the internal lazily-once-initialized relationFuture asynchronous computation.
relationFuture
Internal Lazily-Once-Initialized Property
relationFuture: Future[Array[InternalRow]]
When "materialized" (aka executed), relationFuture
spawns a new thread of execution that requests SQLExecution
to execute an action (with the current execution id) on subquery daemon cached thread pool.
Note
|
relationFuture uses Scala’s scala.concurrent.Future that spawns a new thread of execution once instantiated.
|
The action tracks execution of the child physical operator to executeCollect and collects collectTime and dataSize SQL metrics.
In the end, relationFuture
posts metric updates and returns the internal rows.
Note
|
relationFuture is executed on a separate thread from a custom scala.concurrent.ExecutionContext (built from a cached java.util.concurrent.ThreadPoolExecutor with the prefix subquery and up to 16 threads).
|
Note
|
relationFuture is used when SubqueryExec is requested to prepare for execution (that triggers execution of the child operator) and execute collect (that waits indefinitely until the child operator has finished).
|
Collecting Internal Rows of Executing SubqueryExec Operator — executeCollect
Method
executeCollect(): Array[InternalRow]
Note
|
executeCollect is part of SparkPlan Contract to execute a physical operator and collect the results as collection of internal rows.
|
executeCollect
waits till relationFuture gives a result (as a Array[InternalRow]
).