HDFSBackedStateStore — State Store on HDFS-Compatible File System

HDFSBackedStateStore is a concrete StateStore that uses a Hadoop DFS-compatible file system for versioned state persistence.

HDFSBackedStateStore is created exclusively when HDFSBackedStateStoreProvider is requested for the state store of the specified version for update (which happens when the StateStore utility is requested to look up a StateStore by provider id).

HDFSBackedStateStore uses the StateStoreId of the owning HDFSBackedStateStoreProvider.

When requested for the textual representation, HDFSBackedStateStore gives HDFSStateStore[id=(op=[operatorId],part=[partitionId]),dir=[baseDir]].
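
The format of the textual representation can be illustrated with a small formatter. This is only a sketch: StoreId is a hypothetical stand-in for StateStoreId that models just the two fields named in the format, and describe is an illustrative helper, not a method of HDFSBackedStateStore.

```scala
object ToStringSketch {
  // Hypothetical stand-in for StateStoreId (only the fields the format mentions).
  final case class StoreId(operatorId: Long, partitionId: Int)

  // Builds a string in the format quoted above.
  def describe(id: StoreId, baseDir: String): String =
    s"HDFSStateStore[id=(op=${id.operatorId},part=${id.partitionId}),dir=$baseDir]"
}
```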


HDFSBackedStateStore is an internal class of HDFSBackedStateStoreProvider and uses its logger.

Creating HDFSBackedStateStore Instance

HDFSBackedStateStore takes the following to be created:

  • Version

  • State Map (ConcurrentHashMap[UnsafeRow, UnsafeRow])

HDFSBackedStateStore initializes the internal properties.

Internal State — state Internal Property

state: STATE

state is the current state of HDFSBackedStateStore and can be in one of the three possible states: ABORTED, COMMITTED, and UPDATING.

State changes (to the internal mapToUpdate registry) are allowed only while HDFSBackedStateStore is in the default UPDATING state. Once an HDFSBackedStateStore transitions to either the COMMITTED or the ABORTED state, no further state changes are allowed.

Do not confuse the two meanings of "state" here: the internal state of HDFSBackedStateStore and the state of a streaming query (that HDFSBackedStateStore is responsible for).

Table 1. Internal States

  • ABORTED
    After abort

  • COMMITTED
    After commit. The hasCommitted flag indicates whether HDFSBackedStateStore is in this state or not.

  • UPDATING
    (default) Initial state after the HDFSBackedStateStore was created. Allows for state changes (e.g. put, remove, getRange) and eventually committing or aborting them.
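
The lifecycle described in this section can be sketched as a tiny state machine. This is a simplified model, not the Spark source: the Store class and the requireUpdating helper are hypothetical, the states are modeled as case objects, and abort is assumed to simply move the store to the aborted state.

```scala
object LifecycleSketch {
  sealed trait State
  case object Updating extends State
  case object Committed extends State
  case object Aborted extends State

  final class Store {
    // UPDATING is the default state right after creation.
    private var state: State = Updating

    // Hypothetical guard: state changes are allowed only while updating.
    private def requireUpdating(action: String): Unit =
      if (state != Updating)
        throw new IllegalStateException(s"Cannot $action after already committed or aborted")

    def put(): Unit = requireUpdating("put")

    def commit(): Unit = {
      requireUpdating("commit")
      state = Committed
    }

    // Assumption: abort just marks the store as aborted.
    def abort(): Unit = state = Aborted

    def hasCommitted: Boolean = state == Committed
  }
}
```

Once commit or abort has been called on such a store, every later put or commit fails with an IllegalStateException.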

writeUpdateToDeltaFile Internal Method

writeUpdateToDeltaFile(
  output: DataOutputStream,
  key: UnsafeRow,
  value: UnsafeRow): Unit

put Method

put(
  key: UnsafeRow,
  value: UnsafeRow): Unit
put is a part of StateStore Contract to…​FIXME

put stores copies of the key and the value in the mapToUpdate internal registry and then writes them to a delta file (using tempDeltaFileStream).

put reports an IllegalStateException when HDFSBackedStateStore is not in UPDATING state:

Cannot put after already committed or aborted
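
The behavior of put described above can be sketched without Spark on the classpath. Everything here is an illustration under stated assumptions: Row is a hypothetical mutable stand-in for UnsafeRow, the updating flag stands in for the UPDATING state, and the length-prefixed record layout written to the stream is assumed for the example.

```scala
import java.io.DataOutputStream
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.ConcurrentHashMap

object PutSketch {
  // Hypothetical mutable stand-in for UnsafeRow (callers may reuse instances).
  final case class Row(var payload: String) {
    def bytes: Array[Byte] = payload.getBytes(UTF_8)
  }

  final class Store(output: DataOutputStream) {
    val mapToUpdate = new ConcurrentHashMap[Row, Row]()
    var updating = true // stands in for the UPDATING state

    // Assumed layout: length-prefixed key followed by length-prefixed value.
    private def writeUpdate(key: Row, value: Row): Unit = {
      val keyBytes = key.bytes
      val valueBytes = value.bytes
      output.writeInt(keyBytes.length)
      output.write(keyBytes)
      output.writeInt(valueBytes.length)
      output.write(valueBytes)
    }

    def put(key: Row, value: Row): Unit = {
      if (!updating)
        throw new IllegalStateException("Cannot put after already committed or aborted")
      // Copy first, so later mutation of the caller's rows cannot affect the store.
      val keyCopy = key.copy()
      val valueCopy = value.copy()
      mapToUpdate.put(keyCopy, valueCopy)
      writeUpdate(keyCopy, valueCopy)
    }
  }
}
```

Storing copies matters because the caller may reuse and mutate the row objects it passed in after put returns.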

Committing State Changes — commit Method

commit(): Long
commit is part of the StateStore Contract to commit state changes.

commit requests the parent HDFSBackedStateStoreProvider to commit the state changes as a new version of state (passing the newVersion, the mapToUpdate and the compressed stream).

commit transitions HDFSBackedStateStore to COMMITTED state.

commit prints out the following INFO message to the logs:

Committed version [newVersion] for [this] to file [finalDeltaFile]

commit returns a newVersion.

commit throws an IllegalStateException when HDFSBackedStateStore is not in UPDATING state:

Cannot commit after already committed or aborted

commit throws an IllegalStateException when any NonFatal exception is raised while committing:

Error committing version [newVersion] into [this]
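
The control flow of commit, including the two failure modes, can be sketched as follows. This is a simplified model under assumptions, not the Spark source: the commitUpdates function parameter is a hypothetical stand-in for the call to the parent provider, newVersion is assumed to be the given version plus one, and logging is omitted.

```scala
import scala.util.control.NonFatal

object CommitSketch {
  // commitUpdates is a hypothetical stand-in for the parent provider's commit.
  final class Store(version: Long, commitUpdates: Long => Unit) {
    private var updating = true
    private var committed = false

    // Assumption: the new version is the requested version plus one.
    val newVersion: Long = version + 1

    def hasCommitted: Boolean = committed

    def commit(): Long = {
      // The state check happens outside the try block, so it is not re-wrapped.
      if (!updating)
        throw new IllegalStateException("Cannot commit after already committed or aborted")
      try {
        commitUpdates(newVersion)
        updating = false
        committed = true
        newVersion
      } catch {
        case NonFatal(e) =>
          throw new IllegalStateException(s"Error committing version $newVersion into $this", e)
      }
    }
  }
}
```

In this sketch the original failure stays available as the cause of the IllegalStateException, and a failed commit leaves the store uncommitted.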

Aborting State Changes — abort Method

abort(): Unit
abort is part of the StateStore Contract to abort the state changes.


Performance Metrics — metrics Method

metrics: StateStoreMetrics
metrics is part of the StateStore Contract to get the StateStoreMetrics.

metrics requests the performance metrics of the parent HDFSBackedStateStoreProvider.

Of the provider's performance metrics, only the ones listed in supportedCustomMetrics are used.
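
The filtering step can be pictured as restricting a map of metric values to a set of supported names. This is a minimal sketch under assumptions: metrics are modeled as a plain Map from name to Long, and supportedOnly is an illustrative helper, not part of the Spark API.

```scala
object MetricsSketch {
  // Keeps only the provider metrics whose names are in the supported set.
  def supportedOnly(
      providerMetrics: Map[String, Long],
      supported: Set[String]): Map[String, Long] =
    providerMetrics.filter { case (name, _) => supported.contains(name) }
}
```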

In the end, metrics returns a new StateStoreMetrics with the following:

Are State Changes Committed? — hasCommitted Method

hasCommitted: Boolean
hasCommitted is part of the StateStore Contract to indicate whether state changes have been committed or not.

hasCommitted returns true when HDFSBackedStateStore is in COMMITTED state and false otherwise.

Internal Properties

  • compressedStream: DataOutputStream
    The compressed java.io.DataOutputStream for the deltaFileStream

  • deltaFileStream: CheckpointFileManager.CancellableFSDataOutputStream

  • finalDeltaFile: Path
    The Hadoop Path of the deltaFile for the version

  • newVersion: Long
    Used exclusively when HDFSBackedStateStore is requested for the finalDeltaFile, to commit and abort
