InMemoryRelation Leaf Logical Operator For Cached Physical Query Plans

InMemoryRelation is a leaf logical operator that represents a cached child physical query plan.

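InMemoryRelation is created when a query is cached, for example with the CACHE TABLE SQL statement or Dataset.cache (both shown below):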

// Cache sample table range5 using pure SQL
// This registers range5 as a view over the output of the range(5) table-valued function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q1 = spark.sql("SELECT * FROM range5")
scala> q1.explain
== Physical Plan ==
InMemoryTableScan [id#0L]
   +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
         +- *Range (0, 5, step=1, splits=8)

// you could also use optimizedPlan to see InMemoryRelation
scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
01    +- *Range (0, 5, step=1, splits=8)

// Use Dataset's cache
val q2 = spark.range(10).groupBy('id % 5).count.cache
scala> println(q2.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
01    +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)], output=[(id % 5)#84L, count#83L])
02       +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
03          +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)], output=[(id#77L % 5)#88L, count#90L])
04             +- *Range (0, 10, step=1, splits=8)

InMemoryRelation is a MultiInstanceRelation, so a new instance (with new output attribute IDs) is created whenever it appears multiple times in a logical query plan, e.g. on both sides of a self-join.

// Cache a Dataset
val q = spark.range(10).cache

// Make sure that q Dataset is cached
val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q.queryExecution.logical).isDefined
res0: Boolean = true

scala> q.explain
== Physical Plan ==
InMemoryTableScan [id#122L]
   +- InMemoryRelation [id#122L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Range (0, 10, step=1, splits=8)

val qCrossJoined = q.crossJoin(q)
scala> println(qCrossJoined.queryExecution.optimizedPlan.numberedTreeString)
00 Join Cross
01 :- InMemoryRelation [id#122L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
02 :     +- *Range (0, 10, step=1, splits=8)
03 +- InMemoryRelation [id#170L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
04       +- *Range (0, 10, step=1, splits=8)

// Use sameResult for comparison
// since the plans use different output attributes
// and have to be canonicalized internally
import org.apache.spark.sql.execution.columnar.InMemoryRelation
val optimizedPlan = qCrossJoined.queryExecution.optimizedPlan
scala> optimizedPlan.children(0).sameResult(optimizedPlan.children(1))
res1: Boolean = true

InMemoryRelation is created using the apply factory method, which accepts no output attributes and uses the output of the child physical plan instead.

apply(
  useCompression: Boolean,
  batchSize: Int,
  storageLevel: StorageLevel,
  child: SparkPlan,
  tableName: Option[String]): InMemoryRelation
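
The defaulting of the output attributes can be illustrated with a toy model in plain Scala. ToyPlan and ToyInMemoryRelation are hypothetical stand-ins, not Spark classes: attributes are plain strings and the storage level is a string. The argument values mirror the plan of the first example, where true and 10000 are printed as useCompression and batchSize.

```scala
// Toy model of the InMemoryRelation.apply factory (hypothetical types, not Spark's API)

// Stand-in for a physical plan (SparkPlan); only its output attributes matter here
case class ToyPlan(name: String, output: Seq[String])

case class ToyInMemoryRelation(
    output: Seq[String],
    useCompression: Boolean,
    batchSize: Int,
    storageLevel: String, // a plain string instead of Spark's StorageLevel
    child: ToyPlan,
    tableName: Option[String])

object ToyInMemoryRelation {
  // Factory without output attributes: reuse the output of the child plan
  def apply(
      useCompression: Boolean,
      batchSize: Int,
      storageLevel: String,
      child: ToyPlan,
      tableName: Option[String]): ToyInMemoryRelation =
    new ToyInMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)
}

val range5 = ToyPlan("Range (0, 5, step=1, splits=8)", Seq("id#0L"))
val rel = ToyInMemoryRelation(
  useCompression = true,
  batchSize = 10000,
  storageLevel = "MEMORY_AND_DISK",
  child = range5,
  tableName = Some("range5"))
println(rel.output)
```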

Table 1. InMemoryRelation’s Internal Properties (e.g. Registries, Counters and Flags)

Name                  Description
partitionStatistics   PartitionStatistics for the output schema.
                      Used exclusively when InMemoryTableScanExec is created (and initializes its stats internal property).

Computing Statistics — computeStats Method

computeStats(): Statistics
computeStats is part of the LeafNode Contract to compute statistics for the cost-based optimizer.
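
A minimal sketch of the statistic logic, assuming Spark 2.3-like behavior (check against your Spark version): once the cached buffers are built, the size collected in the size-in-bytes accumulator is reported; before that, computeStats falls back to the statistics of the child query plan. ToyStatistics and ToyStatsRelation are hypothetical, and a java.util.concurrent LongAdder stands in for Spark's LongAccumulator.

```scala
// Toy model of the computeStats fallback (hypothetical types, not Spark's API)
import java.util.concurrent.atomic.LongAdder

case class ToyStatistics(sizeInBytes: BigInt)

class ToyStatsRelation(statsOfPlanToCache: ToyStatistics) {
  // Plays the role of the size-in-bytes accumulator: building each cached
  // batch adds its size here
  val sizeInBytesStats = new LongAdder

  def computeStats(): ToyStatistics =
    if (sizeInBytesStats.sum() == 0L) {
      // Cache not materialized yet: use the statistics of the plan to cache
      statsOfPlanToCache
    } else {
      // Cache materialized: report the actual in-memory size
      ToyStatistics(sizeInBytes = BigInt(sizeInBytesStats.sum()))
    }
}

val rel = new ToyStatsRelation(ToyStatistics(sizeInBytes = BigInt(8000)))
val before = rel.computeStats()
rel.sizeInBytesStats.add(320L) // pretend one cached batch of 320 bytes was built
val after = rel.computeStats()
println(s"before: ${before.sizeInBytes}, after: ${after.sizeInBytes}")
```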


Creating InMemoryRelation Instance

InMemoryRelation takes the following when created:

  • Output schema attributes

  • useCompression flag

  • Batch size

  • Storage level

  • Child physical query plan

  • Table name (if used)

  • Cached column buffers (as RDD[CachedBatch])

  • Size in bytes statistic (as LongAccumulator)

  • Statistics of the child query plan

withOutput Method

withOutput(newOutput: Seq[Attribute]): InMemoryRelation


withOutput is used exclusively when CacheManager is requested to replace logical query segments with cached query plans.
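
withOutput keeps the cached data and only relabels the output attributes, so a query fragment that matches a cached plan can be replaced by the cached relation under the fragment's own attribute IDs. The toy below sketches that idea; ToyRelation and ToyCachedBuffers are hypothetical, and keeping the shared cached state in a second parameter list (so it does not take part in equals) is an assumption modeled on Spark 2.x.

```scala
// Toy model of withOutput (hypothetical types, not Spark's API)

// Shared cached data; in Spark this would be the RDD[CachedBatch]
class ToyCachedBuffers(val rows: Seq[Seq[Any]])

// Fields of the second parameter list are not part of equals/hashCode
case class ToyRelation(output: Seq[String], tableName: Option[String])(
    val cachedBuffers: ToyCachedBuffers) {

  // Same cached data, new output attributes
  def withOutput(newOutput: Seq[String]): ToyRelation =
    ToyRelation(newOutput, tableName)(cachedBuffers)
}

val buffers = new ToyCachedBuffers(Seq(Seq(0L), Seq(1L), Seq(2L)))
val cached = ToyRelation(Seq("id#122L"), None)(buffers)

// The query fragment being replaced uses different attribute IDs
val relabeled = cached.withOutput(Seq("id#170L"))
println(s"${cached.output} -> ${relabeled.output}")
```

Because the buffers are passed by reference, no data is copied or recomputed when a query reuses the cache.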

newInstance Method

newInstance(): this.type
newInstance is part of the MultiInstanceRelation Contract to…FIXME.
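
In Spark, MultiInstanceRelation marks operators that may appear more than once in a logical plan, and such an operator can be asked for a new version of itself whose output attributes get globally unique expression IDs (hence id#122L and id#170L in the self-join earlier). A toy sketch of that idea; ToyExprIds, ToyAttribute and ToyCachedRelation are hypothetical, with a simple global counter standing in for how Spark allocates expression IDs.

```scala
// Toy model of MultiInstanceRelation.newInstance (hypothetical types, not Spark's API)
import java.util.concurrent.atomic.AtomicLong

// Global source of unique expression IDs
object ToyExprIds {
  private val curId = new AtomicLong(0L)
  def next(): Long = curId.getAndIncrement()
}

// An output attribute: a name plus a unique expression ID
case class ToyAttribute(name: String, exprId: Long) {
  override def toString: String = s"$name#$exprId"
  def newInstance(): ToyAttribute = copy(exprId = ToyExprIds.next())
}

case class ToyCachedRelation(output: Seq[ToyAttribute]) {
  // Same relation, fresh IDs for every output attribute
  def newInstance(): ToyCachedRelation = copy(output = output.map(_.newInstance()))
}

val left = ToyCachedRelation(Seq(ToyAttribute("id", ToyExprIds.next())))
// The other side of a self-join needs distinct attribute IDs
val right = left.newInstance()
println(s"${left.output} vs ${right.output}")
```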


cachedColumnBuffers Method

cachedColumnBuffers: RDD[CachedBatch]


cachedColumnBuffers is used when…FIXME
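
cachedColumnBuffers holds the cached data as CachedBatch elements, each storing up to batchSize rows column by column (the 10000 in the plans above is that batch size, which by default comes from spark.sql.inMemoryColumnarStorage.batchSize). The sketch below is only a toy, plain-Scala model of that row-to-column batching, without RDDs, compression or statistics; ToyCachedBatch and buildBatches are hypothetical names.

```scala
// Toy model of building column-oriented cached batches (hypothetical, not Spark's API)

// One cached batch: the number of rows plus one buffer per column
case class ToyCachedBatch(numRows: Int, columns: Vector[Vector[Any]])

def buildBatches(rows: Iterator[Seq[Any]], numColumns: Int, batchSize: Int): Iterator[ToyCachedBatch] =
  rows.grouped(batchSize).map { batchRows =>
    // Pivot the rows of this batch into per-column buffers
    val columns = Vector.tabulate(numColumns)(c => batchRows.map(row => row(c)).toVector)
    ToyCachedBatch(batchRows.size, columns)
  }

// 10 rows of (id, id % 5), cached in batches of at most 4 rows
val rows = (0L until 10L).iterator.map(id => Seq[Any](id, id % 5))
val batches = buildBatches(rows, numColumns = 2, batchSize = 4).toList
batches.foreach(b => println(s"${b.numRows} rows: ${b.columns}"))
```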


PartitionStatistics(tableSchema: Seq[Attribute])
PartitionStatistics is a private[columnar] class.


PartitionStatistics is used exclusively when InMemoryRelation is created (and initializes partitionStatistics).
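
As an assumption based on the Spark 2.x sources, PartitionStatistics derives, for every attribute of the table schema, a fixed set of statistics columns (upper bound, lower bound, null count, count and size in bytes) that each cached batch carries and InMemoryTableScanExec can consult to skip batches. A plain-Scala toy of that schema derivation; ToyColumnStats, ToyPartitionStatistics and the attribute.stat naming scheme are hypothetical.

```scala
// Toy model of PartitionStatistics (hypothetical types, not Spark's API)

// Statistics columns kept for one attribute of the cached table
case class ToyColumnStats(attribute: String) {
  val upperBound = s"$attribute.upperBound"
  val lowerBound = s"$attribute.lowerBound"
  val nullCount = s"$attribute.nullCount"
  val count = s"$attribute.count"
  val sizeInBytes = s"$attribute.sizeInBytes"
  val schema: Seq[String] = Seq(upperBound, lowerBound, nullCount, count, sizeInBytes)
}

class ToyPartitionStatistics(tableSchema: Seq[String]) {
  // Look up the statistics columns of a given attribute
  val forAttribute: Map[String, ToyColumnStats] =
    tableSchema.map(a => a -> ToyColumnStats(a)).toMap
  // Flattened schema of the per-batch statistics row
  val schema: Seq[String] = tableSchema.flatMap(a => forAttribute(a).schema)
}

val stats = new ToyPartitionStatistics(Seq("id", "count"))
println(stats.schema)
```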
