window Function — Stream Time Windows

window is a standard function that generates tumbling, sliding or delayed stream time window ranges (on a timestamp column).

window(
  timeColumn: Column,
  windowDuration: String): Column  (1)
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String): Column   (2)
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String,
  startTime: String): Column       (3)
  1. Creates a tumbling time window with slideDuration as windowDuration and 0 second for startTime

  2. Creates a sliding time window with 0 second for startTime

  3. Creates a delayed time window

Note

Tumbling windows are a series of fixed-sized, non-overlapping and contiguous time intervals.

Note

Tumbling windows group elements of a stream into finite sets where each set corresponds to an interval.

Tumbling windows discretize a stream into non-overlapping windows.

scala> val timeColumn = window($"time", "5 seconds")
timeColumn: org.apache.spark.sql.Column = timewindow(time, 5000000, 5000000, 0) AS `window`

timeColumn should be of TimestampType, i.e. with java.sql.Timestamp values.

Tip
Use java.sql.Timestamp.from or java.sql.Timestamp.valueOf factory methods to create Timestamp instances.
// https://docs.oracle.com/javase/8/docs/api/java/time/LocalDateTime.html
import java.time.LocalDateTime
// https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
import java.sql.Timestamp
val levels = Seq(
  // (year, month, dayOfMonth, hour, minute, second)
  ((2012, 12, 12, 12, 12, 12), 5),
  ((2012, 12, 12, 12, 12, 14), 9),
  ((2012, 12, 12, 13, 13, 14), 4),
  ((2016, 8,  13, 0, 0, 0), 10),
  ((2017, 5,  27, 0, 0, 0), 15)).
  map { case ((yy, mm, dd, h, m, s), a) => (LocalDateTime.of(yy, mm, dd, h, m, s), a) }.
  map { case (ts, a) => (Timestamp.valueOf(ts), a) }.
  toDF("time", "level")
scala> levels.show
+-------------------+-----+
|               time|level|
+-------------------+-----+
|2012-12-12 12:12:12|    5|
|2012-12-12 12:12:14|    9|
|2012-12-12 13:13:14|    4|
|2016-08-13 00:00:00|   10|
|2017-05-27 00:00:00|   15|
+-------------------+-----+

val q = levels.select(window($"time", "5 seconds"), $"level")
scala> q.show(truncate = false)
+---------------------------------------------+-----+
|window                                       |level|
+---------------------------------------------+-----+
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|5    |
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|9    |
|[2012-12-12 13:13:10.0,2012-12-12 13:13:15.0]|4    |
|[2016-08-13 00:00:00.0,2016-08-13 00:00:05.0]|10   |
|[2017-05-27 00:00:00.0,2017-05-27 00:00:05.0]|15   |
+---------------------------------------------+-----+

scala> q.printSchema
root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- level: integer (nullable = false)

// calculating the sum of levels every 5 seconds
val sums = levels.
  groupBy(window($"time", "5 seconds")).
  agg(sum("level") as "level_sum").
  select("window.start", "window.end", "level_sum")
scala> sums.show
+-------------------+-------------------+---------+
|              start|                end|level_sum|
+-------------------+-------------------+---------+
|2012-12-12 13:13:10|2012-12-12 13:13:15|        4|
|2012-12-12 12:12:10|2012-12-12 12:12:15|       14|
|2016-08-13 00:00:00|2016-08-13 00:00:05|       10|
|2017-05-27 00:00:00|2017-05-27 00:00:05|       15|
+-------------------+-------------------+---------+

windowDuration and slideDuration are strings specifying the width of the window for duration and sliding identifiers, respectively.

Tip
Use CalendarInterval for valid window identifiers.

There are a couple of rules governing the durations:

  1. The window duration must be greater than 0

  2. The slide duration must be greater than 0.

  3. The start time must be greater than or equal to 0.

  4. The slide duration must be less than or equal to the window duration.

  5. The start time must be less than the slide duration.

Note
Only one window expression is supported in a query.
Note
null values are filtered out in window expression.

Internally, window creates a Column with TimeWindow Catalyst expression under window alias.

scala> val timeColumn = window($"time", "5 seconds")
timeColumn: org.apache.spark.sql.Column = timewindow(time, 5000000, 5000000, 0) AS `window`

val windowExpr = timeColumn.expr
scala> println(windowExpr.numberedTreeString)
00 timewindow('time, 5000000, 5000000, 0) AS window#23
01 +- timewindow('time, 5000000, 5000000, 0)
02    +- 'time

Internally, TimeWindow Catalyst expression is simply a struct type with two fields, i.e. start and end, both of TimestampType type.

scala> println(windowExpr.dataType)
StructType(StructField(start,TimestampType,true), StructField(end,TimestampType,true))

scala> println(windowExpr.dataType.prettyJson)
{
  "type" : "struct",
  "fields" : [ {
    "name" : "start",
    "type" : "timestamp",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "end",
    "type" : "timestamp",
    "nullable" : true,
    "metadata" : { }
  } ]
}
Note

TimeWindow time window Catalyst expression is planned (i.e. converted) in TimeWindowing logical optimization rule (i.e. Rule[LogicalPlan]) of the Spark SQL logical query plan analyzer.

Find more about the Spark SQL logical query plan analyzer in Mastering Apache Spark 2 gitbook.

Example — Traffic Sensor

Note
The example is borrowed from Introducing Stream Windows in Apache Flink.

The example shows how to use window function to model a traffic sensor that counts every 15 seconds the number of vehicles passing a certain location.

results matching ""

    No results matching ""