Checkpointing

Checkpointing is the process of writing received records (from input dstreams) at checkpoint intervals to highly-available, HDFS-compatible storage. It makes stream processing pipelines fault-tolerant: when a failure occurs, input dstreams can restore the pre-failure streaming state and continue processing as if nothing had happened.

DStreams can checkpoint input data at specified time intervals.

Marking StreamingContext as Checkpointed

You use the StreamingContext.checkpoint method to set up an HDFS-compatible checkpoint directory where checkpoint data will be persisted, as follows:

ssc.checkpoint("_checkpoint")

Checkpoint Interval and Checkpointing DStreams

You can set up periodic checkpointing of a dstream at every checkpoint interval using the DStream.checkpoint method.

val ssc: StreamingContext = ...
// set the checkpoint directory
ssc.checkpoint("_checkpoint")
val ds: DStream[Int] = ...
val cds: DStream[Int] = ds.checkpoint(Seconds(5))
// do something with the input dstream
cds.print()

Recreating StreamingContext from Checkpoint

You can create a StreamingContext from a checkpoint directory, i.e. recreate a fully-working StreamingContext as recorded in the last valid checkpoint file that was written to the checkpoint directory.

Note
You can also create a brand new StreamingContext (putting checkpoints aside).
Warning
You must not create input dstreams using a StreamingContext that has been recreated from a checkpoint. Otherwise, the StreamingContext will not start at all.

When you use the StreamingContext(path: String) constructor (or one of its variants), it uses the Hadoop configuration to access the path directory on a Hadoop-supported file system.

Effectively, the two variants use StreamingContext(path: String, hadoopConf: Configuration) constructor that reads the latest valid checkpoint file (and hence enables )

Note
SparkContext and batch interval are set to their corresponding values using the checkpoint file.

Example: Recreating StreamingContext from Checkpoint

The following Scala code demonstrates how to use the checkpoint directory _checkpoint to (re)create the StreamingContext or create one from scratch.

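import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val appName = "Recreating StreamingContext from Checkpoint"
val sc = new SparkContext("local[*]", appName, new SparkConf())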

val checkpointDir = "_checkpoint"

def createSC(): StreamingContext = {
  val ssc = new StreamingContext(sc, batchDuration = Seconds(5))

  // NOTE: You have to create dstreams inside the method
  // See http://stackoverflow.com/q/35090180/1305344

  // Create constant input dstream with the RDD
  val rdd = sc.parallelize(0 to 9)
  import org.apache.spark.streaming.dstream.ConstantInputDStream
  val cis = new ConstantInputDStream(ssc, rdd)

  // Sample stream computation
  cis.print()

  ssc.checkpoint(checkpointDir)
  ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, createSC)

// Start streaming processing
ssc.start()

DStreamCheckpointData

DStreamCheckpointData works with a single dstream. An instance of DStreamCheckpointData is created when a dstream is.

It tracks checkpoint data in the internal data registry that records batch time and the checkpoint data at that time. The internal checkpoint data can be anything that a dstream wants to checkpoint. DStreamCheckpointData returns the registry when currentCheckpointFiles method is called.

Note
By default, DStreamCheckpointData records the checkpoint files to which the generated RDDs of the DStream have been saved.
Tip

Enable DEBUG logging level for org.apache.spark.streaming.dstream.DStreamCheckpointData logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.dstream.DStreamCheckpointData=DEBUG

Refer to Logging.

Updating Collection of Batches and Checkpoint Directories (update method)

update(time: Time): Unit

update collects batches and the directory names where the corresponding RDDs were checkpointed (filtering the dstream’s internal generatedRDDs mapping).

You should see the following DEBUG message in the logs:

DEBUG Current checkpoint files:
[checkpointFile per line]

The collection of the batches and their checkpointed RDDs is recorded in an internal field for serialization (i.e. it becomes the current value of the internal field currentCheckpointFiles that is serialized when requested).

The collection is also added to an internal transient (non-serializable) mapping timeToCheckpointFile and the oldest checkpoint (given batch times) is recorded in an internal transient mapping for the current time.
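The bookkeeping that update performs can be sketched in plain Scala without Spark. This is an illustration only, not Spark's code: CheckpointRegistry is a hypothetical stand-in for DStreamCheckpointData, batch times are plain Long milliseconds, the generated argument stands in for the dstream's generatedRDDs (Some(file) marks a checkpointed RDD), and the name timeToOldestCheckpointFileTime for the "oldest checkpoint" mapping is an assumption.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for DStreamCheckpointData.
final class CheckpointRegistry {
  // Batch time -> checkpoint file, as of the latest update (the part that would be serialized)
  var currentCheckpointFiles: Map[Long, String] = Map.empty
  // Every batch time seen so far -> checkpoint file (transient bookkeeping)
  val timeToCheckpointFile = mutable.Map.empty[Long, String]
  // Update time -> oldest checkpointed batch time known at that update (assumed name)
  val timeToOldestCheckpointFileTime = mutable.Map.empty[Long, Long]

  def update(time: Long, generated: Map[Long, Option[String]]): Unit = {
    // Keep only the batches whose RDD has actually been checkpointed
    val checkpointFiles = generated.collect { case (t, Some(file)) => t -> file }
    if (checkpointFiles.nonEmpty) {
      currentCheckpointFiles = checkpointFiles
      timeToCheckpointFile ++= checkpointFiles
      timeToOldestCheckpointFileTime(time) = checkpointFiles.keys.min
    }
  }
}
```

Calling update(3000L, ...) with batches 1000 and 3000 checkpointed records both files and remembers 1000 as the oldest checkpointed batch for time 3000.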

Deleting Old Checkpoint Files (cleanup method)

cleanup(time: Time): Unit

cleanup deletes checkpoint files older than the oldest batch for the input time.

It first gets the oldest batch time for the input time (see Updating Collection of Batches and Checkpoint Directories (update method)).

If the (batch) time has been found, all checkpoint files older than it are deleted (as tracked in the internal timeToCheckpointFile mapping).

You should see the following DEBUG message in the logs:

DEBUG Files to delete:
[comma-separated files to delete]

For each checkpoint file successfully deleted, you should see the following INFO message in the logs:

INFO Deleted checkpoint file '[file]' for time [time]

Errors in checkpoint deletion are reported as WARN messages in the logs:

WARN Error deleting old checkpoint file '[file]' for time [time]

Otherwise, when no (batch) time has been found for the given input time, you should see the following DEBUG message in the logs:

DEBUG Nothing to delete
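The selection of files that cleanup deletes can be sketched as a small pure function. It is only a sketch under assumptions: the two mutable maps are hypothetical stand-ins for the internal mappings described above, batch times are Long milliseconds, and the function returns the file names instead of deleting anything from a file system.

```scala
import scala.collection.mutable

object CleanupSketch {
  // oldest: update time -> oldest checkpointed batch time at that update
  // files:  batch time  -> checkpoint file
  def filesToDelete(
      time: Long,
      oldest: mutable.Map[Long, Long],
      files: mutable.Map[Long, String]): Seq[String] =
    oldest.remove(time) match {
      case Some(oldestTime) =>
        // Everything strictly older than the oldest batch is stale
        val stale = files.filter { case (t, _) => t < oldestTime }.toSeq.sortBy(_._1)
        stale.foreach { case (t, _) => files -= t }
        stale.map(_._2)
      case None =>
        Seq.empty // corresponds to the "Nothing to delete" case
    }
}
```

A second call for the same time returns nothing, because the entry for that time is removed on the first call.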

Restoring Generated RDDs from Checkpoint Files (restore method)

restore(): Unit

restore rebuilds the dstream’s generatedRDDs from the persisted internal mapping of batch times to their checkpoint files.

restore takes the current checkpoint files and restores checkpointed RDDs from each checkpoint file (using SparkContext.checkpointFile).

You should see the following INFO message in the logs per checkpoint file:

INFO Restoring checkpointed RDD for time [time] from file '[file]'
Note
It is called by DStream.restoreCheckpointData().

Checkpoint

Checkpoint class requires a StreamingContext and a checkpointTime to be instantiated. The internal property checkpointTime corresponds to the batch time it represents.

Note
Checkpoint class is written to persistent storage (i.e. serialized) using the CheckpointWriter.write method and read back (i.e. deserialized) using Checkpoint.deserialize.
Note
Initial checkpoint is the checkpoint a StreamingContext was started with.

It is merely a collection of the settings of the current streaming runtime environment, used to recreate the environment after it goes down due to a failure or when the streaming context is stopped immediately.

It collects the settings from the input StreamingContext (and indirectly from the corresponding JobScheduler and SparkContext):

Tip

Enable INFO logging level for org.apache.spark.streaming.Checkpoint logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.Checkpoint=INFO

Refer to Logging.

Serializing Checkpoint (serialize method)

serialize(checkpoint: Checkpoint, conf: SparkConf): Array[Byte]

serialize serializes the checkpoint object. It does so by creating a compression codec to write the input checkpoint object with and returns the result as a collection of bytes.

Caution
FIXME Describe compression codecs in Spark.

Deserializing Checkpoint (deserialize method)

deserialize(inputStream: InputStream, conf: SparkConf): Checkpoint

deserialize reconstructs a Checkpoint object from the input inputStream. It reads the stream through a compression codec, then validates the just-built Checkpoint object and returns it.

Note
deserialize is called when reading the latest valid checkpoint file.
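The serialize and deserialize pair can be pictured as a compressed round trip through bytes. The sketch below is not Spark's implementation: it swaps in GZIP from java.util.zip for Spark's compression codec, writes fields by hand with DataOutputStream instead of serializing the whole object, and uses a hypothetical MiniCheckpoint with only three fields.

```scala
import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream, InputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object CheckpointCodecSketch {
  // Hypothetical, much-reduced stand-in for Checkpoint
  final case class MiniCheckpoint(master: String, framework: String, checkpointTimeMs: Long)

  // Compress while writing and hand back the bytes
  def serialize(checkpoint: MiniCheckpoint): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(new GZIPOutputStream(buffer))
    try {
      out.writeUTF(checkpoint.master)
      out.writeUTF(checkpoint.framework)
      out.writeLong(checkpoint.checkpointTimeMs)
    } finally out.close() // closing also finishes the GZIP stream
    buffer.toByteArray
  }

  // Decompress while reading, in the same field order as serialize
  def deserialize(inputStream: InputStream): MiniCheckpoint = {
    val in = new DataInputStream(new GZIPInputStream(inputStream))
    try MiniCheckpoint(in.readUTF(), in.readUTF(), in.readLong())
    finally in.close()
  }
}
```

Feeding the bytes from serialize into deserialize (for example through a java.io.ByteArrayInputStream) should give back an equal MiniCheckpoint.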

Validating Checkpoint (validate method)

validate(): Unit

validate validates the Checkpoint. It ensures that master, framework, graph, and checkpointTime are defined, i.e. not null.

Note
validate is called when a checkpoint is deserialized from an input stream.

You should see the following INFO message in the logs when the object passes the validation:

INFO Checkpoint: Checkpoint for time [checkpointTime] ms validated
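The null checks that validate performs can be sketched as plain assertions. CheckpointSettings below is a hypothetical stand-in that only carries the four fields named above; the field types and the assertion messages are assumptions made for the illustration.

```scala
// Hypothetical stand-in carrying only the fields that validate inspects.
final case class CheckpointSettings(
    master: String,
    framework: String,
    graph: AnyRef,
    checkpointTime: java.lang.Long) {

  // Fails with an AssertionError on the first undefined (null) field
  def validate(): Unit = {
    assert(master != null, "master is null")
    assert(framework != null, "framework is null")
    assert(graph != null, "graph is null")
    assert(checkpointTime != null, "checkpointTime is null")
  }
}
```

A fully populated instance passes silently, while an instance with any null field fails before it could be used to recreate a streaming context.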

Get Collection of Checkpoint Files from Directory (getCheckpointFiles method)

getCheckpointFiles(checkpointDir: String, fsOption: Option[FileSystem] = None): Seq[Path]

getCheckpointFiles method returns a collection of checkpoint files from the given checkpoint directory checkpointDir.

The method sorts the checkpoint files by time with a temporary .bk checkpoint file first (given a pair of a checkpoint file and its backup file).
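The ordering described above (by time, with the .bk file of a pair first) can be sketched on plain file names. This is a sketch under assumptions: it works on strings rather than Hadoop Path objects, the regular expression is an assumption based on the checkpoint-[time] and checkpoint-[time].bk names used on this page, and names that do not match are simply dropped.

```scala
object CheckpointFileOrdering {
  // checkpoint-<millis> optionally followed by a suffix such as ".bk"
  private val CheckpointFile = """checkpoint-(\d+)([\w.]*)""".r

  // Returns (batch time, whether the name carries a suffix, i.e. is a backup)
  private def timeAndBackup(name: String): Option[(Long, Boolean)] = name match {
    case CheckpointFile(time, suffix) => Some((time.toLong, suffix.nonEmpty))
    case _                            => None
  }

  // Oldest first; for equal times the backup file sorts before the final file
  def sortCheckpointFiles(names: Seq[String]): Seq[String] =
    names
      .flatMap(n => timeAndBackup(n).map { case (time, isBackup) => (time, !isBackup, n) })
      .sortBy { case (time, notBackup, _) => (time, notBackup) }
      .map(_._3)
}
```

For example, checkpoint-2000.bk is placed between checkpoint-1000 and checkpoint-2000.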

CheckpointWriter

An instance of CheckpointWriter is created (lazily) when JobGenerator is, but only when JobGenerator is configured for checkpointing.

Writing Checkpoint for Batch Time (write method)

write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean): Unit

write method serializes the checkpoint object and passes the serialized form to CheckpointWriteHandler to write asynchronously (i.e. on a separate thread) using a single-thread thread pool executor.

You should see the following INFO message in the logs:

INFO CheckpointWriter: Submitted checkpoint of time [checkpoint.checkpointTime] ms writer queue

If the checkpoint task cannot be submitted to the thread pool executor (e.g. because the executor has already been shut down), you should see the following ERROR in the logs:

ERROR Could not submit checkpoint task to the thread pool executor

Stopping CheckpointWriter (using stop method)

stop(): Unit

CheckpointWriter uses the internal stopped flag to mark whether it is stopped or not.

Note
stopped flag is disabled, i.e. false, when CheckpointWriter is created.

stop method checks the internal stopped flag and returns immediately if CheckpointWriter is already stopped.

If not, it shuts down the internal single-thread thread pool executor in an orderly fashion and awaits termination for up to 10 seconds. During that time, any asynchronous checkpoint writes already submitted can finish safely, but no new tasks are accepted.

Note
The wait time before executor stops is fixed, i.e. not configurable, and is set to 10 seconds.

If the thread pool has not terminated after 10 seconds, stop shuts it down forcefully.

You should see the following INFO message in the logs:

INFO CheckpointWriter: CheckpointWriter executor terminated? [terminated], waited for [time] ms.

CheckpointWriter is marked as stopped, i.e. stopped flag is set to true.
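The stop sequence maps directly onto the standard java.util.concurrent API. The sketch below is a hypothetical WriterSketch, not CheckpointWriter itself; the graceMillis parameter exists only to make the sketch easy to try out (the text above says the real wait time is fixed at 10 seconds), and submit returns false where the real writer would log an error.

```scala
import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit}

// Hypothetical stand-in for CheckpointWriter's executor handling.
final class WriterSketch(graceMillis: Long = 10000L) {
  private val executor = Executors.newFixedThreadPool(1)
  @volatile private var stopped = false

  // Queue a task on the single worker thread; false if the executor rejects it
  def submit(task: Runnable): Boolean =
    try { executor.execute(task); true }
    catch { case _: RejectedExecutionException => false }

  def stop(): Unit = synchronized {
    if (!stopped) {
      executor.shutdown() // finish queued tasks, accept no new ones
      val terminated = executor.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)
      if (!terminated) executor.shutdownNow() // grace period over: interrupt the worker
      stopped = true
    }
  }

  def isStopped: Boolean = stopped
}
```

Tasks submitted before stop still run to completion within the grace period, while a submit after stop is rejected; calling stop a second time is a no-op.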

Single-Thread Thread Pool Executor

executor is an internal single-thread thread pool executor for executing asynchronous checkpoint writes using CheckpointWriteHandler.

It shuts down when CheckpointWriter is stopped (with a 10-second grace period before it is terminated forcefully).

CheckpointWriteHandler — Asynchronous Checkpoint Writes

CheckpointWriteHandler is an (internal) thread of execution that does checkpoint writes. It is instantiated with checkpointTime, the serialized form of the checkpoint, and a flag that says whether to clean checkpoint data later (clearCheckpointDataLater).

Note
It is only used by CheckpointWriter to queue a checkpoint write for a batch time.

It records the current checkpoint time (in latestCheckpointTime) and calculates the name of the checkpoint file.

Note
The name of the checkpoint file is checkpoint-[checkpointTime.milliseconds].

It uses a backup file to do atomic write, i.e. it writes to the checkpoint backup file first and renames the result file to the final checkpoint file name.

Note
The name of the checkpoint backup file is checkpoint-[checkpointTime.milliseconds].bk.
Note
CheckpointWriteHandler makes at most 3 write attempts. The value is not configurable.

When attempting to write, you should see the following INFO message in the logs:

INFO CheckpointWriter: Saving checkpoint for time [checkpointTime] ms to file '[checkpointFile]'
Note
It deletes any checkpoint backup files that may exist from the previous attempts.
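The write-then-rename idea described above can be sketched on a local file system with java.nio.file. This follows the description on this page rather than Spark's actual code, which goes through the Hadoop FileSystem API; AtomicCheckpointWrite and its write method are hypothetical names.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

object AtomicCheckpointWrite {
  // Writes bytes to checkpoint-<time>.bk first, then renames it to checkpoint-<time>
  def write(dir: Path, checkpointTimeMs: Long, bytes: Array[Byte]): Path = {
    val checkpointFile = dir.resolve(s"checkpoint-$checkpointTimeMs")
    val backupFile = dir.resolve(s"checkpoint-$checkpointTimeMs.bk")

    // Remove a backup file left over from a previous attempt
    Files.deleteIfExists(backupFile)
    Files.write(backupFile, bytes)

    // Readers never see a half-written final file: it appears only after the rename
    Files.move(backupFile, checkpointFile, StandardCopyOption.REPLACE_EXISTING)
    checkpointFile
  }
}
```

After a successful call only the final checkpoint file remains in the directory; the .bk file exists only while a write is in progress.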

It then deletes checkpoint files when there are more than 10.

Note
The number of checkpoint files when the deletion happens, i.e. 10, is fixed and not configurable.

You should see the following INFO message in the logs:

INFO CheckpointWriter: Deleting [file]
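The retention rule can be sketched as a one-line selection. The sketch assumes that it is the oldest files that are removed so that the 10 most recent ones remain, and that the input is already ordered from oldest to newest; RetentionSketch and its members are hypothetical names.

```scala
object RetentionSketch {
  // Fixed number of checkpoint files to keep, as stated above
  val MaxFiles = 10

  // sortedOldestFirst: checkpoint file names ordered from oldest to newest
  def filesToDelete(sortedOldestFirst: Seq[String]): Seq[String] =
    if (sortedOldestFirst.size > MaxFiles)
      sortedOldestFirst.take(sortedOldestFirst.size - MaxFiles)
    else
      Seq.empty
}
```

With 12 files the two oldest are selected for deletion; with 10 or fewer nothing is deleted.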

If all went fine, you should see the following INFO message in the logs:

INFO CheckpointWriter: Checkpoint for time [checkpointTime] ms saved to file '[checkpointFile]', took [bytes] bytes and [time] ms

JobGenerator is informed that the checkpoint write completed (with checkpointTime and clearCheckpointDataLater flag).

In case of write failures, you can see the following WARN message in the logs:

WARN CheckpointWriter: Error in attempt [attempts] of writing checkpoint to [checkpointFile]

If the number of write attempts exceeded the fixed maximum of 3, or CheckpointWriter was stopped before any successful checkpoint write, you should see the following WARN message in the logs:

WARN CheckpointWriter: Could not write checkpoint for time [checkpointTime] to file [checkpointFile]'
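The retry behaviour (at most 3 attempts, as noted earlier, and giving up early once the writer is stopped) can be sketched as a bounded loop. RetrySketch, Outcome and the isStopped callback are hypothetical names for this illustration, and only IOException is treated as a retryable failure, which is an assumption.

```scala
import java.io.IOException

object RetrySketch {
  val MaxAttempts = 3

  final case class Outcome(saved: Boolean, attempts: Int)

  // attempt receives the 1-based attempt number and throws IOException on failure
  def writeWithRetries(isStopped: () => Boolean)(attempt: Int => Unit): Outcome = {
    var attempts = 0
    var saved = false
    while (!saved && attempts < MaxAttempts && !isStopped()) {
      attempts += 1
      try {
        attempt(attempts)
        saved = true
      } catch {
        case _: IOException => () // a real writer would log a warning and try again
      }
    }
    Outcome(saved, attempts)
  }
}
```

An Outcome with saved = false corresponds to the situation in which the final "Could not write checkpoint" warning is logged.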

CheckpointReader

Reading Latest Valid Checkpoint File

read(checkpointDir: String): Option[Checkpoint]
read(checkpointDir: String, conf: SparkConf,
     hadoopConf: Configuration, ignoreReadError: Boolean = false): Option[Checkpoint]

read methods read the latest valid checkpoint file from the checkpoint directory checkpointDir. They differ in whether Spark configuration conf and Hadoop configuration hadoopConf are given or created in place.

Note
The 4-parameter read method is used by StreamingContext to recreate itself from a checkpoint file.

The single-parameter read does not throw a SparkException when no checkpoint file could be read.

Note
It appears that no part of Spark Streaming uses the simplified version of read.

read uses Apache Hadoop’s Path and Configuration to get the checkpoint files (using Checkpoint.getCheckpointFiles) in reverse order.

If there is no checkpoint file in the checkpoint directory, it returns None.

You should see the following INFO message in the logs:

INFO CheckpointReader: Checkpoint files found: [checkpointFiles]

The method reads the checkpoint files (from the newest to the oldest) until one is successfully loaded, i.e. deserialized.

You should see the following INFO message in the logs just before deserializing a checkpoint file:

INFO CheckpointReader: Attempting to load checkpoint from file [file]

If the checkpoint file was loaded, you should see the following INFO messages in the logs:

INFO CheckpointReader: Checkpoint successfully loaded from file [file]
INFO CheckpointReader: Checkpoint was generated at time [checkpointTime]

In case of any issues while loading a checkpoint file, you should see the following WARN in the logs and the corresponding exception:

WARN CheckpointReader: Error reading checkpoint from file [file]

Unless ignoreReadError flag is enabled, when no checkpoint file could be read, SparkException is thrown with the following message:

Failed to read checkpoint from directory [checkpointPath]

None is returned at this point and the method finishes.
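The overall control flow of read can be sketched as "try the newest file first, fall back to older ones". This is a sketch under assumptions: files are plain names ordered oldest first, load stands in for opening and deserializing a file, and IllegalStateException stands in for SparkException so that the code runs without Spark on the classpath.

```scala
import scala.util.{Success, Try}

object ReadLatestSketch {
  def readLatest[A](filesOldestFirst: Seq[String], ignoreReadError: Boolean)(
      load: String => A): Option[A] =
    if (filesOldestFirst.isEmpty) None // no checkpoint files at all
    else {
      // Lazily try files from newest to oldest and stop at the first success
      val loaded = filesOldestFirst.reverseIterator
        .map(file => Try(load(file)))
        .collectFirst { case Success(checkpoint) => checkpoint }

      if (loaded.isEmpty && !ignoreReadError)
        throw new IllegalStateException("Failed to read checkpoint from directory")
      loaded
    }
}
```

If the newest file is corrupt, the next older file is returned instead, and older files are not touched once one has loaded.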
