# InsertIntoTable Unary Logical Operator

`InsertIntoTable` is a unary logical operator that represents the following high-level operators in a logical plan:

- `INSERT INTO` and `INSERT OVERWRITE TABLE` SQL statements
- `DataFrameWriter.insertInto` high-level operator

```scala
// make sure that the tables are available in a catalog
sql("CREATE TABLE IF NOT EXISTS t1(id long)")
sql("CREATE TABLE IF NOT EXISTS t2(id long)")

val q = sql("INSERT INTO TABLE t2 SELECT * from t1 LIMIT 100")
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, false, false
01 +- 'GlobalLimit 100
02    +- 'LocalLimit 100
03       +- 'Project [*]
04          +- 'UnresolvedRelation `t1`

// Dataset API's version of "INSERT OVERWRITE TABLE" in SQL
spark.range(10).write.mode("overwrite").insertInto("t2")
```
```scala
// Create a partitioned table
spark.range(10)
  .withColumn("p1", 'id % 2)
  .write
  .mode("overwrite")
  .partitionBy("p1")
  .saveAsTable("partitioned_table")

// INSERT INTO with a static partition spec
val insertIntoQ = sql("INSERT INTO TABLE partitioned_table PARTITION (p1 = 4) VALUES 41, 42")
scala> println(insertIntoQ.queryExecution.logical.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `partitioned_table`, Map(p1 -> Some(4)), false, false
01 +- 'UnresolvedInlineTable [col1], [List(41), List(42)]
```
```scala
// (Re)create the partitioned table
spark.range(10)
  .withColumn("p1", 'id % 2)
  .write
  .mode("overwrite")
  .partitionBy("p1")
  .saveAsTable("partitioned_table")

// INSERT OVERWRITE with a static partition spec
val insertOverwriteQ = sql("INSERT OVERWRITE TABLE partitioned_table PARTITION (p1 = 4) VALUES 40")
scala> println(insertOverwriteQ.queryExecution.logical.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `partitioned_table`, Map(p1 -> Some(4)), true, false
01 +- 'UnresolvedInlineTable [col1], [List(40)]
```
`InsertIntoTable` is created with partition keys that correspond to the `partitionSpec` part of the following SQL statements:

- `INSERT INTO TABLE` (with the `overwrite` and `ifPartitionNotExists` flags off)
- `INSERT OVERWRITE TABLE` (with the `overwrite` flag on and the `ifPartitionNotExists` flag off, as in the examples above)
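The partition spec is available as the `partition` map of the created operator. A minimal spark-shell check against the `insertIntoQ` query above (assuming the Spark 2.x `InsertIntoTable` field names):

```scala
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable

// The parsed plan is the InsertIntoTable itself; partition values are kept as strings
val iit = insertIntoQ.queryExecution.logical.asInstanceOf[InsertIntoTable]
assert(iit.partition == Map("p1" -> Some("4")))
```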
`InsertIntoTable` has no partition keys when created as follows:

- `insertInto` operator from the Catalyst DSL
- `DataFrameWriter.insertInto` operator
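A quick way to confirm the empty partition map for the Catalyst DSL variant (again assuming the Spark 2.x field names):

```scala
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable

// insertInto from the Catalyst DSL always passes an empty partition map
val noPartitions = table("a").insertInto("t1").asInstanceOf[InsertIntoTable]
assert(noPartitions.partition.isEmpty)
```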
`InsertIntoTable` can never be resolved (i.e. `InsertIntoTable` should not be part of a logical plan after analysis) and is supposed to be converted to one of the following logical commands during analysis:
| Logical Command | Description |
|---|---|
| `InsertIntoHiveTable` | When `HiveAnalysis` resolution rule transforms an `InsertIntoTable` with a `HiveTableRelation` |
| `InsertIntoDataSourceCommand` | When `DataSourceAnalysis` posthoc logical resolution resolves an `InsertIntoTable` with a `LogicalRelation` over an `InsertableRelation` (with no partitions defined) |
| `InsertIntoHadoopFsRelationCommand` | When `DataSourceAnalysis` posthoc logical resolution transforms an `InsertIntoTable` with a `LogicalRelation` over a `HadoopFsRelation` |
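To observe a conversion, print out the analyzed plan of the `INSERT INTO` query from the first example; `InsertIntoTable` is gone, replaced by one of the commands above (which one depends on the provider of the target table):

```scala
// q is the INSERT INTO query from the first example; the analyzed plan
// holds a logical command, not InsertIntoTable
println(q.queryExecution.analyzed.numberedTreeString)
```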
> **Caution**: FIXME What's the difference between `HiveAnalysis` that converts `InsertIntoTable(r: HiveTableRelation…)` to `InsertIntoHiveTable` and `RelationConversions` that converts `InsertIntoTable(r: HiveTableRelation,…)` to `InsertIntoTable` (with `LogicalRelation`)?
> **Note**: Inserting into views or RDD-based tables is not allowed (and fails at analysis).
`InsertIntoTable` (with an `UnresolvedRelation` leaf logical operator) is created when:

- `INSERT INTO` or `INSERT OVERWRITE TABLE` SQL statements are executed (as a single insert or a multi-insert query)
- `DataFrameWriter` is requested to insert a DataFrame into a table
- `RelationConversions` logical evaluation rule is executed (and transforms `InsertIntoTable` operators)
- `CreateHiveTableAsSelectCommand` logical command is executed
`InsertIntoTable` has an empty output schema.
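This is easy to verify on the parsed plan of the `q` query from the first example (a spark-shell sketch):

```scala
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable

// the root of the parsed plan is the InsertIntoTable operator itself
assert(q.queryExecution.logical.asInstanceOf[InsertIntoTable].output.isEmpty)
```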
## Catalyst DSL — insertInto Operator

```scala
insertInto(
  tableName: String,
  overwrite: Boolean = false): LogicalPlan
```

`insertInto` operator in the Catalyst DSL creates an `InsertIntoTable` logical operator, e.g. for testing or exploring Spark SQL internals.
```scala
import org.apache.spark.sql.catalyst.dsl.plans._
val plan = table("a").insertInto(tableName = "t1", overwrite = true)
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t1`, true, false
01 +- 'UnresolvedRelation `a`

import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
// p(0) gives the operator at position 00 in the numbered tree above
val op = plan.p(0)
assert(op.isInstanceOf[InsertIntoTable])
```
## Creating InsertIntoTable Instance

`InsertIntoTable` takes the following when created:

- Logical plan for the table to insert into
- Partition keys (with optional partition values for dynamic partition insert)
- Logical plan representing the data to be written
- `overwrite` flag that indicates whether to overwrite an existing table or partitions (`true`) or not (`false`)
- `ifPartitionNotExists` flag that indicates whether to skip the insert when the given partition already exists (`true`) or not (`false`)
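For illustration only, the operator can also be constructed directly. The following sketch assumes the Spark 2.3-era constructor (where `OneRowRelation` is a case class) and uses made-up table and partition names:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OneRowRelation}

val op = InsertIntoTable(
  table = UnresolvedRelation(TableIdentifier("t1")), // table to insert into
  partition = Map("p1" -> Some("4")),                // static partition spec
  query = OneRowRelation(),                          // data to be written
  overwrite = true,                                  // INSERT OVERWRITE semantics
  ifPartitionNotExists = false)
```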
## Inserting Into View Not Allowed

Inserting into a view is not allowed, i.e. a query plan with an `InsertIntoTable` operator whose `UnresolvedRelation` leaf operator is resolved to a `View` unary operator fails at analysis (when the `ResolveRelations` logical resolution rule is executed) with the following error:

```text
Inserting into a view is not allowed. View: [name].
```
```scala
// Create a view
val viewName = "demo_view"
sql(s"DROP VIEW IF EXISTS $viewName")
assert(spark.catalog.tableExists(viewName) == false)
sql(s"CREATE VIEW $viewName COMMENT 'demo view' AS SELECT 1,2,3")
assert(spark.catalog.tableExists(viewName))
// The following should fail with an AnalysisException
scala> spark.range(0).write.insertInto(viewName)
org.apache.spark.sql.AnalysisException: Inserting into a view is not allowed. View: `default`.`demo_view`.;
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:644)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:640)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:640)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:586)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:61)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:322)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:308)
... 49 elided
```
## Inserting Into RDD-Based Table Not Allowed

Inserting into an RDD-based table is not allowed, i.e. a query plan with an `InsertIntoTable` operator fails at analysis (when the `PreWriteCheck` extended logical check is executed) if the logical plan representing the table is one of the following:

- A logical operator that is not a leaf node
- `Range` leaf operator
- `OneRowRelation` leaf operator
- `LocalRelation` leaf operator
```scala
// Create a temporary view
val data = spark.range(1)
data.createOrReplaceTempView("demo")
scala> spark.range(0).write.insertInto("demo")
org.apache.spark.sql.AnalysisException: Inserting into an RDD-based table is not allowed.;;
'InsertIntoTable Range (0, 1, step=1, splits=Some(8)), false, false
+- Range (0, 0, step=1, splits=Some(8))
at org.apache.spark.sql.execution.datasources.PreWriteCheck$.failAnalysis(rules.scala:442)
at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$14.apply(rules.scala:473)
at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$14.apply(rules.scala:445)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:117)
at org.apache.spark.sql.execution.datasources.PreWriteCheck$.apply(rules.scala:445)
at org.apache.spark.sql.execution.datasources.PreWriteCheck$.apply(rules.scala:440)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:349)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:349)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:349)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:61)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:322)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:308)
... 49 elided
```
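The same check also rejects `LocalRelation`-backed temp views. A minimal sketch with a made-up view name (`local_demo`), expected to fail with the very same `AnalysisException`:

```scala
// A temp view over a local collection resolves to a LocalRelation
Seq(1, 2, 3).toDF("id").createOrReplaceTempView("local_demo")

// Expected: org.apache.spark.sql.AnalysisException:
// Inserting into an RDD-based table is not allowed.
scala> spark.range(0).write.insertInto("local_demo")
```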