package pl.japila.spark.streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ Time, StreamingContext }
import org.apache.spark.streaming.dstream.InputDStream

import scala.reflect.ClassTag

// Input dstream that turns the same collection into a new RDD for every batch
class CustomInputDStream[T: ClassTag](ssc: StreamingContext, seq: Seq[T])
  extends InputDStream[T](ssc) {

  // Called for every batch; builds the batch's RDD from the collection
  override def compute(validTime: Time): Option[RDD[T]] = {
    Some(ssc.sparkContext.parallelize(seq))
  }

  // There is no external source, so there is nothing to start or stop
  override def start(): Unit = {}
  override def stop(): Unit = {}
}
Input DStreams
Input DStreams are how Spark Streaming ingests data from external data sources. They are represented by the InputDStream abstract class.
InputDStream Contract
InputDStream is the abstract base class for all input DStreams. It provides two abstract methods, start() and stop(), to start and stop ingesting data, respectively.
When instantiated, an InputDStream registers itself as an input stream (using DStreamGraph.addInputStream) and, while doing so, is told about its owning DStreamGraph. It asks for its own unique identifier using StreamingContext.getNewInputStreamId().
Note: It is StreamingContext that maintains the identifiers and keeps track of how many input streams have already been created.
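The registration and identifier steps described above can be pictured with a small, self-contained model. This is only a sketch: ToyContext, ToyGraph and ToyInputStream are hypothetical stand-ins, not Spark classes, and the atomic counter and the buffer are assumptions made for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for DStreamGraph: remembers the registered input streams
class ToyGraph {
  private val inputStreams = ArrayBuffer.empty[ToyInputStream]

  // Tells the stream about its owning graph, then records it
  def addInputStream(stream: ToyInputStream): Unit = synchronized {
    stream.setGraph(this)
    inputStreams += stream
  }

  def numInputStreams: Int = synchronized { inputStreams.size }
}

// Hypothetical stand-in for StreamingContext: owns the graph and hands out ids
class ToyContext {
  val graph = new ToyGraph

  // Assumption: ids come from a simple counter that starts at 0
  private val nextInputStreamId = new AtomicInteger(0)
  def getNewInputStreamId(): Int = nextInputStreamId.getAndIncrement()
}

// Hypothetical stand-in for InputDStream
class ToyInputStream(ctx: ToyContext) {
  private var owner: Option[ToyGraph] = None
  def setGraph(g: ToyGraph): Unit = { owner = Some(g) }
  def graph: Option[ToyGraph] = owner

  // On construction: register with the graph, then ask the context for an id
  ctx.graph.addInputStream(this)
  val id: Int = ctx.getNewInputStreamId()
}
```

Creating two ToyInputStream instances against the same ToyContext gives them the ids 0 and 1 and leaves two streams registered in the graph.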
InputDStream has a human-readable name that is made up of a nicely-formatted version of the class name followed by the unique identifier.
Tip: Name your custom InputDStream using the CamelCase notation with the suffix InputDStream, e.g. MyCustomInputDStream.
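To see why the naming convention matters, here is a rough sketch of how such a name could be derived from a class name and an identifier. It is modeled on my reading of Spark's source and may differ between versions; InputStreamNames and inputStreamName are hypothetical names used only for this illustration.

```scala
import java.util.Locale

object InputStreamNames {
  // Assumption: the "InputDStream" suffix becomes "Stream", the CamelCase
  // words are split apart, and the id is appended in square brackets.
  def inputStreamName(className: String, id: Int): String = {
    val formatted = className
      .replaceAll("InputDStream", "Stream")
      .split("(?=[A-Z])")          // split in front of every upper-case letter
      .filter(_.nonEmpty)
      .mkString(" ")
      .toLowerCase(Locale.ROOT)
      .capitalize
    s"$formatted [$id]"
  }
}
```

Under these assumptions, InputStreamNames.inputStreamName("MyCustomInputDStream", 0) gives "My custom stream [0]", which is why following the suffix convention leads to a readable name.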
- slideDuration calls DStreamGraph.batchDuration.
- dependencies method returns an empty collection.
Note: The compute(validTime: Time): Option[RDD[T]] abstract method inherited from the DStream abstract class is not defined by InputDStream, so concrete input dstreams have to implement it.
Custom implementations of InputDStream can override (and actually provide!) the optional RateController. It is undefined by default.
Custom Input DStream
CustomInputDStream, listed at the top of this page, is an example of a custom input dstream that produces an RDD out of the input collection of elements (of type T).
Note: It is similar to ConstantInputDStreams, except that this custom implementation does not use an external RDD but generates its own.
Its use could be as simple as follows (compare it to the example of ConstantInputDStreams):
// sc is the SparkContext instance
import org.apache.spark.streaming.{ Seconds, StreamingContext }
val ssc = new StreamingContext(sc, batchDuration = Seconds(5))

// Create the collection of numbers
val nums = 0 to 9

// Create the custom input dstream from the collection
import pl.japila.spark.streaming.CustomInputDStream
val cis = new CustomInputDStream(ssc, nums)

// Sample stream computation
cis.print()

// Start the streaming context so that the batches are actually computed
ssc.start()
Tip: Copy and paste it into spark-shell to run it.