EventTimeWatermark Unary Logical Operator — Streaming Watermark

EventTimeWatermark is a unary logical operator that represents the Dataset.withWatermark operator in the logical query plan of a streaming query.

Note

A unary logical operator (UnaryNode) is a logical operator with a single child logical operator.

Read up on UnaryNode (and logical operators in general) in The Internals of Spark SQL book.

When requested for the output attributes, the EventTimeWatermark logical operator goes over the output attributes of the child logical operator, finds the one that matches the eventTime attribute, and updates its metadata with the spark.watermarkDelayMs key set to the watermark delay interval (converted to milliseconds).

EventTimeWatermark is resolved (planned) to the EventTimeWatermarkExec physical operator by the StatefulAggregationStrategy execution planning strategy.

Note

The EliminateEventTimeWatermark logical optimization rule (a Rule[LogicalPlan]) removes the EventTimeWatermark logical operator from a logical plan when the child logical operator is not streaming, i.e. when the Dataset.withWatermark operator is used on a batch query.

val logs = spark.
  read. // <-- batch non-streaming query that makes `EliminateEventTimeWatermark` rule applicable
  format("text").
  load("logs")

// logs is a batch Dataset
assert(!logs.isStreaming)

val q = logs.
  withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds") // <-- creates EventTimeWatermark
scala> println(q.queryExecution.logical.numberedTreeString) // <-- no EventTimeWatermark as it was removed immediately
00 Relation[value#0] text
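The effect of the rule can be sketched without Spark. The following is a toy model only, assuming hypothetical Relation and Watermark case classes in place of Spark's LogicalPlan classes: a watermark node is dropped whenever its (rewritten) child is not streaming and kept otherwise.

```scala
// Toy plan model (hypothetical classes, not Spark's LogicalPlan)
sealed trait Plan { def isStreaming: Boolean }

final case class Relation(source: String, isStreaming: Boolean) extends Plan

final case class Watermark(eventTime: String, delayMs: Long, child: Plan) extends Plan {
  // a watermark node is streaming exactly when its child is
  def isStreaming: Boolean = child.isStreaming
}

// Rewrites the plan bottom-up, dropping every Watermark whose child is not streaming
def eliminateWatermark(plan: Plan): Plan = plan match {
  case Watermark(eventTime, delayMs, child) =>
    val newChild = eliminateWatermark(child)
    if (newChild.isStreaming) Watermark(eventTime, delayMs, newChild) else newChild
  case relation: Relation => relation
}

val batch = Watermark("timestamp", 30000L, Relation("text", isStreaming = false))
val streaming = Watermark("timestamp", 30000L, Relation("rate", isStreaming = true))

println(eliminateWatermark(batch))     // Relation(text,false)
println(eliminateWatermark(streaming)) // Watermark(timestamp,30000,Relation(rate,true))
```

As with the batch query above, the batch plan loses its watermark node while the streaming plan keeps it unchanged.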

Creating EventTimeWatermark Instance

EventTimeWatermark takes the following to be created:

  • Watermark column (Attribute)

  • Watermark delay (CalendarInterval)

  • Child logical operator (LogicalPlan)

Output Schema — output Property

output: Seq[Attribute]
Note
output is part of the QueryPlan Contract to describe the attributes of (the schema of) the output.

output finds the eventTime column in the output schema of the child logical operator and updates the Metadata of the column with the spark.watermarkDelayMs key set to the delay in milliseconds.

output removes the spark.watermarkDelayMs key from the metadata of all the other columns (if present), so only the eventTime column carries the watermark marker.
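A minimal sketch of the output logic in plain Scala, assuming a hypothetical Attr case class with a Map[String, Long] as a stand-in for Spark's Attribute and Metadata. Unlike Spark, which compares attributes semantically, the sketch matches the eventTime column by name for simplicity.

```scala
// Hypothetical stand-in for Spark's Attribute: a column name plus its metadata
final case class Attr(name: String, metadata: Map[String, Long])

val DelayKey = "spark.watermarkDelayMs"

// Sketch of the output logic: mark the eventTime column with the delay (in ms)
// and remove a stale marker from every other column
def watermarkOutput(childOutput: Seq[Attr], eventTime: String, delayMs: Long): Seq[Attr] =
  childOutput.map { a =>
    if (a.name == eventTime) a.copy(metadata = a.metadata + (DelayKey -> delayMs))
    else a.copy(metadata = a.metadata - DelayKey)
  }

val childOutput = Seq(
  Attr("timestamp", Map.empty),
  Attr("value", Map(DelayKey -> 10000L)) // stale marker, e.g. from an earlier withWatermark
)
watermarkOutput(childOutput, eventTime = "timestamp", delayMs = 30000L).foreach(println)
// Attr(timestamp,Map(spark.watermarkDelayMs -> 30000))
// Attr(value,Map())
```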

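// FIXME How to access/show the eventTime column with the metadata updated to include spark.watermarkDelayMs?
// EventTimeWatermark stays in the plan only for a streaming query (the batch q above has it removed)
val q = spark.
  readStream.
  format("rate").
  load().
  withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds")

import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
val etw = q.queryExecution.logical.asInstanceOf[EventTimeWatermark]
scala> etw.output.toStructType.printTreeString
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)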

Watermark Metadata (Marker) — spark.watermarkDelayMs Metadata Key

spark.watermarkDelayMs metadata key is used to mark one of the output attributes as the watermark attribute (eventTime watermark).
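Because the marker travels with the attribute's metadata, an operator further up the plan can recognize the watermark column by looking for the key. A toy lookup, assuming a hypothetical Column case class with a Map[String, Long] for metadata (not Spark's API):

```scala
// Hypothetical column model: a name plus a metadata map (not Spark's Attribute)
final case class Column(name: String, metadata: Map[String, Long])

val WatermarkKey = "spark.watermarkDelayMs"

// Returns the first column marked as the watermark together with its delay in ms
def watermarkColumn(output: Seq[Column]): Option[(String, Long)] =
  output.collectFirst {
    case Column(name, metadata) if metadata.contains(WatermarkKey) =>
      (name, metadata(WatermarkKey))
  }

val schema = Seq(
  Column("timestamp", Map(WatermarkKey -> 30000L)),
  Column("value", Map.empty)
)
println(watermarkColumn(schema)) // Some((timestamp,30000))
```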

Converting Human-Friendly CalendarInterval to Milliseconds — getDelayMs Object Method

getDelayMs(
  delay: CalendarInterval): Long

getDelayMs…FIXME

Note
getDelayMs is used when…FIXME
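Since getDelayMs is still undocumented here, the following is only a sketch of what such a conversion can look like, not Spark's implementation. It assumes a hypothetical Interval case class modelled on Spark 3's CalendarInterval (months, days, microseconds) and assumes a month counts as 31 days, so every interval maps to a single fixed number of milliseconds.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for Spark's CalendarInterval (months, days, microseconds)
final case class Interval(months: Int, days: Int, microseconds: Long)

// Assumption: a month counts as 31 days
val DaysPerMonth = 31L
val MillisPerDay = TimeUnit.DAYS.toMillis(1)

// Hypothetical helper: total interval length in milliseconds
def delayMs(delay: Interval): Long =
  TimeUnit.MICROSECONDS.toMillis(delay.microseconds) +
    (delay.days + delay.months * DaysPerMonth) * MillisPerDay

println(delayMs(Interval(0, 0, 30L * 1000 * 1000))) // 30000
```

With a fixed month length, a "30 seconds" delay and a "1 month" delay both become plain Long values that can be stored under the spark.watermarkDelayMs key.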
