IncrementalExecution — QueryExecution of Streaming Queries
IncrementalExecution is the QueryExecution of streaming queries.
Tip: Read up on QueryExecution in The Internals of Spark SQL book.
IncrementalExecution is created (and becomes the StreamExecution.lastExecution) when:
- MicroBatchExecution is requested to run a single streaming micro-batch (in queryPlanning phase)
- ContinuousExecution is requested to run a streaming query in continuous mode (in queryPlanning phase)
- Dataset.explain operator is executed (on a streaming query)
IncrementalExecution uses the statefulOperatorId internal counter for the IDs of the stateful operators in the optimized logical plan (while applying the preparations rules) when requested to prepare the plan for execution (in executedPlan phase).
Preparing Logical Plan (of Streaming Query) for Execution — optimizedPlan and executedPlan Phases of Query Execution
When requested for the optimized logical plan (of the logical plan), IncrementalExecution replaces CurrentBatchTimestamp expressions with the timestamp literal and gives ExpressionWithRandomSeed expressions new random seeds. When transforming CurrentBatchTimestamp expressions, IncrementalExecution prints out the following INFO message to the logs:
Current batch timestamp = [timestamp]
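The substitution described above can be pictured with a small self-contained model. The sketch below is not Spark's code: Expr, BatchTimestampPlaceholder, TimestampLiteral, RandWithSeed, Add and resolve are hypothetical names that only mimic how a per-batch rewrite swaps a timestamp placeholder for a literal and reseeds random expressions.

```scala
// A toy expression tree; all names here are hypothetical, not Spark classes.
sealed trait Expr
case object BatchTimestampPlaceholder extends Expr   // stands in for CurrentBatchTimestamp
case class TimestampLiteral(ms: Long) extends Expr   // stands in for the timestamp literal
case class RandWithSeed(seed: Long) extends Expr     // stands in for ExpressionWithRandomSeed
case class Add(left: Expr, right: Expr) extends Expr

// Rewrites the tree for one batch: placeholders become literals and
// every seeded expression gets a fresh seed from the supplied generator.
def resolve(e: Expr, batchTimestampMs: Long, nextSeed: () => Long): Expr = e match {
  case BatchTimestampPlaceholder =>
    // Mimics the INFO message; the real one goes to the logs, not stdout.
    println(s"Current batch timestamp = $batchTimestampMs")
    TimestampLiteral(batchTimestampMs)
  case RandWithSeed(_) =>
    RandWithSeed(nextSeed())
  case Add(l, r) =>
    Add(resolve(l, batchTimestampMs, nextSeed), resolve(r, batchTimestampMs, nextSeed))
  case lit: TimestampLiteral =>
    lit
}

// A deterministic seed source keeps the example reproducible.
val seeds = Iterator.from(100).map(_.toLong)
val resolved = resolve(Add(BatchTimestampPlaceholder, RandWithSeed(0L)), 1000L, () => seeds.next())
```

Because the rewrite runs again for every planned batch, each batch sees its own timestamp literal and its own seeds.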
Once created, IncrementalExecution is immediately executed (by the MicroBatchExecution and ContinuousExecution stream execution engines in the queryPlanning phase) and so the entire query execution pipeline is executed up to and including executedPlan. That means that the extra planning strategies and the state preparation rule have been applied at this point and the streaming query is ready for execution.
Creating IncrementalExecution Instance
IncrementalExecution takes the following to be created:
- OutputMode (as specified using DataStreamWriter.outputMode method)
State Checkpoint Location (Directory)
When created, IncrementalExecution is given the checkpoint location.
For the two available execution engines (MicroBatchExecution and ContinuousExecution), the checkpoint location is actually the state directory under the checkpoint root directory.
val queryName = "rate2memory"
val checkpointLocation = s"file:/tmp/checkpoint-$queryName"
val query = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("memory")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .start
// Give the streaming query a moment (one micro-batch)
// so that lastExecution is available for the checkpointLocation
import scala.concurrent.duration._
query.awaitTermination(1.second.toMillis)
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val stateCheckpointDir = query
  .asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
  .lastExecution
  .checkpointLocation
val stateDir = s"$checkpointLocation/state"
assert(stateCheckpointDir equals stateDir)
State checkpoint location is used exclusively when IncrementalExecution is requested for the state info of the next stateful operator (when requested to optimize a streaming physical plan using the state preparation rule that creates the stateful physical operators: StateStoreSaveExec, StateStoreRestoreExec, StreamingDeduplicateExec, FlatMapGroupsWithStateExec, StreamingSymmetricHashJoinExec, and StreamingGlobalLimitExec).
Number of State Stores (spark.sql.shuffle.partitions) — numStateStores Internal Property
numStateStores: Int
numStateStores is the number of state stores which corresponds to spark.sql.shuffle.partitions configuration property (default: 200).
Tip: Read up on spark.sql.shuffle.partitions configuration property (and the others) in The Internals of Spark SQL book.
Internally, numStateStores requests the OffsetSeqMetadata for the spark.sql.shuffle.partitions configuration property (using the streaming configuration) or simply takes whatever was defined for the given SparkSession (default: 200).
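The lookup order described above can be sketched in plain Scala. This is a simplified model under stated assumptions, not the actual implementation: OffsetSeqMetadataModel and resolveNumStateStores are hypothetical names, and the session-level value is passed in as a plain Int.

```scala
// Simplified stand-in for OffsetSeqMetadata; only the conf map matters here.
case class OffsetSeqMetadataModel(conf: Map[String, String])

val ShufflePartitionsKey = "spark.sql.shuffle.partitions"

// Prefer the value recorded in the offset metadata and
// fall back to the session-level setting otherwise.
def resolveNumStateStores(metadata: OffsetSeqMetadataModel, sessionShufflePartitions: Int): Int =
  metadata.conf
    .get(ShufflePartitionsKey)
    .map(_.toInt)
    .getOrElse(sessionShufflePartitions)

// The metadata carries a value, so it wins over the session default.
val fromMetadata = resolveNumStateStores(
  OffsetSeqMetadataModel(Map(ShufflePartitionsKey -> "8")), sessionShufflePartitions = 200)

// The metadata has no value, so the session default is used.
val fromSession = resolveNumStateStores(
  OffsetSeqMetadataModel(Map.empty), sessionShufflePartitions = 200)
```

Giving the recorded value precedence should keep the number of state stores stable when a query is restarted from a checkpoint, even if the session setting has changed in the meantime.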
numStateStores is initialized right when IncrementalExecution is created.
numStateStores is used exclusively when IncrementalExecution is requested for the state info of the next stateful operator (when requested to optimize a streaming physical plan using the state preparation rule that creates the stateful physical operators: StateStoreSaveExec, StateStoreRestoreExec, StreamingDeduplicateExec, FlatMapGroupsWithStateExec, StreamingSymmetricHashJoinExec, and StreamingGlobalLimitExec).
Extra Planning Strategies for Streaming Queries — planner Property
IncrementalExecution uses a custom SparkPlanner with extra planning strategies (specific to streaming queries) to plan the streaming query for execution.
Tip: Read up on SparkPlanner in The Internals of Spark SQL book.
State Preparation Rule For Execution-Specific Configuration — state Property
state: Rule[SparkPlan]
state is a custom physical preparation rule (Rule[SparkPlan]) that can transform a streaming physical plan (SparkPlan) with the following physical operators:
- StateStoreSaveExec with any unary physical operator (UnaryExecNode) with a StateStoreRestoreExec
state simply transforms the physical plan with the above physical operators and fills out the execution-specific configuration:
- nextStatefulOperationStateInfo for the state info
- batchWatermarkMs (through the OffsetSeqMetadata) for the event-time watermark
- batchTimestampMs (through the OffsetSeqMetadata) for the current timestamp
- getStateWatermarkPredicates for the state watermark predicates (for StreamingSymmetricHashJoinExec)
state rule is used (as part of the physical query optimizations) when IncrementalExecution is requested to optimize (prepare) the physical plan of the streaming query (once for ContinuousExecution and every trigger for MicroBatchExecution in their queryPlanning phases).
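What the state rule does to a physical plan can be illustrated with a toy plan tree. The sketch below is a simplified model, not Spark's rule: Plan, Source, StatefulOp, StateInfo, BatchMetadata and prepareState are all hypothetical names, and only the state info, the watermark and the batch timestamp are modelled.

```scala
// Hypothetical, simplified plan nodes; none of these are Spark classes.
case class StateInfo(checkpointLocation: String, operatorId: Long)

sealed trait Plan
case class Source(name: String) extends Plan
case class StatefulOp(
    child: Plan,
    stateInfo: Option[StateInfo] = None,
    eventTimeWatermarkMs: Option[Long] = None,
    batchTimestampMs: Option[Long] = None) extends Plan

// Stand-in for the per-batch metadata (watermark and batch timestamp).
case class BatchMetadata(batchWatermarkMs: Long, batchTimestampMs: Long)

// Fills in the execution-specific fields of every stateful operator;
// all other nodes are returned unchanged.
def prepareState(plan: Plan, metadata: BatchMetadata, nextStateInfo: () => StateInfo): Plan =
  plan match {
    case op: StatefulOp =>
      // Take this operator's state info before descending into its child.
      val info = nextStateInfo()
      op.copy(
        child = prepareState(op.child, metadata, nextStateInfo),
        stateInfo = Some(info),
        eventTimeWatermarkMs = Some(metadata.batchWatermarkMs),
        batchTimestampMs = Some(metadata.batchTimestampMs))
    case other =>
      other
  }

// A mutable counter plays the role of the operator ID source.
var nextOperatorId = 0L
val prepared = prepareState(
  StatefulOp(Source("rate")),
  BatchMetadata(batchWatermarkMs = 5000L, batchTimestampMs = 9000L),
  () => { val info = StateInfo("/tmp/checkpoint/state", nextOperatorId); nextOperatorId += 1; info })
```

The operators come out of planning with empty slots and the rule fills them in just before execution, which is why the same plan shape can be reused with different values for every batch.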
Tip: Read up on Physical Query Optimizations in The Internals of Spark SQL book.
nextStatefulOperationStateInfo Internal Method
nextStatefulOperationStateInfo(): StatefulOperatorStateInfo
nextStatefulOperationStateInfo simply creates a new StatefulOperatorStateInfo with the state checkpoint location, the run ID (of the streaming query), the next statefulOperator ID, the current batch ID, and the number of state stores.
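The following self-contained sketch models that behaviour with a counter that hands out consecutive operator IDs. StateInfoModel and StateInfoFactory are hypothetical names used for illustration only, and the field names are assumptions.

```scala
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger

// Simplified stand-in for StatefulOperatorStateInfo (field names are illustrative).
case class StateInfoModel(
    checkpointLocation: String,
    queryRunId: UUID,
    operatorId: Long,
    storeVersion: Long,
    numPartitions: Int)

// Everything except the operator ID is fixed for the lifetime of one factory.
class StateInfoFactory(
    checkpointLocation: String,
    runId: UUID,
    currentBatchId: Long,
    numStateStores: Int) {

  // Starts at 0 and grows by one per stateful operator.
  private val statefulOperatorId = new AtomicInteger(0)

  def nextStatefulOperationStateInfo(): StateInfoModel =
    StateInfoModel(
      checkpointLocation,
      runId,
      statefulOperatorId.getAndIncrement(),
      currentBatchId,
      numStateStores)
}

val factory = new StateInfoFactory(
  "file:/tmp/checkpoint/state", UUID.randomUUID(), currentBatchId = 0L, numStateStores = 1)
val first = factory.nextStatefulOperationStateInfo()
val second = factory.nextStatefulOperationStateInfo()
```

Two consecutive calls differ only in the operator ID (0, then 1), so every stateful operator in a plan gets its own ID while sharing the same location, run ID, batch ID and number of state stores.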
Note: The only part of StatefulOperatorStateInfo that changes across calls to nextStatefulOperationStateInfo is the next statefulOperator ID. All the other properties (the state checkpoint location, the run ID, the current batch ID, and the number of state stores) are the same within a single IncrementalExecution instance. Across instances, the only two properties that may ever change are the run ID (after a streaming query is restarted from the checkpoint) and the current batch ID (every micro-batch in MicroBatchExecution execution engine).
Note: nextStatefulOperationStateInfo is used exclusively when IncrementalExecution is requested to optimize a streaming physical plan using the state preparation rule (and creates the stateful physical operators: StateStoreSaveExec, StateStoreRestoreExec, StreamingDeduplicateExec, FlatMapGroupsWithStateExec, StreamingSymmetricHashJoinExec, and StreamingGlobalLimitExec).
Checking Out Whether Last Execution Requires Another Non-Data Micro-Batch — shouldRunAnotherBatch Method
shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean
shouldRunAnotherBatch is positive (true) if there is at least one StateStoreWriter operator (in the executedPlan physical query plan) that requires another non-data batch (per the given OffsetSeqMetadata with the event-time watermark and the batch timestamp).
Otherwise, shouldRunAnotherBatch is negative (false).
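The check amounts to asking every state-writing operator in the plan and returning true if any of them says yes. Here is a minimal sketch of that idea, assuming a flattened plan and hypothetical types (MetadataModel, Operator, Stateless, Writer); the watermark-based policy of the writer is made up for the example.

```scala
// Stand-in for the metadata of the next batch (watermark and timestamp).
case class MetadataModel(batchWatermarkMs: Long, batchTimestampMs: Long)

// Hypothetical, simplified operators; only writers can ask for another batch.
sealed trait Operator
case class Stateless(name: String) extends Operator
case class Writer(name: String, needsBatch: MetadataModel => Boolean) extends Operator

// True when at least one state-writing operator wants a non-data batch.
def shouldRunAnotherBatch(plan: Seq[Operator], newMetadata: MetadataModel): Boolean =
  plan.collect { case w: Writer => w.needsBatch(newMetadata) }.exists(identity)

val plan: Seq[Operator] = Seq(
  Stateless("project"),
  // Assumed policy for illustration: ask for another batch once the watermark is past zero.
  Writer("aggregate", m => m.batchWatermarkMs > 0))

val idle = shouldRunAnotherBatch(plan, MetadataModel(batchWatermarkMs = 0L, batchTimestampMs = 1000L))
val pending = shouldRunAnotherBatch(plan, MetadataModel(batchWatermarkMs = 5000L, batchTimestampMs = 2000L))
```

With the watermark at zero no writer asks for a batch (idle is false); once the watermark advances, the single writer is enough to make the result true.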
Note: shouldRunAnotherBatch is used exclusively when MicroBatchExecution is requested to construct the next streaming micro-batch (and checks out whether the last batch execution requires another non-data batch).
Demo: State Checkpoint Directory
// START: Only for easier debugging
// The state is then only for one partition
// which should make monitoring easier
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)
assert(spark.sessionState.conf.numShufflePartitions == 1)
// END: Only for easier debugging
val counts = spark
  .readStream
  .format("rate")
  .load
  .groupBy(window($"timestamp", "5 seconds") as "group")
  .agg(count("value") as "value_count") // <-- creates an Aggregate logical operator
  .orderBy("group") // <-- makes for easier checking
assert(counts.isStreaming, "This should be a streaming query")
// Search for "checkpoint = <unknown>" in the following output
// Look for StateStoreSave and StateStoreRestore
scala> counts.explain
== Physical Plan ==
*(5) Sort [group#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#5 ASC NULLS FIRST, 1)
+- *(4) HashAggregate(keys=[window#11], functions=[count(value#1L)])
+- StateStoreSave [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], Append, 0, 2
+- *(3) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
+- StateStoreRestore [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], 2
+- *(2) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
+- Exchange hashpartitioning(window#11, 1)
+- *(1) HashAggregate(keys=[window#11], functions=[partial_count(value#1L)])
+- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#11, value#1L]
+- *(1) Filter isnotnull(timestamp#0)
+- StreamingRelation rate, [timestamp#0, value#1L]
// Start the query to access lastExecution that has the checkpoint resolved
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val t = Trigger.ProcessingTime(1.hour) // should be enough time for exploration
val sq = counts
  .writeStream
  .format("console")
  .option("truncate", false)
  .option("checkpointLocation", "/tmp/spark-streams-state-checkpoint-root")
  .trigger(t)
  .outputMode(OutputMode.Complete)
  .start
// wait till the first batch which should happen right after start
import org.apache.spark.sql.execution.streaming._
val lastExecution = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
scala> println(lastExecution.checkpointLocation)
file:/tmp/spark-streams-state-checkpoint-root/state
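To see what the query has written under the state directory so far, the directory can be walked with plain JDK file APIs. The helper below is a small sketch: listRelative is a hypothetical name, the path is the one used in the demo above, and scala.collection.JavaConverters is chosen on the assumption of a Scala 2.11 or 2.12 build (it is deprecated in newer Scala versions).

```scala
import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._

// Lists every file and directory under root, as paths relative to root.
def listRelative(root: Path): List[String] = {
  val stream = Files.walk(root)
  try {
    stream.iterator().asScala
      .filter(p => p != root)
      .map(p => root.relativize(p).toString)
      .toList
      .sorted
  } finally {
    stream.close() // Files.walk keeps directory handles open until closed
  }
}

// The state directory of the demo query; it exists only after the first batch.
val stateRoot = Paths.get("/tmp/spark-streams-state-checkpoint-root/state")
if (Files.exists(stateRoot)) listRelative(stateRoot).foreach(println)
```

The existence check keeps the snippet safe to run before the query has committed its first batch.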