DataSourceStrategy Execution Planning Strategy

DataSourceStrategy is an execution planning strategy (of SparkPlanner) that plans LogicalRelation logical operators as RowDataSourceScanExec physical operators (possibly under FilterExec and ProjectExec operators).

Table 1. DataSourceStrategy’s Selection Requirements (in execution order)

Logical Operator: LogicalRelation with a CatalystScan relation
Description: Uses pruneFilterProjectRaw (with the RDD conversion to RDD[InternalRow] as part of scanBuilder). CatalystScan does not seem to be used in Spark SQL.

Logical Operator: LogicalRelation with a PrunedFilteredScan relation
Description: Uses pruneFilterProject (with the RDD conversion to RDD[InternalRow] as part of scanBuilder). Matches JDBCRelation exclusively.

Logical Operator: LogicalRelation with a PrunedScan relation
Description: Uses pruneFilterProject (with the RDD conversion to RDD[InternalRow] as part of scanBuilder). PrunedScan does not seem to be used in Spark SQL.

Logical Operator: LogicalRelation with a TableScan relation
Description: Creates a RowDataSourceScanExec directly (requesting the TableScan to buildScan followed by RDD conversion to RDD[InternalRow]). Matches KafkaRelation exclusively.

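import org.apache.spark.sql.execution.datasources.DataSourceStrategy
// DataSourceStrategy is created with the SQLConf of the current session
val strategy = DataSourceStrategy(spark.sessionState.conf)

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
// Placeholder: supply a logical plan that contains a LogicalRelation
val plan: LogicalPlan = ???

// A strategy gives a sequence of candidate physical plans; take the first one
val sparkPlan = strategy(plan).head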
Note
DataSourceStrategy uses the PhysicalOperation Scala extractor object to destructure a logical query plan.
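One way to get a logical plan with a LogicalRelation (for the placeholder above) is to wrap a custom BaseRelation in a DataFrame. The following is only a sketch: DemoRelation, its in-memory data and the query helper are hypothetical names made up for this illustration, and the code assumes Spark SQL on the classpath.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical in-memory relation, defined only for this illustration.
class DemoRelation(override val sqlContext: SQLContext)
    extends BaseRelation with PrunedFilteredScan {

  private val data = Seq(Row(1, "one"), Row(2, "two"), Row(3, "three"))

  override def schema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType)))

  // Report every filter except EqualTo as unhandled, so Spark evaluates those again.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(_.isInstanceOf[EqualTo])

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    val fieldIndex = schema.fieldNames.zipWithIndex.toMap
    // Apply the EqualTo filters here; all other filters are left to Spark.
    val kept = data.filter { row =>
      filters.forall {
        case EqualTo(attribute, value) => row.get(fieldIndex(attribute)) == value
        case _ => true
      }
    }
    // Keep only the requested columns, in the requested order.
    val pruned = kept.map { row =>
      Row.fromSeq(requiredColumns.toSeq.map(c => row.get(fieldIndex(c))))
    }
    sqlContext.sparkContext.parallelize(pruned)
  }
}

object DemoRelation {
  // A query whose logical plan contains a LogicalRelation over DemoRelation.
  def query(spark: SparkSession): DataFrame =
    spark.baseRelationToDataFrame(new DemoRelation(spark.sqlContext))
      .where("id = 1")
      .select("name")
}
```

With a SparkSession available as spark, DemoRelation.query(spark).queryExecution.optimizedPlan should give a LogicalPlan to experiment with, and DemoRelation.query(spark).explain() prints the physical plan that was selected.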

pruneFilterProject Internal Method

pruneFilterProject(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]): SparkPlan

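pruneFilterProject simply calls pruneFilterProjectRaw with the input scanBuilder wrapped in a function that also accepts the Seq[Expression] input parameter (that the scanBuilder of pruneFilterProjectRaw requires) and ignores it.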
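The delegation can be sketched as a small adapter. This is a minimal sketch under stated assumptions: Attribute, Expression, Filter and Rows are simplified stand-ins (not the Spark classes), and adaptScanBuilder is a hypothetical name.

```scala
object ScanBuilderAdapter {
  // Hypothetical stand-ins for the Spark types in the signature above.
  final case class Attribute(name: String)
  final case class Expression(sql: String)
  final case class Filter(description: String)
  type Rows = Seq[String] // stands in for RDD[InternalRow]

  // Wraps a two-argument scanBuilder into the three-argument shape that
  // pruneFilterProjectRaw expects; the Seq[Expression] argument is dropped.
  def adaptScanBuilder(
      scanBuilder: (Seq[Attribute], Array[Filter]) => Rows)
    : (Seq[Attribute], Seq[Expression], Seq[Filter]) => Rows =
    (requestedColumns, _, pushedFilters) =>
      scanBuilder(requestedColumns, pushedFilters.toArray)
}
```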

Note
pruneFilterProject is used when DataSourceStrategy execution planning strategy is executed (for LogicalRelation logical operators with a PrunedFilteredScan or a PrunedScan).

Selecting Catalyst Expressions Convertible to Data Source Filter Predicates (and Handled by BaseRelation) — selectFilters Method

selectFilters(
  relation: BaseRelation,
  predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Set[Filter])

selectFilters builds a map of Catalyst predicate expressions (from the input predicates) that can be translated to a data source filter predicate.

selectFilters then requests the input BaseRelation for unhandled filters (out of the convertible ones that selectFilters built the map with).

In the end, selectFilters returns a 3-element tuple with the following:

  1. Catalyst predicate expressions that are either inconvertible (to data source filters) or unhandled by the input BaseRelation

  2. All converted data source filters

  3. Pushed-down data source filters (that the input BaseRelation can handle)
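The three-way split can be sketched in plain Scala. This is a simplified model rather than Spark's code: the type parameters E and F stand in for Catalyst expressions and data source filters, and the translate and unhandledByRelation parameters are assumed stand-ins for translateFilter and the unhandled-filters request to the BaseRelation.

```scala
object SelectFiltersSketch {
  def selectFilters[E, F](
      predicates: Seq[E],
      translate: E => Option[F],              // stand-in for translateFilter
      unhandledByRelation: Seq[F] => Set[F])  // stand-in for the BaseRelation request
    : (Seq[E], Seq[F], Set[F]) = {
    // Pairs of predicate and filter for every predicate that could be translated.
    val translated: Seq[(E, F)] =
      predicates.flatMap(p => translate(p).map(f => p -> f))
    val allFilters: Seq[F] = translated.map(_._2)
    val inconvertible: Seq[E] = predicates.filter(p => translate(p).isEmpty)

    // Ask the relation which of the converted filters it cannot handle.
    val unhandledFilters: Set[F] = unhandledByRelation(allFilters)
    val (unhandled, handled) =
      translated.partition { case (_, f) => unhandledFilters.contains(f) }

    (inconvertible ++ unhandled.map(_._1), allFilters, handled.map(_._2).toSet)
  }
}
```

For example, with three predicates of which one cannot be translated and one is translated but unhandled, the first element holds those two predicates, the second both converted filters, and the third the single handled filter.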

Note
selectFilters is used exclusively when DataSourceStrategy execution planning strategy is requested to create a RowDataSourceScanExec physical operator (possibly under FilterExec and ProjectExec operators), i.e. in pruneFilterProjectRaw.

Translating Catalyst Expression Into Data Source Filter Predicate — translateFilter Method

translateFilter(predicate: Expression): Option[Filter]

translateFilter translates a Catalyst expression into a corresponding Filter predicate if possible. If not, translateFilter returns None.

Table 2. translateFilter’s Conversions

Catalyst Expression    Filter Predicate
EqualTo                EqualTo
EqualNullSafe          EqualNullSafe
GreaterThan            GreaterThan
LessThan               LessThan
GreaterThanOrEqual     GreaterThanOrEqual
LessThanOrEqual        LessThanOrEqual
InSet                  In
In                     In
IsNull                 IsNull
IsNotNull              IsNotNull
And                    And
Or                     Or
Not                    Not
StartsWith             StringStartsWith
EndsWith               StringEndsWith
Contains               StringContains

Note
The Catalyst expressions and their corresponding data source filter predicates have the same names in most cases but belong to different Scala packages, i.e. org.apache.spark.sql.catalyst.expressions and org.apache.spark.sql.sources, respectively.
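The shape of the translation can be sketched with a reduced model. All the types below are hypothetical stand-ins that only mimic the two packages (the nested objects catalyst and sources are not the Spark ones), and only a few conversions are covered. The sketch assumes that a comparison is translated only for an attribute compared with a literal and that And requires both sides to be translatable, which is how the behaviour appears to work.

```scala
object TranslateFilterSketch {
  // Stand-in for org.apache.spark.sql.catalyst.expressions (heavily reduced).
  object catalyst {
    sealed trait Expression
    final case class Attribute(name: String) extends Expression
    final case class Literal(value: Any) extends Expression
    final case class EqualTo(left: Expression, right: Expression) extends Expression
    final case class StartsWith(left: Expression, right: Expression) extends Expression
    final case class And(left: Expression, right: Expression) extends Expression
    final case class Not(child: Expression) extends Expression
  }

  // Stand-in for org.apache.spark.sql.sources (heavily reduced).
  object sources {
    sealed trait Filter
    final case class EqualTo(attribute: String, value: Any) extends Filter
    final case class StringStartsWith(attribute: String, value: String) extends Filter
    final case class And(left: Filter, right: Filter) extends Filter
    final case class Not(child: Filter) extends Filter
  }

  import catalyst._

  def translateFilter(predicate: Expression): Option[sources.Filter] = predicate match {
    case EqualTo(Attribute(name), Literal(value)) =>
      Some(sources.EqualTo(name, value))
    case StartsWith(Attribute(name), Literal(prefix: String)) =>
      Some(sources.StringStartsWith(name, prefix))
    case And(left, right) =>
      // Both sides have to be translatable, otherwise the whole And is not.
      for {
        l <- translateFilter(left)
        r <- translateFilter(right)
      } yield sources.And(l, r)
    case Not(child) =>
      translateFilter(child).map(f => sources.Not(f))
    case _ =>
      None // anything else has no data source counterpart in this sketch
  }
}
```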
Note

translateFilter is used when:

RDD Conversion (Converting RDD of Rows to Catalyst RDD of InternalRows) — toCatalystRDD Internal Method

toCatalystRDD(
  relation: LogicalRelation,
  output: Seq[Attribute],
  rdd: RDD[Row]): RDD[InternalRow]
// Calls the former toCatalystRDD with the output of the LogicalRelation
toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]): RDD[InternalRow]

toCatalystRDD branches off per the needConversion flag of the BaseRelation of the input LogicalRelation.

When enabled (true), toCatalystRDD converts the objects inside Rows to Catalyst types.

Note
needConversion flag is enabled (true) by default.

Otherwise, toCatalystRDD simply casts the input RDD[Row] to an RDD[InternalRow] (as an unchecked type cast using Scala’s asInstanceOf method).

Note
toCatalystRDD is used when DataSourceStrategy execution planning strategy is executed (for all kinds of BaseRelations).
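What "converting the objects inside Rows to Catalyst types" means can be illustrated with CatalystTypeConverters from Spark's Catalyst module. This is a sketch and not the code of toCatalystRDD itself: ConversionSketch and toCatalystValues are hypothetical names, and the example assumes Spark SQL on the classpath.

```scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

object ConversionSketch {
  // Converts the values of one external row, field by field, to Catalyst's
  // internal representation (e.g. a String becomes a UTF8String).
  def toCatalystValues(values: Seq[Any], types: Seq[DataType]): Seq[Any] = {
    val converters = types.map(t => CatalystTypeConverters.createToCatalystConverter(t))
    values.zip(converters).map { case (value, convert) => convert(value) }
  }
}
```

A relation that already produces InternalRows (and says so with needConversion disabled) skips this per-field work, which is why the cast alone is enough in that case.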

Creating RowDataSourceScanExec Physical Operator for LogicalRelation (Possibly Under FilterExec and ProjectExec Operators) — pruneFilterProjectRaw Internal Method

pruneFilterProjectRaw(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]): SparkPlan

pruneFilterProjectRaw converts a LogicalRelation leaf logical operator into a RowDataSourceScanExec leaf physical operator (possibly as the child of FilterExec and ProjectExec unary physical operators).

Note
pruneFilterProjectRaw is almost like SparkPlanner.pruneFilterProject.

Internally, pruneFilterProjectRaw splits the input filterPredicates expressions to select the Catalyst expressions that can be converted to data source filter predicates (and handled by the BaseRelation of the LogicalRelation).

pruneFilterProjectRaw combines all the expressions that either cannot be converted to data source filters or cannot be handled by the relation, using the And binary expression. The result is a so-called filterCondition that is later used to create a FilterExec physical operator (if there is any such expression).

pruneFilterProjectRaw creates a RowDataSourceScanExec leaf physical operator.

If column pruning alone gives the right projection and the columns of this projection are enough to evaluate all filter conditions, pruneFilterProjectRaw creates a FilterExec unary physical operator (with the unhandled predicate expressions and the RowDataSourceScanExec leaf physical operator as the child). With no unhandled predicate expressions, the RowDataSourceScanExec is the result itself.

Note
In this case no extra ProjectExec unary physical operator is created.

Otherwise, pruneFilterProjectRaw creates a ProjectExec unary physical operator whose child is a FilterExec unary physical operator (with the unhandled predicate expressions and the RowDataSourceScanExec leaf physical operator as the child), or the RowDataSourceScanExec itself when there are no unhandled predicate expressions.

Note
pruneFilterProjectRaw is used exclusively when DataSourceStrategy execution planning strategy is executed (for a LogicalRelation with a CatalystScan relation) and when pruneFilterProject is executed (for a LogicalRelation with a PrunedFilteredScan or a PrunedScan relation).
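The choice between the plan shapes can be modelled in a few lines. This is a simplified sketch: Plan, Scan, Filter, Project, Projection and Predicate are hypothetical stand-ins for the Spark operators and expressions, expressions are plain strings, and the selection of the columns requested from the scan is reduced to the essentials.

```scala
object PlanShapeSketch {
  // Hypothetical stand-ins for the physical operators.
  sealed trait Plan
  final case class Scan(columns: Seq[String]) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan
  final case class Project(projectList: Seq[String], child: Plan) extends Plan

  // An expression is modelled as its SQL text and the columns it references.
  final case class Projection(sql: String, references: Set[String]) {
    def isPlainColumn: Boolean = references == Set(sql)
  }
  final case class Predicate(sql: String, references: Set[String])

  def planShape(
      projects: Seq[Projection],
      filterPredicates: Seq[Predicate],
      unhandledPredicates: Seq[Predicate]): Plan = {
    val projectSet = projects.flatMap(_.references).toSet
    val filterSet = filterPredicates.flatMap(_.references).toSet
    // Unhandled predicates are AND-ed together; None when there are none.
    val filterCondition: Option[String] =
      unhandledPredicates.map(_.sql).reduceLeftOption((l, r) => s"($l AND $r)")

    // Column pruning alone is enough: plain, distinct columns that also
    // cover every column the filters need.
    val pruningOnly = projects.forall(_.isPlainColumn) &&
      projectSet.size == projects.size &&
      filterSet.subsetOf(projectSet)

    if (pruningOnly) {
      val scan: Plan = Scan(projects.map(_.sql))
      filterCondition.map(c => Filter(c, scan)).getOrElse(scan)
    } else {
      val scan: Plan = Scan((projectSet ++ filterSet).toSeq.sorted)
      Project(projects.map(_.sql), filterCondition.map(c => Filter(c, scan)).getOrElse(scan))
    }
  }
}
```

In this model, selecting only name while filtering on id gives a Project on top (the scan has to read id, too), whereas selecting both id and name with an unhandled filter on id gives a Filter directly over the Scan.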
