import java.time.LocalDate
import java.sql.Timestamp
// Daily sales volumes, one row per day, keyed by the day's midnight timestamp.
// Mind the 2-day gap: there are no rows for 2018-09-03 and 2018-09-04.
val sales = Seq(
  LocalDate.of(2018, 9, 1) -> 5,
  LocalDate.of(2018, 9, 2) -> 10,
  LocalDate.of(2018, 9, 5) -> 5
).map { case (day, volume) => (Timestamp.valueOf(day.atStartOfDay), volume) }.toDF("time", "volume")
scala> sales.show
+-------------------+------+
| time|volume|
+-------------------+------+
|2018-09-01 00:00:00| 5|
|2018-09-02 00:00:00| 10|
|2018-09-05 00:00:00| 5|
+-------------------+------+
scala> sales.printSchema
root
|-- time: timestamp (nullable = true)
|-- volume: integer (nullable = false)
// FIXME Use Catalyst DSL
// rangeBetween with column expressions
// data type of orderBy expression is timestamp (see the `time` column in the schema)
// data type of range frame boundaries is interval
// WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
import org.apache.spark.unsafe.types.CalendarInterval
// Literal interval used as the upper boundary of the range frame
val interval = lit(CalendarInterval.fromString("interval 1 days"))
import org.apache.spark.sql.expressions.Window
// Range frame (not a row frame): from the current row up to the `interval 1 days` boundary
val windowSpec = Window.orderBy($"time").rangeBetween(currentRow(), interval)
val q = sales.select(
  $"time",
  (sum($"volume") over windowSpec) as "sum",
  (count($"volume") over windowSpec) as "count")
// The unanalyzed logical plan, to which analyzer rules are applied by hand below
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'Project [unresolvedalias('time, None), sum('volume) windowspecdefinition('time ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS sum#156, count('volume) windowspecdefinition('time ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS count#158]
01 +- AnalysisBarrier
02 +- Project [_1#129 AS time#132, _2#130 AS volume#133]
03 +- LocalRelation [_1#129, _2#130]
// Apply individual analyzer rules by hand to inspect their effect on the logical plan.
import spark.sessionState.analyzer.ResolveReferences
val planWithRefsResolved = ResolveReferences(plan)
import spark.sessionState.analyzer.ResolveAliases
// Second step is ResolveAliases (running ResolveReferences again would be a no-op duplicate)
val planWithAliasesResolved = ResolveAliases(planWithRefsResolved)
// FIXME Looks like nothing changes in the query plan with regard to WindowFrameCoercion
// NOTE(review): createBoundaryCast returns a boundary unchanged when the order expression
// is DateType or TimestampType; `time` is a timestamp here, which is likely why the
// frame boundaries stay as they are — confirm.
import org.apache.spark.sql.catalyst.analysis.TypeCoercion.WindowFrameCoercion
// Applied to the plan before ResolveAliases, so the printed plan still shows unresolvedalias(...)
val afterWindowFrameCoercion = WindowFrameCoercion(planWithRefsResolved)
scala> println(afterWindowFrameCoercion.numberedTreeString)
00 'Project [unresolvedalias(time#132, None), sum(volume#133) windowspecdefinition(time#132 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS sum#156L, count(volume#133) windowspecdefinition(time#132 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS count#158L]
01 +- AnalysisBarrier
02 +- Project [_1#129 AS time#132, _2#130 AS volume#133]
03 +- LocalRelation [_1#129, _2#130]
WindowFrameCoercion Type Coercion Logical Rule
WindowFrameCoercion is a type coercion logical rule that casts the data types of the boundaries of a range window frame to the data type of the order specification in a WindowSpecDefinition in a logical plan.
import java.time.LocalDate
import java.sql.Date
// Daily sales volumes, one row per day, keyed by calendar date (a date column).
// Mind the 2-day gap: there are no rows for 2018-09-03 and 2018-09-04.
val sales = Seq(
  LocalDate.of(2018, 9, 1) -> 5,
  LocalDate.of(2018, 9, 2) -> 10,
  LocalDate.of(2018, 9, 5) -> 5
).map { case (day, volume) => (Date.valueOf(day), volume) }.toDF("time", "volume")
scala> sales.show
+----------+------+
| time|volume|
+----------+------+
|2018-09-01| 5|
|2018-09-02| 10|
|2018-09-05| 5|
+----------+------+
scala> sales.printSchema
root
|-- time: date (nullable = true)
|-- volume: integer (nullable = false)
// FIXME Use Catalyst DSL
// Range frame defined with column expressions (rangeBetween), where:
//   - the orderBy expression is of date type
//   - the spec matches WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
import org.apache.spark.sql.expressions.Window
val upperBound = lit(1)
val windowSpec = Window.orderBy($"time").rangeBetween(currentRow(), upperBound)
val volumeSum = sum($"volume") over windowSpec
val q = sales.select($"time", volumeSum as "sum")
// The unanalyzed logical plan, to which analyzer rules are applied by hand below
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'Project [unresolvedalias('time, None), sum('volume) windowspecdefinition('time ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), 1)) AS sum#238]
01 +- AnalysisBarrier
02 +- Project [_1#222 AS time#225, _2#223 AS volume#226]
03 +- LocalRelation [_1#222, _2#223]
// Apply individual analyzer rules by hand to inspect their effect on the logical plan.
import spark.sessionState.analyzer.ResolveReferences
val planWithRefsResolved = ResolveReferences(plan)
import spark.sessionState.analyzer.ResolveAliases
// Second step is ResolveAliases (running ResolveReferences again would be a no-op duplicate)
val planWithAliasesResolved = ResolveAliases(planWithRefsResolved)
// FIXME Looks like nothing changes in the query plan with regard to WindowFrameCoercion
// NOTE(review): createBoundaryCast returns a boundary unchanged when the order expression
// is DateType or TimestampType; `time` is a date here, so lit(1) is presumably left as is — confirm.
import org.apache.spark.sql.catalyst.analysis.TypeCoercion.WindowFrameCoercion
val afterWindowFrameCoercion = WindowFrameCoercion(planWithAliasesResolved)
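// NOTE: The plan printed below is identical to the output of the Timestamp/interval example above
// (expression IDs #129–#158, a `count` column and an `interval 1 days` boundary), so it does not
// reflect this Date/lit(1) query. Re-run the snippet to see the actual plan.
scala> println(afterWindowFrameCoercion.numberedTreeString)
00 'Project [unresolvedalias(time#132, None), sum(volume#133) windowspecdefinition(time#132 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS sum#156L, count(volume#133) windowspecdefinition(time#132 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS count#158L]
01 +- AnalysisBarrier
02 +- Project [_1#129 AS time#132, _2#130 AS volume#133]
03 +- LocalRelation [_1#129, _2#130]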
Coercing Types in Logical Plan — coerceTypes Method

coerceTypes(plan: LogicalPlan): LogicalPlan
Note: coerceTypes is part of the TypeCoercionRule Contract to coerce types in a logical plan.
coerceTypes traverses all Catalyst expressions (in the input LogicalPlan). For every WindowSpecDefinition that has a RangeFrame window frame and a single resolved order specification expression, it replaces the frameSpecification with a RangeFrame window frame. The lower and upper boundary expressions of that new frame are cast (using createBoundaryCast) to the data type of the order specification expression.
createBoundaryCast Internal Method

createBoundaryCast(boundary: Expression, dt: DataType): Expression

createBoundaryCast returns a Catalyst expression based on the input boundary Expression and the dt DataType. The following cases are checked in order:
- The input boundary expression if it is a SpecialFrameBoundary
- The input boundary expression if the dt data type is DateType or TimestampType
- A Cast unary operator with the input boundary expression and the dt data type if the result type of the boundary expression is not the dt data type, but the result type can be cast to the dt data type
- The input boundary expression (in all other cases)
Note: createBoundaryCast is used exclusively when the WindowFrameCoercion type coercion logical rule is requested to coerceTypes.