Streaming Watermark

Streaming Watermark of a stateful streaming query is how long to wait for late and possibly out-of-order events until a streaming state can be considered final and not to change. Streaming watermark is used to mark events (modeled as a row in the streaming Dataset) that are older than the threshold as "too late", and not "interesting" to update partial non-final streaming state.

In Spark Structured Streaming, streaming watermark is defined using Dataset.withWatermark high-level operator.

withWatermark(
  eventTime: String,
  delayThreshold: String): Dataset[T]

In Dataset.withWatermark operator, eventTime is the name of the column to use to monitor event time whereas delayThreshold is a delay threshold.

Watermark Delay says how late and possibly out-of-order events are still acceptable and contribute to the final result of a stateful streaming query. Event-time watermark delay is used to calculate the difference between the event time of an event and the time in the past.

Event-Time Watermark is then a time threshold (point in time) that is the minimum acceptable time of an event (modeled as a row in the streaming Dataset) that is accepted in a stateful streaming query.

With streaming watermark, memory usage of a streaming state can be controlled as late events can easily be dropped, and old state (e.g. aggregates or join) that are never going to be updated removed. That avoids unbounded streaming state that would inevitably use up all the available memory of long-running streaming queries and end up in out of memory errors.

In Append output mode the current event-time streaming watermark is used for the following:

  • Output saved state rows that became expired (Expired events in the demo)

  • Dropping late events, i.e. don’t save them to a state store or include in aggregation (Late events in the demo)

Streaming watermark is required for a streaming aggregation in append output mode.

Streaming Aggregation

In streaming aggregation, a streaming watermark has to be defined on one or many grouping expressions of a streaming aggregation (directly or using window standard function).

Note
Dataset.withWatermark operator has to be used before an aggregation operator (for the watermark to have an effect).

Streaming Join

In streaming join, a streaming watermark can be defined on join keys or any of the join sides.

Demos

Use the following demos to learn more:

Internals

Under the covers, Dataset.withWatermark high-level operator creates a logical query plan with EventTimeWatermark logical operator.

EventTimeWatermark logical operator is planned to EventTimeWatermarkExec physical operator that extracts the event times (from the data being processed) and adds them to an accumulator.

Since the execution (data processing) happens on Spark executors, using the accumulator is the only Spark-approved way for communication between the tasks (on the executors) and the driver. Using accumulator updates the driver with the current event-time watermark.

During the query planning phase (in MicroBatchExecution and ContinuousExecution) that also happens on the driver, IncrementalExecution is given the current OffsetSeqMetadata with the current event-time watermark.

results matching ""

    No results matching ""