IncrementalExecution — QueryExecution of Streaming Queries

IncrementalExecution is the QueryExecution of streaming queries.

Tip
Read up on QueryExecution in The Internals of Spark SQL book.

IncrementalExecution is created (and becomes the StreamExecution.lastExecution) when:

IncrementalExecution uses the statefulOperatorId internal counter for the IDs of the stateful operators in the optimized logical plan (while applying the preparations rules) when requested to prepare the plan for execution (in executedPlan phase).

Preparing Logical Plan (of Streaming Query) for Execution — optimizedPlan and executedPlan Phases of Query Execution

When requested for the optimized logical plan (of the logical plan), IncrementalExecution transforms CurrentBatchTimestamp and ExpressionWithRandomSeed expressions with the timestamp literal and new random seeds, respectively. When transforming CurrentBatchTimestamp expressions, IncrementalExecution prints out the following INFO message to the logs:

Current batch timestamp = [timestamp]

Once created, IncrementalExecution is immediately executed (by the MicroBatchExecution and ContinuousExecution stream execution engines in the queryPlanning phase) and so the entire query execution pipeline is executed up to and including executedPlan. That means that the extra planning strategies and the state preparation rule have been applied at this point and the streaming query is ready for execution.

Creating IncrementalExecution Instance

IncrementalExecution takes the following to be created:

State Checkpoint Location (Directory)

When created, IncrementalExecution is given the checkpoint location.

For the two available execution engines (MicroBatchExecution and ContinuousExecution), the checkpoint location is actually state directory under the checkpoint root directory.

val queryName = "rate2memory"
val checkpointLocation = s"file:/tmp/checkpoint-$queryName"
val query = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("memory")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .start

// Give the streaming query a moment (one micro-batch)
// So lastExecution is available for the checkpointLocation
import scala.concurrent.duration._
query.awaitTermination(1.second.toMillis)

import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val stateCheckpointDir = query
  .asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
  .lastExecution
  .checkpointLocation
val stateDir = s"$checkpointLocation/state"
assert(stateCheckpointDir equals stateDir)

State checkpoint location is used exclusively when IncrementalExecution is requested for the state info of the next stateful operator (when requested to optimize a streaming physical plan using the state preparation rule that creates the stateful physical operators: StateStoreSaveExec, StateStoreRestoreExec, StreamingDeduplicateExec, FlatMapGroupsWithStateExec, StreamingSymmetricHashJoinExec, and StreamingGlobalLimitExec).

Number of State Stores (spark.sql.shuffle.partitions) — numStateStores Internal Property

numStateStores: Int

numStateStores is the number of state stores which corresponds to spark.sql.shuffle.partitions configuration property (default: 200).

Tip
Read up on spark.sql.shuffle.partitions configuration property (and the others) in The Internals of Spark SQL book.

Internally, numStateStores requests the OffsetSeqMetadata for the spark.sql.shuffle.partitions configuration property (using the streaming configuration) or simply takes whatever was defined for the given SparkSession (default: 200).

numStateStores is initialized right when IncrementalExecution is created.

numStateStores is used exclusively when IncrementalExecution is requested for the state info of the next stateful operator (when requested to optimize a streaming physical plan using the state preparation rule that creates the stateful physical operators: StateStoreSaveExec, StateStoreRestoreExec, StreamingDeduplicateExec, FlatMapGroupsWithStateExec, StreamingSymmetricHashJoinExec, and StreamingGlobalLimitExec).

Extra Planning Strategies for Streaming Queries — planner Property

IncrementalExecution uses a custom SparkPlanner with the following extra planning strategies to plan the streaming query for execution:

Tip
Read up on SparkPlanner in The Internals of Spark SQL book.

State Preparation Rule For Execution-Specific Configuration — state Property

state: Rule[SparkPlan]

state is a custom physical preparation rule (Rule[SparkPlan]) that can transform a streaming physical plan (SparkPlan) with the following physical operators:

