ProgressReporter Contract

ProgressReporter is the contract of stream execution progress reporters that report the statistics of a streaming query.

Table 1. ProgressReporter Contract
Method Description

currentBatchId

currentBatchId: Long

Id of the current streaming micro-batch

id

id: UUID

Universally unique identifier (UUID) of the streaming query (that stays unchanged between restarts)

lastExecution

lastExecution: QueryExecution

QueryExecution of the streaming query

logicalPlan

logicalPlan: LogicalPlan

The logical query plan of the streaming query

Used when ProgressReporter is requested for the following:

name

name: String

Name of the streaming query

newData

newData: Map[BaseStreamingSource, LogicalPlan]

Streaming sources with the new data (as a LogicalPlan)

Used when:

offsetSeqMetadata

offsetSeqMetadata: OffsetSeqMetadata

OffsetSeqMetadata (with the current micro-batch event-time watermark and timestamp)

postEvent

postEvent(event: StreamingQueryListener.Event): Unit

runId

runId: UUID

Universally unique identifier (UUID) of the single run of the streaming query (that changes between restarts)

sink

sink: BaseStreamingSink

The one and only streaming sink of the streaming query

sources

sources: Seq[BaseStreamingSource]

Streaming sources of the streaming query

sparkSession

sparkSession: SparkSession

SparkSession of the streaming query

Tip
Read up on SparkSession in The Internals of Spark SQL book.

triggerClock

triggerClock: Clock

Clock of the streaming query

Note
StreamExecution is the one and only known direct extension of the ProgressReporter Contract in Spark Structured Streaming.

ProgressReporter uses the spark.sql.streaming.noDataProgressEventInterval configuration property (default: 10000L, in milliseconds) to control how long to wait between two progress events when there is no data. The property is consulted when finishing a trigger.

ProgressReporter uses yyyy-MM-dd'T'HH:mm:ss.SSS'Z' time format (with UTC timezone).
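
The time format above can be reproduced with the JDK alone. The following is a minimal sketch, not Spark's own code: the TimestampFormatSketch object is a hypothetical name, and the use of java.text.SimpleDateFormat with Locale.ROOT is an assumption made for illustration.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale, TimeZone}

// Hypothetical helper (not part of Spark) that formats epoch milliseconds
// with the time format and timezone quoted in the text above.
object TimestampFormatSketch {
  def formatTimestamp(millis: Long): String = {
    // SimpleDateFormat is not thread-safe, so a new instance is created per call.
    val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ROOT)
    format.setTimeZone(TimeZone.getTimeZone("UTC"))
    format.format(new Date(millis))
  }
}
```

With this sketch, TimestampFormatSketch.formatTimestamp(0L) gives 1970-01-01T00:00:00.000Z.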

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sampleQuery = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(10.seconds))
  .start

// Using public API
import org.apache.spark.sql.streaming.SourceProgress
scala> sampleQuery.
     |   lastProgress.
     |   sources.
     |   map { case sp: SourceProgress =>
     |     s"source = ${sp.description} => endOffset = ${sp.endOffset}" }.
     |   foreach(println)
source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => endOffset = 663

scala> sampleQuery.lastProgress.sources(0)
res40: org.apache.spark.sql.streaming.SourceProgress =
{
  "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
  "startOffset" : 333,
  "endOffset" : 343,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.9998000399920015,
  "processedRowsPerSecond" : 200.0
}

// With a hack
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val offsets = sampleQuery.
  asInstanceOf[StreamingQueryWrapper].
  streamingQuery.
  availableOffsets.
  map { case (source, offset) =>
    s"source = $source => offset = $offset" }
scala> offsets.foreach(println)
source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => offset = 293

status Method

status: StreamingQueryStatus

status gives the current StreamingQueryStatus.

Note
status is used when StreamingQueryWrapper is requested for the current status of a streaming query (that is part of StreamingQuery Contract).

Updating Progress — updateProgress Internal Method

updateProgress(newProgress: StreamingQueryProgress): Unit

updateProgress records the input newProgress and posts a QueryProgressEvent event.

Figure 1. ProgressReporter’s Reporting Query Progress

updateProgress adds the input newProgress to progressBuffer.

updateProgress removes the oldest elements from progressBuffer for as long as their number equals or exceeds the value of the spark.sql.streaming.numRecentProgressUpdates property.

updateProgress posts a QueryProgressEvent (with the input newProgress).

updateProgress prints out the following INFO message to the logs:

Streaming query made progress: [newProgress]

Note
updateProgress synchronizes concurrent access to the progressBuffer internal registry.

Note
updateProgress is used exclusively when ProgressReporter finishes a trigger.
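
The buffering steps described in this section can be sketched in plain Scala. This is a simplified model under stated assumptions, not Spark's implementation: ProgressBufferSketch, its retention parameter (standing in for spark.sql.streaming.numRecentProgressUpdates), and the postEvent callback are hypothetical names, and println stands in for the INFO log message.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the progress buffering described above.
// `retention` plays the role of spark.sql.streaming.numRecentProgressUpdates.
class ProgressBufferSketch[P](retention: Int, postEvent: P => Unit) {
  require(retention >= 1, "retention must be positive")

  private val progressBuffer = new mutable.Queue[P]()

  def updateProgress(newProgress: P): Unit = {
    // All access to the buffer is synchronized on the buffer itself.
    progressBuffer.synchronized {
      progressBuffer += newProgress
      // Drop the oldest entries while the size equals or exceeds the limit.
      while (progressBuffer.length >= retention) {
        progressBuffer.dequeue()
      }
    }
    postEvent(newProgress)
    println(s"Streaming query made progress: $newProgress")
  }

  // Snapshot of the retained progress updates, oldest first.
  def recentProgress: List[P] = progressBuffer.synchronized { progressBuffer.toList }

  // The most recent retained progress update, if any.
  def lastProgress: Option[P] = progressBuffer.synchronized { progressBuffer.lastOption }
}
```

Because elements are removed while their number equals or exceeds the limit, this sketch retains at most retention - 1 updates after each call.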

Setting State For New Trigger — startTrigger Method

startTrigger(): Unit

startTrigger prints out the following DEBUG message to the logs:

Starting Trigger Calculation

Table 2. startTrigger’s Internal Registry Changes For New Trigger
Registry New Value

lastTriggerStartTimestamp

currentTriggerStartTimestamp

currentTriggerStartTimestamp

Requests the trigger clock for the current timestamp (in millis)

currentStatus

Enables (true) the isTriggerActive flag of the currentStatus

currentTriggerStartOffsets

null

currentTriggerEndOffsets

null

currentDurationsMs

Clears the currentDurationsMs

Note
startTrigger is used exclusively when StreamExecution starts running batches (as part of TriggerExecutor executing a batch runner).
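
The registry changes listed in Table 2 can be modelled as plain field updates. The sketch below is illustrative only: TriggerStateSketch is a hypothetical class, the trigger clock is reduced to a function returning milliseconds, offsets are simplified to Map[String, String], and the isTriggerActive flag is kept as a plain field instead of living inside a StreamingQueryStatus.

```scala
import scala.collection.mutable

// Hypothetical model of the internal registries that startTrigger touches.
class TriggerStateSketch(triggerClock: () => Long) {
  var lastTriggerStartTimestamp: Long = -1L
  var currentTriggerStartTimestamp: Long = -1L
  var isTriggerActive: Boolean = false
  // Offsets are simplified to string maps for the purpose of this sketch.
  var currentTriggerStartOffsets: Map[String, String] = null
  var currentTriggerEndOffsets: Map[String, String] = null
  val currentDurationsMs = new mutable.HashMap[String, Long]()

  def startTrigger(): Unit = {
    println("Starting Trigger Calculation") // stands in for the DEBUG message
    // The previous trigger's start time becomes the "last" start time.
    lastTriggerStartTimestamp = currentTriggerStartTimestamp
    currentTriggerStartTimestamp = triggerClock()
    isTriggerActive = true
    currentTriggerStartOffsets = null
    currentTriggerEndOffsets = null
    currentDurationsMs.clear()
  }
}
```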

Finishing Trigger (Updating Progress and Marking Current Status As Trigger Inactive) — finishTrigger Method

finishTrigger(hasNewData: Boolean): Unit

Internally, finishTrigger sets currentTriggerEndTimestamp to the current time (using triggerClock).

finishTrigger extracts the execution statistics (using extractExecutionStats).

finishTrigger calculates the processing time (in seconds) as the difference between the end and start timestamps.

finishTrigger calculates the input time (in seconds) as the difference between the start time of the current and last triggers.

Figure 2. ProgressReporter’s finishTrigger and Timestamps
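
The two time calculations can be written out as simple arithmetic over the millisecond timestamps. The following is a small sketch with hypothetical names (TriggerTimingSketch and its methods). Returning NaN for the input time when there was no earlier trigger is an assumption here, based on lastTriggerStartTimestamp defaulting to -1L.

```scala
// Hypothetical helpers that mirror the two time calculations described above.
object TriggerTimingSketch {
  // Processing time (in seconds): how long the current trigger took.
  def processingTimeSec(currentStartMs: Long, currentEndMs: Long): Double =
    (currentEndMs - currentStartMs).toDouble / 1000

  // Input time (in seconds): time between the starts of the last and current triggers.
  // Assumption: NaN when there was no earlier trigger (last start is still -1L).
  def inputTimeSec(lastStartMs: Long, currentStartMs: Long): Double =
    if (lastStartMs >= 0) (currentStartMs - lastStartMs).toDouble / 1000
    else Double.NaN
}
```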

finishTrigger prints out the following DEBUG message to the logs:

Execution stats: [executionStats]

finishTrigger creates a SourceProgress (aka source statistics) for every source used.

finishTrigger creates a SinkProgress (aka sink statistics) for the sink.

finishTrigger creates a StreamingQueryProgress.

If there was any data (using the input hasNewData flag), finishTrigger resets lastNoDataProgressEventTime to the minimum possible time (Long.MinValue) and updates query progress.

Otherwise, when no data was available (using the input hasNewData flag), finishTrigger updates query progress only when at least spark.sql.streaming.noDataProgressEventInterval has passed since lastNoDataProgressEventTime (and records the current time as the new lastNoDataProgressEventTime).

In the end, finishTrigger disables isTriggerActive flag of StreamingQueryStatus (i.e. sets it to false).

Note
finishTrigger is used exclusively when MicroBatchExecution is requested to run the activated streaming query.
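
How finishTrigger decides whether to report progress can be sketched without Spark. The names below are hypothetical (NoDataProgressSketch, the clock function, the updateProgress callback), progress is simplified to a String, and the comparison against the interval is an assumption about how "enough time has passed" is checked.

```scala
// Hypothetical model of the progress-reporting decision at the end of a trigger.
// `noDataProgressEventInterval` stands in for the value (in milliseconds) of
// spark.sql.streaming.noDataProgressEventInterval.
class NoDataProgressSketch(
    noDataProgressEventInterval: Long,
    clock: () => Long,
    updateProgress: String => Unit) {

  private var lastNoDataProgressEventTime: Long = Long.MinValue

  def finishTrigger(hasNewData: Boolean, newProgress: String): Unit = {
    if (hasNewData) {
      // With data: always report and reset the no-data timer.
      lastNoDataProgressEventTime = Long.MinValue
      updateProgress(newProgress)
    } else {
      // Without data: report only if the interval has elapsed since the last report.
      val now = clock()
      if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
        lastNoDataProgressEventTime = now
        updateProgress(newProgress)
      }
    }
  }
}
```

In this sketch, the first trigger without data is always reported, since the timer starts at Long.MinValue.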

Reporting Execution Time — reportTimeTaken Method

reportTimeTaken[T](triggerDetailKey: String)(body: => T): T

reportTimeTaken measures the time to execute body and records it in the currentDurationsMs internal registry.

In the end, reportTimeTaken prints out the following DEBUG message to the logs and returns the result of executing body.

[triggerDetailKey] took [time] ms

Note

reportTimeTaken is used when StreamExecution records the time taken for the following actions (used as triggerDetailKey in the DEBUG message above):

  • addBatch

  • getBatch

  • getOffset

  • queryPlanning

  • triggerExecution

  • walCommit when writing offsets to log
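
The timing pattern behind reportTimeTaken can be sketched in plain Scala. This is a simplified model, not Spark's code: TimeTakenSketch is a hypothetical class, the clock is a function returning milliseconds, and println stands in for the DEBUG log message. Times are accumulated per key, following the description of currentDurationsMs as holding cumulative times.

```scala
import scala.collection.mutable

// Hypothetical model of timing a block of code and recording the duration per key.
class TimeTakenSketch(clock: () => Long) {
  val currentDurationsMs = new mutable.HashMap[String, Long]()

  def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
    val startTime = clock()
    val result = body // the by-name block is executed exactly once, here
    val timeTaken = math.max(clock() - startTime, 0L)
    // Accumulate on top of any time already recorded for this key.
    val previousTime = currentDurationsMs.getOrElse(triggerDetailKey, 0L)
    currentDurationsMs.put(triggerDetailKey, previousTime + timeTaken)
    println(s"$triggerDetailKey took $timeTaken ms")
    result
  }
}
```

The curried signature lets callers wrap any expression, e.g. sketch.reportTimeTaken("getBatch") { ... }, and still get the value of that expression back.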

Updating Status Message — updateStatusMessage Method

updateStatusMessage(message: String): Unit

updateStatusMessage simply updates the message in the StreamingQueryStatus internal registry.

Note

updateStatusMessage is used when:

extractSourceToNumInputRows Internal Method

extractSourceToNumInputRows(): Map[BaseStreamingSource, Long]

extractSourceToNumInputRows…​FIXME

Note
extractSourceToNumInputRows is used exclusively when ProgressReporter is requested to extractExecutionStats.

Extracting Execution Statistics — extractExecutionStats Internal Method

extractExecutionStats(hasNewData: Boolean): ExecutionStats

extractExecutionStats…​FIXME

Note
extractExecutionStats is used exclusively when ProgressReporter is requested to finishTrigger.

extractStateOperatorMetrics Internal Method

extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress]

extractStateOperatorMetrics…​FIXME

Note
extractStateOperatorMetrics is used exclusively when ProgressReporter is requested to extractExecutionStats.

formatTimestamp Internal Method

formatTimestamp(millis: Long): String

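formatTimestamp formats the input millis timestamp using the yyyy-MM-dd'T'HH:mm:ss.SSS'Z' time format (with UTC timezone).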

Note
formatTimestamp is used when…​FIXME

recordTriggerOffsets Method

recordTriggerOffsets(
  from: StreamProgress,
  to: StreamProgress): Unit

recordTriggerOffsets…​FIXME

Note

recordTriggerOffsets is used when:

lastProgress Method

lastProgress: StreamingQueryProgress

lastProgress…​FIXME

Note
lastProgress is used when…​FIXME

recentProgress Method

recentProgress: Array[StreamingQueryProgress]

recentProgress…​FIXME

Note
recentProgress is used when…​FIXME

Internal Properties

Name Description

currentDurationsMs

scala.collection.mutable.HashMap of action names (aka triggerDetailKey) and their cumulative times (in milliseconds).

The action names can be as follows:

  • addBatch

  • getBatch (when StreamExecution runs a streaming batch)

  • getOffset

  • queryPlanning

  • triggerExecution

  • walCommit when writing offsets to log

Starts empty when ProgressReporter sets the state for a new trigger. Entries are added or updated when ProgressReporter reports execution time (of an action).

Tip

You can see the current value of currentDurationsMs in progress reports under durationMs.

scala> query.lastProgress.durationMs
res3: java.util.Map[String,Long] = {triggerExecution=60, queryPlanning=1, getBatch=5, getOffset=0, addBatch=30, walCommit=23}

currentStatus

StreamingQueryStatus with the current status of the streaming query

Available using status method

currentTriggerEndOffsets

currentTriggerEndTimestamp

Timestamp of when the current batch/trigger has ended

Default: -1L

currentTriggerStartOffsets

currentTriggerStartTimestamp

Timestamp of when the current batch/trigger has started

Default: -1L

lastNoDataProgressEventTime

Default: Long.MinValue

lastTriggerStartTimestamp

Timestamp of when the last batch/trigger started

Default: -1L

metricWarningLogged

Flag to…​FIXME

Default: false

progressBuffer

Elements are added and removed when ProgressReporter is requested to update progress.

Used when ProgressReporter is requested for the lastProgress and recentProgress
