DataSourceStrategy Execution Planning Strategy

DataSourceStrategy is an execution planning strategy (of SparkPlanner) that plans LogicalRelation logical operators as RowDataSourceScanExec physical operators (possibly under FilterExec and ProjectExec operators).

```scala
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
val strategy = DataSourceStrategy(spark.sessionState.conf)

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
val plan: LogicalPlan = ???

val sparkPlan = strategy(plan).head
```
| Logical Operator | Description |
|---|---|
| LogicalRelation with a CatalystScan relation | Uses pruneFilterProjectRaw (with the RDD conversion to RDD[InternalRow] as part of scanBuilder) |
| LogicalRelation with a PrunedFilteredScan relation | Uses pruneFilterProject (with the RDD conversion to RDD[InternalRow] as part of scanBuilder). Matches JDBCRelation exclusively |
| LogicalRelation with a PrunedScan relation | Uses pruneFilterProject (with the RDD conversion to RDD[InternalRow] as part of scanBuilder) |
| LogicalRelation with a TableScan relation | Creates a RowDataSourceScanExec directly (requesting the TableScan to buildScan, followed by the RDD conversion to RDD[InternalRow]). Matches KafkaRelation exclusively |
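The relation traits above come from the data source API. As a rough illustration, the following hypothetical relation (a sketch only, not part of Spark) mixes in PrunedFilteredScan, which is what makes DataSourceStrategy plan its LogicalRelation through pruneFilterProject:

```scala
// A hypothetical PrunedFilteredScan relation (illustration only, not part of Spark).
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

class DemoRelation(override val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = StructType(StructField("id", LongType) :: Nil)

  // DataSourceStrategy pushes the requested columns and the convertible filters here
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
    sqlContext.sparkContext.range(0, 10).map(Row(_))
}
```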
Note: DataSourceStrategy uses the PhysicalOperation Scala extractor object to destructure a logical query plan.
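For illustration, a planning strategy can destructure a logical query plan with PhysicalOperation as in the following sketch (the describe helper is hypothetical):

```scala
// A minimal sketch of destructuring a logical query plan with the PhysicalOperation
// extractor (the describe helper is hypothetical).
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation

def describe(plan: LogicalPlan): String = plan match {
  case PhysicalOperation(projects, filters, lr: LogicalRelation) =>
    s"scan of ${lr.relation} with ${projects.size} projections and ${filters.size} filter predicates"
  case _ =>
    "not a (projected and filtered) relation scan"
}
```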
pruneFilterProject Internal Method

```scala
pruneFilterProject(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow])
```

pruneFilterProject simply calls pruneFilterProjectRaw with a scanBuilder that ignores the Seq[Expression] input parameter (of pruneFilterProjectRaw's scanBuilder).
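Conceptually, that adaptation looks like the following sketch (the adapt helper is hypothetical):

```scala
// A sketch (hypothetical helper) of adapting pruneFilterProject's scan builder to the
// shape expected by pruneFilterProjectRaw, ignoring the Seq[Expression] argument.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.sources.Filter

def adapt(
    scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow])
  : (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow] =
  (requestedColumns, catalystPredicates, pushedFilters) =>
    scanBuilder(requestedColumns, pushedFilters.toArray) // catalystPredicates are ignored
```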
Note: pruneFilterProject is used when the DataSourceStrategy execution planning strategy is executed (for LogicalRelation logical operators with a PrunedFilteredScan or a PrunedScan relation).
Selecting Catalyst Expressions Convertible to Data Source Filter Predicates (and Handled by BaseRelation) — selectFilters Method
```scala
selectFilters(
  relation: BaseRelation,
  predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Set[Filter])
```
selectFilters builds a map from the Catalyst predicate expressions (among the input predicates) that can be translated into data source filter predicates to their corresponding Filter predicates.
selectFilters then requests the input BaseRelation for the unhandled filters (out of the convertible filters in that map).
In the end, selectFilters returns a 3-element tuple with the following:
- Inconvertible and unhandled Catalyst predicate expressions
- All converted data source filters
- Pushed-down data source filters (that the input BaseRelation can handle)
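A BaseRelation declares the filters it cannot handle through unhandledFilters (by default, all of them). As a sketch, a hypothetical relation that fully handles IsNotNull filters would report only the remaining converted filters as unhandled:

```scala
// A hypothetical relation that handles IsNotNull filters itself, so selectFilters
// would report only the remaining converted filters as unhandled.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, Filter, IsNotNull}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class NullAwareRelation(override val sqlContext: SQLContext) extends BaseRelation {
  override def schema: StructType = StructType(StructField("name", StringType) :: Nil)

  // Everything except IsNotNull is left for Spark to (re)evaluate
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(_.isInstanceOf[IsNotNull])
}
```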
Note: selectFilters is used exclusively when the DataSourceStrategy execution planning strategy is requested to create a RowDataSourceScanExec physical operator (possibly under FilterExec and ProjectExec operators), i.e. when DataSourceStrategy is executed and when pruneFilterProject is used.
Translating Catalyst Expression Into Data Source Filter Predicate — translateFilter Method
```scala
translateFilter(predicate: Expression): Option[Filter]
```
translateFilter translates a Catalyst expression into a corresponding Filter predicate if possible. If not, translateFilter returns None.
| Catalyst Expression | Filter Predicate |
|---|---|
| And | And |
| Or | Or |
| Not | Not |
| EqualTo (with a Literal child) | EqualTo |
| EqualNullSafe (with a Literal child) | EqualNullSafe |
| GreaterThan (with a Literal child) | GreaterThan (or LessThan when the literal is on the left) |
| GreaterThanOrEqual (with a Literal child) | GreaterThanOrEqual (or LessThanOrEqual when the literal is on the left) |
| LessThan (with a Literal child) | LessThan (or GreaterThan when the literal is on the left) |
| LessThanOrEqual (with a Literal child) | LessThanOrEqual (or GreaterThanOrEqual when the literal is on the left) |
| In (with Literal values only) | In |
| InSet | In |
| IsNull | IsNull |
| IsNotNull | IsNotNull |
| StartsWith (with a string Literal) | StringStartsWith |
| EndsWith (with a string Literal) | StringEndsWith |
| Contains (with a string Literal) | StringContains |
Note: The Catalyst expressions and their corresponding data source filter predicates have the same names in most cases but belong to different Scala packages, i.e. org.apache.spark.sql.catalyst.expressions and org.apache.spark.sql.sources, respectively.
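As a minimal illustration of the note above (the column name and value are made up), a Catalyst equality predicate and the data source filter it translates to look like this:

```scala
// Catalyst predicate (catalyst.expressions) vs. data source filter (sources),
// with a made-up column name and value.
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.IntegerType

// Catalyst side: id = 1
val catalystPredicate = EqualTo(AttributeReference("id", IntegerType)(), Literal(1))

// Data source side: the corresponding Filter predicate
val dataSourceFilter: sources.Filter = sources.EqualTo("id", 1)
```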
RDD Conversion (Converting RDD of Rows to Catalyst RDD of InternalRows) — toCatalystRDD Internal Method
```scala
toCatalystRDD(
  relation: LogicalRelation,
  output: Seq[Attribute],
  rdd: RDD[Row]): RDD[InternalRow]

toCatalystRDD(
  relation: LogicalRelation,
  rdd: RDD[Row]): RDD[InternalRow]  // (1)
```

1. Calls the former toCatalystRDD with the output of the LogicalRelation
toCatalystRDD branches off per the needConversion flag of the BaseRelation of the input LogicalRelation.
When enabled (true), toCatalystRDD converts the objects inside Rows to Catalyst types.
Note: The needConversion flag is enabled (true) by default.
Otherwise, toCatalystRDD simply casts the input RDD[Row] to an RDD[InternalRow] (as a simple untyped Scala type conversion using the asInstanceOf operator).
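A simplified sketch of the two branches follows (assuming Catalyst type converters do the per-value conversion; the exact helper Spark uses may differ between versions, and the function below is hypothetical):

```scala
// A simplified, hypothetical sketch of toCatalystRDD's two branches.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.StructType

def toCatalystRDDSketch(schema: StructType, needConversion: Boolean, rdd: RDD[Row]): RDD[InternalRow] =
  if (needConversion) {
    // convert every Row value to its Catalyst representation
    val toCatalyst = CatalystTypeConverters.createToCatalystConverter(schema)
    rdd.map(row => toCatalyst(row).asInstanceOf[InternalRow])
  } else {
    // rows are already Catalyst-compatible, so a plain untyped cast suffices
    rdd.asInstanceOf[RDD[InternalRow]]
  }
```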
Note: toCatalystRDD is used when the DataSourceStrategy execution planning strategy is executed (for all kinds of BaseRelations).
Creating RowDataSourceScanExec Physical Operator for LogicalRelation (Possibly Under FilterExec and ProjectExec Operators) — pruneFilterProjectRaw Internal Method
```scala
pruneFilterProjectRaw(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]): SparkPlan
```
pruneFilterProjectRaw creates a RowDataSourceScanExec leaf physical operator given a LogicalRelation leaf logical operator (possibly as a child of FilterExec and ProjectExec unary physical operators).
In other words, pruneFilterProjectRaw simply converts a LogicalRelation leaf logical operator into a RowDataSourceScanExec leaf physical operator (possibly under FilterExec and ProjectExec unary physical operators).
Note: pruneFilterProjectRaw is almost like SparkPlanner.pruneFilterProject.
Internally, pruneFilterProjectRaw splits the input filterPredicates expressions to select the Catalyst expressions that can be converted to data source filter predicates (and handled by the BaseRelation of the LogicalRelation).
pruneFilterProjectRaw combines all expressions that are neither convertible to data source filters nor handled by the relation using the And binary expression (which creates a so-called filterCondition that is eventually used to create a FilterExec physical operator, if non-empty).
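The combining step can be sketched as follows (the combine helper is hypothetical):

```scala
// Combining leftover predicates into a single filter condition with And (a sketch).
import org.apache.spark.sql.catalyst.expressions.{And, Expression}

def combine(unhandledPredicates: Seq[Expression]): Option[Expression] =
  unhandledPredicates.reduceOption((left, right) => And(left, right))
```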
pruneFilterProjectRaw creates a RowDataSourceScanExec leaf physical operator.
If it is possible to use column pruning alone to get the right projection, and if the columns of that projection are enough to evaluate all filter conditions, pruneFilterProjectRaw creates a FilterExec unary physical operator (with the unhandled predicate expressions and the RowDataSourceScanExec leaf physical operator as the child).
Note: In this case, no extra ProjectExec unary physical operator is created.
Otherwise, pruneFilterProjectRaw creates a FilterExec unary physical operator (with the unhandled predicate expressions and the RowDataSourceScanExec leaf physical operator as the child) that in turn becomes the child of a new ProjectExec unary physical operator.
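Conceptually, the wrapping described above can be sketched as follows (the wrap helper and the columnPruningOnly flag are hypothetical simplifications of the planner's actual checks):

```scala
// A hypothetical sketch of wrapping the scan with FilterExec and ProjectExec operators.
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}

def wrap(
    scan: SparkPlan,
    filterCondition: Option[Expression],
    projects: Seq[NamedExpression],
    columnPruningOnly: Boolean): SparkPlan = {
  // add a FilterExec only when there are unhandled predicates left
  val filtered = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
  // add a ProjectExec only when column pruning alone is not enough
  if (columnPruningOnly) filtered else ProjectExec(projects, filtered)
}
```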
Note: pruneFilterProjectRaw is used exclusively when the DataSourceStrategy execution planning strategy is executed (for a LogicalRelation with a CatalystScan relation) and for pruneFilterProject (when executed for a LogicalRelation with a PrunedFilteredScan or a PrunedScan relation).