HDFSBackedStateStoreProvider — Hadoop DFS-based StateStoreProvider
HDFSBackedStateStoreProvider is a StateStoreProvider that uses a Hadoop DFS-compatible file system for versioned state checkpointing.

HDFSBackedStateStoreProvider is the default StateStoreProvider per the spark.sql.streaming.stateStore.providerClass internal configuration property.

HDFSBackedStateStoreProvider is created and immediately requested to initialize when the StateStoreProvider utility is requested to create and initialize a StateStoreProvider. That is when HDFSBackedStateStoreProvider is given the StateStoreId that uniquely identifies the state store to use for a stateful operator and a partition.

HDFSBackedStateStoreProvider uses HDFSBackedStateStores to manage state (one per version).

HDFSBackedStateStoreProvider manages versioned state in delta and snapshot files (and uses an internal cache for faster access to state versions).

HDFSBackedStateStoreProvider takes no arguments to be created.
Tip
|
Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider=ALL

Refer to Logging. |
Performance Metrics
Name (in web UI) | Description |
---|---|
memoryUsedBytes | Estimated size of the loadedMaps internal registry |
count of cache hit on states cache in provider | The number of times loading the specified version of state found (hit) the requested state version in the loadedMaps internal cache |
count of cache miss on states cache in provider | The number of times loading the specified version of state could not find (missed) the requested state version in the loadedMaps internal cache |
estimated size of state only on current version | Estimated size of the current state (of the HDFSBackedStateStore) |
State Checkpoint Base Directory — baseDir
Lazy Internal Property
baseDir: Path
baseDir is the base directory (as Hadoop DFS's Path) for state checkpointing (for delta and snapshot state files).

baseDir is initialized lazily since it is not yet known when HDFSBackedStateStoreProvider is created.

baseDir is initialized and created based on the state checkpoint base directory of the StateStoreId when HDFSBackedStateStoreProvider is requested to initialize.
StateStoreId — Unique Identifier of State Store
As a StateStoreProvider, HDFSBackedStateStoreProvider is associated with a StateStoreId (which is a unique identifier of the state store for a stateful operator and a partition).

HDFSBackedStateStoreProvider is given the StateStoreId at initialization (as requested by the StateStoreProvider contract).

The StateStoreId is then used for the following:

- HDFSBackedStateStore is requested for the id
- HDFSBackedStateStoreProvider is requested for the textual representation and the state checkpoint base directory
Textual Representation — toString
Method
toString: String
Note
|
toString is part of the java.lang.Object contract for the string representation of the object.
|
HDFSBackedStateStoreProvider uses the StateStoreId and the state checkpoint base directory for the textual representation:
HDFSStateStoreProvider[id = (op=[operatorId],part=[partitionId]),dir = [baseDir]]
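The template above can be produced along the lines of the following sketch (assuming the stateStoreId and baseDir properties described on this page; not the verbatim Spark source):

```scala
override def toString(): String = {
  s"HDFSStateStoreProvider[" +
    s"id = (op=${stateStoreId.operatorId},part=${stateStoreId.partitionId})," +
    s"dir = $baseDir]"
}
```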
Loading Specified Version of State (Store) For Update — getStore
Method
getStore(
version: Long): StateStore
Note
|
getStore is part of the StateStoreProvider Contract for the StateStore for a specified version.
|
getStore creates a new empty state map (ConcurrentHashMap[UnsafeRow, UnsafeRow]) and, for versions greater than 0, loads the specified version of state (from the internal cache or snapshot and delta files) into it.

In the end, getStore creates a new HDFSBackedStateStore for the specified version with the new state and prints out the following INFO message to the logs:

Retrieved version [version] of [this] for update

getStore throws an IllegalArgumentException when the specified version is less than 0 (negative):

Version cannot be less than 0
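A minimal sketch of getStore per the description above (it assumes the loadMap helper and the HDFSBackedStateStore constructor described on this page; not the verbatim Spark source):

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

override def getStore(version: Long): StateStore = synchronized {
  require(version >= 0, "Version cannot be less than 0")
  val newMap = new ConcurrentHashMap[UnsafeRow, UnsafeRow]()
  if (version > 0) {
    // load the previous version from the internal cache or snapshot and delta files
    newMap.putAll(loadMap(version))
  }
  val store = new HDFSBackedStateStore(version, newMap)
  logInfo(s"Retrieved version $version of $this for update")
  store
}
```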
deltaFile
Internal Method
deltaFile(version: Long): Path
deltaFile simply returns the Hadoop Path of the [version].delta file in the state checkpoint base directory.
Note
|
deltaFile is used when HDFSBackedStateStoreProvider is requested to updateFromDeltaFile.
|
snapshotFile
Internal Method
snapshotFile(version: Long): Path
snapshotFile simply returns the Hadoop Path of the [version].snapshot file in the state checkpoint base directory.
Note
|
snapshotFile is used when HDFSBackedStateStoreProvider is requested to writeSnapshotFile or readSnapshotFile.
|
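Both helpers boil down to building a Hadoop Path under the baseDir property, roughly:

```scala
import org.apache.hadoop.fs.Path

private def deltaFile(version: Long): Path =
  new Path(baseDir, s"$version.delta")

private def snapshotFile(version: Long): Path =
  new Path(baseDir, s"$version.snapshot")
```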
Listing All Delta And Snapshot Files In State Checkpoint Directory — fetchFiles
Internal Method
fetchFiles(): Seq[StoreFile]
fetchFiles requests the CheckpointFileManager for all the files in the state checkpoint directory.

For every file, fetchFiles splits the name into two parts with . (dot) as a separator (files with more or fewer than two parts are simply ignored) and registers a new StoreFile for snapshot and delta files:
- For snapshot files, fetchFiles creates a new StoreFile with the isSnapshot flag on (true)
- For delta files, fetchFiles creates a new StoreFile with the isSnapshot flag off (false)
Note
|
delta files are only registered if there was no snapshot file for the version.
|
fetchFiles prints out the following WARN message to the logs for any other files:

Could not identify file [path] for [this]
In the end, fetchFiles sorts the StoreFiles based on their version, prints out the following DEBUG message to the logs, and returns the files.

Current set of files for [this]: [storeFiles]
Note
|
fetchFiles is used when HDFSBackedStateStoreProvider is requested to doSnapshot and cleanup.
|
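A sketch of fetchFiles per the description above (it assumes a StoreFile(version, path, isSnapshot) case class and the fm CheckpointFileManager property; not the verbatim Spark source):

```scala
import scala.collection.mutable

private def fetchFiles(): Seq[StoreFile] = {
  val files = fm.list(baseDir)  // all files in the state checkpoint directory
  val versionToFiles = mutable.HashMap[Long, StoreFile]()
  files.foreach { status =>
    val path = status.getPath
    path.getName.split("\\.") match {
      case Array(version, "delta") =>
        // a delta file is registered only when no snapshot was seen for the version
        val ver = version.toLong
        if (!versionToFiles.contains(ver)) {
          versionToFiles.put(ver, StoreFile(ver, path, isSnapshot = false))
        }
      case Array(version, "snapshot") =>
        // a snapshot file replaces a delta file of the same version
        val ver = version.toLong
        versionToFiles.put(ver, StoreFile(ver, path, isSnapshot = true))
      case _ =>
        logWarning(s"Could not identify file $path for $this")
    }
  }
  val storeFiles = versionToFiles.values.toSeq.sortBy(_.version)
  logDebug(s"Current set of files for $this: ${storeFiles.mkString(", ")}")
  storeFiles
}
```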
Initializing StateStoreProvider — init
Method
init(
stateStoreId: StateStoreId,
keySchema: StructType,
valueSchema: StructType,
indexOrdinal: Option[Int],
storeConf: StateStoreConf,
hadoopConf: Configuration): Unit
Note
|
init is part of the StateStoreProvider Contract to initialize itself.
|
init records the values of the input arguments as the stateStoreId, keySchema, valueSchema, storeConf, and hadoopConf internal properties.

init requests the given StateStoreConf for the spark.sql.streaming.maxBatchesToRetainInMemory configuration property (that is then recorded in the numberOfVersionsToRetainInMemory internal property).

In the end, init requests the CheckpointFileManager to create the baseDir directory (with parent directories).
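A sketch of init per the description above (the stateStoreId_ backing field and the maxVersionsToRetainInMemory accessor on StateStoreConf are assumptions of this sketch):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.types.StructType

override def init(
    stateStoreId: StateStoreId,
    keySchema: StructType,
    valueSchema: StructType,
    indexOrdinal: Option[Int],
    storeConf: StateStoreConf,
    hadoopConf: Configuration): Unit = {
  this.stateStoreId_ = stateStoreId  // backing field for the stateStoreId property
  this.keySchema = keySchema
  this.valueSchema = valueSchema
  this.storeConf = storeConf
  this.hadoopConf = hadoopConf
  this.numberOfVersionsToRetainInMemory = storeConf.maxVersionsToRetainInMemory
  fm.mkdirs(baseDir)  // create the state checkpoint base directory (with parents)
}
```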
Finding Snapshot File and Delta Files For Version — filesForVersion
Internal Method
filesForVersion(
allFiles: Seq[StoreFile],
version: Long): Seq[StoreFile]
filesForVersion finds the latest snapshot version among the given allFiles up to and including the given version (a snapshot may or may not be available).

If a snapshot file was found (among the given files up to and including the given version), filesForVersion takes all delta files between the version of the snapshot file (exclusive) and the given version (inclusive) from the given allFiles.
Note
|
The number of delta files should be the given version minus the snapshot version. |
If a snapshot file was not found, filesForVersion takes all delta files up to the given version (inclusive) from the given allFiles.

In the end, filesForVersion returns the snapshot version (if available) and all delta files up to the given version (inclusive).
Note
|
filesForVersion is used when HDFSBackedStateStoreProvider is requested to doSnapshot and cleanup.
|
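A sketch of the selection logic (omitting the sanity checks of the actual implementation; not the verbatim Spark source):

```scala
private def filesForVersion(allFiles: Seq[StoreFile], version: Long): Seq[StoreFile] = {
  // the latest snapshot at or before the requested version (may not exist)
  val latestSnapshot = allFiles
    .filter(f => f.isSnapshot && f.version <= version)
    .lastOption
  latestSnapshot match {
    case Some(snapshot) =>
      // all delta files after the snapshot up to and including the version
      val deltas = allFiles.filter { f =>
        !f.isSnapshot && f.version > snapshot.version && f.version <= version
      }
      snapshot +: deltas
    case None =>
      // no snapshot: all delta files up to and including the version
      allFiles.filter(f => !f.isSnapshot && f.version <= version)
  }
}
```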
State Maintenance (Snapshotting and Cleaning Up) — doMaintenance
Method
doMaintenance(): Unit
Note
|
doMaintenance is part of the StateStoreProvider Contract for optional state maintenance.
|
doMaintenance simply does state snapshotting followed by cleaning up (removing old state files).

In case of any non-fatal errors, doMaintenance simply prints out the following WARN message to the logs:
Error performing snapshot and cleaning up [this]
State Snapshotting (Rolling Up Delta Files into Snapshot File) — doSnapshot
Internal Method
doSnapshot(): Unit
doSnapshot lists all delta and snapshot files in the state checkpoint directory (files) and prints out the following DEBUG message to the logs:

fetchFiles() took [time] ms.

doSnapshot returns immediately (and does nothing) when there are no delta and snapshot files.

doSnapshot takes the version of the latest file (lastVersion) and finds the snapshot file and delta files for that version (among the files).

doSnapshot looks up the last version in the internal state cache.

When the last version was found in the cache and the number of delta files is above the spark.sql.streaming.stateStore.minDeltasForSnapshot internal threshold, doSnapshot writes a compressed snapshot file for the last version.

In the end, doSnapshot prints out the following DEBUG message to the logs:

writeSnapshotFile() took [time] ms.

In case of non-fatal errors, doSnapshot simply prints out the following WARN message to the logs:

Error doing snapshots for [this]
Note
|
doSnapshot is used exclusively when HDFSBackedStateStoreProvider is requested to do state maintenance (state snapshotting and cleaning up).
|
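A condensed sketch of doSnapshot (timing instrumentation and DEBUG messages omitted; the minDeltasForSnapshot threshold is assumed to be available via the StateStoreConf; not the verbatim Spark source):

```scala
import scala.util.control.NonFatal

private def doSnapshot(): Unit = {
  try {
    val files = fetchFiles()
    if (files.nonEmpty) {
      val lastVersion = files.last.version
      val deltaFilesForLastVersion =
        filesForVersion(files, lastVersion).filter(!_.isSnapshot)
      // snapshot only when the last version is still cached in memory
      synchronized { Option(loadedMaps.get(lastVersion)) } foreach { map =>
        if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) {
          writeSnapshotFile(lastVersion, map)
        }
      }
    }
  } catch {
    case NonFatal(e) =>
      logWarning(s"Error doing snapshots for $this", e)
  }
}
```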
Cleaning Up (Removing Old State Files) — cleanup
Internal Method
cleanup(): Unit
cleanup lists all delta and snapshot files in the state checkpoint directory (files) and prints out the following DEBUG message to the logs:

fetchFiles() took [time] ms.

cleanup returns immediately (and does nothing) when there are no delta and snapshot files.

cleanup takes the version of the latest state file (lastVersion) and decrements it by the spark.sql.streaming.minBatchesToRetain configuration property (default: 100), which gives the earliest version to retain (all older state files are to be removed).

cleanup requests the CheckpointFileManager to delete the path of every old state file and prints out the following DEBUG message to the logs:

deleting files took [time] ms.

In the end, cleanup prints out the following INFO message to the logs:

Deleted files older than [version] for [this]: [filesToDelete]

In case of a non-fatal exception, cleanup prints out the following WARN message to the logs:

Error cleaning up files for [this]
Note
|
cleanup is used exclusively when HDFSBackedStateStoreProvider is requested for state maintenance (state snapshotting and cleaning up).
|
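A condensed sketch of cleanup (timing instrumentation and DEBUG messages omitted; the minVersionsToRetain threshold is assumed to be available via the StateStoreConf; not the verbatim Spark source):

```scala
import scala.util.control.NonFatal

private def cleanup(): Unit = {
  try {
    val files = fetchFiles()
    if (files.nonEmpty) {
      // keep the spark.sql.streaming.minBatchesToRetain most recent versions
      val earliestVersionToRetain = files.last.version - storeConf.minVersionsToRetain
      if (earliestVersionToRetain > 0) {
        val earliestFileToRetain = filesForVersion(files, earliestVersionToRetain).head
        val filesToDelete = files.filter(_.version < earliestFileToRetain.version)
        filesToDelete.foreach(f => fm.delete(f.path))
        logInfo(s"Deleted files older than ${earliestFileToRetain.version} " +
          s"for $this: ${filesToDelete.mkString(", ")}")
      }
    }
  } catch {
    case NonFatal(e) =>
      logWarning(s"Error cleaning up files for $this", e)
  }
}
```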
Closing State Store Provider — close
Method
close(): Unit
Note
|
close is part of the StateStoreProvider Contract to close the state store provider.
|
close clears all the state maps (the values) in the loadedMaps internal registry.
getMetricsForProvider
Method
getMetricsForProvider(): Map[String, Long]
getMetricsForProvider returns the following performance metrics:

- memoryUsedBytes
- count of cache hit on states cache in provider (loadedMapCacheHitCount)
- count of cache miss on states cache in provider (loadedMapCacheMissCount)
Note
|
getMetricsForProvider is used exclusively when HDFSBackedStateStore is requested for performance metrics.
|
Supported StateStoreCustomMetrics — supportedCustomMetrics
Method
supportedCustomMetrics: Seq[StateStoreCustomMetric]
Note
|
supportedCustomMetrics is part of the StateStoreProvider Contract for the StateStoreCustomMetrics of a state store provider.
|
supportedCustomMetrics includes the following StateStoreCustomMetrics:

- count of cache hit on states cache in provider
- count of cache miss on states cache in provider
- estimated size of state only on current version
Committing State Changes (As New Version of State) — commitUpdates
Internal Method
commitUpdates(
newVersion: Long,
map: ConcurrentHashMap[UnsafeRow, UnsafeRow],
output: DataOutputStream): Unit
commitUpdates finalizeDeltaFile (with the given DataOutputStream) and then caches the new version of state (with the given newVersion and the map state).
Note
|
commitUpdates is used exclusively when HDFSBackedStateStore is requested to commit state changes.
|
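The description above translates to a two-liner, roughly:

```scala
import java.io.DataOutputStream
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

private def commitUpdates(
    newVersion: Long,
    map: ConcurrentHashMap[UnsafeRow, UnsafeRow],
    output: DataOutputStream): Unit = synchronized {
  finalizeDeltaFile(output)                   // write the EOF marker and close the delta file
  putStateIntoStateCacheMap(newVersion, map)  // cache the new version of state
}
```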
Loading Specified Version of State (from Internal Cache or Snapshot and Delta Files) — loadMap
Internal Method
loadMap(
version: Long): ConcurrentHashMap[UnsafeRow, UnsafeRow]
loadMap first tries to find the requested state version in the loadedMaps internal cache and, if found, returns it immediately and increments the loadedMapCacheHitCount metric.

If the requested state version could not be found in the loadedMaps internal cache, loadMap prints out the following WARN message to the logs:

The state for version [version] doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.

loadMap increments the loadedMapCacheMissCount metric.

loadMap tries to load the state snapshot file for the version and, if found, puts the version of state in the internal cache and returns it.

If not found, loadMap tries to find the most recent state version by decrementing the requested version until one is found in the loadedMaps internal cache or loaded from a state snapshot (file).

loadMap then updateFromDeltaFile for all the remaining versions (from the closest version found up to the requested one). loadMap puts the final version of state in the internal cache (the closest snapshot plus the remaining delta versions) and returns it.

In the end, loadMap prints out the following DEBUG message to the logs:

Loading state for [version] takes [elapsedMs] ms.
Note
|
loadMap is used exclusively when HDFSBackedStateStoreProvider is requested for the specified version of a state store for update.
|
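A sketch of the overall loadMap algorithm (it assumes LongAdder-based loadedMapCacheHitCount and loadedMapCacheMissCount counters; logging and timing omitted; not the verbatim Spark source):

```scala
private def loadMap(version: Long): ConcurrentHashMap[UnsafeRow, UnsafeRow] = {
  // 1. the requested version may already be cached
  synchronized { Option(loadedMaps.get(version)) } match {
    case Some(map) =>
      loadedMapCacheHitCount.increment()
      return map
    case None =>
      loadedMapCacheMissCount.increment()
  }
  // 2. a snapshot file for the exact version is the next best thing
  readSnapshotFile(version) match {
    case Some(map) =>
      putStateIntoStateCacheMap(version, map)
      return map
    case None =>
  }
  // 3. walk back to the closest cached or snapshotted version (0 = empty state)
  var lastAvailableVersion = version
  var lastAvailableMap: Option[ConcurrentHashMap[UnsafeRow, UnsafeRow]] = None
  while (lastAvailableMap.isEmpty) {
    lastAvailableVersion -= 1
    if (lastAvailableVersion <= 0) {
      lastAvailableMap = Some(new ConcurrentHashMap[UnsafeRow, UnsafeRow]())
    } else {
      lastAvailableMap = synchronized { Option(loadedMaps.get(lastAvailableVersion)) }
        .orElse(readSnapshotFile(lastAvailableVersion))
    }
  }
  // 4. replay the delta files for the remaining versions
  val resultMap = new ConcurrentHashMap[UnsafeRow, UnsafeRow](lastAvailableMap.get)
  (lastAvailableVersion + 1 to version).foreach { deltaVersion =>
    updateFromDeltaFile(deltaVersion, resultMap)
  }
  putStateIntoStateCacheMap(version, resultMap)
  resultMap
}
```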
Loading State Snapshot File For Specified Version — readSnapshotFile
Internal Method
readSnapshotFile(
version: Long): Option[ConcurrentHashMap[UnsafeRow, UnsafeRow]]
readSnapshotFile creates the path of the snapshot file for the given version.

readSnapshotFile requests the CheckpointFileManager to open the snapshot file for reading and creates a decompressed DataInputStream (input).

readSnapshotFile reads the decompressed input stream until an EOF (marked as the integer -1 in the stream) and inserts key and value rows in a state map (ConcurrentHashMap[UnsafeRow, UnsafeRow]):
- The first integer is the size of a key (buffer) followed by the key itself (of that size). readSnapshotFile creates an UnsafeRow for the key (with the number of fields as indicated by the number of fields of the key schema).
- The next integer is the size of a value (buffer) followed by the value itself (of that size). readSnapshotFile creates an UnsafeRow for the value (with the number of fields as indicated by the number of fields of the value schema).
In the end, readSnapshotFile prints out the following INFO message to the logs and returns the key-value map.

Read snapshot file for version [version] of [this] from [fileToRead]

In case of a FileNotFoundException, readSnapshotFile simply returns None (to indicate that no snapshot state file was available and so no state for the version).

readSnapshotFile throws an IOException when the size of a key or a value is below 0:

Error reading snapshot file [fileToRead] of [this]: [key|value] size cannot be [keySize|valueSize]
Note
|
readSnapshotFile is used exclusively when HDFSBackedStateStoreProvider is requested to load the specified version of state (from the internal cache or snapshot and delta files).
|
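The record format described above reads as in the following sketch (input, fileToRead and the map under construction are assumed in scope; ByteStreams is Guava's; not the verbatim Spark source):

```scala
import java.io.IOException
import com.google.common.io.ByteStreams

// snapshot-file record format: [keySize][key bytes][valueSize][value bytes] ... [-1] (EOF)
var eof = false
while (!eof) {
  val keySize = input.readInt()
  if (keySize == -1) {
    eof = true
  } else if (keySize < 0) {
    throw new IOException(
      s"Error reading snapshot file $fileToRead of $this: key size cannot be $keySize")
  } else {
    val keyRowBuffer = new Array[Byte](keySize)
    ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
    val keyRow = new UnsafeRow(keySchema.fields.length)
    keyRow.pointTo(keyRowBuffer, keySize)

    val valueSize = input.readInt()
    if (valueSize < 0) {
      throw new IOException(
        s"Error reading snapshot file $fileToRead of $this: value size cannot be $valueSize")
    }
    val valueRowBuffer = new Array[Byte](valueSize)
    ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
    val valueRow = new UnsafeRow(valueSchema.fields.length)
    valueRow.pointTo(valueRowBuffer, valueSize)
    map.put(keyRow, valueRow)
  }
}
```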
Updating State with State Changes For Specified Version (per Delta File) — updateFromDeltaFile
Internal Method
updateFromDeltaFile(
version: Long,
map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit
Note
|
The following description is almost an exact copy of readSnapshotFile just for completeness. |
updateFromDeltaFile creates the path of the delta file for the requested version.

updateFromDeltaFile requests the CheckpointFileManager to open the delta file for reading and creates a decompressed DataInputStream (input).

updateFromDeltaFile reads the decompressed input stream until an EOF (marked as the integer -1 in the stream) and inserts key and value rows in the given state map:
- The first integer is the size of a key (buffer) followed by the key itself (of that size). updateFromDeltaFile creates an UnsafeRow for the key (with the number of fields as indicated by the number of fields of the key schema).
- The next integer is the size of a value (buffer) followed by the value itself (of that size). updateFromDeltaFile creates an UnsafeRow for the value (with the number of fields as indicated by the number of fields of the value schema) or removes the corresponding key from the state map (if the value size is -1).
Note
|
updateFromDeltaFile removes the key-value entry from the state map if the value (size) is -1.
|
In the end, updateFromDeltaFile prints out the following INFO message to the logs:

Read delta file for version [version] of [this] from [fileToRead]
updateFromDeltaFile throws an IllegalStateException in case of a FileNotFoundException while opening the delta file for the specified version:

Error reading delta file [fileToRead] of [this]: [fileToRead] does not exist
Note
|
updateFromDeltaFile is used exclusively when HDFSBackedStateStoreProvider is requested to load the specified version of state (from the internal cache or snapshot and delta files).
|
Caching New Version of State — putStateIntoStateCacheMap
Internal Method
putStateIntoStateCacheMap(
newVersion: Long,
map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit
putStateIntoStateCacheMap registers state for a given version, i.e. adds the map state under the newVersion key in the loadedMaps internal registry.

With the numberOfVersionsToRetainInMemory threshold at 0 or below, putStateIntoStateCacheMap simply removes all entries from the loadedMaps internal registry and returns.

putStateIntoStateCacheMap removes the oldest state version(s) from the loadedMaps internal registry until its size is at the numberOfVersionsToRetainInMemory threshold.

With the size of the loadedMaps internal registry at the numberOfVersionsToRetainInMemory threshold, putStateIntoStateCacheMap does two more optimizations per newVersion (see the sketch below):
- It does not add the given state when the version of the oldest state is later (larger) than the given newVersion
- It removes the oldest state when it is older (smaller) than the given newVersion
Note
|
putStateIntoStateCacheMap is used when HDFSBackedStateStoreProvider is requested to commit state (as a new version) and load the specified version of state (from the internal cache or snapshot and delta files).
|
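The caching rules above, including the two optimizations, in a sketch (not the verbatim Spark source):

```scala
private def putStateIntoStateCacheMap(
    newVersion: Long,
    map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit = synchronized {
  if (numberOfVersionsToRetainInMemory <= 0) {
    if (loadedMaps.size > 0) loadedMaps.clear()
    return
  }
  // evict the oldest versions until the cache is at the threshold
  while (loadedMaps.size > numberOfVersionsToRetainInMemory) {
    loadedMaps.remove(loadedMaps.lastKey())
  }
  if (loadedMaps.size == numberOfVersionsToRetainInMemory) {
    // loadedMaps is reverse-ordered, so lastKey() is the oldest cached version
    val oldestVersion = loadedMaps.lastKey()
    if (oldestVersion > newVersion) {
      // the new version is older than everything cached: do not add it
      return
    }
    // evict the oldest version to make room for the new one
    loadedMaps.remove(oldestVersion)
  }
  loadedMaps.put(newVersion, map)
}
```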
Writing Compressed Snapshot File for Specified Version — writeSnapshotFile
Internal Method
writeSnapshotFile(
version: Long,
map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit
writeSnapshotFile creates the path of the snapshot file (snapshotFile) for the given version.

writeSnapshotFile requests the CheckpointFileManager to create the snapshot file (with overwriting enabled) and compresses the stream.

For every key-value UnsafeRow pair in the given map, writeSnapshotFile writes the size of the key followed by the key itself (as bytes). writeSnapshotFile then writes the size of the value followed by the value itself (as bytes).

In the end, writeSnapshotFile prints out the following INFO message to the logs:

Written snapshot file for version [version] of [this] at [targetFile]

In case of any Throwable exception, writeSnapshotFile cancelDeltaFile and re-throws the exception.
Note
|
writeSnapshotFile is used exclusively when HDFSBackedStateStoreProvider is requested to doSnapshot.
|
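A sketch of writeSnapshotFile per the description above (createAtomic with overwriteIfPossible is assumed on the CheckpointFileManager; not the verbatim Spark source):

```scala
private def writeSnapshotFile(
    version: Long,
    map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit = {
  val targetFile = snapshotFile(version)
  var rawOutput: CancellableFSDataOutputStream = null
  var output: DataOutputStream = null
  try {
    rawOutput = fm.createAtomic(targetFile, overwriteIfPossible = true)
    output = compressStream(rawOutput)
    val entries = map.entrySet().iterator()
    while (entries.hasNext) {
      val entry = entries.next()
      val keyBytes = entry.getKey.getBytes()
      val valueBytes = entry.getValue.getBytes()
      output.writeInt(keyBytes.size)
      output.write(keyBytes)
      output.writeInt(valueBytes.size)
      output.write(valueBytes)
    }
    output.close()
  } catch {
    case e: Throwable =>
      cancelDeltaFile(compressedStream = output, rawStream = rawOutput)
      throw e
  }
  logInfo(s"Written snapshot file for version $version of $this at $targetFile")
}
```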
compressStream
Internal Method
compressStream(
outputStream: DataOutputStream): DataOutputStream
compressStream creates a new LZ4CompressionCodec (based on the SparkConf) and requests it to create an LZ4BlockOutputStream with the given DataOutputStream.

In the end, compressStream creates a new DataOutputStream with the LZ4BlockOutputStream.
Note
|
compressStream is used when HDFSBackedStateStoreProvider is requested to writeSnapshotFile.
|
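That is, roughly (compressedOutputStream of Spark's LZ4CompressionCodec wraps the stream in an LZ4BlockOutputStream):

```scala
import java.io.DataOutputStream
import org.apache.spark.io.LZ4CompressionCodec

private def compressStream(outputStream: DataOutputStream): DataOutputStream = {
  val compressed = new LZ4CompressionCodec(sparkConf)
    .compressedOutputStream(outputStream)  // an LZ4BlockOutputStream under the covers
  new DataOutputStream(compressed)
}
```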
cancelDeltaFile
Internal Method
cancelDeltaFile(
compressedStream: DataOutputStream,
rawStream: CancellableFSDataOutputStream): Unit
cancelDeltaFile
…FIXME
Note
|
cancelDeltaFile is used when HDFSBackedStateStoreProvider is requested to writeSnapshotFile (and a Throwable exception is reported).
|
finalizeDeltaFile
Internal Method
finalizeDeltaFile(
output: DataOutputStream): Unit
finalizeDeltaFile simply writes -1 to the given DataOutputStream (to indicate the end of file) and closes it.
Note
|
finalizeDeltaFile is used exclusively when HDFSBackedStateStoreProvider is requested to commit state changes (a new version of state).
|
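That is, roughly:

```scala
import java.io.DataOutputStream

private def finalizeDeltaFile(output: DataOutputStream): Unit = {
  output.writeInt(-1)  // EOF marker per the delta file format
  output.close()
}
```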
Lookup Table (Cache) of States By Version — loadedMaps
Internal Property
loadedMaps: TreeMap[
Long, // version
ConcurrentHashMap[UnsafeRow, UnsafeRow]] // state (as keys and values)
loadedMaps is a java.util.TreeMap of state versions sorted according to the reversed ordering of the versions (i.e. long numbers).

A new entry (a version and the state updates) can only be added when HDFSBackedStateStoreProvider is requested to putStateIntoStateCacheMap (and only when the spark.sql.streaming.maxBatchesToRetainInMemory internal configuration is above 0).

loadedMaps is mainly used when HDFSBackedStateStoreProvider is requested to load the specified version of state (from the internal cache or snapshot and delta files). Positive hits (when a version could be found in the cache) are available as the count of cache hit on states cache in provider performance metric, while misses are counted in the count of cache miss on states cache in provider performance metric.
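The declaration could look as follows (the reverse ordering keeps the latest version first and the oldest last):

```scala
import java.util.TreeMap
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Reverse ordering: firstKey() is the latest version, lastKey() the oldest
private val loadedMaps =
  new TreeMap[Long, ConcurrentHashMap[UnsafeRow, UnsafeRow]](Ordering[Long].reverse)
```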
Note
|
With no versions in the cache (or when the requested versions are missing), the count of cache miss on states cache in provider metric should be above 0 while the count of cache hit on states cache in provider metric stays at 0 (or smaller than the miss metric).
|
The estimated size of loadedMaps is available as the memoryUsedBytes performance metric.

The spark.sql.streaming.maxBatchesToRetainInMemory internal configuration is used as the threshold of the number of elements in loadedMaps. When 0 or negative, every putStateIntoStateCacheMap removes all elements from (clears) loadedMaps.
Note
|
It is possible to change the configuration at restart of a structured query. |
The state maps (the values) in loadedMaps are cleared (all entries removed) when HDFSBackedStateStoreProvider is requested to close.

loadedMaps is used when HDFSBackedStateStoreProvider is requested for the following:

- loadMap (to look the requested version up in the cache)
- putStateIntoStateCacheMap (to register a new version of state)
- doSnapshot (to look up the last version of state)
- getMetricsForProvider (to estimate the memoryUsedBytes metric)
- close (to clear the cached state maps)
Internal Properties
Name | Description |
---|---|
fm | CheckpointFileManager for the state checkpoint base directory (and the Hadoop Configuration). Used when HDFSBackedStateStoreProvider is requested to initialize (to create the base directory), fetchFiles, readSnapshotFile, writeSnapshotFile, updateFromDeltaFile, and cleanup |
hadoopConf | Hadoop Configuration of the CheckpointFileManager. Given when HDFSBackedStateStoreProvider is requested to initialize |
keySchema | Schema of the state keys |
valueSchema | Schema of the state values |