Standard Functions for Date and Time

Table 1. (Subset of) Standard Functions for Date and Time

Name               Description
current_date       Gives the current date as a date column
current_timestamp  Gives the current timestamp as a timestamp column
date_format        Converts a date/timestamp/string column to a string in the specified format
to_date            Converts a column to a date column (optionally using the specified format)
to_timestamp       Converts a column to a timestamp column (optionally using the specified format)
unix_timestamp     Converts current or specified time to Unix timestamp (in seconds)
window             Generates time windows (i.e. tumbling, sliding and delayed windows)

Current Date As Date Column — current_date Function

current_date(): Column

current_date function gives the current date as a date column.

val df = spark.range(1).select(current_date)
scala> df.show
+--------------+
|current_date()|
+--------------+
|    2017-09-16|
+--------------+

scala> df.printSchema
root
 |-- current_date(): date (nullable = false)

Internally, current_date creates a Column with CurrentDate Catalyst leaf expression.

val c = current_date()
import org.apache.spark.sql.catalyst.expressions.CurrentDate
val cd = c.expr.asInstanceOf[CurrentDate]
scala> println(cd.prettyName)
current_date

scala> println(cd.numberedTreeString)
00 current_date(None)

date_format Function

date_format(dateExpr: Column, format: String): Column

date_format converts a date, timestamp or string column to a string in the specified format (e.g. dd/MM/yyyy).

Internally, date_format creates a Column with the DateFormatClass binary expression. DateFormatClass takes the expression of the dateExpr column and the format.

val c = date_format($"date", "dd/MM/yyyy")

import org.apache.spark.sql.catalyst.expressions.DateFormatClass
val dfc = c.expr.asInstanceOf[DateFormatClass]
scala> println(dfc.prettyName)
date_format

scala> println(dfc.numberedTreeString)
00 date_format('date, dd/MM/yyyy, None)
01 :- 'date
02 +- dd/MM/yyyy
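
A minimal usage sketch (assuming a string column holding dates; the show output is included for illustration):

val dates = Seq("2017-09-16").toDF("date")
scala> dates.select(date_format($"date", "dd/MM/yyyy") as "formatted").show
+----------+
| formatted|
+----------+
|16/09/2017|
+----------+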

current_timestamp Function

current_timestamp(): Column
Caution
FIXME
Note
current_timestamp is also available as the now function in SQL.
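
A minimal usage sketch (the value is whatever the timestamp is at execution time, so only the schema is shown):

val ts = spark.range(1).select(current_timestamp as "now")
scala> ts.printSchema
root
 |-- now: timestamp (nullable = false)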

to_date Function

to_date(e: Column, fmt: String): Column
Caution
FIXME
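
A minimal usage sketch (assuming dates encoded as strings in a custom dd/MM/yyyy format):

scala> Seq("27/05/2017").toDF("raw").select(to_date($"raw", "dd/MM/yyyy") as "date").show
+----------+
|      date|
+----------+
|2017-05-27|
+----------+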

to_timestamp Function

to_timestamp(s: Column): Column
to_timestamp(s: Column, fmt: String): Column
Caution
FIXME
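
A minimal usage sketch (with no format given, the default yyyy-MM-dd HH:mm:ss pattern is assumed):

scala> Seq("2017-05-27 10:30:45").toDF("raw").select(to_timestamp($"raw") as "ts").show
+-------------------+
|                 ts|
+-------------------+
|2017-05-27 10:30:45|
+-------------------+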

Converting Current or Specified Time to Unix Timestamp — unix_timestamp Function

unix_timestamp(): Column                               (1)
unix_timestamp(time: Column): Column                   (2)
unix_timestamp(time: Column, format: String): Column   (3)

  1. Gives the current timestamp (in seconds)

  2. Converts the time string (in yyyy-MM-dd HH:mm:ss format) to a Unix timestamp (in seconds)

  3. Converts the time string in the given format to a Unix timestamp (in seconds)

unix_timestamp converts the current or specified time in the specified format to a Unix timestamp (in seconds).

unix_timestamp supports a column of type Date, Timestamp or String.

// no time and format => current time
scala> spark.range(1).select(unix_timestamp as "current_timestamp").show
+-----------------+
|current_timestamp|
+-----------------+
|       1493362850|
+-----------------+

// no format so yyyy-MM-dd HH:mm:ss assumed
scala> Seq("2017-01-01 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time")).show
+-------------------+--------------+
|               time|unix_timestamp|
+-------------------+--------------+
|2017-01-01 00:00:00|    1483225200|
+-------------------+--------------+

scala> Seq("2017/01/01 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time", "yyyy/MM/dd")).show
+-------------------+--------------+
|               time|unix_timestamp|
+-------------------+--------------+
|2017/01/01 00:00:00|    1483225200|
+-------------------+--------------+

unix_timestamp returns null if conversion fails.

// note slashes as date separators
scala> Seq("2017/01/01 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time")).show
+-------------------+--------------+
|               time|unix_timestamp|
+-------------------+--------------+
|2017/01/01 00:00:00|          null|
+-------------------+--------------+
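
unix_timestamp also works on Date and Timestamp columns directly (no parsing involved); a minimal sketch with a Timestamp column (the value depends on the session time zone):

import java.sql.Timestamp
val times = Seq(Tuple1(Timestamp.valueOf("2017-01-01 00:00:00"))).toDF("time")
scala> times.withColumn("unix_timestamp", unix_timestamp($"time")).show
+-------------------+--------------+
|               time|unix_timestamp|
+-------------------+--------------+
|2017-01-01 00:00:00|    1483225200|
+-------------------+--------------+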
Note

unix_timestamp is also supported in SQL mode.

scala> spark.sql("SELECT unix_timestamp() as unix_timestamp").show
+--------------+
|unix_timestamp|
+--------------+
|    1493369225|
+--------------+

Internally, unix_timestamp creates a Column with UnixTimestamp binary expression (possibly with CurrentTimestamp).
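
A sketch following the same pattern as for current_date and date_format above (assuming the no-argument variant, which should use CurrentTimestamp as the time expression):

val c = unix_timestamp()
import org.apache.spark.sql.catalyst.expressions.UnixTimestamp
val ut = c.expr.asInstanceOf[UnixTimestamp]
scala> println(ut.prettyName)
unix_timestamp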

Generating Time Windows — window Function

window(
  timeColumn: Column,
  windowDuration: String): Column  (1)
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String): Column   (2)
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String,
  startTime: String): Column       (3)
  1. Creates a tumbling time window (slideDuration equal to windowDuration) with 0 seconds for startTime

  2. Creates a sliding time window with 0 seconds for startTime

  3. Creates a delayed time window

window generates tumbling, sliding or delayed time windows of windowDuration duration, given a timeColumn column that specifies the timestamps.

Note

Tumbling windows are a series of fixed-sized, non-overlapping and contiguous time intervals.

Note

Tumbling windows group elements of a stream into finite sets where each set corresponds to an interval.

Tumbling windows discretize a stream into non-overlapping windows.

scala> val timeColumn = window('time, "5 seconds")
timeColumn: org.apache.spark.sql.Column = timewindow(time, 5000000, 5000000, 0) AS `window`

timeColumn should be of TimestampType, i.e. with java.sql.Timestamp values.

Tip
Use java.sql.Timestamp.from or java.sql.Timestamp.valueOf factory methods to create Timestamp instances.
// https://docs.oracle.com/javase/8/docs/api/java/time/LocalDateTime.html
import java.time.LocalDateTime
// https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
import java.sql.Timestamp
val levels = Seq(
  // (year, month, dayOfMonth, hour, minute, second)
  ((2012, 12, 12, 12, 12, 12), 5),
  ((2012, 12, 12, 12, 12, 14), 9),
  ((2012, 12, 12, 13, 13, 14), 4),
  ((2016, 8,  13, 0, 0, 0), 10),
  ((2017, 5,  27, 0, 0, 0), 15)).
  map { case ((yy, mm, dd, h, m, s), a) => (LocalDateTime.of(yy, mm, dd, h, m, s), a) }.
  map { case (ts, a) => (Timestamp.valueOf(ts), a) }.
  toDF("time", "level")
scala> levels.show
+-------------------+-----+
|               time|level|
+-------------------+-----+
|2012-12-12 12:12:12|    5|
|2012-12-12 12:12:14|    9|
|2012-12-12 13:13:14|    4|
|2016-08-13 00:00:00|   10|
|2017-05-27 00:00:00|   15|
+-------------------+-----+

val q = levels.select(window($"time", "5 seconds"), $"level")
scala> q.show(truncate = false)
+---------------------------------------------+-----+
|window                                       |level|
+---------------------------------------------+-----+
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|5    |
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|9    |
|[2012-12-12 13:13:10.0,2012-12-12 13:13:15.0]|4    |
|[2016-08-13 00:00:00.0,2016-08-13 00:00:05.0]|10   |
|[2017-05-27 00:00:00.0,2017-05-27 00:00:05.0]|15   |
+---------------------------------------------+-----+

scala> q.printSchema
root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- level: integer (nullable = false)

// calculating the sum of levels every 5 seconds
val sums = levels.
  groupBy(window($"time", "5 seconds")).
  agg(sum("level") as "level_sum").
  select("window.start", "window.end", "level_sum")
scala> sums.show
+-------------------+-------------------+---------+
|              start|                end|level_sum|
+-------------------+-------------------+---------+
|2012-12-12 13:13:10|2012-12-12 13:13:15|        4|
|2012-12-12 12:12:10|2012-12-12 12:12:15|       14|
|2016-08-13 00:00:00|2016-08-13 00:00:05|       10|
|2017-05-27 00:00:00|2017-05-27 00:00:05|       15|
+-------------------+-------------------+---------+

windowDuration and slideDuration are strings specifying the width of the window and the sliding interval, respectively, expressed with duration identifiers (e.g. 5 seconds).
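
For example, a sliding window that is 10 seconds wide and slides every 5 seconds (a sketch; the durations are internally converted to microseconds):

scala> val slidingWindow = window('time, "10 seconds", "5 seconds")
slidingWindow: org.apache.spark.sql.Column = timewindow(time, 10000000, 5000000, 0) AS `window`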

Tip
See CalendarInterval for the valid duration identifiers (e.g. seconds, minutes, hours).
Note
window is available as of Spark 2.0.0.

Internally, window creates a Column with the TimeWindow expression, available under the window alias.

// q is the query defined earlier
scala> q.show(truncate = false)
+---------------------------------------------+-----+
|window                                       |level|
+---------------------------------------------+-----+
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|5    |
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|9    |
|[2012-12-12 13:13:10.0,2012-12-12 13:13:15.0]|4    |
|[2016-08-13 00:00:00.0,2016-08-13 00:00:05.0]|10   |
|[2017-05-27 00:00:00.0,2017-05-27 00:00:05.0]|15   |
+---------------------------------------------+-----+

scala> println(timeColumn.expr.numberedTreeString)
00 timewindow('time, 5000000, 5000000, 0) AS window#22
01 +- timewindow('time, 5000000, 5000000, 0)
02    +- 'time

Example — Traffic Sensor

Note
The example is borrowed from Introducing Stream Windows in Apache Flink.

The example shows how to use the window function to model a traffic sensor that counts the number of vehicles passing a certain location every 15 seconds.
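
A minimal batch sketch of the idea (the sensor readings below are made up; each row represents one vehicle detected at the given time, and a real pipeline would use Structured Streaming):

import java.sql.Timestamp
// hypothetical readings: one row per vehicle detected
val vehicles = Seq(
  "2017-05-27 00:00:01",
  "2017-05-27 00:00:07",
  "2017-05-27 00:00:12",
  "2017-05-27 00:00:18",
  "2017-05-27 00:00:21").
  map(s => Timestamp.valueOf(s)).
  map(Tuple1(_)).
  toDF("time")

// count vehicles per 15-second tumbling window
val counts = vehicles.
  groupBy(window($"time", "15 seconds")).
  count().
  select("window.start", "window.end", "count")
scala> counts.orderBy("start").show
+-------------------+-------------------+-----+
|              start|                end|count|
+-------------------+-------------------+-----+
|2017-05-27 00:00:00|2017-05-27 00:00:15|    3|
|2017-05-27 00:00:15|2017-05-27 00:00:30|    2|
+-------------------+-------------------+-----+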
