InsertIntoDataSourceCommand Logical Command

InsertIntoDataSourceCommand is a RunnableCommand that inserts data into an InsertableRelation or overwrites the data already there, depending on the overwrite flag.

InsertIntoDataSourceCommand is created exclusively when DataSourceAnalysis logical resolution is executed (and resolves an InsertIntoTable unary logical operator with a LogicalRelation on an InsertableRelation).

sql("DROP TABLE IF EXISTS t2")
sql("CREATE TABLE t2(id long)")

val query = "SELECT * FROM RANGE(1)"
// Using INSERT INTO SQL statement so we can access QueryExecution
// DataFrameWriter.insertInto returns no value
val q = sql("INSERT INTO TABLE t2 " + query)
val logicalPlan = q.queryExecution.logical
scala> println(logicalPlan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, false, false
01 +- 'Project [*]
02    +- 'UnresolvedTableValuedFunction RANGE, [1]

val analyzedPlan = q.queryExecution.analyzed
scala> println(analyzedPlan.numberedTreeString)
00 InsertIntoHiveTable `default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [id#6L]
01 +- Project [id#6L]
02    +- Range (0, 1, step=1, splits=None)
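
The session above targets a Hive table, so its analyzed plan shows InsertIntoHiveTable. To see InsertIntoDataSourceCommand in the analyzed plan, the target has to be a LogicalRelation over an InsertableRelation. The following is a minimal sketch under stated assumptions, not how any built-in data source is implemented: DemoStore, DemoRelation, DemoSource and InsertDemo are hypothetical names made up for illustration, the relation keeps its rows in driver memory (so it only makes sense in local mode), and the code is meant to be compiled as an application rather than pasted into spark-shell.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical in-memory store shared by all DemoRelation instances (local mode only).
object DemoStore {
  @volatile var rows: Seq[Row] = Seq.empty
}

// Hypothetical relation with a single `id` column that can be
// scanned (TableScan) and written to (InsertableRelation).
class DemoRelation(override val sqlContext: SQLContext)
    extends BaseRelation with TableScan with InsertableRelation {

  override def schema: StructType = StructType(StructField("id", LongType) :: Nil)

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(DemoStore.rows)

  // Replaces the stored rows when overwrite is true, appends otherwise.
  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    val incoming = data.collect().toSeq
    DemoStore.rows = if (overwrite) incoming else DemoStore.rows ++ incoming
  }
}

// Hypothetical provider so the relation can be referenced with USING.
class DemoSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = new DemoRelation(sqlContext)
}

object InsertDemo {
  // Returns (node name of the analyzed plan, row count after INSERT INTO,
  // row count after INSERT OVERWRITE).
  def demo(spark: SparkSession): (String, Long, Long) = {
    DemoStore.rows = Seq.empty
    spark.sql(s"CREATE OR REPLACE TEMPORARY VIEW demo USING ${classOf[DemoSource].getName}")

    val q = spark.sql("INSERT INTO TABLE demo SELECT * FROM RANGE(3)")
    val analyzedNode = q.queryExecution.analyzed.nodeName
    val afterInsert = spark.table("demo").count()

    spark.sql("INSERT OVERWRITE TABLE demo SELECT * FROM RANGE(1)")
    val afterOverwrite = spark.table("demo").count()

    (analyzedNode, afterInsert, afterOverwrite)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("InsertDemo").getOrCreate()
    println(demo(spark))
    spark.stop()
  }
}
```

If DataSourceAnalysis resolves the insert as described above, the returned tuple is expected to name InsertIntoDataSourceCommand as the analyzed node, followed by the counts 3 (append) and 1 (overwrite).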

InsertIntoDataSourceCommand returns the logical query plan when requested for the inner children (innerChildren), i.e. the nodes that should be shown as an inner nested tree of this node.

val query = "SELECT * FROM RANGE(1)"
val sqlText = "INSERT INTO TABLE t2 " + query
val plan = spark.sessionState.sqlParser.parsePlan(sqlText)
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, false, false
01 +- 'Project [*]
02    +- 'UnresolvedTableValuedFunction RANGE, [1]

Creating InsertIntoDataSourceCommand Instance

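InsertIntoDataSourceCommand takes the following to be created: a LogicalRelation, a logical query plan (the query whose rows are inserted), and the overwrite flag.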

Executing Logical Command (Inserting or Overwriting Data in InsertableRelation) — run Method

run(
  session: SparkSession): Seq[Row]
Note
run is part of the RunnableCommand contract to execute (run) a logical command.

run takes the InsertableRelation (that is the relation of the LogicalRelation).

run then creates a DataFrame for the logical query plan and the input SparkSession.

run requests the DataFrame for its QueryExecution, which in turn is requested for the RDD (of the structured query). run also requests the LogicalRelation for the output schema.

With the RDD and the output schema, run creates another DataFrame from the RDD[InternalRow] with the schema applied.

run requests the InsertableRelation to insert or overwrite data.

In the end, since the data in the InsertableRelation has changed, run requests the CacheManager to recacheByPlan with the LogicalRelation.

Note
run requests the SparkSession for SharedState that is in turn requested for the CacheManager.
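
The steps above can be approximated with public APIs only. This is a sketch, not the actual run method: RunSketch and insertLikeRun are hypothetical names, the schema parameter stands in for the output schema of the LogicalRelation, and SparkSession.createDataFrame over query.rdd is a public stand-in for the internal step that applies the schema to the RDD[InternalRow]. The recacheByPlan step is left out because it goes through internal CacheManager APIs.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.StructType

object RunSketch {
  // Hypothetical helper that mirrors the described flow of run.
  def insertLikeRun(
      spark: SparkSession,
      relation: InsertableRelation, // the relation of the LogicalRelation
      schema: StructType,           // stands in for the LogicalRelation's output schema
      query: DataFrame,             // DataFrame for the logical query plan
      overwrite: Boolean): Seq[Row] = {
    // Re-apply the target schema to the rows of the query
    // (public analogue of RDD[InternalRow] + schema).
    val data = spark.createDataFrame(query.rdd, schema)

    // Ask the relation to insert or overwrite.
    relation.insert(data, overwrite)

    // A command that only has side effects yields no rows.
    Seq.empty[Row]
  }
}
```

Any InsertableRelation can be passed in, for example one that records the row count and the overwrite flag it receives, which makes the flow easy to check in a local session.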
