StateDStream is the specialized DStream that results from the updateStateByKey stateful operator. It wraps a parent key-value pair dstream to build a stateful pipeline (by means of the updateStateByKey operator) and, as a stateful dstream, enables checkpointing (and hence requires some additional setup).

It takes a parent key-value pair dstream, the updateFunc state update function, a partitioner, a preservePartitioning flag, and an optional key-value pair initialRDD.

It persists its RDDs with the MEMORY_ONLY_SER storage level.

The only dependency of StateDStream is the input parent key-value pair dstream.

The slide duration is exactly the same as that of the parent.

It forces checkpointing regardless of the current dstream configuration, i.e. the internal mustCheckpoint is enabled.

When requested to compute an RDD, it first attempts to get the state RDD for the previous batch (using DStream.getOrCompute). If there is one, the parent stream is requested for an RDD for the current batch (using DStream.getOrCompute). If the parent has computed one, computeUsingPreviousRDD(parentRDD, prevStateRDD) is called.

FIXME When could getOrCompute not return an RDD? How does this apply to the StateDStream? What about the parent’s getOrCompute?

If, however, the parent has not generated an RDD for the current batch but the state RDD exists, updateFunc is called for every key of the state RDD, with an empty collection of new records, to generate a new state per partition (using RDD.mapPartitions).

When an already-running input stream receives no input data, the state RDD is still (re)computed (per partition).

Figure 1. Computing stateful RDDs (StateDStream.compute)

If no state RDD has been found, which means that this is the first input data batch, the parent stream is requested for an RDD for the current batch (using DStream.getOrCompute). When no RDD was generated for the batch, no computation is triggered.

When the stream processing starts, i.e. no state RDD exists, and there is no input data received, no computation is triggered.

Given no state RDD and a computed parent RDD, when initialRDD is None, the input data batch (as the parent RDD) is grouped by key (using groupByKey with the partitioner) and then the update state function updateFunc is applied to the partitioned input data (using RDD.mapPartitions) with None state. Otherwise, computeUsingPreviousRDD(parentRDD, initialStateRDD) is called.
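The branching described above can be modelled with plain Scala collections. The following is a minimal sketch and not Spark's code: Map and Seq stand in for the state RDD and the parent RDD, and the names ComputeModel, update and usingPrevious are hypothetical and introduced only for illustration. The update function is assumed to be a running sum per key.

```scala
object ComputeModel {
  type State = Map[String, Int]   // stand-in for the state RDD, RDD[(K, S)]
  type Batch = Seq[(String, Int)] // stand-in for the parent RDD, RDD[(K, V)]

  // Hypothetical update function: running sum per key.
  def update(key: String, records: Seq[Int], state: Option[Int]): Int =
    state.getOrElse(0) + records.sum

  // Stand-in for computeUsingPreviousRDD: combine new records with existing state.
  def usingPrevious(batch: Batch, prev: State): State = {
    val grouped = batch.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
    (grouped.keySet ++ prev.keySet).map { k =>
      (k, update(k, grouped.getOrElse(k, Seq.empty), prev.get(k)))
    }.toMap
  }

  def compute(prevState: Option[State], parent: Option[Batch], initial: Option[State]): Option[State] =
    (prevState, parent) match {
      // State and input both exist: the regular path.
      case (Some(prev), Some(batch)) => Some(usingPrevious(batch, prev))
      // State exists but no input: re-apply the update function with no new records.
      case (Some(prev), None) =>
        Some(prev.map { case (k, s) => (k, update(k, Seq.empty, Some(s))) })
      // First input batch: use the initial state if given, else start from None state.
      case (None, Some(batch)) =>
        initial match {
          case Some(init) => Some(usingPrevious(batch, init))
          case None =>
            Some(batch.groupBy(_._1).map { case (k, kvs) => (k, update(k, kvs.map(_._2), None)) })
        }
      // Neither state nor input: nothing to compute.
      case (None, None) => None
    }
}
```

For example, ComputeModel.compute(Some(Map("a" -> 3)), Some(Seq("b" -> 1)), None) keeps the state of "a" and adds a new state for "b".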

updateFunc - State Update Function

The signature of updateFunc is as follows:

updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)]

It should be read as: given a collection of triples of a key, the new records for the key, and the current state for the key, generate a collection of keys and their states.
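As an illustration of that signature, here is a minimal sketch of an update function that keeps a running total per key. The object name RunningCount and the choice of K = String, V = Int and S = Int are assumptions made for the example only.

```scala
object RunningCount {
  // Hypothetical update function matching the signature above:
  // K = String (key), V = Int (new records), S = Int (running total).
  val updateFunc: Iterator[(String, Seq[Int], Option[Int])] => Iterator[(String, Int)] =
    triples => triples.map { case (key, newRecords, state) =>
      // Start from 0 when the key has no state yet, then add the new records.
      (key, state.getOrElse(0) + newRecords.sum)
    }

  // One triple per key: existing state with new records, no state yet,
  // and existing state with no new records.
  val example: List[(String, Int)] = updateFunc(Iterator(
    ("a", Seq(1, 2), Some(10)),
    ("b", Seq(5), None),
    ("c", Seq.empty, Some(7))
  )).toList
}
```

Here RunningCount.example evaluates to List(("a", 13), ("b", 5), ("c", 7)).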


computeUsingPreviousRDD(parentRDD: RDD[(K, V)], prevStateRDD: RDD[(K, S)]): Option[RDD[(K, S)]]

The computeUsingPreviousRDD method uses cogroup and mapPartitions to build the final state RDD.

Although the return type Option[RDD[(K, S)]] allows for no state, the method always returns some state RDD.

It first performs a cogroup of parentRDD and prevStateRDD using the constructor’s partitioner so that, for every key, it has a pair of iterables with the elements of each RDD.

It is acceptable to end up with keys that have no new records per batch, but these keys do have a state (since they were received previously when no state might have been built yet).

The signature of cogroup is as follows and applies to key-value pair RDDs, i.e. RDD[(K, V)].

cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
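To make the shape of the cogrouped result concrete, the sketch below imitates cogroup on plain Scala sequences. It is a stand-in written for illustration, not the RDD implementation: CogroupModel is a hypothetical name, there is no partitioner, and Seq replaces both RDD and Iterable.

```scala
object CogroupModel {
  // Plain-collection stand-in for cogroup on pair RDDs: for every key seen on
  // either side, collect the values from the left and from the right.
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      val fromLeft = left.collect { case (key, v) if key == k => v }
      val fromRight = right.collect { case (key, w) if key == k => w }
      (k, (fromLeft, fromRight))
    }.toMap
  }

  // New records (parent side) cogrouped with previous state (state side).
  val example: Map[String, (Seq[Int], Seq[Int])] =
    cogroup(Seq("a" -> 1, "a" -> 2, "b" -> 3), Seq("a" -> 10, "c" -> 7))
}
```

In the example, key "a" has both new records and state, "b" has new records only, and "c" has state only, which is the case of a key with no new records in the batch.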

It defines an internal update function finalFunc that maps over the collection of all the keys, the new records per key, and the at-most-one-element state per key to build a new iterator that ensures that:

  1. a state per key exists (it is None or the state built so far)

  2. the lazy iterable of new records is transformed into an eager sequence.

FIXME Why is the transformation from an Iterable into a Seq so important? Why could not the constructor’s updateFunc accept the former?

With every triple per every key, the internal update function calls the constructor’s updateFunc.

The state RDD is a cogrouped RDD (on parentRDD and prevStateRDD using the constructor’s partitioner) with every element per partition mapped over using the internal update function finalFunc and the constructor’s preservePartitioning (through mapPartitions).
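The role of finalFunc can be sketched on plain iterators. This is a simplified model under stated assumptions rather than Spark's source: FinalFuncModel, UpdateFunc and sum are hypothetical names, and the cogrouped input is written out by hand instead of coming from an RDD.

```scala
object FinalFuncModel {
  type UpdateFunc[K, V, S] = Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)]

  // Adapts cogrouped elements to the triples updateFunc expects: the
  // at-most-one-element state becomes an Option and the iterable of new
  // records becomes a Seq. Then it delegates to updateFunc.
  def finalFunc[K, V, S](updateFunc: UpdateFunc[K, V, S])(
      cogrouped: Iterator[(K, (Iterable[V], Iterable[S]))]): Iterator[(K, S)] =
    updateFunc(cogrouped.map { case (key, (newRecords, states)) =>
      (key, newRecords.toSeq, states.headOption)
    })

  // Hypothetical update function: running sum per key.
  val sum: UpdateFunc[String, Int, Int] =
    _.map { case (k, vs, s) => (k, s.getOrElse(0) + vs.sum) }

  // Hand-written cogrouped input: (key, (new records, previous state)).
  val example: List[(String, Int)] = finalFunc(sum)(Iterator(
    ("a", (List(1, 2), List(10))),
    ("c", (Nil, List(7))),
    ("b", (List(3), Nil))
  )).toList
}
```

Here FinalFuncModel.example evaluates to List(("a", 13), ("c", 7), ("b", 3)): key "c" keeps its state although it received no new records, and key "b" starts from None state.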

FIXME Why is preservePartitioning important? What happens when mapPartitions does not preserve partitioning (which by default it does not!)
