// Cache sample table range5 using pure SQL
// That registers range5 to contain the output of range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q1 = spark.sql("SELECT * FROM range5")
scala> q1.explain
== Physical Plan ==
InMemoryTableScan [id#0L]
   +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
         +- *Range (0, 5, step=1, splits=8)
// you could also use optimizedPlan to see InMemoryRelation
scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
01    +- *Range (0, 5, step=1, splits=8)
// Use Dataset's cache
val q2 = spark.range(10).groupBy('id % 5).count.cache
scala> println(q2.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
01    +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)], output=[(id % 5)#84L, count#83L])
02       +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
03          +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)], output=[(id#77L % 5)#88L, count#90L])
04             +- *Range (0, 10, step=1, splits=8)
InMemoryRelation Leaf Logical Operator — Cached Representation of Dataset
InMemoryRelation is a leaf logical operator that represents a cached Dataset by its physical query plan.
InMemoryRelation takes the following to be created:
- Output schema attributes
- Statistics of the child query plan
- Output orderings (Seq[SortOrder])
Note: InMemoryRelation is usually created using apply factory methods.
InMemoryRelation is created when:
- Dataset.persist operator is used (that in turn requests CacheManager to cache a structured query)
- CatalogImpl is requested to cache or refresh a table or view in-memory
- InsertIntoDataSourceCommand logical command is executed (and in turn requests CacheManager to recacheByPlan)
- CatalogImpl is requested to refreshByPath (and in turn requests CacheManager to recacheByPath)
- QueryExecution is requested for a cached logical query plan (and in turn requests CacheManager to replace logical query segments with cached query plans)
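Two of the entry points listed above (Dataset.persist and caching a view through the catalog) can be observed from user code. The following is a minimal sketch, assuming Spark 2.3 or later on the classpath; the view name `demo_cached` and the helper `cachedRelations` are hypothetical names introduced only for illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Assumption: a local session; in spark-shell `spark` is already available
val spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()

// Hypothetical helper: collects the InMemoryRelation operators from the
// logical plan in which cached segments have already been substituted
def cachedRelations(ds: Dataset[_]): Seq[InMemoryRelation] =
  ds.queryExecution.withCachedData.collect { case r: InMemoryRelation => r }

// 1. Dataset.persist
val persisted = spark.range(5).persist()

// 2. Catalog.cacheTable on a temporary view (the view name is made up)
spark.range(3).createOrReplaceTempView("demo_cached")
spark.catalog.cacheTable("demo_cached")
val fromCatalog = spark.table("demo_cached")

println(cachedRelations(persisted).nonEmpty)
println(cachedRelations(fromCatalog).nonEmpty)
```

Both queries are only registered with CacheManager at this point; no Spark job has to run for InMemoryRelation to show up in the logical plan.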
InMemoryRelation is a MultiInstanceRelation, so a new instance (with fresh output attributes) is created whenever the same cached relation has to appear more than once in a query plan.
// Cache a Dataset
val q = spark.range(10).cache
// Make sure that q Dataset is cached
val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q.queryExecution.logical).isDefined
res0: Boolean = true
scala> q.explain
== Physical Plan ==
InMemoryTableScan [id#122L]
   +- InMemoryRelation [id#122L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Range (0, 10, step=1, splits=8)
val qCrossJoined = q.crossJoin(q)
scala> println(qCrossJoined.queryExecution.optimizedPlan.numberedTreeString)
00 Join Cross
01 :- InMemoryRelation [id#122L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
02 :     +- *Range (0, 10, step=1, splits=8)
03 +- InMemoryRelation [id#170L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
04       +- *Range (0, 10, step=1, splits=8)
// Use sameResult for comparison
// since the plans use different output attributes
// and have to be canonicalized internally
import org.apache.spark.sql.execution.columnar.InMemoryRelation
val optimizedPlan = qCrossJoined.queryExecution.optimizedPlan
scala> optimizedPlan.children(0).sameResult(optimizedPlan.children(1))
res1: Boolean = true
The simple text representation of an InMemoryRelation (aka simpleString) is InMemoryRelation [output], [storageLevel] (that uses the output and the CachedRDDBuilder).
val q = spark.range(1).cache
val logicalPlan = q.queryExecution.withCachedData
scala> println(logicalPlan.simpleString)
InMemoryRelation [id#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
InMemoryRelation is resolved to InMemoryTableScanExec leaf physical operator when InMemoryScans execution planning strategy is executed.
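The logical-to-physical step can be checked by looking at both sides of the same query. This is a minimal sketch, assuming Spark 2.3 or later; it reads `sparkPlan` (the planner's output before preparation rules run) rather than `executedPlan`, on the assumption that later rules may wrap the plan in some versions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}

// Assumption: a local session; in spark-shell `spark` is already available
val spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()

val q = spark.range(3).cache

// Logical side: the optimized plan holds the InMemoryRelation
val relations = q.queryExecution.optimizedPlan.collect { case r: InMemoryRelation => r }

// Physical side: the planned query holds the InMemoryTableScanExec
val scans = q.queryExecution.sparkPlan.collect { case s: InMemoryTableScanExec => s }

println(relations.nonEmpty)
println(scans.nonEmpty)

// The scan keeps a reference to the relation it was planned from
scans.headOption.foreach(s => println(s.relation.output))
```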
| Name | Description |
|---|---|
| partitionStatistics | PartitionStatistics for the output schema. Used exclusively when… |
Computing Statistics — computeStats Method
computeStats(): Statistics
Note: computeStats is part of LeafNode Contract to compute statistics for cost-based optimizer.
computeStats…FIXME
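Although the internals are not described here, the statistics that a cached relation reports can be inspected from user code. This is a minimal sketch, assuming Spark 2.3 or later, where `stats` on a logical plan takes no arguments; it only reads the value and makes no claim about how it is computed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Assumption: a local session; in spark-shell `spark` is already available
val spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()

val q = spark.range(10).cache

// Find the InMemoryRelation that stands in for the cached query
val relation = q.queryExecution.withCachedData.collectFirst {
  case r: InMemoryRelation => r
}

// Print the size estimate exposed to the optimizer
relation.foreach(r => println(r.stats.sizeInBytes))
```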
withOutput Method
withOutput(newOutput: Seq[Attribute]): InMemoryRelation
withOutput…FIXME
Note: withOutput is used exclusively when CacheManager is requested to replace logical query segments with cached query plans.
newInstance Method
newInstance(): this.type
Note: newInstance is part of MultiInstanceRelation Contract to…FIXME.
newInstance…FIXME
cachedColumnBuffers Method
cachedColumnBuffers: RDD[CachedBatch]
cachedColumnBuffers…FIXME
Note: cachedColumnBuffers is used when…FIXME
PartitionStatistics
PartitionStatistics(tableSchema: Seq[Attribute])
Note: PartitionStatistics is a private[columnar] class.
PartitionStatistics…FIXME
Note: PartitionStatistics is used exclusively when InMemoryRelation is created (and initializes partitionStatistics).
Creating InMemoryRelation Instance — apply Factory Methods
apply(
useCompression: Boolean,
batchSize: Int,
storageLevel: StorageLevel,
child: SparkPlan,
tableName: Option[String],
logicalPlan: LogicalPlan): InMemoryRelation
apply(
cacheBuilder: CachedRDDBuilder,
logicalPlan: LogicalPlan): InMemoryRelation
apply creates an InMemoryRelation logical operator.