MicroBatchExecution — Stream Execution Engine of Micro-Batch Stream Processing

MicroBatchExecution is created when StreamingQueryManager is requested to create a streaming query (when DataStreamWriter is requested to start an execution of the streaming query) with a sink other than a StreamWriteSupport or a trigger other than a ContinuousTrigger, as in the following example:

import org.apache.spark.sql.streaming.Trigger
val query = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")          // <-- not a StreamWriteSupport sink
  .option("truncate", false)
  .trigger(Trigger.Once)      // <-- Gives MicroBatchExecution
  .queryName("rate2console")
  .start

// The following gives access to the internals
// And to MicroBatchExecution
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val engine = query.asInstanceOf[StreamingQueryWrapper].streamingQuery
import org.apache.spark.sql.execution.streaming.StreamExecution
assert(engine.isInstanceOf[StreamExecution])

import org.apache.spark.sql.execution.streaming.MicroBatchExecution
val microBatchEngine = engine.asInstanceOf[MicroBatchExecution]
assert(microBatchEngine.trigger == Trigger.Once)

Once created, MicroBatchExecution (as a stream execution engine) is requested to run an activated streaming query.

Tip

Enable ALL logging level for org.apache.spark.sql.execution.streaming.MicroBatchExecution to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.MicroBatchExecution=ALL

Refer to Logging.

Creating MicroBatchExecution Instance

MicroBatchExecution takes the following to be created:

  • SparkSession

  • Name of the streaming query

  • Path of the checkpoint directory

  • Analyzed logical query plan of the streaming query (LogicalPlan)

  • Streaming sink

  • Trigger

  • Trigger clock (Clock)

  • Output mode

  • Extra options (Map[String, String])

  • deleteCheckpointOnStop flag to control whether to delete the checkpoint directory on stop

MicroBatchExecution initializes the internal properties.
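For reference, the constructor roughly corresponds to the following sketch (parameter names are assumptions based on the list above and may differ across Spark versions):

class MicroBatchExecution(
    sparkSession: SparkSession,
    name: String,
    checkpointRoot: String,
    analyzedPlan: LogicalPlan,
    sink: BaseStreamingSink,
    trigger: Trigger,
    triggerClock: Clock,
    outputMode: OutputMode,
    extraOptions: Map[String, String],
    deleteCheckpointOnStop: Boolean)
  extends StreamExecution(
    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    trigger, triggerClock, outputMode, deleteCheckpointOnStop)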

MicroBatchExecution and TriggerExecutor — triggerExecutor Property

triggerExecutor: TriggerExecutor

triggerExecutor is the TriggerExecutor of the streaming query that controls how (and how often) micro-batches are executed, e.g. at regular intervals or exactly once.

triggerExecutor is initialized based on the given Trigger (that was used to create the MicroBatchExecution): a ProcessingTime trigger gives a ProcessingTimeExecutor, while a Once trigger gives a OneTimeExecutor.

triggerExecutor throws an IllegalStateException when the Trigger is not one of the built-in implementations.

Unknown type of trigger: [trigger]
Note
triggerExecutor is used exclusively when StreamExecution is requested to run an activated streaming query (at regular intervals).
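The initialization can be sketched as follows (a simplification assuming Spark 2.4 internals; class names such as ProcessingTimeExecutor and OneTimeExecutor may change across versions):

private val triggerExecutor = trigger match {
  case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
  case OneTimeTrigger    => OneTimeExecutor()
  case _                 => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}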

Running Activated Streaming Query — runActivatedStream Method

runActivatedStream(
  sparkSessionForStream: SparkSession): Unit
Note
runActivatedStream is part of StreamExecution Contract to run the activated streaming query.

runActivatedStream simply requests the TriggerExecutor to execute micro-batches using the batch runner (until MicroBatchExecution is terminated due to a query stop or a failure).

TriggerExecutor’s Batch Runner

The batch runner (of the TriggerExecutor) is executed as long as the MicroBatchExecution is active.

Note
trigger and batch are considered equivalent and used interchangeably.

The batch runner initializes query progress for the new trigger (aka startTrigger).

The batch runner starts the triggerExecution execution phase, which is made up of the following steps:

At the start or restart (resume) of a streaming query (when the current batch ID is uninitialized and -1), the batch runner populates start offsets from checkpoint and then prints out the following INFO message to the logs (using the committedOffsets internal registry):

Stream started from [committedOffsets]

The batch runner sets the human-readable description for any Spark job submitted (that streaming sources may submit to get new data) as the batch description.

The batch runner constructs the next streaming micro-batch (when the isCurrentBatchConstructed internal flag is off).

The batch runner records trigger offsets (with the committed and available offsets).

The batch runner updates the isDataAvailable property of the current StreamingQueryStatus to the value of isNewDataAvailable.

With the isCurrentBatchConstructed flag enabled (true), the batch runner updates the status message to one of the following (per isNewDataAvailable) and runs the streaming micro-batch.

Processing new data
No new data but cleaning up state

With the isCurrentBatchConstructed flag disabled (false), the batch runner simply updates the status message to the following:

Waiting for data to arrive

The batch runner finalizes query progress for the trigger (with a flag that indicates whether the current batch had new data).

With the isCurrentBatchConstructed flag enabled (true), the batch runner increments the currentBatchId and turns the isCurrentBatchConstructed flag off (false).

With the isCurrentBatchConstructed flag disabled (false), the batch runner simply sleeps (as long as configured using the spark.sql.streaming.pollingDelay configuration property).

In the end, the batch runner updates the status message to the following status and returns whether the MicroBatchExecution is active or not.

Waiting for next trigger
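Putting the steps together, the batch runner roughly corresponds to the following sketch (a simplification, not verbatim Spark code; the helper methods come from StreamExecution and ProgressReporter as of Spark 2.4):

triggerExecutor.execute(() => {
  var currentBatchHasNewData = false
  startTrigger()
  reportTimeTaken("triggerExecution") {
    // Populate start offsets at the start or restart of the streaming query
    if (currentBatchId < 0) {
      populateStartOffsets(sparkSessionForStream)
      logInfo(s"Stream started from $committedOffsets")
    }
    // Make sure Spark jobs submitted by the sources get the batch description
    sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    if (!isCurrentBatchConstructed) {
      isCurrentBatchConstructed = constructNextBatch(noDataBatchesEnabled)
    }
    recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
    currentBatchHasNewData = isNewDataAvailable
    currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
    if (isCurrentBatchConstructed) {
      if (currentBatchHasNewData) updateStatusMessage("Processing new data")
      else updateStatusMessage("No new data but cleaning up state")
      runBatch(sparkSessionForStream)
    } else {
      updateStatusMessage("Waiting for data to arrive")
    }
  }
  finishTrigger(currentBatchHasNewData)
  if (isCurrentBatchConstructed) {
    currentBatchId += 1
    isCurrentBatchConstructed = false
  } else {
    Thread.sleep(pollingDelayMs) // spark.sql.streaming.pollingDelay
  }
  updateStatusMessage("Waiting for next trigger")
  isActive
})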

Populating Start Offsets From Checkpoint (Resuming from Checkpoint) — populateStartOffsets Internal Method

populateStartOffsets(
  sparkSessionToRunBatches: SparkSession): Unit

populateStartOffsets requests the Offset Write-Ahead Log for the latest committed batch id with metadata (i.e. OffsetSeq).

Note
The batch ID may not be available in the write-ahead log when a streaming query is started with a new log or no batch has been persisted (added) to the log yet.

populateStartOffsets branches off based on whether the latest committed batch was available or not.

Note
populateStartOffsets is used exclusively when MicroBatchExecution is requested to run an activated streaming query (before the first "zero" micro-batch).

Latest Committed Batch Available

When the latest committed batch id with the metadata was available in the Offset Write-Ahead Log, populateStartOffsets (re)initializes the internal state as follows:

When the latest batch ID found is greater than 0, populateStartOffsets requests the Offset Write-Ahead Log for the second latest batch ID with metadata or throws an IllegalStateException if not found.

batch [latestBatchId - 1] doesn't exist

populateStartOffsets sets the committed offsets to the second latest committed offsets.

populateStartOffsets updates the offset metadata.

Caution
FIXME Describe me

populateStartOffsets requests the Offset Commit Log for the latest committed batch id with metadata (i.e. CommitMetadata).

Caution
FIXME Describe me

When the latest committed batch id with metadata was found which is exactly the latest batch ID (found in the Offset Commit Log), populateStartOffsets…​FIXME

When the latest committed batch id with metadata was found, but it is not exactly the second latest batch ID (found in the Offset Commit Log), populateStartOffsets prints out the following WARN message to the logs:

Batch completion log latest batch id is [latestCommittedBatchId], which is not trailing batchid [latestBatchId] by one

When there is no commit metadata in the Offset Commit Log, populateStartOffsets prints out the following INFO message to the logs:

no commit log present

In the end, populateStartOffsets prints out the following DEBUG message to the logs:

Resuming at batch [currentBatchId] with committed offsets [committedOffsets] and available offsets [availableOffsets]

No Latest Committed Batch

When the latest committed batch id with the metadata could not be found in the Offset Write-Ahead Log, it is assumed that the streaming query is started for the very first time (or the checkpoint location has changed).

populateStartOffsets prints out the following INFO message to the logs:

Starting new streaming query.

populateStartOffsets sets the current batch ID to 0 and creates a new WatermarkTracker.
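A minimal sketch of this branch (an assumption based on the description above):

// offsetLog.getLatest() gave no latest committed batch: a brand new streaming query
case None =>
  logInfo("Starting new streaming query.")
  currentBatchId = 0
  watermarkTracker = WatermarkTracker(sparkSessionToRunBatches.conf)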

Constructing Or Skipping Next Streaming Micro-Batch — constructNextBatch Internal Method

constructNextBatch(
  noDataBatchesEnabled: Boolean): Boolean
Note
constructNextBatch is only executed when the isCurrentBatchConstructed internal flag is disabled (false).

constructNextBatch performs the steps described in the sections below.

Note
constructNextBatch is used exclusively when MicroBatchExecution is requested to run the activated streaming query.

Requesting Latest Offsets from Streaming Sources (getOffset, setOffsetRange and getEndOffset Phases)

constructNextBatch first requests every streaming source for the latest offsets.

Note
constructNextBatch checks out the latest offset in every streaming data source sequentially, i.e. one data source at a time.
Figure 1. MicroBatchExecution’s Getting Offsets From Streaming Sources

For every streaming source (Data Source API V1), constructNextBatch updates the status message to the following:

Getting offsets from [source]

In getOffset time-tracking section, constructNextBatch requests the Source for the latest offset.

For every MicroBatchReader (Data Source API V2), constructNextBatch updates the status message to the following:

Getting offsets from [source]

In setOffsetRange time-tracking section, constructNextBatch finds the available offsets of the source (in the availableOffsets internal registry) and, if found, requests the MicroBatchReader to deserialize the offset (from JSON format). constructNextBatch then requests the MicroBatchReader to set the desired offset range.

In getEndOffset time-tracking section, constructNextBatch requests the MicroBatchReader for the end offset.
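The two paths can be sketched as follows (a simplification; toJava, converting a Scala Option to a Java Optional, is an assumed helper):

uniqueSources.map {
  case s: Source =>                  // Data Source API V1
    updateStatusMessage(s"Getting offsets from $s")
    val latest = reportTimeTaken("getOffset") { s.getOffset }
    (s, latest)
  case s: MicroBatchReader =>        // Data Source API V2
    updateStatusMessage(s"Getting offsets from $s")
    reportTimeTaken("setOffsetRange") {
      val current = availableOffsets.get(s).map(off => s.deserializeOffset(off.json))
      s.setOffsetRange(toJava(current), Optional.empty())
    }
    val end = reportTimeTaken("getEndOffset") { s.getEndOffset }
    (s, Option(end))
}.toMap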

Updating availableOffsets StreamProgress with Latest Available Offsets

constructNextBatch updates the availableOffsets StreamProgress with the latest reported offsets.

Updating Batch Metadata with Current Event-Time Watermark and Batch Timestamp

constructNextBatch updates the batch metadata with the current event-time watermark (from the WatermarkTracker) and the batch timestamp.
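A sketch of the update (an assumption; offsetSeqMetadata is the batch metadata of the StreamExecution):

offsetSeqMetadata = offsetSeqMetadata.copy(
  batchWatermarkMs = watermarkTracker.currentWatermark,
  batchTimestampMs = triggerClock.getTimeMillis())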

Checking Whether to Construct Next Micro-Batch or Not (Skip It)

constructNextBatch then checks whether or not the next streaming micro-batch should be constructed (the shouldConstructNextBatch local flag).

constructNextBatch requests the last IncrementalExecution whether the last execution requires another (no-data) micro-batch (based on the batch metadata), but only when the given noDataBatchesEnabled flag is enabled (true). That gives the lastExecutionRequiresAnotherBatch local flag.

Note
shouldConstructNextBatch local flag is enabled (true) when there is new data available (based on offsets) or the last execution requires another micro-batch (and the given noDataBatchesEnabled flag is enabled).

constructNextBatch prints out the following TRACE message to the logs:

noDataBatchesEnabled = [noDataBatchesEnabled], lastExecutionRequiresAnotherBatch = [lastExecutionRequiresAnotherBatch], isNewDataAvailable = [isNewDataAvailable], shouldConstructNextBatch = [shouldConstructNextBatch]
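The decision can be sketched as follows (consistent with the TRACE message above; an assumption, not verbatim Spark code):

val lastExecutionRequiresAnotherBatch = noDataBatchesEnabled &&
  Option(lastExecution).exists(_.shouldRunAnotherBatch(offsetSeqMetadata))
val shouldConstructNextBatch = isNewDataAvailable || lastExecutionRequiresAnotherBatch
logTrace(s"noDataBatchesEnabled = $noDataBatchesEnabled, " +
  s"lastExecutionRequiresAnotherBatch = $lastExecutionRequiresAnotherBatch, " +
  s"isNewDataAvailable = $isNewDataAvailable, " +
  s"shouldConstructNextBatch = $shouldConstructNextBatch")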

constructNextBatch branches off per whether to construct or skip the next batch (per the shouldConstructNextBatch flag in the above TRACE message).

Constructing Next Micro-Batch — shouldConstructNextBatch Flag Enabled

With the shouldConstructNextBatch flag enabled (true), constructNextBatch updates the status message to the following:

Writing offsets to log

constructNextBatch adds the available offsets to the Offset Write-Ahead Log (for the current batch ID) and prints out the following INFO message to the logs:

Committed offsets for batch [currentBatchId]. Metadata [offsetSeqMetadata]
Note
FIXME (if (currentBatchId != 0) …​)
Note
FIXME (if (minLogEntriesToMaintain < currentBatchId) …​)

constructNextBatch turns the noNewData internal flag off (false).

In case of a failure while adding the available offsets to the write-ahead log, constructNextBatch throws an AssertionError:

Concurrent update to the log. Multiple streaming jobs detected for [currentBatchId]
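This branch can be sketched as follows (a simplification; the FIXME parts above, e.g. purging old log entries and committing source offsets, are omitted):

updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
  assert(
    offsetLog.add(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
    s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
  logInfo(s"Committed offsets for batch $currentBatchId. Metadata ${offsetSeqMetadata.toString}")
  // ... see the FIXMEs above ...
}
noNewData = false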

Skipping Next Micro-Batch — shouldConstructNextBatch Flag Disabled

With the shouldConstructNextBatch flag disabled (false), constructNextBatch turns the noNewData flag on (true) and wakes up (notifies) all threads waiting for the awaitProgressLockCondition lock.

Running Single Streaming Micro-Batch — runBatch Internal Method

runBatch(
  sparkSessionToRunBatch: SparkSession): Unit

runBatch prints out the following DEBUG message to the logs (with the current batch ID):

Running batch [currentBatchId]

runBatch then performs the steps (aka phases) described in the sections below.

In the end, runBatch prints out the following DEBUG message to the logs (with the current batch ID):

Completed batch [currentBatchId]
Note
runBatch is used exclusively when MicroBatchExecution is requested to run an activated streaming query (and there is new data to process).

getBatch Phase — Creating Logical Query Plans For Unprocessed Data From Sources and MicroBatchReaders

In getBatch time-tracking section, runBatch goes over the available offsets and processes every Source and MicroBatchReader (associated with the available offsets) to create logical query plans (newData) for data processing (per offset ranges).

Note
runBatch requests sources and readers for data per offset range sequentially, one by one.
Figure 2. StreamExecution’s Running Single Streaming Batch (getBatch Phase)

getBatch Phase and Sources

For a Source (with the available offsets different from the committedOffsets registry), runBatch requests the Source for a batch of data (a streaming DataFrame) for the offset range and does the following:

runBatch prints out the following DEBUG message to the logs.

Retrieving data from [source]: [current] -> [available]

In the end, runBatch returns the Source and the logical plan of the streaming dataset (for the offset range).

In case the Source returns a dataframe that is not streaming, runBatch throws an AssertionError:

DataFrame returned by getBatch from [source] did not have isStreaming=true\n[logicalQueryPlan]
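The Data Source API V1 path can be sketched as follows (an assumption based on the description above):

case (source: Source, available: Offset)
    if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
  val current = committedOffsets.get(source)
  val batch = source.getBatch(current, available)
  assert(batch.isStreaming,
    s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" +
      s"${batch.queryExecution.logical}")
  logDebug(s"Retrieving data from $source: $current -> $available")
  Some(source -> batch.logicalPlan)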

getBatch Phase and MicroBatchReaders

For a MicroBatchReader (with the available offsets different from the committedOffsets registry), runBatch does the following:

runBatch prints out the following DEBUG message to the logs.

Retrieving data from [reader]: [current] -> [availableV2]

runBatch looks up the DataSourceV2 and the options for the MicroBatchReader (in the readerToDataSourceMap internal registry).

In the end, runBatch requests the MicroBatchReader for the read schema and creates a StreamingDataSourceV2Relation logical operator (with the read schema, the DataSourceV2, options, and the MicroBatchReader).

Transforming Logical Plan to Include Sources and MicroBatchReaders with New Data

Figure 3. StreamExecution’s Running Single Streaming Batch (and Transforming Logical Plan for New Data)

runBatch transforms the analyzed logical plan to include Sources and MicroBatchReaders with new data (newBatchesPlan with logical plans to process data that has arrived since the last batch).

For every StreamingExecutionRelation (with a Source or MicroBatchReader), runBatch tries to find the corresponding logical plan for processing new data.

Note
StreamingExecutionRelation logical operator is used to represent a streaming source or reader in the logical query plan (of a streaming query).

If the logical plan is found, runBatch makes the plan a child operator of Project (with Aliases) logical operator and replaces the StreamingExecutionRelation.

Otherwise, if not found, runBatch simply creates an empty streaming LocalRelation (for scanning data from an empty local collection).

In case the number of output columns of a new-data logical plan does not match the corresponding StreamingExecutionRelation's, runBatch throws an AssertionError:

Invalid batch: [output] != [dataPlan.output]
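The transformation can be sketched as follows (a simplification of the Spark 2.4 internals):

val newBatchesPlan = logicalPlan transform {
  case StreamingExecutionRelation(source, output) =>
    newData.get(source).map { dataPlan =>
      assert(output.size == dataPlan.output.size,
        s"Invalid batch: ${output.mkString(",")} != ${dataPlan.output.mkString(",")}")
      val aliases = output.zip(dataPlan.output).map { case (to, from) =>
        Alias(from, to.name)(exprId = to.exprId, explicitMetadata = Some(from.metadata))
      }
      Project(aliases, dataPlan)
    }.getOrElse {
      LocalRelation(output, isStreaming = true)
    }
}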

Transforming CurrentTimestamp and CurrentDate Expressions (Per Batch Metadata)

runBatch replaces all CurrentTimestamp and CurrentDate expressions in the transformed logical plan (with new data) with the current batch timestamp (based on the batch metadata).

Note

CurrentTimestamp and CurrentDate expressions correspond to the current_timestamp and current_date standard functions, respectively.

Read up The Internals of Spark SQL to learn more about the standard functions.
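The replacement can be sketched as follows (an assumption; CurrentBatchTimestamp is the batch-local replacement expression):

val newAttributePlan = newBatchesPlan transformAllExpressions {
  case ct: CurrentTimestamp =>
    CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, ct.dataType)
  case cd: CurrentDate =>
    CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, cd.dataType, cd.timeZoneId)
}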

Adapting Transformed Logical Plan to Sink with StreamWriteSupport

For a StreamWriteSupport (Data Source API V2), runBatch requests the StreamWriteSupport for a StreamWriter (for the runId, the output schema, the OutputMode, and the extra options). runBatch then creates a WriteToDataSourceV2 logical operator with a new MicroBatchWriter as a child operator (for the current batch ID and the StreamWriter).

For a Sink (Data Source API V1), runBatch changes nothing.

For any other BaseStreamingSink type, runBatch simply throws an IllegalArgumentException:

unknown sink type for [sink]

Setting Local Properties

runBatch sets the local properties.

Table 1. runBatch’s Local Properties
Local Property Value

streaming.sql.batchId

The current batch ID

__is_continuous_processing

false
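A sketch of setting the local properties (assuming the constants MicroBatchExecution.BATCH_ID_KEY and StreamExecution.IS_CONTINUOUS_PROCESSING hold the property names above):

sparkSessionToRunBatch.sparkContext.setLocalProperty(
  MicroBatchExecution.BATCH_ID_KEY, currentBatchId.toString)
sparkSessionToRunBatch.sparkContext.setLocalProperty(
  StreamExecution.IS_CONTINUOUS_PROCESSING, false.toString)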

queryPlanning Phase — Creating and Preparing IncrementalExecution for Execution

Figure 4. StreamExecution’s Query Planning (queryPlanning Phase)

In queryPlanning time-tracking section, runBatch creates a new IncrementalExecution with the transformed logical plan, the output mode, the state checkpoint location, the run ID, the current batch ID, and the batch metadata (OffsetSeqMetadata).

In the end (of the queryPlanning phase), runBatch requests the IncrementalExecution to prepare the transformed logical plan for execution (i.e. execute the executedPlan query execution phase).

Tip
Read up on the executedPlan query execution phase in The Internals of Spark SQL.
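The phase can be sketched as follows (a simplification; the IncrementalExecution constructor arguments follow the description above and may differ across Spark versions):

reportTimeTaken("queryPlanning") {
  lastExecution = new IncrementalExecution(
    sparkSessionToRunBatch,
    triggerLogicalPlan,
    outputMode,
    checkpointFile("state"),   // state checkpoint location
    runId,
    currentBatchId,
    offsetSeqMetadata)
  lastExecution.executedPlan   // forces query planning and preparation for execution
}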

nextBatch Phase — Creating DataFrame (with IncrementalExecution for New Data)

Figure 5. StreamExecution Creates DataFrame with New Data

runBatch creates a new DataFrame with the new IncrementalExecution.

The DataFrame represents the result of executing the current micro-batch of the streaming query.
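A sketch of the phase (an assumption based on the Spark 2.4 internals):

val nextBatch = new Dataset(
  sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))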

addBatch Phase — Adding DataFrame With New Data to Sink

Figure 6. StreamExecution Adds DataFrame With New Data to Sink

In addBatch time-tracking section, runBatch adds the DataFrame with new data to the BaseStreamingSink.

For a Sink (Data Source API V1), runBatch simply requests the Sink to add the DataFrame (with the batch ID).

For a StreamWriteSupport (Data Source API V2), runBatch requests the DataFrame with new data to collect (which forces execution of the MicroBatchWriter).
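The phase can be sketched as follows (a simplification; nextBatch is the DataFrame created in the nextBatch phase):

reportTimeTaken("addBatch") {
  SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
    sink match {
      case s: Sink               => s.addBatch(currentBatchId, nextBatch)
      case _: StreamWriteSupport => nextBatch.collect() // forces the MicroBatchWriter
    }
  }
}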

Note
runBatch uses SQLExecution.withNewExecutionId to execute and track all the Spark jobs under one execution id (so it is reported as one single multi-job execution, e.g. in web UI).
Note
SQLExecution.withNewExecutionId posts a SparkListenerSQLExecutionStart event before execution and a SparkListenerSQLExecutionEnd event right afterwards.
Tip

Register SparkListener to get notified about the SQL execution events (SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd).

Read up on SparkListener in The Internals of Apache Spark.

Updating Watermark and Committing Offsets to Offset Commit Log

runBatch requests the WatermarkTracker to update event-time watermark (with the executedPlan of the IncrementalExecution).

In the end, runBatch adds the available offsets to the committed offsets (and updates the offsets of every BaseStreamingSource with new data in the current micro-batch).
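A sketch of this final step (an assumption; commitLog is the Offset Commit Log of the StreamExecution):

withProgressLocked {
  watermarkTracker.updateWatermark(lastExecution.executedPlan)
  commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
  committedOffsets ++= availableOffsets
}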

Stopping Stream Processing (Execution of Streaming Query) — stop Method

stop(): Unit
Note
stop is part of the StreamingQuery Contract to stop a streaming query.

stop sets the state to TERMINATED.

When the stream execution thread is alive, stop requests the current SparkContext to cancelJobGroup identified by the runId and waits for this thread to die. Just to make sure that there are no more streaming jobs, stop requests the current SparkContext to cancelJobGroup identified by the runId again.

In the end, stop prints out the following INFO message to the logs:

Query [prettyIdString] was stopped

Checking Whether New Data Is Available (Based on Available and Committed Offsets) — isNewDataAvailable Internal Method

isNewDataAvailable: Boolean

isNewDataAvailable checks whether there is a streaming source (in the available offsets) for which committed offsets are different from the available offsets or not available (committed) at all.

isNewDataAvailable is positive (true) when there is at least one such streaming source.
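A minimal sketch (an assumption, consistent with the description above):

private def isNewDataAvailable: Boolean = {
  availableOffsets.exists { case (source, available) =>
    committedOffsets
      .get(source)
      .map(committed => committed != available)
      .getOrElse(true)
  }
}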

Note
isNewDataAvailable is used when MicroBatchExecution is requested to run an activated streaming query and construct the next streaming micro-batch.

Analyzed Logical Plan With Unique StreamingExecutionRelation Operators — logicalPlan Lazy Property

logicalPlan: LogicalPlan
Note
logicalPlan is part of StreamExecution Contract to be the analyzed logical plan of the streaming query.

logicalPlan resolves (replaces) StreamingRelation and StreamingRelationV2 logical operators to StreamingExecutionRelation logical operators. logicalPlan uses the transformed logical plan to set the uniqueSources and sources internal registries to the BaseStreamingSources of all the StreamingExecutionRelations (unique sources and all sources, respectively).

Note
logicalPlan is a Scala lazy value and so the initialization is guaranteed to happen only once at the first access (and is cached for later use afterwards).

Internally, logicalPlan transforms the analyzed logical plan.

For every StreamingRelation logical operator, logicalPlan tries to replace it with the StreamingExecutionRelation that was used earlier for the same StreamingRelation (if used multiple times in the plan) or creates a new one. While creating a new StreamingExecutionRelation, logicalPlan requests the DataSource to create a streaming Source with the metadata path as sources/uniqueID directory in the checkpoint root directory. logicalPlan prints out the following INFO message to the logs:

Using Source [source] from DataSourceV1 named '[sourceName]' [dataSourceV1]

For every StreamingRelationV2 logical operator with a MicroBatchReadSupport data source (which is not on the list of spark.sql.streaming.disabledV2MicroBatchReaders), logicalPlan tries to replace it with the StreamingExecutionRelation that was used earlier for the same StreamingRelationV2 (if used multiple times in the plan) or creates a new one. While creating a new StreamingExecutionRelation, logicalPlan requests the MicroBatchReadSupport to create a MicroBatchReader with the metadata path as sources/uniqueID directory in the checkpoint root directory. logicalPlan prints out the following INFO message to the logs:

Using MicroBatchReader [reader] from DataSourceV2 named '[sourceName]' [dataSourceV2]

For every other StreamingRelationV2 logical operator, logicalPlan tries to replace it with the StreamingExecutionRelation that was used earlier for the same StreamingRelationV2 (if used multiple times in the plan) or creates a new one. While creating a new StreamingExecutionRelation, logicalPlan requests the StreamingRelation for the underlying DataSource that is in turn requested to create a streaming Source with the metadata path as sources/uniqueID directory in the checkpoint root directory. logicalPlan prints out the following INFO message to the logs:

Using Source [source] from DataSourceV2 named '[sourceName]' [dataSourceV2]

logicalPlan requests the transformed analyzed logical plan for all StreamingExecutionRelations that are then requested for BaseStreamingSources, and saves them as the sources internal registry.

In the end, logicalPlan sets the uniqueSources internal registry to be the unique BaseStreamingSources above.

logicalPlan throws an AssertionError when not executed on the stream execution thread.

logicalPlan must be initialized in QueryExecutionThread but the current thread was [currentThread]

streaming.sql.batchId Local Property

MicroBatchExecution defines streaming.sql.batchId as the name of the local property that holds the current batch (or epoch) ID that Spark tasks can use.

streaming.sql.batchId is used when:

  • MicroBatchExecution is requested to run a single streaming micro-batch (and sets the property to be the current batch ID)

  • DataWritingSparkTask is requested to run (and needs an epoch ID)
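For illustration, a Spark task could read the property as follows (a hypothetical snippet):

import org.apache.spark.TaskContext

val batchId = TaskContext.get().getLocalProperty("streaming.sql.batchId")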

Internal Properties

Name Description

isCurrentBatchConstructed

Flag to control whether to run a streaming micro-batch (true) or not (false)

Default: false

readerToDataSourceMap

MicroBatchReaders with their DataSourceV2 data sources and options (Map[MicroBatchReader, (DataSourceV2, Map[String, String])])

sources

Streaming sources (BaseStreamingSources) of the streaming query

Default: (empty)

Note
sources is part of the ProgressReporter Contract for the streaming sources of the streaming query.

watermarkTracker

WatermarkTracker that is created when MicroBatchExecution is requested to populate start offsets (when requested to run an activated streaming query)
