Receivers

Receivers run on workers to receive external data. They are created and belong to ReceiverInputDStreams.

Note
ReceiverTracker launches a receiver on a worker.

It is represented by abstract class Receiver that is parameterized by the type of the elements it processes as well as StorageLevel.

Note
You use StreamingContext.receiverStream method to register a custom Receiver to a streaming context.

The abstract Receiver class requires the following methods to be implemented (see Custom Receiver):

  • onStart() that starts the receiver when the application starts.

  • onStop() that stops the receiver.

A receiver is identified by the unique identifier Receiver.streamId (that corresponds to the unique identifier of the receiver input stream it is associated with).

Note
StorageLevel of a receiver is used to instantiate ReceivedBlockHandler in ReceiverSupervisorImpl.

A receiver uses store methods to store received data as data blocks into Spark’s memory.

Note
Receivers must have ReceiverSupervisors attached before they can be started since store and management methods simply pass calls on to the respective methods in the ReceiverSupervisor.

A receiver can be in one of the three states: Initialized, Started, and Stopped.

Custom Receiver

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

final class MyStringReceiver extends Receiver[String](StorageLevel.NONE) {

  def onStart() = {
    println("onStart called")
  }

  def onStop() = {
    println("onStop called")
  }
}

val ssc = new StreamingContext(sc, Seconds(5))
val strings = ssc.receiverStream(new MyStringReceiver)
strings.print

ssc.start

// MyStringReceiver will print "onStart called"

ssc.stop()

// MyStringReceiver will print "onStop called"

results matching ""

    No results matching ""