state simply transforms the physical plan with the above physical operators and fills out the execution-specific configuration:

state rule is used (as part of the physical query optimizations) when IncrementalExecution is requested to optimize (prepare) the physical plan of the streaming query (once for ContinuousExecution and every trigger for MicroBatchExecution in their queryPlanning phases).

nextStatefulOperationStateInfo Internal Method

nextStatefulOperationStateInfo(): StatefulOperatorStateInfo

nextStatefulOperationStateInfo simply creates a new StatefulOperatorStateInfo with the state checkpoint location, the run ID (of the streaming query), the next statefulOperator ID, the current batch ID, and the number of state stores.

Note

The only changing part of StatefulOperatorStateInfo across calls of the nextStatefulOperationStateInfo method is the the next statefulOperator ID.

All the other properties (the state checkpoint location, the run ID, the current batch ID, and the number of state stores) are the same within a single IncrementalExecution instance.

The only two properties that may ever change are the run ID (after a streaming query is restarted from the checkpoint) and the current batch ID (every micro-batch in MicroBatchExecution execution engine).

Note
nextStatefulOperationStateInfo is used exclusively when IncrementalExecution is requested to optimize a streaming physical plan using the state preparation rule (and creates the stateful physical operators: StateStoreSaveExec, StateStoreRestoreExec, StreamingDeduplicateExec, FlatMapGroupsWithStateExec, StreamingSymmetricHashJoinExec, and StreamingGlobalLimitExec).

Checking Out Whether Last Execution Requires Another Non-Data Micro-Batch — shouldRunAnotherBatch Method

shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean

shouldRunAnotherBatch is positive (true) if there is at least one StateStoreWriter operator (in the executedPlan physical query plan) that requires another non-data batch (per the given OffsetSeqMetadata with the event-time watermark and the batch timestamp).

Otherwise, shouldRunAnotherBatch is negative (false).

Note
shouldRunAnotherBatch is used exclusively when MicroBatchExecution is requested to construct the next streaming micro-batch (and checks out whether the last batch execution requires another non-data batch).

Demo: State Checkpoint Directory

// START: Only for easier debugging
// The state is then only for one partition
// which should make monitoring easier
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)

assert(spark.sessionState.conf.numShufflePartitions == 1)
// END: Only for easier debugging

val counts = spark
  .readStream
  .format("rate")
  .load
  .groupBy(window($"timestamp", "5 seconds") as "group")
  .agg(count("value") as "value_count") // <-- creates an Aggregate logical operator
  .orderBy("group")  // <-- makes for easier checking

assert(counts.isStreaming, "This should be a streaming query")

// Search for "checkpoint = <unknown>" in the following output
// Looks for StateStoreSave and StateStoreRestore
scala> counts.explain
== Physical Plan ==
*(5) Sort [group#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#5 ASC NULLS FIRST, 1)
   +- *(4) HashAggregate(keys=[window#11], functions=[count(value#1L)])
      +- StateStoreSave [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], Append, 0, 2
         +- *(3) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], 2
               +- *(2) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#11, 1)
                     +- *(1) HashAggregate(keys=[window#11], functions=[partial_count(value#1L)])
                        +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#11, value#1L]
                           +- *(1) Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]

// Start the query to access lastExecution that has the checkpoint resolved
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val t = Trigger.ProcessingTime(1.hour) // should be enough time for exploration
val sq = counts
  .writeStream
  .format("console")
  .option("truncate", false)
  .option("checkpointLocation", "/tmp/spark-streams-state-checkpoint-root")
  .trigger(t)
  .outputMode(OutputMode.Complete)
  .start

// wait till the first batch which should happen right after start

import org.apache.spark.sql.execution.streaming._
val lastExecution = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
scala> println(lastExecution.checkpointLocation)
file:/tmp/spark-streams-state-checkpoint-root/state

results matching ""

    No results matching ""