StreamExecution — Base of Stream Execution Engines

StreamExecution is the base of stream execution engines (aka streaming query processing engines) that can run a structured query (on a stream execution thread).

Continuous query, streaming query, continuous Dataset, streaming Dataset are all considered high-level synonyms for an executable entity that stream execution engines run using the analyzed logical plan internally.
Table 1. StreamExecution Contract (Abstract Methods Only)
Property Description


logicalPlan: LogicalPlan

Analyzed logical plan of the streaming query to execute

Used when StreamExecution is requested to run stream processing

logicalPlan is part of ProgressReporter Contract and the only purpose of the logicalPlan property is to change the access level from protected to public.


  sparkSessionForStream: SparkSession): Unit

Executes (runs) the activated streaming query

Used exclusively when StreamExecution is requested to run the streaming query (when transitioning from INITIALIZING to ACTIVE state)

Streaming Query and Stream Execution Engine
import org.apache.spark.sql.streaming.StreamingQuery

import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val se = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery

scala> :type se

StreamExecution uses the spark.sql.streaming.minBatchesToRetain configuration property to allow the StreamExecutions to discard old log entries (from the offset and commit logs).

Table 2. StreamExecutions
StreamExecution Description


Used in Continuous Stream Processing


Used in Micro-Batch Stream Processing

StreamExecution does not support adaptive query execution and cost-based optimizer (and turns them off when requested to run stream processing).

StreamExecution is the execution environment of a single streaming query (aka streaming Dataset) that is executed every trigger and in the end adds the results to a sink.

StreamExecution corresponds to a single streaming query with one or more streaming sources and exactly one streaming sink.
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val q = spark.
scala> :type q

// Pull out StreamExecution off StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
val se = q.asInstanceOf[StreamingQueryWrapper].streamingQuery
scala> :type se
StreamExecution creating instance.png
Figure 1. Creating Instance of StreamExecution
DataStreamWriter describes how the results of executing batches of a streaming query are written to a streaming sink.

When started, StreamExecution starts a stream execution thread that simply runs stream processing (and hence the streaming query).

StreamExecution start.png
Figure 2. StreamExecution’s Starting Streaming Query (on Execution Thread)

StreamExecution is a ProgressReporter and reports status of the streaming query (i.e. when it starts, progresses and terminates) by posting StreamingQueryListener events.

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark

// Enable the log level to see the INFO and DEBUG messages

17/06/18 21:21:07 INFO StreamExecution: Starting new streaming query.
17/06/18 21:21:07 DEBUG StreamExecution: getOffset took 5 ms
17/06/18 21:21:07 DEBUG StreamExecution: Stream running from {} to {}
17/06/18 21:21:07 DEBUG StreamExecution: triggerExecution took 9 ms
17/06/18 21:21:07 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
17/06/18 21:21:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "8b57b0bd-fc4a-42eb-81a3-777d7ba5e370",
  "runId" : "920b227e-6d02-4a03-a271-c62120258cea",
  "name" : "debug",
  "timestamp" : "2017-06-18T19:21:07.693Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 5,
    "triggerExecution" : 9
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/Users/jacek/dev/oss/spark/server-logs]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@2460208a"
17/06/18 21:21:10 DEBUG StreamExecution: Starting Trigger Calculation
17/06/18 21:21:10 DEBUG StreamExecution: getOffset took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: triggerExecution took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())

StreamExecution tracks streaming data sources in uniqueSources internal registry.

StreamExecution uniqueSources.png
Figure 3. StreamExecution’s uniqueSources Registry of Streaming Data Sources

StreamExecution collects durationMs for the execution units of streaming batches.

StreamExecution durationMs.png
Figure 4. StreamExecution’s durationMs
scala> :type q

