mapPartitionsWithStateStore[U](
  stateInfo: StatefulOperatorStateInfo,
  keySchema: StructType,
  valueSchema: StructType,
  indexOrdinal: Option[Int],
  sessionState: SessionState,
  storeCoordinator: Option[StateStoreCoordinatorRef])(
  storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U]

// Used for testing only
mapPartitionsWithStateStore[U](
  sqlContext: SQLContext,
  stateInfo: StatefulOperatorStateInfo,
  keySchema: StructType,
  valueSchema: StructType,
  indexOrdinal: Option[Int])(
  storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] (1)
StateStoreOps — Extension Methods for Creating StateStoreRDD
StateStoreOps is a Scala implicit class that wraps a data RDD (of type RDD[T]) so that stateful physical operators can create a StateStoreRDD from it.
Note: Implicit Classes are a language feature in Scala for implicit conversions with extension methods for existing types.
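The extension-method pattern can be sketched in plain Scala, without Spark. The names below (ChunkOps, mapChunks) are hypothetical and chosen only for illustration; they loosely mirror how an implicit class over RDD[T] can offer a method that takes a per-partition function, and are not part of Spark.

```scala
object ImplicitClassSketch {
  // Hypothetical implicit class: wraps any Seq[T] and adds mapChunks to it,
  // much like an implicit class over RDD[T] adds methods to every RDD[T].
  implicit class ChunkOps[T](data: Seq[T]) {
    // Splits the data into chunks of `size` elements and applies `f` to each chunk.
    def mapChunks[U](size: Int)(f: Iterator[T] => Iterator[U]): List[U] =
      data.grouped(size).flatMap(chunk => f(chunk.iterator)).toList
  }

  def main(args: Array[String]): Unit = {
    // mapChunks is not defined on Seq; the compiler inserts the implicit conversion.
    val sums = Seq(1, 2, 3, 4, 5).mapChunks(2)(chunk => Iterator(chunk.sum))
    println(sums) // prints List(3, 7, 5)
  }
}
```

With the implicit class in scope, the call site reads as if mapChunks were a method of Seq itself, which is the same effect StateStoreOps has on a data RDD.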
Creating StateStoreRDD (with storeUpdateFunction Aborting StateStore When Task Fails) — mapPartitionsWithStateStore Method
(1) Uses sqlContext.streams.stateStoreCoordinator to access StateStoreCoordinator
Internally, mapPartitionsWithStateStore requests SparkContext to clean the storeUpdateFunction function.
Note: mapPartitionsWithStateStore uses the enclosing RDD to access the current SparkContext.
Note: Function cleaning removes unreferenced variables from a closure before it is serialized and sent to tasks. SparkContext reports a SparkException when the closure is not serializable.
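Why serializability matters can be illustrated without Spark. The sketch below is not Spark's closure cleaner: it uses plain Java serialization as a stand-in for Spark's check, assumes Scala 2.12 or later (where function literals are serializable), and the names Handle, isSerializable and demo are hypothetical.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureSerializationSketch {
  // Deliberately not Serializable, like many driver-side resources.
  final class Handle { val id: Int = 42 }

  // Hypothetical helper: attempts Java serialization of a closure.
  def isSerializable(closure: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(closure) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Returns (capturing the Handle serializes?, capturing only an Int serializes?).
  def demo(): (Boolean, Boolean) = {
    val handle = new Handle
    val offset = handle.id
    // Captures the whole Handle object, which cannot be serialized.
    val capturesHandle: Int => Int = x => x + handle.id
    // Captures only the Int that is actually needed.
    val capturesInt: Int => Int = x => x + offset
    (isSerializable(capturesHandle), isSerializable(capturesInt))
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Copying the needed value into a local variable, as capturesInt does, is a common way to keep a closure free of references that cannot be shipped to tasks.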
mapPartitionsWithStateStore then creates a wrapper function that aborts the StateStore if state updates have not been committed by the time the task finishes. This ensures that the StateStore is always either committed or aborted in the end, as the contract of StateStore requires.
Note: mapPartitionsWithStateStore uses TaskCompletionListener to be notified when a task has finished.
In the end, mapPartitionsWithStateStore creates a StateStoreRDD (with the wrapper function, SessionState and StateStoreCoordinatorRef).
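The abort-unless-committed wrapper described above can be sketched in plain Scala. ToyStore and ToyTaskContext are hypothetical stand-ins for the state store and the task context (they are not Spark classes), and the sketch only models the control flow: register a completion listener, then delegate to the update function.

```scala
import scala.collection.mutable.ListBuffer

object AbortOnFailureSketch {
  // Hypothetical stand-in for a state store: only what the wrapper needs.
  final class ToyStore {
    private var committed = false
    private var aborted = false
    def commit(): Unit = { committed = true }
    def abort(): Unit = { aborted = true }
    def hasCommitted: Boolean = committed
    def wasAborted: Boolean = aborted
  }

  // Hypothetical stand-in for a task context with completion listeners.
  final class ToyTaskContext {
    private val listeners = ListBuffer.empty[() => Unit]
    def addCompletionListener(listener: () => Unit): Unit = { listeners += listener }
    def markTaskCompleted(): Unit = listeners.foreach(listener => listener())
  }

  // Wraps `update` so that an uncommitted store is aborted when the task completes.
  def wrap[T, U](ctx: ToyTaskContext)(
      update: (ToyStore, Iterator[T]) => Iterator[U]): (ToyStore, Iterator[T]) => Iterator[U] =
    (store, iter) => {
      ctx.addCompletionListener(() => if (!store.hasCommitted) store.abort())
      update(store, iter)
    }

  // Runs one toy task and reports whether the store ended up aborted.
  def runTask(commitAtEnd: Boolean): Boolean = {
    val ctx = new ToyTaskContext
    val store = new ToyStore
    val wrapped = wrap[Int, Int](ctx) { (s, rows) =>
      val out = rows.map(_ * 2).toList
      if (commitAtEnd) s.commit()
      out.iterator
    }
    wrapped(store, Iterator(1, 2, 3)).foreach(_ => ())
    ctx.markTaskCompleted() // the task finishes, so the listeners fire
    store.wasAborted
  }

  def main(args: Array[String]): Unit = {
    println(runTask(commitAtEnd = true))  // prints false: committed, nothing to abort
    println(runTask(commitAtEnd = false)) // prints true: never committed, so aborted
  }
}
```

Registering the listener before calling the update function means the abort path also covers the case where the update function throws midway, as long as the task context still fires its completion listeners.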