OffsetSeqLog — Hadoop DFS-based Metadata Storage of OffsetSeqs

OffsetSeqLog is a Hadoop DFS-based metadata storage for OffsetSeqs.

OffsetSeqLog uses OffsetSeq as the metadata type. An OffsetSeq holds an ordered collection of offsets and optional metadata (as OffsetSeqMetadata, e.g. for the event-time watermark).

OffsetSeqLog uses 1 as the version when serializing and deserializing metadata.

Creating OffsetSeqLog Instance

OffsetSeqLog (like the parent HDFSMetadataLog) takes the following to be created:

  • SparkSession

  • Path of the metadata log directory

Serializing Metadata (Writing Metadata in Serialized Format) — serialize Method

serialize(
  offsetSeq: OffsetSeq,
  out: OutputStream): Unit
Note
serialize is part of the HDFSMetadataLog contract to serialize metadata (write metadata in serialized format).

serialize first writes the version prefixed with v on a single line (e.g. v1), followed by the optional metadata in JSON format on the next line.

serialize then writes out the offsets in JSON format, one per line.

Note
When a streaming source has no offset to write in offsetSeq, serialize writes - (a dash) in its place in the log.
$ ls -tr [checkpoint-directory]/offsets
0 1 2 3 4 5 6

$ cat [checkpoint-directory]/offsets/6
v1
{"batchWatermarkMs":0,"batchTimestampMs":1502872590006,"conf":{"spark.sql.shuffle.partitions":"200","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
51
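
The file layout described above can be reproduced without Spark on the classpath. The following is a minimal sketch of the format only, not Spark's implementation: SimpleOffsetSeq, SerializeSketch and render are hypothetical names introduced for illustration, offsets are assumed to be JSON strings already, and None stands for a source with no offset to write.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for OffsetSeq (not a Spark class):
// every offset is already a JSON string, None marks "no offset for this source".
final case class SimpleOffsetSeq(
    offsets: Seq[Option[String]],
    metadataJson: Option[String])

object SerializeSketch {
  val Version = 1

  def serialize(offsetSeq: SimpleOffsetSeq, out: OutputStream): Unit = {
    // First line: the version prefixed with "v"
    out.write(s"v$Version".getBytes(UTF_8))
    // Second line: the metadata as JSON, or an empty line when there is none
    out.write('\n')
    out.write(offsetSeq.metadataJson.getOrElse("").getBytes(UTF_8))
    // Remaining lines: one offset per line, "-" for a missing offset
    offsetSeq.offsets.foreach { offset =>
      out.write('\n')
      out.write(offset.getOrElse("-").getBytes(UTF_8))
    }
  }

  // Helper for experiments: serialize into memory and return the text
  def render(offsetSeq: SimpleOffsetSeq): String = {
    val out = new ByteArrayOutputStream()
    serialize(offsetSeq, out)
    new String(out.toByteArray, UTF_8)
  }
}
```

Calling SerializeSketch.render(SimpleOffsetSeq(Seq(Some("51")), None)) yields three lines: v1, an empty line for the missing metadata, and 51.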

Deserializing Metadata (Reading OffsetSeq from Serialized Format) — deserialize Method

deserialize(in: InputStream): OffsetSeq
Note
deserialize is part of the HDFSMetadataLog contract to deserialize metadata (read metadata from serialized format).

deserialize first reads the version from the first line.

deserialize then reads the optional metadata from the second line (an empty line means that no metadata is available).

deserialize creates a SerializedOffset for every remaining line.

In the end, deserialize creates an OffsetSeq from the optional metadata and the SerializedOffsets.

When there are no lines in the InputStream, deserialize throws an IllegalStateException:

Incomplete log file
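
The reading steps above can be sketched the same way. This is a minimal sketch under stated assumptions, not Spark's implementation: ParsedOffsetLog and DeserializeSketch are hypothetical names, the version check is simplified to stripping the v prefix, a missing second line is treated as "no metadata", and a - line is assumed to map back to a missing offset, mirroring the write side.

```scala
import java.io.InputStream
import java.nio.charset.StandardCharsets.UTF_8
import scala.io.Source

// Hypothetical result type (not a Spark class): offsets stay raw JSON strings,
// playing the role that SerializedOffset plays in the description above.
final case class ParsedOffsetLog(
    version: Int,
    metadataJson: Option[String],
    offsets: Seq[Option[String]])

object DeserializeSketch {
  def deserialize(in: InputStream): ParsedOffsetLog = {
    val lines = Source.fromInputStream(in, UTF_8.name()).getLines()
    // No lines at all: same exception type and message as described above
    if (!lines.hasNext) throw new IllegalStateException("Incomplete log file")

    // First line: the version, e.g. "v1" (simplified validation)
    val versionLine = lines.next()
    require(versionLine.startsWith("v"), s"Unexpected version line: $versionLine")
    val version = versionLine.drop(1).toInt

    // Second line: metadata JSON; an empty line means no metadata
    val metadataJson =
      if (lines.hasNext) Some(lines.next()).filter(_.nonEmpty) else None

    // Every remaining line is one offset; "-" marks a missing offset (assumption)
    val offsets = lines.map(line => if (line == "-") None else Some(line)).toList

    ParsedOffsetLog(version, metadataJson, offsets)
  }
}
```

Feeding the sketch the bytes of "v1\n\n51" gives version 1, no metadata, and a single offset 51, while an empty stream raises the IllegalStateException shown above.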
