CheckpointFileManager Contract

CheckpointFileManager is the abstraction of checkpoint managers that manage checkpoint files (metadata of streaming batches) on Hadoop DFS-compatible file systems.

A CheckpointFileManager is created from the class set in the spark.sql.streaming.checkpointFileManagerClass configuration property, if defined. Otherwise, one of the built-in checkpoint managers (see Table 2) is used.

CheckpointFileManager is used exclusively by HDFSMetadataLog, StreamMetadata and HDFSBackedStateStoreProvider.

Table 1. CheckpointFileManager Contract
Method Description

createAtomic

createAtomic(
  path: Path,
  overwriteIfPossible: Boolean): CancellableFSDataOutputStream

Used when:

delete

delete(path: Path): Unit

Deletes the given path recursively (if it exists)
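The "recursively, if it exists" semantics can be made concrete with a minimal local-filesystem sketch using java.nio. This is not Spark's implementation, which goes through Hadoop's FileContext or FileSystem API. RecursiveDelete is a hypothetical name used only for illustration.

```scala
import java.nio.file.{Files, Path}
import java.util.Comparator

// Local-filesystem sketch (not Spark's code): delete `path` and everything
// below it, and do nothing at all if `path` does not exist.
object RecursiveDelete {
  def delete(path: Path): Unit =
    if (Files.exists(path)) {
      val entries = Files.walk(path) // the path itself plus all descendants
      try {
        // Reverse lexicographic order puts every child before its parent
        // directory, so each directory is already empty when it is deleted.
        entries
          .sorted(Comparator.reverseOrder[Path]())
          .forEach((p: Path) => Files.delete(p))
      } finally entries.close()
    }
}
```

Calling delete twice on the same path is harmless, because the second call finds nothing to remove.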

Used when:

exists

exists(path: Path): Boolean

Used when HDFSMetadataLog is created (to create the metadata directory) and requested for the metadata of a batch

isLocal

isLocal: Boolean

Does not seem to be used.

list

list(
  path: Path): Array[FileStatus] (1)
list(
  path: Path,
  filter: PathFilter): Array[FileStatus]

(1) Uses a PathFilter that accepts all files in the path

Lists all files in the given path
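The relationship between the two list overloads can be sketched on the local file system. In the sketch below, the one-argument overload delegates to the two-argument one with an accept-all filter. It assumes java.nio's Path in place of Hadoop's Path and returns paths rather than FileStatus. LocalPathFilter and LocalLister are hypothetical stand-ins. LocalPathFilter mimics the single accept method of Hadoop's PathFilter.

```scala
import java.nio.file.Path

// Hypothetical stand-in for Hadoop's PathFilter (one accept(path) method),
// so the sketch runs without Hadoop on the classpath.
trait LocalPathFilter {
  def accept(path: Path): Boolean
}

object LocalLister {
  // Filter that lets every entry through, used by the one-argument overload.
  private val AcceptAll: LocalPathFilter = new LocalPathFilter {
    def accept(path: Path): Boolean = true
  }

  // One-argument overload: list everything by delegating with AcceptAll.
  def list(path: Path): Array[Path] = list(path, AcceptAll)

  // Two-argument overload: the direct children of `path` that pass `filter`,
  // sorted by name so the result is deterministic.
  def list(path: Path, filter: LocalPathFilter): Array[Path] = {
    val children = path.toFile.listFiles()
    if (children == null) Array.empty[Path] // not a directory, or unreadable
    else children.map(_.toPath).filter(p => filter.accept(p)).sortBy(_.toString)
  }
}
```

A caller that only wants, for example, non-temporary files passes its own filter to the two-argument overload.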

Used when:

mkdirs

mkdirs(path: Path): Unit

Used when:

open

open(path: Path): FSDataInputStream

Opens a file (by the given path) for reading

Used when:

Table 2. CheckpointFileManagers
CheckpointFileManager Description

FileContextBasedCheckpointFileManager

Default CheckpointFileManager that uses Hadoop’s FileContext API for managing checkpoint files (unless the spark.sql.streaming.checkpointFileManagerClass configuration property is set)

FileSystemBasedCheckpointFileManager

Basic CheckpointFileManager that uses Hadoop’s FileSystem API for managing checkpoint files. It assumes that the implementation of FileSystem.rename() is atomic. Otherwise, the correctness and fault tolerance of Structured Streaming are not guaranteed.
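Atomic rename matters because an atomic create on such a file system can be built by writing to a temporary file and renaming it into place only after the write completes. Readers then see either no file or the complete file. Below is a minimal local-filesystem sketch of that write-then-rename pattern. It assumes java.nio's ATOMIC_MOVE as the atomic rename. AtomicWrite is a hypothetical name, and this is not Spark's code.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardCopyOption}

// Sketch of write-to-temp-then-rename on the local file system (not Spark's code).
object AtomicWrite {
  def write(target: Path, content: String, overwriteIfPossible: Boolean): Unit = {
    // Simplification: this check is not atomic with the move below.
    if (!overwriteIfPossible && Files.exists(target))
      throw new FileAlreadyExistsException(target.toString)
    // Temp file in the same directory, so the move stays within one file system.
    val tmp = Files.createTempFile(target.getParent, "." + target.getFileName, ".tmp")
    try {
      Files.write(tmp, content.getBytes(StandardCharsets.UTF_8))
      // ATOMIC_MOVE makes the rename all-or-nothing. It throws
      // AtomicMoveNotSupportedException if the file system cannot guarantee that.
      Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE)
    } finally {
      // If anything failed before the move, discard the partial temp file.
      // After a successful move this is a no-op.
      Files.deleteIfExists(tmp)
    }
  }
}
```

If the rename were not atomic, a crash mid-rename could leave a partially visible file. That is the failure mode the FileSystem-based manager cannot rule out on its own.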

Creating CheckpointFileManager Instance — create Object Method

create(
  path: Path,
  hadoopConf: Configuration): CheckpointFileManager

create looks up the spark.sql.streaming.checkpointFileManagerClass configuration property in the given hadoopConf.

If the property is set, create instantiates the CheckpointFileManager implementation class it names.

If the property is not set, create creates a FileContextBasedCheckpointFileManager.

If creating the FileContextBasedCheckpointFileManager fails with an UnsupportedFileSystemException, create prints out the following WARN message to the logs and falls back to a FileSystemBasedCheckpointFileManager:

Could not use FileContext API for managing Structured Streaming checkpoint files at [path]. Using FileSystem API instead for managing log files. If the implementation of FileSystem.rename() is not atomic, then the correctness and fault-tolerance of your Structured Streaming is not guaranteed.
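The order of decisions in create can be sketched with self-contained stand-ins. All class names below are hypothetical. A Map[String, String] plays the role of the Hadoop Configuration, and a String plays the role of the Hadoop Path. The set of schemes that "support" FileContext is made up for the example. The sketch only mirrors the sequence described above: configured class first, then FileContext, then the FileSystem fallback.

```scala
// Hypothetical stand-ins for the Spark classes, so the selection logic runs on its own.
trait CheckpointManager
final class UnsupportedFileSystem(msg: String) extends Exception(msg)

// Made-up assumption: the FileContext-based manager only works for these schemes.
final class FileContextManager(path: String) extends CheckpointManager {
  private val scheme = path.takeWhile(_ != ':')
  if (!Set("file", "hdfs").contains(scheme))
    throw new UnsupportedFileSystem(s"No FileContext implementation for scheme $scheme")
}

final class FileSystemManager(path: String) extends CheckpointManager

object CheckpointManagerFactory {
  val ConfKey = "spark.sql.streaming.checkpointFileManagerClass"

  def create(path: String, conf: Map[String, String]): CheckpointManager =
    conf.get(ConfKey) match {
      // 1. A configured class wins: instantiate it reflectively
      //    (this sketch assumes a public constructor taking the path).
      case Some(className) =>
        Class.forName(className)
          .getConstructor(classOf[String])
          .newInstance(path)
          .asInstanceOf[CheckpointManager]
      // 2. Otherwise prefer the FileContext-based manager...
      case None =>
        try new FileContextManager(path)
        catch {
          // 3. ...and fall back to the FileSystem-based one, with a warning.
          case _: UnsupportedFileSystem =>
            Console.err.println(
              s"WARN Could not use FileContext API for managing Structured Streaming " +
                s"checkpoint files at $path. Using FileSystem API instead for managing log files.")
            new FileSystemManager(path)
        }
    }
}
```

With an empty configuration, an hdfs:// path yields the FileContext-based stand-in. An s3a:// path, which this sketch treats as unsupported, triggers the warning and the fallback.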
Note

create is used when:
