EventTimeWatermark Unary Logical Operator — Streaming Watermark

EventTimeWatermark is a unary logical operator that represents the Dataset.withWatermark operator in the logical query plan of a streaming query.

Note

A unary logical operator (UnaryNode) is a logical operator with a single child logical operator.

Read up on UnaryNode (and logical operators in general) in The Internals of Spark SQL book.

EventTimeWatermark is planned as the EventTimeWatermarkExec physical operator by the StatefulAggregationStrategy execution planning strategy.

Note

The EliminateEventTimeWatermark logical rule (i.e. Rule[LogicalPlan]) removes the EventTimeWatermark logical operator from a logical plan when the child logical operator is not streaming. That is the case when the Dataset.withWatermark operator is used in a batch query.

val logs = spark.
  read. // <-- batch non-streaming query that makes `EliminateEventTimeWatermark` rule applicable
  format("text").
  load("logs")

// logs is a batch Dataset
assert(!logs.isStreaming)

val q = logs.
  withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds") // <-- creates EventTimeWatermark
scala> println(q.queryExecution.logical.numberedTreeString) // <-- no EventTimeWatermark as it was removed immediately
00 Relation[value#0] text
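
For contrast, here is a minimal sketch of the streaming case. It assumes the built-in rate source and a local SparkSession; both are choices made for this illustration and the names rates, sq and watermarks are hypothetical. With a streaming child, the EventTimeWatermark operator should remain in the analyzed logical plan.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark

// Assumption: a local session; in spark-shell `spark` is already available
val spark = SparkSession.builder.master("local[*]").appName("watermark-demo").getOrCreate()

// The rate source gives a streaming Dataset with timestamp and value columns
val rates = spark.readStream.format("rate").load()
assert(rates.isStreaming)

val sq = rates.withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds")

// Collect every EventTimeWatermark operator found in the analyzed logical plan
val watermarks = sq.queryExecution.analyzed.collect { case e: EventTimeWatermark => e }
println(sq.queryExecution.analyzed.numberedTreeString)
```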

Creating EventTimeWatermark Instance

EventTimeWatermark takes the following to be created:

  • Event-time column (Attribute) for the watermark

  • Delay (CalendarInterval)

  • Child logical operator (LogicalPlan)
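
The three parts can be read back from an operator that Dataset.withWatermark created. The following is a sketch only: it assumes a local SparkSession and the rate source (illustrative choices), and that the operator exposes them as the eventTime, delay and child fields, which may differ between Spark versions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark

// Assumption: a local session; in spark-shell `spark` is already available
val spark = SparkSession.builder.master("local[*]").appName("watermark-fields").getOrCreate()

// Take the first EventTimeWatermark from the analyzed plan of a streaming query
val etw = spark.readStream.format("rate").load()
  .withWatermark("timestamp", "10 minutes")
  .queryExecution.analyzed
  .collectFirst { case e: EventTimeWatermark => e }
  .get

println(etw.eventTime.name)     // name of the event-time column
println(etw.delay)              // the delay as a CalendarInterval
println(etw.child.isStreaming)  // the child logical operator is streaming
```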

Output Schema — output Property

output: Seq[Attribute]

Note

output is part of the QueryPlan Contract to describe the attributes of (the schema of) the output.

output finds the eventTime column in the output schema of the child logical operator and adds the spark.watermarkDelayMs key, with the delay in milliseconds as its value, to the Metadata of that column.

output removes the spark.watermarkDelayMs key from the Metadata of all the other columns.

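// FIXME How to access/show the eventTime column with the metadata updated to include spark.watermarkDelayMs?
// NOTE: q has to be a streaming query here (the schema below matches the rate source).
// The batch q from the earlier example has no EventTimeWatermark, so the cast would fail.
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
val etw = q.queryExecution.logical.asInstanceOf[EventTimeWatermark]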
scala> etw.output.toStructType.printTreeString
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
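
printTreeString does not print column metadata, so the key is not visible in the output above. One way to inspect it is sketched below. It assumes a local SparkSession and the rate source, and it reads the operator from the analyzed plan, where the event-time column is resolved. The names wq, op, key, ts and delayMs are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark

// Assumption: a local session; in spark-shell `spark` is already available
val spark = SparkSession.builder.master("local[*]").appName("watermark-metadata").getOrCreate()

val wq = spark.readStream.format("rate").load().withWatermark("timestamp", "30 seconds")
val op = wq.queryExecution.analyzed.collectFirst { case e: EventTimeWatermark => e }.get

// The metadata key is defined in the companion object
val key = EventTimeWatermark.delayKey

// Print the metadata of every output attribute as JSON
op.output.foreach { a => println(s"${a.name}: ${a.metadata.json}") }

// Only the event-time column is expected to carry the delay
val ts = op.output.find(_.name == "timestamp").get
val delayMs = ts.metadata.getLong(key)
```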

spark.watermarkDelayMs Metadata Key

EventTimeWatermark uses spark.watermarkDelayMs key (in the Metadata of the output attributes) to hold the event-time watermark delay (as a so-called watermark attribute or eventTime watermark).

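The event-time watermark delay determines how far the watermark trails the latest event time seen so far: the watermark is the maximum event time observed minus the delay. Events (modeled as rows in the streaming Dataset) with an event time older than the watermark are considered late.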
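
The arithmetic can be sketched in plain Scala, without Spark. All names and values below are made up for illustration; this is not how Spark implements the check internally.

```scala
// Delay that corresponds to delayThreshold = "30 seconds", in milliseconds
val delayMs = 30 * 1000L

// Latest event time seen so far (epoch milliseconds, made-up value)
val maxEventTimeMs = 1000000L

// The watermark trails the latest event time by the delay
val watermarkMs = maxEventTimeMs - delayMs

// Hypothetical helper: an event is late when it is older than the watermark
def isLate(eventTimeMs: Long): Boolean = eventTimeMs < watermarkMs

val onTime = isLate(980000L)  // 20 seconds behind the latest event, within the delay
val late = isLate(960000L)    // 40 seconds behind the latest event, beyond the delay
```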
