InMemoryRelation Leaf Logical Operator — Cached Representation of Dataset

InMemoryRelation is a leaf logical operator that represents a cached Dataset using the Dataset's physical query plan.

InMemoryRelation takes the following to be created:

Note
InMemoryRelation is usually created using apply factory methods.

InMemoryRelation is created when a structured query is cached, e.g. using the CACHE TABLE SQL statement or Dataset.cache:

// Cache sample table range5 using pure SQL
// That registers range5 to contain the output of range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q1 = spark.sql("SELECT * FROM range5")
scala> q1.explain
== Physical Plan ==
InMemoryTableScan [id#0L]
   +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
         +- *Range (0, 5, step=1, splits=8)

// you could also use optimizedPlan to see InMemoryRelation
scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
01    +- *Range (0, 5, step=1, splits=8)

// Use Dataset's cache
val q2 = spark.range(10).groupBy('id % 5).count.cache
scala> println(q2.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
01    +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)], output=[(id % 5)#84L, count#83L])
02       +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
03          +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)], output=[(id#77L % 5)#88L, count#90L])
04             +- *Range (0, 10, step=1, splits=8)

InMemoryRelation is a MultiInstanceRelation, so a new instance (with new output attribute IDs) is created whenever the same InMemoryRelation would otherwise appear multiple times in a logical query plan, e.g. in a self-join.

// Cache a Dataset
val q = spark.range(10).cache

// Make sure that q Dataset is cached
val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q.queryExecution.logical).isDefined
res0: Boolean = true

scala> q.explain
== Physical Plan ==
InMemoryTableScan [id#122L]
   +- InMemoryRelation [id#122L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Range (0, 10, step=1, splits=8)

val qCrossJoined = q.crossJoin(q)
scala> println(qCrossJoined.queryExecution.optimizedPlan.numberedTreeString)
00 Join Cross
01 :- InMemoryRelation [id#122L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
02 :     +- *Range (0, 10, step=1, splits=8)
03 +- InMemoryRelation [id#170L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
04       +- *Range (0, 10, step=1, splits=8)

// Use sameResult for comparison
// since the plans use different output attributes
// and have to be canonicalized internally
import org.apache.spark.sql.execution.columnar.InMemoryRelation
val optimizedPlan = qCrossJoined.queryExecution.optimizedPlan
scala> optimizedPlan.children(0).sameResult(optimizedPlan.children(1))
res1: Boolean = true
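The self-join above ends up with two InMemoryRelation instances whose output attributes differ only in their IDs (id#122L and id#170L), yet sameResult treats them as equal. A minimal sketch of that idea in plain Scala, assuming a toy model: Attr, ExprIds and CachedRelation are hypothetical stand-ins, not Catalyst's classes, and "canonicalization" here simply replaces each expression ID with its position.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for a Catalyst attribute: a column name plus a unique expression ID
final case class Attr(name: String, exprId: Long)

// Hypothetical generator of fresh expression IDs
object ExprIds {
  private val counter = new AtomicLong(0L)
  def next(): Long = counter.incrementAndGet()
}

// Hypothetical cached relation: output attributes plus a key identifying the cached data
final case class CachedRelation(output: Seq[Attr], cacheKey: String) {
  // Same cached data, but every output attribute gets a fresh expression ID
  def newInstance(): CachedRelation =
    copy(output = output.map(a => a.copy(exprId = ExprIds.next())))

  // Replace expression IDs by their position, so instances differing only in IDs compare equal
  def canonicalized: CachedRelation =
    copy(output = output.zipWithIndex.map { case (a, i) => a.copy(exprId = i.toLong) })

  def sameResult(other: CachedRelation): Boolean = canonicalized == other.canonicalized
}

val left = CachedRelation(Seq(Attr("id", ExprIds.next())), "range(10)")
val right = left.newInstance() // the other side of the self-join

println(left.output.head.exprId == right.output.head.exprId) // false
println(left.sameResult(right))                               // true
```

Fresh IDs keep every attribute in the joined plan distinguishable, while comparisons that should ignore IDs go through a canonical form.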

The simple text representation of an InMemoryRelation (aka simpleString) is InMemoryRelation [output], [storageLevel], which uses the output and the storage level of the CachedRDDBuilder.

val q = spark.range(1).cache
val logicalPlan = q.queryExecution.withCachedData
scala> println(logicalPlan.simpleString)
InMemoryRelation [id#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
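The output above can be reproduced with a small formatting sketch. It assumes a storage level renders only its enabled flags followed by the replication factor, which is what the StorageLevel(disk, memory, deserialized, 1 replicas) strings on this page suggest. It also assumes that this flag combination is MEMORY_AND_DISK, the default level of Dataset.cache. Level and simpleString are hypothetical helpers, not Spark's API.

```scala
// Hypothetical model of a storage level (flags plus replication factor)
final case class Level(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
                       deserialized: Boolean, replication: Int) {
  // Render only the enabled flags, then the replication factor
  override def toString: String = {
    val parts = Seq(
      if (useDisk) "disk" else "",
      if (useMemory) "memory" else "",
      if (useOffHeap) "offheap" else "",
      if (deserialized) "deserialized" else "",
      s"$replication replicas")
    s"StorageLevel(${parts.filter(_.nonEmpty).mkString(", ")})"
  }
}

// InMemoryRelation [output], [storageLevel]
def simpleString(output: Seq[String], storageLevel: Level): String =
  s"InMemoryRelation [${output.mkString(", ")}], $storageLevel"

// Assumed to correspond to MEMORY_AND_DISK
val memoryAndDisk = Level(useDisk = true, useMemory = true, useOffHeap = false,
                          deserialized = true, replication = 1)

println(simpleString(Seq("id#40L"), memoryAndDisk))
// InMemoryRelation [id#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
```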

InMemoryRelation is planned as an InMemoryTableScanExec leaf physical operator when the InMemoryScans execution planning strategy is executed.
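An execution planning strategy maps a logical plan to candidate physical plans and returns Nil when it does not apply, so that other strategies can try. A minimal sketch of that shape for in-memory scans, assuming a toy plan model. The strategy produces a scan for a cached relation, optionally under a filter whose predicate is passed along. All types here are hypothetical, and Spark's actual InMemoryScans handles projections and filters more generally than this.

```scala
// Hypothetical toy logical plans
sealed trait LogicalPlan
final case class CachedRelationNode(output: Seq[String]) extends LogicalPlan
final case class FilterNode(condition: String, child: LogicalPlan) extends LogicalPlan
final case class OtherNode(name: String) extends LogicalPlan

// Hypothetical toy physical scan over a cached relation
final case class InMemoryTableScan(attributes: Seq[String], predicates: Seq[String],
                                   relation: CachedRelationNode)

// Returns candidate physical plans, or Nil when the plan is not a (filtered) cached relation
def inMemoryScans(plan: LogicalPlan): Seq[InMemoryTableScan] = plan match {
  case r: CachedRelationNode                   => Seq(InMemoryTableScan(r.output, Nil, r))
  case FilterNode(cond, r: CachedRelationNode) => Seq(InMemoryTableScan(r.output, Seq(cond), r))
  case _                                       => Nil
}

val rel = CachedRelationNode(Seq("id"))
println(inMemoryScans(FilterNode("id > 2", rel)))
println(inMemoryScans(OtherNode("Range"))) // List()
```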

Table 1. InMemoryRelation’s Internal Properties (e.g. Registries, Counters and Flags)

Name                | Description
partitionStatistics | PartitionStatistics for the output schema. Used exclusively when InMemoryTableScanExec is created (and initializes its stats internal property).

Computing Statistics — computeStats Method

computeStats(): Statistics
Note
computeStats is part of the LeafNode Contract to compute statistics for the cost-based optimizer.

computeStats…FIXME
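To our understanding, in the Spark 2.4 sources computeStats falls back to the statistics of the plan to cache until the cached data has been materialized. After that, it reports the accumulated size of the cached batches. Verify this against your Spark version. A minimal sketch of that decision under this assumption, where Stats and the parameter names are hypothetical stand-ins rather than Spark's API:

```scala
// Hypothetical stand-in for Catalyst's Statistics (only sizeInBytes is modeled)
final case class Stats(sizeInBytes: BigInt)

// Assumed behavior: no bytes accumulated yet means the cache is not materialized,
// so use the estimate of the plan to cache; otherwise report the actual cached size
def computeStats(cachedSizeInBytes: Long, statsOfPlanToCache: Stats): Stats =
  if (cachedSizeInBytes == 0L) statsOfPlanToCache
  else Stats(BigInt(cachedSizeInBytes))

println(computeStats(0L, Stats(BigInt(80))))    // Stats(80)
println(computeStats(1024L, Stats(BigInt(80)))) // Stats(1024)
```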

withOutput Method

withOutput(newOutput: Seq[Attribute]): InMemoryRelation

withOutput…FIXME

Note
withOutput is used exclusively when CacheManager is requested to replace logical query segments with cached query plans.
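With withOutput, the cached data stays the same while the output attributes change. A minimal sketch, assuming that when CacheManager swaps a plan fragment for its cached representation, it keeps the fragment's own output attributes so that operators above the fragment still resolve them. Attr, CachedRelation, useCachedData and the string cache key are hypothetical simplifications; Spark finds cached data by comparing plans (sameResult), not by string keys.

```scala
// Hypothetical stand-in for a Catalyst attribute (name plus expression ID)
final case class Attr(name: String, exprId: Long)

// Hypothetical cached relation: output attributes plus a key identifying the shared cached data
final case class CachedRelation(output: Seq[Attr], cacheKey: String) {
  // Same cached data, different output attributes
  def withOutput(newOutput: Seq[Attr]): CachedRelation = copy(output = newOutput)
}

// Hypothetical substitution step: reuse the cached relation under the fragment's attributes
def useCachedData(fragmentKey: String, fragmentOutput: Seq[Attr],
                  cache: Map[String, CachedRelation]): Option[CachedRelation] =
  cache.get(fragmentKey).map(_.withOutput(fragmentOutput))

val cache = Map("range(10)" -> CachedRelation(Seq(Attr("id", 1L)), "range(10)"))
val replaced = useCachedData("range(10)", Seq(Attr("id", 42L)), cache)
println(replaced)
```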

newInstance Method

newInstance(): this.type
Note
newInstance is part of the MultiInstanceRelation Contract to…FIXME.

newInstance…FIXME

cachedColumnBuffers Method

cachedColumnBuffers: RDD[CachedBatch]

cachedColumnBuffers…FIXME

Note
cachedColumnBuffers is used when…FIXME
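cachedColumnBuffers is typed RDD[CachedBatch], and the true, 10000 pair in the plans above appears to be the useCompression flag and the batchSize that apply takes. A minimal sketch of the batching idea, assuming rows of Long columns. Each partition's rows are grouped into batches of at most batchSize rows and then stored column by column. CachedBatch and buildBatches are hypothetical simplifications: Spark's own cached batches hold encoded (possibly compressed) column buffers and statistics rather than plain vectors.

```scala
// Hypothetical simplified cached batch: a row count plus one vector of values per column
final case class CachedBatch(numRows: Int, columns: Vector[Vector[Long]])

// Group a partition's rows into batches of at most batchSize rows and
// transpose each group from row-major to column-major layout
def buildBatches(rows: Iterator[Vector[Long]], numColumns: Int, batchSize: Int): Iterator[CachedBatch] =
  rows.grouped(batchSize).map { group =>
    val columns = Vector.tabulate(numColumns)(c => group.map(row => row(c)).toVector)
    CachedBatch(group.size, columns)
  }

// Two columns: id and id % 5
val rows = (0L until 25L).iterator.map(id => Vector(id, id % 5))
val batches = buildBatches(rows, numColumns = 2, batchSize = 10).toList
println(batches.map(_.numRows)) // List(10, 10, 5)
```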

PartitionStatistics

PartitionStatistics(tableSchema: Seq[Attribute])
Note
PartitionStatistics is a private[columnar] class.

PartitionStatistics…FIXME

Note
PartitionStatistics is used exclusively when InMemoryRelation is created (and initializes partitionStatistics).
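A minimal sketch of what per-column partition statistics look like. It assumes, as in the Spark 2.x sources we are aware of, that five statistics are tracked per column (lowerBound, upperBound, nullCount, count and sizeInBytes) and flattened into a single schema with entries named <column>.<statistic>. statsSchema, LongColumnStats and gather are hypothetical names, and sizeInBytes is left out of the computed values.

```scala
// Assumed list of statistics tracked for every column
val statNames: Seq[String] = Seq("lowerBound", "upperBound", "nullCount", "count", "sizeInBytes")

// Flattened statistics schema: one entry per (column, statistic) pair
def statsSchema(tableSchema: Seq[String]): Seq[String] =
  tableSchema.flatMap(column => statNames.map(stat => s"$column.$stat"))

// Hypothetical statistics for a nullable Long column (sizeInBytes omitted)
final case class LongColumnStats(lowerBound: Option[Long], upperBound: Option[Long],
                                 nullCount: Long, count: Long)

// Bounds come from non-null values only; count includes nulls
def gather(values: Seq[Option[Long]]): LongColumnStats = {
  val nonNull = values.flatten
  LongColumnStats(
    lowerBound = if (nonNull.isEmpty) None else Some(nonNull.min),
    upperBound = if (nonNull.isEmpty) None else Some(nonNull.max),
    nullCount = values.count(_.isEmpty).toLong,
    count = values.size.toLong)
}

println(statsSchema(Seq("id")))
println(gather(Seq(Some(3L), None, Some(1L), Some(7L))))
```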

Creating InMemoryRelation Instance — apply Factory Methods

apply(
  useCompression: Boolean,
  batchSize: Int,
  storageLevel: StorageLevel,
  child: SparkPlan,
  tableName: Option[String],
  logicalPlan: LogicalPlan): InMemoryRelation
apply(
  cacheBuilder: CachedRDDBuilder,
  logicalPlan: LogicalPlan): InMemoryRelation

apply creates an InMemoryRelation logical operator.

Note
apply is used when CacheManager is requested to cache and re-cache a structured query.
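The two apply variants differ only in who builds the CachedRDDBuilder: the first builds it from the caching settings and the second receives it ready-made. A minimal sketch of that delegation. It assumes CacheManager takes useCompression and batchSize from the spark.sql.inMemoryColumnarStorage.compressed (default true) and spark.sql.inMemoryColumnarStorage.batchSize (default 10000) settings. CacheBuilder, Relation and cacheQuery are hypothetical stand-ins: the storage level is a plain string, and the child and logicalPlan parameters are left out.

```scala
// Hypothetical stand-in for CachedRDDBuilder: just the settings it is created with
final case class CacheBuilder(useCompression: Boolean, batchSize: Int,
                              storageLevel: String, tableName: Option[String])

// Hypothetical stand-in for InMemoryRelation (second form: output plus a ready-made builder)
final case class Relation(output: Seq[String], cacheBuilder: CacheBuilder)

object Relation {
  // First form: build the cache builder from the settings, then delegate to the second form
  def apply(useCompression: Boolean, batchSize: Int, storageLevel: String,
            tableName: Option[String], output: Seq[String]): Relation =
    Relation(output, CacheBuilder(useCompression, batchSize, storageLevel, tableName))
}

// Hypothetical caching entry point reading the two assumed settings (with their defaults)
def cacheQuery(conf: Map[String, String], storageLevel: String,
               tableName: Option[String], output: Seq[String]): Relation =
  Relation(
    conf.getOrElse("spark.sql.inMemoryColumnarStorage.compressed", "true").toBoolean,
    conf.getOrElse("spark.sql.inMemoryColumnarStorage.batchSize", "10000").toInt,
    storageLevel, tableName, output)

println(cacheQuery(Map.empty, "MEMORY_AND_DISK", Some("range5"), Seq("id")))
```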
