TimeWindows — Time-Bound Window Specification

TimeWindows is a window specification for fixed-size time windows (tumbling or hopping).

TimeWindows is described by the following properties:

  • Window duration (aka window size) (in millis)

  • Advance interval (in millis)

  • Grace period for late events

  • Maintain duration (in millis)

Maintain duration is no longer in use.

TimeWindows can be created only using the of factory method.

static TimeWindows of(final Duration size)

of creates a specification of tumbling windows, i.e. fixed-size, gap-less, non-overlapping windows. It simply sets both the sizeMs and advanceMs internal properties to the given size.

import org.apache.kafka.streams.kstream.TimeWindows
import java.time.Duration
val timeWindows = TimeWindows.of(Duration.ofMinutes(1))
scala> println(timeWindows)
TimeWindows{maintainDurationMs=86400000, sizeMs=60000, advanceMs=60000, grace=null, segments=3}
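
Because tumbling windows are gap-less and non-overlapping, every timestamp belongs to exactly one window. The following is a minimal sketch of that assignment, assuming epoch-aligned windows with an inclusive start and an exclusive end and non-negative timestamps. TumblingSketch and its methods are hypothetical names used for illustration only and are not part of Kafka Streams.

```scala
object TumblingSketch {
  // Start of the single tumbling window that contains `timestamp`.
  // Assumes windows are aligned to the epoch (0) and timestamp >= 0.
  def windowStart(timestamp: Long, sizeMs: Long): Long =
    timestamp - (timestamp % sizeMs)

  // The window as a (start, end) pair, read as [start, end).
  def window(timestamp: Long, sizeMs: Long): (Long, Long) = {
    val start = windowStart(timestamp, sizeMs)
    (start, start + sizeMs)
  }
}
```

With 1-minute windows, TumblingSketch.window(75000L, 60000L) evaluates to (60000,120000), i.e. an event at 75 seconds lands in the second window.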

TimeWindows can be further configured using the advanceBy and grace methods.

TimeWindows advanceBy(final Duration advance)

advanceBy turns the specification into hopping windows, i.e. fixed-size, overlapping windows. It simply sets the advanceMs internal property, leaving sizeMs unchanged.

import org.apache.kafka.streams.kstream.TimeWindows
import java.time.Duration
val timeWindows = TimeWindows
  .of(Duration.ofMinutes(1))
  .advanceBy(Duration.ofSeconds(30))
scala> println(timeWindows)
TimeWindows{maintainDurationMs=86400000, sizeMs=60000, advanceMs=30000, grace=null, segments=3}

TimeWindows grace(final Duration afterWindowEnd)

grace specifies how long after a window ends late events are still accepted into that window. It simply sets the grace internal property.

import org.apache.kafka.streams.kstream.TimeWindows
import java.time.Duration
val timeWindows = TimeWindows
  .of(Duration.ofMinutes(1))
  .grace(Duration.ofMinutes(2)) // accept late events up to 2 minutes after a window ends
scala> println(timeWindows)
TimeWindows{maintainDurationMs=86400000, sizeMs=60000, advanceMs=60000, grace=PT2M, segments=3}
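
The effect of the grace period can be pictured as a simple comparison. The sketch below rests on an assumption that is not stated in the text above: that a window stops accepting records once the highest timestamp seen so far (called stream time here) reaches the window end plus the grace period. GraceSketch and acceptsRecord are hypothetical names, not Kafka Streams API.

```scala
object GraceSketch {
  // True while a window ending at `windowEndMs` would still accept a late record.
  // Assumption: the window closes when stream time reaches end + grace.
  def acceptsRecord(windowEndMs: Long, graceMs: Long, streamTimeMs: Long): Boolean =
    streamTimeMs < windowEndMs + graceMs
}
```

For the window [60000, 120000) with a 2-minute grace period, this model accepts late records while stream time is below 240000 ms and rejects them from then on.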

windowsFor Method

Map<Long, TimeWindow> windowsFor(final long timestamp)

windowsFor is part of the Windows Contract to…FIXME.

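import collection.JavaConverters._
import scala.concurrent.duration.DurationInt // enables 1.minute
// timeWindows is assumed to be the hopping specification (size 1 minute, advance 30 seconds)
val windows = timeWindows.windowsFor(1.minute.toMillis).asScala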
scala> windows.foreach(println)
(30000,Window{start=30000, end=90000})
(60000,Window{start=60000, end=120000})
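
The output above can be reproduced with plain arithmetic. The following is a simplified model of the window lookup, assuming epoch-aligned windows of the form [start, end) whose starts are multiples of the advance interval. HoppingSketch is a hypothetical helper for illustration and does not use or mirror any Kafka Streams class.

```scala
object HoppingSketch {
  // All windows [start, end) that contain `timestamp`, as (start, end) pairs.
  def windowsFor(timestamp: Long, sizeMs: Long, advanceMs: Long): Seq[(Long, Long)] = {
    // Earliest window start (a multiple of advanceMs, never negative)
    // whose window still covers the timestamp.
    val first = math.max(0L, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs
    // Every later start up to and including the timestamp also covers it.
    (first to timestamp by advanceMs).map(start => (start, start + sizeMs))
  }
}
```

HoppingSketch.windowsFor(60000L, 60000L, 30000L) yields the windows starting at 30000 and 60000, matching the two entries printed above. With advanceMs equal to sizeMs (tumbling windows), the same function returns exactly one window.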

