InMemoryTableScanExec Leaf Physical Operator

InMemoryTableScanExec is a leaf physical operator that represents an InMemoryRelation logical operator at execution time.

InMemoryTableScanExec is a ColumnarBatchScan that supports batch decoding (only when the supportsBatch property is enabled, i.e. the vectorized reader is enabled and the output schema of the InMemoryRelation uses primitive data types only and has at most spark.sql.codegen.maxFields fields).

InMemoryTableScanExec supports partition batch pruning (only when the spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is enabled, which it is by default).

InMemoryTableScanExec is created exclusively when InMemoryScans execution planning strategy is executed and finds an InMemoryRelation logical operator in a logical query plan.

// Sample DataFrames
val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "InMemoryTableScanExec")
).toDF("id", "token")
val ids = spark.range(10)

// Cache DataFrames
tokens.cache
ids.cache

val q = tokens.join(ids, Seq("id"), "outer")
scala> q.explain
== Physical Plan ==
*Project [coalesce(cast(id#5 as bigint), id#10L) AS id#33L, token#6]
+- SortMergeJoin [cast(id#5 as bigint)], [id#10L], FullOuter
   :- *Sort [cast(id#5 as bigint) ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cast(id#5 as bigint), 200)
   :     +- InMemoryTableScan [id#5, token#6]
   :           +- InMemoryRelation [id#5, token#6], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- LocalTableScan [id#5, token#6]
   +- *Sort [id#10L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#10L, 200)
         +- InMemoryTableScan [id#10L]
               +- InMemoryRelation [id#10L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *Range (0, 10, step=1, splits=8)

// Code generation is supported exactly when batch decoding is supported
val q = spark.range(4).cache
val plan = q.queryExecution.executedPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
// Find the InMemoryTableScanExec operator in the physical plan
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get
assert(inmemoryScan.supportCodegen == inmemoryScan.supportsBatch)

Table 1. InMemoryTableScanExec’s Performance Metrics

Key | Name (in web UI) | Description
numOutputRows | number of output rows |

Figure 1. InMemoryTableScanExec in web UI (Details for Query)

InMemoryTableScanExec supports Java code generation only if batch decoding is enabled.

InMemoryTableScanExec gives the single inputRDD as the only RDD of internal rows (when WholeStageCodegenExec physical operator is executed).

InMemoryTableScanExec uses the spark.sql.inMemoryTableScanStatistics.enable flag (default: false) to enable accumulators (which seem to be used exclusively for testing purposes).

Table 2. InMemoryTableScanExec’s Internal Properties (e.g. Registries, Counters and Flags)

Name | Description
columnarBatchSchema | Schema of a columnar batch. Used exclusively when InMemoryTableScanExec is requested to createAndDecompressColumn.
stats | PartitionStatistics of the InMemoryRelation. Used when InMemoryTableScanExec is requested for partitionFilters, partition batch pruning and statsFor.

Creating InMemoryTableScanExec Instance

InMemoryTableScanExec takes the following when created:

  1. Attribute expressions

  2. Predicate expressions

  3. InMemoryRelation logical operator

InMemoryTableScanExec initializes the internal registries and counters.

vectorTypes Method

vectorTypes: Option[Seq[String]]
Note
vectorTypes is part of ColumnarBatchScan Contract to…​FIXME.

vectorTypes uses spark.sql.columnVector.offheap.enabled internal configuration property to select the name of the concrete column vector, i.e. OnHeapColumnVector or OffHeapColumnVector when the property is off or on, respectively.

vectorTypes gives as many column vectors as the attribute expressions.
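
The selection described above can be sketched in plain Scala. This is a simplified model and not Spark's code: vectorTypesSketch and its parameters are hypothetical names, the Boolean flag stands in for spark.sql.columnVector.offheap.enabled, and the column vector names are the simple class names used in the text.

```scala
// Simplified model of vectorTypes (hypothetical helper, not Spark's implementation).
// offHeapEnabled stands in for spark.sql.columnVector.offheap.enabled.
def vectorTypesSketch(offHeapEnabled: Boolean, attributes: Seq[String]): Option[Seq[String]] = {
  val vectorName = if (offHeapEnabled) "OffHeapColumnVector" else "OnHeapColumnVector"
  // One column vector name per attribute expression
  Some(Seq.fill(attributes.length)(vectorName))
}

val onHeap = vectorTypesSketch(offHeapEnabled = false, attributes = Seq("id", "token"))
val offHeap = vectorTypesSketch(offHeapEnabled = true, attributes = Seq("id"))
```

With the flag off, every attribute is paired with the on-heap vector name; with the flag on, with the off-heap one.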

supportsBatch Property

supportsBatch: Boolean
Note
supportsBatch is part of ColumnarBatchScan Contract to control whether the physical operator supports vectorized decoding or not.

supportsBatch is enabled when all of the following hold:

  1. spark.sql.inMemoryColumnarStorage.enableVectorizedReader configuration property is enabled

  2. The output schema of the InMemoryRelation uses primitive data types only, i.e. BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType

  3. The number of nested fields in the output schema of the InMemoryRelation is at most spark.sql.codegen.maxFields internal configuration property
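
The three conditions above can be modelled as a single Boolean expression. The sketch below is an illustration only: the data types are stand-ins defined locally (not org.apache.spark.sql.types), supportsBatchSketch and Field are hypothetical names, and the schema is assumed to be flat so that the number of nested fields equals the number of columns.

```scala
// Stand-ins for Spark SQL data types (hypothetical; not Spark's classes)
trait DataType
case object BooleanType extends DataType
case object ByteType extends DataType
case object ShortType extends DataType
case object IntegerType extends DataType
case object LongType extends DataType
case object FloatType extends DataType
case object DoubleType extends DataType
case object DateType extends DataType // an example of a non-primitive type

final case class Field(name: String, dataType: DataType)

// The primitive types listed in condition 2
val primitiveTypes: Set[DataType] =
  Set(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType)

// Assumption: a flat schema, so the field count is simply the number of columns
def supportsBatchSketch(
    vectorizedReaderEnabled: Boolean,
    schema: Seq[Field],
    maxFields: Int): Boolean =
  vectorizedReaderEnabled &&
    schema.forall(field => primitiveTypes.contains(field.dataType)) &&
    schema.length <= maxFields

val longsOnly = Seq(Field("id", LongType))
val withDate = Seq(Field("date", DateType))
```

For example, supportsBatchSketch(true, longsOnly, 100) holds, while the same call with withDate does not, which matches the two demos of the supportsBatch flag in this document (a column of longs versus a java.sql.Date column). The value 100 for maxFields is an assumed setting used for illustration.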

partitionFilters Property

partitionFilters: Seq[Expression]
Note
partitionFilters is a Scala lazy value which is computed once when accessed and cached afterwards.
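
A minimal sketch of what "computed once when accessed and cached afterwards" means for a Scala lazy value. LazyFiltersDemo and the filter strings are hypothetical and only demonstrate the language feature; they do not reproduce how partitionFilters is computed.

```scala
// Hypothetical class that only demonstrates Scala's lazy val semantics
final class LazyFiltersDemo {
  var evaluations = 0
  // The initializer runs on first access only; the result is cached afterwards
  lazy val partitionFilters: Seq[String] = {
    evaluations += 1
    Seq("lowerBound <= 4", "4 <= upperBound")
  }
}

val demo = new LazyFiltersDemo
val evaluationsBefore = demo.evaluations // nothing has been computed yet
val first = demo.partitionFilters        // triggers the initializer
val second = demo.partitionFilters       // served from the cached value
val evaluationsAfter = demo.evaluations  // the initializer ran exactly once
```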

partitionFilters…​FIXME

Note
partitionFilters is used when…​FIXME

Applying Partition Batch Pruning to Cached Column Buffers (Creating MapPartitionsRDD of Filtered CachedBatches) — filteredCachedBatches Internal Method

filteredCachedBatches(): RDD[CachedBatch]

filteredCachedBatches requests PartitionStatistics for the output schema and InMemoryRelation for cached column buffers (as an RDD[CachedBatch]).

filteredCachedBatches takes the cached column buffers (as an RDD[CachedBatch]) and transforms the RDD per partition with index (i.e. RDD.mapPartitionsWithIndexInternal) as follows:

  1. Creates a partition filter as a new GenPredicate for the partitionFilters expressions (combined using the And binary operator) and the schema

  2. Requests the generated partition filter Predicate to initialize

  3. Uses spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property to enable partition batch pruning and filtering out (skipping) CachedBatches in a partition based on column stats and the generated partition filter Predicate

Note
If spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is disabled (i.e. false), filteredCachedBatches does nothing and simply passes all CachedBatch elements along.
Note
spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is enabled by default.
Note
filteredCachedBatches is used exclusively when InMemoryTableScanExec is requested for the inputRDD internal property.
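
The effect of partition batch pruning can be illustrated with a simplified model over plain Scala collections. Everything here is an assumption made for illustration: IdStats, SketchCachedBatch and filteredCachedBatchesSketch are hypothetical names, the statistics cover a single numeric column, and the partition filter is an ordinary function rather than a generated Predicate.

```scala
// Simplified stand-ins (hypothetical names, not Spark's classes)
final case class IdStats(lowerBound: Long, upperBound: Long)
final case class SketchCachedBatch(numRows: Int, stats: IdStats)

// Skips batches whose column stats cannot satisfy the partition filter.
// With pruning disabled, all batches are passed along unchanged.
def filteredCachedBatchesSketch(
    batches: Seq[SketchCachedBatch],
    partitionFilter: IdStats => Boolean,
    pruningEnabled: Boolean): Seq[SketchCachedBatch] =
  if (pruningEnabled) batches.filter(batch => partitionFilter(batch.stats))
  else batches

val batches = Seq(
  SketchCachedBatch(numRows = 3, stats = IdStats(0L, 2L)),
  SketchCachedBatch(numRows = 3, stats = IdStats(3L, 5L)),
  SketchCachedBatch(numRows = 4, stats = IdStats(6L, 9L)))

// Partition filter for a predicate such as id = 4
val idEquals4: IdStats => Boolean =
  stats => stats.lowerBound <= 4L && 4L <= stats.upperBound

val pruned = filteredCachedBatchesSketch(batches, idEquals4, pruningEnabled = true)
val unpruned = filteredCachedBatchesSketch(batches, idEquals4, pruningEnabled = false)
```

In this model only the batch whose bounds include 4 survives pruning, while the disabled case returns every batch.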

statsFor Internal Method

statsFor(a: Attribute)

statsFor…​FIXME

Note
statsFor is used when…​FIXME

createAndDecompressColumn Internal Method

createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch

createAndDecompressColumn takes the number of rows in the input CachedBatch.

createAndDecompressColumn requests OffHeapColumnVector or OnHeapColumnVector to allocate column vectors (with the number of rows and columnarBatchSchema) per the spark.sql.columnVector.offheap.enabled internal configuration flag, i.e. true or false, respectively.

Note
spark.sql.columnVector.offheap.enabled internal configuration flag is disabled by default which means that OnHeapColumnVector is used.

createAndDecompressColumn creates a ColumnarBatch for the allocated column vectors (as an array of ColumnVector).

For every Attribute createAndDecompressColumn requests ColumnAccessor to decompress the column.

createAndDecompressColumn registers a callback to be executed on a task completion that will close the ColumnarBatch.

In the end, createAndDecompressColumn returns the ColumnarBatch.

Note
createAndDecompressColumn is used exclusively when InMemoryTableScanExec is requested for the input RDD of internal rows.
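
The sequence of steps above (allocate vectors, decompress every column, register a cleanup callback, return the batch) can be sketched with plain Scala stand-ins. SketchColumnarBatch, SketchTaskContext and createAndDecompressColumnSketch are hypothetical names, "decompression" is reduced to copying values, and only columns of longs are modelled.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a ColumnarBatch that can be closed
final class SketchColumnarBatch(val columns: Array[Array[Long]], val numRows: Int) {
  var closed = false
  def close(): Unit = { closed = true }
}

// Hypothetical stand-in for a task context with completion callbacks
final class SketchTaskContext {
  private val listeners = ArrayBuffer.empty[() => Unit]
  def addTaskCompletionListener(listener: () => Unit): Unit = { listeners += listener }
  def markTaskCompleted(): Unit = listeners.foreach(listener => listener())
}

// compressedColumns: one "compressed" column per attribute (here simply a Seq[Long])
def createAndDecompressColumnSketch(
    compressedColumns: Seq[Seq[Long]],
    numRows: Int,
    context: SketchTaskContext): SketchColumnarBatch = {
  // Allocate one column vector per attribute, sized by the number of rows
  val vectors = Array.fill(compressedColumns.length)(new Array[Long](numRows))
  val batch = new SketchColumnarBatch(vectors, numRows)
  // "Decompress" every column into its vector
  for ((column, i) <- compressedColumns.zipWithIndex) column.copyToArray(vectors(i))
  // Close the batch once the task completes
  context.addTaskCompletionListener(() => batch.close())
  batch
}

val context = new SketchTaskContext
val batch = createAndDecompressColumnSketch(Seq(Seq(0L, 1L, 2L)), numRows = 3, context)
val closedBeforeCompletion = batch.closed // the batch stays open while the task runs
```

Calling context.markTaskCompleted() afterwards runs the registered callback and closes the batch, which is the point of registering the cleanup instead of closing the batch eagerly.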

Creating Input RDD of Internal Rows — inputRDD Internal Property

inputRDD: RDD[InternalRow]
Note
inputRDD is a Scala lazy value which is computed once when accessed and cached afterwards.

inputRDD first applies partition batch pruning to cached column buffers (and creates the filtered cached batches as an RDD[CachedBatch]).

With supportsBatch flag on, inputRDD finishes with a new MapPartitionsRDD (using RDD.map) that applies createAndDecompressColumn to all cached columnar batches.

Caution
Show examples of supportsBatch enabled and disabled
// Demo: A MapPartitionsRDD in the RDD lineage
val q = spark.range(4).cache
val plan = q.queryExecution.executedPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get

// supportsBatch flag is on since the schema is a single column of longs
assert(inmemoryScan.supportsBatch)

val rdd = inmemoryScan.inputRDDs.head
scala> rdd.toDebugString
res2: String =
(8) MapPartitionsRDD[5] at inputRDDs at <console>:27 []
 |  MapPartitionsRDD[4] at inputRDDs at <console>:27 []
 |  *(1) Range (0, 4, step=1, splits=8)
 MapPartitionsRDD[3] at cache at <console>:23 []
 |  MapPartitionsRDD[2] at cache at <console>:23 []
 |  MapPartitionsRDD[1] at cache at <console>:23 []
 |  ParallelCollectionRDD[0] at cache at <console>:23 []

With supportsBatch flag off, inputRDD first applies partition batch pruning to cached column buffers (and creates the filtered cached batches as an RDD[CachedBatch]).

Note
inputRDD applies partition batch pruning to cached column buffers (and creates the filtered cached batches as an RDD[CachedBatch]) in both branches, i.e. twice in the code, which seems unnecessary.

In the end, inputRDD creates a new MapPartitionsRDD (using RDD.map) with a ColumnarIterator applied to all cached columnar batches that is created as follows:

  1. For every CachedBatch in the partition iterator adds the total number of rows in the batch to numOutputRows SQL metric

  2. Requests GenerateColumnAccessor to generate the Java code for a ColumnarIterator to perform expression evaluation for the given column types.

  3. Requests ColumnarIterator to initialize
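
Step 1 above (updating the numOutputRows metric per batch while the batches are turned into rows) can be modelled with plain Scala iterators. This is a sketch under assumptions: SketchMetric, RowBatch and rowsSketch are hypothetical names, and the generated ColumnarIterator of steps 2 and 3 is reduced to flattening batches into rows.

```scala
// Hypothetical stand-ins for a SQL metric and a cached batch of rows
final class SketchMetric {
  var value = 0L
  def add(n: Long): Unit = { value += n }
}
final case class RowBatch(rows: Seq[Long])

def rowsSketch(batches: Iterator[RowBatch], numOutputRows: SketchMetric): Iterator[Long] = {
  // Step 1: add the row count of every batch as it is pulled from the partition iterator
  val counted = batches.map { batch =>
    numOutputRows.add(batch.rows.length.toLong)
    batch
  }
  // Steps 2 and 3 are reduced here to flattening the batches into rows
  counted.flatMap(_.rows.iterator)
}

val numOutputRows = new SketchMetric
val rows = rowsSketch(
  Iterator(RowBatch(Seq(0L, 1L)), RowBatch(Seq(2L, 3L, 4L))),
  numOutputRows).toList
```

Because iterators are lazy, the metric in this model only grows as batches are actually consumed; after the whole partition is read it equals the total number of rows.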

// Demo: A MapPartitionsRDD in the RDD lineage (supportsBatch flag off)
import java.sql.Date
import java.time.LocalDate
val q = Seq(Date.valueOf(LocalDate.now)).toDF("date").cache
val plan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get

// supportsBatch flag is off since the schema uses java.sql.Date
assert(inmemoryScan.supportsBatch == false)

val rdd = inmemoryScan.inputRDDs.head
scala> rdd.toDebugString
res2: String =
(1) MapPartitionsRDD[12] at inputRDDs at <console>:28 []
 |  MapPartitionsRDD[11] at inputRDDs at <console>:28 []
 |  LocalTableScan [date#15]
 MapPartitionsRDD[9] at cache at <console>:25 []
 |  MapPartitionsRDD[8] at cache at <console>:25 []
 |  ParallelCollectionRDD[7] at cache at <console>:25 []
Note
inputRDD is used when InMemoryTableScanExec is requested for the input RDDs and to execute.

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute branches off per supportsBatch flag.

With supportsBatch flag on, doExecute creates a WholeStageCodegenExec (with the InMemoryTableScanExec physical operator as the child and codegenStageId as 0) and requests it to execute.

Otherwise, when supportsBatch flag is off, doExecute simply gives the input RDD of internal rows.

buildFilter Property

buildFilter: PartialFunction[Expression, Expression]
Note
buildFilter is a Scala lazy value which is computed once when accessed and cached afterwards.

buildFilter is a Scala PartialFunction that accepts an Expression and produces an Expression, i.e. PartialFunction[Expression, Expression].

Table 3. buildFilter’s Expressions

Input Expression | Description
And |
Or |
EqualTo |
EqualNullSafe |
LessThan |
LessThanOrEqual |
GreaterThan |
GreaterThanOrEqual |
IsNull |
IsNotNull |
In with a non-empty list of Literal expressions | For every Literal expression in the expression list, buildFilter creates an And expression with the lower and upper bounds of the partition statistics for the attribute and the Literal. In the end, buildFilter joins the And expressions with Or expressions.

Note
buildFilter is used exclusively when InMemoryTableScanExec is requested for partitionFilters.
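
The handling of In described above can be illustrated by evaluating the resulting bound checks directly. The sketch assumes integer-valued statistics for a single attribute; AttrStats, mayContain and mayContainAny are hypothetical names, and the functions compute Booleans where buildFilter would build Catalyst expressions.

```scala
// Hypothetical stand-in for the partition statistics of one attribute
final case class AttrStats(lowerBound: Int, upperBound: Int)

// One literal: lowerBound <= literal AND literal <= upperBound
def mayContain(stats: AttrStats, literal: Int): Boolean =
  stats.lowerBound <= literal && literal <= stats.upperBound

// In with a non-empty list: the per-literal checks joined with OR
def mayContainAny(stats: AttrStats, literals: Seq[Int]): Boolean = {
  require(literals.nonEmpty, "In is only handled for a non-empty list of literals")
  literals.map(literal => mayContain(stats, literal)).reduce(_ || _)
}

val batchStats = AttrStats(lowerBound = 10, upperBound = 20)
val keep = mayContainAny(batchStats, Seq(5, 15)) // 15 falls within [10, 20]
val skip = mayContainAny(batchStats, Seq(1, 30)) // neither literal falls within [10, 20]
```

A batch for which the check is false cannot contain any of the listed values and can be skipped by partition batch pruning.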

innerChildren Method

innerChildren: Seq[QueryPlan[_]]
Note
innerChildren is part of QueryPlan Contract to…​FIXME.

innerChildren…​FIXME
