Logical Query Plan Optimizer

Catalyst is Spark SQL's framework for manipulating trees of relational operators and expressions. It transforms logical plans through a series of tree rewrites before they end up as physical execution plans, e.g.:
```
scala> sql("select 1 + 1 + 1").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias(((1 + 1) + 1), None)]
+- OneRowRelation$

== Analyzed Logical Plan ==
((1 + 1) + 1): int
Project [((1 + 1) + 1) AS ((1 + 1) + 1)#4]
+- OneRowRelation$

== Optimized Logical Plan ==
Project [3 AS ((1 + 1) + 1)#4]
+- OneRowRelation$

== Physical Plan ==
*Project [3 AS ((1 + 1) + 1)#4]
+- Scan OneRowRelation[]
```

Spark 2.0 uses Catalyst’s tree manipulation library to build an extensible query plan optimizer with a number of query optimizations.

Catalyst supports both rule-based and cost-based optimization.
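The rule-based side can be pictured as pattern-matching rewrites over an expression tree. The following is a minimal sketch in plain Scala (toy classes, not Spark's actual Catalyst API) of the kind of bottom-up constant folding that turns `((1 + 1) + 1)` into `3` in the optimized plan above:

```scala
// Toy expression tree, standing in for Catalyst's Expression hierarchy
sealed trait Expr
case class Lit(value: Int) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

object ConstantFolding {
  // Fold children first, then the node itself (bottom-up traversal)
  def apply(e: Expr): Expr = e match {
    case Add(l, r) =>
      (apply(l), apply(r)) match {
        case (Lit(a), Lit(b)) => Lit(a + b)
        case (fl, fr)         => Add(fl, fr)
      }
    case lit => lit
  }
}

// ((1 + 1) + 1) folds to the literal 3
val folded = ConstantFolding(Add(Add(Lit(1), Lit(1)), Lit(1)))
```

Catalyst's real rules work the same way in spirit: a partial function over tree nodes, applied wherever it matches.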

Collection of Logical Plan Optimizations — batches Method

```scala
batches: Seq[Batch]
```

batches returns a collection of logical plan optimization batches.
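Conceptually, each batch groups rules under a name and a strategy that controls how often the batch re-runs (once, or until the plan stops changing). A minimal sketch of that executor loop, with illustrative names rather than Spark's actual `RuleExecutor` API:

```scala
// A rule is just a plan-to-plan transformation
type Rule[P] = P => P

sealed trait Strategy { def maxIterations: Int }
case object Once extends Strategy { val maxIterations = 1 }
case class FixedPoint(maxIterations: Int) extends Strategy

case class Batch[P](name: String, strategy: Strategy, rules: Rule[P]*)

// Run each batch in order; within a batch, re-apply the rules until the
// plan reaches a fixed point or the iteration budget is exhausted
def execute[P](batches: Seq[Batch[P]], plan: P): P =
  batches.foldLeft(plan) { (start, batch) =>
    def runOnce(p: P): P = batch.rules.foldLeft(p)((acc, rule) => rule(acc))
    var iteration = 1
    var before = start
    var after = runOnce(before)
    while (after != before && iteration < batch.strategy.maxIterations) {
      before = after
      after = runOnce(after)
      iteration += 1
    }
    after
  }

// Toy plan: a list of operator names; a rule that merges adjacent duplicates
val dedup: Rule[List[String]] = {
  case a :: b :: rest if a == b => a :: rest
  case other => other
}
val optimized = execute(
  Seq(Batch("Dedup", FixedPoint(10), dedup)),
  List("Filter", "Filter", "Filter", "Range"))
```

Note how a single pass of `dedup` is not enough; the `FixedPoint` strategy is what lets the batch keep shrinking the plan until nothing changes.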

Table 1. (A subset of) Catalyst Plan Optimizer's Logical Plan Optimizations:

* Constant Folding
* Predicate Pushdown
* Nullability (NULL Value) Propagation
* Vectorized Parquet Decoder
* Combine Typed Filters
* Propagate Empty Relation
* Simplify Casts
* Column Pruning
* GetCurrentDatabase / ComputeCurrentTime
* Eliminate Serialization

SparkOptimizer — The Default Logical Query Plan Optimizer

SparkOptimizer is the default logical query plan optimizer. It is available as the optimizer attribute of SessionState and carries the logical plan optimizations.

SparkOptimizer is merely used to compute the optimized LogicalPlan for a QueryExecution (available as optimizedPlan).

SparkOptimizer requires a SessionCatalog, a SQLConf and ExperimentalMethods with user-defined experimental methods.

SparkOptimizer's input experimentalMethods serves as an extension point for custom ExperimentalMethods.

SparkOptimizer extends the Optimizer batches with the following batches:

  1. Optimize Metadata Only Query (as OptimizeMetadataOnlyQuery)

  2. Extract Python UDF from Aggregate (as ExtractPythonUDFFromAggregate)

  3. Prune File Source Table Partitions (as PruneFileSourcePartitions)

  4. User Provided Optimizers for the input user-defined ExperimentalMethods

You can see the result of executing SparkOptimizer on a query plan using optimizedPlan attribute of QueryExecution.

```
// Applying two filters in sequence on purpose
// We want to kick in the CombineTypedFilters optimization
val dataset = spark.range(10).filter(_ % 2 == 0).filter(_ == 0)

// optimizedPlan is a lazy value
// Only the first time you access it do you trigger the optimizations
// Subsequent calls return the cached, already-optimized result
// Use explain to trigger the optimizations again
scala> dataset.queryExecution.optimizedPlan
res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 10, step=1, splits=Some(8))
```
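Note how the two filters in the query collapsed into a single TypedFilter in the optimized plan. What Combine Typed Filters did there can be sketched in plain Scala with toy classes (illustrative names, not Spark's actual Catalyst classes):

```scala
// Toy logical operators standing in for Catalyst's plan nodes
sealed trait Plan
case class RangeOp(n: Long) extends Plan
case class TypedFilter(pred: Long => Boolean, child: Plan) extends Plan

// Two adjacent typed filters collapse into one whose predicate is the
// conjunction of both; the inner filter runs first, so apply p2 before p1
def combineTypedFilters(plan: Plan): Plan = plan match {
  case TypedFilter(p1, TypedFilter(p2, child)) =>
    combineTypedFilters(TypedFilter((x: Long) => p2(x) && p1(x), child))
  case other => other
}

// filter(_ % 2 == 0).filter(_ == 0) becomes a single TypedFilter over the range
val plan = TypedFilter((x: Long) => x == 0L,
                       TypedFilter((x: Long) => x % 2 == 0, RangeOp(10)))
val optimized = combineTypedFilters(plan)
```

The combined predicate is evaluated once per row instead of materializing two separate filter operators.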

Enable DEBUG or TRACE logging levels for org.apache.spark.sql.execution.SparkOptimizer logger to see what happens inside.

Add the following line to conf/log4j.properties:

```
log4j.logger.org.apache.spark.sql.execution.SparkOptimizer=DEBUG
```

Refer to Logging.
