MapWithStateDStream is the result of the mapWithState stateful operator.

It extends DStream Contract with the following additional method:

def stateSnapshots(): DStream[(KeyType, StateType)]

MapWithStateDStream is a Scala sealed abstract class (and hence all the available implementations are in the same source file).
MapWithStateDStreamImpl is the only implementation of MapWithStateDStream (see below in this document for more coverage).
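
As a minimal sketch of how a MapWithStateDStream is obtained and how stateSnapshots can be used, the following keeps a running count per key. It assumes the spark-streaming library is on the classpath; the object name RunningCountSketch, the mapping function updateCount and the String/Int types are illustrative and not part of Spark.

```scala
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream}

object RunningCountSketch {
  // Illustrative mapping function: adds the new value (if any) to the
  // count kept in the state for the key and emits the updated pair.
  val updateCount = (key: String, value: Option[Int], state: State[Int]) => {
    val count = state.getOption().getOrElse(0) + value.getOrElse(0)
    state.update(count)
    (key, count)
  }

  val spec = StateSpec.function(updateCount)

  // Returns the mapped stream together with the stream of state snapshots.
  def runningCounts(pairs: DStream[(String, Int)])
      : (DStream[(String, Int)], DStream[(String, Int)]) = {
    val mapped: MapWithStateDStream[String, Int, Int, (String, Int)] =
      pairs.mapWithState(spec)
    (mapped, mapped.stateSnapshots())
  }
}
```

The first stream carries the values returned by the mapping function for the records of each batch, while stateSnapshots gives the (key, state) pairs of the whole state.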


MapWithStateDStreamImpl is an internal DStream that depends on the parent key-value dstream dataStream. It uses a custom internal dstream called internalStream (of type InternalMapWithStateDStream).

slideDuration is exactly the slide duration of the internal stream internalStream.

dependencies returns a single-element collection with the internal stream internalStream.

The compute method may or may not return an RDD[MappedType] by calling getOrCompute on the internal stream and…TK



InternalMapWithStateDStream is an internal dstream to support MapWithStateDStreamImpl and uses dataStream (as parent of type DStream[(K, V)]) as well as StateSpecImpl[K, V, S, E] (as spec).

InternalMapWithStateDStream is a DStream[MapWithStateRDDRecord[K, S, E]] that uses the MEMORY_ONLY storage level by default.

InternalMapWithStateDStream uses the StateSpec's partitioner if one is defined, and otherwise a HashPartitioner (with SparkContext's defaultParallelism as the number of partitions).
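
A short sketch of how a partitioner could be supplied through the StateSpec, so that the default HashPartitioner is not used. It assumes spark-core and spark-streaming are on the classpath; the object name PartitionerSketch, the passThrough function and the partition count of 4 are arbitrary choices for illustration.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{State, StateSpec}

object PartitionerSketch {
  // Illustrative mapping function that ignores the state entirely.
  val passThrough = (key: String, value: Option[Int], state: State[Int]) =>
    (key, value.getOrElse(0))

  // Explicit partitioner set on the spec.
  val withPartitioner =
    StateSpec.function(passThrough).partitioner(new HashPartitioner(4))

  // Alternative: only give the number of partitions (hash partitioning).
  val withNumPartitions =
    StateSpec.function(passThrough).numPartitions(4)
}
```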

slideDuration is the slide duration of parent.

dependencies is a single-element collection with the parent stream.

It forces checkpointing (i.e. mustCheckpoint flag is enabled).

When initialized, if the checkpoint interval is not set, it is set to ten times the slide duration of the parent stream (the multiplier is not configurable and is always 10).
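
The default can be modelled in a few lines of plain Scala. This is a simplified sketch and not Spark's code: it uses scala.concurrent.duration in place of Spark's own Duration type, and the names CheckpointIntervalSketch, Multiplier and effectiveInterval are hypothetical.

```scala
import scala.concurrent.duration._

object CheckpointIntervalSketch {
  // Fixed multiplier, mirroring the "always 10" rule described above.
  val Multiplier = 10L

  // `configured` is None when no checkpoint interval was set explicitly.
  def effectiveInterval(configured: Option[FiniteDuration],
                        parentSlide: FiniteDuration): FiniteDuration =
    configured.getOrElse(parentSlide * Multiplier)
}
```

With a parent slide duration of 2 seconds and no explicit setting, the model yields a checkpoint interval of 20 seconds; an explicitly set interval is left untouched.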

Computing an RDD[MapWithStateRDDRecord[K, S, E]] (i.e. the compute method) first looks up the RDD of the previous batch, i.e. the one generated one slideDuration before the requested time.

If the RDD is found and its partitioner equals the partitioner of the stream, it is used as is. Otherwise, when the partitioners are different, the RDD is "repartitioned" using MapWithStateRDD.createFromRDD.
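
The partitioner check can be sketched with a toy model in plain Scala. Everything here is a stand-in: StateRdd replaces a real RDD, partitioners are plain strings, and repartition only marks where MapWithStateRDD.createFromRDD would be called. The case where no previous RDD is found is left as None because the text above does not cover it.

```scala
object PreviousStateSketch {
  // Stand-in for a state RDD: only the partitioner matters for this decision.
  final case class StateRdd(partitioner: Option[String],
                            repartitioned: Boolean = false)

  // Stand-in for MapWithStateRDD.createFromRDD: the result carries the
  // partitioner of the stream.
  def repartition(rdd: StateRdd, streamPartitioner: String): StateRdd =
    StateRdd(Some(streamPartitioner), repartitioned = true)

  // Keeps the found RDD when the partitioners match, repartitions otherwise.
  def previousState(found: Option[StateRdd],
                    streamPartitioner: String): Option[StateRdd] =
    found.map { rdd =>
      if (rdd.partitioner == Some(streamPartitioner)) rdd
      else repartition(rdd, streamPartitioner)
    }
}
```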

FIXME MapWithStateRDD.createFromRDD

