EventTimeWatermarkExec Unary Physical Operator for Accumulating Event Time Watermark

EventTimeWatermarkExec is a unary physical operator (aka UnaryExecNode) that accumulates the event time values (that appear in eventTime watermark column).

Note

EventTimeWatermarkExec uses eventTimeStats accumulator to send the statistics (i.e. the maximum, minimum, average and count) for the event time column in a streaming batch that is later used in:

EventTimeWatermarkExec is created when StatefulAggregationStrategy execution planning strategy plans a EventTimeWatermark logical operator for execution.

Note
EventTimeWatermark logical operator is created as the result of withWatermark operator.
val rates = spark.
  readStream.
  format("rate").
  load.
  withWatermark(eventTime = "timestamp", delayThreshold = "10 seconds") // <-- use EventTimeWatermark logical operator
scala> rates.explain
== Physical Plan ==
EventTimeWatermark timestamp#0: timestamp, interval 10 seconds
+- StreamingRelation rate, [timestamp#0, value#1L]

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = rates.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Append).
  queryName("rates-to-console").
  start
17/08/11 09:04:17 INFO StreamExecution: Starting rates-to-console [id = ec8f8228-90f6-4e1f-8ad2-80222affed63, runId = f605c134-cfb0-4378-88c1-159d8a7c232e] with file:///private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-3869a982-9824-4715-8cce-cce7c8251299 to store the query checkpoint.
...
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+
...
17/08/11 09:04:17 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] -> 0),ArrayBuffer(),Map(watermark -> 1970-01-01T00:00:00.000Z))
...
17/08/11 09:04:17 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:17.373Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 38,
    "getBatch" : 1,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 62,
    "walCommit" : 19
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"  // <-- no watermark found yet
  },
...
17/08/11 09:04:17 DEBUG StreamExecution: batch 0 committed
...
-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2017-08-11 09:04:17.282|0    |
|2017-08-11 09:04:18.282|1    |
+-----------------------+-----+
...
17/08/11 09:04:20 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] -> 2),ArrayBuffer(),Map(max -> 2017-08-11T07:04:18.282Z, min -> 2017-08-11T07:04:17.282Z, avg -> 2017-08-11T07:04:17.782Z, watermark -> 1970-01-01T00:00:00.000Z))
...
//
// Notice eventTimeStats in eventTime section below
// They are only available when watermark is used and
// EventTimeWatermarkExec.eventTimeStats.value.count > 0, i.e.
// there were input rows (with event time)
// Note that watermark has NOT been changed yet (perhaps it should have)
//
17/08/11 09:04:20 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:20.004Z",
  "batchId" : 1,
  "numInputRows" : 2,
  "inputRowsPerSecond" : 0.7601672367920943,
  "processedRowsPerSecond" : 25.31645569620253,
  "durationMs" : {
    "addBatch" : 48,
    "getBatch" : 6,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 79,
    "walCommit" : 23
  },
  "eventTime" : {
    "avg" : "2017-08-11T07:04:17.782Z",
    "max" : "2017-08-11T07:04:18.282Z",
    "min" : "2017-08-11T07:04:17.282Z",
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
...
17/08/11 09:04:20 DEBUG StreamExecution: batch 1 committed
...
//
// At long last!
// I think it should have been a batch earlier
// I did ask about it on the dev mailing list today (on 17/08/11)
//
17/08/11 09:04:30 DEBUG StreamExecution: Observed event time stats: EventTimeStats(1502435058282,1502435057282,1.502435057782E12,2)
17/08/11 09:04:30 INFO StreamExecution: Updating eventTime watermark to: 1502435048282 ms
...
-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2017-08-11 09:04:19.282|2    |
|2017-08-11 09:04:20.282|3    |
|2017-08-11 09:04:21.282|4    |
|2017-08-11 09:04:22.282|5    |
|2017-08-11 09:04:23.282|6    |
|2017-08-11 09:04:24.282|7    |
|2017-08-11 09:04:25.282|8    |
|2017-08-11 09:04:26.282|9    |
|2017-08-11 09:04:27.282|10   |
|2017-08-11 09:04:28.282|11   |
+-----------------------+-----+
...
17/08/11 09:04:30 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] -> 10),ArrayBuffer(),Map(max -> 2017-08-11T07:04:28.282Z, min -> 2017-08-11T07:04:19.282Z, avg -> 2017-08-11T07:04:23.782Z, watermark -> 2017-08-11T07:04:08.282Z))
...
17/08/11 09:04:30 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:30.003Z",
  "batchId" : 2,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 1.000100010001,
  "processedRowsPerSecond" : 56.17977528089888,
  "durationMs" : {
    "addBatch" : 147,
    "getBatch" : 6,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 178,
    "walCommit" : 22
  },
  "eventTime" : {
    "avg" : "2017-08-11T07:04:23.782Z",
    "max" : "2017-08-11T07:04:28.282Z",
    "min" : "2017-08-11T07:04:19.282Z",
    "watermark" : "2017-08-11T07:04:08.282Z"
  },
...
17/08/11 09:04:30 DEBUG StreamExecution: batch 2 committed
...

// In the end, stop the streaming query
sq.stop
Table 1. EventTimeWatermarkExec’s Internal Registries and Counters
Name Description

delayMs

FIXME

Used when…​FIXME

eventTimeStats

EventTimeStatsAccum accumulator to accumulate eventTime values from every row in a streaming batch (when EventTimeWatermarkExec is executed).

Note
EventTimeStatsAccum is a Spark accumulator of EventTimeStats from Longs (i.e. AccumulatorV2[Long, EventTimeStats]).
Note
Every Spark accumulator has to be registered before use, and eventTimeStats is registered when EventTimeWatermarkExec is created.

Executing EventTimeWatermarkExec (And Collecting Event Times) — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is a part of SparkPlan contract to produce the result of a physical operator as an RDD of internal binary rows (i.e. InternalRow).

Internally, doExecute executes child physical operator and maps over the partitions (using RDD.mapPartitions) that does the following:

  1. Creates an unsafe projection for eventTime in the output schema of child physical operator.

  2. For every row (as InternalRow)

Creating EventTimeWatermarkExec Instance

EventTimeWatermarkExec takes the following when created:

  • Name of the eventTime watermark column

  • Delay CalendarInterval

  • Child physical plan

While being created, EventTimeWatermarkExec registers eventTimeStats accumulator (with the current SparkContext).

EventTimeWatermarkExec initializes the internal registries and counters.

results matching ""

    No results matching ""