ProgressReporter Contract

ProgressReporter is the contract of stream execution progress reporters, i.e. the components that report the execution statistics of a streaming query.

Table 1. ProgressReporter Contract
Method Description


currentBatchId: Long

Id of the current streaming micro-batch


id: UUID

Universally unique identifier (UUID) of the streaming query (that stays unchanged between restarts)


lastExecution: QueryExecution

QueryExecution of the streaming query


logicalPlan: LogicalPlan

Logical query plan of the streaming query

Used when ProgressReporter is requested for the following:


name: String

Name of the streaming query


newData: Map[BaseStreamingSource, LogicalPlan]

Streaming readers and sources with the new data (as a LogicalPlan)

Used when:


offsetSeqMetadata: OffsetSeqMetadata

OffsetSeqMetadata (with the current micro-batch event-time watermark and timestamp)


postEvent(event: StreamingQueryListener.Event): Unit


runId: UUID

Universally unique identifier (UUID) of the single run of the streaming query (that changes every restart)


sink: BaseStreamingSink

The one and only streaming writer or sink of the streaming query



sparkSession: SparkSession

SparkSession of the streaming query

Read up on SparkSession in The Internals of Spark SQL book.


triggerClock: Clock

Clock of the streaming query

StreamExecution is the one and only known direct extension of the ProgressReporter Contract in Spark Structured Streaming.

ProgressReporter uses the spark.sql.streaming.noDataProgressEventInterval configuration property (default: 10000L, in milliseconds) to control how long to wait between two progress events when there is no data. The property is consulted when finishing a trigger.

ProgressReporter uses yyyy-MM-dd'T'HH:mm:ss.SSS'Z' time format (with UTC timezone).
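To illustrate the time format, here is a minimal sketch that renders epoch milliseconds with the pattern above in UTC. It is not the ProgressReporter implementation: the object name TimestampFormatSketch is hypothetical, and java.time is used only as an assumption to keep the example self-contained.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper, for illustration only (not part of Spark).
object TimestampFormatSketch {
  // The pattern quoted in the text, pinned to the UTC time zone
  private val formatter: DateTimeFormatter =
    DateTimeFormatter
      .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
      .withZone(ZoneOffset.UTC)

  // Formats a timestamp given in milliseconds since the epoch
  def formatTimestamp(millis: Long): String =
    formatter.format(Instant.ofEpochMilli(millis))
}
```

For example, TimestampFormatSketch.formatTimestamp(0L) gives 1970-01-01T00:00:00.000Z.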

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
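// Rate source and console sink, as implied by the RateSource output below
val sampleQuery = spark
  .readStream
  .format("rate")
  .load()
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(10.seconds))
  .start()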

// Using public API
import org.apache.spark.sql.streaming.SourceProgress
scala> sampleQuery.
     |   lastProgress.
     |   sources.
     |   map { case sp: SourceProgress =>
     |     s"source = ${sp.description} => endOffset = ${sp.endOffset}" }.
     |   foreach(println)
source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => endOffset = 663

scala> sampleQuery.lastProgress.sources(0)
res40: org.apache.spark.sql.streaming.SourceProgress =
{
  "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
  "startOffset" : 333,
  "endOffset" : 343,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.9998000399920015,
  "processedRowsPerSecond" : 200.0
}

// With a hack
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
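// Unwrap the public StreamingQuery to reach the internal StreamExecution
val offsets = sampleQuery.
  asInstanceOf[StreamingQueryWrapper].
  streamingQuery.
  availableOffsets.
  map { case (source, offset) =>
    s"source = $source => offset = $offset" }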
scala> offsets.foreach(println)
source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => offset = 293

Configure logging of the concrete stream execution progress reporters to see what happens inside a ProgressReporter.

progressBuffer Internal Property

progressBuffer: Queue[StreamingQueryProgress]

progressBuffer has a new StreamingQueryProgress added when ProgressReporter is requested to update progress of a streaming query.

When the size (the number of StreamingQueryProgresses) reaches or exceeds the spark.sql.streaming.numRecentProgressUpdates threshold, the oldest StreamingQueryProgress is removed (dequeued).

progressBuffer is used when ProgressReporter is requested for the last and the recent StreamingQueryProgresses.

status Method

status: StreamingQueryStatus

status gives the current StreamingQueryStatus.

status is used when StreamingQueryWrapper is requested for the current status of a streaming query (that is part of StreamingQuery Contract).

Updating Progress of Streaming Query — updateProgress Internal Method

updateProgress(newProgress: StreamingQueryProgress): Unit

updateProgress records the input newProgress and posts a QueryProgressEvent event.

Figure 1. ProgressReporter’s Reporting Query Progress

updateProgress adds the input newProgress to progressBuffer.

updateProgress removes the oldest elements from progressBuffer while their number equals or exceeds the value of the spark.sql.streaming.numRecentProgressUpdates property.

updateProgress posts a QueryProgressEvent (with the input newProgress).

updateProgress prints out the following INFO message to the logs:

Streaming query made progress: [newProgress]

updateProgress synchronizes concurrent access to the progressBuffer internal registry.

updateProgress is used exclusively when ProgressReporter is requested to finish up a trigger.
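The steps above (add to the buffer, trim the oldest entries, post an event, log) can be sketched as follows. This is a simplified model under stated assumptions, not Spark's code: ProgressBufferSketch is a hypothetical class, the type parameter P stands in for StreamingQueryProgress, retention stands in for spark.sql.streaming.numRecentProgressUpdates, and the trimming condition follows the "is or exceeds" wording used in this section.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ProgressReporter's bookkeeping (not Spark code).
// P plays the role of StreamingQueryProgress; `retention` plays the role of
// spark.sql.streaming.numRecentProgressUpdates.
final class ProgressBufferSketch[P](retention: Int, postEvent: P => Unit) {
  require(retention > 0, "retention must be positive")

  private val progressBuffer = mutable.Queue.empty[P]

  def updateProgress(newProgress: P): Unit = {
    // Guard the buffer against concurrent readers and writers
    progressBuffer.synchronized {
      progressBuffer += newProgress
      // Drop the oldest entries while the size is at or above the threshold
      while (progressBuffer.length >= retention) progressBuffer.dequeue()
    }
    // Notify listeners, then log the progress
    postEvent(newProgress)
    println(s"Streaming query made progress: $newProgress")
  }

  // Counterparts of recentProgress and lastProgress
  def recentProgress: List[P] = progressBuffer.synchronized(progressBuffer.toList)
  def lastProgress: Option[P] = progressBuffer.synchronized(progressBuffer.lastOption)
}
```

Note that with this trimming condition the buffer holds fewer entries than the threshold once trimming kicks in.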

Initializing Query Progress for New Trigger — startTrigger Method

startTrigger(): Unit

startTrigger prints out the following DEBUG message to the logs:

Starting Trigger Calculation

Table 2. startTrigger’s Internal Registry Changes For New Trigger
Registry New Value




Requests the trigger clock for the current timestamp (in millis)


Enables (true) the isTriggerActive flag of the currentStatus






Clears the currentDurationsMs


startTrigger is used when:

StreamExecution starts running batches (as part of TriggerExecutor executing a batch runner).

Finishing Up Streaming Batch (Trigger) and Generating StreamingQueryProgress — finishTrigger Method

finishTrigger(hasNewData: Boolean): Unit

Internally, finishTrigger sets currentTriggerEndTimestamp to the current time (using triggerClock).

finishTrigger generates the execution statistics (using extractExecutionStats).

finishTrigger calculates the processing time (in seconds) as the difference between the end and start timestamps.

finishTrigger calculates the input time (in seconds) as the difference between the start time of the current and last triggers.

Figure 2. ProgressReporter’s finishTrigger and Timestamps
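The two durations can be illustrated with a small worked example. The sketch below is an assumption-laden model: TriggerTimingSketch and its members are hypothetical names, the timestamps are made up, and the idea that a per-second rate is the row count divided by the corresponding duration is an assumption chosen because it reproduces the numbers in the SourceProgress sample earlier in this document (numInputRows 10, inputRowsPerSecond of about 0.9998, processedRowsPerSecond 200.0). It does not cover the very first trigger, when there is no previous start timestamp.

```scala
// Hypothetical helper mirroring the two durations described above
// (not Spark code). All timestamps are in milliseconds.
object TriggerTimingSketch {
  final case class Timing(processingTimeSec: Double, inputTimeSec: Double)

  def timing(
      lastTriggerStart: Long,
      currentTriggerStart: Long,
      currentTriggerEnd: Long): Timing =
    Timing(
      // end of the current trigger minus its start
      processingTimeSec = (currentTriggerEnd - currentTriggerStart).toDouble / 1000,
      // start of the current trigger minus start of the previous one
      inputTimeSec = (currentTriggerStart - lastTriggerStart).toDouble / 1000)

  // Assumption: a rate is the number of rows divided by a duration in seconds
  def rowsPerSecond(numRows: Long, seconds: Double): Double = numRows / seconds
}
```

With made-up timestamps 0, 10002 and 10052, the processing time is 0.05 seconds and the input time is 10.002 seconds, so 10 input rows give roughly 0.9998 input rows per second and about 200 processed rows per second.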

finishTrigger prints out the following DEBUG message to the logs:

Execution stats: [executionStats]

finishTrigger creates a SourceProgress (aka source statistics) for every source used.

finishTrigger creates a SinkProgress (aka sink statistics) for the sink.

finishTrigger creates a StreamingQueryProgress.

If there was any data (per the input hasNewData flag), finishTrigger resets lastNoDataProgressEventTime (i.e. sets it to the minimum possible time) and updates query progress.

Otherwise, when no data was available, finishTrigger updates query progress only when at least spark.sql.streaming.noDataProgressEventInterval has passed since lastNoDataProgressEventTime.
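The decision described in the two paragraphs above can be modelled as below. This is a sketch under assumptions rather than the actual implementation: NoDataGateSketch is a hypothetical class, the current time is passed in explicitly instead of being read from triggerClock, and the exact comparison used for the interval check is an assumption.

```scala
// Hypothetical model of the "report progress or not" decision (not Spark code).
// Timestamps and the interval are in milliseconds.
final class NoDataGateSketch(noDataProgressEventInterval: Long) {
  // Minimum possible time, so the first no-data trigger is always reported
  private var lastNoDataProgressEventTime: Long = Long.MinValue

  /** Returns true when a progress update would be reported. */
  def shouldReport(hasNewData: Boolean, now: Long): Boolean =
    if (hasNewData) {
      // Data was processed: reset the no-data timer and always report
      lastNoDataProgressEventTime = Long.MinValue
      true
    } else if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
      // No data, but the interval has elapsed since the last no-data report
      lastNoDataProgressEventTime = now
      true
    } else {
      // No data and the interval has not elapsed yet: stay silent
      false
    }
}
```

The effect is that an idle query still emits a progress event from time to time, while a busy query reports on every trigger.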

In the end, finishTrigger disables the isTriggerActive flag of the StreamingQueryStatus (i.e. sets it to false).

finishTrigger is used exclusively when MicroBatchExecution is requested to run the activated streaming query (after triggerExecution Phase at the end of a streaming batch).

Time-Tracking Section (Recording Execution Time for Progress Reporting) — reportTimeTaken Method

reportTimeTaken[T](
  triggerDetailKey: String)(
  body: => T): T

reportTimeTaken measures the time to execute body and records it in the currentDurationsMs internal registry under triggerDetailKey key. If the triggerDetailKey key was recorded already, the current execution time is added.

In the end, reportTimeTaken prints out the following DEBUG message to the logs and returns the result of executing body.

[triggerDetailKey] took [time] ms
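The behaviour described above (time a block, accumulate the duration per key, log, return the result) can be sketched as follows. TimeTakenSketch is a hypothetical class, and the injected clock function is an assumption made so the example stays self-contained and testable; it stands in for the trigger clock.

```scala
import scala.collection.mutable

// Hypothetical stand-in (not Spark code); `clock` returns the current time
// in milliseconds and plays the role of the trigger clock.
final class TimeTakenSketch(clock: () => Long) {
  // Action names (triggerDetailKey) and their cumulative times in milliseconds
  val currentDurationsMs = new mutable.HashMap[String, Long]()

  def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
    val startTime = clock()
    val result = body // the by-name body is evaluated exactly once, here
    val timeTaken = math.max(clock() - startTime, 0L)
    // Add to any time already recorded under the same key
    val previousTime = currentDurationsMs.getOrElse(triggerDetailKey, 0L)
    currentDurationsMs.put(triggerDetailKey, previousTime + timeTaken)
    println(s"$triggerDetailKey took $timeTaken ms")
    result
  }
}
```

A caller could create it with new TimeTakenSketch(() => System.currentTimeMillis()) and wrap any phase in reportTimeTaken("somePhase") { ... }. The second parameter list with a by-name body is what allows the block syntax.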

reportTimeTaken is used when the stream execution engines are requested to execute the following phases (that appear as triggerDetailKey in the DEBUG message in the logs):

Updating Status Message — updateStatusMessage Method

updateStatusMessage(message: String): Unit

updateStatusMessage simply updates the message in the StreamingQueryStatus internal registry.


updateStatusMessage is used when:

Generating Execution Statistics — extractExecutionStats Internal Method

extractExecutionStats(hasNewData: Boolean): ExecutionStats

extractExecutionStats generates an ExecutionStats of the last execution of the streaming query.

Internally, extractExecutionStats generates the watermark metric (using the event-time watermark of the OffsetSeqMetadata) if there is an EventTimeWatermark unary logical operator in the logical plan of the streaming query.

EventTimeWatermark unary logical operator represents Dataset.withWatermark operator in a streaming query.

extractExecutionStats generates the state operator metrics (using extractStateOperatorMetrics).

extractExecutionStats computes the number of input rows per source (using extractSourceToNumInputRows).

extractExecutionStats finds the EventTimeWatermarkExec unary physical operator (with non-zero EventTimeStats) and generates max, min, and avg statistics.

In the end, extractExecutionStats creates an ExecutionStats with the execution statistics.

If the input hasNewData flag is turned off (false), extractExecutionStats returns an ExecutionStats with no input rows and no event-time statistics (both are meaningful only when data was processed).

extractExecutionStats is used exclusively when ProgressReporter is requested to finish up a streaming batch (trigger) and generate a StreamingQueryProgress.

Generating StateStoreWriter Metrics (StateOperatorProgress) — extractStateOperatorMetrics Internal Method

extractStateOperatorMetrics(
  hasNewData: Boolean): Seq[StateOperatorProgress]

extractStateOperatorMetrics requests the QueryExecution for the physical plan (executedPlan), finds all StateStoreWriter physical operators, and requests each of them for a StateOperatorProgress.

extractStateOperatorMetrics clears (zeros) the numRowsUpdated metric when the given hasNewData flag is turned off (false).

extractStateOperatorMetrics returns an empty collection when the QueryExecution is uninitialized (null).
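The three cases above can be sketched with plain Scala values. Everything here is a simplified assumption: StateOpSketch is a hypothetical, cut-down stand-in for StateOperatorProgress, and an Option models the possibly uninitialized QueryExecution instead of a null check.

```scala
// Hypothetical, simplified stand-in for StateOperatorProgress (not Spark code).
final case class StateOpSketch(numRowsTotal: Long, numRowsUpdated: Long)

object StateOperatorMetricsSketch {
  // `progressOfLastExecution` is None when there is no QueryExecution yet;
  // otherwise it holds one entry per stateful operator of the last execution.
  def extract(
      progressOfLastExecution: Option[Seq[StateOpSketch]],
      hasNewData: Boolean): Seq[StateOpSketch] =
    progressOfLastExecution match {
      // No execution yet: nothing to report
      case None => Seq.empty
      // Data was processed: report the metrics as they are
      case Some(ops) if hasNewData => ops
      // No new data: the update counts are stale, so zero them
      case Some(ops) => ops.map(_.copy(numRowsUpdated = 0L))
    }
}
```

Zeroing only numRowsUpdated keeps the total state size visible while avoiding re-reporting updates from an earlier batch.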

extractStateOperatorMetrics is used exclusively when ProgressReporter is requested to generate execution statistics.

extractSourceToNumInputRows Internal Method

extractSourceToNumInputRows(): Map[BaseStreamingSource, Long]


extractSourceToNumInputRows is used exclusively when ProgressReporter is requested to generate execution statistics.

formatTimestamp Internal Method

formatTimestamp(millis: Long): String


formatTimestamp is used when…​FIXME

Recording Trigger Offsets (StreamProgress) — recordTriggerOffsets Method

recordTriggerOffsets(
  from: StreamProgress,
  to: StreamProgress): Unit

recordTriggerOffsets simply sets (records) the currentTriggerStartOffsets and currentTriggerEndOffsets internal registries to the JSON representations of the from and to StreamProgresses.


recordTriggerOffsets is used when:

Last StreamingQueryProgress — lastProgress Method

lastProgress: StreamingQueryProgress


lastProgress is used when…​FIXME

recentProgress Method

recentProgress: Array[StreamingQueryProgress]


recentProgress is used when…​FIXME

Internal Properties

Name Description


scala.collection.mutable.HashMap of action names (aka triggerDetailKey) and their cumulative times (in milliseconds).

Starts empty when ProgressReporter sets the state for a new batch. Entries are added or updated when reporting execution time (of an action).


You can see the current value of currentDurationsMs in progress reports under durationMs.

scala> query.lastProgress.durationMs
res3: java.util.Map[String,Long] = {triggerExecution=60, queryPlanning=1, getBatch=5, getOffset=0, addBatch=30, walCommit=23}


StreamingQueryStatus with the current status of the streaming query

Available using status method



Timestamp of when the current batch/trigger has ended

Default: -1L


currentTriggerStartOffsets: Map[BaseStreamingSource, String]

Start offsets (in JSON format) per source


Timestamp of when the current batch/trigger has started

Default: -1L


Default: Long.MinValue


Timestamp of when the last batch/trigger started

Default: -1L


Flag to…​FIXME

Default: false
