InsertIntoHiveDirCommand Logical Command

InsertIntoHiveDirCommand is a logical command that writes the result of executing a structured query to a Hadoop DFS location of a Hive table.

InsertIntoHiveDirCommand is created when HiveAnalysis logical resolution rule is executed and resolves a InsertIntoDir logical operator with a Hive table.

//
// The example does NOT work when executed
// "Data not in the BIGINT data type range so converted to null"
// It is enough to show the InsertIntoHiveDirCommand operator though
//
assert(spark.version == "2.4.5")

val tableName = "insert_into_hive_dir_demo"
sql(s"""CREATE TABLE IF NOT EXISTS $tableName (id LONG) USING hive""")

val locationUri = spark.sharedState.externalCatalog.getTable("default", tableName).location.toString
val q = sql(s"""INSERT OVERWRITE DIRECTORY '$locationUri' USING hive SELECT 1L AS id""")
scala> q.explain(true)
== Parsed Logical Plan ==
'InsertIntoDir false, Storage(Location: hdfs://localhost:9000/user/hive/warehouse/insert_into_hive_dir_demo), hive, true
+- Project [1 AS id#49L]
   +- OneRowRelation

== Analyzed Logical Plan ==
InsertIntoHiveDirCommand false, Storage(Location: hdfs://localhost:9000/user/hive/warehouse/insert_into_hive_dir_demo), true, [id]
+- Project [1 AS id#49L]
   +- OneRowRelation

== Optimized Logical Plan ==
InsertIntoHiveDirCommand false, Storage(Location: hdfs://localhost:9000/user/hive/warehouse/insert_into_hive_dir_demo), true, [id]
+- Project [1 AS id#49L]
   +- OneRowRelation

== Physical Plan ==
Execute InsertIntoHiveDirCommand InsertIntoHiveDirCommand false, Storage(Location: hdfs://localhost:9000/user/hive/warehouse/insert_into_hive_dir_demo), true, [id]
+- *(1) Project [1 AS id#49L]
   +- Scan OneRowRelation[]

// FIXME Why does the following throw an exception?
// spark.table(tableName)

Creating InsertIntoHiveDirCommand Instance

InsertIntoHiveDirCommand takes the following to be created:

Executing Logical Command — run Method

run(
  sparkSession: SparkSession,
  child: SparkPlan): Seq[Row]
Note
run is part of DataWritingCommand contract.

run asserts that the table location of the CatalogStorageFormat is specified (or throws an AssertionError).

run creates a CatalogTable for the table location (and the VIEW table type) and converts it to a Hive Table metadata.

run specifies serialization.lib metadata to the serde of the given CatalogStorageFormat or LazySimpleSerDe if not defined.

run creates a new map-reduce job for execution (a Hadoop JobConf) with a new Hadoop Configuration (from the input SparkSession).

run prepares the path to write to (based on the given isLocal flag and creating it if necessary). run getExternalTmpPath.

In the end, run deleteExternalTmpPath.

In case of any error (Throwable), run throws an SparkException:

Failed inserting overwrite directory [locationUri]

results matching ""

    No results matching ""