Windowed Operators

Note

Go to Window Operations to read the official documentation.

This document aims at presenting the internals of window operators with examples.

In short, windowed operators allow you to apply transformations over a sliding window of data, i.e. build a stateful computation across multiple batches.

Note
Windowed operators, windowed operations, and window-based operations are all the same concept.

By default, stream operators apply transformations to a single RDD, i.e. the dataset built from the data received from one or more input streams during a batch interval. The transformations know nothing about the past (datasets received and already processed), so the computations are stateless.

You can however build datasets based upon the past ones, and that is when windowed operators enter the stage. Using them allows you to cross the boundary of a single dataset (per batch) and have a series of datasets in your hands (as if the data they hold arrived in a single batch interval).

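Table 1. Streaming Windowed Operators
Operator          Description
slice             Returns the RDDs generated during a given time interval (inclusive).
window            Creates a new stream whose RDDs contain all the elements received during a window of time.
reduceByWindow    Creates a new stream of single-element RDDs computed by reducing the elements of each window.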

slice Operators

slice(interval: Interval): Seq[RDD[T]]
slice(fromTime: Time, toTime: Time): Seq[RDD[T]]

slice operators return the collection of RDDs that were generated during a time interval (inclusive on both ends), given either as an Interval or as a pair of Time ends.

Both Time ends have to be multiples of this stream's slide duration. Otherwise, they are aligned using the Time.floor method.

When slice is used, you should see the following INFO message in the logs:

INFO Slicing from [fromTime] to [toTime] (aligned to [alignedFromTime] and [alignedToTime])

For every batch in the slicing interval, an RDD is computed.
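
The alignment and the per-batch lookup can be sketched in plain Scala, without Spark. This is a minimal model, not Spark's code: times are plain milliseconds, SliceModel, floorTo and batchTimes are hypothetical names, and it assumes that alignment rounds a time down to a multiple of the slide duration counted from the stream's start time (zeroMs here).

```scala
object SliceModel {
  // Hypothetical helper: round timeMs down to a multiple of slideMs,
  // counted from zeroMs (assumed to be the time the stream started).
  def floorTo(timeMs: Long, slideMs: Long, zeroMs: Long): Long =
    ((timeMs - zeroMs) / slideMs) * slideMs + zeroMs

  // Batch times covered by the inclusive interval [fromMs, toMs] after
  // both ends have been aligned. One RDD would be computed per batch time.
  def batchTimes(fromMs: Long, toMs: Long, slideMs: Long, zeroMs: Long = 0L): Seq[Long] = {
    val alignedFrom = floorTo(fromMs, slideMs, zeroMs)
    val alignedTo = floorTo(toMs, slideMs, zeroMs)
    // Times before the stream started have no batch (model assumption).
    (alignedFrom to alignedTo by slideMs).filter(_ >= zeroMs)
  }

  def main(args: Array[String]): Unit = {
    // Slide duration of 5 seconds, slicing from 12 s to 27 s.
    println(batchTimes(fromMs = 12000L, toMs = 27000L, slideMs = 5000L))
  }
}
```

With a slide duration of 5 seconds, slicing from 12000 ms to 27000 ms is aligned to 10000 ms and 25000 ms and covers the batch times 10000, 15000, 20000 and 25000.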

window Operators

window(windowDuration: Duration): DStream[T]
window(windowDuration: Duration, slideDuration: Duration): DStream[T]

The window operator creates a new stream whose RDDs contain all the elements received during the last windowDuration. A new RDD is generated every slideDuration.

Note
windowDuration must be a multiple of the slide duration of the source stream.

The window(windowDuration: Duration): DStream[T] operator calls window(windowDuration: Duration, slideDuration: Duration) with the source stream's slide duration.

messages.window(Seconds(10))

It creates a WindowedDStream stream and registers it as an output stream.

Note
window operator is used by reduceByWindow, reduceByKeyAndWindow and groupByKeyAndWindow operators.
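
What a windowed stream holds can be modelled with plain Scala collections. The sketch below is an illustration under simplifying assumptions, not how WindowedDStream is implemented: every batch is a Seq, durations are expressed as a number of batches, and WindowModel and its window method are hypothetical names.

```scala
object WindowModel {
  // batches(i) holds the elements received during the i-th batch interval.
  // windowBatches and slideBatches are the window and slide durations
  // expressed as a number of batches (an assumption of this model).
  def window[T](batches: Seq[Seq[T]], windowBatches: Int, slideBatches: Int): Seq[Seq[T]] = {
    require(windowBatches > 0 && slideBatches > 0, "durations must be positive")
    // One windowed dataset is produced every slideBatches batches. It is the
    // union of the (up to) windowBatches most recent batches.
    (slideBatches to batches.length by slideBatches).map { end =>
      val start = math.max(0, end - windowBatches)
      batches.slice(start, end).flatten
    }
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq(1, 2), Seq(3), Seq(4, 5), Seq(6))
    // Window of two batches that slides by one batch.
    window(batches, windowBatches = 2, slideBatches = 1).foreach(println)
  }
}
```

With a batch interval of 5 seconds, windowBatches = 2 and slideBatches = 1 correspond to messages.window(Seconds(10)): every windowed dataset holds the elements of the current and the previous batch.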

reduceByWindow Operator

reduceByWindow(
  reduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration): DStream[T]

reduceByWindow(
  reduceFunc: (T, T) => T,
  invReduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration): DStream[T]

reduceByWindow creates a new stream of single-element RDDs. First, the data received in every batch is reduced using reduceFunc. Then reduceFunc is applied again to the collection of the reduced elements from the past batches that fall within the window of windowDuration, which slides forward every slideDuration.

Internally, reduceByWindow is exactly the reduce operator (with reduceFunc) followed by window (of windowDuration and slideDuration), the result of which is ultimately reduced (again) with reduceFunc.

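import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

// batchDuration = Seconds(5)

// messages is an input stream of key-value pairs created elsewhere
val clicks: InputDStream[(String, String)] = messages
type T = (String, String)
// Keeps the key of the second element and concatenates both values
val reduceFn: (T, T) => T = {
  case in @ ((_, v1), (k2, v2)) =>
    println(s">>> input: $in")
    (k2, s"$v1 + $v2")
}
// Reduce over a 10-second window that slides every 5 seconds
val windowedClicks: DStream[(String, String)] =
  clicks.reduceByWindow(reduceFn, windowDuration = Seconds(10), slideDuration = Seconds(5))

windowedClicks.print()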
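
The reduce, window, reduce pipeline described above can also be traced with plain Scala collections. This is a hedged model rather than Spark's implementation: batches are Seqs, durations are counted in batches, results are wrapped in Option so that an empty batch or window yields None, and ReduceByWindowModel is a hypothetical name.

```scala
object ReduceByWindowModel {
  // batches(i) holds the elements of the i-th batch; windowBatches and
  // slideBatches are durations expressed as a number of batches (assumption).
  def reduceByWindow[T](
      batches: Seq[Seq[T]],
      reduceFunc: (T, T) => T,
      windowBatches: Int,
      slideBatches: Int): Seq[Option[T]] = {
    // Step 1: reduce every batch to at most one element.
    val reducedPerBatch: Seq[Option[T]] = batches.map(_.reduceOption(reduceFunc))
    (slideBatches to batches.length by slideBatches).map { end =>
      val start = math.max(0, end - windowBatches)
      // Step 2: collect the per-batch results that fall within the window.
      val inWindow = reducedPerBatch.slice(start, end).flatten
      // Step 3: reduce the window again with the same function.
      inWindow.reduceOption(reduceFunc)
    }
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq(1, 2), Seq(3), Seq(4, 5), Seq(6))
    val sum = (a: Int, b: Int) => a + b
    reduceByWindow(batches, sum, windowBatches = 2, slideBatches = 1).foreach(println)
  }
}
```

For the batches in main, the per-batch sums are 3, 3, 9 and 6, so a window of two batches sliding by one batch yields 3, 6, 12 and 15.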
