PushDownPredicate — Predicate Pushdown / Filter Pushdown Logical Optimization

PushDownPredicate is part of the Operator Optimization before Inferring Filters fixed-point batch in the standard batches of the Catalyst Optimizer.

PushDownPredicate is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].
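
For context, Rule[LogicalPlan] is a simple contract: apply takes a logical plan and returns a (possibly transformed) logical plan. The following do-nothing rule is a minimal sketch (not part of Spark) that only illustrates the shape of the contract:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A do-nothing rule that merely illustrates the Rule[LogicalPlan] contract
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}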

When you execute a where or filter operator right after loading a dataset, Spark SQL tries to push the predicate down to the data source, e.g. as a SQL query with a WHERE clause (or whatever the equivalent is for the given data source).

This optimization is called filter pushdown or predicate pushdown and aims at pushing the filtering down to the "bare metal", i.e. the data source engine. It increases query performance (and avoids unnecessary memory pressure) because rows are filtered out at the source rather than loaded into Spark's memory in full and only filtered afterwards.

PushDownPredicate is also applied to structured queries that filter after a projection or filter on window partition columns.
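
Tip

The examples below rely on TRACE-level logs from the optimizer. You can raise the logging level from the spark-shell, e.g. as in the following sketch (the logger name matches the TRACE SparkOptimizer lines below and may differ across Spark versions):

import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark.sql.execution.SparkOptimizer").setLevel(Level.TRACE)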

Pushing Filter Operator Down Using Projection

val dataset = spark.range(2)

scala> dataset.select('id as "_id").filter('_id === 0).explain(extended = true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
!Filter (_id#14L = cast(0 as bigint))         Project [id#11L AS _id#14L]
!+- Project [id#11L AS _id#14L]               +- Filter (id#11L = cast(0 as bigint))
    +- Range (0, 2, step=1, splits=Some(8))      +- Range (0, 2, step=1, splits=Some(8))
...
== Parsed Logical Plan ==
'Filter ('_id = 0)
+- Project [id#11L AS _id#14L]
   +- Range (0, 2, step=1, splits=Some(8))

== Analyzed Logical Plan ==
_id: bigint
Filter (_id#14L = cast(0 as bigint))
+- Project [id#11L AS _id#14L]
   +- Range (0, 2, step=1, splits=Some(8))

== Optimized Logical Plan ==
Project [id#11L AS _id#14L]
+- Filter (id#11L = 0)
   +- Range (0, 2, step=1, splits=Some(8))

== Physical Plan ==
*Project [id#11L AS _id#14L]
+- *Filter (id#11L = 0)
   +- *Range (0, 2, step=1, splits=Some(8))
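
You can observe the same reordering without TRACE logs by inspecting the optimized logical plan directly (a quick sketch reusing the dataset above):

val q = dataset.select('id as "_id").filter('_id === 0)
scala> q.queryExecution.optimizedPlan
// The Filter now sits below the Project, right above Range, and the cast on the literal is gone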

Optimizing Window Aggregate Operators

val dataset = spark.range(5).withColumn("group", 'id % 3)
scala> dataset.show
+---+-----+
| id|group|
+---+-----+
|  0|    0|
|  1|    1|
|  2|    2|
|  3|    0|
|  4|    1|
+---+-----+

import org.apache.spark.sql.expressions.Window
val groupW = Window.partitionBy('group).orderBy('id)

// Filter out group 2 after window
// No need to compute rank for group 2
// Push the filter down
val ranked = dataset.withColumn("rank", rank over groupW).filter('group =!= 2)

scala> ranked.queryExecution.optimizedPlan
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
!Filter NOT (group#35L = cast(2 as bigint))                                                                                                                            Project [id#32L, group#35L, rank#203]
!+- Project [id#32L, group#35L, rank#203]                                                                                                                              +- Project [id#32L, group#35L, rank#203, rank#203]
!   +- Project [id#32L, group#35L, rank#203, rank#203]                                                                                                                    +- Window [rank(id#32L) windowspecdefinition(group#35L, id#32L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#203], [group#35L], [id#32L ASC]
!      +- Window [rank(id#32L) windowspecdefinition(group#35L, id#32L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#203], [group#35L], [id#32L ASC]         +- Project [id#32L, group#35L]
!         +- Project [id#32L, group#35L]                                                                                                                                        +- Project [id#32L, (id#32L % cast(3 as bigint)) AS group#35L]
!            +- Project [id#32L, (id#32L % cast(3 as bigint)) AS group#35L]                                                                                                        +- Filter NOT ((id#32L % cast(3 as bigint)) = cast(2 as bigint))
                +- Range (0, 5, step=1, splits=Some(8))                                                                                                                               +- Range (0, 5, step=1, splits=Some(8))
...
res1: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Window [rank(id#32L) windowspecdefinition(group#35L, id#32L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#203], [group#35L], [id#32L ASC]
+- Project [id#32L, (id#32L % 3) AS group#35L]
   +- Filter NOT ((id#32L % 3) = 2)
      +- Range (0, 5, step=1, splits=Some(8))
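
For contrast, a predicate on the window function's output (rather than on the partitioning column) cannot be pushed below the Window operator, because rank values only exist once the window has been computed. A quick sketch with the same dataset:

// A filter on rank stays above the Window operator in the optimized plan
val topRanked = dataset.withColumn("rank", rank over groupW).filter('rank === 1)
scala> topRanked.queryExecution.optimizedPlan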

JDBC Data Source

Tip
Follow the instructions on how to set up PostgreSQL in Creating DataFrames from Tables using JDBC and PostgreSQL.

Given the following code:

// Start with the PostgreSQL driver on CLASSPATH

case class Project(id: Long, name: String, website: String)

// No optimizations for typed queries
// LOG:  execute <unnamed>: SELECT "id","name","website" FROM projects
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:sparkdb")
  .option("dbtable", "projects")
  .load()
  .as[Project]
  .filter(_.name.contains("Spark"))

// Only the following would end up with the pushdown
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:sparkdb")
  .option("dbtable", "projects")
  .load()
  .where("""name like "%Spark%"""")

With PushDownPredicate applied, the above query is executed against PostgreSQL as the following SQL query (note the WHERE clause):

LOG:  execute <unnamed>: SELECT "id","name","website" FROM projects WHERE (name LIKE '%Spark%')
Tip

Enable all statement logging in PostgreSQL to see the above SELECT and other query statements:

log_statement = 'all'

On Mac OS X with PostgreSQL installed using Homebrew, add the line to /usr/local/var/postgres/postgresql.conf.
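
You can also confirm the pushdown from the Spark side, without touching the PostgreSQL logs, by checking the PushedFilters entry of the JDBC scan in the physical plan (a sketch using the where-based query above):

// The JDBC scan should report a non-empty PushedFilters list
scala> df.explain()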

Parquet Data Source

val spark: SparkSession = ...
import spark.implicits._

// paste the case class into the REPL on its own so the toDF conversion below works
case class City(id: Long, name: String)

import org.apache.spark.sql.SaveMode.Overwrite
Seq(
  City(0, "Warsaw"),
  City(1, "Toronto"),
  City(2, "London"),
  City(3, "Redmond"),
  City(4, "Boston")).toDF.write.mode(Overwrite).parquet("cities.parquet")

val cities = spark.read.parquet("cities.parquet").as[City]

// Using DataFrame's Column-based query
scala> cities.where('name === "Warsaw").queryExecution.executedPlan
res21: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
   +- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:string>

// Using SQL query
scala> cities.where("""name = "Warsaw"""").queryExecution.executedPlan
res23: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
   +- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:string>

// Using Dataset's strongly typed filter
// The typed filter takes an opaque Scala function that Catalyst cannot analyze,
// so nothing gets pushed down (note the empty PushedFilters below)
scala> cities.filter(_.name == "Warsaw").queryExecution.executedPlan
res24: org.apache.spark.sql.execution.SparkPlan =
*Filter <function1>.apply
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,name:string>
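
A possible workaround (a sketch, not the only option) is to express the pushdown-friendly part of the predicate with the Column-based API and only then switch to typed operators, so Catalyst can still push the filter to the data source:

// The Column-based predicate is pushed down; the typed map runs on the filtered rows
scala> cities.where('name === "Warsaw").map(_.name.toUpperCase).queryExecution.executedPlan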

Hive Data Source

Caution
FIXME

Executing Rule — apply Method

apply(plan: LogicalPlan): LogicalPlan
Note
apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).

apply…FIXME
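
To see apply in action, you can run the rule directly against an analyzed logical plan (a minimal sketch, assuming the SparkSession and implicits from the earlier examples):

import org.apache.spark.sql.catalyst.optimizer.PushDownPredicate

// Rule[LogicalPlan].apply takes a logical plan and returns a transformed one
val analyzed = spark.range(2).select('id as "_id").filter('_id === 0).queryExecution.analyzed
val afterRule = PushDownPredicate(analyzed)
// afterRule has the Filter pushed below the Project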
