StreamExecution — Base of Streaming Query Execution Engines

StreamExecution is the base of streaming query execution engines that can run a structured query at regular intervals on a stream execution thread.

Note
Continuous query, streaming query, continuous Dataset and streaming Dataset can all be considered synonyms. Internally, StreamExecution refers to the query by its analyzed logical plan.
Table 1. StreamExecution Contract
Property Description

logicalPlan

logicalPlan: LogicalPlan

Analyzed logical plan of the streaming query to execute

Used when StreamExecution is requested to run stream processing

Note
logicalPlan is part of ProgressReporter Contract and the only purpose of the logicalPlan property is to change the access level from protected to public.

runActivatedStream

runActivatedStream(sparkSessionForStream: SparkSession): Unit

Executes the activated streaming query

Used exclusively when StreamExecution is requested to run the streaming query (when transitioning from INITIALIZING to ACTIVE state)

Streaming Query and Stream Execution Engine
scala> :type query
org.apache.spark.sql.streaming.StreamingQuery

import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val se = query.asInstanceOf[StreamingQueryWrapper].streamingQuery

scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution

StreamExecution uses the spark.sql.streaming.minBatchesToRetain configuration property to allow the StreamExecutions to discard old log entries (from the offset and commit logs).
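
As a rough illustration of the retention rule, the following sketch computes the batch id below which log entries would be discarded. It is a sketch under assumptions: purgeThreshold is a hypothetical helper (not a Spark API), and it assumes that purging starts only once the current batch id exceeds the configured value and that entries older than currentBatchId - minBatchesToRetain are the ones dropped.

```scala
// Hypothetical helper, not part of Spark: returns the batch id below which
// log entries would be discarded, or None when nothing is old enough yet.
def purgeThreshold(currentBatchId: Long, minBatchesToRetain: Int): Option[Long] =
  if (minBatchesToRetain < currentBatchId) Some(currentBatchId - minBatchesToRetain)
  else None

val retained = 100 // documented default of spark.sql.streaming.minBatchesToRetain
println(purgeThreshold(currentBatchId = 42, minBatchesToRetain = retained))
println(purgeThreshold(currentBatchId = 250, minBatchesToRetain = retained))
```

With the default of 100, nothing is discarded at batch 42, while at batch 250 the entries of batches before 150 become candidates for removal.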

Table 2. StreamExecutions
StreamExecution Description

ContinuousExecution

Used in Continuous Stream Processing

MicroBatchExecution

Used in Micro-Batch Stream Processing

Note
StreamExecution does not support adaptive query execution and cost-based optimizer (and turns them off when requested to run stream processing).

StreamExecution is the execution environment of a single streaming query (aka streaming Dataset) that is executed every trigger and in the end adds the results to a sink.

Note
StreamExecution corresponds to a single streaming query with one or more streaming sources and exactly one streaming sink.
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val q = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.minutes)).
  start
scala> :type q
org.apache.spark.sql.streaming.StreamingQuery

// Pull out StreamExecution off StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
val se = q.asInstanceOf[StreamingQueryWrapper].streamingQuery
scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution
Figure 1. Creating Instance of StreamExecution
Note
DataStreamWriter describes how the results of executing batches of a streaming query are written to a streaming sink.

When started, StreamExecution starts a stream execution thread that simply runs stream processing (and hence the streaming query).

Figure 2. StreamExecution’s Starting Streaming Query (on Execution Thread)

StreamExecution can be in one of the following states (the state internal property additionally lists RECONFIGURING):

  • INITIALIZING when the instance was created.

  • ACTIVE when batches are pulled from the sources.

  • TERMINATED when executing streaming batches has ended, because of an error, because all batches were successfully processed, or because StreamExecution has been stopped.
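
The state transitions can be sketched with a plain AtomicReference. This is a simplified stand-in rather than the Spark code: the State trait and its case objects below are hypothetical and only reuse the state names from this document (RECONFIGURING is left out for brevity).

```scala
import java.util.concurrent.atomic.AtomicReference

// Simplified stand-in for the internal state of a streaming query execution.
sealed trait State
case object INITIALIZING extends State
case object ACTIVE extends State
case object TERMINATED extends State

val state = new AtomicReference[State](INITIALIZING)

// Only the first caller moves the query from INITIALIZING to ACTIVE.
val activated = state.compareAndSet(INITIALIZING, ACTIVE)
// A second attempt fails because the state is no longer INITIALIZING.
val activatedAgain = state.compareAndSet(INITIALIZING, ACTIVE)

// Stopping (or failing) ends in TERMINATED regardless of the previous state.
state.set(TERMINATED)
println(s"activated=$activated, activatedAgain=$activatedAgain, state=${state.get}")
```

The compare-and-set step is what makes "only when in INITIALIZING state" safe when several threads could race to activate or stop the query.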

StreamExecution is a ProgressReporter and reports status of the streaming query (i.e. when it starts, progresses and terminates) by posting StreamingQueryListener events.
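
To observe these events from user code, a StreamingQueryListener can be registered with the StreamingQueryManager. The following is a minimal sketch that assumes a spark-shell session (where spark is the active SparkSession); the printed messages are made up for illustration.

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Assumes a spark-shell session where `spark` is the active SparkSession.
val listener = new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Started: id=${event.id}, runId=${event.runId}, name=${event.name}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Progress: batchId=${event.progress.batchId}, numInputRows=${event.progress.numInputRows}")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Terminated: id=${event.id}, exception=${event.exception}")
}

// Register before starting the query so that the start event is not missed.
spark.streams.addListener(listener)
```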

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
  .readStream
  .text("server-logs")
  .writeStream
  .format("console")
  .queryName("debug")
  .trigger(Trigger.ProcessingTime(20.seconds))
  .start

// Enable the log level to see the INFO and DEBUG messages
// log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG

17/06/18 21:21:07 INFO StreamExecution: Starting new streaming query.
17/06/18 21:21:07 DEBUG StreamExecution: getOffset took 5 ms
17/06/18 21:21:07 DEBUG StreamExecution: Stream running from {} to {}
17/06/18 21:21:07 DEBUG StreamExecution: triggerExecution took 9 ms
17/06/18 21:21:07 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
17/06/18 21:21:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "8b57b0bd-fc4a-42eb-81a3-777d7ba5e370",
  "runId" : "920b227e-6d02-4a03-a271-c62120258cea",
  "name" : "debug",
  "timestamp" : "2017-06-18T19:21:07.693Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 5,
    "triggerExecution" : 9
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/Users/jacek/dev/oss/spark/server-logs]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@2460208a"
  }
}
17/06/18 21:21:10 DEBUG StreamExecution: Starting Trigger Calculation
17/06/18 21:21:10 DEBUG StreamExecution: getOffset took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: triggerExecution took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())

StreamExecution tracks streaming data sources in uniqueSources internal registry.

Figure 3. StreamExecution’s uniqueSources Registry of Streaming Data Sources

StreamExecution collects durationMs for the execution units of streaming batches.

Figure 4. StreamExecution’s durationMs
scala> :type q
org.apache.spark.sql.streaming.StreamingQuery

scala> println(q.lastProgress)
{
  "id" : "03fc78fc-fe19-408c-a1ae-812d0e28fcee",
  "runId" : "8c247071-afba-40e5-aad2-0e6f45f22488",
  "name" : null,
  "timestamp" : "2017-08-14T20:30:00.004Z",
  "batchId" : 1,
  "numInputRows" : 432,
  "inputRowsPerSecond" : 0.9993568953312452,
  "processedRowsPerSecond" : 1380.1916932907347,
  "durationMs" : {
    "addBatch" : 237,
    "getBatch" : 26,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 313,
    "walCommit" : 45
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
    "startOffset" : 0,
    "endOffset" : 432,
    "numInputRows" : 432,
    "inputRowsPerSecond" : 0.9993568953312452,
    "processedRowsPerSecond" : 1380.1916932907347
  } ],
  "sink" : {
    "description" : "ConsoleSink[numRows=20, truncate=true]"
  }
}

StreamExecution uses two metadata logs: OffsetSeqLog as a write-ahead log (to record the offsets to be processed) and BatchCommitLog (to record the batches that have already been processed and committed to a streaming sink).

Tip
Monitor offsets and commits metadata logs to know the progress of a streaming query.
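
One way to follow the tip is to compare the latest batch id in the two log directories. The sketch below assumes the layout described in this document (offsets and commits directories with one file per batch, named after the batch id) and simulates a checkpoint directory in a temporary location; latestBatchId is a hypothetical helper, not a Spark API.

```scala
import java.nio.file.{Files, Path}
import scala.util.Try

// Returns the highest batch id found in a metadata log directory, assuming
// one file per batch whose name is the batch id (other files are ignored).
def latestBatchId(logDir: Path): Option[Long] = {
  val names = Option(logDir.toFile.list()).getOrElse(Array.empty[String]).toList
  val ids = names.flatMap(name => Try(name.toLong).toOption)
  if (ids.isEmpty) None else Some(ids.max)
}

// Simulated checkpoint directory (a real one is created by the streaming query).
val checkpoint = Files.createTempDirectory("checkpoint")
val offsets = Files.createDirectory(checkpoint.resolve("offsets"))
val commits = Files.createDirectory(checkpoint.resolve("commits"))
Seq("0", "1", "2").foreach(id => Files.createFile(offsets.resolve(id)))
Seq("0", "1").foreach(id => Files.createFile(commits.resolve(id)))

// A batch that is in offsets but not yet in commits is still in flight.
println(s"last planned batch:   ${latestBatchId(offsets)}")
println(s"last committed batch: ${latestBatchId(commits)}")
```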

StreamExecution delays polling for new data for 10 milliseconds (when no data was available to process in a batch). Use spark.sql.streaming.pollingDelay Spark property to control the delay.

Every StreamExecution is uniquely identified by an ID of the streaming query (which is the id of the StreamMetadata).

Note
Since the StreamMetadata is persisted (to the metadata file in the checkpoint directory), the streaming query ID "survives" query restarts as long as the checkpoint directory is preserved.

StreamExecution is also uniquely identified by a run ID of the streaming query. A run ID is a randomly-generated 128-bit universally unique identifier (UUID) that is assigned at the time StreamExecution is created.

Note
runId does not "survive" query restarts and will always be different yet unique (across all active queries).
Note

The name, id and runId are all unique across all active queries (in a StreamingQueryManager). The difference is that:

  • name is optional and user-defined

  • id is a UUID that is auto-generated at the time StreamExecution is created and persisted to metadata checkpoint file

  • runId is a UUID that is auto-generated every time StreamExecution is created

StreamExecution uses a StreamMetadata that is persisted in the metadata file in the checkpoint directory. If the metadata file is available it is read and is the way to recover the ID of a streaming query when resumed (i.e. restarted after a failure or a planned stop).
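
The difference between id and runId across restarts can be illustrated with a small simulation. It is not the Spark implementation: QueryRun and startRun are hypothetical names, and the metadata file here holds just the bare UUID, which is a simplification of whatever format StreamMetadata actually persists.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}
import java.util.UUID

// Hypothetical stand-in for one run of a streaming query (not a Spark class).
final case class QueryRun(id: UUID, runId: UUID)

// Reads the id from the metadata file if present, otherwise generates and
// persists a new one. The runId is generated afresh for every run.
def startRun(checkpointDir: Path): QueryRun = {
  val metadataFile = checkpointDir.resolve("metadata")
  val id =
    if (Files.exists(metadataFile)) {
      UUID.fromString(new String(Files.readAllBytes(metadataFile), UTF_8).trim)
    } else {
      val fresh = UUID.randomUUID()
      Files.write(metadataFile, fresh.toString.getBytes(UTF_8))
      fresh
    }
  QueryRun(id, UUID.randomUUID())
}

val checkpointDir = Files.createTempDirectory("checkpoint")
val firstRun = startRun(checkpointDir)
val secondRun = startRun(checkpointDir) // simulates a restart
println(s"same id: ${firstRun.id == secondRun.id}, same runId: ${firstRun.runId == secondRun.runId}")
```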

StreamExecution uses the __is_continuous_processing local property (default: false) to differentiate between ContinuousExecution (true) and MicroBatchExecution (false). The property is used when StateStoreRDD is requested to compute a partition (and finds a StateStore for a given version).

Tip

Enable ALL logging level for org.apache.spark.sql.execution.streaming.StreamExecution to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=ALL

Refer to Logging.

Fully-Qualified (Resolved) Path to Checkpoint Root Directory — resolvedCheckpointRoot Property

resolvedCheckpointRoot: String

resolvedCheckpointRoot is a fully-qualified path of the given checkpoint root directory.

The given checkpoint root directory is defined using checkpointLocation option or the spark.sql.streaming.checkpointLocation configuration property with queryName option.

checkpointLocation and queryName options are defined when StreamingQueryManager is requested to create a streaming query.

resolvedCheckpointRoot is used when creating the path to the checkpoint directory and when StreamExecution finishes running streaming batches.

resolvedCheckpointRoot is used for the logicalPlan: while transforming analyzedPlan, StreamingRelation logical operators are replaced with the corresponding StreamingExecutionRelation logical operators, and the streaming data sources are created with the path to the sources directory (where they store checkpointing metadata).

Tip

You can see resolvedCheckpointRoot in the INFO message when StreamExecution is started.

Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.

Internally, resolvedCheckpointRoot creates a Hadoop org.apache.hadoop.fs.Path for checkpointRoot and makes it qualified.

Note
resolvedCheckpointRoot uses SparkSession to access SessionState for a Hadoop configuration.
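
Qualifying a path with the Hadoop API could look roughly as follows. This is a sketch, not the exact Spark code: qualify is a hypothetical helper, the checkpoint path is made up, and a fresh Hadoop Configuration is used in place of the one that Spark obtains from the session.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Qualifies a checkpoint root against the file system that owns it,
// i.e. adds the scheme (and authority) and resolves relative paths.
def qualify(checkpointRoot: String, hadoopConf: Configuration): String = {
  val path = new Path(checkpointRoot)
  val fs = path.getFileSystem(hadoopConf)
  path.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
}

// The path below is only an example; Hadoop must be on the classpath
// (as it is in spark-shell).
println(qualify("/tmp/checkpoint-demo", new Configuration()))
```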

Offset Commit Log — commits Metadata Checkpoint Directory

StreamExecution uses offset commit log (CommitLog with commits metadata checkpoint directory) for completed streaming batches (with a single file per batch with a file name being the batch id) or committed epochs.

Note
Metadata log or metadata checkpoint are synonyms and are often used interchangeably.

commitLog is used by the extensions for the following:

stopSources Method

stopSources(): Unit

stopSources…​FIXME

Note

stopSources is used when:

Running Stream Processing — runStream Internal Method

runStream(): Unit

runStream simply prepares the environment to execute the activated streaming query.

Note
runStream is used exclusively when the stream execution thread is requested to start (when DataStreamWriter is requested to start an execution of the streaming query).

Internally, runStream sets the job group (to all the Spark jobs started by this thread) as follows:

Note

runStream uses the SparkSession to access SparkContext and assign the job group id.

runStream sets sql.streaming.queryId local property to id.

runStream requests the MetricsSystem to register the MetricsReporter when spark.sql.streaming.metricsEnabled configuration property is on (default: off / false).
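
A minimal way to turn the property on, assuming a spark-shell session where spark is the active SparkSession:

```scala
// Assumes a spark-shell session where `spark` is the active SparkSession.
// Set the property before starting the query, since the text above describes
// it as being checked while the query starts running.
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
```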

runStream notifies StreamingQueryListeners that the streaming query has been started (by posting a new QueryStartedEvent event with id, runId, and name).

Figure 5. StreamingQueryListener Notified about Query’s Start (onQueryStarted)

runStream unblocks the main starting thread (by decrementing the count of the startLatch that when 0 lets the starting thread continue).

Caution
FIXME A picture with two parallel lanes for the starting thread and daemon one for the query.

runStream updates the status message to be Initializing sources.

runStream initializes the analyzed logical plan.

Note
The analyzed logical plan is a lazy value in Scala and is initialized when requested the very first time.

runStream disables adaptive query execution and cost-based join optimization (by turning spark.sql.adaptive.enabled and spark.sql.cbo.enabled configuration properties off, respectively).

runStream creates a new "zero" OffsetSeqMetadata.

(Only when in INITIALIZING state) runStream enters ACTIVE state:

Note
runBatches does the main work only when first started (i.e. when state is INITIALIZING).

runStream…​FIXME (describe the failed and stop states)

Once TriggerExecutor has finished executing batches, runBatches updates the status message to Stopped.

Note
TriggerExecutor keeps executing batches as long as the batch runner reports that the streaming query is still active (i.e. the internal state is not TERMINATED) and finishes otherwise.
Caution
FIXME Describe catch block for exception handling
Caution
FIXME Describe finally block for query termination

TriggerExecutor’s Batch Runner

Batch Runner (aka batchRunner) is an executable block executed by TriggerExecutor in runBatches.

As long as the query is not stopped (i.e. state is not TERMINATED), batchRunner executes the streaming batch for the trigger.

In triggerExecution time-tracking section, runBatches branches off per currentBatchId.

Table 3. Current Batch Execution per currentBatchId
currentBatchId Steps

currentBatchId < 0

  1. populateStartOffsets

  2. Setting Job Description as getBatchDescriptionString

DEBUG Stream running from [committedOffsets] to [availableOffsets]

currentBatchId >= 0

  1. Constructing the next streaming micro-batch

If there is data available in the sources, batchRunner marks currentStatus with isDataAvailable enabled.

Note

You can check out the status of a streaming query using status method.

scala> spark.streams.active(0).status
res1: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Waiting for next trigger",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}

batchRunner then updates the status message to Processing new data and runs the current streaming batch.

Figure 6. StreamExecution’s Running Batches (on Execution Thread)

After triggerExecution section has finished, batchRunner finishes the streaming batch for the trigger (and collects query execution statistics).

When there was data available in the sources, batchRunner updates committed offsets (by adding the current batch id to BatchCommitLog and adding availableOffsets to committedOffsets).

You should see the following DEBUG message in the logs:

DEBUG batch $currentBatchId committed

batchRunner increments the current batch id and sets the job description for all the following Spark jobs to include the new batch id.

When no data was available in the sources to process, batchRunner does the following:

  1. Marks currentStatus with isDataAvailable disabled

  2. Updates the status message to Waiting for data to arrive

  3. Sleeps the current thread for pollingDelayMs milliseconds.

batchRunner updates the status message to Waiting for next trigger and returns whether the query is currently active or not (so TriggerExecutor can decide whether to finish executing the batches or not).
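
The control flow described above can be condensed into a small single-threaded simulation. None of it is Spark code: BatchRunnerSketch is a hypothetical class, offsets are plain numbers, an exhausted input stands in for the query being stopped, and "committing" a batch is reduced to remembering the offset.

```scala
// Simplified, single-threaded simulation of the batch runner; the names
// mirror the text, but none of this is Spark code.
final class BatchRunnerSketch(pollingDelayMs: Long, incoming: Iterator[Option[Long]]) {
  var currentBatchId = 0L
  var committedOffset = Option.empty[Long]
  var message = "Initializing sources"
  var isDataAvailable = false
  var active = true

  // Runs one trigger and reports whether the query is still active.
  def runOnce(): Boolean = {
    val availableOffset =
      if (incoming.hasNext) incoming.next() else { active = false; None }
    availableOffset match {
      case Some(offset) =>
        isDataAvailable = true
        message = "Processing new data"
        committedOffset = Some(offset) // the batch is considered committed
        currentBatchId += 1
      case None =>
        isDataAvailable = false
        message = "Waiting for data to arrive"
        Thread.sleep(pollingDelayMs)
    }
    message = "Waiting for next trigger"
    active
  }
}

// Two triggers with data, one without, then the input is exhausted.
val runner = new BatchRunnerSketch(10L, Iterator(Some(5L), Some(9L), None))
while (runner.runOnce()) {}
println(s"batches=${runner.currentBatchId}, committed=${runner.committedOffset}")
```

The Boolean returned by runOnce plays the role of the value that lets TriggerExecutor decide whether to continue.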

getBatchDescriptionString Method

getBatchDescriptionString: String

getBatchDescriptionString…​FIXME

Note

getBatchDescriptionString is used when:

  • MicroBatchExecution is requested to runActivatedStream (and sets the job description)

  • StreamExecution is requested to runStream (and sets job group)

Starting Streaming Query (on Stream Execution Thread) — start Method

start(): Unit

When called, start prints out the following INFO message to the logs:

Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.

start then starts the stream execution thread (as a daemon thread).

Note
start uses Java’s java.lang.Thread.start to run the streaming query on a separate execution thread.
Note
When started, a streaming query runs in its own execution thread on JVM.

In the end, start pauses the main thread (using the startLatch until StreamExecution is requested to run the streaming query that in turn sends a QueryStartedEvent to all streaming listeners followed by decrementing the count of the startLatch).
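
The handshake between the starting thread and the execution thread can be sketched with a plain CountDownLatch. The thread body is a stand-in (an AtomicBoolean replaces posting QueryStartedEvent) and the thread name is made up.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

val startLatch = new CountDownLatch(1)
val startedEventPosted = new AtomicBoolean(false)

// Stand-in for the stream execution thread.
val executionThread = new Thread("stream execution thread (sketch)") {
  override def run(): Unit = {
    startedEventPosted.set(true) // stands in for posting QueryStartedEvent
    startLatch.countDown()       // unblocks the starting thread
    // ... the streaming query would keep running here ...
  }
}
executionThread.setDaemon(true)
executionThread.start()

// The starting thread waits here until the execution thread has signalled.
startLatch.await()
println(s"started event posted before start returned: ${startedEventPosted.get}")
```

Because the latch is counted down only after the event is posted, the caller of start never observes a query whose start event has not been sent yet.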

Note
start is used exclusively when StreamingQueryManager is requested to start a streaming query (when DataStreamWriter is requested to start an execution of the streaming query).

Creating StreamExecution Instance

StreamExecution takes the following when created:

  • SparkSession

  • Name of the streaming query (can also be null)

  • Path of the checkpoint directory (aka metadata directory)

  • Analyzed logical query plan (i.e. LogicalPlan)

  • Streaming sink

  • Trigger

  • Clock

  • Output mode (that is only used when creating IncrementalExecution for a streaming batch in query planning)

  • deleteCheckpointOnStop flag to control whether to delete the checkpoint directory on stop

StreamExecution initializes the internal properties.

Note
StreamExecution is a Scala abstract class and cannot be created directly. It is created indirectly when the concrete StreamExecutions are.

Path to Checkpoint Directory — checkpointFile Internal Method

checkpointFile(name: String): String

checkpointFile gives the path of a directory with name in checkpoint directory.

Note
checkpointFile uses Hadoop’s org.apache.hadoop.fs.Path.
Note
checkpointFile is used for streamMetadata, OffsetSeqLog, BatchCommitLog, and lastExecution (for runBatch).

Posting StreamingQueryListener Event — postEvent Method

postEvent(event: StreamingQueryListener.Event): Unit
Note
postEvent is a part of ProgressReporter Contract.

postEvent simply requests the StreamingQueryManager to post the input event (to the StreamingQueryListenerBus in the current SparkSession).

Note
postEvent uses SparkSession to access the current StreamingQueryManager.
Note

postEvent is used when:

Waiting Until No Data Available in Sources or Query Has Been Terminated — processAllAvailable Method

processAllAvailable(): Unit
Note
processAllAvailable is a part of StreamingQuery Contract.

processAllAvailable reports streamDeathCause exception if defined (and returns).

Note
streamDeathCause is defined exclusively when StreamExecution runs streaming batches (and terminated with an exception).

processAllAvailable returns when isActive flag is turned off (which is when StreamExecution is in TERMINATED state).

processAllAvailable acquires a lock on awaitProgressLock and turns noNewData flag off.

processAllAvailable keeps waiting (up to 10 seconds at a time) on awaitProgressLockCondition until noNewData flag is turned on or StreamExecution is no longer active.

Note
noNewData flag is turned on exclusively when StreamExecution constructs the next streaming micro-batch (and finds that no data is available).

In the end, processAllAvailable releases awaitProgressLock lock.
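
The waiting protocol can be sketched with a fair ReentrantLock and a Condition. This is an approximation under assumptions: processAllAvailableSketch and signalNoNewData are hypothetical names, and a background thread that periodically signals "no new data" stands in for the stream execution thread.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

val awaitProgressLock = new ReentrantLock(true) // fair lock
val awaitProgressLockCondition = awaitProgressLock.newCondition()
val active = new AtomicBoolean(true)
var noNewData = false // updated only while holding awaitProgressLock

// Stand-in for the execution thread finding that no data is available.
def signalNoNewData(): Unit = {
  awaitProgressLock.lock()
  try {
    noNewData = true
    awaitProgressLockCondition.signalAll()
  } finally awaitProgressLock.unlock()
}

// Blocks until noNewData is turned on or the query is no longer active.
def processAllAvailableSketch(): Unit = {
  awaitProgressLock.lock()
  try {
    noNewData = false
    while (active.get && !noNewData) {
      awaitProgressLockCondition.await(10, TimeUnit.SECONDS)
    }
  } finally awaitProgressLock.unlock()
}

// Signals repeatedly, the way an idle engine would on every empty trigger.
val executionThread = new Thread(() => {
  while (active.get) {
    Thread.sleep(50)
    signalNoNewData()
  }
})
executionThread.setDaemon(true)
executionThread.start()

processAllAvailableSketch()
active.set(false)
println(s"returned with noNewData=$noNewData")
```

Resetting noNewData before waiting is the important step: the caller only returns after a fresh "no data" observation made after the call began.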

Stream Execution Thread — queryExecutionThread Property

queryExecutionThread: QueryExecutionThread

queryExecutionThread is a Java thread of execution (java.lang.Thread) that runs the structured query when started.

queryExecutionThread uses the name stream execution thread for [id] (that uses prettyIdString for the id, i.e. queryName [id = [id], runId = [runId]]).
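
The naming scheme can be reproduced with a small helper. The function below only mirrors the two formats given in this document (with and without a query name) and is not the Spark source; the query name debug is just an example.

```scala
import java.util.UUID

// Mirrors the two formats described in this document; `name` may be null.
def prettyIdString(name: String, id: UUID, runId: UUID): String =
  Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"

val id = UUID.randomUUID()
val runId = UUID.randomUUID()
val named = prettyIdString("debug", id, runId)
val unnamed = prettyIdString(null, id, runId)
println(s"stream execution thread for $named")
println(s"stream execution thread for $unnamed")
```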

queryExecutionThread is a QueryExecutionThread, which is really a custom UninterruptibleThread from Apache Spark with a runUninterruptibly method for running a block of code without being interrupted by Thread.interrupt().

queryExecutionThread is started (as a daemon thread) when StreamExecution is requested to start.

When started, queryExecutionThread sets the thread-local properties as the call site and runs the streaming query.

Tip

Use Java’s jconsole or jstack to monitor the streaming threads.

$ jstack <driver-pid> | grep -e "stream execution thread"
"stream execution thread for kafka-topic1 [id =...

toDebugString Internal Method

toDebugString(includeLogicalPlan: Boolean): String

toDebugString…​FIXME

Note
toDebugString is used exclusively when StreamExecution is requested to run stream processing (and an exception is caught).

Current Batch Metadata (Event-Time Watermark and Timestamp) — offsetSeqMetadata Internal Property

offsetSeqMetadata: OffsetSeqMetadata

offsetSeqMetadata is an OffsetSeqMetadata.

Note
offsetSeqMetadata is part of the ProgressReporter Contract to hold the current event-time watermark and timestamp.

offsetSeqMetadata is used to create an IncrementalExecution in the queryPlanning phase of the MicroBatchExecution and ContinuousExecution execution engines.

offsetSeqMetadata is initialized (with 0 for batchWatermarkMs and batchTimestampMs) when StreamExecution is requested to run stream processing.

offsetSeqMetadata is then updated (with the current event-time watermark and timestamp) when MicroBatchExecution is requested to construct the next streaming micro-batch.

Note
MicroBatchExecution uses the WatermarkTracker for the current event-time watermark and the trigger clock for the current batch timestamp.

offsetSeqMetadata is stored (checkpointed) in the walCommit phase of MicroBatchExecution (and printed out as INFO message to the logs).

FIXME INFO message

offsetSeqMetadata is restored (re-created) from a checkpointed state when MicroBatchExecution is requested to populate start offsets.
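
The lifecycle above (initialized with zeros, then updated per micro-batch) can be pictured with a tiny immutable value. BatchMetadataSketch is a hypothetical mirror of the two fields named in this section, not the Spark class, and the numbers are made up.

```scala
// Hypothetical mirror of the two fields named in this section (not the Spark class).
final case class BatchMetadataSketch(batchWatermarkMs: Long = 0L, batchTimestampMs: Long = 0L)

// "Zero" metadata, as when stream processing starts.
val zero = BatchMetadataSketch()

// Updated while constructing the next micro-batch (values are made up).
val next = zero.copy(batchWatermarkMs = 1000L, batchTimestampMs = 5000L)
println(zero)
println(next)
```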

Internal Properties

Name Description

availableOffsets

StreamProgress that tracks the offsets that are available to be processed, but have not yet been committed to the sink.

Note
availableOffsets is part of the ProgressReporter Contract.
Note
StreamProgress is an enhanced immutable.Map from Scala with streaming sources as keys and their Offsets as values.

Set when (in order):

  1. StreamExecution resumes and populates the start offsets with the latest offsets from the offset log that may have already been processed (and committed to the batch commit log so they are used as the current committed offsets)

  2. StreamExecution constructs the next streaming micro-batch (and gets offsets from the sources)

Note

You can see availableOffsets in DEBUG messages in the logs when StreamExecution resumes and populates the start offsets.

Resuming at batch [currentBatchId] with committed offsets [committedOffsets] and available offsets [availableOffsets]

Used when:

Note
availableOffsets works in tandem with committedOffsets registry.

awaitProgressLock

Java’s fair reentrant mutual exclusion java.util.concurrent.locks.ReentrantLock (that favors granting access to the longest-waiting thread under contention)

awaitProgressLockCondition

callSite

committedOffsets

Committed offsets (StreamProgress of pairs of the streaming sources and the offsets they already processed)

committedOffsets is a part of the ProgressReporter Contract

currentBatchId

Current batch number

initializationLatch

lastExecution

Last IncrementalExecution

newData

newData: Map[BaseStreamingSource, LogicalPlan]

Registry of the streaming sources (in the logical query plan) that have new data available in the current batch. The new data is a streaming DataFrame.

Note
newData is part of the ProgressReporter Contract.

Set exclusively when StreamExecution is requested to request unprocessed data from streaming sources (while running a single streaming batch).

noNewData

Flag that is on when no new offsets (data) are available for processing

Turned on (i.e. enabled) when constructing the next streaming micro-batch when no new offsets are available.

offsetLog

Offset write-ahead log (OffsetSeqLog with offsets metadata checkpoint directory) to record offsets in when ready for processing.

Note
Metadata log or metadata checkpoint are synonyms and are often used interchangeably.

Used when StreamExecution populates the start offsets and constructs the next streaming micro-batch (first to store the current batch’s offsets in a write-ahead log and retrieve the previous batch’s offsets right afterwards).

Note
StreamExecution discards offsets from the offset metadata log when the current batch id is above spark.sql.streaming.minBatchesToRetain Spark property (which defaults to 100).

pollingDelayMs

Time delay before polling new data again when no data was available

Set to spark.sql.streaming.pollingDelay Spark property.

Used when StreamExecution has started running streaming batches (and no data was available to process in a trigger).

prettyIdString

Pretty-identified string for identification in logs (with name if defined).

queryName [id = xyz, runId = abc]

[id = xyz, runId = abc]

sources

All streaming Sources in logical query plan (that are the sources from StreamingExecutionRelation).

startLatch

Java’s java.util.concurrent.CountDownLatch with count 1.

Used when StreamExecution is requested to start to pause the main thread until StreamExecution was requested to run the streaming query.

state

Java’s java.util.concurrent.atomic.AtomicReference for the state of a streaming query execution:

  • INITIALIZING (default)

  • ACTIVE (after the first execution of runBatches)

  • TERMINATED

  • RECONFIGURING

streamDeathCause

StreamingQueryException

streamMetrics

MetricsReporter with spark.streaming.[name or id] source name

Uses name if defined (can be null) or falls back to id

uniqueSources

Unique streaming data sources in a streaming Dataset (after being collected as StreamingExecutionRelation from the corresponding logical query plan).

Note
StreamingExecutionRelation is a leaf logical operator (i.e. LogicalPlan) that represents a streaming data source (and corresponds to a single StreamingRelation in analyzed logical query plan of a streaming Dataset).

Used when StreamExecution:
