ColumnPruning Logical Optimization

ColumnPruning is a base logical optimization that prunes (removes) unneeded columns from a logical query plan, so operators output only the attributes that the operators above them actually require.

ColumnPruning is part of the Operator Optimizations fixed-point batch (as in the examples below) and the RewriteSubquery once-executed batch in the standard batches of the Catalyst Optimizer.
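
Tip: The rule applications quoted in the examples below are logged by the SparkOptimizer logger. Assuming the log4j 1.x setup that Spark 2.x ships with, a line like the following in conf/log4j.properties enables them:

log4j.logger.org.apache.spark.sql.execution.SparkOptimizer=TRACE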

ColumnPruning is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].
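
Being a Rule[LogicalPlan], ColumnPruning can also be applied by hand, outside the Optimizer. A minimal spark-shell sketch (ColumnPruning lives in Catalyst's internal, though importable, optimizer package):

import org.apache.spark.sql.catalyst.optimizer.ColumnPruning
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// a rule is, in essence, a function from one logical plan to another
val rule: Rule[LogicalPlan] = ColumnPruning
val analyzedPlan: LogicalPlan = spark.range(1).queryExecution.analyzed
val transformedPlan = rule(analyzedPlan)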

Example 1

// the dataset to query over, with a derived bucket column
val dataset = spark.range(10).withColumn("bucket", 'id % 3)

// rank is auto-imported in spark-shell; outside it, import it explicitly
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank
val rankCol = rank over Window.partitionBy('bucket).orderBy('id) as "rank"

// the query with a window aggregate
val ranked = dataset.withColumn("rank", rankCol)

scala> ranked.explain(true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
 Project [id#73L, bucket#76L, rank#192]                                                                                                                              Project [id#73L, bucket#76L, rank#192]
!+- Project [id#73L, bucket#76L, rank#82, rank#82 AS rank#192]                                                                                                       +- Project [id#73L, bucket#76L, rank#82 AS rank#192]
    +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]      +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]
!      +- Project [id#73L, bucket#76L]                                                                                                                                     +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L]
!         +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L]                                                                                                     +- Range (0, 10, step=1, splits=Some(8))
!            +- Range (0, 10, step=1, splits=Some(8))
...
TRACE SparkOptimizer: Fixed point reached for batch Operator Optimizations after 2 iterations.
DEBUG SparkOptimizer:
=== Result of Batch Operator Optimizations ===
!Project [id#73L, bucket#76L, rank#192]                                                                                                                              Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]
!+- Project [id#73L, bucket#76L, rank#82, rank#82 AS rank#192]                                                                                                       +- Project [id#73L, (id#73L % 3) AS bucket#76L]
!   +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]      +- Range (0, 10, step=1, splits=Some(8))
!      +- Project [id#73L, bucket#76L]
!         +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L]
!            +- Range (0, 10, step=1, splits=Some(8))
...
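
The final optimized plan is also available without TRACE logging, through the public QueryExecution API, e.g. in spark-shell (attribute IDs differ per session):

scala> ranked.queryExecution.optimizedPlan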

Example 2

// the business object
case class Person(id: Long, name: String, city: String)

// the dataset to query over
val dataset = Seq(Person(0, "Jacek", "Warsaw")).toDS

// the query
// Note that we work with name only (out of the 3 attributes of Person)
// upper is auto-imported in spark-shell; outside it, import it explicitly
import org.apache.spark.sql.functions.upper
val query = dataset.groupBy(upper('name) as 'name).count

scala> query.explain(extended = true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
 Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]   Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
!+- LocalRelation [id#125L, name#126, city#127]                                       +- Project [name#126]
!                                                                                        +- LocalRelation [id#125L, name#126, city#127]
...
== Parsed Logical Plan ==
'Aggregate [upper('name) AS name#160], [upper('name) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]

== Analyzed Logical Plan ==
name: string, count: bigint
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]

== Optimized Logical Plan ==
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [name#126]

== Physical Plan ==
*HashAggregate(keys=[upper(name#126)#171], functions=[count(1)], output=[name#160, count#166L])
+- Exchange hashpartitioning(upper(name#126)#171, 200)
   +- *HashAggregate(keys=[upper(name#126) AS upper(name#126)#171], functions=[partial_count(1)], output=[upper(name#126)#171, count#173L])
      +- LocalTableScan [name#126]

Executing Rule — apply Method

apply(plan: LogicalPlan): LogicalPlan
Note: apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. a LogicalPlan).

apply transforms the given logical plan to prune unneeded columns: wherever an operator does not need all the columns its child produces, apply narrows the child's output, e.g. by inserting a Project with just the referenced attributes (the Project [name#126] above the LocalRelation in Example 2), by dropping unreferenced attributes from a Project's project list, or by removing a Project that merely forwards its child's output (both visible in Example 1).
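
Applying ColumnPruning by hand to the analyzed plan of Example 2's query reproduces the transformation from the TRACE log above, e.g. in spark-shell (a sketch; attribute IDs differ per session):

import org.apache.spark.sql.catalyst.optimizer.ColumnPruning

// one application inserts a Project with just the name attribute
// between the Aggregate and the LocalRelation
scala> ColumnPruning(query.queryExecution.analyzed)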
