//
// The example does NOT work when executed
// "Data not in the BIGINT data type range so converted to null"
// It is enough to show the InsertIntoHiveDirCommand operator though
//
assert(spark.version == "2.4.5")
val tableName = "insert_into_hive_dir_demo"
sql(s"""CREATE TABLE IF NOT EXISTS $tableName (id LONG) USING hive""")
val locationUri = spark.sharedState.externalCatalog.getTable("default", tableName).location.toString
val q = sql(s"""INSERT OVERWRITE DIRECTORY '$locationUri' USING hive SELECT 1L AS id""")
scala> q.explain(true)
== Parsed Logical Plan ==
'InsertIntoDir false, Storage(Location: hdfs://localhost:9000/user/hive/warehouse/insert_into_hive_dir_demo), hive, true
+- Project [1 AS id#49L]
+- OneRowRelation
== Analyzed Logical Plan ==
InsertIntoHiveDirCommand false, Storage(Location: hdfs://localhost:9000/user/hive/warehouse/insert_into_hive_dir_demo), true, [id]
+- Project [1 AS id#49L]
+- OneRowRelation
== Optimized Logical Plan ==
InsertIntoHiveDirCommand false, Storage(Location: hdfs://localhost:9000/user/hive/warehouse/insert_into_hive_dir_demo), true, [id]
+- Project [1 AS id#49L]
+- OneRowRelation
== Physical Plan ==
Execute InsertIntoHiveDirCommand InsertIntoHiveDirCommand false, Storage(Location: hdfs://localhost:9000/user/hive/warehouse/insert_into_hive_dir_demo), true, [id]
+- *(1) Project [1 AS id#49L]
+- Scan OneRowRelation[]
// FIXME Why does the following throw an exception?
// spark.table(tableName)
InsertIntoHiveDirCommand Logical Command

InsertIntoHiveDirCommand is a logical command that writes the result of executing a structured query to a Hadoop DFS location (e.g. the location of a Hive table, as in the demo above).
InsertIntoHiveDirCommand is created when the HiveAnalysis logical resolution rule is executed and resolves an InsertIntoDir logical operator with a Hive table (see the sketch below).
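A simplified sketch of that resolution, modeled on Spark 2.4's HiveStrategies.scala and abridged to the InsertIntoDir case (verifyNotReadPath guards against overwriting a path the query also reads from):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand

object HiveAnalysis extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    // Resolve INSERT OVERWRITE [LOCAL] DIRECTORY ... USING hive
    case InsertIntoDir(isLocal, storage, provider, child, overwrite)
        if DDLUtils.isHiveTable(provider) =>
      val outputPath = new Path(storage.locationUri.get)
      if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
      InsertIntoHiveDirCommand(
        isLocal, storage, child, overwrite, child.output.map(_.name))
  }
}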
Creating InsertIntoHiveDirCommand Instance

InsertIntoHiveDirCommand takes the following to be created (see the declaration sketch after the list):

- isLocal flag
- CatalogStorageFormat
- Structured query (as a LogicalPlan)
- overwrite flag
- Output column names
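These map one-to-one onto the constructor parameters; a sketch of the declaration, modeled on Spark 2.4 sources (SaveAsHiveFile is a Spark-internal trait that extends DataWritingCommand):

import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

case class InsertIntoHiveDirCommand(
    isLocal: Boolean,
    storage: CatalogStorageFormat,
    query: LogicalPlan,
    overwrite: Boolean,
    outputColumnNames: Seq[String])
  extends SaveAsHiveFile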
Executing Logical Command — run Method

run(
  sparkSession: SparkSession,
  child: SparkPlan): Seq[Row]
Note: run is part of the DataWritingCommand contract.
run asserts that the table location of the CatalogStorageFormat is specified (or throws an AssertionError).
run then checks the given output columns for duplicate names (checkColumnNameDuplication), as in the sketch below.
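A sketch of these two validation steps, modeled on Spark 2.4 sources (SchemaUtils is Spark-internal):

import org.apache.spark.sql.util.SchemaUtils

assert(storage.locationUri.nonEmpty)

SchemaUtils.checkColumnNameDuplication(
  outputColumnNames,
  s"when inserting into ${storage.locationUri.get}",
  sparkSession.sessionState.conf.caseSensitiveAnalysis)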
run creates a CatalogTable for the table location (with the VIEW table type) and converts it to Hive table metadata.
run sets the serialization.lib metadata property to the serde of the given CatalogStorageFormat, or LazySimpleSerDe if not defined.
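A sketch of the conversion and the serde default, modeled on Spark 2.4 sources (HiveClientImpl.toHiveTable does the CatalogTable-to-Hive conversion):

import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.spark.sql.hive.client.HiveClientImpl

val hiveTable = HiveClientImpl.toHiveTable(table)
hiveTable.getMetadata.put(
  serdeConstants.SERIALIZATION_LIB,
  storage.serde.getOrElse(classOf[LazySimpleSerDe].getName))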
run creates a new map-reduce job for execution (a Hadoop JobConf) with a new Hadoop Configuration (from the input SparkSession).
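In code (as in Spark 2.4 sources):

import org.apache.hadoop.mapred.JobConf

val hadoopConf = sparkSession.sessionState.newHadoopConf()
val jobConf = new JobConf(hadoopConf)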
run prepares the path to write to (based on the given isLocal flag, creating it if necessary) and then gets an external temporary path to stage the write (getExternalTmpPath).
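A sketch of the path preparation, modeled on Spark 2.4 sources (the real code qualifies the DFS path via Hive's FileUtils; Hadoop's makeQualified is used here for brevity):

import org.apache.hadoop.fs.{FileSystem, Path}

val targetPath = new Path(storage.locationUri.get)
val writeToPath =
  if (isLocal) {
    // LOCAL DIRECTORY resolves against the local file system
    FileSystem.getLocal(jobConf).makeQualified(targetPath)
  } else {
    val dfs = targetPath.getFileSystem(jobConf)
    val qualifiedPath = dfs.makeQualified(targetPath)
    if (!dfs.exists(qualifiedPath)) dfs.mkdirs(qualifiedPath.getParent)
    qualifiedPath
  }

val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, writeToPath)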
run then writes the query result out to the temporary path (saveAsHiveFile) and moves the files to the target directory.

In the end, run deletes the external temporary path (deleteExternalTmpPath).
In case of any error (a Throwable), run throws a SparkException:

Failed inserting overwrite directory [locationUri]
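A sketch of the overall write-and-cleanup flow, continuing the sketches above and modeled on Spark 2.4 sources (fileSinkConf, a FileSinkDesc describing the output at tmpPath, is assumed; its construction is omitted):

import org.apache.spark.SparkException

try {
  // Write the query result to the staging directory
  saveAsHiveFile(
    sparkSession = sparkSession,
    plan = child,
    hadoopConf = hadoopConf,
    fileSinkConf = fileSinkConf,
    outputLocation = tmpPath.toString)
  // Move the staged files to the target directory
  val fs = writeToPath.getFileSystem(hadoopConf)
  fs.listStatus(tmpPath).foreach { tmpFile =>
    fs.rename(tmpFile.getPath, writeToPath)
  }
} catch {
  case e: Throwable =>
    throw new SparkException(
      s"Failed inserting overwrite directory ${storage.locationUri.get}", e)
} finally {
  deleteExternalTmpPath(hadoopConf)
}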