StateStore Contract — Key-Value Store for Streaming State Data
StateStore is the abstraction of key-value stores for managing state in Stateful Stream Processing (e.g. for persisting running aggregates in Streaming Aggregation).
StateStore supports incremental checkpointing in which only the key-value "Row" pairs that changed are committed or aborted (without touching other key-value pairs).
StateStore is identified by the ID of the aggregating operator and the partition ID (among other identifying properties).
Note: HDFSBackedStateStore is the default and only known implementation of the StateStore Contract in Spark Structured Streaming.
| Method | Description |
|---|---|
| abort | Aborts (discards) changes to the state store |
| commit | Commits the changes to the state store (and returns the current version) |
| get | Looks up (gets) the value of the given non-null key |
| getRange | Gets the key-value pairs of… |
| hasCommitted | Flag to indicate whether state changes have been committed |
| id | The ID of the state store |
| iterator | Returns an iterator with all the key-value pairs in the state store |
| metrics | StateStoreMetrics of the state store |
| put | Stores (puts) the value for the (non-null) key |
| remove | Removes the (non-null) key from the state store |
| version | Version of the state store |
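The contract can be illustrated with a toy in-memory model. This is only a sketch under loud assumptions: ToyStateStore is a hypothetical class (not Spark's StateStore), it uses String keys and Long values instead of rows, and it assumes that a commit produces version + 1. It shows how only the changed key-value pairs are tracked on top of an already committed snapshot, and how abort discards them.

```scala
import scala.collection.mutable

// Hypothetical toy model of the contract; NOT Spark's StateStore.
// `committed` is the snapshot at `version`; changes are tracked separately.
final class ToyStateStore(committed: Map[String, Long], val version: Long) {
  // Only the pairs that changed since `version` are recorded (the delta).
  private val updates = mutable.Map.empty[String, Long]
  private val removals = mutable.Set.empty[String]
  private var committedFlag = false

  def hasCommitted: Boolean = committedFlag

  // A removed key is gone; otherwise an update wins over the snapshot.
  def get(key: String): Option[Long] =
    if (removals.contains(key)) None
    else updates.get(key).orElse(committed.get(key))

  def put(key: String, value: Long): Unit = {
    require(key != null, "key must not be null")
    removals -= key
    updates(key) = value
  }

  def remove(key: String): Unit = {
    require(key != null, "key must not be null")
    updates -= key
    removals += key
  }

  // Keys touched since the snapshot; everything else is left alone.
  def changedKeys: Set[String] = updates.keySet.toSet ++ removals

  // All pairs as seen through the pending changes.
  def iterator: Iterator[(String, Long)] =
    ((committed -- removals) ++ updates).iterator

  // Assumption of this sketch: committing yields the next version.
  def commit(): Long = { committedFlag = true; version + 1 }

  // Discards the pending changes; the snapshot is untouched.
  def abort(): Unit = { updates.clear(); removals.clear() }
}
```

With a snapshot of Map("a" -> 1L, "b" -> 2L), calling put("a", 10L) and remove("b") records only those two keys as changed, which is the idea behind incremental checkpointing.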
Note: Read the motivation and design in State Store for Streaming Aggregations.
Tip: Enable logging for StateStore. Refer to Logging.
Creating (and Caching) RPC Endpoint Reference to StateStoreCoordinator for Executors — coordinatorRef Internal Object Method
coordinatorRef: Option[StateStoreCoordinatorRef]
coordinatorRef requests the SparkEnv helper object for the current SparkEnv.
If the SparkEnv is available and the _coordRef is not assigned yet, coordinatorRef prints the following DEBUG message to the logs and then requests the StateStoreCoordinatorRef for the StateStoreCoordinator endpoint.
Getting StateStoreCoordinatorRef
If the SparkEnv is available, coordinatorRef prints out the following INFO message to the logs:
Retrieved reference to StateStoreCoordinator: [_coordRef]
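The create-once-then-cache behaviour described above can be sketched as follows. Everything here is a hypothetical stand-in: ToyEnv, ToyCoordinatorRef and ToyCoordinatorCache are not Spark classes, and returning None when no environment is available is an assumption suggested only by the Option return type.

```scala
// Hypothetical stand-ins for SparkEnv and StateStoreCoordinatorRef.
final case class ToyEnv(executorId: String)
final case class ToyCoordinatorRef(executorId: String)

object ToyCoordinatorCache {
  // Mirrors the "not assigned yet" state of _coordRef.
  private var coordRef: ToyCoordinatorRef = null
  // Counts how often a reference had to be created (for illustration).
  var lookups: Int = 0

  def coordinatorRef(env: Option[ToyEnv]): Option[ToyCoordinatorRef] =
    synchronized {
      env.map { e =>
        if (coordRef == null) {
          // First call with an environment: create and cache the reference.
          lookups += 1
          coordRef = ToyCoordinatorRef(e.executorId)
        }
        // Later calls reuse the cached reference.
        coordRef
      }
    }
}
```

Calling coordinatorRef twice with the same environment creates the reference once and returns the cached value the second time.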
Note: coordinatorRef is used when the StateStore helper object is requested to reportActiveStoreInstance (when the StateStore helper object is requested to find the StateStore by StateStoreProviderId) and to verifyIfStoreInstanceActive (when the StateStore helper object is requested to doMaintenance).
Unloading State Store Provider — unload Method
unload(storeProviderId: StateStoreProviderId): Unit
unload…FIXME
Note: unload is used when the StateStore helper object is requested to stop and doMaintenance.
Announcing New StateStoreProvider — reportActiveStoreInstance Internal Object Method
reportActiveStoreInstance(
storeProviderId: StateStoreProviderId): Unit
reportActiveStoreInstance takes the current host and executorId (from the BlockManager on the Spark executor) and requests the StateStoreCoordinatorRef to reportActiveInstance.
Note: reportActiveStoreInstance uses SparkEnv to access the BlockManager.
In the end, reportActiveStoreInstance prints out the following INFO message to the logs:
Reported that the loaded instance [storeProviderId] is active
Note: reportActiveStoreInstance is used exclusively when the StateStore utility is requested to find the StateStore by StateStoreProviderId.
MaintenanceTask Daemon Thread
MaintenanceTask is a daemon thread that triggers maintenance work of registered StateStoreProviders.
When an error occurs, MaintenanceTask clears the loadedProviders internal registry.
MaintenanceTask is scheduled on the state-store-maintenance-task thread pool and runs periodically, every spark.sql.streaming.stateStore.maintenanceInterval (default: 60s).
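A periodic daemon task with an error callback can be sketched with the JDK's ScheduledExecutorService. This is a minimal sketch, not Spark's implementation: ToyMaintenanceTask, its parameters and the thread name are hypothetical, and using the period as the initial delay is an assumption of this example.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Hypothetical sketch of a periodic maintenance task on a daemon thread.
final class ToyMaintenanceTask(periodMs: Long, task: () => Unit, onError: () => Unit) {
  private val executor: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "toy-state-store-maintenance-task")
        t.setDaemon(true) // daemon threads do not keep the JVM alive
        t
      }
    })

  private val runnable = new Runnable {
    override def run(): Unit =
      try task()
      catch {
        case e: Exception =>
          onError() // e.g. clear a registry of loaded providers
          throw e   // an exception suppresses all further scheduled runs
      }
  }

  // First run after periodMs, then repeatedly every periodMs.
  private val future =
    executor.scheduleAtFixedRate(runnable, periodMs, periodMs, TimeUnit.MILLISECONDS)

  // False once the task was cancelled or stopped by an exception.
  def isRunning: Boolean = !future.isDone

  def stop(): Unit = {
    future.cancel(false)
    executor.shutdown()
  }
}
```

Rethrowing from run relies on the documented behaviour of scheduleAtFixedRate: once an execution throws, subsequent executions are suppressed, so isRunning turns false after the error callback has fired.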
Looking Up StateStore by Provider ID — get Object Method
get(
storeProviderId: StateStoreProviderId,
keySchema: StructType,
valueSchema: StructType,
indexOrdinal: Option[Int],
version: Long,
storeConf: StateStoreConf,
hadoopConf: Configuration): StateStore
get finds StateStore for the specified StateStoreProviderId and version.
Note: The version is either the current epoch (in Continuous Stream Processing) or the current batch ID (in Micro-Batch Stream Processing).
Internally, get looks up the StateStoreProvider (by storeProviderId) in the loadedProviders internal cache. If unavailable, get uses the StateStoreProvider utility to create and initialize one.
get will also start the periodic maintenance task (unless already started) and announce the new StateStoreProvider.
In the end, get requests the StateStoreProvider to look up the StateStore by the specified version.
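The lookup flow above (cache lookup, create on a miss, start maintenance, announce, then ask the provider for a store) can be sketched with a small self-contained model. All names below (ToyProviderId, ToyProvider, ToyStateStores) are hypothetical, the "store" is just a descriptive String, and the sketch announces the provider on every call as a simplification.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for StateStoreProviderId and StateStoreProvider.
final case class ToyProviderId(operatorId: Long, partitionId: Int)

final class ToyProvider(val id: ToyProviderId) {
  // Stands in for looking up the store for a given version.
  def getStore(version: Long): String =
    s"store(op=${id.operatorId}, part=${id.partitionId}, v=$version)"
}

object ToyStateStores {
  private val loadedProviders = mutable.HashMap.empty[ToyProviderId, ToyProvider]
  private var maintenanceStarted = false
  private var announcements = 0

  def get(id: ToyProviderId, version: Long): String = {
    require(version >= 0, "version must be non-negative")
    val provider = loadedProviders.synchronized {
      // Stands in for starting the periodic maintenance task once.
      if (!maintenanceStarted) maintenanceStarted = true
      // Reuse the cached provider or create and register a new one.
      val p = loadedProviders.getOrElseUpdate(id, new ToyProvider(id))
      // Stands in for announcing the provider to the coordinator.
      announcements += 1
      p
    }
    provider.getStore(version)
  }

  def loadedCount: Int = loadedProviders.synchronized(loadedProviders.size)
  def announcementCount: Int = loadedProviders.synchronized(announcements)
}
```

Two lookups with the same provider ID but different versions share a single cached provider.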
Starting Periodic Maintenance Task (Unless Already Started) — startMaintenanceIfNeeded Internal Object Method
startMaintenanceIfNeeded(): Unit
startMaintenanceIfNeeded schedules MaintenanceTask to start after an initial delay of spark.sql.streaming.stateStore.maintenanceInterval (defaults to 60s) and then to run again at that same interval.
Note: startMaintenanceIfNeeded does nothing when the maintenance task has already been started and is still running.
Note: startMaintenanceIfNeeded is used exclusively when StateStore is requested to find the StateStore by StateStoreProviderId.
Doing State Maintenance of Registered State Store Providers — doMaintenance Internal Object Method
doMaintenance(): Unit
Internally, doMaintenance prints the following DEBUG message to the logs:
Doing maintenance
doMaintenance then requests every StateStoreProvider (registered in loadedProviders) to do its own internal maintenance (only when a StateStoreProvider is still active).
When a StateStoreProvider is inactive, doMaintenance removes it from the provider registry and prints the following INFO message to the logs:
Unloaded [provider]
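The maintenance pass described above can be sketched as a loop over a registry. This is an illustrative model only: ToyMaintainedProvider and ToyRegistry are hypothetical, providers are keyed by a plain String, and the isActive function stands in for the activity check.

```scala
import scala.collection.mutable

// Hypothetical provider that only counts its maintenance runs.
final class ToyMaintainedProvider(val name: String) {
  var maintenanceRuns = 0
  def doMaintenance(): Unit = maintenanceRuns += 1
}

// Hypothetical registry; `isActive` stands in for the activity check.
final class ToyRegistry(isActive: String => Boolean) {
  private val loadedProviders = mutable.HashMap.empty[String, ToyMaintainedProvider]

  def register(p: ToyMaintainedProvider): Unit =
    loadedProviders.synchronized { loadedProviders(p.name) = p }

  def loaded: Set[String] =
    loadedProviders.synchronized(loadedProviders.keySet.toSet)

  def doMaintenance(): Unit = {
    // Work on a snapshot so unloading does not mutate the map being traversed.
    val snapshot = loadedProviders.synchronized(loadedProviders.toSeq)
    snapshot.foreach { case (id, provider) =>
      if (isActive(id)) provider.doMaintenance()
      else loadedProviders.synchronized { loadedProviders.remove(id) } // unload
    }
  }
}
```

After one pass, active providers have run their maintenance once and inactive ones are no longer in the registry.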
Note: doMaintenance is used exclusively in the MaintenanceTask daemon thread.
verifyIfStoreInstanceActive Internal Object Method
verifyIfStoreInstanceActive(storeProviderId: StateStoreProviderId): Boolean
verifyIfStoreInstanceActive…FIXME
Note: verifyIfStoreInstanceActive is used exclusively when the StateStore helper object is requested to doMaintenance (from a running MaintenanceTask daemon thread).
Internal Properties
| Name | Description |
|---|---|
| loadedProviders | Loaded providers internal cache, i.e. StateStoreProviders per StateStoreProviderId. Used in…FIXME |
| _coordRef | StateStoreCoordinator RPC endpoint. Used in…FIXME |