InMemoryRelation Leaf Logical Operator — Cached Representation of Dataset

InMemoryRelation is a leaf logical operator that represents a cached Dataset by its physical query plan, i.e. the plan whose output is cached.

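InMemoryRelation takes the following to be created:

Output schema attributes (output)
CachedRDDBuilder (cacheBuilder)
Statistics of the plan to cache (statsOfPlanToCache)
Output orderings (outputOrdering)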

InMemoryRelation is usually created using apply factory methods.

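InMemoryRelation is created when CacheManager is requested to cache or re-cache a structured query, for example when a table is cached with the CACHE TABLE SQL statement or a Dataset is cached with Dataset.cache: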

// Cache sample table range5 using pure SQL
// This registers range5 as a table with the output of the range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q1 = spark.sql("SELECT * FROM range5")
scala> q1.explain
== Physical Plan ==
InMemoryTableScan [id#0L]
   +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
         +- *Range (0, 5, step=1, splits=8)

// you could also use optimizedPlan to see InMemoryRelation
scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
01    +- *Range (0, 5, step=1, splits=8)

// Use Dataset's cache
val q2 = spark.range(10).groupBy('id % 5).count.cache
scala> println(q2.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
01    +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)], output=[(id % 5)#84L, count#83L])
02       +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
03          +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)], output=[(id#77L % 5)#88L, count#90L])
04             +- *Range (0, 10, step=1, splits=8)
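The examples above are spark-shell sessions. Below is a standalone sketch of two more ways of getting an InMemoryRelation: Dataset.persist with an explicit storage level, and Catalog.cacheTable for a temporary view. It assumes Spark 2.4 or later on the classpath and a local master; the object name CachedPlans and the helper inMemoryRelations are made up for this example.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.storage.StorageLevel

object CachedPlans {
  // Counts the InMemoryRelation operators in the optimized logical plan of a Dataset
  def inMemoryRelations(ds: Dataset[_]): Int =
    ds.queryExecution.optimizedPlan.collect { case r: InMemoryRelation => r }.size

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("InMemoryRelation demo")
      .getOrCreate()
    try {
      // Dataset.persist registers the plan with CacheManager using the given storage level
      val persisted = spark.range(5).persist(StorageLevel.MEMORY_ONLY)
      assert(persisted.storageLevel == StorageLevel.MEMORY_ONLY)
      assert(inMemoryRelations(persisted) == 1)

      // Catalog.cacheTable caches a (temporary) view by name
      spark.range(3).createOrReplaceTempView("range3")
      spark.catalog.cacheTable("range3")
      assert(spark.catalog.isCached("range3"))
      assert(inMemoryRelations(spark.table("range3")) == 1)

      // After clearing the cache, new queries are no longer rewritten to InMemoryRelation
      spark.catalog.clearCache()
      assert(inMemoryRelations(spark.table("range3")) == 0)
    } finally {
      spark.stop()
    }
  }
}
```

Note that persist and cacheTable only register the plan with CacheManager; the cached data itself is built lazily, when the first action runs against it.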

InMemoryRelation is a MultiInstanceRelation, so a new instance (with new output attributes) can be created whenever the same cached data has to appear multiple times in a logical query plan, e.g. in a self-join.

// Cache a Dataset
val q = spark.range(10).cache

// Make sure that q Dataset is cached
val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q.queryExecution.logical).isDefined
res0: Boolean = true

scala> q.explain
== Physical Plan ==
InMemoryTableScan [id#122L]
   +- InMemoryRelation [id#122L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Range (0, 10, step=1, splits=8)

val qCrossJoined = q.crossJoin(q)
scala> println(qCrossJoined.queryExecution.optimizedPlan.numberedTreeString)
00 Join Cross
01 :- InMemoryRelation [id#122L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
02 :     +- *Range (0, 10, step=1, splits=8)
03 +- InMemoryRelation [id#170L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
04       +- *Range (0, 10, step=1, splits=8)

// Use sameResult for comparison
// since the plans use different output attributes
// and have to be canonicalized internally
import org.apache.spark.sql.execution.columnar.InMemoryRelation
val optimizedPlan = qCrossJoined.queryExecution.optimizedPlan
scala> optimizedPlan.children(0).sameResult(optimizedPlan.children(1))
res1: Boolean = true

The simple text representation of an InMemoryRelation (aka simpleString) is InMemoryRelation [output], [storageLevel], i.e. the output attributes followed by the storage level of the CachedRDDBuilder.

val q = spark.range(1).cache
val logicalPlan = q.queryExecution.withCachedData
scala> println(logicalPlan.simpleString)
InMemoryRelation [id#40L], StorageLevel(disk, memory, deserialized, 1 replicas)

InMemoryRelation is planned as an InMemoryTableScanExec leaf physical operator when the InMemoryScans execution planning strategy is executed.
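The planning step can be checked with a short sketch, again assuming Spark 2.4 or later and a local master (PlannedScan is an arbitrary name). It inspects QueryExecution.sparkPlan, the physical plan as produced by the planner, before preparation rules (such as adaptive query execution in Spark 3) get a chance to wrap it.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}

object PlannedScan {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("InMemoryTableScanExec demo")
      .getOrCreate()
    try {
      val q = spark.range(10).cache()

      // Logical side: the cached plan is replaced by an InMemoryRelation
      val logical = q.queryExecution.optimizedPlan.collect { case r: InMemoryRelation => r }
      assert(logical.size == 1)

      // Physical side: InMemoryScans turns the InMemoryRelation into an InMemoryTableScanExec
      val scans = q.queryExecution.sparkPlan.collect { case s: InMemoryTableScanExec => s }
      assert(scans.size == 1)

      // The scan operator keeps the InMemoryRelation it was planned from
      assert(scans.head.relation.output == logical.head.output)
    } finally {
      spark.stop()
    }
  }
}
```

Comparing output attributes rather than object identity keeps the last check valid even if the planner works on a copy of the optimized plan.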

Table 1. InMemoryRelation’s Internal Properties (e.g. Registries, Counters and Flags)
Name                 Description
partitionStatistics  PartitionStatistics for the output schema.
                     Used exclusively when InMemoryTableScanExec is created (to initialize its stats internal property).

Computing Statistics — computeStats Method

computeStats(): Statistics
computeStats is part of the LeafNode Contract to compute statistics for the cost-based optimizer.


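The description of computeStats stops short here. As an assumption based on the Spark 2.4 sources (details differ between versions), computeStats falls back to the statistics of the plan to cache until the cached RDD has been materialized, and afterwards reports the size of the cached batches accumulated while building the cache. The plain-Scala model below has no Spark dependency; Stats, CacheState and computeStats are hypothetical names, not Spark's, and only that decision is modeled.

```scala
// Hypothetical, simplified stand-in for Spark's Statistics
final case class Stats(sizeInBytes: BigInt)

// Hypothetical stand-in for the CachedRDDBuilder state relevant to statistics
final class CacheState {
  // Models the accumulator that sums the sizes of cached batches as they are built
  var cachedSizeInBytes: Long = 0L
}

object ComputeStatsModel {
  // No cached bytes recorded yet => use the estimate of the plan to cache
  def computeStats(cache: CacheState, statsOfPlanToCache: Stats): Stats =
    if (cache.cachedSizeInBytes == 0L) statsOfPlanToCache
    else Stats(BigInt(cache.cachedSizeInBytes))

  def main(args: Array[String]): Unit = {
    val cache = new CacheState
    val planEstimate = Stats(BigInt(80))

    // Before the cache is materialized, the optimizer sees the plan's estimate
    println(computeStats(cache, planEstimate)) // Stats(80)

    // Once batches are cached, it sees the size of the cached data instead
    cache.cachedSizeInBytes = 1024L
    println(computeStats(cache, planEstimate)) // Stats(1024)
  }
}
```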
withOutput Method

withOutput(newOutput: Seq[Attribute]): InMemoryRelation


withOutput is used exclusively when CacheManager is requested to replace logical query segments with cached query plans.

newInstance Method

newInstance(): this.type
newInstance is part of MultiInstanceRelation Contract to…FIXME.
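withOutput and newInstance both produce another InMemoryRelation over the same cached data; only the output attributes differ. Per the MultiInstanceRelation contract, newInstance gives every output attribute a fresh expression ID, so the same cached Dataset can appear more than once in a plan (as in the self-join earlier) without duplicate IDs. The plain-Scala model below is a hypothetical simplification: ExprIds, Attr, Builder and Relation are not Spark classes, and Builder merely stands in for CachedRDDBuilder.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical generator of unique expression IDs
object ExprIds {
  private val counter = new AtomicLong(0L)
  def next(): Long = counter.getAndIncrement()
}

// Hypothetical output attribute, printed like id#122
final case class Attr(name: String, exprId: Long) {
  def newInstance(): Attr = copy(exprId = ExprIds.next())
  override def toString: String = s"$name#$exprId"
}

// Hypothetical owner of the cached data (compared by reference)
final class Builder(val description: String)

final case class Relation(output: Seq[Attr], builder: Builder) {
  // Same cached data, caller-supplied output attributes
  def withOutput(newOutput: Seq[Attr]): Relation = Relation(newOutput, builder)
  // Same cached data, fresh expression IDs for every output attribute
  def newInstance(): Relation = Relation(output.map(_.newInstance()), builder)
}

object MultiInstanceModel {
  def main(args: Array[String]): Unit = {
    val cached = Relation(Seq(Attr("id", ExprIds.next())), new Builder("range(10)"))
    val otherSide = cached.newInstance()

    println(cached.output)    // same attribute name...
    println(otherSide.output) // ...with a different expression ID
    println(cached.builder eq otherSide.builder)       // true: data is cached once
    println(cached.withOutput(otherSide.output) == otherSide) // true
  }
}
```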


cachedColumnBuffers Method

cachedColumnBuffers: RDD[CachedBatch]


cachedColumnBuffers is used when…FIXME


PartitionStatistics(tableSchema: Seq[Attribute])
PartitionStatistics is a private[columnar] class.


PartitionStatistics is used exclusively when InMemoryRelation is created (and initializes partitionStatistics).
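PartitionStatistics describes the schema of the statistics collected for every partition (batch) of cached data. As an assumption based on the Spark 2.x sources (the per-column part lives in ColumnStatisticsSchema, which this section does not describe), every column contributes five statistics: lowerBound, upperBound, nullCount, count and sizeInBytes. The plain-Scala sketch below uses hypothetical names and models column names only (no data types), to show how the schema is flattened from the table schema.

```scala
// Hypothetical, simplified per-column statistics schema (names only)
final case class ColumnStatsSchema(column: String) {
  val lowerBound: String = s"$column.lowerBound"
  val upperBound: String = s"$column.upperBound"
  val nullCount: String = s"$column.nullCount"
  val count: String = s"$column.count"
  val sizeInBytes: String = s"$column.sizeInBytes"
  val schema: Seq[String] = Seq(lowerBound, upperBound, nullCount, count, sizeInBytes)
}

// Hypothetical counterpart of PartitionStatistics(tableSchema)
final class PartitionStatsModel(tableSchema: Seq[String]) {
  // Per-column lookup, e.g. to find the bounds of a column when pruning batches
  val forColumn: Map[String, ColumnStatsSchema] =
    tableSchema.map(c => c -> ColumnStatsSchema(c)).toMap
  // Statistics of all columns, flattened in table-schema order
  val schema: Seq[String] = tableSchema.flatMap(c => forColumn(c).schema)
}

object PartitionStatsDemo {
  def main(args: Array[String]): Unit = {
    val stats = new PartitionStatsModel(Seq("id", "name"))
    println(stats.schema.size)                // 10 (5 statistics per column)
    println(stats.forColumn("id").upperBound) // id.upperBound
  }
}
```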

Creating InMemoryRelation Instance — apply Factory Methods

apply(
  useCompression: Boolean,
  batchSize: Int,
  storageLevel: StorageLevel,
  child: SparkPlan,
  tableName: Option[String],
  logicalPlan: LogicalPlan): InMemoryRelation
apply(
  cacheBuilder: CachedRDDBuilder,
  logicalPlan: LogicalPlan): InMemoryRelation

apply creates an InMemoryRelation logical operator.

apply is used when CacheManager is requested to cache and re-cache a structured query.
