val logs = spark.
  read. // <-- batch (non-streaming) query, which makes the EliminateEventTimeWatermark rule applicable
  format("text").
  load("logs")
// logs is a batch Dataset
assert(!logs.isStreaming)
val q = logs.
  withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds") // <-- creates EventTimeWatermark
scala> println(q.queryExecution.logical.numberedTreeString) // <-- no EventTimeWatermark: it was removed immediately because logs is not streaming
00 Relation[value#0] text
EventTimeWatermark Unary Logical Operator — Streaming Watermark
EventTimeWatermark is a unary logical operator that represents the Dataset.withWatermark operator in the logical query plan of a streaming query.
Note: A unary logical operator (UnaryNode) is a logical operator with a single child logical operator. Read up on UnaryNode (and logical operators in general) in The Internals of Spark SQL book.
When requested for the output attributes, EventTimeWatermark goes over the output attributes of the child logical operator, finds the one that matches the eventTime attribute, and updates its metadata to include the spark.watermarkDelayMs key with the watermark delay interval converted to milliseconds.
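The conversion of the delay interval to milliseconds can be modelled in plain Scala. This is a simplified sketch, not Spark's code: `Interval` is a hypothetical stand-in for Spark's CalendarInterval (months, days, microseconds), and the sketch assumes a month counts as 31 days, which is how recent Spark versions turn an interval into a duration; check the version you use.

```scala
// Hypothetical stand-in for Spark's CalendarInterval (not Spark's API)
final case class Interval(months: Int, days: Int, microseconds: Long)

object WatermarkDelay {
  val MicrosPerMilli: Long = 1000L
  val MicrosPerDay: Long = 24L * 60 * 60 * 1000 * 1000
  // Assumption: a month is treated as 31 days when converting to a duration
  val DaysPerMonth: Long = 31L

  /** Converts an interval to whole milliseconds (the value stored under spark.watermarkDelayMs). */
  def toDelayMs(delay: Interval): Long = {
    val totalMicros =
      delay.months * DaysPerMonth * MicrosPerDay +
        delay.days * MicrosPerDay +
        delay.microseconds
    totalMicros / MicrosPerMilli
  }

  def main(args: Array[String]): Unit = {
    // "30 seconds", as in withWatermark(..., delayThreshold = "30 seconds")
    println(toDelayMs(Interval(months = 0, days = 0, microseconds = 30L * 1000 * 1000))) // prints 30000
  }
}
```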
EventTimeWatermark is planned to the EventTimeWatermarkExec physical operator by the StatefulAggregationStrategy execution planning strategy.
Output Schema — output Property
output: Seq[Attribute]
Note: output is part of the QueryPlan Contract to describe the attributes of (the schema of) the output.
output finds the eventTime column in the output schema of the child logical operator and updates the Metadata of the column with the spark.watermarkDelayMs key and the delay in milliseconds.
output also removes the spark.watermarkDelayMs key from any other column that has it, so only the eventTime column carries the watermark marker.
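The two rules of output can be modelled without Spark. In this sketch `Attr` is a hypothetical simplification of Spark's Attribute with its metadata as a plain Map; Spark itself matches the eventTime attribute by semantic equality (expression IDs), not by column name as assumed here.

```scala
// Hypothetical, simplified attribute: Spark's Attribute carries a Metadata object instead
final case class Attr(name: String, metadata: Map[String, Long])

object WatermarkOutput {
  val DelayKey = "spark.watermarkDelayMs"

  /** Models EventTimeWatermark.output: mark the eventTime attribute, unmark every other one. */
  def output(childOutput: Seq[Attr], eventTime: String, delayMs: Long): Seq[Attr] =
    childOutput.map { a =>
      if (a.name == eventTime) a.copy(metadata = a.metadata + (DelayKey -> delayMs))
      else if (a.metadata.contains(DelayKey)) a.copy(metadata = a.metadata - DelayKey)
      else a
    }

  def main(args: Array[String]): Unit = {
    // A child whose "value" column was marked by an earlier (upstream) watermark
    val child = Seq(Attr("timestamp", Map.empty), Attr("value", Map(DelayKey -> 10000L)))
    output(child, eventTime = "timestamp", delayMs = 30000L).foreach(println)
  }
}
```

In the example, timestamp ends up with the 30000 ms delay and the stale marker on value is dropped.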
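// EventTimeWatermark stays in the plan only for streaming queries, e.g. one over the rate source (timestamp and value columns)
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
val q = spark.
  readStream.
  format("rate").
  load().
  withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds")
// the analyzed plan has EventTimeWatermark (with a resolved eventTime) at the top
val etw = q.queryExecution.analyzed.asInstanceOf[EventTimeWatermark]
scala> etw.output.toStructType.printTreeString
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)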
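printTreeString does not show column metadata, so a simple way to see what output adds is to print the metadata of every column in the schema of a streaming query. This is a sketch under assumptions: it uses the rate source and a 30-second delay, and relies on Dataset.schema being derived from the analyzed plan, where EventTimeWatermark is still present. It needs spark-sql on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark

object ShowWatermarkMetadata {
  // Streaming query over the rate source; being streaming, EventTimeWatermark is not eliminated
  def watermarkedRate(spark: SparkSession): DataFrame =
    spark.readStream
      .format("rate")
      .load()
      .withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("watermark-metadata").getOrCreate()
    val q = watermarkedRate(spark)
    // Dataset.schema comes from the analyzed plan, so it reflects EventTimeWatermark.output
    q.schema.fields.foreach(f => println(s"${f.name}: ${f.metadata.json}"))
    // The delay (in milliseconds) stored on the eventTime column
    println(q.schema("timestamp").metadata.getLong(EventTimeWatermark.delayKey))
    spark.stop()
  }
}
```

The timestamp column should carry spark.watermarkDelayMs with the value 30000, while value should have empty metadata.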
Watermark Metadata (Marker) — spark.watermarkDelayMs Metadata Key
The spark.watermarkDelayMs metadata key is used to mark one of the output attributes as the watermark attribute (eventTime watermark).
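Because the key acts as a marker, the watermark column of a DataFrame can be located by searching its schema for it. A minimal sketch, assuming spark-sql on the classpath; `watermarkColumn` is a hypothetical helper, not a Spark API.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark

object WatermarkColumn {
  /** Hypothetical helper (not a Spark API): the column marked as the eventTime watermark and its delay in ms. */
  def watermarkColumn(df: DataFrame): Option[(String, Long)] =
    df.schema.fields
      .find(_.metadata.contains(EventTimeWatermark.delayKey))
      .map(f => (f.name, f.metadata.getLong(EventTimeWatermark.delayKey)))
}
```

For a streaming query over the rate source with a 30-second watermark, this should return Some(("timestamp", 30000L)). For a batch Dataset it should return None, since EliminateEventTimeWatermark removes the operator before any column gets marked.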