DStreamGraph

DStreamGraph is a final helper class that manages input and output dstreams. It also holds the zero time for the other components, which marks the time when it was started.

DStreamGraph maintains the collections of InputDStream instances (as inputStreams) and output DStream instances (as outputStreams), but, more importantly, it generates streaming jobs for output streams for a batch (time).

DStreamGraph holds the batch interval for the other parts of a Streaming application.

Tip

Enable INFO or DEBUG logging level for org.apache.spark.streaming.DStreamGraph logger to see what happens in DStreamGraph.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.DStreamGraph=DEBUG

Refer to Logging.

Zero Time (aka zeroTime)

Zero time (internally zeroTime) is the time when DStreamGraph has been started.

It is passed down the output dstream graph so that output dstreams can initialize themselves.
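The propagation of zero time can be sketched with a simplified model (this is an illustrative sketch, not the actual Spark implementation; the class and method names mirror the ones described here):

```scala
// Simplified sketch: zero time is set once on the graph and pushed down
// to every output dstream when the graph starts.
case class Time(milliseconds: Long)

class SimpleDStream {
  var zeroTime: Option[Time] = None
  // initialize must not change an already-set zero time
  def initialize(time: Time): Unit = {
    require(zeroTime.isEmpty || zeroTime.contains(time),
      "ZeroTime is already initialized, should not initialize it again")
    zeroTime = Some(time)
  }
}

class SimpleDStreamGraph(outputStreams: Seq[SimpleDStream]) {
  var zeroTime: Option[Time] = None
  def start(time: Time): Unit = {
    zeroTime = Some(time)
    // pass zero time down so output dstreams can initialize themselves
    outputStreams.foreach(_.initialize(time))
  }
}
```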

Start Time (aka startTime)

Start time (internally startTime) is the time when DStreamGraph has been started or restarted.

Note
At a regular start, start time is exactly zero time.

Batch Interval (aka batchDuration)

DStreamGraph holds the batch interval (as batchDuration) for the other parts of a Streaming application.

setBatchDuration(duration: Duration) is the method to set the batch interval.

It appears that DStreamGraph is the owner of the value, since the batch interval must be set before JobGenerator can be instantiated.

It is set while StreamingContext is being instantiated and is validated (using the validate() methods of StreamingContext and DStreamGraph) before StreamingContext is started.
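The set-once semantics of the batch interval can be sketched as follows (an assumed, simplified model; only the method name setBatchDuration comes from the text above):

```scala
// Sketch: the batch duration may be set exactly once; a second attempt
// fails with java.lang.IllegalArgumentException (via require).
case class Duration(milliseconds: Long)

class GraphWithBatchDuration {
  private var batchDuration: Duration = null
  def setBatchDuration(duration: Duration): Unit = this.synchronized {
    require(batchDuration == null,
      s"Batch duration already set as $batchDuration. Cannot set it again.")
    batchDuration = duration
  }
  def getBatchDuration: Duration = batchDuration
}
```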

Maximum Remember Interval — getMaxInputStreamRememberDuration Method

getMaxInputStreamRememberDuration(): Duration

Maximum Remember Interval is the maximum remember interval across all the input dstreams. It is calculated using getMaxInputStreamRememberDuration method.

Note
It is called when JobGenerator is requested to clear metadata and checkpoint data.
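Conceptually, the computation is a maximum over the input dstreams' remember durations. A minimal sketch, assuming a simplified model of input dstreams (not Spark's actual classes):

```scala
// Sketch: the maximum remember interval is the largest remember duration
// across all input dstreams.
case class Duration(ms: Long)
case class InputDStream(rememberDuration: Duration)

def getMaxInputStreamRememberDuration(inputs: Seq[InputDStream]): Duration =
  inputs.map(_.rememberDuration).maxBy(_.ms)
```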

Input DStreams Registry

Caution
FIXME

Output DStreams Registry

DStream by design has no notion of being an output dstream. To mark a dstream as output you need to register a dstream (using DStream.register method) which happens for…​FIXME

Starting DStreamGraph

start(time: Time): Unit

When DStreamGraph is started (using start method), it sets zero time and start time.

Note
start method is called when JobGenerator starts for the first time (not from a checkpoint).
Note
You can start DStreamGraph only when time is not null and zero time has not been set yet.

(output dstreams) start then walks over the collection of output dstreams and, for each output dstream, one at a time, calls its initialize(zeroTime), remember (with the current remember interval), and validateAtStart methods.

(input dstreams) When all the output streams are processed, it starts the input dstreams (in parallel) using their start method.
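The call order described above can be sketched with a simplified model that records the calls (assumed names; inputs are started sequentially here for clarity, whereas the text says Spark starts them in parallel):

```scala
import scala.collection.mutable.ListBuffer

// Sketch: outputs are initialized, told to remember, and validated one at
// a time; only afterwards are the input dstreams started.
case class Time(ms: Long)

class Recorder { val calls = ListBuffer[String]() }

class OutStream(name: String, rec: Recorder) {
  def initialize(t: Time): Unit = rec.calls += s"$name.initialize"
  def remember(): Unit = rec.calls += s"$name.remember"
  def validateAtStart(): Unit = rec.calls += s"$name.validateAtStart"
}

class InStream(name: String, rec: Recorder) {
  def start(): Unit = rec.calls += s"$name.start"
}

def startGraph(t: Time, outs: Seq[OutStream], ins: Seq[InStream]): Unit = {
  outs.foreach { o => o.initialize(t); o.remember(); o.validateAtStart() }
  ins.foreach(_.start()) // Spark does this in parallel; sequential here
}
```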

Stopping DStreamGraph

stop(): Unit
Caution
FIXME

Restarting DStreamGraph

restart(time: Time): Unit

restart sets start time to be time input parameter.

Note
This is the only moment when zero time can be different from start time.
Caution
restart doesn’t seem to be called ever.

Generating Streaming Jobs for Output DStreams for Batch Time — generateJobs Method

generateJobs(time: Time): Seq[Job]

generateJobs method generates a collection of streaming jobs for output streams for a given batch time.

When generateJobs method executes, you should see the following DEBUG message in the logs:

DEBUG DStreamGraph: Generating jobs for time [time] ms

generateJobs then walks over each registered output stream (in outputStreams internal registry) and requests the streams for a streaming job.

Right before the method finishes, you should see the following DEBUG message with the number of streaming jobs generated (as jobs.length):

DEBUG DStreamGraph: Generated [jobs.length] jobs for time [time] ms
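The walk over the registered output streams can be sketched as a flatMap (a simplified model under assumed names; in particular, each output stream is assumed to return an optional job for the batch time):

```scala
// Sketch: ask each registered output stream for a (possibly absent) job
// for the given batch time and collect the results.
case class Time(ms: Long)
case class Job(time: Time, name: String)

class OutputStream(name: String) {
  def generateJob(time: Time): Option[Job] = Some(Job(time, name))
}

def generateJobs(time: Time, outputStreams: Seq[OutputStream]): Seq[Job] =
  outputStreams.flatMap(_.generateJob(time))
```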

Validation Check

validate() method checks whether the batch duration and at least one output stream have been set. It throws java.lang.IllegalArgumentException when either is not set.

Note
It is called when StreamingContext starts.
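The two checks can be sketched as require calls (an assumed, simplified signature; the error messages are illustrative, not Spark's exact wording):

```scala
// Sketch: validate fails with java.lang.IllegalArgumentException unless
// both the batch duration and at least one output stream are set.
case class Duration(ms: Long)

def validate(batchDuration: Option[Duration], numOutputStreams: Int): Unit = {
  require(batchDuration.isDefined, "Batch duration has not been set")
  require(numOutputStreams > 0,
    "No output operations registered, so nothing to execute")
}
```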

Metadata Cleanup

Note
It is called when JobGenerator clears metadata.

When clearMetadata(time: Time) is called, you should see the following DEBUG message in the logs:

DEBUG DStreamGraph: Clearing metadata for time [time] ms

It merely walks over the collection of output streams and (synchronously, one by one) asks each to do its own metadata cleaning.

When it finishes, you should see the following DEBUG message in the logs:

DEBUG DStreamGraph: Cleared old metadata for time [time] ms
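The delegation pattern can be sketched with a simplified model (assumed names; the same one-by-one delegation applies to checkpoint-data cleanup described later):

```scala
import scala.collection.mutable.ListBuffer

// Sketch: the graph synchronizes and asks each output dstream, one by one,
// to clear its own metadata for the batch time.
case class Time(ms: Long)

class Output(name: String, cleared: ListBuffer[String]) {
  def clearMetadata(time: Time): Unit = cleared += s"$name@${time.ms}"
}

class Graph(outputs: Seq[Output]) {
  def clearMetadata(time: Time): Unit = this.synchronized {
    outputs.foreach(_.clearMetadata(time)) // synchronously, one by one
  }
}
```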

Restoring State for Output DStreams — restoreCheckpointData Method

restoreCheckpointData(): Unit

When restoreCheckpointData() is executed, you should see the following INFO message in the logs:

INFO DStreamGraph: Restoring checkpoint data

Then, every output dstream is requested to restoreCheckpointData.

At the end, you should see the following INFO message in the logs:

INFO DStreamGraph: Restored checkpoint data
Note
restoreCheckpointData is executed when StreamingContext is recreated from checkpoint.

Updating Checkpoint Data — updateCheckpointData Method

updateCheckpointData(time: Time): Unit
Note
updateCheckpointData is called when JobGenerator processes DoCheckpoint events.

When updateCheckpointData is called, you should see the following INFO message in the logs:

INFO DStreamGraph: Updating checkpoint data for time [time] ms

It then walks over every output dstream and calls its updateCheckpointData(time).

When updateCheckpointData finishes it prints out the following INFO message to the logs:

INFO DStreamGraph: Updated checkpoint data for time [time] ms

Checkpoint Cleanup — clearCheckpointData Method

clearCheckpointData(time: Time): Unit
Note
clearCheckpointData is called when JobGenerator clears checkpoint data.

When clearCheckpointData is called, you should see the following INFO message in the logs:

INFO DStreamGraph: Clearing checkpoint data for time [time] ms

It merely walks through the collection of output streams and (synchronously, one by one) asks each to do its own checkpoint data cleaning.

When finished, you should see the following INFO message in the logs:

INFO DStreamGraph: Cleared checkpoint data for time [time] ms

Remember Interval

Remember interval is the time to remember (aka cache) the RDDs that have been generated by (output) dstreams in the context (before they are released and garbage collected).

It can be set using remember method.

remember Method

remember(duration: Duration): Unit

remember method simply sets the remember interval and exits.

Note
It is called by StreamingContext.remember method.

It first checks whether the remember interval has been set already and, if so, throws java.lang.IllegalArgumentException as follows:

java.lang.IllegalArgumentException: requirement failed: Remember duration already set as [rememberDuration] ms. Cannot set it again.
  at scala.Predef$.require(Predef.scala:219)
  at org.apache.spark.streaming.DStreamGraph.remember(DStreamGraph.scala:79)
  at org.apache.spark.streaming.StreamingContext.remember(StreamingContext.scala:222)
  ... 43 elided
Note
It only makes sense to call remember method before DStreamGraph is started, i.e. before StreamingContext is started, since the output dstreams are only given the remember interval when DStreamGraph starts.
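The set-once check that produces the exception above can be sketched as follows (a simplified model mirroring the require failure shown in the stack trace, not the actual Spark source):

```scala
// Sketch: remember sets the remember interval exactly once; a second call
// fails with java.lang.IllegalArgumentException (via require).
case class Duration(ms: Long) { override def toString: String = s"$ms ms" }

class RememberGraph {
  private var rememberDuration: Duration = null
  def remember(duration: Duration): Unit = this.synchronized {
    require(rememberDuration == null,
      s"Remember duration already set as $rememberDuration. Cannot set it again.")
    rememberDuration = duration
  }
}
```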
