FileSourceStrategy Execution Planning Strategy for LogicalRelations with HadoopFsRelation

FileSourceStrategy is an execution planning strategy that plans scans over collections of files (possibly partitioned or bucketed).

FileSourceStrategy is part of predefined strategies of the Spark Planner.

import org.apache.spark.sql.execution.datasources.FileSourceStrategy

// Enable INFO logging level to see the details of the strategy
val logger = FileSourceStrategy.getClass.getName.replace("$", "")
import org.apache.log4j.{Level, Logger}
Logger.getLogger(logger).setLevel(Level.INFO)

// Create a bucketed data source table
val tableName = "bucketed_4_id"
spark
  .range(100)
  .write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode("overwrite")
  .saveAsTable(tableName)
val q = spark.table(tableName)
val plan = q.queryExecution.optimizedPlan

val executionPlan = FileSourceStrategy(plan).head

scala> println(executionPlan.numberedTreeString)
00 FileScan parquet default.bucketed_4_id[id#140L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/apps/spark-2.3.0-bin-hadoop2.7/spark-warehouse/bucketed_4..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

import org.apache.spark.sql.execution.FileSourceScanExec
val scan = executionPlan.collectFirst { case fsse: FileSourceScanExec => fsse }.get

scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec

FileSourceScanExec supports Bucket Pruning for LogicalRelations over HadoopFsRelation with the bucketing specification with the following:

  1. There is exactly one bucketing column

  2. The number of buckets is greater than 1

// Using the table created above
// There is exactly one bucketing column, i.e. id
// The number of buckets is greater than 1, i.e. 4
val tableName = "bucketed_4_id"
val q = spark.table(tableName).where($"id" isin (50, 90))
val qe = q.queryExecution
val plan = qe.optimizedPlan
scala> println(optimizedPlan.numberedTreeString)
00 Filter id#7L IN (50,90)
01 +- Relation[id#7L] parquet

import org.apache.spark.sql.execution.datasources.FileSourceStrategy

// Enable INFO logging level to see the details of the strategy
val logger = FileSourceStrategy.getClass.getName.replace("$", "")
import org.apache.log4j.{Level, Logger}
Logger.getLogger(logger).setLevel(Level.INFO)

scala> val executionPlan = FileSourceStrategy(plan).head
18/11/18 17:56:53 INFO FileSourceStrategy: Pruning directories with:
18/11/18 17:56:53 INFO FileSourceStrategy: Pruned 2 out of 4 buckets.
18/11/18 17:56:53 INFO FileSourceStrategy: Post-Scan Filters: id#7L IN (50,90)
18/11/18 17:56:53 INFO FileSourceStrategy: Output Data Schema: struct<id: bigint>
18/11/18 17:56:53 INFO FileSourceScanExec: Pushed Filters: In(id, [50,90])
executionPlan: org.apache.spark.sql.execution.SparkPlan = ...

scala> println(executionPlan.numberedTreeString)
00 Filter id#7L IN (50,90)
01 +- FileScan parquet default.bucketed_4_id[id#7L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionFilters: [], PushedFilters: [In(id, [50,90])], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 2 out of 4
Tip

Enable INFO logging level for org.apache.spark.sql.execution.datasources.FileSourceStrategy logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.datasources.FileSourceStrategy=INFO

Refer to Logging.

collectProjectsAndFilters Method

collectProjectsAndFilters(plan: LogicalPlan):
  (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression])

collectProjectsAndFilters is a pattern used to destructure a LogicalPlan that can be Project or Filter. Any other LogicalPlan give an all-empty response.

Applying FileSourceStrategy Strategy to Logical Plan (Executing FileSourceStrategy) — apply Method

apply(plan: LogicalPlan): Seq[SparkPlan]
Note
apply is part of GenericStrategy Contract to generate a collection of SparkPlans for a given logical plan.

apply destructures the input logical plan into a tuple of projection and filter expressions together with a leaf logical operator.

apply only works with logical plans that are actually a LogicalRelation with a HadoopFsRelation (possibly as a child of Project and Filter logical operators).

apply computes partitionKeyFilters expression set with the filter expressions that are a subset of the partitionSchema of the HadoopFsRelation.

apply prints out the following INFO message to the logs:

Pruning directories with: [partitionKeyFilters]

apply computes afterScanFilters predicate expressions that should be evaluated after the scan.

apply prints out the following INFO message to the logs:

Post-Scan Filters: [afterScanFilters]

apply computes readDataColumns attributes that are the required attributes except the partition columns.

apply prints out the following INFO message to the logs:

Output Data Schema: [outputSchema]

apply creates a FileSourceScanExec physical operator.

If there are any afterScanFilter predicate expressions, apply creates a FilterExec physical operator with them and the FileSourceScanExec operator.

If the output of the FilterExec physical operator is different from the projects expressions, apply creates a ProjectExec physical operator with them and the FilterExec or the FileSourceScanExec operators.

results matching ""

    No results matching ""