Input DStreams

Input DStreams in Spark Streaming are the way to ingest data from external data sources. They are represented as InputDStream abstract class.

InputDStream Contract

InputDStream is the abstract base class for all input DStreams. It provides two abstract methods start() and stop() to start and stop ingesting data, respectively.

When instantiated, an InputDStream registers itself as an input stream (using DStreamGraph.addInputStream) and, while doing so, is told about its owning DStreamGraph.

It asks for its own unique identifier using StreamingContext.getNewInputStreamId().

Note
It is StreamingContext to maintain the identifiers and how many input streams have already been created.

InputDStream has a human-readable name that is made up from a nicely-formatted part based on the class name and the unique identifier.

Tip
Name your custom InputDStream using the CamelCase notation with the suffix InputDStream, e.g. MyCustomInputDStream.
Note
compute(validTime: Time): Option[RDD[T]] abstract method from DStream abstract class is not defined.

Custom implementations of InputDStream can override (and actually provide!) the optional RateController. It is undefined by default.

Custom Input DStream

Here is an example of a custom input dstream that produces an RDD out of the input collection of elements (of type T).

Note
It is similar to ConstantInputDStreams, but this custom implementation does not use an external RDD, but generates its own.
package pl.japila.spark.streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ Time, StreamingContext }
import org.apache.spark.streaming.dstream.InputDStream

import scala.reflect.ClassTag

class CustomInputDStream[T: ClassTag](ssc: StreamingContext, seq: Seq[T])
  extends InputDStream[T](ssc) {
  override def compute(validTime: Time): Option[RDD[T]] = {
    Some(ssc.sparkContext.parallelize(seq))
  }
  override def start(): Unit = {}
  override def stop(): Unit = {}
}

Its use could be as simple as follows (compare it to the example of ConstantInputDStreams):

// sc is the SparkContext instance
import org.apache.spark.streaming.Seconds
val ssc = new StreamingContext(sc, batchDuration = Seconds(5))

// Create the collection of numbers
val nums = 0 to 9

// Create constant input dstream with the RDD
import pl.japila.spark.streaming.CustomInputDStream
val cis = new CustomInputDStream(ssc, nums)

// Sample stream computation
cis.print
Tip
Copy and paste it to spark-shell to run it.

results matching ""

    No results matching ""