ReceiverSupervisors

ReceiverSupervisor is an (abstract) handler object that is responsible for supervising a receiver (that runs on the worker). It assumes that implementations offer concrete methods to push received data to Spark.

Note
Receiver's store methods pass calls to respective push methods of ReceiverSupervisors.
Note
ReceiverTracker starts a ReceiverSupervisor per receiver.

ReceiverSupervisor can be started and stopped. When a supervisor is started, it calls (empty by default) onStart() and startReceiver() afterwards.

It attaches itself to the receiver it is a supervisor of (using Receiver.attachSupervisor). That is how a receiver knows about its supervisor (and can hence offer the store and management methods).

ReceiverSupervisor Contract

ReceiverSupervisor is a private[streaming] abstract class that assumes that concrete implementations offer the following push methods:

  • pushBytes

  • pushIterator

  • pushArrayBuffer

There are the other methods required:

  • createBlockGenerator

  • reportError

  • onReceiverStart

Starting Receivers

startReceiver() calls (abstract) onReceiverStart(). When true (it is unknown at this point to know when it is true or false since it is an abstract method - see ReceiverSupervisorImpl.onReceiverStart for the default implementation), it prints the following INFO message to the logs:

INFO Starting receiver

The receiver’s onStart() is called and another INFO message appears in the logs:

INFO Called receiver onStart

If however onReceiverStart() returns false, the supervisor stops (using stop).

Stopping Receivers

stop method is called with a message and an optional cause of the stop (called error). It calls stopReceiver method that prints the INFO message and checks the state of the receiver to react appropriately.

When the receiver is in Started state, stopReceiver calls Receiver.onStop(), prints the following INFO message, and onReceiverStop(message, error).

INFO Called receiver onStop

Restarting Receivers

A ReceiverSupervisor uses spark.streaming.receiverRestartDelay to restart the receiver with delay.

Note
Receivers can request to be restarted using restart methods.

When requested to restart a receiver, it uses a separate thread to perform it asynchronously. It prints the WARNING message to the logs:

WARNING Restarting receiver with delay [delay] ms: [message]

It then stops the receiver, sleeps for delay milliseconds and starts the receiver (using startReceiver()).

You should see the following messages in the logs:

DEBUG Sleeping for [delay]
INFO Starting receiver again
INFO Receiver started again
Caution
FIXME What is a backend data store?

Awaiting Termination

awaitTermination method blocks the current thread to wait for the receiver to be stopped.

Note
ReceiverTracker uses awaitTermination to wait for receivers to stop (see StartAllReceivers).

When called, you should see the following INFO message in the logs:

INFO Waiting for receiver to be stopped

If a receiver has terminated successfully, you should see the following INFO message in the logs:

INFO Stopped receiver without error

Otherwise, you should see the ERROR message in the logs:

ERROR Stopped receiver with error: [stoppingError]

stoppingError is the exception associated with the stopping of the receiver and is rethrown.

Note
Internally, ReceiverSupervisor uses java.util.concurrent.CountDownLatch with count 1 to await the termination.

Internals - How to count stopLatch down

stopLatch is decremented when ReceiverSupervisor’s stop is called which is in the following cases:

  • When a receiver itself calls stop(message: String) or stop(message: String, error: Throwable)

  • When ReceiverSupervisor.onReceiverStart() returns false or NonFatal (less severe) exception is thrown in ReceiverSupervisor.startReceiver.

  • When ReceiverTracker.stop is called that posts StopAllReceivers message to ReceiverTrackerEndpoint. It in turn sends StopReceiver to the ReceiverSupervisorImpl for every ReceiverSupervisor that calls ReceiverSupervisorImpl.stop.

Caution

FIXME Prepare exercises

  • for a receiver to call stop(message: String) when a custom "TERMINATE" message arrives

  • send StopReceiver to a ReceiverTracker

ReceiverSupervisorImpl

ReceiverSupervisorImpl is the implementation of ReceiverSupervisor contract.

Note
A dedicated ReceiverSupervisorImpl is started for every receiver when ReceiverTracker starts. See ReceiverTrackerEndpoint.startReceiver.

It communicates with ReceiverTracker that runs on the driver (by posting messages using the ReceiverTracker RPC endpoint).

Tip

Enable DEBUG logging level for org.apache.spark.streaming.receiver.ReceiverSupervisorImpl logger to see what happens in ReceiverSupervisorImpl.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.receiver.ReceiverSupervisorImpl=DEBUG

push Methods

push methods, i.e. pushArrayBuffer, pushIterator, and pushBytes solely pass calls on to ReceiverSupervisorImpl.pushAndReportBlock.

ReceiverSupervisorImpl.onReceiverStart

ReceiverSupervisorImpl.onReceiverStart sends a blocking RegisterReceiver message to ReceiverTracker that responds with a boolean value.

Current Rate Limit

getCurrentRateLimit controls the current rate limit. It asks the BlockGenerator for the value (using getCurrentLimit).

ReceivedBlockHandler

ReceiverSupervisorImpl uses the internal field receivedBlockHandler for ReceivedBlockHandler to use.

It uses ReceivedBlockHandler to storeBlock (see ReceivedBlockHandler Contract for more coverage and ReceiverSupervisorImpl.pushAndReportBlock in this document).

ReceiverSupervisorImpl.pushAndReportBlock

ReceiverSupervisorImpl.pushAndReportBlock(receivedBlock: ReceivedBlock, metadataOption: Option[Any], blockIdOption: Option[StreamBlockId]) stores receivedBlock using ReceivedBlockHandler.storeBlock and reports it to the driver.

Note
ReceiverSupervisorImpl.pushAndReportBlock is only used by the push methods, i.e. pushArrayBuffer, pushIterator, and pushBytes. Calling the method is actually all they do.

When it calls ReceivedBlockHandler.storeBlock, you should see the following DEBUG message in the logs:

DEBUG Pushed block [blockId] in [time] ms

It then sends AddBlock (with ReceivedBlockInfo for streamId, BlockStoreResult.numRecords, metadataOption, and the result of ReceivedBlockHandler.storeBlock) to ReceiverTracker RPC endpoint (that runs on the driver).

When a response comes, you should see the following DEBUG message in the logs:

DEBUG Reported block [blockId]

results matching ""

    No results matching ""