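```scala
// Run in spark-shell, where `sc` (the SparkContext) is predefined.
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// A receiver that only reports its lifecycle. It never calls store(),
// so no block is ever written with StorageLevel.NONE.
final class MyStringReceiver extends Receiver[String](StorageLevel.NONE) {
  def onStart(): Unit = {
    println("onStart called")
  }
  def onStop(): Unit = {
    println("onStop called")
  }
}

val ssc = new StreamingContext(sc, Seconds(5))
val strings = ssc.receiverStream(new MyStringReceiver)
strings.print()
ssc.start()
// MyStringReceiver will print "onStart called"

// Keep sc alive so the shell stays usable after the streaming context stops
ssc.stop(stopSparkContext = false)
// MyStringReceiver will print "onStop called"
```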
Receivers
Receivers run on executors on worker nodes and receive data from external sources. Each receiver is created by, and belongs to, a ReceiverInputDStream.
Note: ReceiverTracker launches a receiver on a worker.
A receiver is represented by the abstract class Receiver[T]. The class is parameterized by the type T of the elements the receiver processes and takes a StorageLevel as its constructor argument.
Note: You use the StreamingContext.receiverStream method to register a custom Receiver with a streaming context.
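The abstract Receiver class requires the following methods to be implemented (see Custom Receiver):
- onStart(), which starts the receiver when the streaming application starts. It must be non-blocking, so the actual receiving has to happen on a separate thread.
- onStop(), which stops the receiver and cleans up the resources (threads, buffers, connections) set up in onStart().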
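Because onStart() has to return promptly, a receiver that does real work usually starts its own thread there. The following is a minimal sketch under two assumptions. First, spark-streaming is on the classpath, for example when the code is pasted into spark-shell. Second, `TickReceiver` and its `intervalMs` parameter are hypothetical names used only for illustration. It generates synthetic records rather than reading an external source.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that emits "tick N" every intervalMs milliseconds.
class TickReceiver(intervalMs: Long, level: StorageLevel)
    extends Receiver[String](level) {

  // onStart() must not block, so the receiving loop runs on its own thread.
  def onStart(): Unit = {
    new Thread("tick-receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  // Nothing to release: the loop polls isStopped() and exits on its own.
  def onStop(): Unit = ()

  private def receive(): Unit = {
    var n = 0L
    while (!isStopped()) {
      store(s"tick $n") // single record; Spark groups such records into blocks
      n += 1
      Thread.sleep(intervalMs)
    }
  }
}
```

You register it like any other receiver, e.g. `ssc.receiverStream(new TickReceiver(500L, StorageLevel.MEMORY_ONLY)).print()`. Letting the loop poll isStopped(), instead of having onStop() interrupt the thread, keeps onStop() trivial.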
A receiver is identified by the unique identifier Receiver.streamId, which corresponds to the unique identifier of the receiver input stream it is associated with.
Note: The StorageLevel of a receiver is used to instantiate ReceivedBlockHandler in ReceiverSupervisorImpl.
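A receiver uses the store methods to save received data as data blocks in Spark, using the receiver's StorageLevel. Single records passed to store are buffered and grouped into blocks. A whole collection, such as an ArrayBuffer or an Iterator, is stored as one data block.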
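The multi-record variant of store can be sketched with a hypothetical `BatchingReceiver`, which again assumes spark-streaming is on the classpath. It fills an ArrayBuffer with synthetic integers and passes the whole buffer to store in a single call, so each buffer becomes one block instead of being collected record by record. According to the Spark Streaming custom receiver guide, this multi-record store blocks until the data has been stored.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver: builds batches of synthetic Ints and stores
// each batch with a single multi-record store() call.
class BatchingReceiver(batchSize: Int)
    extends Receiver[Int](StorageLevel.MEMORY_AND_DISK_SER) {

  def onStart(): Unit = {
    // Keep onStart() non-blocking: the loop runs on its own thread.
    new Thread("batching-receiver") {
      setDaemon(true)
      override def run(): Unit = produce()
    }.start()
  }

  // The loop below exits on its own once isStopped() returns true.
  def onStop(): Unit = ()

  private def produce(): Unit = {
    var next = 0
    while (!isStopped()) {
      val batch = ArrayBuffer.empty[Int]
      for (_ <- 1 to batchSize) { batch += next; next += 1 }
      // store(ArrayBuffer) stores the whole batch as one data block and
      // returns only after Spark has stored it.
      store(batch)
      Thread.sleep(1000)
    }
  }
}
```

Batching this way trades latency for control over block boundaries. The single-record store in the previous sketch leaves that grouping to Spark.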
Note: Receivers must have a ReceiverSupervisor attached before they can be started, since the store and management methods simply pass calls on to the respective methods of the ReceiverSupervisor.
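The pass-through relationship can be shown with a toy model in plain Scala, with no Spark dependency. All names here (`ToySupervisor`, `CollectingSupervisor`, `ToyReceiver`, `attachSupervisor`, `pushSingle`) are hypothetical stand-ins, and this is not Spark's implementation. It only shows that store does no storage itself and fails if no supervisor has been attached.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model (hypothetical names, not Spark's code):
// a receiver's store() only forwards to an attached supervisor.
trait ToySupervisor[T] {
  def pushSingle(item: T): Unit
}

// A supervisor that simply records every item it is given.
class CollectingSupervisor[T] extends ToySupervisor[T] {
  val received: ArrayBuffer[T] = ArrayBuffer.empty[T]
  def pushSingle(item: T): Unit = received += item
}

class ToyReceiver[T] {
  private var supervisor: Option[ToySupervisor[T]] = None

  // Attach exactly once, before the receiver is used.
  def attachSupervisor(s: ToySupervisor[T]): Unit = {
    require(supervisor.isEmpty, "a supervisor is already attached")
    supervisor = Some(s)
  }

  // store() does no storage itself; it delegates to the supervisor.
  def store(item: T): Unit = supervisor match {
    case Some(s) => s.pushSingle(item)
    case None    => throw new IllegalStateException("no supervisor attached yet")
  }
}
```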
A receiver can be in one of three states: Initialized, Started, and Stopped.
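Those three states can be sketched as a small state machine in plain Scala. The transition rules are an assumption made for illustration. A receiver starts from Initialized and stops from Started. It may also start again from Stopped, as happens when a receiver is restarted. `ReceiverStateModel` and `transition` are hypothetical names, not Spark's internal code.

```scala
// Toy model of the three receiver states (hypothetical, not Spark's code).
object ReceiverStateModel {
  sealed trait State
  case object Initialized extends State
  case object Started extends State
  case object Stopped extends State

  // Assumed rules: start from Initialized, stop from Started, and
  // start again from Stopped (as when a receiver is restarted).
  def transition(from: State, to: State): Either[String, State] =
    (from, to) match {
      case (Initialized, Started) => Right(Started)
      case (Started, Stopped)     => Right(Stopped)
      case (Stopped, Started)     => Right(Started)
      case _                      => Left(s"illegal transition: $from -> $to")
    }
}
```

With these rules, trying to stop a receiver that never started yields `Left("illegal transition: Initialized -> Stopped")`.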