import org.apache.spark.sql.execution.datasources.FileSourceStrategy
// Enable INFO logging level to see the details of the strategy
val logger = FileSourceStrategy.getClass.getName.replace("$", "")
import org.apache.log4j.{Level, Logger}
Logger.getLogger(logger).setLevel(Level.INFO)
// Create a bucketed data source table
val tableName = "bucketed_4_id"
spark
.range(100)
.write
.bucketBy(4, "id")
.sortBy("id")
.mode("overwrite")
.saveAsTable(tableName)
val q = spark.table(tableName)
val plan = q.queryExecution.optimizedPlan
val executionPlan = FileSourceStrategy(plan).head
scala> println(executionPlan.numberedTreeString)
00 FileScan parquet default.bucketed_4_id[id#140L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/apps/spark-2.3.0-bin-hadoop2.7/spark-warehouse/bucketed_4..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = executionPlan.collectFirst { case fsse: FileSourceScanExec => fsse }.get
scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec
FileSourceStrategy Execution Planning Strategy for LogicalRelations with HadoopFsRelation
FileSourceStrategy is an execution planning strategy that plans scans over collections of files (possibly partitioned or bucketed).
FileSourceStrategy is part of the predefined strategies of the Spark Planner.
FileSourceScanExec supports Bucket Pruning for LogicalRelations over HadoopFsRelation with the bucketing specification when both of the following hold:
- There is exactly one bucketing column
- The number of buckets is greater than 1
// Using the table created above
// There is exactly one bucketing column, i.e. id
// The number of buckets is greater than 1, i.e. 4
val tableName = "bucketed_4_id"
val q = spark.table(tableName).where($"id" isin (50, 90))
val qe = q.queryExecution
val plan = qe.optimizedPlan
scala> println(plan.numberedTreeString)
00 Filter id#7L IN (50,90)
01 +- Relation[id#7L] parquet
import org.apache.spark.sql.execution.datasources.FileSourceStrategy
// Enable INFO logging level to see the details of the strategy
val logger = FileSourceStrategy.getClass.getName.replace("$", "")
import org.apache.log4j.{Level, Logger}
Logger.getLogger(logger).setLevel(Level.INFO)
scala> val executionPlan = FileSourceStrategy(plan).head
18/11/18 17:56:53 INFO FileSourceStrategy: Pruning directories with:
18/11/18 17:56:53 INFO FileSourceStrategy: Pruned 2 out of 4 buckets.
18/11/18 17:56:53 INFO FileSourceStrategy: Post-Scan Filters: id#7L IN (50,90)
18/11/18 17:56:53 INFO FileSourceStrategy: Output Data Schema: struct<id: bigint>
18/11/18 17:56:53 INFO FileSourceScanExec: Pushed Filters: In(id, [50,90])
executionPlan: org.apache.spark.sql.execution.SparkPlan = ...
scala> println(executionPlan.numberedTreeString)
00 Filter id#7L IN (50,90)
01 +- FileScan parquet default.bucketed_4_id[id#7L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionFilters: [], PushedFilters: [In(id, [50,90])], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 2 out of 4
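Tip
Enable INFO logging level for the org.apache.spark.sql.execution.datasources.FileSourceStrategy logger to see the details of the strategy.
Refer to Logging.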
collectProjectsAndFilters Method
collectProjectsAndFilters(plan: LogicalPlan):
(Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression])
collectProjectsAndFilters is a pattern used to destructure a LogicalPlan that can be Project or Filter. Any other LogicalPlan gives an all-empty response (no projection, no filters and no aliases) together with the plan itself.
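The destructuring can be sketched outside Spark with a toy plan model. The `ToyPlan`, `ToyProject`, `ToyFilter`, `ToyRelation` and `collectToy` names below are hypothetical stand-ins, not Catalyst classes, and the sketch leaves out the alias map (the fourth element of the real return type). It only illustrates the idea of walking down through Project and Filter operators until some other operator is reached.

```scala
// Toy stand-ins for logical operators (hypothetical, not Spark classes)
sealed trait ToyPlan
final case class ToyProject(columns: Seq[String], child: ToyPlan) extends ToyPlan
final case class ToyFilter(condition: String, child: ToyPlan) extends ToyPlan
final case class ToyRelation(name: String) extends ToyPlan

// Returns (projection if any, collected filter conditions, first operator
// that is neither a projection nor a filter)
def collectToy(plan: ToyPlan): (Option[Seq[String]], Seq[String], ToyPlan) = plan match {
  case ToyProject(columns, child) =>
    // The outermost projection wins; filters below it are kept
    val (_, filters, leaf) = collectToy(child)
    (Some(columns), filters, leaf)
  case ToyFilter(condition, child) =>
    val (projects, filters, leaf) = collectToy(child)
    (projects, filters :+ condition, leaf)
  case other =>
    // Any other operator gives the all-empty response
    (None, Seq.empty, other)
}

val toyPlan = ToyProject(Seq("id"), ToyFilter("id IN (50, 90)", ToyRelation("bucketed_4_id")))
val (toyProjects, toyFilters, toyLeaf) = collectToy(toyPlan)
```

For `toyPlan` the result is the projection `Some(Seq("id"))`, the single filter condition, and the `ToyRelation` as the leaf. Calling `collectToy` directly on a `ToyRelation` returns `(None, Seq.empty, relation)`.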
Applying FileSourceStrategy Strategy to Logical Plan (Executing FileSourceStrategy): apply Method
apply(plan: LogicalPlan): Seq[SparkPlan]
Note
apply is part of the GenericStrategy Contract to generate a collection of SparkPlans for a given logical plan.
apply destructures the input logical plan into a tuple of projection and filter expressions together with a leaf logical operator.
apply only works with logical plans that are actually a LogicalRelation with a HadoopFsRelation (possibly as a child of Project and Filter logical operators).
apply computes the partitionKeyFilters expression set with the filter expressions whose references are a subset of the partitionSchema of the HadoopFsRelation.
apply prints out the following INFO message to the logs:
Pruning directories with: [partitionKeyFilters]
apply computes afterScanFilters predicate expressions that should be evaluated after the scan.
apply prints out the following INFO message to the logs:
Post-Scan Filters: [afterScanFilters]
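How the filters split into the two groups can be sketched with plain Scala collections. `ToyPredicate` and `splitFilters` are hypothetical names introduced for illustration only. The sketch assumes that every filter not made up purely of partition columns (including predicates that mix partition and data columns) is kept for evaluation after the scan, which is a simplification of what Spark does with Catalyst expressions.

```scala
// Toy predicate: SQL text plus the column names it references
// (hypothetical type, not a Catalyst Expression)
final case class ToyPredicate(sql: String, references: Set[String])

// Splits filters into (partition-key filters, filters to evaluate after the scan)
def splitFilters(
    filters: Seq[ToyPredicate],
    partitionColumns: Set[String]): (Seq[ToyPredicate], Seq[ToyPredicate]) = {
  // Filters that reference only partition columns can be used to prune directories
  val partitionOnly = filters.filter(_.references.subsetOf(partitionColumns))
  // Assumption: all remaining filters are evaluated after the scan
  val afterScan = filters.filterNot(partitionOnly.contains)
  (partitionOnly, afterScan)
}

val toyFilters = Seq(
  ToyPredicate("year = 2018", Set("year")),
  ToyPredicate("id IN (50, 90)", Set("id")),
  ToyPredicate("year > id", Set("year", "id")))

val (toyPartitionKeyFilters, toyAfterScanFilters) =
  splitFilters(toyFilters, partitionColumns = Set("year"))
```

With `year` as the only partition column, `year = 2018` becomes a partition-key filter and the other two predicates stay for after the scan. With no partition columns at all, as in the bucketed_4_id table, the partition-key filters are empty and `id IN (50, 90)` is a post-scan filter, which agrees with the INFO messages shown earlier.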
apply computes readDataColumns attributes that are the required attributes except the partition columns.
apply prints out the following INFO message to the logs:
Output Data Schema: [outputSchema]
apply creates a FileSourceScanExec physical operator.
If there are any afterScanFilters predicate expressions, apply creates a FilterExec physical operator with them and the FileSourceScanExec operator as the child.
If the output of the FilterExec physical operator (or of the FileSourceScanExec operator when no FilterExec was created) is different from the projects expressions, apply creates a ProjectExec physical operator with them and the FilterExec or the FileSourceScanExec operator as the child.
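The way the three physical operators stack up can be sketched with a toy operator model. `ToyExec`, `ToyScan`, `ToyFilterExec`, `ToyProjectExec` and `assemble` are hypothetical names, not Spark's SparkPlan classes, and conditions are plain strings rather than Catalyst expressions. The sketch assumes the post-scan predicates are combined with AND into a single filter condition.

```scala
// Toy physical operators (hypothetical stand-ins for SparkPlan nodes)
sealed trait ToyExec { def output: Seq[String] }
final case class ToyScan(output: Seq[String]) extends ToyExec
final case class ToyFilterExec(condition: String, child: ToyExec) extends ToyExec {
  // A filter does not change the columns of its child
  def output: Seq[String] = child.output
}
final case class ToyProjectExec(output: Seq[String], child: ToyExec) extends ToyExec

def assemble(scan: ToyScan, afterScanFilters: Seq[String], projects: Seq[String]): ToyExec = {
  // Combine all post-scan predicates into one condition;
  // with no predicates the scan is used directly
  val withFilter: ToyExec = afterScanFilters
    .reduceOption((left, right) => s"($left AND $right)")
    .map(condition => ToyFilterExec(condition, scan))
    .getOrElse(scan)
  // Add a projection only when the output so far differs from the requested one
  if (withFilter.output == projects) withFilter else ToyProjectExec(projects, withFilter)
}

// Same shape as the bucketed_4_id query: one post-scan filter, no extra projection
val filtered = assemble(ToyScan(Seq("id")), Seq("id IN (50, 90)"), Seq("id"))

// No filters, but fewer columns requested than scanned: only a projection is added
val projected = assemble(ToyScan(Seq("id", "name")), Seq.empty, Seq("id"))
```

`filtered` is a `ToyFilterExec` directly over the scan, mirroring the Filter over FileScan plan printed above, while `projected` is a `ToyProjectExec` directly over the scan.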