EventTimeWatermarkExec Unary Physical Operator

EventTimeWatermarkExec is a unary physical operator that represents the EventTimeWatermark logical operator at execution time.


A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator.

Read up on UnaryExecNode (and physical operators in general) in The Internals of Spark SQL book.

The purpose of the EventTimeWatermarkExec operator is to extract (project) the values of the event-time watermark column and add them to the EventTimeStatsAccum internal accumulator.


Since the execution (data processing) happens on Spark executors, EventTimeWatermarkExec uses an accumulator to pass values from the tasks (on the executors) back to the driver.

EventTimeWatermarkExec uses the EventTimeStatsAccum internal accumulator to send the statistics (the maximum, minimum, average and update count) of the values in the event-time watermark column to the driver, where they are used later.

EventTimeWatermarkExec is created exclusively when the StatefulAggregationStrategy execution planning strategy is requested to plan a logical plan with EventTimeWatermark logical operators for execution.

Creating EventTimeWatermarkExec Instance

EventTimeWatermarkExec takes the following to be created:

  • Event time column - the column with the (event) time for event-time watermark

  • Delay interval (CalendarInterval)

  • Child physical operator (SparkPlan)

While being created, EventTimeWatermarkExec registers the EventTimeStatsAccum internal accumulator (with the current SparkContext).

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan Contract to generate the runtime representation of a physical operator as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

Internally, doExecute executes the child physical operator and maps over the partitions (using RDD.mapPartitions).

doExecute creates an unsafe projection (one per partition) for the column with the event time in the output schema of the child physical operator. The unsafe projection is used to extract event times from the (stream of) internal rows of the child physical operator.

For every row (InternalRow) per partition, doExecute requests the eventTimeStats accumulator to add the event time.

The event time value is in milliseconds (Spark SQL keeps timestamps internally as microseconds, and the value is divided by 1000).
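The per-partition processing described above can be modelled without Spark. The following is a minimal sketch under stated assumptions, not the operator's actual code: Row, EventTimes, processPartition and run are hypothetical stand-ins for InternalRow, the eventTimeStats accumulator, the function given to RDD.mapPartitions, and the job as a whole.

```scala
object DoExecuteSketch {
  // Hypothetical stand-in for an InternalRow: a raw event-time value plus a payload.
  final case class Row(rawEventTime: Long, payload: String)

  // Hypothetical stand-in for the eventTimeStats accumulator; it only records values.
  final class EventTimes {
    private val buffer = scala.collection.mutable.ArrayBuffer.empty[Long]
    def add(eventTime: Long): Unit = { buffer += eventTime; () }
    def values: List[Long] = buffer.toList
  }

  // Models the function passed to RDD.mapPartitions.
  def processPartition(rows: Iterator[Row], stats: EventTimes): Iterator[Row] = {
    // Created once per partition; plays the role of the unsafe projection.
    val getEventTime: Row => Long = _.rawEventTime
    rows.map { row =>
      // The raw value is divided by 1000 before it is added to the accumulator.
      stats.add(getEventTime(row) / 1000)
      // The row itself is passed through unchanged.
      row
    }
  }

  // Models running all partitions; toList forces the lazy iterators.
  def run(partitions: Seq[Seq[Row]], stats: EventTimes): Seq[Row] =
    partitions.flatMap(p => processPartition(p.iterator, stats).toList)
}
```

Note that Iterator.map is lazy, so in this sketch nothing is added to the accumulator until the returned iterator is consumed. The rows come out exactly as they went in; the only side effect is the accumulator update.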

Output Attributes (Schema) — output Property

output: Seq[Attribute]

output is part of the QueryPlan Contract to describe the attributes of the output (aka schema).

output requests the child physical operator for the output attributes to find the event time column and any other column with metadata that contains spark.watermarkDelayMs key.

For the event time column, output updates the metadata to include the delay interval for the spark.watermarkDelayMs key.

For any other column (not the event time column) with the spark.watermarkDelayMs key, output simply removes the key from the metadata.
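The metadata rules above can be illustrated with a small Spark-free model. This is only a sketch: Attr and the output function below are hypothetical simplifications (attributes are matched by name and metadata is a plain Map), whereas the real operator works on Catalyst attributes and their Metadata.

```scala
object OutputSketch {
  // The metadata key named in the text.
  val DelayKey = "spark.watermarkDelayMs"

  // Hypothetical stand-in for a Catalyst attribute with metadata.
  final case class Attr(name: String, metadata: Map[String, Long] = Map.empty)

  // Applies the three cases described above to the child's output attributes.
  def output(childOutput: Seq[Attr], eventTime: String, delayMs: Long): Seq[Attr] =
    childOutput.map { a =>
      if (a.name == eventTime) {
        // Event time column: record the delay under the key.
        a.copy(metadata = a.metadata + (DelayKey -> delayMs))
      } else if (a.metadata.contains(DelayKey)) {
        // Any other column that carries the key: drop it.
        a.copy(metadata = a.metadata - DelayKey)
      } else {
        // Everything else is left untouched.
        a
      }
    }
}
```

For example, with a child output of time, oldTime (already carrying the key) and value, calling OutputSketch.output(child, "time", 10000L) leaves only the time attribute with the spark.watermarkDelayMs key.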

// FIXME: Would be nice to have a demo. Anyone?

Internal Properties

  • delayMs - the delay interval in milliseconds
  • eventTimeStats - EventTimeStatsAccum accumulator that accumulates the event-time values from every row in a streaming batch (when EventTimeWatermarkExec is executed).

EventTimeStatsAccum is a Spark accumulator of EventTimeStats from Longs (i.e. AccumulatorV2[Long, EventTimeStats]).

Every Spark accumulator has to be registered before use, and eventTimeStats is registered when EventTimeWatermarkExec is created.
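To make the accumulated statistics concrete, here is a minimal, Spark-free sketch of a value that tracks the maximum, minimum, average and count of Long event times and can be merged across partitions. Stats, add, merge and zero are illustrative names and this is a model of the idea under those assumptions, not Spark's EventTimeStats class.

```scala
object EventTimeStatsSketch {
  // Immutable model of the four statistics mentioned in the text.
  final case class Stats(max: Long, min: Long, avg: Double, count: Long) {

    // Folds one event time into the statistics (running average).
    def add(eventTime: Long): Stats = {
      val n = count + 1
      Stats(
        math.max(max, eventTime),
        math.min(min, eventTime),
        avg + (eventTime - avg) / n,
        n)
    }

    // Combines statistics computed independently, e.g. by different tasks.
    def merge(that: Stats): Stats =
      if (that.count == 0) this
      else if (count == 0) that
      else {
        val n = count + that.count
        Stats(
          math.max(max, that.max),
          math.min(min, that.min),
          avg + (that.avg - avg) * that.count / n,
          n)
      }
  }

  // Starting value: no event times seen yet.
  val zero: Stats = Stats(Long.MinValue, Long.MaxValue, 0.0, 0L)
}
```

Each task would start from zero and call add for every row, and the per-task results would then be combined with merge, which is the general shape of how an accumulator's partial values are brought together.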
