state: STATE
HDFSBackedStateStore — State Store on HDFS-Compatible File System
HDFSBackedStateStore is a concrete StateStore that uses a Hadoop DFS-compatible file system for versioned state persistence.
HDFSBackedStateStore is created exclusively when HDFSBackedStateStoreProvider is requested for a given version of the state store for update (which happens when the StateStore utility is requested to look up a StateStore by provider id).
HDFSBackedStateStore uses the StateStoreId of the owning HDFSBackedStateStoreProvider.
When requested for the textual representation, HDFSBackedStateStore gives HDFSStateStore[id=(op=[operatorId],part=[partitionId]),dir=[baseDir]].
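The format of the textual representation can be illustrated with a minimal sketch. SketchStoreId and SketchStore are hypothetical stand-ins (not Spark classes) that carry only the fields the format string needs.

```scala
// Hypothetical stand-in for StateStoreId: only the two fields used in the text.
final case class SketchStoreId(operatorId: Long, partitionId: Int)

// Hypothetical stand-in for the store: reproduces the documented format string.
final class SketchStore(id: SketchStoreId, baseDir: String) {
  override def toString: String =
    s"HDFSStateStore[id=(op=${id.operatorId},part=${id.partitionId}),dir=$baseDir]"
}
```

For example, a store for operator 0 and partition 3 under /tmp/state/0/3 would render as HDFSStateStore[id=(op=0,part=3),dir=/tmp/state/0/3].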
Creating HDFSBackedStateStore Instance
HDFSBackedStateStore takes the following to be created:
HDFSBackedStateStore initializes the internal properties.
Internal State — state Internal Property
state is the current state of HDFSBackedStateStore and can be in one of the three possible states: ABORTED, COMMITTED, and UPDATING.
State changes (to the internal mapToUpdate registry) are allowed as long as HDFSBackedStateStore is in the default UPDATING state. Once an HDFSBackedStateStore transitions to either the COMMITTED or the ABORTED state, no further state changes are allowed.
Note: Do not confuse the two meanings of "state" here: the internal state of HDFSBackedStateStore and the state of a streaming query (which HDFSBackedStateStore is responsible for).
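| Name | Description |
|---|---|
| ABORTED | After abort |
| COMMITTED | After commit. The hasCommitted flag indicates whether HDFSBackedStateStore is in this state |
| UPDATING | Default state. State changes are allowed |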
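The lifecycle described above can be modelled as a small state machine. This is a simplified sketch, not Spark's code: LifecycleSketch and StoreState are hypothetical names, change() stands for any state change, and abort is assumed to be allowed from any state since its behaviour is not described here.

```scala
// The three internal states named in the text.
sealed trait StoreState
case object Updating extends StoreState
case object Committed extends StoreState
case object Aborted extends StoreState

// Hypothetical model of the lifecycle only; it stores no data.
final class LifecycleSketch {
  private var state: StoreState = Updating // UPDATING is the default state

  private def requireUpdating(action: String): Unit =
    if (state != Updating)
      throw new IllegalStateException(s"Cannot $action after already committed or aborted")

  // Any state change (e.g. put) is only legal while UPDATING.
  def change(): Unit = requireUpdating("put")

  def commit(): Unit = {
    requireUpdating("commit")
    state = Committed
  }

  // Assumption: abort is accepted unconditionally in this sketch.
  def abort(): Unit = {
    state = Aborted
  }

  def hasCommitted: Boolean = state == Committed
}
```

Once commit or abort has been called on such an object, every later change() or commit() fails with an IllegalStateException.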
writeUpdateToDeltaFile Internal Method
writeUpdateToDeltaFile(
output: DataOutputStream,
key: UnsafeRow,
value: UnsafeRow): Unit
Caution: FIXME
put Method
put(
key: UnsafeRow,
value: UnsafeRow): Unit
Note: put is part of the StateStore Contract to…FIXME
put stores copies of the key and the value in the mapToUpdate internal registry and then writes them to a delta file (using tempDeltaFileStream).
put reports an IllegalStateException when HDFSBackedStateStore is not in UPDATING state:
Cannot put after already committed or aborted
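The copy-then-append behaviour of put can be sketched without Spark on the classpath. Everything below is an assumption made for illustration: PutSketch is a hypothetical class, keys are plain strings and values byte arrays instead of UnsafeRow, the delta file is an in-memory buffer, and the length-prefixed record layout is invented rather than Spark's actual delta format.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.charset.StandardCharsets
import scala.collection.mutable

// Hypothetical, simplified model of put.
final class PutSketch {
  private var updating = true
  val mapToUpdate = mutable.HashMap.empty[String, Array[Byte]]
  private val buffer = new ByteArrayOutputStream()      // stands in for the delta file
  private val deltaStream = new DataOutputStream(buffer)

  def put(key: String, value: Array[Byte]): Unit = {
    if (!updating)
      throw new IllegalStateException("Cannot put after already committed or aborted")
    // Store a copy so that later mutation of the caller's array cannot change the state.
    val valueCopy = value.clone()
    mapToUpdate.put(key, valueCopy)
    // Append the same record to the delta stream (invented length-prefixed layout).
    val keyBytes = key.getBytes(StandardCharsets.UTF_8)
    deltaStream.writeInt(keyBytes.length)
    deltaStream.write(keyBytes)
    deltaStream.writeInt(valueCopy.length)
    deltaStream.write(valueCopy)
  }

  // Leaves the updating state, as a commit or an abort would.
  def close(): Unit = {
    updating = false
  }

  def deltaBytes: Array[Byte] = {
    deltaStream.flush()
    buffer.toByteArray
  }
}
```

Copying matters because callers commonly reuse row buffers between calls; without the copy, the registry would silently observe later writes to the same buffer.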
Committing State Changes — commit Method
commit(): Long
Note: commit is part of the StateStore Contract to commit state changes.
commit requests the parent HDFSBackedStateStoreProvider to commit the state changes as a new version of state (passing the newVersion, the mapToUpdate and the compressed stream).
commit transitions HDFSBackedStateStore to COMMITTED state.
commit prints out the following INFO message to the logs:
Committed version [newVersion] for [this] to file [finalDeltaFile]
commit returns the newVersion.
commit throws an IllegalStateException when HDFSBackedStateStore is not in UPDATING state:
Cannot commit after already committed or aborted
commit throws an IllegalStateException when any NonFatal exception occurs while committing:
Error committing version [newVersion] into [this]
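The control flow of commit, including both error paths, can be sketched as follows. CommitSketch is a hypothetical class; the persist function stands in for the request to the parent provider, and deriving newVersion as version + 1 is an assumption of this sketch rather than something stated above.

```scala
import scala.util.control.NonFatal

// Hypothetical model of commit; persist stands in for the provider's commit work.
final class CommitSketch(version: Long, persist: Long => Unit) {
  private var committed = false
  private var aborted = false

  // Assumption: the new version is the next one after the loaded version.
  val newVersion: Long = version + 1

  def hasCommitted: Boolean = committed

  def abort(): Unit = {
    aborted = true
  }

  def commit(): Long = {
    // The state check happens before the try block, so its message is not wrapped.
    if (committed || aborted)
      throw new IllegalStateException("Cannot commit after already committed or aborted")
    try {
      persist(newVersion)
      committed = true // transition to COMMITTED only after a successful persist
      newVersion
    } catch {
      case NonFatal(e) =>
        // Keep the original exception as the cause.
        throw new IllegalStateException(s"Error committing version $newVersion into $this", e)
    }
  }
}
```

Wrapping only NonFatal exceptions means that fatal JVM errors such as OutOfMemoryError propagate unchanged instead of being reported as commit failures.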
Aborting State Changes — abort Method
abort(): Unit
Note: abort is part of the StateStore Contract to abort the state changes.
abort…FIXME
Performance Metrics — metrics Method
metrics: StateStoreMetrics
Note: metrics is part of the StateStore Contract to get the StateStoreMetrics.
metrics requests the performance metrics of the parent HDFSBackedStateStoreProvider.
Of the provider's performance metrics, only the ones listed in supportedCustomMetrics are used.
In the end, metrics returns a new StateStoreMetrics with the following:
- Total number of keys as the size of mapToUpdate
- Memory used (in bytes) as the memoryUsedBytes metric (of the parent provider)
- StateStoreCustomMetrics as the supportedCustomMetrics and the metricStateOnCurrentVersionSizeBytes metric of the parent provider
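How the three parts are assembled can be sketched with plain maps. MetricsSketch and MetricsBuilder are hypothetical names, provider metrics are modelled as a name-to-value map, and both the "memoryUsedBytes" lookup key and the "stateOnCurrentVersionSizeBytes" key are assumptions chosen for this illustration.

```scala
// Hypothetical stand-in for StateStoreMetrics.
final case class MetricsSketch(
    numKeys: Long,
    memoryUsedBytes: Long,
    customMetrics: Map[String, Long])

object MetricsBuilder {
  def build[K, V](
      mapToUpdate: collection.Map[K, V],
      providerMetrics: Map[String, Long],
      supportedCustomMetrics: Set[String],
      currentVersionSizeBytes: Long): MetricsSketch = {
    // Keep only the provider metrics that are listed as supported.
    val supported = providerMetrics.filter { case (name, _) =>
      supportedCustomMetrics.contains(name)
    }
    // Add the size of the current version as one more custom metric (assumed key name).
    val custom = supported + ("stateOnCurrentVersionSizeBytes" -> currentVersionSizeBytes)
    MetricsSketch(
      numKeys = mapToUpdate.size.toLong,
      memoryUsedBytes = providerMetrics.getOrElse("memoryUsedBytes", 0L),
      customMetrics = custom)
  }
}
```

In this sketch, provider metrics that are not listed as supported are simply dropped rather than reported under a default name.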
Are State Changes Committed? — hasCommitted Method
hasCommitted: Boolean
Note: hasCommitted is part of the StateStore Contract to indicate whether state changes have been committed or not.
hasCommitted returns true when HDFSBackedStateStore is in COMMITTED state and false otherwise.
Internal Properties
| Name | Description |
|---|---|
| | The compressed java.io.DataOutputStream for the deltaFileStream |
| | Used exclusively when |