sql("DROP TABLE IF EXISTS t2")
sql("CREATE TABLE t2(id long)")
val query = "SELECT * FROM RANGE(1)"
// Using INSERT INTO SQL statement so we can access QueryExecution
// DataFrameWriter.insertInto returns no value
val q = sql("INSERT INTO TABLE t2 " + query)
val logicalPlan = q.queryExecution.logical
scala> println(logicalPlan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, false, false
01 +- 'Project [*]
02 +- 'UnresolvedTableValuedFunction RANGE, [1]
val analyzedPlan = q.queryExecution.analyzed
scala> println(analyzedPlan.numberedTreeString)
00 InsertIntoHiveTable `default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [id#6L]
01 +- Project [id#6L]
02 +- Range (0, 1, step=1, splits=None)
InsertIntoDataSourceCommand Logical Command
InsertIntoDataSourceCommand is a RunnableCommand that inserts or overwrites data in an InsertableRelation (per the overwrite flag).
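The target relation implements the InsertableRelation contract of the Data Source API (org.apache.spark.sql.sources), which boils down to a single insert method. A simplified sketch of the trait as it appears in Spark 2.x:

import org.apache.spark.sql.DataFrame

trait InsertableRelation {
  // Inserts the rows of `data` into the relation; `overwrite` controls
  // whether existing data is replaced or appended to
  def insert(data: DataFrame, overwrite: Boolean): Unit
}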
InsertIntoDataSourceCommand is created exclusively when the DataSourceAnalysis logical resolution rule is executed (and resolves an InsertIntoTable unary logical operator over a LogicalRelation on an InsertableRelation).
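The resolution can be pictured with the following simplified rule. It is only an illustrative sketch that assumes the Spark 2.x shape of InsertIntoTable and LogicalRelation (the real DataSourceAnalysis rule handles more cases, e.g. partitioned inserts):

import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, LogicalRelation}
import org.apache.spark.sql.sources.InsertableRelation

object ResolveInsertableRelationSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // An InsertIntoTable over a LogicalRelation that is backed by an
    // InsertableRelation becomes an InsertIntoDataSourceCommand
    case i @ InsertIntoTable(lr: LogicalRelation, _, _, _, _)
        if lr.relation.isInstanceOf[InsertableRelation] =>
      InsertIntoDataSourceCommand(lr, i.query, i.overwrite)
  }
}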
InsertIntoDataSourceCommand returns the logical query plan when requested for the inner nodes (which are shown as an inner nested tree of this node).
val query = "SELECT * FROM RANGE(1)"
val sqlText = "INSERT INTO TABLE t2 " + query
val plan = spark.sessionState.sqlParser.parsePlan(sqlText)
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, false, false
01 +- 'Project [*]
02 +- 'UnresolvedTableValuedFunction RANGE, [1]
Creating InsertIntoDataSourceCommand Instance
InsertIntoDataSourceCommand takes the following to be created:

- LogicalRelation leaf logical operator
- Logical query plan
- overwrite flag
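These three arguments line up with a minimal sketch of the command (an illustration based on the description in this section, not the verbatim Spark source; it assumes the Spark-internal types RunnableCommand, LogicalRelation and QueryPlan are accessible):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation

case class InsertIntoDataSourceCommand(
    logicalRelation: LogicalRelation,
    query: LogicalPlan,
    overwrite: Boolean)
  extends RunnableCommand {

  // The logical query plan is reported as the inner nodes, i.e. shown as an
  // inner nested tree of this node
  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)

  // Body elided here; see the run sketch at the end of this page
  override def run(session: SparkSession): Seq[Row] = Seq.empty[Row]
}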
Executing Logical Command (Inserting or Overwriting Data in InsertableRelation) — run Method

run(session: SparkSession): Seq[Row]
NOTE: run is part of the RunnableCommand contract to execute (run) a logical command.
run takes the InsertableRelation (that is the relation of the LogicalRelation).

run then creates a DataFrame for the logical query plan and the input SparkSession.
run requests the DataFrame for the QueryExecution, which in turn is requested for the RDD (of the structured query). run also requests the LogicalRelation for the output schema.

With the RDD and the output schema, run creates another DataFrame, which is the RDD[InternalRow] with the schema applied.
run then requests the InsertableRelation to insert or overwrite the data.
In the end, since the data in the InsertableRelation has changed, run requests the CacheManager to recacheByPlan with the LogicalRelation.
NOTE: run requests the SparkSession for the SharedState that is in turn requested for the CacheManager.
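Putting the steps together, here is a sketch of run's body, continuing the class sketch above. It is reconstructed from this description rather than copied from Spark, and it leans on private[sql] helpers (Dataset.ofRows, SparkSession.internalCreateDataFrame), so it would only compile inside Spark's own org.apache.spark.sql packages:

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.sources.InsertableRelation

override def run(session: SparkSession): Seq[Row] = {
  // The InsertableRelation is the relation of the LogicalRelation
  val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]

  // Create a DataFrame for the logical query plan and the input SparkSession
  val data = Dataset.ofRows(session, query)

  // Request the QueryExecution for the RDD of the structured query and
  // apply the LogicalRelation's output schema to that RDD[InternalRow]
  val df = session.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)

  // Insert or overwrite the data in the InsertableRelation
  relation.insert(df, overwrite)

  // The relation's data has changed, so re-cache plans that use it
  // (SparkSession -> SharedState -> CacheManager)
  session.sharedState.cacheManager.recacheByPlan(session, logicalRelation)

  Seq.empty[Row]
}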