HDFSBackedStateStoreProvider — Hadoop DFS-based StateStoreProvider
HDFSBackedStateStoreProvider is a StateStoreProvider that uses a Hadoop DFS-compatible file system for versioned state checkpointing.
HDFSBackedStateStoreProvider is the default StateStoreProvider per the spark.sql.streaming.stateStore.providerClass internal configuration property.
HDFSBackedStateStoreProvider is created and immediately requested to initialize when the StateStoreProvider utility is requested to create and initialize a StateStoreProvider. That is when HDFSBackedStateStoreProvider is given the StateStoreId that uniquely identifies the state store to use for a stateful operator and a partition.
HDFSBackedStateStoreProvider uses HDFSBackedStateStores to manage state (one per version).
HDFSBackedStateStoreProvider manages versioned state in delta and snapshot files (and uses a cache internally for faster access to state versions).
HDFSBackedStateStoreProvider takes no arguments to be created.
Tip: Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider logger to see what happens inside. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider=ALL

Refer to Logging.
Performance Metrics
| Name (in web UI) | Description |
|---|---|
| memoryUsedBytes | Estimated size of the loadedMaps internal registry |
| count of cache hit on states cache in provider | The number of times loading the specified version of state was successful and found (hit) the requested state version in the loadedMaps internal cache |
| count of cache miss on states cache in provider | The number of times loading the specified version of state could not find (missed) the requested state version in the loadedMaps internal cache |
| estimated size of state only on current version | Estimated size of the current state (of the HDFSBackedStateStore) |
State Checkpoint Base Directory — baseDir Lazy Internal Property
baseDir: Path
baseDir is the base directory (as Hadoop DFS’s Path) for state checkpointing (for delta and snapshot state files).
baseDir is initialized lazily since it is not yet known when HDFSBackedStateStoreProvider is created.
baseDir is initialized and created based on the state checkpoint base directory of the StateStoreId when HDFSBackedStateStoreProvider is requested to initialize.
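A minimal sketch of this lazy-initialization pattern, assuming the default `<checkpointRootLocation>/<operatorId>/<partitionId>` directory layout of a StateStoreId (the class and field names below are simplified stand-ins, not the real internals):

```scala
import org.apache.hadoop.fs.Path

// Sketch: baseDir cannot be computed at construction time since the
// StateStoreId is only known once init is called, hence the lazy val.
class BaseDirSketch {
  // simplified stand-ins for the StateStoreId properties
  private var checkpointRootLocation: String = _
  private var operatorId: Long = _
  private var partitionId: Int = _

  // resolved on first access, i.e. only after init has run
  lazy val baseDir: Path =
    new Path(new Path(checkpointRootLocation, operatorId.toString), partitionId.toString)

  def init(root: String, op: Long, part: Int): Unit = {
    checkpointRootLocation = root
    operatorId = op
    partitionId = part
  }
}
```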
StateStoreId — Unique Identifier of State Store
As a StateStoreProvider, HDFSBackedStateStoreProvider is associated with a StateStoreId (which is a unique identifier of the state store for a stateful operator and a partition).
HDFSBackedStateStoreProvider is given the StateStoreId at initialization (as requested by the StateStoreProvider contract).
The StateStoreId is then used for the following:
- HDFSBackedStateStore is requested for the id
- HDFSBackedStateStoreProvider is requested for the textual representation and the state checkpoint base directory
Textual Representation — toString Method
toString: String
Note: toString is part of the java.lang.Object contract for the string representation of the object.
HDFSBackedStateStoreProvider uses the StateStoreId and the state checkpoint base directory for the textual representation:
HDFSStateStoreProvider[id = (op=[operatorId],part=[partitionId]),dir = [baseDir]]
Loading Specified Version of State (Store) For Update — getStore Method
getStore(
version: Long): StateStore
Note: getStore is part of the StateStoreProvider Contract for the StateStore for a specified version.
getStore creates a new empty state (ConcurrentHashMap[UnsafeRow, UnsafeRow]) and loads the specified version of state (from internal cache or snapshot and delta files) for versions greater than 0.
In the end, getStore creates a new HDFSBackedStateStore for the specified version with the new state and prints out the following INFO message to the logs:
Retrieved version [version] of [this] for update
getStore throws an IllegalArgumentException when the specified version is less than 0 (negative):
Version cannot be less than 0
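A simplified sketch of the getStore flow, with String standing in for UnsafeRow and a stubbed loadMap (both assumptions for brevity):

```scala
import java.util.concurrent.ConcurrentHashMap

object GetStoreSketch {
  // stub for the cache/file loading described in loadMap later in this page
  private def loadMap(version: Long): ConcurrentHashMap[String, String] =
    new ConcurrentHashMap[String, String]()

  def getStore(version: Long): ConcurrentHashMap[String, String] = {
    require(version >= 0, "Version cannot be less than 0")
    val newMap = new ConcurrentHashMap[String, String]()
    if (version > 0) newMap.putAll(loadMap(version))
    // the real provider wraps newMap in a new HDFSBackedStateStore for the version
    println(s"Retrieved version $version for update")
    newMap
  }
}
```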
deltaFile Internal Method
deltaFile(version: Long): Path
deltaFile simply returns the Hadoop Path of the [version].delta file in the state checkpoint base directory.
snapshotFile Internal Method
snapshotFile(version: Long): Path
snapshotFile simply returns the Hadoop Path of the [version].snapshot file in the state checkpoint base directory.
Note: snapshotFile is used when HDFSBackedStateStoreProvider is requested to writeSnapshotFile or readSnapshotFile.
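Both helpers follow the same naming scheme; a minimal sketch:

```scala
import org.apache.hadoop.fs.Path

// State files live directly under the state checkpoint base directory.
def deltaFile(baseDir: Path, version: Long): Path =
  new Path(baseDir, s"$version.delta")

def snapshotFile(baseDir: Path, version: Long): Path =
  new Path(baseDir, s"$version.snapshot")
```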
Listing All Delta And Snapshot Files In State Checkpoint Directory — fetchFiles Internal Method
fetchFiles(): Seq[StoreFile]
fetchFiles requests the CheckpointFileManager for all the files in the state checkpoint directory.
For every file, fetchFiles splits the name into two parts with . (dot) as a separator (files with more or less than two parts are simply ignored) and registers a new StoreFile for snapshot and delta files:
- For snapshot files, fetchFiles creates a new StoreFile with the isSnapshot flag on (true)
- For delta files, fetchFiles creates a new StoreFile with the isSnapshot flag off (false)
Note: delta files are only registered if there was no snapshot file for the version.
fetchFiles prints out the following WARN message to the logs for any other files:
Could not identify file [path] for [this]
In the end, fetchFiles sorts the StoreFiles based on their version, prints out the following DEBUG message to the logs, and returns the files.
Current set of files for [this]: [storeFiles]
Note: fetchFiles is used when HDFSBackedStateStoreProvider is requested to doSnapshot and cleanup.
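A simplified sketch of the name parsing described above (StoreFile is re-declared here as a stand-in to keep the example self-contained):

```scala
// version and file kind are encoded in the file name: [version].delta or [version].snapshot
case class StoreFile(version: Long, path: String, isSnapshot: Boolean)

def parseStoreFile(fileName: String, path: String): Option[StoreFile] =
  fileName.split("\\.") match {
    case Array(version, "snapshot") => Some(StoreFile(version.toLong, path, isSnapshot = true))
    case Array(version, "delta")    => Some(StoreFile(version.toLong, path, isSnapshot = false))
    case _ =>
      println(s"Could not identify file $path")  // a WARN in the real provider
      None
  }
```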
Initializing StateStoreProvider — init Method
init(
stateStoreId: StateStoreId,
keySchema: StructType,
valueSchema: StructType,
indexOrdinal: Option[Int],
storeConf: StateStoreConf,
hadoopConf: Configuration): Unit
Note: init is part of the StateStoreProvider Contract to initialize itself.
init records the values of the input arguments as the stateStoreId, keySchema, valueSchema, storeConf, and hadoopConf internal properties.
init requests the given StateStoreConf for the spark.sql.streaming.maxBatchesToRetainInMemory configuration property (that is then recorded in the numberOfVersionsToRetainInMemory internal property).
In the end, init requests the CheckpointFileManager to create the baseDir directory (with parent directories).
Finding Snapshot File and Delta Files For Version — filesForVersion Internal Method
filesForVersion(
allFiles: Seq[StoreFile],
version: Long): Seq[StoreFile]
filesForVersion finds the latest snapshot version among the given allFiles files up to and including the given version (it may or may not be available).
If a snapshot file was found (among the given files up to and including the given version), filesForVersion takes all delta files between the version of the snapshot file (exclusive) and the given version (inclusive) from the given allFiles files.
Note: The number of delta files should be the given version minus the snapshot version.
If a snapshot file was not found, filesForVersion takes all delta files up to the given version (inclusive) from the given allFiles files.
In the end, filesForVersion returns a snapshot version (if available) and all delta files up to the given version (inclusive).
Note: filesForVersion is used when HDFSBackedStateStoreProvider is requested to doSnapshot and cleanup.
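A sketch of the selection logic, reusing the StoreFile case class from the fetchFiles example:

```scala
def filesForVersion(allFiles: Seq[StoreFile], version: Long): Seq[StoreFile] = {
  // the latest snapshot at or before the requested version (may not exist)
  val latestSnapshot = allFiles
    .filter(f => f.isSnapshot && f.version <= version)
    .sortBy(_.version)
    .lastOption
  val snapshotVersion = latestSnapshot.map(_.version).getOrElse(0L)
  // delta files after the snapshot (exclusive) up to the version (inclusive)
  val deltas = allFiles
    .filter(f => !f.isSnapshot && f.version > snapshotVersion && f.version <= version)
    .sortBy(_.version)
  latestSnapshot.toSeq ++ deltas
}
```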
State Maintenance (Snapshotting and Cleaning Up) — doMaintenance Method
doMaintenance(): Unit
Note: doMaintenance is part of the StateStoreProvider Contract for optional state maintenance.
doMaintenance simply does state snapshotting followed by cleaning up (removing old state files).
In case of any non-fatal errors, doMaintenance simply prints out the following WARN message to the logs:
Error performing snapshot and cleaning up [this]
State Snapshotting (Rolling Up Delta Files into Snapshot File) — doSnapshot Internal Method
doSnapshot(): Unit
doSnapshot lists all delta and snapshot files in the state checkpoint directory (files) and prints out the following DEBUG message to the logs:
fetchFiles() took [time] ms.
doSnapshot returns immediately (and does nothing) when there are no delta and snapshot files.
doSnapshot takes the version of the latest file (lastVersion).
doSnapshot finds the snapshot file and delta files for the version (among the files and for the last version).
doSnapshot looks up the last version in the internal state cache.
When the last version was found in the cache and the number of delta files is above spark.sql.streaming.stateStore.minDeltasForSnapshot internal threshold, doSnapshot writes a compressed snapshot file for the last version.
In the end, doSnapshot prints out the following DEBUG message to the logs:
writeSnapshotFile() took [time] ms.
In case of non-fatal errors, doSnapshot simply prints out the following WARN message to the logs:
Error doing snapshots for [this]
Note: doSnapshot is used exclusively when HDFSBackedStateStoreProvider is requested to do state maintenance (state snapshotting and cleaning up).
Cleaning Up (Removing Old State Files) — cleanup Internal Method
cleanup(): Unit
cleanup lists all delta and snapshot files in the state checkpoint directory (files) and prints out the following DEBUG message to the logs:
fetchFiles() took [time] ms.
cleanup returns immediately (and does nothing) when there are no delta and snapshot files.
cleanup takes the version of the latest state file (lastVersion) and subtracts the spark.sql.streaming.minBatchesToRetain configuration property (default: 100), which gives the earliest version to retain (all older state files are to be removed).
cleanup requests the CheckpointFileManager to delete the path of every old state file.
cleanup prints out the following DEBUG message to the logs:
deleting files took [time] ms.
In the end, cleanup prints out the following INFO message to the logs:
Deleted files older than [version] for [this]: [filesToDelete]
In case of a non-fatal exception, cleanup prints out the following WARN message to the logs:
Error cleaning up files for [this]
Note: cleanup is used exclusively when HDFSBackedStateStoreProvider is requested for state maintenance (state snapshotting and cleaning up).
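The retention arithmetic in a sketch, again reusing StoreFile (the real cleanup also keeps any older snapshot needed to rebuild the earliest retained version, which this sketch omits):

```scala
def versionsToDelete(files: Seq[StoreFile], minBatchesToRetain: Int): Seq[StoreFile] =
  if (files.isEmpty) Seq.empty
  else {
    val lastVersion = files.map(_.version).max
    val earliestVersionToRetain = lastVersion - minBatchesToRetain
    if (earliestVersionToRetain <= 0) Seq.empty  // nothing old enough to remove yet
    else files.filter(_.version < earliestVersionToRetain)
  }
```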
Closing State Store Provider — close Method
close(): Unit
Note: close is part of the StateStoreProvider Contract to close the state store provider.
close…FIXME
getMetricsForProvider Method
getMetricsForProvider(): Map[String, Long]
getMetricsForProvider returns the following performance metrics:

- memoryUsedBytes
- count of cache hit on states cache in provider
- count of cache miss on states cache in provider
Note: getMetricsForProvider is used exclusively when HDFSBackedStateStore is requested for performance metrics.
Supported StateStoreCustomMetrics — supportedCustomMetrics Method
supportedCustomMetrics: Seq[StateStoreCustomMetric]
Note: supportedCustomMetrics is part of the StateStoreProvider Contract for the StateStoreCustomMetrics of a state store provider.
supportedCustomMetrics includes the following StateStoreCustomMetrics:

- count of cache hit on states cache in provider
- count of cache miss on states cache in provider
- estimated size of state only on current version
Committing State Changes (As New Version of State) — commitUpdates Internal Method
commitUpdates(
newVersion: Long,
map: ConcurrentHashMap[UnsafeRow, UnsafeRow],
output: DataOutputStream): Unit
commitUpdates finalizeDeltaFile (with the given DataOutputStream) and then caches the new version of state (with the given newVersion and the map state).
Note: commitUpdates is used exclusively when HDFSBackedStateStore is requested to commit state changes.
Loading Specified Version of State (from Internal Cache or Snapshot and Delta Files) — loadMap Internal Method
loadMap(
version: Long): ConcurrentHashMap[UnsafeRow, UnsafeRow]
loadMap first tries to find the state version in the loadedMaps internal cache and, if found, increments the loadedMapCacheHitCount metric and returns the cached state immediately.
If the requested state version could not be found in the loadedMaps internal cache, loadMap prints out the following WARN message to the logs:
The state for version [version] doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
loadMap increments the loadedMapCacheMissCount metric.
loadMap tries to load the state snapshot file for the version and, if found, puts the version of state in the internal cache and returns it.
If not found, loadMap tries to find the most recent state version by decrementing the requested version until one is found in the loadedMaps internal cache or loaded from a state snapshot (file).
loadMap updateFromDeltaFile for all the remaining versions (from the snapshot version up to the requested one). loadMap puts the final version of state in the internal cache (the closest snapshot and the remaining delta versions) and returns it.
In the end, loadMap prints out the following DEBUG message to the logs:
Loading state for [version] takes [elapsedMs] ms.
Note: loadMap is used exclusively when HDFSBackedStateStoreProvider is requested for the specified version of a state store for update.
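A simplified sketch of the cache-then-files fallback, with String in place of UnsafeRow and in-memory stubs for the cache and the state files (all assumptions for brevity):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

object LoadMapSketch {
  type StateMap = ConcurrentHashMap[String, String]

  private val loadedMaps = mutable.Map.empty[Long, StateMap]            // cache stub
  private def readSnapshotFile(version: Long): Option[StateMap] = None  // file stub
  private def updateFromDeltaFile(version: Long, map: StateMap): Unit = ()

  def loadMap(version: Long): StateMap =
    loadedMaps.getOrElse(version, {
      // walk back until a cached version or a snapshot file is found
      var lastAvailable = version
      var base: Option[StateMap] = None
      while (base.isEmpty && lastAvailable > 0) {
        base = loadedMaps.get(lastAvailable).orElse(readSnapshotFile(lastAvailable))
        if (base.isEmpty) lastAvailable -= 1
      }
      val result = new StateMap(base.getOrElse(new StateMap()))
      // replay the remaining delta files up to the requested version
      ((lastAvailable + 1) to version).foreach(v => updateFromDeltaFile(v, result))
      loadedMaps.put(version, result)
      result
    })
}
```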
Loading State Snapshot File For Specified Version — readSnapshotFile Internal Method
readSnapshotFile(
version: Long): Option[ConcurrentHashMap[UnsafeRow, UnsafeRow]]
readSnapshotFile creates the path of the snapshot file for the given version.
readSnapshotFile requests the CheckpointFileManager to open the snapshot file for reading and creates a decompressed DataInputStream (input).
readSnapshotFile reads the decompressed input stream until an EOF (that is marked as the integer -1 in the stream) and inserts key and value rows in a state map (ConcurrentHashMap[UnsafeRow, UnsafeRow]):
- The first integer is the size of a key (buffer) followed by the key itself (of that size). readSnapshotFile creates an UnsafeRow for the key (with the number of fields as indicated by the number of fields of the key schema).
- The next integer is the size of a value (buffer) followed by the value itself (of that size). readSnapshotFile creates an UnsafeRow for the value (with the number of fields as indicated by the number of fields of the value schema).
In the end, readSnapshotFile prints out the following INFO message to the logs and returns the key-value map.
Read snapshot file for version [version] of [this] from [fileToRead]
In case of FileNotFoundException, readSnapshotFile simply returns None (to indicate no snapshot state file was available and so no state for the version).
readSnapshotFile throws an IOException for the size of a key or a value below 0:
Error reading snapshot file [fileToRead] of [this]: [key|value] size cannot be [keySize|valueSize]
Note: readSnapshotFile is used exclusively when HDFSBackedStateStoreProvider is requested to load the specified version of state (from the internal cache or snapshot and delta files).
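The on-disk format in a sketch: pairs of (key size, key bytes, value size, value bytes) with -1 as the end-of-stream marker. Byte sequences stand in for UnsafeRows (an assumption for brevity):

```scala
import java.io.{DataInputStream, IOException}

def readSnapshotEntries(input: DataInputStream): Map[Seq[Byte], Seq[Byte]] = {
  val entries = Map.newBuilder[Seq[Byte], Seq[Byte]]
  var keySize = input.readInt()
  while (keySize != -1) {                       // -1 marks the end of the stream
    if (keySize < 0) throw new IOException(s"key size cannot be $keySize")
    val key = new Array[Byte](keySize)
    input.readFully(key)
    val valueSize = input.readInt()
    if (valueSize < 0) throw new IOException(s"value size cannot be $valueSize")
    val value = new Array[Byte](valueSize)
    input.readFully(value)
    entries += (key.toSeq -> value.toSeq)
    keySize = input.readInt()
  }
  entries.result()
}
```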
Updating State with State Changes For Specified Version (per Delta File) — updateFromDeltaFile Internal Method
updateFromDeltaFile(
version: Long,
map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit
Note: The following description is almost an exact copy of readSnapshotFile, just for completeness.
updateFromDeltaFile creates the path of the delta file for the requested version.
updateFromDeltaFile requests the CheckpointFileManager to open the delta file for reading and creates a decompressed DataInputStream (input).
updateFromDeltaFile reads the decompressed input stream until an EOF (that is marked as the integer -1 in the stream) and inserts key and value rows in the given state map:
- The first integer is the size of a key (buffer) followed by the key itself (of that size). updateFromDeltaFile creates an UnsafeRow for the key (with the number of fields as indicated by the number of fields of the key schema).
- The next integer is the size of a value (buffer) followed by the value itself (of that size). updateFromDeltaFile creates an UnsafeRow for the value (with the number of fields as indicated by the number of fields of the value schema) or removes the corresponding key from the state map (if the value size is -1).
Note: updateFromDeltaFile removes the key-value entry from the state map if the value (size) is -1.
In the end, updateFromDeltaFile prints out the following INFO message to the logs:
Read delta file for version [version] of [this] from [fileToRead]
updateFromDeltaFile throws an IllegalStateException in case of FileNotFoundException while opening the delta file for the specified version:
Error reading delta file [fileToRead] of [this]: [fileToRead] does not exist
Note: updateFromDeltaFile is used exclusively when HDFSBackedStateStoreProvider is requested to load the specified version of state (from the internal cache or snapshot and delta files).
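The delta variant in a sketch: the same framing as the snapshot read loop, but a value size of -1 removes the key instead of inserting a value (byte sequences again standing in for UnsafeRows):

```scala
import java.io.DataInputStream
import scala.collection.mutable

def applyDeltaEntries(input: DataInputStream, map: mutable.Map[Seq[Byte], Seq[Byte]]): Unit = {
  var keySize = input.readInt()
  while (keySize != -1) {
    val key = new Array[Byte](keySize)
    input.readFully(key)
    val valueSize = input.readInt()
    if (valueSize < 0) map.remove(key.toSeq)    // tombstone: the key was removed
    else {
      val value = new Array[Byte](valueSize)
      input.readFully(value)
      map.put(key.toSeq, value.toSeq)
    }
    keySize = input.readInt()
  }
}
```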
Caching New Version of State — putStateIntoStateCacheMap Internal Method
putStateIntoStateCacheMap(
newVersion: Long,
map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit
putStateIntoStateCacheMap registers state for a given version, i.e. adds the map state under the newVersion key in the loadedMaps internal registry.
With the numberOfVersionsToRetainInMemory threshold as 0 or below, putStateIntoStateCacheMap simply removes all entries from the loadedMaps internal registry and returns.
putStateIntoStateCacheMap removes the oldest state version(s) in the loadedMaps internal registry until its size is at the numberOfVersionsToRetainInMemory threshold.
When the size of the loadedMaps internal registry is at the numberOfVersionsToRetainInMemory threshold, putStateIntoStateCacheMap does two more optimizations per newVersion (see the sketch below):

- It does not add the given state when the version of the oldest state in the registry is newer (larger) than the given newVersion
- It removes the oldest state when it is older (smaller) than the given newVersion
Note: putStateIntoStateCacheMap is used when HDFSBackedStateStoreProvider is requested to commit state (as a new version) and load the specified version of state (from the internal cache or snapshot and delta files).
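A sketch of the retention logic, using the same reverse-ordered TreeMap as the provider (with String in place of UnsafeRow, an assumption for brevity):

```scala
import java.util.concurrent.ConcurrentHashMap

object CacheRetentionSketch {
  type StateMap = ConcurrentHashMap[String, String]

  val numberOfVersionsToRetainInMemory = 2
  // reverse ordering: firstKey is the newest version, lastKey the oldest
  val loadedMaps = new java.util.TreeMap[Long, StateMap](Ordering[Long].reverse)

  def putStateIntoStateCacheMap(newVersion: Long, map: StateMap): Unit = {
    if (numberOfVersionsToRetainInMemory <= 0) { loadedMaps.clear(); return }
    while (loadedMaps.size > numberOfVersionsToRetainInMemory)
      loadedMaps.remove(loadedMaps.lastKey)      // evict the oldest versions
    if (loadedMaps.size == numberOfVersionsToRetainInMemory) {
      val oldestVersion = loadedMaps.lastKey
      if (oldestVersion > newVersion) return     // newVersion older than everything cached: skip
      if (oldestVersion < newVersion) loadedMaps.remove(oldestVersion)
    }
    loadedMaps.put(newVersion, map)
  }
}
```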
Writing Compressed Snapshot File for Specified Version — writeSnapshotFile Internal Method
writeSnapshotFile(
version: Long,
map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit
writeSnapshotFile creates the path of the snapshot file (snapshotFile) for the given version.
writeSnapshotFile requests the CheckpointFileManager to create the snapshot file (with overwriting enabled) and compress the stream.
For every key-value UnsafeRow pair in the given map, writeSnapshotFile writes the size of the key followed by the key itself (as bytes). writeSnapshotFile then writes the size of the value followed by the value itself (as bytes).
In the end, writeSnapshotFile prints out the following INFO message to the logs:
Written snapshot file for version [version] of [this] at [targetFile]
In case of any Throwable exception, writeSnapshotFile cancelDeltaFile and re-throws the exception.
Note: writeSnapshotFile is used exclusively when HDFSBackedStateStoreProvider is requested to doSnapshot.
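The write side of the same format in a sketch (byte sequences again standing in for UnsafeRows):

```scala
import java.io.DataOutputStream

def writeSnapshotEntries(output: DataOutputStream, entries: Map[Seq[Byte], Seq[Byte]]): Unit = {
  entries.foreach { case (key, value) =>
    output.writeInt(key.size)     // key size, then the key bytes
    output.write(key.toArray)
    output.writeInt(value.size)   // value size, then the value bytes
    output.write(value.toArray)
  }
  output.writeInt(-1)             // end-of-file marker (as in finalizeDeltaFile)
  output.close()
}
```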
compressStream Internal Method
compressStream(
outputStream: DataOutputStream): DataOutputStream
compressStream creates a new LZ4CompressionCodec (based on the SparkConf) and requests it to create a LZ4BlockOutputStream with the given DataOutputStream.
In the end, compressStream creates a new DataOutputStream with the LZ4BlockOutputStream.
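In a sketch, bypassing Spark's codec factory and using the LZ4 stream directly (an assumption to keep the example self-contained):

```scala
import java.io.DataOutputStream
import net.jpountz.lz4.LZ4BlockOutputStream

// Wrap the raw stream in an LZ4 block stream, then back into a DataOutputStream.
def compressStream(outputStream: DataOutputStream): DataOutputStream =
  new DataOutputStream(new LZ4BlockOutputStream(outputStream))
```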
Note: compressStream is used when…FIXME
cancelDeltaFile Internal Method
cancelDeltaFile(
compressedStream: DataOutputStream,
rawStream: CancellableFSDataOutputStream): Unit
cancelDeltaFile…FIXME
Note: cancelDeltaFile is used when…FIXME
finalizeDeltaFile Internal Method
finalizeDeltaFile(
output: DataOutputStream): Unit
finalizeDeltaFile simply writes -1 to the given DataOutputStream (to indicate end of file) and closes it.
Note: finalizeDeltaFile is used exclusively when HDFSBackedStateStoreProvider is requested to commit state changes (a new version of state).
Lookup Table (Cache) of States By Version — loadedMaps Internal Registry
loadedMaps: TreeMap[
Long, // version
ConcurrentHashMap[UnsafeRow, UnsafeRow]] // state (as keys and values)
loadedMaps is a java.util.TreeMap of state versions sorted according to the reversed ordering of the versions (i.e. long numbers).
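The reversed ordering means firstKey is always the newest cached version, for example:

```scala
val versions = new java.util.TreeMap[Long, String](Ordering[Long].reverse)
versions.put(1L, "state-v1")
versions.put(3L, "state-v3")
versions.put(2L, "state-v2")
assert(versions.firstKey == 3L) // newest version first
assert(versions.lastKey == 1L)  // oldest version last
```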
A new entry (a version and the state updates) can only be added when HDFSBackedStateStoreProvider is requested to putStateIntoStateCacheMap (and only when the spark.sql.streaming.maxBatchesToRetainInMemory internal configuration is above 0).
loadedMaps is mainly used when HDFSBackedStateStoreProvider is requested to load the specified version of state (from the internal cache or snapshot and delta files). Positive hits (when a version could be found in the cache) are counted in the count of cache hit on states cache in provider performance metric while misses are counted in the count of cache miss on states cache in provider performance metric.
Note: With no or missing versions in the cache, the count of cache miss on states cache in provider metric should be above 0 while count of cache hit on states cache in provider should always be 0 (or smaller than the miss metric).
The estimated size of loadedMaps is available as the memoryUsedBytes performance metric.
The spark.sql.streaming.maxBatchesToRetainInMemory internal configuration is used as the threshold of the number of elements in loadedMaps. When 0 or negative, every putStateIntoStateCacheMap removes all elements in (clears) loadedMaps.
Note: It is possible to change the configuration at restart of a structured query.
The state maps (the values) in loadedMaps are cleared (all entries removed) when HDFSBackedStateStoreProvider is requested to close.
Internal Properties
| Name | Description |
|---|---|
| fm | CheckpointFileManager for the state checkpoint base directory (and the Hadoop Configuration) |
| hadoopConf | Hadoop Configuration of the CheckpointFileManager (given when HDFSBackedStateStoreProvider is requested to initialize) |
| keySchema | Schema of the state keys |
| valueSchema | Schema of the state values |