// test datasets
scala> val ds1 = spark.range(4)
ds1: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> val ds2 = spark.range(2)
ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
// Case 1. Rather than `LocalLimit` of `Union`, do `Union` of `LocalLimit`s
scala> ds1.union(ds2).limit(2).explain(true)
== Parsed Logical Plan ==
GlobalLimit 2
+- LocalLimit 2
+- Union
:- Range (0, 4, step=1, splits=Some(8))
+- Range (0, 2, step=1, splits=Some(8))
== Analyzed Logical Plan ==
id: bigint
GlobalLimit 2
+- LocalLimit 2
+- Union
:- Range (0, 4, step=1, splits=Some(8))
+- Range (0, 2, step=1, splits=Some(8))
== Optimized Logical Plan ==
GlobalLimit 2
+- LocalLimit 2
+- Union
:- LocalLimit 2
: +- Range (0, 4, step=1, splits=Some(8))
+- LocalLimit 2
+- Range (0, 2, step=1, splits=Some(8))
== Physical Plan ==
CollectLimit 2
+- Union
:- *LocalLimit 2
: +- *Range (0, 4, step=1, splits=Some(8))
+- *LocalLimit 2
+- *Range (0, 2, step=1, splits=Some(8))
LimitPushDown Logical Optimization
LimitPushDown is a base logical optimization that transforms the following logical plans:
- LocalLimit with Union
- LocalLimit with Join (see the REPL sketch right after this list)
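For the Join case, the rule pushes the limit only into the side of an outer join whose rows are guaranteed to survive the join. The following continues the REPL session above; it is a sketch, and since the exact explain output depends on the Spark version it is omitted here.

// Case 2. Push LocalLimit into the left side of a LEFT OUTER join
scala> val q = ds1.join(ds2, Seq("id"), "left_outer").limit(2)
scala> q.explain(true)

In the optimized logical plan you should see a LocalLimit 2 directly above the left Range while the right side stays untouched: every left-side row of a left outer join yields at least one output row, so limiting the left input to 2 rows still leaves at least 2 rows for the limit on top.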
LimitPushDown is part of the Operator Optimization before Inferring Filters fixed-point batch in the standard batches of the Catalyst Optimizer.
LimitPushDown is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].
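To make that contract concrete, here is a minimal rule with the same shape. NoopRule is hypothetical, not part of Spark; it only illustrates the Rule[LogicalPlan] abstraction that LimitPushDown implements with real transformations.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical: the smallest possible Rule[LogicalPlan]
object NoopRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan // return the plan unchanged
}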
Executing Rule — apply Method
apply(plan: LogicalPlan): LogicalPlan
Note: apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).

apply…FIXME
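Until the FIXME above is filled in, the following is a simplified sketch of what apply pattern-matches on, paraphrasing the Spark 2.x source. Case class shapes such as Union and Join vary across Spark versions, so treat this as an outline, not the exact implementation.

import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral}
import org.apache.spark.sql.catalyst.plans.LeftOuter
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

// Simplified sketch of LimitPushDown's apply (paraphrased, not the exact source)
object LimitPushDownSketch extends Rule[LogicalPlan] {

  // Wrap a child in LocalLimit unless the child is already known to
  // produce no more rows than the limit
  private def maybePushLocalLimit(exp: Expression, plan: LogicalPlan): LogicalPlan =
    (exp, plan.maxRows) match {
      case (IntegerLiteral(limit), Some(childMaxRows)) if limit >= childMaxRows =>
        plan // the limit can never trim this child; leave it alone
      case _ =>
        LocalLimit(exp, plan)
    }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // LocalLimit over Union: push a LocalLimit into every branch
    case LocalLimit(exp, Union(children)) =>
      LocalLimit(exp, Union(children.map(maybePushLocalLimit(exp, _))))
    // LocalLimit over a LEFT OUTER Join: limit only the left (outer) side,
    // since every left row produces at least one output row
    case LocalLimit(exp, join @ Join(left, _, LeftOuter, _)) =>
      LocalLimit(exp, join.copy(left = maybePushLocalLimit(exp, left)))
  }
}

The Union case of this match is exactly what the demo at the top of the page shows: a LocalLimit 2 appears under each Range branch in the optimized logical plan.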
Creating LimitPushDown Instance
LimitPushDown takes the following when created: …FIXME

LimitPushDown initializes the internal registries and counters.

Note: LimitPushDown is created when…FIXME
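A hedged example of creating and executing the rule by hand, assuming a Spark 2.x build where LimitPushDown is a case class that takes the session's conf when created (in later releases it is a plain object and takes nothing):

scala> import org.apache.spark.sql.catalyst.optimizer.LimitPushDown
scala> val analyzedPlan = ds1.union(ds2).limit(2).queryExecution.analyzed
scala> val limitPushDown = LimitPushDown(spark.sessionState.conf)
scala> val optimizedPlan = limitPushDown(analyzedPlan)

The result should show LocalLimit 2 pushed into both Union branches, matching the optimized logical plan in the demo at the top of the page.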