SparkOptimizer — Logical Query Plan Optimizer

SparkOptimizer is a concrete logical query plan optimizer with additional optimization rules (that extend the base logical optimization rules).

SparkOptimizer gives three extension points for additional optimization rules:

preOptimizationBatches

postHocOptimizationBatches

User Provided Optimizers (i.e. extraOptimizations of the ExperimentalMethods)

SparkOptimizer is created when SessionState is requested for the logical query plan optimizer for the first time (through BaseSessionStateBuilder).

Figure 1. Creating SparkOptimizer

SparkOptimizer is available as the optimizer property of a session-specific SessionState.

scala> :type spark
org.apache.spark.sql.SparkSession

scala> :type spark.sessionState.optimizer
org.apache.spark.sql.catalyst.optimizer.Optimizer

// It is really a SparkOptimizer.
// Let's check that with an isInstanceOf test.

scala> import org.apache.spark.sql.execution.SparkOptimizer
import org.apache.spark.sql.execution.SparkOptimizer

scala> spark.sessionState.optimizer.isInstanceOf[SparkOptimizer]
res1: Boolean = true

You can access the optimized logical plan of a structured query through its QueryExecution as optimizedPlan.

// Apply two filters in sequence on purpose
// We want to trigger the CombineTypedFilters optimization
val dataset = spark.range(10).filter(_ % 2 == 0).filter(_ == 0)

// optimizedPlan is a lazy value
// Optimizations are triggered only the first time it is accessed
// Subsequent calls return the cached, already-optimized plan
scala> dataset.queryExecution.optimizedPlan
res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 10, step=1, splits=Some(8))
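
Note that the two typed filters have been merged into a single TypedFilter operator in the optimized plan. That is the CombineTypedFilters optimization rule at work.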

SparkOptimizer defines the following custom rule batches (in addition to the base logical optimization batches).

Table 1. SparkOptimizer’s Default Optimization Batch Rules (in the order of execution)

Batch Name                               Strategy    Rules                                      Description
preOptimizationBatches                                                                          Additional pre-optimization batches (an extension point)
Base Logical Optimization Batches                                                               The regular optimization batches of the base logical optimizer
Optimize Metadata Only Query             Once        OptimizeMetadataOnlyQuery
Extract Python UDF from Aggregate        Once        ExtractPythonUDFFromAggregate
Prune File Source Table Partitions       Once        PruneFileSourcePartitions
Push down operators to data source scan  Once        PushDownOperatorsToDataSource              Pushes down operators to underlying data sources (i.e. DataSourceV2Relations)
postHocOptimizationBatches                                                                      Additional post-hoc optimization batches (an extension point)
User Provided Optimizers                 FixedPoint  extraOptimizations of ExperimentalMethods
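
For a quick taste of the User Provided Optimizers batch, the following sketch registers a custom do-nothing optimization rule through the extraOptimizations of the ExperimentalMethods (MyNoopRule is a made-up name for this example):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A do-nothing rule, just to demonstrate the registration
object MyNoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Registered rules end up in the User Provided Optimizers batch
spark.experimental.extraOptimizations = Seq(MyNoopRule)

Once registered, the rule is executed (under the FixedPoint strategy) as part of the User Provided Optimizers batch, i.e. after all the other batches.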

SparkOptimizer considers the ExtractPythonUDFFromAggregate optimization rule non-excludable, i.e. the rule is applied even if requested to be excluded through the spark.sql.optimizer.excludedRules configuration property.
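
Assuming Spark 2.4 or later (where the spark.sql.optimizer.excludedRules configuration property is available), you can see this for yourself; excluding the rule has no effect:

// Request to exclude ExtractPythonUDFFromAggregate by its fully-qualified name
// The exclusion is ignored (with a warning in the logs) since the rule is non-excludable
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate")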

Tip

Enable DEBUG or TRACE logging levels for org.apache.spark.sql.execution.SparkOptimizer logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.SparkOptimizer=TRACE

Refer to Logging.

Creating SparkOptimizer Instance

SparkOptimizer takes the following when created:

SessionCatalog

ExperimentalMethods

Extension Point for Additional Pre-Optimization Batches — preOptimizationBatches Method

preOptimizationBatches: Seq[Batch]

preOptimizationBatches are the additional pre-optimization batches that are executed right before the regular optimization batches.
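
What follows is a minimal sketch of a custom SparkOptimizer with an extra pre-optimization batch (assuming the Spark 2.4 two-argument constructor with a SessionCatalog and an ExperimentalMethods; MyCustomRule is a made-up no-op rule):

import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkOptimizer

// A no-op rule for the sketch
object MyCustomRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Assumes the Spark 2.4 constructor (catalog, experimentalMethods)
class MyOptimizer(catalog: SessionCatalog, experimentalMethods: ExperimentalMethods)
  extends SparkOptimizer(catalog, experimentalMethods) {

  // Executed right before the regular optimization batches
  override def preOptimizationBatches: Seq[Batch] =
    Batch("My Pre-Optimization Batch", Once, MyCustomRule) :: Nil
}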

Extension Point for Additional Post-Hoc Optimization Batches — postHocOptimizationBatches Method

postHocOptimizationBatches: Seq[Batch] = Nil

postHocOptimizationBatches are the additional post-hoc optimization batches that are executed right after the regular optimization batches (and before the User Provided Optimizers batch).
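
Overriding postHocOptimizationBatches follows the same pattern, e.g. by adding the following override to the MyOptimizer sketch above:

// Executed right after the regular optimization batches
// (and before the User Provided Optimizers batch)
override def postHocOptimizationBatches: Seq[Batch] =
  Batch("My Post-Hoc Optimization Batch", Once, MyCustomRule) :: Nil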