scala> println(q.lastProgress)
  "id" : "03fc78fc-fe19-408c-a1ae-812d0e28fcee",
  "runId" : "8c247071-afba-40e5-aad2-0e6f45f22488",
  "name" : null,
  "timestamp" : "2017-08-14T20:30:00.004Z",
  "batchId" : 1,
  "numInputRows" : 432,
  "inputRowsPerSecond" : 0.9993568953312452,
  "processedRowsPerSecond" : 1380.1916932907347,
  "durationMs" : {
    "addBatch" : 237,
    "getBatch" : 26,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 313,
    "walCommit" : 45
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
    "startOffset" : 0,
    "endOffset" : 432,
    "numInputRows" : 432,
    "inputRowsPerSecond" : 0.9993568953312452,
    "processedRowsPerSecond" : 1380.1916932907347
  } ],
  "sink" : {
    "description" : "ConsoleSink[numRows=20, truncate=true]"

StreamExecution uses OffsetSeqLog and BatchCommitLog metadata logs for write-ahead log (to record offsets to be processed) and that have already been processed and committed to a streaming sink, respectively.

Monitor offsets and commits metadata logs to know the progress of a streaming query.

StreamExecution delays polling for new data for 10 milliseconds (when no data was available to process in a batch). Use spark.sql.streaming.pollingDelay Spark property to control the delay.

Every StreamExecution is uniquely identified by an ID of the streaming query (which is the id of the StreamMetadata).

Since the StreamMetadata is persisted (to the metadata file in the checkpoint directory), the streaming query ID "survives" query restarts as long as the checkpoint directory is preserved.

StreamExecution is also uniquely identified by a run ID of the streaming query. A run ID is a randomly-generated 128-bit universally unique identifier (UUID) that is assigned at the time StreamExecution is created.

runId does not "survive" query restarts and will always be different yet unique (across all active queries).

The name, id and runId are all unique across all active queries (in a StreamingQueryManager). The difference is that:

  • name is optional and user-defined

  • id is a UUID that is auto-generated at the time StreamExecution is created and persisted to metadata checkpoint file

  • runId is a UUID that is auto-generated every time StreamExecution is created

StreamExecution uses a StreamMetadata that is persisted in the metadata file in the checkpoint directory. If the metadata file is available it is read and is the way to recover the ID of a streaming query when resumed (i.e. restarted after a failure or a planned stop).

StreamExecution uses __is_continuous_processing local property (default: false) to differentiate between ContinuousExecution (true) and MicroBatchExecution (false) which is used when StateStoreRDD is requested to compute a partition (and finds a StateStore for a given version).


Enable ALL logging level for org.apache.spark.sql.execution.streaming.StreamExecution to see what happens inside.

Add the following line to conf/

Refer to Logging.

Creating StreamExecution Instance

StreamExecution takes the following to be created:

  • SparkSession

  • Name of the streaming query (can also be null)

  • Path of the checkpoint directory (aka metadata directory)

  • Streaming query (as an analyzed logical query plan, i.e. LogicalPlan)

  • Streaming sink

  • Trigger

  • Clock

  • Output mode

  • deleteCheckpointOnStop flag (to control whether to delete the checkpoint directory on stop)

StreamExecution initializes the internal properties.

StreamExecution is a Scala abstract class and cannot be created directly. It is created indirectly when the concrete StreamExecutions are.

Write-Ahead Log (WAL) of Offsets — offsetLog Property

offsetLog: OffsetSeqLog

offsetLog is used as Write-Ahead Log of Offsets to persist offsets of the data about to be processed in every trigger.

Metadata log or metadata checkpoint are synonyms and are often used interchangeably.

The number of entries in the OffsetSeqLog is controlled using spark.sql.streaming.minBatchesToRetain configuration property (default: 100). Stream execution engines discard (purge) offsets from the offsets metadata log when the current batch ID (in MicroBatchExecution) or the epoch committed (in ContinuousExecution) is above the threshold.


offsetLog is used when:

State of Streaming Query (Execution) — state Property

state: AtomicReference[State]

state indicates the internal state of execution of the streaming query (as java.util.concurrent.atomic.AtomicReference).

Table 3. States
Name Description


StreamExecution has been requested to run stream processing (and is about to run the activated streaming query)


StreamExecution has been created


Used to indicate that:


Used only when ContinuousExecution is requested to run a streaming query in continuous mode (and the ContinuousReader indicated a need for reconfiguration)

Available Offsets (StreamProgress) — availableOffsets Property

availableOffsets: StreamProgress

availableOffsets is a collection of offsets per streaming source to track what data (by offset) is available for processing for every streaming source in the streaming query (and have not yet been committed).

availableOffsets works in tandem with the committedOffsets internal registry.

availableOffsets is empty when StreamExecution is created (i.e. no offsets are reported for any streaming source in the streaming query).


availableOffsets is used when:

Committed Offsets (StreamProgress) — committedOffsets Property

committedOffsets: StreamProgress

committedOffsets is a collection of offsets per streaming source to track what data (by offset) has already been processed and committed (to the sink or state stores) for every streaming source in the streaming query.

committedOffsets works in tandem with the availableOffsets internal registry.


committedOffsets is used when:

Fully-Qualified (Resolved) Path to Checkpoint Root Directory — resolvedCheckpointRoot Property

resolvedCheckpointRoot: String

resolvedCheckpointRoot is a fully-qualified path of the given checkpoint root directory.

The given checkpoint root directory is defined using checkpointLocation option or the spark.sql.streaming.checkpointLocation configuration property with queryName option.

checkpointLocation and queryName options are defined when StreamingQueryManager is requested to create a streaming query.

resolvedCheckpointRoot is used when creating the path to the checkpoint directory and when StreamExecution finishes running streaming batches.

resolvedCheckpointRoot is used for the logicalPlan (while transforming analyzedPlan and planning StreamingRelation logical operators to corresponding StreamingExecutionRelation physical operators with the streaming data sources created passing in the path to sources directory to store checkpointing metadata).


You can see resolvedCheckpointRoot in the INFO message when StreamExecution is started.

Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.

Internally, resolvedCheckpointRoot creates a Hadoop org.apache.hadoop.fs.Path for checkpointRoot and makes it qualified.

resolvedCheckpointRoot uses SparkSession to access SessionState for a Hadoop configuration.

Offset Commit Log — commits Metadata Checkpoint Directory

StreamExecution uses offset commit log (CommitLog with commits metadata checkpoint directory) for streaming batches successfully executed (with a single file per batch with a file name being the batch id) or committed epochs.

Metadata log or metadata checkpoint are synonyms and are often used interchangeably.

commitLog is used by the stream execution engines for the following:

Last Query Execution Of Streaming Query (IncrementalExecution) — lastExecution Property

lastExecution: IncrementalExecution
lastExecution is part of the ProgressReporter Contract for the QueryExecution of a streaming query.

lastExecution is a IncrementalExecution (a QueryExecution of a streaming query) of the most recent (last) execution.

lastExecution is created when the stream execution engines are requested for the following:

lastExecution is used when:

Explaining Streaming Query — explain Method

explain(): Unit (1)
explain(extended: Boolean): Unit
  1. Turns the extended flag off (false)

explain simply prints out explainInternal to the standard output.

explain is used when…​FIXME

explainInternal Method

explainInternal(extended: Boolean): String



explainInternal is used when:

Stopping Streaming Sources and Readers — stopSources Method

stopSources(): Unit

stopSources requests every streaming source (in the streaming query) to stop.

In case of an non-fatal exception, stopSources prints out the following WARN message to the logs:

Failed to stop streaming source: [source]. Resources may have leaked.

stopSources is used when:

Running Stream Processing — runStream Internal Method

runStream(): Unit

runStream simply prepares the environment to execute the activated streaming query.

runStream is used exclusively when the stream execution thread is requested to start (when DataStreamWriter is requested to start an execution of the streaming query).

Internally, runStream sets the job group (to all the Spark jobs started by this thread) as follows:


runStream uses the SparkSession to access SparkContext and assign the job group id.

runStream sets sql.streaming.queryId local property to id.

runStream requests the MetricsSystem to register the MetricsReporter when spark.sql.streaming.metricsEnabled configuration property is on (default: off / false).

runStream notifies StreamingQueryListeners that the streaming query has been started (by posting a new QueryStartedEvent event with id, runId, and name).

StreamingQueryListener onQueryStarted.png
Figure 5. StreamingQueryListener Notified about Query’s Start (onQueryStarted)

runStream unblocks the main starting thread (by decrementing the count of the startLatch that when 0 lets the starting thread continue).

FIXME A picture with two parallel lanes for the starting thread and daemon one for the query.

runStream updates the status message to be Initializing sources.

runStream initializes the analyzed logical plan.

The analyzed logical plan is a lazy value in Scala and is initialized when requested the very first time.

runStream disables adaptive query execution and cost-based join optimization (by turning spark.sql.adaptive.enabled and spark.sql.cbo.enabled configuration properties off, respectively).

runStream creates a new "zero" OffsetSeqMetadata.

(Only when in INITIALIZING state) runStream enters ACTIVE state:

runBatches does the main work only when first started (i.e. when state is INITIALIZING).

runStream…​FIXME (describe the failed and stop states)

Once TriggerExecutor has finished executing batches, runBatches updates the status message to Stopped.

TriggerExecutor finishes executing batches when batch runner returns whether the streaming query is stopped or not (which is when the internal state is not TERMINATED).
FIXME Describe catch block for exception handling

Running Stream Processing — finally Block

runStream releases the startLatch and initializationLatch locks.

runStream stopSources.

runStream sets the state to TERMINATED.

runStream sets the StreamingQueryStatus with the isTriggerActive and isDataAvailable flags off (false).

runStream removes the stream metrics reporter from the application’s MetricsSystem.

runStream creates a new QueryTerminatedEvent (with the id and run id of the streaming query) and posts it.

With the deleteCheckpointOnStop flag enabled and no StreamingQueryException reported, runStream deletes the checkpoint directory recursively.

In the end, runStream releases the terminationLatch lock.

TriggerExecutor’s Batch Runner

Batch Runner (aka batchRunner) is an executable block executed by TriggerExecutor in runBatches.

As long as the query is not stopped (i.e. state is not TERMINATED), batchRunner executes the streaming batch for the trigger.

In triggerExecution time-tracking section, runBatches branches off per currentBatchId.

Table 4. Current Batch Execution per currentBatchId
currentBatchId < 0 currentBatchId >= 0
  1. populateStartOffsets

  2. Setting Job Description as getBatchDescriptionString

DEBUG Stream running from [committedOffsets] to [availableOffsets]

1. Constructing the next streaming micro-batch

If there is data available in the sources, batchRunner marks currentStatus with isDataAvailable enabled.


You can check out the status of a streaming query using status method.

res1: org.apache.spark.sql.streaming.StreamingQueryStatus =
  "message" : "Waiting for next trigger",
  "isDataAvailable" : false,
  "isTriggerActive" : false

batchRunner then updates the status message to Processing new data and runs the current streaming batch.

StreamExecution runBatches.png
Figure 6. StreamExecution’s Running Batches (on Execution Thread)

After triggerExecution section has finished, batchRunner finishes the streaming batch for the trigger (and collects query execution statistics).

When there was data available in the sources, batchRunner updates committed offsets (by adding the current batch id to BatchCommitLog and adding availableOffsets to committedOffsets).

You should see the following DEBUG message in the logs:

DEBUG batch $currentBatchId committed

batchRunner increments the current batch id and sets the job description for all the following Spark jobs to include the new batch id.

When no data was available in the sources to process, batchRunner does the following:

  1. Marks currentStatus with isDataAvailable disabled

  2. Updates the status message to Waiting for data to arrive

  3. Sleeps the current thread for pollingDelayMs milliseconds.

batchRunner updates the status message to Waiting for next trigger and returns whether the query is currently active or not (so TriggerExecutor can decide whether to finish executing the batches or not)

Starting Streaming Query (on Stream Execution Thread) — start Method

start(): Unit

When called, start prints out the following INFO message to the logs:

Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.

start then starts the stream execution thread (as a daemon thread).

start uses Java’s java.lang.Thread.start to run the streaming query on a separate execution thread.
When started, a streaming query runs in its own execution thread on JVM.

In the end, start pauses the main thread (using the startLatch until StreamExecution is requested to run the streaming query that in turn sends a QueryStartedEvent to all streaming listeners followed by decrementing the count of the startLatch).

start is used exclusively when StreamingQueryManager is requested to start a streaming query (when DataStreamWriter is requested to start an execution of the streaming query).

Path to Checkpoint Directory — checkpointFile Internal Method

checkpointFile(name: String): String

checkpointFile gives the path of a directory with name in checkpoint directory.

checkpointFile uses Hadoop’s org.apache.hadoop.fs.Path.
checkpointFile is used for streamMetadata, OffsetSeqLog, BatchCommitLog, and lastExecution (for runBatch).

Posting StreamingQueryListener Event — postEvent Method

postEvent(event: StreamingQueryListener.Event): Unit
postEvent is a part of ProgressReporter Contract.

postEvent simply requests the StreamingQueryManager to post the input event (to the StreamingQueryListenerBus in the current SparkSession).

postEvent uses SparkSession to access the current StreamingQueryManager.

postEvent is used when:

Waiting Until No New Data Available in Sources or Query Has Been Terminated — processAllAvailable Method

processAllAvailable(): Unit
processAllAvailable is a part of StreamingQuery Contract.

processAllAvailable reports the StreamingQueryException if reported (and returns immediately).

streamDeathCause is reported exclusively when StreamExecution is requested to run stream execution (that terminated with an exception).

processAllAvailable returns immediately when StreamExecution is no longer active (in TERMINATED state).

processAllAvailable acquires a lock on the awaitProgressLock and turns the noNewData internal flag off (false).

processAllAvailable keeps polling with 10-second pauses (locked on awaitProgressLockCondition) until noNewData flag is turned on (true) or StreamExecution is no longer active (in TERMINATED state).

The 10-second pause is hardcoded and cannot be changed.

In the end, processAllAvailable releases awaitProgressLock lock.

processAllAvailable throws an IllegalStateException when executed on the stream execution thread:

Cannot wait for a query state from the same thread that is running the query

Stream Execution Thread — queryExecutionThread Property

queryExecutionThread: QueryExecutionThread

queryExecutionThread is a Java thread of execution (java.util.Thread) that runs a streaming query.

queryExecutionThread is started (as a daemon thread) when StreamExecution is requested to start. At that time, start prints out the following INFO message to the logs (with the prettyIdString and the resolvedCheckpointRoot):

Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.

When started, queryExecutionThread sets the call site and runs the streaming query.

queryExecutionThread uses the name stream execution thread for [id] (that uses prettyIdString for the id, i.e. queryName [id = [id], runId = [runId]]).

queryExecutionThread is a QueryExecutionThread that is a custom UninterruptibleThread from Apache Spark with runUninterruptibly method for running a block of code without being interrupted by Thread.interrupt().


Use Java’s jconsole or jstack to monitor stream execution threads.

$ jstack <driver-pid> | grep -e "stream execution thread"
"stream execution thread for kafka-topic1 [id =...

Internal String Representation — toDebugString Internal Method

toDebugString(includeLogicalPlan: Boolean): String


toDebugString is used exclusively when StreamExecution is requested to run stream processing (and an exception is caught).

Current Batch Metadata (Event-Time Watermark and Timestamp) — offsetSeqMetadata Internal Property

offsetSeqMetadata: OffsetSeqMetadata

offsetSeqMetadata is a OffsetSeqMetadata.

offsetSeqMetadata is part of the ProgressReporter Contract to hold the current event-time watermark and timestamp.

offsetSeqMetadata is used to create an IncrementalExecution in the queryPlanning phase of the MicroBatchExecution and ContinuousExecution execution engines.

offsetSeqMetadata is initialized (with 0 for batchWatermarkMs and batchTimestampMs) when StreamExecution is requested to run stream processing.

offsetSeqMetadata is then updated (with the current event-time watermark and timestamp) when MicroBatchExecution is requested to construct the next streaming micro-batch.

MicroBatchExecution uses the WatermarkTracker for the current event-time watermark and the trigger clock for the current batch timestamp.

offsetSeqMetadata is stored (checkpointed) in walCommit phase of MicroBatchExecution (and printed out as INFO message to the logs).

FIXME INFO message

offsetSeqMetadata is restored (re-created) from a checkpointed state when MicroBatchExecution is requested to populate start offsets.

isActive Method

isActive: Boolean
isActive is part of the StreamingQuery Contract to indicate whether a streaming query is active (true) or not (false).

isActive is enabled (true) as long as the State is not TERMINATED.

exception Method

exception: Option[StreamingQueryException]
exception is part of the StreamingQuery Contract to indicate whether a streaming query…​FIXME


Human-Readable HTML Description of Spark Jobs (for web UI) — getBatchDescriptionString Method

getBatchDescriptionString: String

getBatchDescriptionString is a human-readable description (in HTML format) that uses the optional name if defined, the id, the runId and batchDescription that can be init (for the current batch ID negative) or the current batch ID itself.

getBatchDescriptionString is of the following format:

[name]<br/>id = [id]<br/>runId = [runId]<br/>batch = [batchDescription]
StreamExecution getBatchDescriptionString webUI.png
Figure 7. Monitoring Streaming Query using web UI (Spark Jobs)

getBatchDescriptionString is used when:

  • MicroBatchExecution stream execution engine is requested to run an activated streaming query (as the job description of any Spark jobs triggerred as part of query execution)

  • StreamExecution is requested to run stream processing (as the job group description of any Spark jobs triggerred as part of query execution)

No New Data Available — noNewData Internal Flag

noNewData: Boolean

noNewData is a flag that indicates that a batch has completed with no new data left and processAllAvailable could stop waiting till all streaming data is processed.

Default: false

Turned on (true) when:

Turned off (false) when:

Internal Properties

Name Description


Java’s fair reentrant mutual exclusion java.util.concurrent.locks.ReentrantLock (that favors granting access to the longest-waiting thread under contention)





Current batch ID



newData: Map[BaseStreamingSource, LogicalPlan]

Registry of the streaming sources (in the logical query plan) that have new data available in the current batch. The new data is a streaming DataFrame.

newData is part of the ProgressReporter Contract.

Set exclusively when StreamExecution is requested to requests unprocessed data from streaming sources (while running a single streaming batch).


Time delay before polling new data again when no data was available

Set to spark.sql.streaming.pollingDelay Spark property.

Used when StreamExecution has started running streaming batches (and no data was available to process in a trigger).


Pretty-identified string for identification in logs (with name if defined).

queryName [id = xyz, runId = abc]

[id = xyz, runId = abc]


Java’s java.util.concurrent.CountDownLatch with count 1.

Used when StreamExecution is requested to start to pause the main thread until StreamExecution was requested to run the streaming query.




MetricsReporter with spark.streaming.[name or id] source name

Uses name if defined (can be null) or falls back to id


Unique streaming sources (after being collected as StreamingExecutionRelation from the logical query plan).

StreamingExecutionRelation is a leaf logical operator (i.e. LogicalPlan) that represents a streaming data source (and corresponds to a single StreamingRelation in analyzed logical query plan of a streaming Dataset).

Used when StreamExecution:

results matching ""

    No results matching ""