sql("DROP TABLE IF EXISTS t2")
sql("CREATE TABLE t2(id long)")
val query = "SELECT * FROM RANGE(1)"
// Using INSERT INTO SQL statement so we can access QueryExecution
// DataFrameWriter.insertInto returns no value
val q = sql("INSERT INTO TABLE t2 " + query)
val logicalPlan = q.queryExecution.logical
scala> println(logicalPlan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, false, false
01 +- 'Project [*]
02 +- 'UnresolvedTableValuedFunction RANGE, [1]
val analyzedPlan = q.queryExecution.analyzed
scala> println(analyzedPlan.numberedTreeString)
00 InsertIntoHiveTable `default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [id#6L]
01 +- Project [id#6L]
02 +- Range (0, 1, step=1, splits=None)
InsertIntoDataSourceCommand Logical Command
InsertIntoDataSourceCommand is a RunnableCommand that inserts or overwrites data in an InsertableRelation (per the overwrite flag).
InsertIntoDataSourceCommand is created exclusively when the DataSourceAnalysis logical resolution rule is executed (and resolves an InsertIntoTable unary logical operator over a LogicalRelation on an InsertableRelation).
InsertIntoDataSourceCommand returns the logical query plan when requested for the inner nodes, i.e. the nodes that are displayed as an inner nested tree of this node.
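The resolution step described above can be sketched as a Catalyst rule. The following is a simplified sketch only (it assumes Spark 2.x-era class shapes, i.e. a four-field LogicalRelation and the InsertIntoTable logical operator, and a hypothetical object name); the real case clause lives inside DataSourceAnalysis and differs in details.
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, LogicalRelation}
import org.apache.spark.sql.sources.InsertableRelation

// Simplified sketch (not the exact Spark source): an InsertIntoTable over a
// LogicalRelation backed by an InsertableRelation becomes InsertIntoDataSourceCommand.
object InsertIntoDataSourceResolutionSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case InsertIntoTable(
        l @ LogicalRelation(_: InsertableRelation, _, _, _),
        parts, query, overwrite, false) if parts.isEmpty =>
      InsertIntoDataSourceCommand(l, query, overwrite)
  }
}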
val query = "SELECT * FROM RANGE(1)"
val sqlText = "INSERT INTO TABLE t2 " + query
val plan = spark.sessionState.sqlParser.parsePlan(sqlText)
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, false, false
01 +- 'Project [*]
02 +- 'UnresolvedTableValuedFunction RANGE, [1]
Creating InsertIntoDataSourceCommand Instance
InsertIntoDataSourceCommand takes the following to be created:
- LogicalRelation leaf logical operator
- Logical query plan
- overwrite flag
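A minimal sketch of the shape (the class name here is hypothetical; the real command also extends RunnableCommand and reports the query as its inner node, as described above):
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation

// Shape sketch only; field names follow the descriptions above.
case class InsertIntoDataSourceCommandShape(
  logicalRelation: LogicalRelation, // LogicalRelation leaf logical operator
  query: LogicalPlan,               // logical query plan (also the inner node)
  overwrite: Boolean)               // overwrite flag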
Executing Logical Command (Inserting or Overwriting Data in InsertableRelation) — run Method
run(
  session: SparkSession): Seq[Row]
Note: run is part of the RunnableCommand contract to execute (run) a logical command.
run takes the InsertableRelation (that is the relation of the LogicalRelation).
run then creates a DataFrame for the logical query plan and the input SparkSession.
run requests the DataFrame for the QueryExecution that in turn is requested for the RDD (of the structured query). run requests the LogicalRelation for the output schema.
With the RDD and the output schema, run creates another DataFrame that is the RDD[InternalRow] with the schema applied.
run requests the InsertableRelation to insert or overwrite data.
In the end, since the data in the InsertableRelation has changed, run requests the CacheManager to recacheByPlan with the LogicalRelation.
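Put together, the steps above correspond roughly to the following sketch. This is not the actual Spark source: the object and method names are hypothetical, and because it leans on private[sql] helpers (Dataset.ofRows, internalCreateDataFrame), it is declared under one of Spark's own packages so that it compiles.
package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.sources.InsertableRelation

// Sketch of the run flow described above (hypothetical names).
object InsertIntoDataSourceRunSketch {
  def run(
      session: SparkSession,
      logicalRelation: LogicalRelation,
      query: LogicalPlan,
      overwrite: Boolean): Seq[Row] = {
    // The InsertableRelation is the relation of the LogicalRelation
    val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
    // DataFrame for the logical query plan and the input SparkSession
    val data = Dataset.ofRows(session, query)
    // RDD of the structured query...
    val rdd = data.queryExecution.toRdd
    // ...and the LogicalRelation's output schema give another DataFrame:
    // the RDD[InternalRow] with the schema applied
    val df = session.internalCreateDataFrame(rdd, logicalRelation.schema)
    // Insert or overwrite data in the InsertableRelation
    relation.insert(df, overwrite)
    // The underlying data has changed, so recache plans that refer to the relation
    session.sharedState.cacheManager.recacheByPlan(session, logicalRelation)
    Seq.empty[Row]
  }
}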
Note: run requests the SparkSession for the SharedState that is in turn requested for the CacheManager.