InMemoryRelation Leaf Logical Operator For Cached Physical Query Plans

InMemoryRelation is a leaf logical operator that represents a cached child physical query plan.

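InMemoryRelation is created when a query is cached, e.g. with the CACHE TABLE SQL statement or Dataset's cache operator, as in the following examples: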

// Cache sample table range5 using pure SQL
// That registers range5 to contain the output of range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q1 = spark.sql("SELECT * FROM range5")
scala> q1.explain
== Physical Plan ==
InMemoryTableScan [id#0L]
   +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
         +- *Range (0, 5, step=1, splits=8)

// you could also use optimizedPlan to see InMemoryRelation
scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
01    +- *Range (0, 5, step=1, splits=8)

// Use Dataset's cache
val q2 = spark.range(10).groupBy('id % 5).count.cache
scala> println(q2.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
01    +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)], output=[(id % 5)#84L, count#83L])
02       +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
03          +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)], output=[(id#77L % 5)#88L, count#90L])
04             +- *Range (0, 10, step=1, splits=8)

InMemoryRelation is a MultiInstanceRelation, so a new instance (with fresh output attributes) is created whenever the same cached relation has to appear more than once in a logical query plan.

// Cache a Dataset
val q = spark.range(10).cache

// Make sure that q Dataset is cached
val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q.queryExecution.logical).isDefined
res0: Boolean = true

scala> q.explain
== Physical Plan ==
InMemoryTableScan [id#122L]
   +- InMemoryRelation [id#122L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Range (0, 10, step=1, splits=8)

val qCrossJoined = q.crossJoin(q)
scala> println(qCrossJoined.queryExecution.optimizedPlan.numberedTreeString)
00 Join Cross
01 :- InMemoryRelation [id#122L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
02 :     +- *Range (0, 10, step=1, splits=8)
03 +- InMemoryRelation [id#170L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
04       +- *Range (0, 10, step=1, splits=8)

// Use sameResult for comparison
// since the plans use different output attributes
// and have to be canonicalized internally
import org.apache.spark.sql.execution.columnar.InMemoryRelation
val optimizedPlan = qCrossJoined.queryExecution.optimizedPlan
scala> optimizedPlan.children(0).sameResult(optimizedPlan.children(1))
res1: Boolean = true
Note

InMemoryRelation is created using the apply factory method, which accepts no output attributes and uses the output of the child physical plan instead.

apply(
  useCompression: Boolean,
  batchSize: Int,
  storageLevel: StorageLevel,
  child: SparkPlan,
  tableName: Option[String]): InMemoryRelation
Table 1. InMemoryRelation’s Internal Properties (e.g. Registries, Counters and Flags)

Name                 Description
partitionStatistics  PartitionStatistics for the output schema. Used exclusively when InMemoryTableScanExec is created (and initializes its stats internal property).

Computing Statistics — computeStats Method

computeStats(): Statistics
Note
computeStats is part of the LeafNode Contract to compute statistics for the cost-based optimizer.

computeStats…​FIXME
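
The statistics of a cached relation can be observed from the outside, which may help while the description above is still incomplete. The following is only a sketch, assuming a Spark version in which LogicalPlan.stats takes no arguments (matching the computeStats() signature shown here). The session setup and the names cached and relation are illustrative and not part of the original text.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Assumption: a local session; in spark-shell `spark` is already available
val spark = SparkSession.builder().master("local[*]").appName("imr-stats").getOrCreate()

val cached = spark.range(10).cache
cached.count // run an action so the cache is actually populated

// Find the InMemoryRelation in the optimized logical plan
val relation = cached.queryExecution.optimizedPlan.collectFirst {
  case r: InMemoryRelation => r
}.get

// stats is the public entry point to an operator's statistics
val sizeInBytes = relation.stats.sizeInBytes
println(s"sizeInBytes = $sizeInBytes")
```

The exact value depends on the Spark version and on whether the cache has been materialized, so no particular number should be expected.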

Creating InMemoryRelation Instance

InMemoryRelation takes the following when created:

  • Output schema attributes

  • useCompression flag

  • Batch size

  • Storage level

  • Child physical query plan

  • Table name (if used)

  • Cached column buffers (as RDD[CachedBatch])

  • Size in bytes statistic (as LongAccumulator)

  • Statistics of the child query plan
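
The properties listed above can be inspected on a live InMemoryRelation. This is a hedged sketch that assumes the constructor fields are exposed under the names useCompression, batchSize, storageLevel, child and tableName, as in the Spark version this page describes. Other Spark versions may organize these properties differently. The session setup and variable names are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Assumption: a local session; in spark-shell `spark` is already available
val spark = SparkSession.builder().master("local[*]").appName("imr-props").getOrCreate()

val q = spark.range(5).cache

// Pick the InMemoryRelation out of the optimized logical plan
val relation = q.queryExecution.optimizedPlan.collectFirst {
  case r: InMemoryRelation => r
}.get

println(relation.output)         // output schema attributes
println(relation.useCompression) // useCompression flag
println(relation.batchSize)      // batch size
println(relation.storageLevel)   // storage level
println(relation.child)          // child physical query plan
println(relation.tableName)      // table name (if used)
```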

withOutput Method

withOutput(newOutput: Seq[Attribute]): InMemoryRelation

withOutput…​FIXME

Note
withOutput is used exclusively when CacheManager is requested to replace logical query segments with cached query plans.
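
The replacement of logical query segments with cached plans can be observed through QueryExecution.withCachedData. The sketch below caches one query and then builds a second, separate query that contains the same range(10) fragment. The query, the session setup and the variable names are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.functions.col

// Assumption: a local session; in spark-shell `spark` is already available
val spark = SparkSession.builder().master("local[*]").appName("imr-withOutput").getOrCreate()

// Register range(10) with the CacheManager
spark.range(10).cache

// A different Dataset whose logical plan contains an equivalent range(10) fragment
val q = spark.range(10).filter(col("id") > 5)

// withCachedData is the logical plan after cached fragments have been substituted
val withCachedData = q.queryExecution.withCachedData
val relations = withCachedData.collect { case r: InMemoryRelation => r }

println(withCachedData.numberedTreeString)
println(s"Cached fragments found: ${relations.size}")
```

The substituted InMemoryRelation has to expose the output attributes of the fragment it replaces, which is presumably the role withOutput plays in this step.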

newInstance Method

newInstance(): this.type
Note
newInstance is part of MultiInstanceRelation Contract to…​FIXME.

newInstance…​FIXME
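
The cross-join example earlier shows two InMemoryRelation nodes with different attribute ids (id#122L and id#170L), which suggests that newInstance yields a copy with fresh output attributes. The sketch below checks this directly. The session setup and the variable names are illustrative and not part of the original text.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Assumption: a local session; in spark-shell `spark` is already available
val spark = SparkSession.builder().master("local[*]").appName("imr-newInstance").getOrCreate()

val q = spark.range(10).cache
val relation = q.queryExecution.optimizedPlan.collectFirst {
  case r: InMemoryRelation => r
}.get

// Ask for another instance of the same cached relation
val fresh = relation.newInstance()

val originalIds = relation.output.map(_.exprId)
val freshIds = fresh.output.map(_.exprId)

println(s"original: $originalIds")
println(s"fresh:    $freshIds")
// Column names stay the same, only the expression ids are expected to change
println(relation.output.map(_.name) == fresh.output.map(_.name))
```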

cachedColumnBuffers Method

cachedColumnBuffers: RDD[CachedBatch]

cachedColumnBuffers…​FIXME

Note
cachedColumnBuffers is used when…​FIXME

PartitionStatistics

PartitionStatistics(tableSchema: Seq[Attribute])
Note
PartitionStatistics is a private[columnar] class.

PartitionStatistics…​FIXME

Note
PartitionStatistics is used exclusively when InMemoryRelation is created (and initializes partitionStatistics).
