scala> :type spark
org.apache.spark.sql.SparkSession
// Example: InsertIntoTable with UnresolvedCatalogRelation
// Drop tables to make the example reproducible
val db = spark.catalog.currentDatabase
Seq("t1", "t2").foreach { t =>
  spark.sharedState.externalCatalog.dropTable(db, t, ignoreIfNotExists = true, purge = true)
}
// Create tables
sql("CREATE TABLE t1 (id LONG) USING parquet")
sql("CREATE TABLE t2 (id LONG) USING orc")
import org.apache.spark.sql.catalyst.dsl.plans._
val plan = table("t1").insertInto(tableName = "t2", overwrite = true)
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'UnresolvedRelation `t1`
// Transform the logical plan with ResolveRelations logical rule first
// so UnresolvedRelations become UnresolvedCatalogRelations
import spark.sessionState.analyzer.ResolveRelations
val planWithUnresolvedCatalogRelations = ResolveRelations(plan)
scala> println(planWithUnresolvedCatalogRelations.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'SubqueryAlias t1
02    +- 'UnresolvedCatalogRelation `default`.`t1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
// Let's resolve UnresolvedCatalogRelations then
import org.apache.spark.sql.execution.datasources.FindDataSourceTable
val r = new FindDataSourceTable(spark)
val tablesResolvedPlan = r(planWithUnresolvedCatalogRelations)
// FIXME Why is t2 not resolved?!
scala> println(tablesResolvedPlan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- SubqueryAlias t1
02    +- Relation[id#10L] parquet
FindDataSourceTable Logical Evaluation Rule — Resolving UnresolvedCatalogRelations
FindDataSourceTable is a Catalyst rule for resolving UnresolvedCatalogRelations (of Spark and Hive tables) in a logical query plan.
FindDataSourceTable is one of the extended resolution rules (extendedResolutionRules) of the logical Analyzer, and is executed as part of the Resolution fixed-point batch of rules.
FindDataSourceTable takes a single SparkSession when created.
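To make the idea of a fixed-point batch concrete, here is a minimal, Spark-free Scala sketch. It assumes a toy plan (a list of nodes) and two hypothetical rules, resolveRelations and findDataSourceTable, that only imitate ResolveRelations and FindDataSourceTable; none of the names below are Spark APIs. The executor applies all rules in order, pass after pass, until a pass changes nothing or a maximum number of passes is reached.

```scala
// Hypothetical, simplified model of a fixed-point batch of analyzer rules.
// None of these names are Spark APIs; they only imitate the idea.
object FixedPointSketch {
  sealed trait Node
  final case class Unresolved(name: String) extends Node        // ~ UnresolvedRelation
  final case class UnresolvedCatalog(name: String) extends Node // ~ UnresolvedCatalogRelation
  final case class Resolved(name: String) extends Node          // ~ LogicalRelation

  type Plan = List[Node]
  type Rule = Plan => Plan

  // Stand-in for ResolveRelations: look the name up in a (pretend) catalog.
  val resolveRelations: Rule = _.map {
    case Unresolved(n) => UnresolvedCatalog(n)
    case other         => other
  }

  // Stand-in for FindDataSourceTable: turn catalog relations into relations.
  val findDataSourceTable: Rule = _.map {
    case UnresolvedCatalog(n) => Resolved(n)
    case other                => other
  }

  /** Runs all rules in order, pass after pass, until a pass changes nothing
    * (the fixed point) or maxIterations passes have run.
    * Returns the final plan and the number of passes executed. */
  def execute(plan: Plan, rules: Seq[Rule], maxIterations: Int): (Plan, Int) = {
    var current = plan
    var passes = 0
    var changed = true
    while (changed && passes < maxIterations) {
      val next = rules.foldLeft(current)((p, rule) => rule(p))
      changed = next != current
      current = next
      passes += 1
    }
    (current, passes)
  }
}
```

With the rules in the order shown, t1 is fully resolved in the first pass and the second pass only confirms the fixed point. Reversing the order takes a third pass, which is why a fixed-point batch is less sensitive to the order in which its rules are listed.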
Applying Rule to Logical Plan (Resolving UnresolvedCatalogRelations) — apply Method
apply(plan: LogicalPlan): LogicalPlan
Note: apply is part of the Rule contract.
apply resolves UnresolvedCatalogRelations for Spark (data source) and Hive tables:
- apply creates LogicalRelation logical operators (using readDataSourceTable) for UnresolvedCatalogRelations of Spark tables, including the target table of InsertIntoTable operators
- apply creates HiveTableRelation logical operators (using readHiveTable) for UnresolvedCatalogRelations of Hive tables, including the target table of InsertIntoTable operators
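The dispatch in apply can be modeled without Spark. The sketch below uses hypothetical, simplified stand-ins for CatalogTable, UnresolvedRelation, UnresolvedCatalogRelation, LogicalRelation, HiveTableRelation, SubqueryAlias and InsertIntoTable, and assumes (like DDLUtils.isDatasourceTable) that any provider other than hive marks a Spark table. InsertIntoTable gets its own cases because, as far as we can tell from the Spark 2.x sources, its target table is not one of its children, so a plain tree traversal would not reach it.

```scala
// Hypothetical, Spark-free model of FindDataSourceTable.apply (not Spark's classes).
object FindDataSourceTableSketch {
  // Simplified table metadata; provider "hive" marks a Hive table.
  final case class CatalogTable(name: String, provider: String)

  sealed trait Plan
  final case class UnresolvedRelation(name: String) extends Plan // name not looked up yet
  final case class UnresolvedCatalogRelation(table: CatalogTable) extends Plan
  final case class LogicalRelation(table: CatalogTable) extends Plan   // Spark (data source) table
  final case class HiveTableRelation(table: CatalogTable) extends Plan // Hive table
  final case class SubqueryAlias(alias: String, child: Plan) extends Plan
  // Only `query` is treated as a child; `table` is the insert target.
  final case class InsertIntoTable(table: Plan, query: Plan, overwrite: Boolean) extends Plan

  // Assumption: imitates DDLUtils.isDatasourceTable (any provider but "hive").
  def isDatasourceTable(t: CatalogTable): Boolean = !t.provider.equalsIgnoreCase("hive")

  // Stand-ins for readDataSourceTable and readHiveTable.
  private def read(t: CatalogTable): Plan =
    if (isDatasourceTable(t)) LogicalRelation(t) else HiveTableRelation(t)

  def apply(plan: Plan): Plan = plan match {
    // Insert target is an UnresolvedCatalogRelation: resolve it explicitly.
    case InsertIntoTable(UnresolvedCatalogRelation(t), query, overwrite) =>
      InsertIntoTable(read(t), apply(query), overwrite)
    // Any other target (e.g. an UnresolvedRelation) is left as-is.
    case InsertIntoTable(target, query, overwrite) =>
      InsertIntoTable(target, apply(query), overwrite)
    case UnresolvedCatalogRelation(t) => read(t)
    case SubqueryAlias(alias, child)  => SubqueryAlias(alias, apply(child))
    case other                        => other
  }
}
```

Applied to a plan shaped like the one in the example at the top of this page, the sketch leaves an UnresolvedRelation insert target untouched and resolves only the t1 subquery, which mirrors the tablesResolvedPlan output.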
Creating HiveTableRelation Logical Operator — readHiveTable Internal Method
readHiveTable(table: CatalogTable): LogicalPlan
readHiveTable creates a HiveTableRelation for the input CatalogTable.
Note: readHiveTable is used when FindDataSourceTable is requested to resolve an UnresolvedCatalogRelation in a logical plan (for Hive tables).
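readHiveTable does little more than wrap the table metadata. The sketch below is a hypothetical, Spark-free model of that step; it assumes, based on our reading of the Spark 2.x sources, that the HiveTableRelation carries the table's data columns and partition columns separately and that all of them are marked nullable. Column, CatalogTable and HiveTableRelation here are simplified stand-ins, not Spark's classes.

```scala
// Hypothetical, Spark-free model of readHiveTable (simplified stand-ins, not Spark's classes).
object ReadHiveTableSketch {
  final case class Column(name: String, dataType: String, nullable: Boolean)
  final case class CatalogTable(name: String, schema: Seq[Column], partitionColumnNames: Seq[String])
  final case class HiveTableRelation(table: CatalogTable, dataCols: Seq[Column], partitionCols: Seq[Column])

  def readHiveTable(table: CatalogTable): HiveTableRelation = {
    // Split the schema into partition columns and regular data columns.
    val (partitionCols, dataCols) =
      table.schema.partition(c => table.partitionColumnNames.contains(c.name))
    // Assumption: Hive table columns are always treated as nullable.
    HiveTableRelation(
      table,
      dataCols.map(_.copy(nullable = true)),
      partitionCols.map(_.copy(nullable = true)))
  }
}
```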
Creating LogicalRelation Logical Operator for CatalogTable — readDataSourceTable Internal Method
readDataSourceTable(table: CatalogTable): LogicalPlan
readDataSourceTable requests the SparkSession for the SessionCatalog.
readDataSourceTable then requests the SessionCatalog for the cached logical plan of the input CatalogTable.
If no plan is cached yet, readDataSourceTable creates a new DataSource for the provider of the input CatalogTable, with an extra path option set to the locationUri of the table's storage (if defined). readDataSourceTable requests the DataSource to resolve the relation, i.e. create the corresponding BaseRelation, and uses it to create a LogicalRelation for the input CatalogTable. The new LogicalRelation is then cached in the SessionCatalog.
Note: readDataSourceTable is used when FindDataSourceTable is requested to resolve an UnresolvedCatalogRelation in a logical plan (for data source tables).
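The cache-or-load logic of readDataSourceTable can be sketched as follows. This is a hypothetical, Spark-free model: a mutable map stands in for the SessionCatalog's plan cache, and CatalogTable and LogicalRelation are simplified case classes. It shows that the load path (and so the DataSource resolution) runs only once per table, and how the storage location becomes the extra path option.

```scala
import scala.collection.mutable

// Hypothetical, Spark-free model of readDataSourceTable's "cached plan or load" logic.
object ReadDataSourceTableSketch {
  // Simplified stand-ins for Spark's CatalogTable and LogicalRelation.
  final case class CatalogTable(
      qualifiedName: String,
      provider: String,
      locationUri: Option[String],
      storageProperties: Map[String, String])
  final case class LogicalRelation(provider: String, options: Map[String, String], table: CatalogTable)

  // Stand-in for the SessionCatalog's cache of table plans, keyed by qualified name.
  private val cachedPlans = mutable.Map.empty[String, LogicalRelation]
  private var loadCount = 0

  /** Number of times the load path (DataSource resolution) actually ran. */
  def loads: Int = loadCount

  def readDataSourceTable(table: CatalogTable): LogicalRelation =
    cachedPlans.getOrElseUpdate(table.qualifiedName, {
      // Only evaluated on a cache miss.
      loadCount += 1
      // The storage location, if any, becomes the extra "path" option.
      val options = table.storageProperties ++ table.locationUri.map("path" -> _)
      // A real implementation would resolve a DataSource into a BaseRelation here.
      LogicalRelation(table.provider, options, table)
    })
}
```

Calling readDataSourceTable twice for the same table returns the very same cached LogicalRelation, while a table without a locationUri simply gets no path option.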