scala> :type spark
org.apache.spark.sql.SparkSession
// Example: InsertIntoTable with UnresolvedCatalogRelation
// Drop tables to make the example reproducible
val db = spark.catalog.currentDatabase
Seq("t1", "t2").foreach { t =>
spark.sharedState.externalCatalog.dropTable(db, t, ignoreIfNotExists = true, purge = true)
}
// Create tables
sql("CREATE TABLE t1 (id LONG) USING parquet")
sql("CREATE TABLE t2 (id LONG) USING orc")
import org.apache.spark.sql.catalyst.dsl.plans._
val plan = table("t1").insertInto(tableName = "t2", overwrite = true)
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'UnresolvedRelation `t1`
// Transform the logical plan with ResolveRelations logical rule first
// so UnresolvedRelations become UnresolvedCatalogRelations
import spark.sessionState.analyzer.ResolveRelations
val planWithUnresolvedCatalogRelations = ResolveRelations(plan)
scala> println(planWithUnresolvedCatalogRelations.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'SubqueryAlias t1
02 +- 'UnresolvedCatalogRelation `default`.`t1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
// Now resolve the UnresolvedCatalogRelations with FindDataSourceTable
import org.apache.spark.sql.execution.datasources.FindDataSourceTable
val r = new FindDataSourceTable(spark)
val tablesResolvedPlan = r(planWithUnresolvedCatalogRelations)
// FIXME Why is t2 not resolved?!
scala> println(tablesResolvedPlan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- SubqueryAlias t1
02 +- Relation[id#10L] parquet
FindDataSourceTable Logical Evaluation Rule — Resolving UnresolvedCatalogRelations
FindDataSourceTable is a Catalyst rule for resolving UnresolvedCatalogRelations (of Spark and Hive tables) in a logical query plan.

FindDataSourceTable is part of the additional rules in the Resolution fixed-point batch of rules.
FindDataSourceTable takes a single SparkSession when created.
Applying Rule to Logical Plan (Resolving UnresolvedCatalogRelations) — apply Method

apply(plan: LogicalPlan): LogicalPlan
Note: apply is part of the Rule contract.
apply resolves UnresolvedCatalogRelations for Spark (data source) and Hive tables:

- apply creates LogicalRelation logical operators for UnresolvedCatalogRelations of Spark (data source) tables (incl. those under InsertIntoTable operators)

- apply creates HiveTableRelation logical operators for UnresolvedCatalogRelations of Hive tables (incl. those under InsertIntoTable operators)
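The two-way dispatch above can be modeled with a small self-contained sketch (toy plan classes and a hypothetical provider check, not Spark's actual API): a rule walks the plan and replaces every unresolved catalog relation leaf with either a data-source relation or a Hive relation, depending on the table's provider.

```scala
// Toy model of FindDataSourceTable's dispatch logic (not Spark's classes).
sealed trait Plan
case class UnresolvedCatalogRelation(table: String, provider: String) extends Plan
case class LogicalRelation(table: String) extends Plan   // data source tables
case class HiveTableRelation(table: String) extends Plan // Hive tables
case class InsertIntoTable(table: Plan, query: Plan) extends Plan

object FindDataSourceTableSketch {
  // Stand-in for DDLUtils.isDatasourceTable: anything but "hive" counts.
  private def isDatasourceTable(provider: String): Boolean = provider != "hive"

  // LogicalRelation for data source tables, HiveTableRelation for Hive
  // tables — including relations under InsertIntoTable operators.
  def apply(plan: Plan): Plan = plan match {
    case InsertIntoTable(t, q)                                   => InsertIntoTable(apply(t), apply(q))
    case UnresolvedCatalogRelation(t, p) if isDatasourceTable(p) => LogicalRelation(t)
    case UnresolvedCatalogRelation(t, _)                         => HiveTableRelation(t)
    case other                                                   => other
  }
}
```

Applying the sketch to a plan shaped like the demo above (an insert from t1 into t2, both parquet) rewrites both leaves to LogicalRelation.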
Creating HiveTableRelation Logical Operator — readHiveTable Internal Method

readHiveTable(table: CatalogTable): LogicalPlan
readHiveTable creates a HiveTableRelation for the input CatalogTable.
Note: readHiveTable is used when FindDataSourceTable is requested to resolve an UnresolvedCatalogRelation in a logical plan (for Hive tables).
Creating LogicalRelation Logical Operator for CatalogTable — readDataSourceTable Internal Method

readDataSourceTable(table: CatalogTable): LogicalPlan
readDataSourceTable requests the SparkSession for the SessionCatalog.

readDataSourceTable then requests the SessionCatalog for the cached logical plan of the input CatalogTable.

If no cached plan is available, readDataSourceTable creates a new DataSource for the provider of the input CatalogTable (with an extra path option based on the locationUri of the storage of the input CatalogTable). readDataSourceTable requests the DataSource to resolve the relation and create a corresponding BaseRelation, which is then used to create a LogicalRelation with the input CatalogTable.
Note: readDataSourceTable is used when FindDataSourceTable is requested to resolve an UnresolvedCatalogRelation in a logical plan (for data source tables).