Stream Operators

You use stream operators to apply transformations to the elements (often called records) received from input streams and ultimately trigger computations using output operators.

Transformations are stateless, but Spark Streaming comes with experimental support for stateful operators (e.g. mapWithState or updateStateByKey). It also offers windowed operators that can work across batches.

Note
You may use RDDs from other (non-streaming) data sources to build more advanced pipelines.
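
As a sketch of such a pipeline, every batch of a stream can be joined with a static RDD using the transform operator described later in this section. The example assumes a spark-shell session (so sc is available); the names and the data (names, ids, enrich) are made up for illustration.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ StreamingContext, Seconds }
import org.apache.spark.streaming.dstream.{ ConstantInputDStream, DStream }

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))

// Static (non-streaming) lookup data, made up for the sketch
val names: RDD[(Int, String)] =
  sc.parallelize(Seq(0 -> "zero", 1 -> "one", 2 -> "two"))

// A stream that delivers the same key-value pairs in every batch
val ids: DStream[(Int, Long)] =
  new ConstantInputDStream(ssc, sc.parallelize(Seq(0 -> 10L, 2 -> 20L)))

// Join every batch RDD with the static RDD
val enrich: RDD[(Int, Long)] => RDD[(Int, (Long, String))] = _.join(names)
val enriched: DStream[(Int, (Long, String))] = ids.transform(enrich)
enriched.print
```

The join itself is an ordinary RDD operation, so enrich can be tried on a single RDD before it is wired into a stream.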

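There are two main types of operators: transformations, which transform the records of a stream, and output operators, which trigger the computation.

Every Discretized Stream (DStream) offers, among others, the operators described in this section: print, foreachRDD, glom, reduce, map, transform and transformWith.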

Note
The DStream companion object offers a Scala implicit conversion from DStream[(K, V)] to PairDStreamFunctions, which provides methods on DStreams of key-value pairs, e.g. mapWithState or updateStateByKey.

Most streaming operators come with their own custom DStream to offer the service. It very often boils down, however, to overriding the compute method and applying the corresponding RDD operator to the generated RDD.

print Operator

print(num: Int) operator prints the first num elements of each RDD in the input stream.

print with no arguments uses print(num: Int) with num being 10.

It is an output operator (that returns Unit).

For each batch, print operator prints the following header to the standard output (regardless of the number of elements to be printed out):

-------------------------------------------
Time: [time] ms
-------------------------------------------

Internally, it calls RDD.take(num + 1) (see take action) on each RDD in the stream and prints the first num elements. It then prints ... if the RDD has more than num elements.
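
The reason for asking for one extra element can be modeled without Spark. The following is a minimal sketch, not Spark's own code: renderBatch is a hypothetical helper, a plain Seq stands in for the RDD of a batch, and timeMs stands in for the batch time.

```scala
// Hypothetical helper that models print(num) for a single batch.
// A plain Seq stands in for the batch RDD; timeMs stands in for the batch time.
def renderBatch[T](records: Seq[T], timeMs: Long, num: Int = 10): List[String] = {
  // Take one element more than requested to find out whether more exist
  val firstNum = records.take(num + 1)
  val rule = "-------------------------------------------"
  val header = List(rule, s"Time: $timeMs ms", rule)
  val body = firstNum.take(num).map(_.toString).toList
  // The extra element is never printed; it only triggers the "..." marker
  val more = if (firstNum.length > num) List("...") else Nil
  header ++ body ++ more
}

// 12 records, 3 requested: the header, then 0, 1, 2 and finally "..."
renderBatch(0 to 11, timeMs = 5000, num = 3).foreach(println)
```

Fetching num + 1 elements keeps the check cheap: the operator never has to count the whole RDD to decide whether the marker is needed.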

foreachRDD Operators

foreachRDD(foreachFunc: RDD[T] => Unit): Unit
foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

foreachRDD operator applies foreachFunc function to every RDD in the stream.

foreachRDD Example

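import org.apache.spark.streaming.dstream.InputDStream
// messages is assumed to be an existing input stream of key-value pairs
val clicks: InputDStream[(String, String)] = messages
// Print every record received in the clicks input stream
// (rdd.foreach runs on the executors, so the output goes to their stdout)
clicks.foreachRDD(rdd => rdd.foreach(println))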
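
The two-argument variant also receives the batch time. The following sketch assumes a spark-shell session (so sc is available); nums and describe are hypothetical names, and the input data is made up.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ StreamingContext, Seconds, Time }
import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
val nums = new ConstantInputDStream(ssc, sc.parallelize(0 to 9))

// Builds a one-line summary of a batch; count is an RDD action
val describe: (RDD[Int], Time) => String = { (rdd, time) =>
  s">>> batch $time: ${rdd.count} records"
}

// The function passed to foreachRDD is called on the driver once per batch
nums.foreachRDD((rdd: RDD[Int], time: Time) => println(describe(rdd, time)))
```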

glom Operator

glom(): DStream[Array[T]]

glom operator creates a new stream in which RDD.glom is applied to every RDD of the source stream, i.e. all the elements within each partition are coalesced into a single array.
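
A minimal sketch of glom follows. It assumes a spark-shell session (so sc is available), and the three-partition input is made up so that the per-partition arrays are easy to spot.

```scala
import org.apache.spark.streaming.{ StreamingContext, Seconds }
import org.apache.spark.streaming.dstream.{ ConstantInputDStream, DStream }

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))

// Every batch is the same RDD of 0..9 split into 3 partitions
val rdd = sc.parallelize(0 to 9, numSlices = 3)
val nums = new ConstantInputDStream(ssc, rdd)

// One array per partition of the batch RDD
val partitions: DStream[Array[Int]] = nums.glom()

// Arrays do not print their contents, so render them as strings first
partitions.map(_.mkString("[", ", ", "]")).print
```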

reduce Operator

reduce(reduceFunc: (T, T) => T): DStream[T]

reduce operator creates a new stream in which every RDD holds a single element: the result of applying reduceFunc to all the records of the corresponding RDD in the source stream.

Internally, it uses map and reduceByKey operators.
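
One way to picture the map and reduceByKey combination is to give every record the same dummy key, reduce by that key, and drop the key again. The following sketch models this on plain Scala collections and is not Spark's own code: reduceByKeyLocal and reduceLocal are hypothetical helpers, and the Unit dummy key is an assumption of the sketch.

```scala
// Hypothetical stand-in for reduceByKey on a single batch held in a Seq
def reduceByKeyLocal[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): List[(K, V)] =
  pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }.toList

// Models reduce as map + reduceByKey + map
def reduceLocal[T](batch: Seq[T])(f: (T, T) => T): List[T] = {
  val keyed = batch.map(record => ((), record)) // map: same dummy key for every record
  val reduced = reduceByKeyLocal(keyed)(f)      // reduceByKey: at most one pair is left
  reduced.map(_._2)                             // map: drop the dummy key
}

println(reduceLocal(Seq(1, 2, 3, 4))(_ + _)) // List(10)
println(reduceLocal(Seq.empty[Int])(_ + _))  // List()
```

In this model an empty batch produces an empty result rather than a single element.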

reduce Example

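import org.apache.spark.streaming.dstream.{ DStream, InputDStream }
// messages is assumed to be an existing input stream of key-value pairs
val clicks: InputDStream[(String, String)] = messages
type T = (String, String)
// Keep the key of the second pair and concatenate the two values
val reduceFunc: (T, T) => T = {
  case in @ ((_, v1), (k2, v2)) =>
    println(s">>> input: $in")
    (k2, s"$v1 + $v2")
}
val reduceClicks: DStream[(String, String)] = clicks.reduce(reduceFunc)
reduceClicks.print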

map Operator

map[U](mapFunc: T => U): DStream[U]

map operator creates a new stream with the source elements being mapped over using mapFunc function.

It creates a MappedDStream stream that, when requested to compute an RDD, uses the RDD.map operator.

map Example

val clicks: DStream[...] = ...
val mappedClicks: ... = clicks.map(...)
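
A concrete variant of the template above might look as follows. It is a sketch that assumes a spark-shell session (so sc is available); the input data and mapFunc are made up for illustration.

```scala
import org.apache.spark.streaming.{ StreamingContext, Seconds }
import org.apache.spark.streaming.dstream.{ ConstantInputDStream, DStream }

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))

// A stream that delivers the numbers 0, 1 and 2 in every batch
val clicks: DStream[Int] = new ConstantInputDStream(ssc, sc.parallelize(0 to 2))

// Hypothetical mapFunc: turn a numeric id into a label
val mapFunc: Int => String = id => s"click-$id"

val mappedClicks: DStream[String] = clicks.map(mapFunc)
mappedClicks.print
```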

reduceByKey Operator

reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]
reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)]
reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
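
reduceByKey merges the values of each key within a batch using reduceFunc. A minimal sketch, assuming a spark-shell session (so sc is available) and made-up input pairs:

```scala
import org.apache.spark.streaming.{ StreamingContext, Seconds }
import org.apache.spark.streaming.dstream.{ ConstantInputDStream, DStream }

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))

// A stream that delivers the same three pairs in every batch
val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))
val clicks: DStream[(String, Int)] = new ConstantInputDStream(ssc, pairs)

// Sum the values that share a key
val reduceFunc: (Int, Int) => Int = _ + _

// The implicit conversion to PairDStreamFunctions makes reduceByKey available
val sumsPerKey: DStream[(String, Int)] = clicks.reduceByKey(reduceFunc)
sumsPerKey.print
```

The other two variants additionally control how the result is partitioned, either by the number of partitions or by an explicit Partitioner.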

transform Operators

transform(transformFunc: RDD[T] => RDD[U]): DStream[U]
transform(transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]

transform operator applies transformFunc function to the generated RDD for a batch.

It creates a TransformedDStream stream.

Note
It asserts that one and exactly one RDD has been generated for a batch before calling the transformFunc.

Note
transformFunc must not return null; otherwise a SparkException is reported. See TransformedDStream.

transform Example

import org.apache.spark.streaming.{ StreamingContext, Seconds }
val ssc = new StreamingContext(sc, batchDuration = Seconds(5))

val rdd = sc.parallelize(0 to 9)
import org.apache.spark.streaming.dstream.ConstantInputDStream
val clicks = new ConstantInputDStream(ssc, rdd)

import org.apache.spark.rdd.RDD
val transformFunc: RDD[Int] => RDD[Int] = { inputRDD =>
  println(s">>> inputRDD: $inputRDD")

  // Use SparkSQL's DataFrame to manipulate the input records
  import spark.implicits._
  inputRDD.toDF("num").show

  inputRDD
}
clicks.transform(transformFunc).print

// Nothing is computed until the streaming context is started
ssc.start()

transformWith Operators

transformWith(other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]): DStream[V]
transformWith(other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]): DStream[V]

transformWith operators apply the transformFunc function to the two RDDs generated for a batch, one from the source stream and one from the other stream.

It creates a TransformedDStream stream.

Note
It asserts that two and exactly two RDDs have been generated for a batch before calling the transformFunc.

Note
transformFunc must not return null; otherwise a SparkException is reported. See TransformedDStream.

transformWith Example

import org.apache.spark.streaming.{ StreamingContext, Seconds }
val ssc = new StreamingContext(sc, batchDuration = Seconds(5))

val ns = sc.parallelize(0 to 2)
import org.apache.spark.streaming.dstream.ConstantInputDStream
val nums = new ConstantInputDStream(ssc, ns)

val ws = sc.parallelize(Seq("zero", "one", "two"))
val words = new ConstantInputDStream(ssc, ws)

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
val transformFunc: (RDD[Int], RDD[String], Time) => RDD[(Int, String)] = { case (ns, ws, time) =>
  println(s">>> ns: $ns")
  println(s">>> ws: $ws")
  println(s">>> batch: $time")

  ns.zip(ws)
}
nums.transformWith(words, transformFunc).print

// Nothing is computed until the streaming context is started
ssc.start()
