slice(interval: Interval): Seq[RDD[T]]
slice(fromTime: Time, toTime: Time): Seq[RDD[T]]
Windowed Operators
Note
|
Go to Window Operations to read the official documentation. This document aims at presenting the internals of window operators with examples. |
In short, windowed operators allow you to apply transformations over a sliding window of data, i.e. build a stateful computation across multiple batches.
Note
|
Windowed operators, windowed operations, and window-based operations are all the same concept. |
By default, you apply transformations using different stream operators to a single RDD that represents a dataset that has been built out of data received from one or many input streams. The transformations know nothing about the past (datasets received and already processed). The computations are hence stateless.
You can however build datasets based upon the past ones, and that is when windowed operators enter the stage. Using them allows you to cross the boundary of a single dataset (per batch) and have a series of datasets in your hands (as if the data they hold arrived in a single batch interval).
Operator | Description |
---|---|
slice
Operators
slice
operators return a collection of RDDs that were generated during time interval inclusive, given as Interval
or a pair of Time
ends.
Both Time
ends have to be a multiple of this stream’s slide duration. Otherwise, they are aligned using Time.floor
method.
When used, you should see the following INFO message in the logs:
INFO Slicing from [fromTime] to [toTime] (aligned to [alignedFromTime] and [alignedToTime])
For every batch in the slicing interval, a RDD is computed.
window
Operators
window(windowDuration: Duration): DStream[T]
window(windowDuration: Duration, slideDuration: Duration): DStream[T]
window
operator creates a new stream that generates RDDs containing all the elements received during windowDuration
with slideDuration
slide duration.
Note
|
windowDuration must be a multiple of the slide duration of the source stream.
|
window(windowDuration: Duration): DStream[T]
operator uses window(windowDuration: Duration, slideDuration: Duration)
with the source stream’s slide duration.
messages.window(Seconds(10))
It creates WindowedDStream stream and register it as an output stream.
Note
|
window operator is used by reduceByWindow , reduceByKeyAndWindow and groupByKeyAndWindow operators.
|
reduceByWindow
Operator
reduceByWindow(
reduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration): DStream[T]
reduceByWindow(
reduceFunc: (T, T) => T,
invReduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration): DStream[T]
reduceByWindow
creates a new stream of RDDs of one element only that was computed using reduceFunc
function over the data received during batch duration that later was again applied to a collection of the reduced elements from the past being window duration windowDuration
sliding slideDuration
forward.
Internally, reduceByWindow
is exactly reduce operator (with reduceFunc
) followed by window (of windowDuration
and slideDuration
) that ultimately gets reduce
d (again) with reduceFunc
.
// batchDuration = Seconds(5)
val clicks: InputDStream[(String, String)] = messages
type T = (String, String)
val reduceFn: (T, T) => T = {
case in @ ((k1, v1), (k2, v2)) =>
println(s">>> input: $in")
(k2, s"$v1 + $v2")
}
val windowedClicks: DStream[(String, String)] =
clicks.reduceByWindow(reduceFn, windowDuration = Seconds(10), slideDuration = Seconds(5))
windowedClicks.print