HDFSBackedStateStoreProvider — Hadoop DFS-based StateStoreProvider

HDFSBackedStateStoreProvider is a StateStoreProvider that uses a Hadoop DFS-compatible file system for versioned state checkpointing.

HDFSBackedStateStoreProvider is the default StateStoreProvider per the spark.sql.streaming.stateStore.providerClass internal configuration property.

HDFSBackedStateStoreProvider is created and immediately requested to initialize when StateStoreProvider utility is requested to create and initialize a StateStoreProvider. That is when HDFSBackedStateStoreProvider is given the StateStoreId that uniquely identifies the state store to use for a stateful operator and a partition.

HDFSBackedStateStoreProvider uses HDFSBackedStateStores to manage state (one per version).

HDFSBackedStateStoreProvider manages versioned state in delta and snapshot files (and uses a cache internally for faster access to state versions).

HDFSBackedStateStoreProvider takes no arguments to be created.

Tip

Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider=ALL

Refer to Logging.

Performance Metrics

Name (in web UI) and Description:

  • memoryUsedBytes: Estimated size of the loadedMaps internal registry

  • count of cache hit on states cache in provider: The number of times loading a given version of state found (hit) the requested version in the loadedMaps internal cache

  • count of cache miss on states cache in provider: The number of times loading a given version of state could not find (missed) the requested version in the loadedMaps internal cache

  • estimated size of state only on current version: Estimated size of the current state (of the HDFSBackedStateStore)

State Checkpoint Base Directory — baseDir Lazy Internal Property

baseDir: Path

baseDir is the base directory (as Hadoop DFS’s Path) for state checkpointing (for delta and snapshot state files).

baseDir is initialized lazily since it is not yet known when HDFSBackedStateStoreProvider is created.

baseDir is initialized and created based on the state checkpoint base directory of the StateStoreId when HDFSBackedStateStoreProvider is requested to initialize.
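A minimal sketch of the derivation (the lazy wiring is illustrative, not the exact Spark source):

import org.apache.hadoop.fs.Path

// baseDir is derived lazily from the StateStoreId given at initialization
// (storeCheckpointLocation builds checkpointRootLocation/operatorId/partitionId)
private lazy val baseDir: Path = stateStoreId.storeCheckpointLocation()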

StateStoreId — Unique Identifier of State Store

As a StateStoreProvider, HDFSBackedStateStoreProvider is associated with a StateStoreId (which is a unique identifier of the state store for a stateful operator and a partition).

HDFSBackedStateStoreProvider is given the StateStoreId at initialization (as requested by the StateStoreProvider contract).

The StateStoreId is then used for the state checkpoint base directory (baseDir) and the textual representation (toString).

Textual Representation — toString Method

toString: String
Note
toString is part of the java.lang.Object contract for the string representation of the object.

HDFSBackedStateStoreProvider uses the StateStoreId and the state checkpoint base directory for the textual representation:

HDFSStateStoreProvider[id = (op=[operatorId],part=[partitionId]),dir = [baseDir]]

Loading Specified Version of State (Store) For Update — getStore Method

getStore(
  version: Long): StateStore
Note
getStore is part of the StateStoreProvider Contract to get the StateStore for a specified version of state.

getStore creates a new empty state (ConcurrentHashMap[UnsafeRow, UnsafeRow]) and loads the specified version of state (from internal cache or snapshot and delta files) for versions greater than 0.

In the end, getStore creates a new HDFSBackedStateStore for the specified version with the new state and prints out the following INFO message to the logs:

Retrieved version [version] of [this] for update

getStore throws an IllegalArgumentException when the specified version is less than 0 (negative):

Version cannot be less than 0
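A hedged sketch of getStore's control flow (assuming the loadMap and HDFSBackedStateStore members described elsewhere on this page):

import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

def getStore(version: Long): StateStore = synchronized {
  require(version >= 0, "Version cannot be less than 0")
  val newMap = new ConcurrentHashMap[UnsafeRow, UnsafeRow]()
  if (version > 0) {
    newMap.putAll(loadMap(version))  // internal cache or snapshot and delta files
  }
  val store = new HDFSBackedStateStore(version, newMap)
  logInfo(s"Retrieved version $version of $this for update")
  store
}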

deltaFile Internal Method

deltaFile(version: Long): Path

deltaFile simply returns the Hadoop Path of the [version].delta file in the state checkpoint base directory.

Note

deltaFile is used when HDFSBackedStateStoreProvider is requested to updateFromDeltaFile and when a HDFSBackedStateStore is created (for the delta file of the new version of state).

snapshotFile Internal Method

snapshotFile(version: Long): Path

snapshotFile simply returns the Hadoop Path of the [version].snapshot file in the state checkpoint base directory.

Note
snapshotFile is used when HDFSBackedStateStoreProvider is requested to writeSnapshotFile or readSnapshotFile.
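Both helpers simply build version-numbered paths under the state checkpoint base directory; a minimal sketch:

import org.apache.hadoop.fs.Path

private def deltaFile(version: Long): Path    = new Path(baseDir, s"$version.delta")
private def snapshotFile(version: Long): Path = new Path(baseDir, s"$version.snapshot")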

Listing All Delta And Snapshot Files In State Checkpoint Directory — fetchFiles Internal Method

fetchFiles(): Seq[StoreFile]

fetchFiles requests the CheckpointFileManager for all the files in the state checkpoint directory.

For every file, fetchFiles splits the name into two parts with . (dot) as a separator (file names that do not split into exactly two parts trigger the WARN message below and are skipped) and registers a new StoreFile for snapshot and delta files:

  • For snapshot files, fetchFiles creates a new StoreFile with isSnapshot flag on (true)

  • For delta files, fetchFiles creates a new StoreFile with isSnapshot flag off (false)

Note
delta files are only registered if there was no snapshot file for the version.

fetchFiles prints out the following WARN message to the logs for any other files:

Could not identify file [path] for [this]

In the end, fetchFiles sorts the StoreFiles based on their version, prints out the following DEBUG message to the logs, and returns the files.

Current set of files for [this]: [storeFiles]
Note
fetchFiles is used when HDFSBackedStateStoreProvider is requested to doSnapshot and cleanup.
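A hedged sketch of the classification (non-numeric version prefixes and the per-version delta-vs-snapshot deduplication are elided for brevity):

// fm is the CheckpointFileManager internal property
val storeFiles = fm.list(baseDir).flatMap { status =>
  status.getPath.getName.split("\\.") match {
    case Array(version, "delta")    => Some(StoreFile(version.toLong, status.getPath, isSnapshot = false))
    case Array(version, "snapshot") => Some(StoreFile(version.toLong, status.getPath, isSnapshot = true))
    case _ =>
      logWarning(s"Could not identify file ${status.getPath} for $this")
      None
  }
}.sortBy(_.version)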

Initializing StateStoreProvider — init Method

init(
  stateStoreId: StateStoreId,
  keySchema: StructType,
  valueSchema: StructType,
  indexOrdinal: Option[Int],
  storeConf: StateStoreConf,
  hadoopConf: Configuration): Unit
Note
init is part of the StateStoreProvider Contract to initialize itself.

init records the values of the input arguments as the stateStoreId, keySchema, valueSchema, storeConf, and hadoopConf internal properties.

init requests the given StateStoreConf for the spark.sql.streaming.maxBatchesToRetainInMemory configuration property (that is then recorded in the numberOfVersionsToRetainInMemory internal property).

In the end, init requests the CheckpointFileManager to create the baseDir directory (with parent directories).
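A hedged sketch of init's essential steps (the field names and the maxVersionsToRetainInMemory accessor on StateStoreConf are illustrative assumptions):

// Record the input arguments and create the state checkpoint base directory
def init(
    stateStoreId: StateStoreId,
    keySchema: StructType,
    valueSchema: StructType,
    indexOrdinal: Option[Int],
    storeConf: StateStoreConf,
    hadoopConf: Configuration): Unit = {
  this.stateStoreId_ = stateStoreId
  this.keySchema = keySchema
  this.valueSchema = valueSchema
  this.storeConf = storeConf
  this.hadoopConf = hadoopConf
  this.numberOfVersionsToRetainInMemory = storeConf.maxVersionsToRetainInMemory
  fm.mkdirs(baseDir)  // create the directory with parent directories
}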

Finding Snapshot File and Delta Files For Version — filesForVersion Internal Method

filesForVersion(
  allFiles: Seq[StoreFile],
  version: Long): Seq[StoreFile]

filesForVersion finds the latest snapshot version among the given allFiles files up to and including the given version (it may or may not be available).

If a snapshot file was found (among the given files up to and including the given version), filesForVersion takes all delta files between the version of the snapshot file (exclusive) and the given version (inclusive) from the given allFiles files.

Note
The number of delta files should be the given version minus the snapshot version.

If a snapshot file was not found, filesForVersion takes all delta files up to the given version (inclusive) from the given allFiles files.

In the end, filesForVersion returns a snapshot version (if available) and all delta files up to the given version (inclusive).

Note
filesForVersion is used when HDFSBackedStateStoreProvider is requested to doSnapshot and cleanup.
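A hedged sketch of the selection logic (assuming allFiles is sorted by version, as fetchFiles returns):

def filesForVersion(allFiles: Seq[StoreFile], version: Long): Seq[StoreFile] = {
  // The latest snapshot at or before the requested version, if any
  val latestSnapshot = allFiles
    .filter(f => f.isSnapshot && f.version <= version)
    .lastOption
  val deltas = latestSnapshot match {
    case Some(snapshot) =>
      allFiles.filter(f => !f.isSnapshot && f.version > snapshot.version && f.version <= version)
    case None =>
      allFiles.filter(f => !f.isSnapshot && f.version <= version)
  }
  latestSnapshot.toSeq ++ deltas  // the snapshot (if available) followed by the delta files
}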

State Maintenance (Snapshotting and Cleaning Up) — doMaintenance Method

doMaintenance(): Unit
Note
doMaintenance is part of the StateStoreProvider Contract for optional state maintenance.

doMaintenance simply does state snapshotting followed by cleaning up (removing old state files).

In case of any non-fatal errors, doMaintenance simply prints out the following WARN message to the logs:

Error performing snapshot and cleaning up [this]
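The method is a simple try-catch over the two steps; a minimal sketch:

import scala.util.control.NonFatal

def doMaintenance(): Unit = {
  try {
    doSnapshot()
    cleanup()
  } catch {
    case NonFatal(e) =>
      logWarning(s"Error performing snapshot and cleaning up $this")
  }
}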

State Snapshotting (Rolling Up Delta Files into Snapshot File) — doSnapshot Internal Method

doSnapshot(): Unit

doSnapshot lists all delta and snapshot files in the state checkpoint directory (files) and prints out the following DEBUG message to the logs:

fetchFiles() took [time] ms.

doSnapshot returns immediately (and does nothing) when there are no delta and snapshot files.

doSnapshot takes the version of the latest file (lastVersion).

doSnapshot finds the snapshot file and delta files for the version (among the files and for the last version).

doSnapshot looks up the last version in the internal state cache.

When the last version was found in the cache and the number of delta files is above spark.sql.streaming.stateStore.minDeltasForSnapshot internal threshold, doSnapshot writes a compressed snapshot file for the last version.

In the end, doSnapshot prints out the following DEBUG message to the logs:

writeSnapshotFile() took [time] ms.

In case of non-fatal errors, doSnapshot simply prints out the following WARN message to the logs:

Error doing snapshots for [this]
Note
doSnapshot is used exclusively when HDFSBackedStateStoreProvider is requested to do state maintenance (state snapshotting and cleaning up).
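A hedged sketch of the decision (assuming StateStoreConf exposes the threshold as minDeltasForSnapshot):

val files = fetchFiles()
if (files.nonEmpty) {
  val lastVersion = files.last.version
  val deltaFilesForLastVersion =
    filesForVersion(files, lastVersion).filter(!_.isSnapshot)
  // Snapshot only when the last version is still cached in memory and enough
  // delta files have accumulated since the previous snapshot
  Option(loadedMaps.get(lastVersion)) match {
    case Some(map) if deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot =>
      writeSnapshotFile(lastVersion, map)
    case _ => // nothing to snapshot
  }
}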

Cleaning Up (Removing Old State Files) — cleanup Internal Method

cleanup(): Unit

cleanup lists all delta and snapshot files in the state checkpoint directory (files) and prints out the following DEBUG message to the logs:

fetchFiles() took [time] ms.

cleanup returns immediately (and does nothing) when there are no delta and snapshot files.

cleanup takes the version of the latest state file (lastVersion) and decrements it by spark.sql.streaming.minBatchesToRetain configuration property (default: 100) that gives the earliest version to retain (and all older state files to be removed).

cleanup requests the CheckpointFileManager to delete the path of every old state file.

cleanup prints out the following DEBUG message to the logs:

deleting files took [time] ms.

In the end, cleanup prints out the following INFO message to the logs:

Deleted files older than [version] for [this]: [filesToDelete]

In case of a non-fatal exception, cleanup prints out the following WARN message to the logs:

Error cleaning up files for [this]
Note
cleanup is used exclusively when HDFSBackedStateStoreProvider is requested for state maintenance (state snapshotting and cleaning up).
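A hedged sketch of the version arithmetic (assuming StateStoreConf exposes the property as minVersionsToRetain):

val files = fetchFiles()
if (files.nonEmpty) {
  val earliestVersionToRetain = files.last.version - storeConf.minVersionsToRetain
  if (earliestVersionToRetain > 0) {
    // Keep the snapshot the earliest retained version depends on; delete everything older
    val earliestFileToRetain = filesForVersion(files, earliestVersionToRetain).head
    val filesToDelete = files.filter(_.version < earliestFileToRetain.version)
    filesToDelete.foreach(f => fm.delete(f.path))
    logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this: $filesToDelete")
  }
}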

Closing State Store Provider — close Method

close(): Unit
Note
close is part of the StateStoreProvider Contract to close the state store provider.

close…​FIXME

getMetricsForProvider Method

getMetricsForProvider(): Map[String, Long]

getMetricsForProvider returns the following performance metrics: memoryUsedBytes, count of cache hit on states cache in provider, and count of cache miss on states cache in provider.

Note
getMetricsForProvider is used exclusively when HDFSBackedStateStore is requested for performance metrics.

Supported StateStoreCustomMetrics — supportedCustomMetrics Method

supportedCustomMetrics: Seq[StateStoreCustomMetric]
Note
supportedCustomMetrics is part of the StateStoreProvider Contract for the StateStoreCustomMetrics of a state store provider.

supportedCustomMetrics includes the following StateStoreCustomMetrics: count of cache hit on states cache in provider, count of cache miss on states cache in provider, and estimated size of state only on current version.

Committing State Changes (As New Version of State) — commitUpdates Internal Method

commitUpdates(
  newVersion: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow],
  output: DataOutputStream): Unit

commitUpdates finalizes the delta file (finalizeDeltaFile with the given DataOutputStream) and then caches the new version of state (putStateIntoStateCacheMap with the given newVersion and the map state).

Note
commitUpdates is used exclusively when HDFSBackedStateStore is requested to commit state changes.
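The whole method is tiny; a hedged sketch:

import java.io.DataOutputStream
import java.util.concurrent.ConcurrentHashMap

def commitUpdates(
    newVersion: Long,
    map: ConcurrentHashMap[UnsafeRow, UnsafeRow],
    output: DataOutputStream): Unit = synchronized {
  finalizeDeltaFile(output)                   // write the EOF marker and close the delta file
  putStateIntoStateCacheMap(newVersion, map)  // cache the new version of state
}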

Loading Specified Version of State (from Internal Cache or Snapshot and Delta Files) — loadMap Internal Method

loadMap(
  version: Long): ConcurrentHashMap[UnsafeRow, UnsafeRow]

loadMap first tries to find the state version in the loadedMaps internal cache and, if found, increments the loadedMapCacheHitCount metric and returns the cached state immediately.

If the requested state version could not be found in the loadedMaps internal cache, loadMap prints out the following WARN message to the logs:

The state for version [version] doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.

loadMap increments the loadedMapCacheMissCount metric.

If not found in the cache, loadMap walks back from the requested version (decrementing it) until it finds a version in the loadedMaps internal cache or in a state snapshot file (readSnapshotFile).

loadMap then replays the state changes of the remaining versions (updateFromDeltaFile), from the found version up to the requested one. loadMap puts the final version of state in the internal cache (putStateIntoStateCacheMap) and returns it.

In the end, loadMap prints out the following DEBUG message to the logs:

Loading state for [version] takes [elapsedMs] ms.
Note
loadMap is used exclusively when HDFSBackedStateStoreProvider is requested for the specified version of a state store for update.
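A hedged sketch of the fallback chain (metric and cache names as described above; the fast cache-hit path comes first):

private def loadMap(version: Long): ConcurrentHashMap[UnsafeRow, UnsafeRow] = {
  // 1. Cache hit: return the cached state right away
  Option(loadedMaps.get(version)) match {
    case Some(map) =>
      loadedMapCacheHitCount.increment()
      return map
    case None =>
      loadedMapCacheMissCount.increment()
  }
  // 2. Walk back until a version is found in the cache or in a snapshot file
  var lastAvailableVersion = version
  var lastAvailableMap: Option[ConcurrentHashMap[UnsafeRow, UnsafeRow]] = None
  while (lastAvailableMap.isEmpty) {
    lastAvailableVersion -= 1
    if (lastAvailableVersion <= 0) {
      lastAvailableMap = Some(new ConcurrentHashMap[UnsafeRow, UnsafeRow]())
    } else {
      lastAvailableMap = Option(loadedMaps.get(lastAvailableVersion))
        .orElse(readSnapshotFile(lastAvailableVersion))
    }
  }
  // 3. Replay the remaining delta files on a copy of the found state
  val resultMap = new ConcurrentHashMap[UnsafeRow, UnsafeRow](lastAvailableMap.get)
  for (deltaVersion <- lastAvailableVersion + 1 to version) {
    updateFromDeltaFile(deltaVersion, resultMap)
  }
  putStateIntoStateCacheMap(version, resultMap)
  resultMap
}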

Loading State Snapshot File For Specified Version — readSnapshotFile Internal Method

readSnapshotFile(
  version: Long): Option[ConcurrentHashMap[UnsafeRow, UnsafeRow]]

readSnapshotFile creates the path of the snapshot file for the given version.

readSnapshotFile reads the decompressed input stream until an EOF (that is marked as the integer -1 in the stream) and inserts key and value rows in a state map (ConcurrentHashMap[UnsafeRow, UnsafeRow]):

  • First integer is the size of a key (buffer) followed by the key itself (of the size). readSnapshotFile creates an UnsafeRow for the key (with the number of fields as indicated by the number of fields of the key schema).

  • Next integer is the size of a value (buffer) followed by the value itself (of the size). readSnapshotFile creates an UnsafeRow for the value (with the number of fields as indicated by the number of fields of the value schema).

In the end, readSnapshotFile prints out the following INFO message to the logs and returns the key-value map.

Read snapshot file for version [version] of [this] from [fileToRead]

In case of FileNotFoundException readSnapshotFile simply returns None (to indicate no snapshot state file was available and so no state for the version).

readSnapshotFile throws an IOException for the size of a key or a value below 0:

Error reading snapshot file [fileToRead] of [this]: [key|value] size cannot be [keySize|valueSize]
Note
readSnapshotFile is used exclusively when HDFSBackedStateStoreProvider is requested to load the specified version of state (from the internal cache or snapshot and delta files).
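A hedged sketch of the read loop (input, map, keySchema and valueSchema are assumed in scope; the negative-size IOException checks are elided):

import com.google.common.io.ByteStreams
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Record format: [keySize: Int][key bytes][valueSize: Int][value bytes] ... [-1 = EOF]
var eof = false
while (!eof) {
  val keySize = input.readInt()
  if (keySize == -1) {
    eof = true
  } else {
    val keyBytes = new Array[Byte](keySize)
    ByteStreams.readFully(input, keyBytes, 0, keySize)
    val key = new UnsafeRow(keySchema.fields.length)
    key.pointTo(keyBytes, keySize)

    val valueSize = input.readInt()
    val valueBytes = new Array[Byte](valueSize)
    ByteStreams.readFully(input, valueBytes, 0, valueSize)
    val value = new UnsafeRow(valueSchema.fields.length)
    value.pointTo(valueBytes, valueSize)

    map.put(key, value)
  }
}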

Updating State with State Changes For Specified Version (per Delta File) — updateFromDeltaFile Internal Method

updateFromDeltaFile(
  version: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit
Note

updateFromDeltaFile is very similar code-wise to readSnapshotFile with the two main differences:

  • updateFromDeltaFile is given the state map to update (while readSnapshotFile loads the state from a snapshot file)

  • updateFromDeltaFile removes a key from the state map when the value (size) is -1 (while readSnapshotFile throws an IOException)

The following description is almost an exact copy of readSnapshotFile just for completeness.

updateFromDeltaFile creates the path of the delta file for the requested version.

updateFromDeltaFile reads the decompressed input stream until an EOF (that is marked as the integer -1 in the stream) and inserts key and value rows in the given state map:

  • First integer is the size of a key (buffer) followed by the key itself (of the size). updateFromDeltaFile creates an UnsafeRow for the key (with the number of fields as indicated by the number of fields of the key schema).

  • Next integer is the size of a value (buffer) followed by the value itself (of the size). updateFromDeltaFile creates an UnsafeRow for the value (with the number of fields as indicated by the number of fields of the value schema) or removes the corresponding key from the state map (if the value size is -1)

Note
updateFromDeltaFile removes the key-value entry from the state map if the value (size) is -1.

In the end, updateFromDeltaFile prints out the following INFO message to the logs (the state changes are applied to the given key-value map).

Read delta file for version [version] of [this] from [fileToRead]

updateFromDeltaFile throws an IllegalStateException in case of FileNotFoundException while opening the delta file for the specified version:

Error reading delta file [fileToRead] of [this]: [fileToRead] does not exist
Note
updateFromDeltaFile is used exclusively when HDFSBackedStateStoreProvider is requested to load the specified version of state (from the internal cache or snapshot and delta files).
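A sketch of the one behavioral difference from readSnapshotFile's read loop (key, input, map and valueSchema as in the sketch above):

val valueSize = input.readInt()
if (valueSize < 0) {
  map.remove(key)  // a negative value size is a tombstone: the key was removed in this version
} else {
  val valueBytes = new Array[Byte](valueSize)
  ByteStreams.readFully(input, valueBytes, 0, valueSize)
  val value = new UnsafeRow(valueSchema.fields.length)
  value.pointTo(valueBytes, valueSize)
  map.put(key, value)
}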

Caching New Version of State — putStateIntoStateCacheMap Internal Method

putStateIntoStateCacheMap(
  newVersion: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit

putStateIntoStateCacheMap registers state for a given version, i.e. adds the map state under the newVersion key in the loadedMaps internal registry.

When the numberOfVersionsToRetainInMemory threshold is 0 or below, putStateIntoStateCacheMap simply removes all entries from the loadedMaps internal registry and returns.

putStateIntoStateCacheMap removes the oldest state version(s) from the loadedMaps internal registry until its size is at the numberOfVersionsToRetainInMemory threshold.

When the size of the loadedMaps internal registry is at the numberOfVersionsToRetainInMemory threshold, putStateIntoStateCacheMap does two more optimizations per newVersion:

  • It does not add the given state when the oldest retained version is later (larger) than the given newVersion (i.e. the given state is older than any state retained)

  • It removes the oldest retained state when its version is earlier (smaller) than the given newVersion (to make room for the given state)

Note
putStateIntoStateCacheMap is used when HDFSBackedStateStoreProvider is requested to commit state (as a new version) and load the specified version of state (from the internal cache or snapshot and delta files).
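A hedged sketch of the cache-eviction logic (loadedMaps is ordered by version in reverse, so lastKey() is the oldest retained version):

def putStateIntoStateCacheMap(
    newVersion: Long,
    map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit = synchronized {
  if (numberOfVersionsToRetainInMemory <= 0) {
    loadedMaps.clear()  // retain nothing in memory
    return
  }
  while (loadedMaps.size() > numberOfVersionsToRetainInMemory) {
    loadedMaps.remove(loadedMaps.lastKey())
  }
  if (loadedMaps.size() == numberOfVersionsToRetainInMemory) {
    val oldestVersion = loadedMaps.lastKey()
    if (oldestVersion > newVersion) {
      return  // the given state is older than anything retained: skip it
    } else if (oldestVersion < newVersion) {
      loadedMaps.remove(oldestVersion)  // evict the oldest to make room
    }
  }
  loadedMaps.put(newVersion, map)
}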

Writing Compressed Snapshot File for Specified Version — writeSnapshotFile Internal Method

writeSnapshotFile(
  version: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit

writeSnapshotFile builds the snapshot file path (snapshotFile) for the given version.

writeSnapshotFile requests the CheckpointFileManager to create the snapshot file (with overwriting enabled) and compress the stream.

For every key-value UnsafeRow pair in the given map, writeSnapshotFile writes the size of the key followed by the key itself (as bytes). writeSnapshotFile then writes the size of the value followed by the value itself (as bytes).

In the end, writeSnapshotFile prints out the following INFO message to the logs:

Written snapshot file for version [version] of [this] at [targetFile]

In case of any Throwable exception, writeSnapshotFile cancels the file (cancelDeltaFile) and re-throws the exception.

Note
writeSnapshotFile is used exclusively when HDFSBackedStateStoreProvider is requested to doSnapshot.
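A hedged sketch of the write loop (the size-prefixed record format mirrors the readSnapshotFile sketch above):

import scala.collection.JavaConverters._

val targetFile = snapshotFile(version)
val rawOutput = fm.createAtomic(targetFile, overwriteIfPossible = true)
val output = compressStream(rawOutput)
try {
  map.entrySet.asScala.foreach { entry =>
    val keyBytes = entry.getKey.getBytes
    val valueBytes = entry.getValue.getBytes
    output.writeInt(keyBytes.length)
    output.write(keyBytes)
    output.writeInt(valueBytes.length)
    output.write(valueBytes)
  }
  output.close()
} catch {
  case e: Throwable =>
    cancelDeltaFile(compressedStream = output, rawStream = rawOutput)
    throw e
}
logInfo(s"Written snapshot file for version $version of $this at $targetFile")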

compressStream Internal Method

compressStream(
  outputStream: DataOutputStream): DataOutputStream

compressStream creates a new LZ4CompressionCodec (based on the SparkConf) and requests it to create a LZ4BlockOutputStream with the given DataOutputStream.

In the end, compressStream creates a new DataOutputStream with the LZ4BlockOutputStream.

Note
compressStream is used when…​FIXME
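A minimal sketch of compressStream (assuming the sparkConf internal property):

import java.io.DataOutputStream
import org.apache.spark.io.LZ4CompressionCodec

private def compressStream(outputStream: DataOutputStream): DataOutputStream = {
  val compressed = new LZ4CompressionCodec(sparkConf).compressedOutputStream(outputStream)
  new DataOutputStream(compressed)
}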

cancelDeltaFile Internal Method

cancelDeltaFile(
  compressedStream: DataOutputStream,
  rawStream: CancellableFSDataOutputStream): Unit

cancelDeltaFile…​FIXME

Note
cancelDeltaFile is used when…​FIXME

finalizeDeltaFile Internal Method

finalizeDeltaFile(
  output: DataOutputStream): Unit

finalizeDeltaFile simply writes -1 to the given DataOutputStream (to indicate end of file) and closes it.

Note
finalizeDeltaFile is used exclusively when HDFSBackedStateStoreProvider is requested to commit state changes (a new version of state).
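The method is a two-liner; a minimal sketch:

import java.io.DataOutputStream

private def finalizeDeltaFile(output: DataOutputStream): Unit = {
  output.writeInt(-1)  // the EOF marker that delta-file readers stop at
  output.close()
}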

Lookup Table (Cache) of States By Version — loadedMaps Internal Property

loadedMaps: TreeMap[
  Long,                                    // version
  ConcurrentHashMap[UnsafeRow, UnsafeRow]] // state (as keys and values)

loadedMaps is a java.util.TreeMap of state versions sorted according to the reversed ordering of the versions (i.e. long numbers).

A new entry (a version and the state updates) can only be added when HDFSBackedStateStoreProvider is requested to putStateIntoStateCacheMap (and only when the spark.sql.streaming.maxBatchesToRetainInMemory internal configuration is above 0).

loadedMaps is mainly used when HDFSBackedStateStoreProvider is requested to load the specified version of state (from the internal cache or snapshot and delta files). Positive hits (when a version could be found in the cache) are counted in the count of cache hit on states cache in provider performance metric while misses are counted in the count of cache miss on states cache in provider performance metric.

Note
With no or missing versions in the cache, the count of cache miss on states cache in provider metric should be above 0 while the count of cache hit on states cache in provider metric stays at 0 (or smaller than the miss metric).

The estimated size of loadedMaps is available as the memoryUsedBytes performance metric.

The spark.sql.streaming.maxBatchesToRetainInMemory internal configuration is used as the threshold of the number of elements in loadedMaps. When 0 or negative, every putStateIntoStateCacheMap removes all elements in (clears) loadedMaps.

Note
It is possible to change the configuration at restart of a structured query.

The cached state versions (all entries) in loadedMaps are removed when HDFSBackedStateStoreProvider is requested to close.

loadedMaps is also used when HDFSBackedStateStoreProvider is requested for performance metrics (getMetricsForProvider), to doSnapshot, and to close.

Internal Properties

Name and Description:

  • fm: CheckpointFileManager for the state checkpoint base directory (used to list, create and delete state files)

  • hadoopConf: Hadoop Configuration, given when HDFSBackedStateStoreProvider is requested to initialize

  • keySchema (StructType): Schema of the state keys

  • valueSchema (StructType): Schema of the state values

  • numberOfVersionsToRetainInMemory (Int): The maximum number of entries in the loadedMaps internal registry, configured by the spark.sql.streaming.maxBatchesToRetainInMemory internal configuration. Used as the threshold at which putStateIntoStateCacheMap removes the last key from the loadedMaps internal registry (per reverse ordering of state versions).

  • sparkConf: SparkConf
