HDFSBackedStateStoreProvider — Default StateStoreProvider

HDFSBackedStateStoreProvider is the default StateStoreProvider per spark.sql.streaming.stateStore.providerClass internal configuration property.

When the StateStoreProvider helper object is requested to create and initialize a StateStoreProvider, HDFSBackedStateStoreProvider is created and requested to initialize itself.

HDFSBackedStateStoreProvider uses the state checkpoint base directory (that is the storeCheckpointLocation of the StateStoreId) for delta and snapshot state files. The checkpoint directory is created when HDFSBackedStateStoreProvider is requested to initialize.

The StateStoreId of a HDFSBackedStateStoreProvider is defined at initialization.

When requested for the textual representation, HDFSBackedStateStoreProvider returns HDFSStateStoreProvider[id = (op=[operatorId],part=[partitionId]),dir = [baseDir]].
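The format of that string can be reproduced with a trivial sketch (the operator ID, partition ID and directory below are made-up sample values, not taken from a real query):

```scala
// Minimal sketch of the textual representation described above.
// The sample values passed to describe are hypothetical.
object ToStringSketch {
  def describe(operatorId: Long, partitionId: Int, baseDir: String): String =
    s"HDFSStateStoreProvider[id = (op=$operatorId,part=$partitionId),dir = $baseDir]"

  def main(args: Array[String]): Unit =
    println(describe(0, 3, "/tmp/checkpoint/state/0/3"))
    // prints HDFSStateStoreProvider[id = (op=0,part=3),dir = /tmp/checkpoint/state/0/3]
}
```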

Table 1. HDFSBackedStateStoreProvider’s Internal Properties

loadedMaps

loadedMaps: TreeMap[Long, ConcurrentHashMap[UnsafeRow, UnsafeRow]]

java.util.TreeMap of FIXME, sorted according to the reversed natural ordering of the keys (so the most recent version comes first)

The current memory size estimate of loadedMaps is reported as the memoryUsedBytes metric in the metrics.

A new entry (a version and the associated state map) is added when HDFSBackedStateStoreProvider is requested to putStateIntoStateCacheMap.

Used when…​FIXME

numberOfVersionsToRetainInMemory

numberOfVersionsToRetainInMemory: Int

numberOfVersionsToRetainInMemory is the spark.sql.streaming.maxBatchesToRetainInMemory configuration property that sets the upper limit on the number of entries in the loadedMaps internal registry.

numberOfVersionsToRetainInMemory is used when HDFSBackedStateStoreProvider is requested to putStateIntoStateCacheMap.
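The interplay of loadedMaps and numberOfVersionsToRetainInMemory can be sketched as a version-capped cache. This is a simplified illustration, not the actual implementation: String stands in for UnsafeRow, and the exact eviction policy of putStateIntoStateCacheMap is an assumption made here for the sketch.

```scala
import java.util.{Collections, TreeMap}
import java.util.concurrent.ConcurrentHashMap

// Sketch of a version-capped state cache: a TreeMap sorted by the
// reversed natural ordering of version keys (latest version first),
// capped at numberOfVersionsToRetainInMemory entries.
// String stands in for UnsafeRow to keep the example dependency-free.
class CacheSketch(numberOfVersionsToRetainInMemory: Int) {
  val loadedMaps = new TreeMap[java.lang.Long, ConcurrentHashMap[String, String]](
    Collections.reverseOrder[java.lang.Long]())

  def putStateIntoStateCacheMap(
      newVersion: Long,
      map: ConcurrentHashMap[String, String]): Unit = {
    loadedMaps.put(newVersion, map)
    // Under the reversed ordering the oldest version is the last key;
    // evict it while the cap is exceeded (assumed eviction policy).
    while (loadedMaps.size > numberOfVersionsToRetainInMemory)
      loadedMaps.remove(loadedMaps.lastKey)
  }
}

object CacheSketchDemo {
  def main(args: Array[String]): Unit = {
    val cache = new CacheSketch(2)
    (1L to 3L).foreach(v =>
      cache.putStateIntoStateCacheMap(v, new ConcurrentHashMap[String, String]()))
    println(cache.loadedMaps.keySet) // only the two most recent versions remain
  }
}
```

The reversed ordering makes the most recently committed version the cheapest to find, which matches how a streaming query almost always asks for the latest state first.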

Tip

Enable DEBUG, INFO or WARN logging levels for the org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider=DEBUG

Refer to Logging.

Retrieving State Store for Version — getStore Method

getStore(version: Long): StateStore
Note
getStore is part of the StateStoreProvider Contract to get the StateStore for a given version.

getStore…​FIXME

deltaFile Internal Method

deltaFile(version: Long): Path

deltaFile creates a Hadoop Path of the [version].delta file in the baseDir.
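As a dependency-free illustration of that naming scheme, the following sketch uses java.nio.file.Path in place of Hadoop's org.apache.hadoop.fs.Path (the sample baseDir is hypothetical):

```scala
import java.nio.file.{Path, Paths}

// Sketch of the delta-file naming scheme: the delta file for a given
// version lives directly under baseDir and is named "<version>.delta".
object DeltaFileSketch {
  def deltaFile(baseDir: Path, version: Long): Path =
    baseDir.resolve(s"$version.delta")

  def main(args: Array[String]): Unit =
    println(deltaFile(Paths.get("/tmp/checkpoint/state/0/0"), 5))
    // prints /tmp/checkpoint/state/0/0/5.delta
}
```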

Note
deltaFile is used when…​FIXME

fetchFiles Internal Method

fetchFiles(): Seq[StoreFile]

fetchFiles…​FIXME

Note
fetchFiles is used when HDFSBackedStateStoreProvider is requested to latestIterator, doSnapshot and cleanup.

Initializing StateStoreProvider — init Method

init(
  stateStoreId: StateStoreId,
  keySchema: StructType,
  valueSchema: StructType,
  indexOrdinal: Option[Int],
  storeConf: StateStoreConf,
  hadoopConf: Configuration): Unit
Note
init is part of the StateStoreProvider Contract to initialize itself.

init assigns the values of the input arguments to stateStoreId, keySchema, valueSchema, storeConf, and hadoopConf.

init requests the StateStoreConf for the value of the spark.sql.streaming.maxBatchesToRetainInMemory configuration property (which becomes the numberOfVersionsToRetainInMemory).

In the end, init requests the CheckpointFileManager to create the baseDir directory (with subdirectories).
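The flow above can be sketched as follows. This is a simplified illustration only: the schemas, StateStoreConf and CheckpointFileManager are reduced to plain strings, an Int and java.nio.file, so none of the real types appear.

```scala
import java.nio.file.{Files, Path}

// Sketch of the init flow: remember the store identity, record the
// in-memory retention limit, then create the checkpoint directory
// (with parent directories), mirroring what CheckpointFileManager does.
class InitSketch {
  private var stateStoreId: String = _
  private var numberOfVersionsToRetainInMemory: Int = _

  def init(
      stateStoreId: String,
      maxBatchesToRetainInMemory: Int, // spark.sql.streaming.maxBatchesToRetainInMemory
      baseDir: Path): Unit = {
    this.stateStoreId = stateStoreId
    this.numberOfVersionsToRetainInMemory = maxBatchesToRetainInMemory
    Files.createDirectories(baseDir) // create baseDir with subdirectories
  }

  def retained: Int = numberOfVersionsToRetainInMemory
}
```

Creating the directory eagerly in init means every later delta or snapshot write can assume the base directory already exists.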

latestIterator Internal Method

latestIterator(): Iterator[UnsafeRowPair]

latestIterator…​FIXME

Note
latestIterator seems to be used exclusively in tests.

doSnapshot Internal Method

doSnapshot(): Unit

doSnapshot…​FIXME

Note
doSnapshot is used when…​FIXME

cleanup Internal Method

cleanup(): Unit

cleanup…​FIXME

Note
cleanup is used when…​FIXME

Closing State Store Provider — close Method

close(): Unit
Note
close is part of the StateStoreProvider Contract to close the state store provider.

close…​FIXME

putStateIntoStateCacheMap Internal Method

putStateIntoStateCacheMap(
  newVersion: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit

putStateIntoStateCacheMap…​FIXME

Note
putStateIntoStateCacheMap is used when HDFSBackedStateStoreProvider is requested to commitUpdates and loadMap.

commitUpdates Internal Method

commitUpdates(
  newVersion: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow],
  output: DataOutputStream): Unit

commitUpdates…​FIXME

Note
commitUpdates is used exclusively when HDFSBackedStateStore is requested to commit state changes.

loadMap Internal Method

loadMap(version: Long): ConcurrentHashMap[UnsafeRow, UnsafeRow]

loadMap…​FIXME

Note
loadMap is used when HDFSBackedStateStoreProvider is requested to getStore and latestIterator.
