val data = Seq(1, 3, 4, 7)
val nums = data.toDF
scala> :type nums
org.apache.spark.sql.DataFrame
val plan = nums.queryExecution.analyzed
scala> println(plan.numberedTreeString)
00 LocalRelation [value#1]
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
val relation = plan.collect { case r: LocalRelation => r }.head
assert(relation.isInstanceOf[LocalRelation])
val sql = relation.toSQL(inlineTableName = "demo")
assert(sql == "VALUES (1), (3), (4), (7) AS demo(value)")
val stats = relation.computeStats
scala> println(stats)
Statistics(sizeInBytes=48.0 B, hints=none)
LocalRelation Leaf Logical Operator
LocalRelation is a leaf logical operator that represents a scan over local collections (and allows for optimizations so that functions like collect or take can be executed locally, i.e. on the driver with no executors).
LocalRelation is created (using apply, fromExternalRows, and fromProduct factory methods) when:

- ResolveInlineTables logical resolution rule is executed (and converts an UnresolvedInlineTable)
- PruneFilters, ConvertToLocalRelation, PropagateEmptyRelation, and OptimizeMetadataOnlyQuery logical optimization rules are executed (applied to an analyzed logical plan)
- SparkSession.createDataset, SparkSession.emptyDataset, and SparkSession.createDataFrame operators are used
- CatalogImpl is requested for a Dataset from DefinedByConstructorParams data
- Dataset is requested for the analyzed logical plan (and executes Command logical operators)
- StatFunctions is requested to crossTabulate and generate summary statistics of a Dataset (as a DataFrame)
Note: A Dataset is local when its analyzed logical plan is exactly an instance of LocalRelation.
LocalRelation is resolved to a LocalTableScanExec leaf physical operator when the BasicOperators execution planning strategy is executed (i.e. when a logical plan is planned as a physical plan).
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
assert(relation.isInstanceOf[LocalRelation])
scala> :type spark
org.apache.spark.sql.SparkSession
import spark.sessionState.planner.BasicOperators
val localScan = BasicOperators(relation).head
import org.apache.spark.sql.execution.LocalTableScanExec
assert(localScan.isInstanceOf[LocalTableScanExec])
When requested for statistics, LocalRelation takes the size of the objects in a single row (per the output schema) and multiplies it by the number of rows (in the data).
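As a back-of-the-envelope illustration of that sizing, here is a minimal Scala sketch. This is not Spark's actual code: the per-type sizes and the StatsSketch name are made up for the example, and the exact numbers Spark reports depend on its internal per-type size accounting.

```scala
// Hypothetical sketch of LocalRelation's statistics sizing: the size of one
// row (derived from the output schema) multiplied by the number of rows.
object StatsSketch {
  // Assumed per-type sizes in bytes (illustrative, not Spark's exact accounting)
  def defaultSize(dataType: String): Int = dataType match {
    case "int"    => 4
    case "long"   => 8
    case "double" => 8
    case _        => 20 // rough default for variable-length types
  }

  def sizeInBytes(outputTypes: Seq[String], numRows: Int): Long =
    outputTypes.map(defaultSize).sum.toLong * numRows
}
```

With a single int column and 4 rows, the sketch yields 16 bytes.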
Creating LocalRelation Instance
LocalRelation takes the following to be created:
- Output schema attributes
- Collection of internal binary rows
- isStreaming flag that indicates whether the data comes from a streaming source (default: false)
While being created, LocalRelation makes sure that the output attributes are all resolved or throws an IllegalArgumentException:
Unresolved attributes found when constructing LocalRelation.
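A minimal sketch of that constructor-time check (Attr and LocalRelationSketch are illustrative stand-ins, not Spark's classes; Scala's require throws exactly the IllegalArgumentException described above):

```scala
// Illustrative stand-in for an output attribute with a resolution flag.
final case class Attr(name: String, resolved: Boolean)

// Sketch of the check LocalRelation performs while being created:
// every output attribute must be resolved, or an IllegalArgumentException
// is thrown with the message quoted above.
final case class LocalRelationSketch(output: Seq[Attr]) {
  require(
    output.forall(_.resolved),
    "Unresolved attributes found when constructing LocalRelation.")
}
```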
Creating LocalRelation — apply Object Method
apply(output: Attribute*): LocalRelation
apply(
output1: StructField,
output: StructField*): LocalRelation
apply…FIXME
Note: apply is used when…FIXME
Creating LocalRelation — fromExternalRows Object Method
fromExternalRows(
output: Seq[Attribute],
data: Seq[Row]): LocalRelation
fromExternalRows…FIXME
Note: fromExternalRows is used when…FIXME
Creating LocalRelation — fromProduct Object Method
fromProduct(
output: Seq[Attribute],
data: Seq[Product]): LocalRelation
fromProduct…FIXME
Note: fromProduct is used when…FIXME
Generating SQL Statement — toSQL Method
toSQL(inlineTableName: String): String
toSQL generates a SQL statement of the format:
VALUES [data] AS [inlineTableName]([names])
toSQL throws an AssertionError when the data is empty.
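The format above can be re-created with a small sketch (ToSqlSketch is a hypothetical stand-in, not Spark's implementation):

```scala
// Sketch of the toSQL output format:
//   VALUES [data] AS [inlineTableName]([names])
// Names and rows are passed in directly for illustration; the real method
// derives them from the relation's output attributes and data.
object ToSqlSketch {
  def toSQL(inlineTableName: String, names: Seq[String], rows: Seq[Seq[Any]]): String = {
    assert(rows.nonEmpty, "data must not be empty") // mirrors the AssertionError above
    val values = rows.map(_.mkString("(", ", ", ")")).mkString(", ")
    s"VALUES $values AS $inlineTableName(${names.mkString(", ")})"
  }
}
```

For the demo relation at the top of the page, it reproduces `VALUES (1), (3), (4), (7) AS demo(value)`.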
Note: toSQL does not seem to be used at all.