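import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
// Create the strategy with the SQLConf of the current session (e.g. in spark-shell)
val strategy = DataSourceStrategy(spark.sessionState.conf)
// Placeholder: replace ??? with a logical plan that contains a LogicalRelation
val plan: LogicalPlan = ???
// Applying the strategy returns the candidate physical plans; take the first one
val sparkPlan = strategy(plan).head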
DataSourceStrategy Execution Planning Strategy
DataSourceStrategy is an execution planning strategy (of SparkPlanner) that plans LogicalRelation logical operators as RowDataSourceScanExec physical operators (possibly under FilterExec and ProjectExec operators).
Logical Operator | Description |
---|---|
LogicalRelation with a CatalystScan relation | Uses pruneFilterProjectRaw (with the RDD conversion to RDD[InternalRow] as part of [...]) |
LogicalRelation with a PrunedFilteredScan relation | Uses pruneFilterProject (with the RDD conversion to RDD[InternalRow] as part of [...]). Matches JDBCRelation exclusively |
LogicalRelation with a PrunedScan relation | Uses pruneFilterProject (with the RDD conversion to RDD[InternalRow] as part of [...]) |
LogicalRelation with a TableScan relation | Creates a RowDataSourceScanExec directly (requesting the [...]). Matches KafkaRelation exclusively |
Note: DataSourceStrategy uses the PhysicalOperation Scala extractor object to destructure a logical query plan.
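To make the idea of destructuring a plan with an extractor object more concrete, here is a minimal sketch in plain Scala. It does not use Spark's PhysicalOperation; `Plan`, `Relation`, `Filter`, `Project` and `Operation` are hypothetical stand-ins, and the real extractor also handles concerns such as alias substitution that are left out here.

```scala
object ExtractorSketch {
  // Hypothetical, simplified logical plan nodes (not Spark's classes)
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan

  // Extractor object: unapply flattens a chain of Project/Filter nodes
  // above a relation into (projections, filters, relation)
  object Operation {
    def unapply(plan: Plan): Option[(Seq[String], Seq[String], Relation)] = plan match {
      case r: Relation =>
        Some((Nil, Nil, r))
      case Filter(condition, child) =>
        unapply(child).map { case (projects, filters, r) => (projects, condition +: filters, r) }
      case Project(columns, child) =>
        // The outermost projection decides the final output in this sketch
        unapply(child).map { case (_, filters, r) => (columns, filters, r) }
    }
  }

  // A planning rule can then pattern match on the flattened shape
  def describe(plan: Plan): String = plan match {
    case Operation(projects, filters, Relation(name)) =>
      s"scan $name, columns=${projects.mkString(",")}, filters=${filters.mkString(" AND ")}"
    case _ =>
      "unsupported"
  }
}
```

A strategy written this way only has to handle one flattened shape instead of every nesting of projections and filters.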
pruneFilterProject Internal Method
pruneFilterProject(
relation: LogicalRelation,
projects: Seq[NamedExpression],
filterPredicates: Seq[Expression],
scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow])
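pruneFilterProject simply calls pruneFilterProjectRaw with the input scanBuilder wrapped in a three-argument function that ignores the Seq[Expression] input parameter (and passes the pushed-down filters on as an Array[Filter]).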
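The adaptation of the two-argument scanBuilder to the three-argument one can be sketched in plain Scala as follows. The type aliases are hypothetical stand-ins (strings instead of Spark's Attribute, Expression, Filter and RDD[InternalRow]), and `adapt` is an illustrative name, not a Spark method.

```scala
object PruneFilterProjectSketch {
  // Hypothetical stand-ins for Spark's types
  type Attribute = String
  type Expression = String
  type Filter = String
  type Rows = Seq[String] // stands in for RDD[InternalRow]

  // Shape of the builder that the "raw" variant expects
  type RawScanBuilder = (Seq[Attribute], Seq[Expression], Seq[Filter]) => Rows

  // Wraps a two-argument builder: the Seq[Expression] argument is dropped
  // and the pushed-down filters are handed over as an Array
  def adapt(scanBuilder: (Seq[Attribute], Array[Filter]) => Rows): RawScanBuilder =
    (requestedColumns, _, pushedFilters) =>
      scanBuilder(requestedColumns, pushedFilters.toArray)
}
```

With such a wrapper, relations that only understand columns and source filters can reuse the same planning code path as relations that also accept raw Catalyst expressions.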
Note: pruneFilterProject is used when the DataSourceStrategy execution planning strategy is executed (for LogicalRelation logical operators with a PrunedFilteredScan or a PrunedScan relation).
Selecting Catalyst Expressions Convertible to Data Source Filter Predicates (and Handled by BaseRelation): selectFilters Method
selectFilters(
relation: BaseRelation,
predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Set[Filter])
selectFilters builds a map of Catalyst predicate expressions (from the input predicates) that can be translated to data source filter predicates.
selectFilters then requests the input BaseRelation for unhandled filters (out of the convertible ones that selectFilters built the map with).
In the end, selectFilters returns a 3-element tuple with the following:
- Inconvertible and unhandled Catalyst predicate expressions
- All converted data source filters
- Pushed-down data source filters (that the input BaseRelation can handle)
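The three steps above can be sketched in plain Scala. This is a simplified model under stated assumptions, not Spark's code: `Predicate`, `SourceFilter`, `translate` and the `unhandledByRelation` function are hypothetical stand-ins for Catalyst expressions, data source filters, translateFilter and the request made to the BaseRelation.

```scala
object SelectFiltersSketch {
  // Hypothetical stand-ins (not Spark's classes)
  final case class Predicate(sql: String, convertible: Boolean)
  final case class SourceFilter(sql: String)

  // Stand-in for translateFilter: only some predicates can be converted
  def translate(p: Predicate): Option[SourceFilter] =
    if (p.convertible) Some(SourceFilter(p.sql)) else None

  def selectFilters(
      unhandledByRelation: Seq[SourceFilter] => Seq[SourceFilter],
      predicates: Seq[Predicate]): (Seq[Predicate], Seq[SourceFilter], Set[SourceFilter]) = {
    // 1. Map of the predicates that can be translated to source filters
    val translated: Map[Predicate, SourceFilter] =
      predicates.flatMap(p => translate(p).map(f => p -> f).toList).toMap
    val nonconvertible = predicates.filterNot(translated.contains)

    // 2. Ask the relation which of the converted filters it cannot handle
    val allFilters: Set[SourceFilter] = translated.values.toSet
    val unhandledFilters: Set[SourceFilter] =
      unhandledByRelation(translated.values.toSeq).toSet
    val unhandledPredicates =
      translated.collect { case (p, f) if unhandledFilters(f) => p }.toSeq

    // 3. (predicates to evaluate after the scan, all converted filters, handled filters)
    (unhandledPredicates ++ nonconvertible, translated.values.toSeq, allFilters.diff(unhandledFilters))
  }
}
```

The first element is what still has to be evaluated by Spark after the scan, which is why both the inconvertible and the unhandled predicates end up there.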
Note: selectFilters is used exclusively when the DataSourceStrategy execution planning strategy is requested to create a RowDataSourceScanExec physical operator (possibly under FilterExec and ProjectExec operators), i.e. in pruneFilterProjectRaw, which runs when DataSourceStrategy is executed directly or through pruneFilterProject.
Translating Catalyst Expression Into Data Source Filter Predicate: translateFilter Method
translateFilter(predicate: Expression): Option[Filter]
translateFilter translates a Catalyst expression into a corresponding Filter predicate if possible. If not, translateFilter returns None.
Catalyst Expression | Filter Predicate |
---|---|
Note: The Catalyst expressions and their corresponding data source filter predicates have the same names in most cases but belong to different Scala packages, i.e. org.apache.spark.sql.catalyst.expressions and org.apache.spark.sql.sources, respectively.
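The Option-returning translation can be illustrated with a small self-contained model. All types below are hypothetical stand-ins written for this sketch (the `Src` prefix marks the data source side); they only mirror the idea that an expression over an attribute and a literal has a filter counterpart (for example, Catalyst's StartsWith corresponds to the StringStartsWith source filter), while anything else yields None. Requiring both sides of And to be translatable is an assumption of the sketch.

```scala
object TranslateFilterSketch {
  // Hypothetical, simplified stand-ins for Catalyst expressions
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Any) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class StartsWith(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Upper(child: Expr) extends Expr // has no filter counterpart here

  // Hypothetical stand-ins for data source filters
  sealed trait SourceFilter
  final case class SrcEqualTo(attribute: String, value: Any) extends SourceFilter
  final case class SrcStringStartsWith(attribute: String, value: String) extends SourceFilter
  final case class SrcAnd(left: SourceFilter, right: SourceFilter) extends SourceFilter

  def translate(e: Expr): Option[SourceFilter] = e match {
    case EqualTo(Attr(a), Lit(v))            => Some(SrcEqualTo(a, v))
    case EqualTo(Lit(v), Attr(a))            => Some(SrcEqualTo(a, v))
    case StartsWith(Attr(a), Lit(v: String)) => Some(SrcStringStartsWith(a, v))
    case And(l, r) =>
      // Both sides must be translatable for the conjunction to be translatable
      for (lf <- translate(l); rf <- translate(r)) yield SrcAnd(lf, rf)
    case _ => None // anything else stays with Spark
  }
}
```

Returning None rather than failing lets the caller keep the untranslated expression and evaluate it after the scan.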
RDD Conversion (Converting RDD of Rows to Catalyst RDD of InternalRows): toCatalystRDD Internal Method
toCatalystRDD(
relation: LogicalRelation,
output: Seq[Attribute],
rdd: RDD[Row]): RDD[InternalRow]
toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]) (1)
(1) Calls the former toCatalystRDD with the output of the LogicalRelation
toCatalystRDD branches off per the needConversion flag of the BaseRelation of the input LogicalRelation.
When enabled (true), toCatalystRDD converts the objects inside Rows to Catalyst types.
Note: needConversion flag is enabled (true) by default.
Otherwise, toCatalystRDD simply casts the input RDD[Row] to an RDD[InternalRow] (as a simple unchecked type conversion using Scala's asInstanceOf method).
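The two branches can be sketched without Spark as follows. `ExternalRow`, `InternalRow` and `Utf8` are hypothetical stand-ins (a `Seq` plays the role of the RDD), and converting strings only is an assumption made to keep the example short.

```scala
object ToCatalystSketch {
  // Hypothetical stand-ins for Row, InternalRow and Catalyst's internal string type
  final case class ExternalRow(values: Seq[Any])
  final case class InternalRow(values: Seq[Any])
  final case class Utf8(value: String)

  // Converts a single external value to its internal representation
  private def convert(value: Any): Any = value match {
    case s: String => Utf8(s)
    case other     => other
  }

  def toCatalystRows(needConversion: Boolean, rows: Seq[ExternalRow]): Seq[InternalRow] =
    if (needConversion) {
      // Convert the objects inside every row
      rows.map(row => InternalRow(row.values.map(convert)))
    } else {
      // The rows are trusted to be internal rows already; because of type
      // erasure the cast is unchecked and does nothing at runtime
      rows.asInstanceOf[Seq[InternalRow]]
    }
}
```

The cast branch is only safe when the source really produced internal rows, which is the contract a relation accepts when it turns the flag off.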
Note: toCatalystRDD is used when the DataSourceStrategy execution planning strategy is executed (for all kinds of BaseRelations).
Creating RowDataSourceScanExec Physical Operator for LogicalRelation (Possibly Under FilterExec and ProjectExec Operators): pruneFilterProjectRaw Internal Method
pruneFilterProjectRaw(
relation: LogicalRelation,
projects: Seq[NamedExpression],
filterPredicates: Seq[Expression],
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]): SparkPlan
pruneFilterProjectRaw converts a LogicalRelation leaf logical operator into a RowDataSourceScanExec leaf physical operator (possibly as a child of FilterExec and ProjectExec unary physical operators).
Note: pruneFilterProjectRaw is almost like SparkPlanner.pruneFilterProject.
Internally, pruneFilterProjectRaw splits the input filterPredicates expressions to select the Catalyst expressions that can be converted to data source filter predicates (and handled by the BaseRelation of the LogicalRelation).
pruneFilterProjectRaw combines the expressions that cannot be converted to data source filters or cannot be handled by the relation using the And binary expression (that creates a so-called filterCondition that is eventually used to create a FilterExec physical operator if non-empty).
pruneFilterProjectRaw creates a RowDataSourceScanExec leaf physical operator.
If column pruning alone is enough to get the right projection and the columns of this projection are enough to evaluate all filter conditions, pruneFilterProjectRaw creates a FilterExec unary physical operator (with the unhandled predicate expressions and the RowDataSourceScanExec leaf physical operator as the child).
Note: In this case no extra ProjectExec unary physical operator is created.
Otherwise, pruneFilterProjectRaw creates a FilterExec unary physical operator (with the unhandled predicate expressions and the RowDataSourceScanExec leaf physical operator as the child) that in turn becomes the child of a new ProjectExec unary physical operator.
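The choice between the two plan shapes can be sketched in plain Scala. Everything below is a hypothetical model, not Spark's code: `Scan`, `FilterNode` and `ProjectNode` stand in for RowDataSourceScanExec, FilterExec and ProjectExec, expressions are plain strings, and details such as excluding columns that are only needed by handled filters are left out.

```scala
object PlanAssemblySketch {
  // Hypothetical stand-ins for the physical operators
  sealed trait PhysicalPlan
  final case class Scan(columns: Seq[String], pushedFilters: Seq[String]) extends PhysicalPlan
  final case class FilterNode(condition: String, child: PhysicalPlan) extends PhysicalPlan
  final case class ProjectNode(projectList: Seq[String], child: PhysicalPlan) extends PhysicalPlan

  // A projection or predicate together with the columns it references
  final case class Proj(expr: String, columns: Set[String]) {
    def isPlainColumn: Boolean = columns == Set(expr)
  }
  final case class Pred(expr: String, columns: Set[String])

  def assemble(projects: Seq[Proj], unhandled: Seq[Pred], pushed: Seq[String]): PhysicalPlan = {
    val projectColumns = projects.flatMap(_.columns).toSet
    val filterColumns = unhandled.flatMap(_.columns).toSet
    // Combine the unhandled predicates with AND; None if there are none
    val filterCondition = unhandled.map(_.expr).reduceLeftOption((l, r) => s"($l AND $r)")

    // Adds a filter on top of the scan only when there is something to filter
    def withFilter(scan: PhysicalPlan): PhysicalPlan =
      filterCondition.map(c => FilterNode(c, scan)).getOrElse(scan)

    val pruningOnly =
      projects.forall(_.isPlainColumn) &&
        projectColumns.size == projects.size &&
        filterColumns.subsetOf(projectColumns)

    if (pruningOnly) {
      // Column pruning alone gives the right output: no extra projection
      withFilter(Scan(projects.map(_.expr), pushed))
    } else {
      // Scan every column the projections and filters need, then project
      val requested = (projectColumns ++ filterColumns).toSeq.sorted
      ProjectNode(projects.map(_.expr), withFilter(Scan(requested, pushed)))
    }
  }
}
```

In this model, selecting `id` and `name` with an unhandled filter on `name` needs no projection node, while selecting only `id` with the same filter does, because the scan has to read `name` as well.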
Note: pruneFilterProjectRaw is used exclusively when the DataSourceStrategy execution planning strategy is executed (for a LogicalRelation with a CatalystScan relation) and in pruneFilterProject (when executed for a LogicalRelation with a PrunedFilteredScan or a PrunedScan relation).