// test datasets
scala> val ds1 = spark.range(4)
ds1: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> val ds2 = spark.range(2)
ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
// Case 1. Rather than `LocalLimit` of `Union` do `Union` of `LocalLimit`
scala> ds1.union(ds2).limit(2).explain(true)
== Parsed Logical Plan ==
GlobalLimit 2
+- LocalLimit 2
   +- Union
      :- Range (0, 4, step=1, splits=Some(8))
      +- Range (0, 2, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint
GlobalLimit 2
+- LocalLimit 2
   +- Union
      :- Range (0, 4, step=1, splits=Some(8))
      +- Range (0, 2, step=1, splits=Some(8))

== Optimized Logical Plan ==
GlobalLimit 2
+- LocalLimit 2
   +- Union
      :- LocalLimit 2
      :  +- Range (0, 4, step=1, splits=Some(8))
      +- LocalLimit 2
         +- Range (0, 2, step=1, splits=Some(8))

== Physical Plan ==
CollectLimit 2
+- Union
   :- *LocalLimit 2
   :  +- *Range (0, 4, step=1, splits=Some(8))
   +- *LocalLimit 2
      +- *Range (0, 2, step=1, splits=Some(8))
LimitPushDown Logical Optimization
LimitPushDown is a base logical optimization that transforms the following logical plans:

- LocalLimit with Union
- LocalLimit with Join (see the sketch below)

LimitPushDown is part of the Operator Optimization before Inferring Filters fixed-point batch in the standard batches of the Catalyst Optimizer.

LimitPushDown is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].
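The demo above covers the Union case. The following is a minimal sketch (not taken from the source) of the Join case: for a LEFT OUTER join the rule can push the LocalLimit down to the left side only, since every left-side row contributes at least one output row.

// A minimal sketch of the Join case (assumes the same spark session as in the demo above)
val left  = spark.range(4)
val right = spark.range(2)
// An explicit join condition keeps LocalLimit directly on top of the Join in the logical plan
val q = left.join(right, left("id") === right("id"), "left_outer").limit(2)
// The optimized logical plan should show a LocalLimit pushed onto the join's left side
q.explain(true)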
Executing Rule — apply Method

apply(plan: LogicalPlan): LogicalPlan

Note: apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).

apply…FIXME
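What follows is a hedged sketch of executing the rule directly on an analyzed logical plan in spark-shell. How a LimitPushDown instance is obtained varies across Spark versions (in some releases it is constructed with a conf, as assumed below; in others it is a plain object), so treat the construction as illustrative rather than definitive.

import org.apache.spark.sql.catalyst.optimizer.LimitPushDown

// Build an analyzed logical plan with a limit on top of a union (same shape as the demo above)
val analyzedPlan = spark.range(4).union(spark.range(2)).limit(2).queryExecution.analyzed

// Create the rule and execute it, i.e. Rule[LogicalPlan].apply
val limitPushDown = LimitPushDown(spark.sessionState.conf)
val planAfterRule = limitPushDown(analyzedPlan)
println(planAfterRule.numberedTreeString)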
Creating LimitPushDown Instance

LimitPushDown takes the following when created:

LimitPushDown initializes the internal registries and counters.

Note: LimitPushDown is created when…