WatermarkSupport Contract — Unary Physical Operators with Streaming Watermark Support

WatermarkSupport is the contract for unary physical operators (i.e. UnaryExecNode) with streaming watermark support.


A watermark (aka "allowed lateness") is a moving threshold of event time that specifies which data to consider for aggregations. It marks the boundary of late data, so the engine can automatically drop incoming rows whose event time is older than the threshold and clean up old state accordingly.

See Handling Late Data and Watermarking in the official Spark documentation.
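As a rough illustration of the threshold described above, the following plain-Scala sketch assumes the watermark is derived as the maximum event time seen so far minus the allowed delay, and treats a row as late when its event time is less than or equal to the watermark. The object and helper names are hypothetical and nothing here uses Spark's API.

```scala
// Simplified model of an event-time watermark; not Spark's implementation.
object WatermarkSketch {
  // Assumption: watermark = max event time seen so far minus the allowed delay.
  def watermark(maxEventTimeMs: Long, delayMs: Long): Long =
    maxEventTimeMs - delayMs

  // A row counts as late when its event time is at or below the watermark.
  def isLate(eventTimeMs: Long, watermarkMs: Long): Boolean =
    eventTimeMs <= watermarkMs

  // Three example event times (milliseconds) and a 10-second allowed delay.
  val eventTimesMs: Seq[Long] = Seq(1000L, 7000L, 12000L)
  val wm: Long = watermark(eventTimesMs.max, delayMs = 10000L)
  val late: Seq[Long] = eventTimesMs.filter(isLate(_, wm))
}
```

With these inputs the threshold moves forward only as newer event times arrive, which is why it is called a moving threshold.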

Table 1. WatermarkSupport’s (Lazily-Initialized) Properties
Property Description


watermarkExpression

Optional Catalyst expression that matches rows older than the event time watermark.

Use the withWatermark operator to specify a streaming watermark.

When initialized, watermarkExpression looks for the spark.watermarkDelayMs watermark attribute in the metadata of the child's output.

If found, watermarkExpression creates an evictionExpression that matches rows whose watermark attribute is less than or equal to eventTimeWatermark.

The watermark attribute may be of type StructType. If it is, watermarkExpression uses the first field as the watermark.

watermarkExpression prints out the following INFO message to the logs when the spark.watermarkDelayMs watermark attribute is found.

INFO [physicalOperator]Exec: Filtering state store on: [evictionExpression]
Enable INFO logging level for one of the stateful physical operators to see the INFO message in the logs.


watermarkPredicateForData

Optional Predicate that uses watermarkExpression and the child output to match rows older than the watermark.


watermarkPredicateForKeys

Optional Predicate that uses keyExpressions to match rows older than the event time watermark.
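The lookup-then-predicate logic described in the table can be sketched in plain Scala. This is a simplified model, not Spark's code: Attr and Row are hypothetical stand-ins for Catalyst attributes and rows, the struct-typed case is left out, and the sketch assumes that no predicate is produced when the watermark itself is undefined.

```scala
// Simplified model of watermarkExpression and the derived predicates.
object WatermarkExpressionSketch {
  // Metadata key named in the text above.
  val DelayKey = "spark.watermarkDelayMs"

  // Hypothetical stand-in for a Catalyst attribute: a name plus metadata.
  final case class Attr(name: String, metadata: Map[String, Long] = Map.empty)

  // Hypothetical row: attribute name -> event time, same unit as the watermark.
  type Row = Map[String, Long]

  // Find the attribute carrying the delay key and, if both it and the
  // watermark are defined, build a "less than or equal to watermark" predicate.
  def watermarkPredicate(
      output: Seq[Attr],
      eventTimeWatermark: Option[Long]): Option[Row => Boolean] =
    for {
      attr <- output.find(_.metadata.contains(DelayKey))
      wm <- eventTimeWatermark
    } yield { (row: Row) => row(attr.name) <= wm }

  val output: Seq[Attr] =
    Seq(Attr("id"), Attr("eventTime", Map(DelayKey -> 10000L)))

  // Defined, because "eventTime" carries the watermark metadata.
  val predicate: Option[Row => Boolean] =
    watermarkPredicate(output, eventTimeWatermark = Some(2000L))

  // Empty, because no attribute carries the watermark metadata.
  val withoutWatermarkColumn: Option[Row => Boolean] =
    watermarkPredicate(Seq(Attr("id")), Some(2000L))
}
```

Passing the child output models watermarkPredicateForData, while passing only the key attributes models watermarkPredicateForKeys; in both cases the result is optional because the watermark attribute may be absent.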

WatermarkSupport Contract

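package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.UnaryExecNode

trait WatermarkSupport extends UnaryExecNode {
  // only the required methods, i.e. those that have no implementation
  def eventTimeWatermark: Option[Long]
  def keyExpressions: Seq[Attribute]
}

Table 2. WatermarkSupport Contract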
Method Description


eventTimeWatermark

Used mainly in watermarkExpression to create a LessThanOrEqual Catalyst binary expression that matches rows older than the watermark.


keyExpressions

Grouping keys (in FlatMapGroupsWithStateExec), duplicate keys (in StreamingDeduplicateExec) or key attributes (in StateStoreSaveExec), at most one of which may have the spark.watermarkDelayMs watermark attribute in its metadata.

Used in watermarkPredicateForKeys to create a Predicate to match rows older than the event time watermark.

Also used when the StateStoreSaveExec and StreamingDeduplicateExec physical operators are executed.

Removing Keys From StateStore Older Than Watermark — removeKeysOlderThanWatermark Method

removeKeysOlderThanWatermark(store: StateStore): Unit

removeKeysOlderThanWatermark requests the input store for all rows.

removeKeysOlderThanWatermark then uses watermarkPredicateForKeys to remove matching rows from the store.

removeKeysOlderThanWatermark is used exclusively when StreamingDeduplicateExec physical operator is executed.
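The two steps above (read all rows, then remove those matching the key predicate) can be modelled with an in-memory map. This is a sketch under stated assumptions: the Store type is a hypothetical stand-in for StateStore, keys are plain event times, and nothing is removed when no predicate is defined.

```scala
import scala.collection.mutable

// Simplified model of removing state keys older than the watermark.
object RemoveKeysSketch {
  // Hypothetical in-memory stand-in for a StateStore: key (event time) -> value.
  type Store = mutable.Map[Long, String]

  // Scan all keys and remove every key that the optional predicate marks as
  // older than the watermark. With no predicate, the store is left untouched.
  def removeKeysOlderThanWatermark(
      store: Store,
      predicateForKeys: Option[Long => Boolean]): Unit =
    predicateForKeys.foreach { isOld =>
      // Snapshot the keys first so the map is not mutated while iterating.
      store.keys.toList.filter(isOld).foreach(key => store.remove(key))
    }

  val store: Store = mutable.Map(1000L -> "a", 2000L -> "b", 5000L -> "c")
  val watermarkMs: Long = 2000L

  // Keys at or below the watermark (1000 and 2000) are removed.
  removeKeysOlderThanWatermark(store, Some((key: Long) => key <= watermarkMs))
}
```

Only the entry with key 5000 remains afterwards, which is the state cleanup that the watermark makes possible.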

removeKeysOlderThanWatermark Method

removeKeysOlderThanWatermark(
  storeManager: StreamingAggregationStateManager,
  store: StateStore): Unit


removeKeysOlderThanWatermark is used exclusively when StateStoreSaveExec physical operator is executed.
