val dataset = spark.range(2)
scala> dataset.select('id as "_id").filter('_id === 0).explain(extended = true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
!Filter (_id#14L = cast(0 as bigint))         Project [id#11L AS _id#14L]
!+- Project [id#11L AS _id#14L]               +- Filter (id#11L = cast(0 as bigint))
    +- Range (0, 2, step=1, splits=Some(8))      +- Range (0, 2, step=1, splits=Some(8))
...
== Parsed Logical Plan ==
'Filter ('_id = 0)
+- Project [id#11L AS _id#14L]
   +- Range (0, 2, step=1, splits=Some(8))
== Analyzed Logical Plan ==
_id: bigint
Filter (_id#14L = cast(0 as bigint))
+- Project [id#11L AS _id#14L]
   +- Range (0, 2, step=1, splits=Some(8))
== Optimized Logical Plan ==
Project [id#11L AS _id#14L]
+- Filter (id#11L = 0)
   +- Range (0, 2, step=1, splits=Some(8))
== Physical Plan ==
*Project [id#11L AS _id#14L]
+- *Filter (id#11L = 0)
   +- *Range (0, 2, step=1, splits=Some(8))
PushDownPredicate — Predicate Pushdown / Filter Pushdown Logical Optimization
PushDownPredicate is a base logical optimization that pushes Filter operators down a logical query plan, closer to the data source.
PushDownPredicate is part of the Operator Optimization before Inferring Filters fixed-point batch in the standard batches of the Catalyst Optimizer.
PushDownPredicate is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].
When you execute where or filter operators right after loading a dataset, Spark SQL will try to push the where/filter predicate down to the data source using a corresponding SQL query with a WHERE clause (or whatever the proper language for the data source is).
This optimization is called filter pushdown or predicate pushdown and aims at pushing the filtering down to the "bare metal", i.e. the data source engine. This improves query performance because rows are filtered at the lowest possible level instead of after the entire dataset has been loaded into Spark's memory, where it could cause memory issues.
PushDownPredicate is also applied to structured queries with filters after projections or filtering on window partitions.
Pushing Filter Operator Down Using Projection
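The idea of pushing a Filter below a Project can be sketched with a toy model. The types below (Expr, Plan, RangeOp, PushFilterThroughProject) are hypothetical stand-ins written for this illustration only; they are not Catalyst classes, and the real rule has additional conditions (e.g. it requires deterministic projection expressions) that this sketch leaves out.

```scala
// Hypothetical, simplified stand-ins for Catalyst's expression and plan trees.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Lit(value: Long) extends Expr
final case class Eq(left: Expr, right: Expr) extends Expr
final case class Alias(child: Expr, name: String) extends Expr

sealed trait Plan
final case class RangeOp(start: Long, end: Long) extends Plan
final case class Project(projectList: Seq[Expr], child: Plan) extends Plan
final case class Filter(condition: Expr, child: Plan) extends Plan

object PushFilterThroughProject {
  // Rewrite references to aliases into the expressions they stand for,
  // so the condition is valid below the Project.
  private def substitute(e: Expr, aliases: Map[String, Expr]): Expr = e match {
    case Attr(n)     => aliases.getOrElse(n, e)
    case Eq(l, r)    => Eq(substitute(l, aliases), substitute(r, aliases))
    case Alias(c, n) => Alias(substitute(c, aliases), n)
    case other       => other // literals stay as they are
  }

  def apply(plan: Plan): Plan = plan match {
    // Filter directly above Project: swap them and rewrite the condition.
    case Filter(cond, Project(fields, grandChild)) =>
      val aliases = fields.collect { case Alias(c, n) => n -> c }.toMap
      Project(fields, Filter(substitute(cond, aliases), apply(grandChild)))
    case Filter(cond, child)    => Filter(cond, apply(child))
    case Project(fields, child) => Project(fields, apply(child))
    case leaf                   => leaf
  }
}

// Mirrors dataset.select('id as "_id").filter('_id === 0)
val before = Filter(
  Eq(Attr("_id"), Lit(0)),
  Project(Seq(Alias(Attr("id"), "_id")), RangeOp(0, 2)))
val after = PushFilterThroughProject(before)
```

In this toy model, after holds the Project on top of a Filter whose condition refers to id rather than _id, which is the same shape as the optimized plan of the select-then-filter query.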
Optimizing Window Aggregate Operators
val dataset = spark.range(5).withColumn("group", 'id % 3)
scala> dataset.show
+---+-----+
| id|group|
+---+-----+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 0|
| 4| 1|
+---+-----+
import org.apache.spark.sql.expressions.Window
val groupW = Window.partitionBy('group).orderBy('id)
// Filter out group 2 after window
// No need to compute rank for group 2
// Push the filter down
val ranked = dataset.withColumn("rank", rank over groupW).filter('group =!= 2)
scala> ranked.queryExecution.optimizedPlan
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
!Filter NOT (group#35L = cast(2 as bigint)) Project [id#32L, group#35L, rank#203]
!+- Project [id#32L, group#35L, rank#203] +- Project [id#32L, group#35L, rank#203, rank#203]
! +- Project [id#32L, group#35L, rank#203, rank#203] +- Window [rank(id#32L) windowspecdefinition(group#35L, id#32L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#203], [group#35L], [id#32L ASC]
! +- Window [rank(id#32L) windowspecdefinition(group#35L, id#32L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#203], [group#35L], [id#32L ASC] +- Project [id#32L, group#35L]
! +- Project [id#32L, group#35L] +- Project [id#32L, (id#32L % cast(3 as bigint)) AS group#35L]
! +- Project [id#32L, (id#32L % cast(3 as bigint)) AS group#35L] +- Filter NOT ((id#32L % cast(3 as bigint)) = cast(2 as bigint))
+- Range (0, 5, step=1, splits=Some(8)) +- Range (0, 5, step=1, splits=Some(8))
...
res1: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Window [rank(id#32L) windowspecdefinition(group#35L, id#32L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#203], [group#35L], [id#32L ASC]
+- Project [id#32L, (id#32L % 3) AS group#35L]
   +- Filter NOT ((id#32L % 3) = 2)
      +- Range (0, 5, step=1, splits=Some(8))
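The filter above can move below the Window operator because it references only the partitioning column. As a contrast, the following sketch compares it with a filter on a non-partitioning column, which should stay above the Window. The filterBelowWindow helper is a hypothetical name introduced here for illustration, and the session setup assumes a local Spark installation on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Window => WindowOp}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("window-pushdown-sketch")
  .getOrCreate()

val dataset = spark.range(5).withColumn("group", col("id") % 3)
val groupW = Window.partitionBy(col("group")).orderBy(col("id"))
val ranked = dataset.withColumn("rank", rank().over(groupW))

// Hypothetical helper: is there a Filter somewhere below the first Window operator?
def filterBelowWindow(plan: LogicalPlan): Boolean =
  plan.collectFirst {
    case w: WindowOp => w.child.find(_.isInstanceOf[Filter]).isDefined
  }.getOrElse(false)

// Predicate on the partitioning column: a candidate for pushdown
val onPartitionColumn = ranked.filter(col("group") =!= 2).queryExecution.optimizedPlan
// Predicate on a non-partitioning column: removing rows would change the ranks
val onOtherColumn = ranked.filter(col("id") > 1).queryExecution.optimizedPlan

println(filterBelowWindow(onPartitionColumn))
println(filterBelowWindow(onOtherColumn))
```

Dropping whole partitions before the window computation does not affect the ranks of the remaining rows, whereas dropping individual rows inside a partition would, which is why only the first filter is safe to push down.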
JDBC Data Source
Tip
Follow the instructions on how to set up PostgreSQL in Creating DataFrames from Tables using JDBC and PostgreSQL.
Given the following code:
// Start with the PostgreSQL driver on CLASSPATH
case class Project(id: Long, name: String, website: String)
// No optimizations for typed queries
// LOG: execute <unnamed>: SELECT "id","name","website" FROM projects
val df = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:sparkdb")
.option("dbtable", "projects")
.load()
.as[Project]
.filter(_.name.contains("Spark"))
// Only the following would end up with the pushdown
val df = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:sparkdb")
.option("dbtable", "projects")
.load()
.where("""name like "%Spark%"""")
PushDownPredicate translates the above query to the following SQL query:
LOG: execute <unnamed>: SELECT "id","name","website" FROM projects WHERE (name LIKE '%Spark%')
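To make the shape of that SQL query less mysterious, here is a minimal sketch of how a pushed-down filter could be rendered as a WHERE clause. Everything in it (SourceFilter, WhereClauseSketch, compile, select) is hypothetical and written for this illustration; it is not Spark's JDBC data source code, and it ignores details such as escaping LIKE wildcards inside the search string.

```scala
// Hypothetical filter representation, loosely modelled on the idea of
// data source filters; not Spark's own classes.
sealed trait SourceFilter
final case class EqualTo(attribute: String, value: Any) extends SourceFilter
final case class StringContains(attribute: String, value: String) extends SourceFilter
final case class IsNotNull(attribute: String) extends SourceFilter

object WhereClauseSketch {
  // Quote string values as SQL literals; leave other values as they are.
  private def literal(v: Any): String = v match {
    case s: String => "'" + s.replace("'", "''") + "'"
    case other     => other.toString
  }

  def compile(f: SourceFilter): String = f match {
    case EqualTo(a, v)        => s"$a = ${literal(v)}"
    case StringContains(a, v) => s"$a LIKE '%${v.replace("'", "''")}%'"
    case IsNotNull(a)         => s"$a IS NOT NULL"
  }

  def select(table: String, columns: Seq[String], filters: Seq[SourceFilter]): String = {
    val cols = columns.map(c => "\"" + c + "\"").mkString(",")
    val where =
      if (filters.isEmpty) ""
      else filters.map(f => s"(${compile(f)})").mkString(" WHERE ", " AND ", "")
    s"SELECT $cols FROM $table$where"
  }
}

val sql = WhereClauseSketch.select(
  "projects",
  Seq("id", "name", "website"),
  Seq(StringContains("name", "Spark")))
println(sql)
```

With no filters the sketch produces the plain SELECT seen for the typed query, and with the single "contains" filter it produces a SELECT with a LIKE predicate in the WHERE clause.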
Tip
|
Enable
Add |
Parquet Data Source
val spark: SparkSession = ...
import spark.implicits._
// paste it to REPL individually to make the following line work
case class City(id: Long, name: String)
import org.apache.spark.sql.SaveMode.Overwrite
Seq(
City(0, "Warsaw"),
City(1, "Toronto"),
City(2, "London"),
City(3, "Redmond"),
City(4, "Boston")).toDF.write.mode(Overwrite).parquet("cities.parquet")
val cities = spark.read.parquet("cities.parquet").as[City]
// Using DataFrame's Column-based query
scala> cities.where('name === "Warsaw").queryExecution.executedPlan
res21: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
   +- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:string>
// Using SQL query
scala> cities.where("""name = "Warsaw"""").queryExecution.executedPlan
res23: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
   +- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:string>
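// Using Dataset's strongly type-safe filter
// The filter is not pushed down: the lambda is an opaque Scala function to Catalyst,
// so it cannot be translated into a data source filter (note the empty PushedFilters)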
scala> cities.filter(_.name == "Warsaw").queryExecution.executedPlan
res24: org.apache.spark.sql.execution.SparkPlan =
*Filter <function1>.apply
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,name:string>
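One way to keep working with a typed Dataset and still benefit from pushdown is to express the predicate as a Column, since where on a Dataset[City] returns a Dataset[City]. The sketch below compares both variants side by side. The pushedFilters helper and the temporary path are assumptions made for this illustration, and the exact text of the plan may differ between Spark versions.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{Dataset, SparkSession}

case class City(id: Long, name: String)

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("typed-vs-untyped-filter")
  .getOrCreate()
import spark.implicits._

// Write a tiny Parquet dataset to a temporary location (assumed writable)
val path = Files.createTempDirectory("cities").resolve("cities.parquet").toString
Seq(City(0, "Warsaw"), City(1, "Toronto")).toDS.write.parquet(path)
val cities = spark.read.parquet(path).as[City]

// Hypothetical helper: extract the PushedFilters entry from a plan's text
def pushedFilters(plan: String): Option[String] =
  "PushedFilters: \\[(.*?)\\]".r.findFirstMatchIn(plan).map(_.group(1))

// Typed filter: a Scala lambda that Catalyst cannot inspect
val typedDs: Dataset[City] = cities.filter(_.name == "Warsaw")
// Column-based filter: still a Dataset[City], but the predicate is visible to Catalyst
val untypedDs: Dataset[City] = cities.where($"name" === "Warsaw")

val typed = typedDs.queryExecution.executedPlan.toString
val untyped = untypedDs.queryExecution.executedPlan.toString

println(pushedFilters(typed))
println(pushedFilters(untyped))
```

Both variants return the same rows; the difference is only in how much of the filtering the Parquet reader can take over.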
Executing Rule — apply Method
apply(plan: LogicalPlan): LogicalPlan
Note
apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).
apply
…FIXME