MicroBatchExecution — StreamExecution in Micro-Batch Stream Processing

MicroBatchExecution is a StreamExecution in Micro-Batch Stream Processing.

MicroBatchExecution is created when StreamingQueryManager is requested to create a streaming query with any type of sink but StreamWriteSupport and any type of trigger but ContinuousTrigger (when DataStreamWriter is requested to start an execution of the streaming query).

import org.apache.spark.sql.streaming.Trigger
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.Once) // <-- Gives MicroBatchExecution
  .queryName("rate2console")
  .start

import org.apache.spark.sql.streaming.StreamingQuery
assert(sq.isInstanceOf[StreamingQuery])

// The following gives access to the internals
// And to MicroBatchExecution
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val engine = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
import org.apache.spark.sql.execution.streaming.StreamExecution
assert(engine.isInstanceOf[StreamExecution])

import org.apache.spark.sql.execution.streaming.MicroBatchExecution
val microBatchEngine = engine.asInstanceOf[MicroBatchExecution]
assert(microBatchEngine.trigger == Trigger.Once)

Table 1. MicroBatchExecution’s Internal Properties (e.g. Registries, Counters and Flags)

Name | Description

sources

Collection of BaseStreamingSource instances

Used when…​FIXME

triggerExecutor

  • ProcessingTimeExecutor for ProcessingTime

  • OneTimeExecutor for OneTimeTrigger (aka Once trigger)

Used when StreamExecution starts running streaming batches.

Note
StreamExecution throws an IllegalStateException when the Trigger is not one of the two supported implementations: OneTimeTrigger or ProcessingTime.
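
The trigger-to-executor selection described above can be sketched in plain Scala. This is a minimal model, not Spark's code: the types below are simplified stand-ins named after the Spark classes, and the exception message is illustrative only.

```scala
object TriggerExecutorSketch {
  // Simplified stand-ins for the Spark trigger types (assumptions for illustration)
  sealed trait Trigger
  case object OneTimeTrigger extends Trigger
  final case class ProcessingTime(intervalMs: Long) extends Trigger
  case object ContinuousTrigger extends Trigger // not handled in micro-batch mode

  // Simplified stand-ins for the trigger executors
  sealed trait TriggerExecutor
  case object OneTimeExecutor extends TriggerExecutor
  final case class ProcessingTimeExecutor(trigger: ProcessingTime) extends TriggerExecutor

  // Picks the executor for a trigger and fails for any other trigger type
  def triggerExecutorFor(trigger: Trigger): TriggerExecutor = trigger match {
    case t: ProcessingTime => ProcessingTimeExecutor(t)
    case OneTimeTrigger    => OneTimeExecutor
    case other             =>
      // Illustrative message, not necessarily the one Spark prints
      throw new IllegalStateException(s"Unsupported trigger: $other")
  }
}
```

For example, `TriggerExecutorSketch.triggerExecutorFor(TriggerExecutorSketch.OneTimeTrigger)` gives `OneTimeExecutor`, while passing `ContinuousTrigger` ends in an IllegalStateException.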

Populating Start Offsets — populateStartOffsets Internal Method

populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit
Note
The batch id may not be available in the metadata log if the streaming query started with a new metadata log or no batch has been committed yet.

With the latest committed batch id and its metadata (from OffsetSeqLog), populateStartOffsets sets the current batch id to the latest committed batch id, and availableOffsets to its offsets (considering them not processed yet).

Note
populateStartOffsets may re-execute the latest committed batch.

If the latest batch id is greater than 0, populateStartOffsets requests OffsetSeqLog for the second latest batch with its metadata (or reports an IllegalStateException if not found). populateStartOffsets sets committed offsets to the second latest committed offsets.

populateStartOffsets updates the offset metadata.

Caution
FIXME Why is the update needed?

If the latest batch in OffsetSeqLog is also the latest batch in BatchCommitLog, populateStartOffsets takes the latest processed batch id with the metadata (from BatchCommitLog) and sets the current batch id to the one right after it. populateStartOffsets then sets committed offsets to availableOffsets.

Caution
FIXME Describe what happens with availableOffsets.

populateStartOffsets constructs the next streaming batch.

Caution
FIXME Describe the WARN message when latestCommittedBatchId < latestBatchId - 1.

WARN Batch completion log latest batch id is [latestCommittedBatchId], which is not trailing batchid [latestBatchId] by one

You should see the following DEBUG message in the logs:

DEBUG Resuming at batch [currentBatchId] with committed offsets [committedOffsets] and available offsets [availableOffsets]
Caution
FIXME Include an example of Resuming at batch

When the latest committed batch id with the metadata could not be found in BatchCommitLog, populateStartOffsets prints out the following INFO message to the logs:

INFO no commit log present
Caution
FIXME Include an example of the case when no commit log present.

When the latest committed batch id with the metadata could not be found in OffsetSeqLog, it is assumed that the streaming query is started for the first time. You should see the following INFO message in the logs:

INFO StreamExecution: Starting new streaming query.

populateStartOffsets sets current batch id to 0 and constructs the next streaming batch.

Note
populateStartOffsets is used exclusively when MicroBatchExecution is requested to runActivatedStream.
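
The restart cases described in this section can be modelled with a small pure function. This is only a sketch under simplifying assumptions: offsets are plain Long values per source name, the offset log is a Map, the commit log is a Set of batch ids, and the names StartState and resolve are hypothetical. Logging and the construction of the next batch are left out.

```scala
object StartOffsetsSketch {
  type Offsets = Map[String, Long] // source name -> offset (simplification)

  final case class StartState(currentBatchId: Long, committed: Offsets, available: Offsets)

  // offsetLog: batch id -> offsets recorded for that batch
  // commitLog: ids of the batches that were fully processed
  def resolve(offsetLog: Map[Long, Offsets], commitLog: Set[Long]): StartState =
    if (offsetLog.isEmpty) {
      // Nothing in the offset log: the query is started for the first time
      StartState(0L, Map.empty, Map.empty)
    } else {
      val latestBatchId = offsetLog.keys.max
      val available = offsetLog(latestBatchId)
      // Committed offsets come from the second latest batch (if there is one)
      val committed: Offsets =
        if (latestBatchId > 0)
          offsetLog.getOrElse(
            latestBatchId - 1,
            // Illustrative message only
            throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist"))
        else Map.empty
      if (commitLog.nonEmpty && commitLog.max == latestBatchId)
        // Latest batch was fully processed: move on to the next batch
        StartState(latestBatchId + 1, committed ++ available, available)
      else
        // Latest batch may have to be re-executed
        StartState(latestBatchId, committed, available)
    }
}
```

With offsets logged for batches 0 and 1 but only batch 0 in the commit log, the model resumes at batch 1 (which is then re-executed); with both batches committed it resumes at batch 2.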

Running Activated Streaming Query — runActivatedStream Method

runActivatedStream(sparkSessionForStream: SparkSession): Unit
Note
runActivatedStream is part of StreamExecution Contract to run the activated streaming query.

runActivatedStream…​FIXME

Creating MicroBatchExecution Instance

MicroBatchExecution takes the following when created:

  • SparkSession

  • Query name

  • Path to the checkpoint directory (aka metadata directory)

  • Analyzed logical query plan (i.e. LogicalPlan)

  • Streaming sink

  • Trigger

  • Clock

  • Output mode (that is only used when creating IncrementalExecution for a streaming batch in query planning)

  • Map[String, String]

  • deleteCheckpointOnStop flag to control whether to delete the checkpoint directory on stop

MicroBatchExecution initializes the internal registries and counters.

Initializing Analyzed Logical Plan of Streaming Query — logicalPlan Property

logicalPlan: LogicalPlan
Note
logicalPlan is part of StreamExecution Contract to initialize the analyzed logical plan of the streaming query.

logicalPlan resolves (replaces) StreamingRelation and StreamingRelationV2 logical operators with StreamingExecutionRelation logical operators.

Note
logicalPlan is a Scala lazy value, so the resolution happens only once (at the first access) and the result is cached for later use.
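
The once-only behaviour of a Scala lazy value can be seen in a few lines. The sketch below is not Spark code: Query and the resolutions counter are hypothetical names, and a String stands in for the resolved LogicalPlan.

```scala
object LazyPlanSketch {
  var resolutions = 0 // counts how many times the initializer runs

  class Query {
    // The block runs at the first access only; later accesses reuse the result
    lazy val logicalPlan: String = {
      resolutions += 1
      "resolved plan" // stand-in for the resolved LogicalPlan
    }
  }
}
```

Creating a `Query` does not run the initializer; accessing `logicalPlan` twice runs it exactly once.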

Internally, logicalPlan…​FIXME

Constructing Next Streaming Batch — constructNextBatch Internal Method

constructNextBatch(): Unit

constructNextBatch is made up of the following three parts:

  1. Checking whether new data is available (by requesting new offsets from every streaming source)

  2. Constructing the next batch when there is some data to process

  3. Handling the case when no data is available

Note
constructNextBatch is used exclusively when MicroBatchExecution is requested to run the activated streaming query.

Checking Whether New Data Is Available (by Requesting New Offsets from Sources)

constructNextBatch starts by checking whether or not new data is available in any of the streaming sources (in the logical query plan).

Note
constructNextBatch checks out the latest offset in every streaming data source sequentially, i.e. one data source at a time.

Figure 1. StreamExecution’s Getting Offsets From Streaming Sources

Note
constructNextBatch uses the Source contract to get the latest offset (using Source.getOffset method).

constructNextBatch updates the status message to Getting offsets from [source] for every streaming data source.

In getOffset time-tracking section, constructNextBatch gets the offsets.

constructNextBatch prints out the following DEBUG message to the logs:

DEBUG StreamExecution: getOffset took [time] ms

constructNextBatch adds the streaming sources that have the available offsets to availableOffsets.

If there is no data available (i.e. no offsets unprocessed in any of the streaming data sources), constructNextBatch turns noNewData flag on.

In the end (of this checking-data block), constructNextBatch releases awaitProgressLock.
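
The offset-checking step can be sketched as follows. This is a simplified model rather than Spark's implementation: Source here is a hypothetical trait whose getOffset returns an optional Long, sources are identified by name, and locking, status updates and time tracking are omitted.

```scala
object LatestOffsetsSketch {
  // Hypothetical, simplified source: None means the source has no data yet
  trait Source {
    def name: String
    def getOffset: Option[Long]
  }
  final case class FixedSource(name: String, getOffset: Option[Long]) extends Source

  // Asks every source for its latest offset, one source at a time, and returns
  // the updated availableOffsets together with a "new data available" flag
  def collect(
      sources: Seq[Source],
      committed: Map[String, Long],
      available: Map[String, Long]): (Map[String, Long], Boolean) = {
    val latest = sources.flatMap(s => s.getOffset.map(offset => s.name -> offset))
    val updated = available ++ latest
    // New data exists when some available offset differs from the committed one
    val hasNewData = updated.exists { case (source, offset) =>
      !committed.get(source).contains(offset)
    }
    (updated, hasNewData)
  }
}
```

When `hasNewData` is false, the model corresponds to the case in which the noNewData flag is turned on.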

New Data Available

When new data is available, constructNextBatch updates the event time watermark (tracked using offsetSeqMetadata) if it finds one in the last IncrementalExecution.

If lastExecution is available (which may not be the case when constructNextBatch is executed the very first time), constructNextBatch takes the executed physical plan (i.e. SparkPlan) and collects all EventTimeWatermarkExec physical operators with the count of eventTimeStats greater than 0.

Note
The executed physical plan is available as executedPlan property of IncrementalExecution (which is a custom QueryExecution).

You should see the following DEBUG message in the logs:

DEBUG StreamExecution: Observed event time stats: [eventTimeStats]

constructNextBatch calculates the difference between the maximum value of eventTimeStats and delayMs for every EventTimeWatermarkExec physical operator.

Note
The maximum value of eventTimeStats is the most recent event time, i.e. the time closest to the current time.

constructNextBatch then takes the first difference (if available at all) and uses it as a possible new event time watermark.

If the event time watermark candidate is greater than the current watermark (i.e. later time-wise), constructNextBatch prints out the following INFO message to the logs:

INFO StreamExecution: Updating eventTime watermark to: [newWatermarkMs] ms

constructNextBatch creates a new OffsetSeqMetadata with the new event time watermark and the current time.

Otherwise, if the eventTime watermark candidate is not greater than the current watermark, constructNextBatch simply prints out the following DEBUG message to the logs:

DEBUG StreamExecution: Event time didn't move: [newWatermarkMs] <= [batchWatermarkMs]

constructNextBatch creates a new OffsetSeqMetadata with just the current time.
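
The watermark arithmetic above fits in one small function. The sketch assumes that the operators passed in are already limited to those with a non-zero eventTimeStats count; WatermarkOp and nextWatermark are hypothetical names and all times are in milliseconds.

```scala
object WatermarkSketch {
  // One entry per EventTimeWatermarkExec operator (simplified stand-in)
  final case class WatermarkOp(maxEventTimeMs: Long, delayMs: Long)

  // Returns the watermark to record for the next batch
  def nextWatermark(currentWatermarkMs: Long, ops: Seq[WatermarkOp]): Long = {
    // Only the first difference is used as the candidate, as described above
    val candidate = ops.map(op => op.maxEventTimeMs - op.delayMs).headOption
    // The watermark only moves forward; otherwise the current one is kept
    candidate.filter(_ > currentWatermarkMs).getOrElse(currentWatermarkMs)
  }
}
```

For a maximum event time of 5000 ms and a delay of 2000 ms the candidate is 3000 ms: it replaces a current watermark of 1000 ms but not one of 4000 ms.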

Note
Although constructNextBatch collects all the EventTimeWatermarkExec physical operators in the executed physical plan of lastExecution, only the first matters if available.
Note
A physical plan can have as many EventTimeWatermarkExec physical operators as the number of times the withWatermark operator was used to create a streaming query.
Note

Streaming watermark can be changed between a streaming query’s restarts (and be different between what is checkpointed and the current version of the query).

FIXME True? Example?

constructNextBatch then adds the offsets to metadata log.

constructNextBatch updates the status message to Writing offsets to log.

Note

While writing the offsets to the metadata log, constructNextBatch uses the following internal registries:

constructNextBatch reports an AssertionError when writing to the metadata log has failed.

Concurrent update to the log. Multiple streaming jobs detected for [currentBatchId]
Tip

Use StreamingQuery.lastProgress to access walCommit duration.

scala> :type sq
org.apache.spark.sql.streaming.StreamingQuery
sq.lastProgress.durationMs.get("walCommit")
Tip

Enable INFO logging level for org.apache.spark.sql.execution.streaming.StreamExecution logger to be notified about walCommit duration.

17/08/11 09:04:17 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:17.373Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 38,
    "getBatch" : 1,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 62,
    "walCommit" : 19          // <-- walCommit
  },

constructNextBatch commits the offsets for the batch only when the current batch id is not 0 (batch id 0 means that the query has just been started and constructNextBatch is called for the first time).

constructNextBatch takes the previously-committed batch (from OffsetSeqLog) and extracts the stored offsets per source.

Note
constructNextBatch uses OffsetSeq.toStreamProgress and sources registry to extract the offsets per source.

constructNextBatch requests every streaming source to commit the offsets.

Note
constructNextBatch uses the Source contract to commit the offsets (using Source.commit method).

constructNextBatch reports an IllegalStateException when the previously-committed batch cannot be found in OffsetSeqLog.

batch [currentBatchId] doesn't exist

In the end, constructNextBatch purges OffsetSeqLog and BatchCommitLog when the current batch id is above the value of spark.sql.streaming.minBatchesToRetain Spark property.
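
The purge condition can be illustrated with a Map standing in for either log. Two things in this sketch are assumptions rather than facts taken from the text above: that purging removes every entry below a threshold batch id, and that the threshold is the current batch id minus the number of batches to retain.

```scala
object PurgeSketch {
  // Assumed purge semantics: keep only entries with batch id >= threshold
  def purge[A](log: Map[Long, A], thresholdBatchId: Long): Map[Long, A] =
    log.filter { case (batchId, _) => batchId >= thresholdBatchId }

  // Purges only once the current batch id is above minBatchesToRetain
  def purgeIfNeeded[A](
      log: Map[Long, A],
      currentBatchId: Long,
      minBatchesToRetain: Int): Map[Long, A] =
    if (minBatchesToRetain < currentBatchId)
      purge(log, currentBatchId - minBatchesToRetain) // assumed threshold
    else log
}
```

With six logged batches (0 to 5), a current batch id of 5 and 2 batches to retain, the model keeps batches 3, 4 and 5.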

No New Data Available

If there is no new data available, constructNextBatch acquires a lock on awaitProgressLock, wakes up all waiting threads that are waiting for the stream to progress (using awaitProgressLockCondition), followed by releasing the lock on awaitProgressLock.
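
The lock, signal and unlock sequence can be sketched with the JDK concurrency classes. The field names follow the text, but the progressed flag and the awaitProgress helper are hypothetical additions that make the example observable; this is not Spark's code.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object AwaitProgressSketch {
  private val awaitProgressLock = new ReentrantLock(true)
  private val awaitProgressLockCondition = awaitProgressLock.newCondition()
  @volatile private var progressed = false // hypothetical flag for the demo

  // Batch-running thread: acquire the lock, wake up all waiters, release the lock
  def signalProgress(): Unit = {
    awaitProgressLock.lock()
    try {
      progressed = true
      awaitProgressLockCondition.signalAll()
    } finally {
      awaitProgressLock.unlock()
    }
  }

  // Waiting thread: blocks until signalled or until the timeout elapses
  def awaitProgress(timeoutMs: Long): Boolean = {
    awaitProgressLock.lock()
    try {
      val deadline = System.currentTimeMillis() + timeoutMs
      while (!progressed && System.currentTimeMillis() < deadline) {
        awaitProgressLockCondition.await(10, TimeUnit.MILLISECONDS)
      }
      progressed
    } finally {
      awaitProgressLock.unlock()
    }
  }
}
```

A waiter calls `awaitProgress` while another thread calls `signalProgress`. The loop around `await` guards against spurious wake-ups.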

Checking Whether Data Is Available in Streaming Sources — dataAvailable Internal Method

dataAvailable: Boolean

dataAvailable finds the streaming sources in availableOffsets for which the offsets committed (as recorded in committedOffsets) are different or do not exist at all.

If there are any differences in the number of sources or their committed offsets, dataAvailable is enabled (i.e. true).
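
The check can be written as a single expression over two maps. In this sketch sources are identified by name and offsets are Long values, which is a simplification of the actual source and offset types.

```scala
object DataAvailableSketch {
  // True when some source has an available offset that is either different
  // from its committed offset or has no committed offset at all
  def dataAvailable(
      availableOffsets: Map[String, Long],
      committedOffsets: Map[String, Long]): Boolean =
    availableOffsets.exists { case (source, available) =>
      committedOffsets.get(source).forall(_ != available)
    }
}
```

A source that appears in `availableOffsets` only counts as having data, and so does a source whose committed offset lags behind; identical maps give `false`.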

Note
dataAvailable is used when StreamExecution runs streaming batches and constructs the next streaming batch.

Running Single Streaming Batch — runBatch Internal Method

runBatch(sparkSessionToRunBatch: SparkSession): Unit

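runBatch performs the following steps (aka phases):

  1. getBatch Phase

  2. withNewSources Phase

  3. triggerLogicalPlan Phase

  4. queryPlanning Phase

  5. nextBatch Phase

  6. addBatch Phase

  7. awaitBatchLock Phase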

Note
runBatch is used exclusively when StreamExecution runs streaming batches.

getBatch Phase — Requesting New (and Hence Unprocessed) Data From Streaming Sources

Internally, runBatch first requests the streaming sources for unprocessed data (and stores them as DataFrames in newData internal registry).

In getBatch time-tracking section, runBatch goes over the available offsets per source and processes the offsets that have not been committed yet.

runBatch then requests every source for the data (as DataFrame with the new records).

Note
runBatch requests the streaming sources for new DataFrames sequentially, source by source.

Figure 2. StreamExecution’s Running Single Streaming Batch (getBatch Phase)

You should see the following DEBUG message in the logs:

DEBUG StreamExecution: Retrieving data from [source]: [current] -> [available]

You should then see the following DEBUG message in the logs:

DEBUG StreamExecution: getBatch took [timeTaken] ms
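
The selection of what to request from each source in the getBatch phase can be modelled as follows. BatchRange and unprocessedRanges are hypothetical names; in this sketch a range carries the committed offset (if any) as the start and the available offset as the end, and the sources are visited in name order only to keep the output deterministic.

```scala
object GetBatchSketch {
  // Range of unprocessed data for one source (simplified stand-in)
  final case class BatchRange(source: String, start: Option[Long], end: Long)

  // Goes over the available offsets and keeps the sources whose available
  // offset has not been committed yet
  def unprocessedRanges(
      availableOffsets: Map[String, Long],
      committedOffsets: Map[String, Long]): Seq[BatchRange] =
    availableOffsets.toSeq.sortBy(_._1).collect {
      case (source, available) if !committedOffsets.get(source).contains(available) =>
        BatchRange(source, committedOffsets.get(source), available)
    }
}
```

Every returned range corresponds to one request for a DataFrame with the new records and to one "Retrieving data from" DEBUG message.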

withNewSources Phase — Replacing StreamingExecutionRelations (in Logical Plan) With Relations With New Data or Empty LocalRelation

Figure 3. StreamExecution’s Running Single Streaming Batch (withNewSources Phase)

In withNewSources phase, runBatch transforms logical query plan and replaces every StreamingExecutionRelation logical operator with the logical plan of the DataFrame with the input data in a batch for the corresponding streaming source.

Note
StreamingExecutionRelation logical operator is used to represent a streaming source in the logical query plan of a streaming Dataset.

runBatch finds the corresponding DataFrame (with the input data) per streaming source in newData internal registry. If found, runBatch takes the logical plan of the DataFrame. If not, runBatch creates a LocalRelation logical relation (for the output schema).
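
The replacement can be pictured with a toy plan tree. The case classes below only borrow their names from the Spark operators and are not the Catalyst classes; BatchData is a hypothetical stand-in for the logical plan of a DataFrame with new data, and schemas are reduced to column names.

```scala
object WithNewSourcesSketch {
  sealed trait Plan
  final case class StreamingExecutionRelation(source: String, output: Seq[String]) extends Plan
  final case class BatchData(source: String, output: Seq[String]) extends Plan // new data of a source
  final case class LocalRelation(output: Seq[String]) extends Plan             // empty, schema only
  final case class Project(columns: Seq[String], child: Plan) extends Plan

  // Replaces every streaming relation with the new data of its source,
  // or with an empty relation of the same schema when there is none
  def withNewSources(plan: Plan, newData: Map[String, BatchData]): Plan = plan match {
    case StreamingExecutionRelation(source, output) =>
      newData.getOrElse(source, LocalRelation(output))
    case Project(columns, child) =>
      Project(columns, withNewSources(child, newData))
    case other => other
  }
}
```

With an empty `newData` map, a projection over the `rate` source ends up over a `LocalRelation` that keeps the source's columns.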

Note
newData internal registry contains entries for streaming sources that have new data available in the current batch.

While replacing StreamingExecutionRelation operators, runBatch records the output schema of the streaming source (from StreamingExecutionRelation) and the DataFrame with the new data (in replacements temporary internal buffer).

runBatch makes sure that the output schema of the streaming source with new data in the batch has not changed. If the output schema has changed, runBatch reports…​FIXME

triggerLogicalPlan Phase — Transforming Catalyst Expressions

runBatch transforms Catalyst expressions in the new logical plan from the withNewSources phase (using replacements temporary internal buffer).

  • Catalyst Attribute is replaced with the corresponding attribute of the DataFrame with the new input data in the batch (if one is recorded in replacements internal buffer)

  • CurrentTimestamp and CurrentDate Catalyst expressions are replaced with CurrentBatchTimestamp expression (with batchTimestampMs from OffsetSeqMetadata).
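
The two rewrite rules can be tried out on a toy expression tree. The types below reuse the Catalyst names but are simplified stand-ins (the data type is a plain String, and GreaterThan is there only to have a parent node), so this shows the idea rather than Spark's transformation.

```scala
object BatchTimestampSketch {
  sealed trait Expr
  case object CurrentTimestamp extends Expr
  case object CurrentDate extends Expr
  final case class CurrentBatchTimestamp(timestampMs: Long, dataType: String) extends Expr
  final case class Attribute(name: String) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr

  // Rewrites attributes using the replacements buffer and pins the
  // current-time expressions to the timestamp of the batch
  def rewrite(
      expr: Expr,
      batchTimestampMs: Long,
      replacements: Map[Attribute, Attribute]): Expr = expr match {
    case a: Attribute     => replacements.getOrElse(a, a)
    case CurrentTimestamp => CurrentBatchTimestamp(batchTimestampMs, "timestamp")
    case CurrentDate      => CurrentBatchTimestamp(batchTimestampMs, "date")
    case GreaterThan(l, r) =>
      GreaterThan(
        rewrite(l, batchTimestampMs, replacements),
        rewrite(r, batchTimestampMs, replacements))
    case other => other
  }
}
```

Because both current-time expressions are replaced with the same batch timestamp, every row of a batch is evaluated against one value.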

Note

CurrentTimestamp Catalyst expression corresponds to current_timestamp function.

Find more about current_timestamp function in Mastering Apache Spark 2 gitbook.

Note

CurrentDate Catalyst expression corresponds to current_date function.

Find more about current_date function in Mastering Apache Spark 2 gitbook.

queryPlanning Phase — Creating IncrementalExecution for Current Streaming Batch

Figure 4. StreamExecution’s Query Planning (queryPlanning Phase)

In queryPlanning time-tracking section, runBatch creates a new IncrementalExecution with the following:

The new IncrementalExecution is recorded in lastExecution property.

Before leaving queryPlanning section, runBatch forces preparation of the physical plan for execution (i.e. requesting IncrementalExecution for executedPlan).

Note
executedPlan is a physical plan (i.e. SparkPlan) ready for execution with state optimization rules applied.

nextBatch Phase — Creating Dataset (with IncrementalExecution for New Data)

Figure 5. StreamExecution Creates DataFrame with New Data

runBatch creates a DataFrame with the new IncrementalExecution (as QueryExecution) and its analyzed output schema.

Note
The new DataFrame represents the result of a streaming query.

addBatch Phase — Adding Current Streaming Batch to Sink

Figure 6. StreamExecution Creates DataFrame with New Data

In addBatch time-tracking section, runBatch requests the one and only streaming Sink to add the results of a streaming query (as the DataFrame created in nextBatch Phase).

Note
runBatch uses Sink.addBatch method to request the Sink to add the results.
Note
runBatch uses SQLExecution.withNewExecutionId to execute and track all the Spark actions (under one execution id) that Sink can use when requested to add the results.
Note
The new DataFrame will only be executed in Sink.addBatch.
Note
SQLExecution.withNewExecutionId posts a SparkListenerSQLExecutionStart event before executing Sink.addBatch and a SparkListenerSQLExecutionEnd event right afterwards.
Tip

Register SparkListener to get notified about the SQL execution events.

You can find more information on SparkListener in Mastering Apache Spark 2 gitbook.

awaitBatchLock Phase — Waking Up Threads Waiting For Stream to Progress

In awaitBatchLock code block (it is not a time-tracking section), runBatch acquires a lock on awaitProgressLock, wakes up all waiting threads on awaitProgressLockCondition and immediately releases awaitProgressLock lock.

Note
awaitProgressLockCondition is used mainly when StreamExecution is requested to processAllAvailable (and also to awaitOffset, but that seems to be mainly for testing).

isNewDataAvailable Internal Method

isNewDataAvailable: Boolean

isNewDataAvailable…​FIXME

Note
isNewDataAvailable is used when MicroBatchExecution is requested to runActivatedStream and constructNextBatch.
