val logs = spark.
read. // <-- batch non-streaming query that makes `EliminateEventTimeWatermark` rule applicable
format("text").
load("logs")
// logs is a batch Dataset
assert(!logs.isStreaming)
val q = logs.
withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds") // <-- creates EventTimeWatermark
scala> println(q.queryExecution.logical.numberedTreeString) // <-- no EventTimeWatermark as it was removed immediately
00 Relation[value#0] text
EventTimeWatermark Unary Logical Operator — Streaming Watermark
EventTimeWatermark is a unary logical operator that is created to represent Dataset.withWatermark operator in a logical query plan of a streaming query.
Note: A unary logical operator (UnaryNode) is a logical operator with a single child logical operator. Read up on UnaryNode (and logical operators in general) in The Internals of Spark SQL book.
When requested for the output attributes, EventTimeWatermark goes over the output attributes of the child logical operator, finds the one that matches the eventTime attribute, and adds the spark.watermarkDelayMs metadata key to it with the watermark delay (converted to milliseconds).
EventTimeWatermark is resolved (planned) to EventTimeWatermarkExec physical operator in StatefulAggregationStrategy execution planning strategy.
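Note: EliminateEventTimeWatermark rule removes EventTimeWatermark logical operator from a logical plan when the child logical operator is not streaming, i.e. when Dataset.withWatermark operator is used in a batch query.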
Output Schema — output Property
output: Seq[Attribute]
Note: output is part of the QueryPlan Contract to describe the attributes of (the schema of) the output.
output finds eventTime column in the output schema of the child logical operator and updates the Metadata of the column with spark.watermarkDelayMs key and the milliseconds for the delay.
output removes spark.watermarkDelayMs key from the other columns.
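The two rules above can be modelled in a few lines of plain Scala. This is only a sketch of the described behaviour, not Spark's code: Attr, WatermarkOutputSketch and the Map-based metadata are hypothetical stand-ins for Catalyst's Attribute and Metadata classes, and matching is done by column name for simplicity.

```scala
// Hypothetical stand-in for a Catalyst attribute: a name plus long-valued metadata.
final case class Attr(name: String, metadata: Map[String, Long] = Map.empty)

object WatermarkOutputSketch {
  // The metadata key named in the text above.
  val delayKey = "spark.watermarkDelayMs"

  // Marks the eventTime attribute with the delay (in milliseconds)
  // and removes the key from every other attribute that carries it.
  def output(childOutput: Seq[Attr], eventTime: String, delayMs: Long): Seq[Attr] =
    childOutput.map { a =>
      if (a.name == eventTime) a.copy(metadata = a.metadata + (delayKey -> delayMs))
      else if (a.metadata.contains(delayKey)) a.copy(metadata = a.metadata - delayKey)
      else a
    }
}
```

In this model, a child output where value already carries the key ends up with the key on timestamp only, which is why at most one output attribute is marked as the watermark attribute at a time.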
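// FIXME How to access/show the eventTime column with the metadata updated to include spark.watermarkDelayMs?
// q has to be a streaming query here (a batch query has no EventTimeWatermark to cast to)
// Assumption: the rate source, which matches the timestamp and value columns printed below
val q = spark.
  readStream.
  format("rate").
  load.
  withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds")
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
val etw = q.queryExecution.logical.asInstanceOf[EventTimeWatermark]
scala> etw.output.toStructType.printTreeString
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)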
Watermark Metadata (Marker) — spark.watermarkDelayMs Metadata Key
spark.watermarkDelayMs metadata key is used to mark one of the output attributes as the watermark attribute (eventTime watermark).
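Since printTreeString shows only the name, type and nullability of a column, the marker has to be read from the column metadata itself. The following is a minimal sketch for spark-shell (it needs Spark SQL on the classpath) that builds a schema by hand rather than taking it from a running query. The schema, the 30000 ms delay and the variable names are illustrative assumptions; only the key name comes from the text above.

```scala
import org.apache.spark.sql.types._

// The marker key described above.
val delayKey = "spark.watermarkDelayMs"

// Hand-built metadata that mimics a 30-second watermark delay (illustrative value).
val watermarkMetadata = new MetadataBuilder().putLong(delayKey, 30000L).build()

// Hypothetical schema: timestamp carries the marker, value does not.
val schema = StructType(Seq(
  StructField("timestamp", TimestampType, nullable = true, watermarkMetadata),
  StructField("value", LongType)))

// The watermark attribute is whichever column has the key in its metadata.
val watermarkColumns = schema.fields.filter(_.metadata.contains(delayKey)).map(_.name)

// The delay itself is stored as a long under the same key.
val delayMs = schema("timestamp").metadata.getLong(delayKey)
```

The same filter could be applied to the fields of a schema obtained from a real query to find which column, if any, is currently marked as the watermark attribute.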