log4j.logger.org.apache.spark.streaming.dstream.DStream=DEBUG
DStream - Discretized Stream
Discretized Stream (DStream) is the fundamental concept of Spark Streaming. It is basically a stream of RDDs whose elements are the data received from input streams for a batch (possibly extended in scope by windowed or stateful operators).
There is no notion of input and output dstreams. All dstreams are instances of the DStream abstract class (see DStream Contract in this document). You may, however, correctly assume that all dstreams are input, and that remains so until you register a dstream, which marks it as output.
Name | Initial Value | Description |
---|---|---|
storageLevel | NONE | StorageLevel of the RDDs in the DStream. |
restoredFromCheckpointData | false | The flag to inform whether it was restored from checkpoint. |
graph | null | The reference to DStreamGraph. |
A DStream is represented as the org.apache.spark.streaming.dstream.DStream abstract class.
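Tip
|
Enable DEBUG logging level for the org.apache.spark.streaming.dstream.DStream logger to see what happens inside a DStream. Add the following line to your log4j configuration:
log4j.logger.org.apache.spark.streaming.dstream.DStream=DEBUG
Refer to Logging. |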
DStream Contract
A DStream is defined by the following properties (with the names of the corresponding methods that subclasses have to implement):
- dstream dependencies, i.e. a collection of DStreams that this DStream depends on. They are often referred to as parent dstreams.
  def dependencies: List[DStream[_]]
- slide duration (aka slide interval), i.e. a time interval after which the stream is requested to generate an RDD out of the input data it consumes.
  def slideDuration: Duration
- How to compute (generate) an optional RDD for the given batch, if any. validTime is a point in time that marks the end boundary of the slide duration.
  def compute(validTime: Time): Option[RDD[T]]
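To make the shape of the contract concrete, here is a minimal sketch in plain Scala. It is not Spark's code: MiniDStream, ConstantStream and MappedStream are hypothetical names, Long milliseconds stand in for Spark's Time and Duration, and Seq[T] stands in for RDD[T].

```scala
// Simplified model of the three-method contract (hypothetical names, not Spark's API).
// Long milliseconds replace Time/Duration and Seq[T] replaces RDD[T].
trait MiniDStream[T] {
  def dependencies: List[MiniDStream[_]]          // parent dstreams
  def slideDurationMs: Long                       // slide duration
  def compute(validTimeMs: Long): Option[Seq[T]]  // optional batch for the given time
}

// A source stream: no parents, the same data for every batch.
class ConstantStream[T](data: Seq[T], batchMs: Long) extends MiniDStream[T] {
  def dependencies: List[MiniDStream[_]] = Nil
  def slideDurationMs: Long = batchMs
  def compute(validTimeMs: Long): Option[Seq[T]] = Some(data)
}

// A derived stream: one parent, the parent's slide duration, transformed batches.
class MappedStream[T, U](parent: MiniDStream[T], f: T => U) extends MiniDStream[U] {
  def dependencies: List[MiniDStream[_]] = List(parent)
  def slideDurationMs: Long = parent.slideDurationMs
  def compute(validTimeMs: Long): Option[Seq[U]] =
    parent.compute(validTimeMs).map(_.map(f))
}
```

A derived stream typically delegates both its slide duration and its batches to the parent, which is why dependencies matter for everything that walks the graph (initialization, metadata cleanup, checkpointing).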
Creating DStreams
You can create dstreams through the built-in input stream constructors using streaming context or more specialized add-ons for external input data sources, e.g. Apache Kafka.
Note
|
DStreams can only be created before StreamingContext is started.
|
Zero Time (aka zeroTime)
Zero time (internally zeroTime) is the time when a dstream was initialized.
It serves as the initialization marker (via the isInitialized method) and helps calculate intervals for RDD checkpointing (when the checkpoint interval is set and the time elapsed since zero time is a multiple thereof), slicing, and the time validation for a batch (when a dstream generates an RDD).
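The role of zero time in these calculations can be sketched as follows. This is a simplified model, not Spark's code: ZeroTimeSketch and its method signatures are hypothetical, times are Long milliseconds, and the validity rule (a batch time is valid when it is later than zero time and the elapsed time is a multiple of the slide duration) is stated here as an assumption.

```scala
// Simplified model of how zero time is used (hypothetical names, Long milliseconds).
object ZeroTimeSketch {
  // A stream counts as initialized once its zero time has been set.
  def isInitialized(zeroTimeMs: Option[Long]): Boolean = zeroTimeMs.isDefined

  // Assumption: a batch time is valid when it is after zero time and the
  // elapsed time is a whole number of slide durations.
  def isTimeValid(zeroTimeMs: Long, slideMs: Long, timeMs: Long): Boolean =
    timeMs > zeroTimeMs && (timeMs - zeroTimeMs) % slideMs == 0

  // RDD checkpointing is due when the elapsed time is a multiple of the checkpoint interval.
  def isCheckpointTime(zeroTimeMs: Long, checkpointMs: Long, timeMs: Long): Boolean =
    (timeMs - zeroTimeMs) % checkpointMs == 0
}
```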
Remember Interval (aka rememberDuration)
Remember interval (internally rememberDuration) is the time interval for how long a dstream is supposed to remember (aka cache) the RDDs it created. This is a mandatory attribute of every dstream and is validated at startup.
Note
|
It is used for metadata cleanup of a dstream. |
Initially, when a dstream is created, the remember interval is not set (i.e. null), but it is set when the dstream is initialized.
It can be set to a custom value using the remember method.
Note
|
You may see the current value of remember interval when a dstream is validated at startup and the log level is INFO. |
generatedRDDs - Internal Cache of Batch Times and Corresponding RDDs
generatedRDDs is an internal collection of pairs of batch times and the corresponding RDDs that were generated for the batch. It acts as a cache when a dstream is requested to compute an RDD for a batch (i.e. generatedRDDs may already have the RDD or gets a new RDD added).
generatedRDDs is empty initially, i.e. when a dstream is created.
It is a transient data structure, so it is not serialized when a dstream is. It is initialized to an empty collection when deserialized. You should see the following DEBUG message in the logs when it happens:
DEBUG [the simple class name of dstream].readObject used
As new RDDs are added, dstreams offer a way to clear the old metadata, during which the old RDDs are removed from the generatedRDDs collection.
If checkpointing is used, the generatedRDDs collection can be recreated from storage.
Initializing DStreams - initialize Method
initialize(time: Time): Unit
The initialize method sets zero time and optionally the checkpoint interval (if the dstream must checkpoint and the interval was not set already) and the remember duration.
Note
|
initialize method is called for output dstreams only when DStreamGraph is started.
|
The zero time of a dstream can only be set once or be set again to the same zero time. Otherwise, initialize throws a SparkException as follows:
ZeroTime is already initialized to [zeroTime], cannot initialize it again to [time]
It verifies that checkpoint interval is defined when mustCheckpoint was enabled.
Note
|
The internal mustCheckpoint flag is disabled by default. It is set by custom dstreams like StateDStreams.
|
If mustCheckpoint is enabled and the checkpoint interval was not set, it is automatically set to the smallest multiple of the slide interval that is at least 10 seconds (i.e. the slide interval itself when it is 10 seconds or longer). You should see the following INFO message in the logs when the checkpoint interval was set automatically:
INFO [DStreamType]: Checkpoint interval automatically set to [checkpointDuration]
It then ensures that the remember interval is at least twice the checkpoint interval (only if defined) or the slide duration.
At the very end, it initializes the parent dstreams (available as dependencies), which recursively initializes the entire graph of dstreams.
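The interval arithmetic performed during initialization can be sketched in plain Scala. This is a simplified model rather than Spark's code: InitializeSketch and its method names are hypothetical, durations are Long milliseconds, and it assumes the automatic checkpoint interval is the smallest multiple of the slide interval that reaches 10 seconds (which equals the slide interval once that is 10 seconds or more).

```scala
// Simplified model of the interval calculations in initialize
// (hypothetical names; durations are Long milliseconds).
object InitializeSketch {
  val TenSecondsMs = 10000L

  // Assumption: smallest multiple of the slide interval that is at least 10 seconds.
  def autoCheckpointMs(slideMs: Long): Long =
    slideMs * math.ceil(TenSecondsMs.toDouble / slideMs).toLong

  // Minimum remember interval: twice the checkpoint interval when one is defined
  // (and not shorter than the slide duration), else the slide duration.
  def minRememberMs(slideMs: Long, checkpointMs: Option[Long]): Long =
    checkpointMs match {
      case Some(cp) if cp >= slideMs => cp * 2
      case _                         => slideMs
    }

  // Keep the current remember interval only if it already meets the minimum.
  def initialRememberMs(current: Option[Long], slideMs: Long, checkpointMs: Option[Long]): Long = {
    val min = minRememberMs(slideMs, checkpointMs)
    current.filter(_ >= min).getOrElse(min)
  }
}
```

For example, with a 3-second slide interval the automatic checkpoint interval in this model is 12 seconds, and the remember interval becomes at least 24 seconds.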
remember Method
remember(duration: Duration): Unit
remember sets the remember interval for the current dstream and the dstreams it depends on (see dependencies).
If the input duration is specified (i.e. not null), remember sets the remember interval (only when the current value was not set already) or extends it (when the current value is shorter).
You should see the following INFO message in the logs when the remember interval changes:
INFO Duration for remembering RDDs set to [rememberDuration] for [dstream]
At the end, remember always passes the current remember interval on to the dstreams it depends on (whether it was set, extended or did not change).
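A minimal sketch of this set-or-extend behaviour, propagated to parent streams, is shown below. RememberNode is a hypothetical stand-in for a dstream, None models an unset (null) interval, durations are Long milliseconds, and the printed line only imitates the INFO message above.

```scala
// Simplified model of remember (hypothetical class, not Spark's code).
// None stands for an unset (null) remember interval.
class RememberNode(val name: String, val parents: List[RememberNode] = Nil) {
  var rememberMs: Option[Long] = None

  def remember(durationMs: Option[Long]): Unit = {
    durationMs.foreach { d =>
      // Set the interval when it is unset, or extend it when the new value is longer.
      if (rememberMs.forall(d > _)) {
        rememberMs = Some(d)
        println(s"Duration for remembering RDDs set to $d ms for $name")
      }
    }
    // Always pass the current interval on to the parents.
    parents.foreach(_.remember(rememberMs))
  }
}
```

Note that a shorter duration never shrinks an interval that is already set; it can only grow.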
Checkpointing DStreams - checkpoint Method
checkpoint(interval: Duration): DStream[T]
You use the checkpoint(interval: Duration) method to set up periodic checkpointing every (checkpoint) interval.
You can only enable checkpointing and set the checkpoint interval before StreamingContext is started; otherwise, an UnsupportedOperationException is thrown as follows:
java.lang.UnsupportedOperationException: Cannot change checkpoint interval of an DStream after streaming context has started
at org.apache.spark.streaming.dstream.DStream.checkpoint(DStream.scala:177)
... 43 elided
Internally, the checkpoint method calls persist (that sets the default MEMORY_ONLY_SER storage level).
If the checkpoint interval is set, the checkpoint directory is mandatory. Spark validates it when StreamingContext starts and throws an IllegalArgumentException if it is not set.
java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
You can see the value of the checkpoint interval for a dstream in the logs when it is validated:
INFO Checkpoint interval = [checkpointDuration]
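The guard described in this section can be modelled in a few lines of plain Scala. CheckpointableStream and markInitialized are hypothetical names, the storage level is a plain String and the interval is a Long in milliseconds; the sketch only illustrates the order of operations, not Spark's implementation.

```scala
// Simplified model of checkpoint (hypothetical class, not Spark's code).
class CheckpointableStream {
  private var initialized = false
  var storageLevel: String = "NONE"
  var checkpointMs: Option[Long] = None

  // Stands in for the streaming context having been started.
  def markInitialized(): Unit = { initialized = true }

  def checkpoint(intervalMs: Long): CheckpointableStream = {
    if (initialized) {
      throw new UnsupportedOperationException(
        "Cannot change checkpoint interval of a DStream after streaming context has started")
    }
    storageLevel = "MEMORY_ONLY_SER" // what persist with the default level would set
    checkpointMs = Some(intervalMs)
    this                             // returned so that calls can be chained
  }
}
```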
Checkpointing
DStreams can checkpoint input data at specified time intervals.
The following settings are internal to a dstream and define how it checkpoints the input data if any.
- mustCheckpoint (default: false) is an internal private flag that marks a dstream as being checkpointed (true) or not (false). It is an implementation detail and the author of a DStream implementation sets it. Refer to Initializing DStreams (initialize method) to learn how it is used to set the checkpoint interval, i.e. checkpointDuration.
- checkpointDuration is a configurable property that says how often a dstream checkpoints data. It is often called checkpoint interval. If not set explicitly, but the dstream is checkpointed, it will be set while initializing dstreams.
- checkpointData is an instance of DStreamCheckpointData.
- restoredFromCheckpointData (default: false) is an internal flag to describe the initial state of a dstream, i.e. whether (true) or not (false) it was started by restoring state from checkpoint.
Registering Output Streams - register Method
register(): DStream[T]
DStream by design has no notion of being an output stream. It is up to DStreamGraph to know and differentiate between input and output streams.
DStream comes with the internal register method that registers a DStream as an output stream.
The internal private foreachRDD method uses register to register output streams to DStreamGraph. Whenever called, it creates a ForEachDStream and calls register upon it. That is how streams become output streams.
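The registration flow can be pictured with a small model in plain Scala. MiniGraph, MiniStream and MiniForEachStream are hypothetical stand-ins for DStreamGraph, DStream and ForEachDStream, and batches are plain Seq[String] values; the sketch only shows that a stream becomes an output stream once a wrapper around it is registered with the graph.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified model of output stream registration (hypothetical names, not Spark's code).
class MiniGraph {
  val outputStreams = ArrayBuffer.empty[MiniStream]
  def addOutputStream(stream: MiniStream): Unit = { outputStreams += stream; () }
}

class MiniStream(val graph: MiniGraph, val name: String) {
  // Registers this stream as an output stream in the graph.
  def register(): MiniStream = { graph.addOutputStream(this); this }

  // Wraps this stream in a "for each" stream and registers the wrapper.
  def foreachRDD(action: Seq[String] => Unit): Unit = {
    new MiniForEachStream(this, action).register()
    ()
  }
}

class MiniForEachStream(parent: MiniStream, val action: Seq[String] => Unit)
  extends MiniStream(parent.graph, s"foreach(${parent.name})")
```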
Generating Streaming Job For Batch For Output DStream - generateJob Internal Method
generateJob(time: Time): Option[Job]
generateJob generates a streaming job for a time batch for an (output) dstream. It may or may not generate a streaming job for the requested batch time, depending on whether there are RDDs to process.
Note
|
generateJob is called when DStreamGraph generates jobs for a batch time.
|
It computes an RDD for the batch and, if there is one, returns a streaming job made of the batch time and a job function that runs a Spark job (over the generated RDD, with an empty function) when executed.
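The optional nature of the result can be sketched as follows. MiniJob and GenerateJobSketch are hypothetical names, Seq[T] stands in for an RDD, and the getOrCompute argument is a plain function passed in for illustration; this is a model of the control flow, not Spark's code.

```scala
// Simplified model of generateJob (hypothetical names, not Spark's code).
final case class MiniJob(timeMs: Long, run: () => Unit)

object GenerateJobSketch {
  def generateJob[T](timeMs: Long, getOrCompute: Long => Option[Seq[T]]): Option[MiniJob] =
    getOrCompute(timeMs).map { batch =>
      // The job function applies an empty function to the batch: running it
      // forces the computation without producing a result.
      val emptyFunc: Iterator[T] => Unit = _ => ()
      MiniJob(timeMs, () => emptyFunc(batch.iterator))
    }
}
```

When there is nothing to process for the batch, no job is created at all, so the scheduler has nothing to run for that stream.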
Note
|
The Spark job uses an empty function to calculate partitions of a RDD. |
Caution
|
FIXME What happens when SparkContext.runJob(rdd, emptyFunc) is called with the empty function, i.e. (iterator: Iterator[T]) ⇒ {} ?
|
Computing RDD for Batch - getOrCompute Internal Method
getOrCompute(time: Time): Option[RDD[T]]
getOrCompute returns an optional RDD for a time batch.
Note
|
getOrCompute is private[streaming] final method.
|
getOrCompute uses generatedRDDs to return the RDD if it has already been generated for the time. If not, it generates one by computing the input stream (using the compute(validTime: Time) method).
If there was anything to process in the input stream, i.e. computing the input stream returned an RDD, the RDD is first persisted (only if storageLevel for the input stream is different from the NONE storage level).
You should see the following DEBUG message in the logs:
DEBUG Persisting RDD [id] for time [time] to [storageLevel]
The generated RDD is checkpointed if checkpointDuration is defined and the time interval between current and zero times is a multiple of checkpointDuration.
You should see the following DEBUG message in the logs:
DEBUG Marking RDD [id] for time [time] for checkpointing
The generated RDD is saved in the internal generatedRDDs registry.
Note
|
getOrCompute is used when a DStream is requested to generate a streaming job for a batch.
|
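The cache-then-compute flow of getOrCompute can be sketched in plain Scala. CachingStream and its fields are hypothetical, Seq[T] stands in for an RDD, times are Long milliseconds, and persisting and time validation are left out for brevity; the model only shows the lookup, the checkpoint marking and the caching steps.

```scala
import scala.collection.mutable

// Simplified model of getOrCompute (hypothetical class, not Spark's code).
class CachingStream[T](
    zeroTimeMs: Long,
    checkpointMs: Option[Long],
    compute: Long => Option[Seq[T]]) {

  val generated = mutable.HashMap.empty[Long, Seq[T]] // plays the role of generatedRDDs
  val markedForCheckpoint = mutable.Set.empty[Long]
  var computeCalls = 0

  def getOrCompute(timeMs: Long): Option[Seq[T]] =
    generated.get(timeMs).orElse {
      computeCalls += 1
      val batch = compute(timeMs)
      batch.foreach { b =>
        // Mark for checkpointing when the time since zero time is a multiple of the interval.
        if (checkpointMs.exists(cp => (timeMs - zeroTimeMs) % cp == 0)) {
          markedForCheckpoint += timeMs
        }
        generated.put(timeMs, b)
      }
      batch
    }
}
```

A second request for the same batch time is served from the cache, so compute runs at most once per batch that produced data.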
restoreCheckpointData Method
restoreCheckpointData(): Unit
restoreCheckpointData does its work only when the internal transient restoredFromCheckpointData flag is disabled (i.e. false), which it is initially.
Note
|
restoreCheckpointData method is called when DStreamGraph is requested to restore state of output dstreams.
|
If restoredFromCheckpointData is disabled, you should see the following INFO message in the logs:
INFO ...DStream: Restoring checkpoint data
DStreamCheckpointData.restore() is executed, and then the restoreCheckpointData method is executed for every dstream the current dstream depends on (see DStream Contract).
Once completed, the internal restoredFromCheckpointData flag is enabled (i.e. true) and you should see the following INFO message in the logs:
INFO Restored checkpoint data
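The flag-guarded recursion can be modelled as below. RestorableStream is a hypothetical stand-in for a dstream, and the restoreCount counter stands in for the call to DStreamCheckpointData.restore(); the sketch shows why a stream shared by several children is restored only once.

```scala
// Simplified model of restoreCheckpointData (hypothetical class, not Spark's code).
class RestorableStream(val name: String, val parents: List[RestorableStream] = Nil) {
  var restoredFromCheckpointData = false
  var restoreCount = 0 // stands in for DStreamCheckpointData.restore()

  def restoreCheckpointData(): Unit = {
    if (!restoredFromCheckpointData) {
      restoreCount += 1
      parents.foreach(_.restoreCheckpointData())
      restoredFromCheckpointData = true
    }
  }
}
```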
Metadata Cleanup - clearMetadata Method
Note
|
It is called when DStreamGraph clears metadata for every output stream. |
clearMetadata(time: Time) is called to remove old RDDs that have been generated so far (and collected in generatedRDDs). It is a sort of garbage collector.
When clearMetadata(time: Time) is called, it checks the spark.streaming.unpersist flag (enabled by default).
It collects generated RDDs (from generatedRDDs) that are older than rememberDuration.
You should see the following DEBUG message in the logs:
DEBUG Clearing references to old RDDs: [[time] -> [rddId], ...]
Regardless of spark.streaming.unpersist flag, all the collected RDDs are removed from generatedRDDs.
When spark.streaming.unpersist flag is set (it is by default), you should see the following DEBUG message in the logs:
DEBUG Unpersisting old RDDs: [id1, id2, ...]
For every RDD in the list, it unpersists the RDD (without blocking) and, for BlockRDDs, explicitly removes their blocks. You should see the following INFO message in the logs:
INFO Removing blocks of RDD [blockRDD] of time [time]
After RDDs have been removed from generatedRDDs (and perhaps unpersisted), you should see the following DEBUG message in the logs:
DEBUG Cleared [size] RDDs that were older than [time]: [time1, time2, ...]
The stream passes the call to clear metadata to its dependencies.
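The selection and removal of old entries can be sketched in plain Scala. ClearMetadataSketch is a hypothetical name, times are Long milliseconds, and treating an entry as old when its batch time is at or before the current time minus the remember interval is an assumption of this model; unpersisting and the recursion into dependencies are left out.

```scala
import scala.collection.mutable

// Simplified model of the cleanup step in clearMetadata (hypothetical names, not Spark's code).
object ClearMetadataSketch {
  // Removes entries considered old and returns their batch times in ascending order.
  // Assumption: "old" means batch time <= timeMs - rememberMs.
  def clearMetadata[T](generated: mutable.Map[Long, T], timeMs: Long, rememberMs: Long): Seq[Long] = {
    val oldTimes = generated.keys.filter(_ <= timeMs - rememberMs).toSeq.sorted
    oldTimes.foreach(t => generated.remove(t))
    oldTimes
  }
}
```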
updateCheckpointData Method
updateCheckpointData(currentTime: Time): Unit
Note
|
It is called when DStreamGraph is requested to do updateCheckpointData itself. |
When updateCheckpointData is called, you should see the following DEBUG message in the logs:
DEBUG Updating checkpoint data for time [currentTime] ms
It then executes DStreamCheckpointData.update(currentTime) and calls the updateCheckpointData method on each dstream the dstream depends on.
When updateCheckpointData finishes, you should see the following DEBUG message in the logs:
DEBUG Updated checkpoint data for time [currentTime]: [checkpointData]