FilterExec Unary Physical Operator
FilterExec is a unary physical operator (i.e. with one child physical operator) that represents Filter and TypedFilter unary logical operators at execution.
FilterExec supports Java code generation (aka codegen) as follows:
- usedInputs is an empty AttributeSet (to defer evaluation of attribute expressions until they are actually used, i.e. in the generated Java source code for the consume path)
- Uses whatever the child physical operator uses for the input RDDs
- Generates the Java source code for the produce and consume paths in whole-stage code generation
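To see why an empty usedInputs helps, here is a small plain-Scala model with no Spark dependency. LazyRow, DeferredInputs and Pred are hypothetical names; the model only assumes that generated code reads an input column when a predicate first needs it, so a predicate that fails early saves the loads of columns that later predicates would have read.

```scala
// Hypothetical row whose columns are loaded on demand; every load is counted.
final class LazyRow(values: Vector[Int]) {
  var loads: Int = 0
  def get(i: Int): Int = { loads += 1; values(i) }
}

object DeferredInputs {
  // A predicate pulls only the columns it needs from the row.
  type Pred = LazyRow => Boolean

  // Predicates run in order and `forall` stops at the first false one,
  // so columns read only by later predicates are never loaded.
  def passes(row: LazyRow, preds: Seq[Pred]): Boolean = preds.forall(p => p(row))
}
```

With columns (1, 100) and the predicates "column 0 > 5" and "column 1 > 50", only column 0 is loaded, because the first predicate already fails.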
FilterExec is created when:
- BasicOperators execution planning strategy is executed (and plans Filter and TypedFilter unary logical operators)
- HiveTableScans execution planning strategy is executed (and plans HiveTableRelation leaf logical operators and requests SparkPlanner to pruneFilterProject)
- InMemoryScans execution planning strategy is executed (and plans InMemoryRelation leaf logical operators and requests SparkPlanner to pruneFilterProject)
- DataSourceStrategy execution planning strategy is requested to create a RowDataSourceScanExec physical operator (possibly under FilterExec and ProjectExec operators)
- FileSourceStrategy execution planning strategy is executed (on LogicalRelations with a HadoopFsRelation)
- ExtractPythonUDFs physical query optimization is requested to trySplitFilter
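The first bullet boils down to a pattern match from a logical Filter to a FilterExec over the planned child. The sketch below uses hypothetical, Spark-free case classes (Scan, Filter, ScanExec, FilterExec) with conditions kept as plain strings, and plans the child recursively for simplicity; TypedFilter is not modeled.

```scala
// Hypothetical, Spark-free stand-ins for logical plan nodes.
sealed trait LogicalPlan
final case class Scan(table: String) extends LogicalPlan
final case class Filter(condition: String, child: LogicalPlan) extends LogicalPlan

// Hypothetical, Spark-free stand-ins for physical plan nodes.
sealed trait PhysicalPlan
final case class ScanExec(table: String) extends PhysicalPlan
final case class FilterExec(condition: String, child: PhysicalPlan) extends PhysicalPlan

object BasicOperatorsSketch {
  // A Filter logical operator becomes a FilterExec physical operator
  // with the same condition, on top of the planned child.
  def plan(p: LogicalPlan): PhysicalPlan = p match {
    case Filter(cond, child) => FilterExec(cond, plan(child))
    case Scan(table)         => ScanExec(table)
  }
}
```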
FilterExec's performance metrics:
Key | Name (in web UI) | Description |
---|---|---|
numOutputRows | number of output rows | |
FilterExec uses whatever the child physical operator uses for the input RDDs, the outputOrdering and the outputPartitioning.
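Delegating to the child is sound because filtering only drops rows: it never reorders them or moves them across partitions. A hypothetical model follows (PhysicalOp, LeafOp and FilterExecModel are illustrative names, and the properties are plain strings rather than Spark's SortOrder and Partitioning types).

```scala
// Hypothetical, Spark-free model of the properties a physical operator reports.
trait PhysicalOp {
  def inputRDDs: Seq[String]
  def outputOrdering: Seq[String]
  def outputPartitioning: String
}

final case class LeafOp(
    inputRDDs: Seq[String],
    outputOrdering: Seq[String],
    outputPartitioning: String) extends PhysicalOp

// Filtering keeps row order and partition placement intact,
// so every property is taken from the child unchanged.
final case class FilterExecModel(condition: String, child: PhysicalOp) extends PhysicalOp {
  def inputRDDs: Seq[String] = child.inputRDDs
  def outputOrdering: Seq[String] = child.outputOrdering
  def outputPartitioning: String = child.outputPartitioning
}
```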
FilterExec uses the PredicateHelper for…FIXME
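As a hedged illustration of the kind of helper PredicateHelper offers: Spark's PredicateHelper defines splitConjunctivePredicates, which flattens a condition like a AND (b AND c) into its conjuncts. The model below uses hypothetical Cond, Atom and And classes in place of Catalyst expressions; it illustrates the helper only, not how FilterExec uses it.

```scala
// Hypothetical stand-ins for Catalyst boolean expressions.
sealed trait Cond
final case class Atom(sql: String) extends Cond
final case class And(left: Cond, right: Cond) extends Cond

object PredicateHelperSketch {
  // Recursively flattens nested ANDs into a list of conjuncts, left to right.
  def splitConjunctivePredicates(c: Cond): Seq[Cond] = c match {
    case And(l, r) => splitConjunctivePredicates(l) ++ splitConjunctivePredicates(r)
    case other     => Seq(other)
  }
}
```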
Name | Description |
---|---|
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
Creating FilterExec Instance
FilterExec takes the following when created:
- Catalyst expression for the filter condition
- Child physical operator
FilterExec initializes the internal registries and counters.
isNullIntolerant Internal Method
isNullIntolerant(expr: Expression): Boolean
isNullIntolerant…FIXME
Note: isNullIntolerant is used when…FIXME
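A minimal sketch of the check, assuming the rule found in the Spark 2.x source: an expression is null-intolerant when it is a NullIntolerant expression and all of its children are null-intolerant too. Expr and NullIntolerance are hypothetical stand-ins for Catalyst classes.

```scala
// Hypothetical expression node; `nullIntolerant` marks expressions that always
// return null when any input is null (what Catalyst's NullIntolerant trait flags).
final case class Expr(sql: String, nullIntolerant: Boolean, children: Seq[Expr] = Nil)

object NullIntolerance {
  // Null-intolerant only if the expression itself and, recursively, every child is.
  def isNullIntolerant(expr: Expr): Boolean =
    expr.nullIntolerant && expr.children.forall(isNullIntolerant)
}
```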
usedInputs Method
usedInputs: AttributeSet
Note: usedInputs is part of CodegenSupport Contract to…FIXME.
usedInputs…FIXME
output Method
output: Seq[Attribute]
Note: output is part of QueryPlan Contract to…FIXME.
output…FIXME
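A minimal sketch of what output could compute, assuming (as in the Spark 2.x source) that it takes the child's output and marks as non-nullable every nullable column already guarded by an IsNotNull conjunct of the filter condition. Attr, OutputSketch and notNullIds are hypothetical; the set of guarded columns is passed in rather than derived from the condition.

```scala
// Hypothetical attribute model: a name, a unique expression id and a nullability flag.
final case class Attr(name: String, exprId: Long, nullable: Boolean)

object OutputSketch {
  // Rows that survive an IsNotNull check cannot hold null in that column,
  // so the column is reported as non-nullable; other columns pass through unchanged.
  def output(childOutput: Seq[Attr], notNullIds: Set[Long]): Seq[Attr] =
    childOutput.map { a =>
      if (a.nullable && notNullIds.contains(a.exprId)) a.copy(nullable = false) else a
    }
}
```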
Generating Java Source Code for Produce Path in Whole-Stage Code Generation: doProduce Method
doProduce(ctx: CodegenContext): String
Note: doProduce is part of CodegenSupport Contract to generate the Java source code for the produce path in Whole-Stage Code Generation.
doProduce…FIXME
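The produce/consume protocol can be pictured with a string-based model. It assumes, as in the Spark 2.x source, that FilterExec.doProduce only asks its child to produce with FilterExec itself as the parent, so the row loop comes from the leaf and the filter's code ends up inside that loop. CodegenNode, ScanNode, FilterNode and SinkNode are hypothetical classes, not Spark's CodegenSupport API, and the do {…} while(false); wrapper is left out for brevity.

```scala
// Hypothetical, string-based model of the produce/consume protocol.
trait CodegenNode {
  // Set when a parent asks this node to produce; null for the root.
  protected var parent: CodegenNode = null
  // Remembers who asked, then emits this node's part of the produce path.
  final def produce(p: CodegenNode): String = { parent = p; doProduce() }
  protected def doProduce(): String
  // Emits the code that handles one row (held in the Java variable `row`).
  def doConsume(row: String): String
}

// Leaf: owns the row loop and hands every row to its parent's consume code.
final class ScanNode extends CodegenNode {
  protected def doProduce(): String =
    "while (input.hasNext()) {\n  InternalRow row = input.next();\n" +
      parent.doConsume("row") + "\n}"
  def doConsume(row: String): String =
    throw new UnsupportedOperationException("a leaf has no child to consume from")
}

// Produces nothing itself: it delegates to the child and filters in doConsume.
final class FilterNode(child: CodegenNode, condition: String) extends CodegenNode {
  protected def doProduce(): String = child.produce(this)
  def doConsume(row: String): String =
    s"  if (!($condition)) continue;\n" + parent.doConsume(row)
}

// Root: writes every row that reaches it.
final class SinkNode(child: CodegenNode) extends CodegenNode {
  protected def doProduce(): String = child.produce(this)
  def doConsume(row: String): String = s"  append($row);"
}
```

Calling new SinkNode(new FilterNode(new ScanNode, "id > 2")).produce(null) yields a single while loop whose body checks the condition before appending the row.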
Generating Java Source Code for Consume Path in Whole-Stage Code Generation: doConsume Method
doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
Note: doConsume is part of CodegenSupport Contract to generate the Java source code for the consume path in Whole-Stage Code Generation.
doConsume creates a new metric term for the numOutputRows metric.
doConsume…FIXME
In the end, doConsume uses consume and FIXME to generate Java source code (as plain text) inside a do {…} while(false); code block.
// DEMO Write one
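The wrapper relies on a Java detail: inside do { } while(false);, continue; jumps to the loop condition, which is false, so the block is left and the rest of the per-row code is skipped. A minimal string-building sketch follows, assuming the checks come first, then the numOutputRows increment, then the parent's consume code (so only passing rows are counted). ConsumeSketch.wrap and its parameters are hypothetical.

```scala
object ConsumeSketch {
  // Builds the per-row Java block: each failing check jumps out with `continue;`,
  // so the metric increment and the parent's code run only for rows that pass.
  def wrap(predicates: Seq[String], numOutputTerm: String, parentCode: String): String = {
    val checks = predicates.map(p => s"  if (!($p)) continue;").mkString("\n")
    s"""do {
       |$checks
       |  $numOutputTerm.add(1);
       |  $parentCode
       |} while(false);""".stripMargin
  }
}
```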
genPredicate Internal Method
genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String
Note: genPredicate is an internal method of doConsume.
genPredicate…FIXME
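A hedged model of the snippet genPredicate returns, assuming (as in the Spark 2.x source) that it is the predicate's evaluation code followed by an if check that continues past the row when the result is false, or null for a nullable predicate. GenPredicateSketch and its parameters are hypothetical; the real method binds the expression to the input attributes and generates the evaluation code itself.

```scala
object GenPredicateSketch {
  // Emits the evaluation code for one predicate, then a check that skips the row
  // (`continue;`) when the result is false, or null if the predicate is nullable.
  def genPredicate(evalCode: String, isNullTerm: String, valueTerm: String, nullable: Boolean): String = {
    val nullCheck = if (nullable) s"$isNullTerm || " else ""
    s"""$evalCode
       |if ($nullCheck!$valueTerm) continue;""".stripMargin
  }
}
```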
Executing Physical Operator (Generating RDD[InternalRow]): doExecute Method
doExecute(): RDD[InternalRow]
Note: doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
doExecute executes the child physical operator and creates a new MapPartitionsRDD that does the filtering.
// DEMO Show the RDD lineage with the new MapPartitionsRDD after FilterExec
Internally, doExecute takes the numOutputRows metric.
In the end, doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]) and transforms it by executing the following function on internal rows per partition with index (using RDD.mapPartitionsWithIndexInternal that creates another RDD):
- Creates a partition filter as a new GenPredicate (for the filter condition expression and the output schema of the child physical operator)
- Requests the generated partition filter Predicate to initialize (with 0 as the partition index)
- Filters the partition iterator (Iterator[InternalRow]), keeping only the InternalRows for which the generated partition filter Predicate evaluates to true
- Increments the numOutputRows metric for every positive evaluation (i.e. one that returned true)
Note: doExecute (by RDD.mapPartitionsWithIndexInternal) adds a new MapPartitionsRDD to the RDD lineage. Use RDD.toDebugString to see the additional MapPartitionsRDD.
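The per-partition function can be mimicked without Spark. In this sketch partitions are plain Seq[Int] values, PartitionPredicate stands in for the generated Predicate, and a local var stands in for the numOutputRows metric; all of these names are hypothetical. Unlike the real operator, which returns a lazily filtered iterator, the sketch materializes each partition so it can return the rows.

```scala
// Hypothetical stand-in for the generated partition filter (a Predicate in Spark).
final class PartitionPredicate(cond: Int => Boolean) {
  private var initialized = false
  def initialize(partitionIndex: Int): Unit = { initialized = true }
  def eval(row: Int): Boolean = {
    require(initialized, "initialize must be called before eval")
    cond(row)
  }
}

object DoExecuteSketch {
  // Models the function given to mapPartitionsWithIndexInternal: a fresh predicate per
  // partition, initialized with 0, that keeps rows evaluating to true and counts them.
  // Returns the filtered partitions and the final value of the numOutputRows stand-in.
  def doExecute(partitions: Seq[Seq[Int]], cond: Int => Boolean): (Seq[Seq[Int]], Long) = {
    var numOutputRows = 0L // a local counter instead of a SQLMetric accumulator
    val filtered = partitions.map { rows =>
      val predicate = new PartitionPredicate(cond)
      predicate.initialize(0)
      rows.iterator.filter { row =>
        val r = predicate.eval(row)
        if (r) numOutputRows += 1
        r
      }.toList // materialized only so the sketch can return the rows
    }
    (filtered, numOutputRows)
  }
}
```

For partitions (1, 5, 3) and (7, 2) with the condition "> 2", the sketch keeps (5, 3) and (7) and counts 3 output rows.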