FilterExec Unary Physical Operator
FilterExec is a unary physical operator (i.e. with one child physical operator) that represents Filter and TypedFilter unary logical operators at execution.
FilterExec supports Java code generation (aka codegen) as follows:
- usedInputs is an empty AttributeSet (to defer evaluation of attribute expressions until they are actually used, i.e. in the generated Java source code for consume path)
- Uses whatever the child physical operator uses for the input RDDs
- Generates Java source code for the produce and consume paths in whole-stage code generation
FilterExec is created when:
- BasicOperators execution planning strategy is executed (and plans Filter and TypedFilter unary logical operators)
- HiveTableScans execution planning strategy is executed (and plans HiveTableRelation leaf logical operators and requests SparkPlanner to pruneFilterProject)
- InMemoryScans execution planning strategy is executed (and plans InMemoryRelation leaf logical operators and requests SparkPlanner to pruneFilterProject)
- DataSourceStrategy execution planning strategy is requested to create a RowDataSourceScanExec physical operator (possibly under FilterExec and ProjectExec operators)
- FileSourceStrategy execution planning strategy is executed (on LogicalRelations with a HadoopFsRelation)
- ExtractPythonUDFs physical query optimization is requested to trySplitFilter
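The first case (a Filter logical operator planned as FilterExec) can be observed in a physical plan. The following is a minimal sketch, assuming a local Spark installation with spark-sql on the classpath. The query, the object name `FilterExecInPlan` and the helper `filterOperators` are illustrative and not part of Spark.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FilterExec

object FilterExecInPlan {
  // Hypothetical helper: collects the FilterExec operators of a sample query
  def filterOperators(spark: SparkSession): Seq[FilterExec] = {
    val q = spark.range(10).filter("id > 5")
    // sparkPlan is the physical plan before preparation rules
    // (e.g. whole-stage code generation) are applied
    q.queryExecution.sparkPlan.collect { case f: FilterExec => f }
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session; in spark-shell the predefined `spark` could be used instead
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("FilterExecInPlan")
      .getOrCreate()
    try {
      filterOperators(spark).foreach { f =>
        println(s"condition: ${f.condition}, child: ${f.child.nodeName}")
      }
    } finally {
      spark.stop()
    }
  }
}
```

The printed condition and child correspond to the two arguments FilterExec is created with.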
| Key | Name (in web UI) | Description |
|---|---|---|
| numOutputRows | number of output rows | |
FilterExec uses whatever the child physical operator uses for the input RDDs, the outputOrdering and the outputPartitioning.
FilterExec uses the PredicateHelper for…FIXME
| Name | Description |
|---|---|
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
Creating FilterExec Instance
FilterExec takes the following when created:
- Catalyst expression for the filter condition
- Child physical operator
FilterExec initializes the internal registries and counters.
isNullIntolerant Internal Method
isNullIntolerant(expr: Expression): Boolean
isNullIntolerant…FIXME
Note: isNullIntolerant is used when…FIXME
usedInputs Method
usedInputs: AttributeSet
Note: usedInputs is part of CodegenSupport Contract to…FIXME.
usedInputs…FIXME
output Method
output: Seq[Attribute]
Note: output is part of QueryPlan Contract to…FIXME.
output…FIXME
Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method
doProduce(ctx: CodegenContext): String
Note: doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.
doProduce…FIXME
Generating Java Source Code for Consume Path in Whole-Stage Code Generation — doConsume Method
doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
Note: doConsume is part of CodegenSupport Contract to generate the Java source code for consume path in Whole-Stage Code Generation.
doConsume creates a new metric term for the numOutputRows metric.
doConsume…FIXME
In the end, doConsume uses consume and FIXME to generate Java source code (as plain text) inside a do {…} while(false); code block.
// DEMO Write one
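One plausible reason for the do {…} while(false); wrapper is that a failed predicate check can skip the rest of the per-row code with a plain `continue;` statement. The sketch below only illustrates that shape by assembling a Java snippet as text. The helper `wrapInDoWhile`, the Java statements and the `numOutputRows` term are placeholders (assumptions), not the code Spark actually generates.

```scala
object DoWhileSketch {
  // Hypothetical helper: wraps per-row Java statements in a single-iteration loop
  // so that any predicate check can bail out early with `continue;`
  def wrapInDoWhile(predicateChecks: Seq[String], consumeCode: String): String = {
    // Placeholder for the metric term: count the row only after all checks passed
    val countRow = "numOutputRows.add(1);"
    val body = (predicateChecks ++ Seq(countRow, consumeCode)).map("  " + _)
    (Seq("do {") ++ body ++ Seq("} while(false);")).mkString("\n")
  }

  def main(args: Array[String]): Unit = {
    val java = wrapInDoWhile(
      predicateChecks = Seq("if (!(value > 5L)) continue;"),
      consumeCode = "append(row);")
    println(java)
  }
}
```

In Java, `continue` inside a `do { } while(false)` loop jumps to the loop condition, which is false, so the remaining statements for the current row (counting and consuming) are skipped.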
genPredicate Internal Method
genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String
Note: genPredicate is an internal method of doConsume.
genPredicate…FIXME
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method
doExecute(): RDD[InternalRow]
Note: doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
doExecute executes the child physical operator and creates a new MapPartitionsRDD that does the filtering.
// DEMO Show the RDD lineage with the new MapPartitionsRDD after FilterExec
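A possible way to look at the lineage is sketched below, assuming a local Spark installation with spark-sql on the classpath. It executes the physical plan before preparation rules are applied, so that the RDD should come from FilterExec itself rather than from whole-stage generated code. The query, the object name `FilterExecLineage` and the helper `lineage` are illustrative.

```scala
import org.apache.spark.sql.SparkSession

object FilterExecLineage {
  // Hypothetical helper: returns the RDD lineage of a filtered range query
  def lineage(spark: SparkSession): String = {
    val q = spark.range(10).filter("id > 5")
    // Assumption: for this query the root of sparkPlan is FilterExec,
    // so execute() goes through FilterExec.doExecute
    val rdd = q.queryExecution.sparkPlan.execute()
    rdd.toDebugString
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session; in spark-shell the predefined `spark` could be used instead
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("FilterExecLineage")
      .getOrCreate()
    try {
      println(lineage(spark))
    } finally {
      spark.stop()
    }
  }
}
```

The exact output depends on the Spark version, so it is not reproduced here.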
Internally, doExecute takes the numOutputRows metric.
In the end, doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]) and transforms it by executing the following function on internal rows per partition with index (using RDD.mapPartitionsWithIndexInternal that creates another RDD):
- Creates a partition filter as a new GenPredicate (for the filter condition expression and the output schema of the child physical operator)
- Requests the generated partition filter Predicate to initialize (with 0 partition index)
- Filters out elements from the partition iterator (Iterator[InternalRow]) by requesting the generated partition filter Predicate to evaluate for every InternalRow
  - Increments the numOutputRows metric for positive evaluations (i.e. that returned true)
Note: doExecute (by RDD.mapPartitionsWithIndexInternal) adds a new MapPartitionsRDD to the RDD lineage. Use RDD.toDebugString to see the additional MapPartitionsRDD.
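The per-partition function described above can be modelled in plain Scala without Spark. This is a simplified sketch only: `SimplePredicate` and `Counter` are hypothetical stand-ins for the generated Predicate and the numOutputRows metric, and rows are plain Long values instead of InternalRow.

```scala
object FilterPartitionSketch {
  // Hypothetical stand-in for the generated partition filter Predicate
  final class SimplePredicate(condition: Long => Boolean) {
    private var partitionIndex: Int = -1
    def initialize(index: Int): Unit = partitionIndex = index
    def eval(row: Long): Boolean = condition(row)
  }

  // Hypothetical stand-in for the numOutputRows metric
  final class Counter {
    private var count: Long = 0L
    def add(n: Long): Unit = count += n
    def value: Long = count
  }

  // Models the function applied to every partition (index and row iterator)
  def filterPartition(
      index: Int,
      rows: Iterator[Long],
      condition: Long => Boolean,
      numOutputRows: Counter): Iterator[Long] = {
    val predicate = new SimplePredicate(condition) // create the partition filter
    predicate.initialize(0)                        // initialize with 0 partition index
    rows.filter { row =>
      val keep = predicate.eval(row)               // evaluate for every row
      if (keep) numOutputRows.add(1)               // count positive evaluations only
      keep
    }
  }

  def main(args: Array[String]): Unit = {
    val metric = new Counter
    val kept = filterPartition(0, Iterator(1L, 6L, 9L), _ > 5L, metric).toList
    println(s"kept=$kept, numOutputRows=${metric.value}")
  }
}
```

Since `Iterator.filter` is lazy, the counter in this sketch only advances as the resulting iterator is consumed.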