IncrementalExecution — QueryExecution of Streaming Queries
IncrementalExecution is the QueryExecution of streaming queries.
Tip: Read up on QueryExecution in The Internals of Spark SQL book.
IncrementalExecution is created (and becomes the StreamExecution.lastExecution) when:
- MicroBatchExecution is requested to run a single streaming micro-batch (in queryPlanning phase)
- ContinuousExecution is requested to run a streaming query in continuous mode (in queryPlanning phase)
- Dataset.explain operator is executed (on a streaming query)
IncrementalExecution uses the statefulOperatorId internal counter for the IDs of the stateful operators in the optimized logical plan (while applying the preparation rules) when requested to prepare the plan for execution (in executedPlan phase).
Preparing Logical Plan (of Streaming Query) for Execution — optimizedPlan and executedPlan Phases of Query Execution
When requested for the optimized logical plan (of the logical plan), IncrementalExecution transforms CurrentBatchTimestamp and ExpressionWithRandomSeed expressions with the timestamp literal and new random seeds, respectively. When transforming CurrentBatchTimestamp expressions, IncrementalExecution prints out the following INFO message to the logs:
Current batch timestamp = [timestamp]
Once created, IncrementalExecution is immediately executed (by the MicroBatchExecution and ContinuousExecution stream execution engines in the queryPlanning phase) and so the entire query execution pipeline is executed up to and including executedPlan. That means that the extra planning strategies and the state preparation rule have been applied at this point and the streaming query is ready for execution.
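The expression rewriting described for the optimizedPlan phase can be pictured with a small, self-contained model. This is only a sketch: Expr, TimestampLiteral, RandWithSeed, Add and prepareForBatch are hypothetical names invented for illustration and are not Spark's Catalyst classes. It assumes the batch timestamp and a seed generator are passed in from outside.

```scala
// Hypothetical, simplified expression tree (NOT Spark's Catalyst classes).
sealed trait Expr
case object CurrentBatchTimestamp extends Expr            // placeholder resolved once per batch
case class TimestampLiteral(millis: Long) extends Expr    // what the placeholder is replaced with
case class RandWithSeed(seed: Option[Long]) extends Expr  // stand-in for ExpressionWithRandomSeed
case class Add(left: Expr, right: Expr) extends Expr

// Recursively replaces batch-dependent expressions with concrete values.
def prepareForBatch(e: Expr, batchTimestampMs: Long, nextSeed: () => Long): Expr = e match {
  case CurrentBatchTimestamp =>
    // Mirrors the INFO message mentioned in the text
    println(s"Current batch timestamp = $batchTimestampMs")
    TimestampLiteral(batchTimestampMs)
  case RandWithSeed(_) =>
    RandWithSeed(Some(nextSeed()))
  case Add(l, r) =>
    Add(prepareForBatch(l, batchTimestampMs, nextSeed), prepareForBatch(r, batchTimestampMs, nextSeed))
  case other =>
    other
}
```

The point of the model is that the rewritten tree no longer depends on when it is evaluated, which is why the rewriting has to happen again for every batch.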
Creating IncrementalExecution Instance
IncrementalExecution takes the following to be created:
- OutputMode (as specified using DataStreamWriter.outputMode method)
State Checkpoint Location (Directory)
When created, IncrementalExecution is given the checkpoint location.
For the two available execution engines (MicroBatchExecution and ContinuousExecution), the checkpoint location is actually the state directory under the checkpoint root directory.
val queryName = "rate2memory"
val checkpointLocation = s"file:/tmp/checkpoint-$queryName"
val query = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("memory")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .start
// Give the streaming query a moment (one micro-batch)
// So lastExecution is available for the checkpointLocation
import scala.concurrent.duration._
query.awaitTermination(1.second.toMillis)
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val stateCheckpointDir = query
  .asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
  .lastExecution
  .checkpointLocation
val stateDir = s"$checkpointLocation/state"
assert(stateCheckpointDir == stateDir)
State checkpoint location is used exclusively when IncrementalExecution is requested for the state info of the next stateful operator (when requested to optimize a streaming physical plan using the state preparation rule that creates the stateful physical operators: StateStoreSaveExec, StateStoreRestoreExec, StreamingDeduplicateExec, FlatMapGroupsWithStateExec, StreamingSymmetricHashJoinExec, and StreamingGlobalLimitExec).
Number of State Stores (spark.sql.shuffle.partitions) — numStateStores Internal Property
numStateStores: Int
numStateStores is the number of state stores which corresponds to spark.sql.shuffle.partitions configuration property (default: 200).
Tip: Read up on spark.sql.shuffle.partitions configuration property (and the others) in The Internals of Spark SQL book.
Internally, numStateStores requests the OffsetSeqMetadata for the spark.sql.shuffle.partitions configuration property (using the streaming configuration) or simply takes whatever was defined for the given SparkSession (default: 200).
numStateStores is initialized right when IncrementalExecution is created.
numStateStores is used exclusively when IncrementalExecution is requested for the state info of the next stateful operator (when requested to optimize a streaming physical plan using the state preparation rule that creates the stateful physical operators: StateStoreSaveExec, StateStoreRestoreExec, StreamingDeduplicateExec, FlatMapGroupsWithStateExec, StreamingSymmetricHashJoinExec, and StreamingGlobalLimitExec).
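The lookup order described for numStateStores (streaming configuration first, session setting as the fallback) can be sketched in plain Scala. OffsetSeqMetadataSketch and the sessionShufflePartitions parameter are hypothetical stand-ins made up for this example; only the property name comes from the text.

```scala
// Hypothetical stand-in for OffsetSeqMetadata: only the captured configuration map is modeled.
case class OffsetSeqMetadataSketch(conf: Map[String, String])

val ShufflePartitionsKey = "spark.sql.shuffle.partitions"

// Prefer the value recorded with the streaming query, otherwise use the session's value.
def numStateStores(metadata: OffsetSeqMetadataSketch, sessionShufflePartitions: Int): Int =
  metadata.conf
    .get(ShufflePartitionsKey)
    .map(_.toInt)
    .getOrElse(sessionShufflePartitions)

// Value recorded in the metadata wins over the session setting
val fromMetadata = numStateStores(OffsetSeqMetadataSketch(Map(ShufflePartitionsKey -> "1")), 200)
// Nothing recorded: fall back to the session setting
val fromSession = numStateStores(OffsetSeqMetadataSketch(Map.empty), 200)
```

Preferring the recorded value is what lets a restarted query keep the same number of state stores even if the session is configured differently.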
Extra Planning Strategies for Streaming Queries — planner Property
IncrementalExecution uses a custom SparkPlanner with extra planning strategies to plan the streaming query for execution.
Tip: Read up on SparkPlanner in The Internals of Spark SQL book.
State Preparation Rule For Execution-Specific Configuration — state Property
state: Rule[SparkPlan]
state is a custom physical preparation rule (Rule[SparkPlan]) that can transform a streaming physical plan (SparkPlan) with the following physical operators:
- StateStoreSaveExec with any unary physical operator (UnaryExecNode) with a StateStoreRestoreExec
state simply transforms the physical plan with the above physical operators and fills out the execution-specific configuration:
- nextStatefulOperationStateInfo for the state info
- batchWatermarkMs (through the OffsetSeqMetadata) for the event-time watermark
- batchTimestampMs (through the OffsetSeqMetadata) for the current timestamp
- getStateWatermarkPredicates for the state watermark predicates (for StreamingSymmetricHashJoinExec)
state rule is used (as part of the physical query optimizations) when IncrementalExecution is requested to optimize (prepare) the physical plan of the streaming query (once for ContinuousExecution and every trigger for MicroBatchExecution in their queryPlanning phases).
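How such a rule fills in the execution-specific configuration can be illustrated with a toy plan tree. Everything below (StateInfo, Plan, Scan, Aggregate, Restore, Save, StateRuleSketch) is a hypothetical simplification and not Spark's SparkPlan hierarchy. The sketch assumes, as the explain output later on this page suggests (both operators show opId = 0), that a save operator and the restore operator beneath it receive the same state info.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, simplified physical operators (NOT Spark's SparkPlan classes).
case class StateInfo(checkpointLocation: String, operatorId: Long, batchId: Long)

sealed trait Plan
case class Scan(name: String) extends Plan
case class Aggregate(child: Plan) extends Plan                              // plays the unary operator
case class Restore(stateInfo: Option[StateInfo], child: Plan) extends Plan  // plays StateStoreRestoreExec
case class Save(stateInfo: Option[StateInfo], watermarkMs: Option[Long], child: Plan)
  extends Plan                                                              // plays StateStoreSaveExec

class StateRuleSketch(checkpointLocation: String, batchId: Long, batchWatermarkMs: Long) {
  private val nextOperatorId = new AtomicInteger(0)

  private def nextStateInfo(): StateInfo =
    StateInfo(checkpointLocation, nextOperatorId.getAndIncrement().toLong, batchId)

  def apply(plan: Plan): Plan = plan match {
    // Save over a unary operator over Restore: both ends share ONE state info
    case Save(None, None, Aggregate(Restore(None, child))) =>
      val info = nextStateInfo()
      Save(Some(info), Some(batchWatermarkMs), Aggregate(Restore(Some(info), apply(child))))
    // Everything else is left as-is, with children visited recursively
    case Save(info, watermark, child) => Save(info, watermark, apply(child))
    case Restore(info, child)         => Restore(info, apply(child))
    case Aggregate(child)             => Aggregate(apply(child))
    case leaf: Scan                   => leaf
  }
}
```

Matching on operators whose state info is still empty keeps the rule idempotent in this model: applying it to an already prepared plan changes nothing.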
Tip: Read up on Physical Query Optimizations in The Internals of Spark SQL book.
nextStatefulOperationStateInfo Internal Method
nextStatefulOperationStateInfo(): StatefulOperatorStateInfo
nextStatefulOperationStateInfo simply creates a new StatefulOperatorStateInfo with the state checkpoint location, the run ID (of the streaming query), the next stateful operator ID, the current batch ID, and the number of state stores.
Note: The only part of StatefulOperatorStateInfo that changes between calls to nextStatefulOperationStateInfo is the next stateful operator ID. All the other properties (the state checkpoint location, the run ID, the current batch ID, and the number of state stores) are the same within a single IncrementalExecution instance. Across instances, the only two properties that may ever change are the run ID (after a streaming query is restarted from the checkpoint) and the current batch ID (every micro-batch in MicroBatchExecution execution engine).
Note: nextStatefulOperationStateInfo is used exclusively when IncrementalExecution is requested to optimize a streaming physical plan using the state preparation rule (and creates the stateful physical operators: StateStoreSaveExec, StateStoreRestoreExec, StreamingDeduplicateExec, FlatMapGroupsWithStateExec, StreamingSymmetricHashJoinExec, and StreamingGlobalLimitExec).
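The behaviour described in this section (five properties, of which only the operator ID advances between calls) can be sketched as follows. StateInfoSketch and IncrementalExecutionSketch are hypothetical classes written for this example. The field names follow the labels printed in the explain output on this page (checkpoint, runId, opId, ver, numPartitions), on the assumption that ver carries the current batch ID.

```scala
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical mirror of the five properties listed above (not Spark's own class).
case class StateInfoSketch(
    checkpointLocation: String,
    queryRunId: UUID,
    operatorId: Long,
    storeVersion: Long,
    numPartitions: Int)

class IncrementalExecutionSketch(
    checkpointLocation: String,
    runId: UUID,
    currentBatchId: Long,
    numStateStores: Int) {

  // Counts the stateful operators handed a state info by this instance
  private val statefulOperatorId = new AtomicInteger(0)

  def nextStatefulOperationStateInfo(): StateInfoSketch =
    StateInfoSketch(
      checkpointLocation,
      runId,
      statefulOperatorId.getAndIncrement().toLong,
      currentBatchId,
      numStateStores)
}

val execution = new IncrementalExecutionSketch("file:/tmp/cp/state", UUID.randomUUID(), 3L, 200)
val first = execution.nextStatefulOperationStateInfo()
val second = execution.nextStatefulOperationStateInfo()
// first and second differ only in operatorId (0 and 1)
```

Because the counter lives in the instance, a fresh instance for the next micro-batch starts numbering from 0 again, so the same operator gets the same ID in every batch as long as the plan shape is unchanged.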
Checking Out Whether Last Execution Requires Another Non-Data Micro-Batch — shouldRunAnotherBatch Method
shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean
shouldRunAnotherBatch is positive (true) if there is at least one StateStoreWriter operator (in the executedPlan physical query plan) that requires another non-data batch (per the given OffsetSeqMetadata with the event-time watermark and the batch timestamp).
Otherwise, shouldRunAnotherBatch is negative (false).
Note: shouldRunAnotherBatch is used exclusively when MicroBatchExecution is requested to construct the next streaming micro-batch (and checks out whether the last batch execution requires another non-data batch).
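The "at least one stateful writer asks for another batch" check can be sketched over a toy operator tree. MetadataSketch, Operator, Stateless, StatefulWriter and collectAll are hypothetical names, and the policy inside StatefulWriter (ask for another batch once the watermark passes a threshold) is an assumption chosen purely to make the example concrete.

```scala
// Hypothetical stand-ins; only what the check needs is modeled.
case class MetadataSketch(batchWatermarkMs: Long, batchTimestampMs: Long)

sealed trait Operator { def children: Seq[Operator] }
case class Stateless(children: Seq[Operator]) extends Operator
// Assumed policy for illustration: another batch is needed once the watermark passes expiresAtMs
case class StatefulWriter(expiresAtMs: Long, children: Seq[Operator]) extends Operator {
  def shouldRunAnotherBatch(metadata: MetadataSketch): Boolean =
    metadata.batchWatermarkMs > expiresAtMs
}

// Flattens the tree into the operator itself followed by all of its descendants
def collectAll(op: Operator): Seq[Operator] =
  op +: op.children.flatMap(collectAll)

// true when at least one stateful writer in the plan asks for another (non-data) batch
def shouldRunAnotherBatch(executedPlan: Operator, newMetadata: MetadataSketch): Boolean =
  collectAll(executedPlan).exists {
    case writer: StatefulWriter => writer.shouldRunAnotherBatch(newMetadata)
    case _                      => false
  }
```

A plan with no stateful writers always yields false in this model, so a purely stateless query never triggers an extra non-data batch.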
Demo: State Checkpoint Directory
// START: Only for easier debugging
// The state is then only for one partition
// which should make monitoring easier
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)
assert(spark.sessionState.conf.numShufflePartitions == 1)
// END: Only for easier debugging
val counts = spark
  .readStream
  .format("rate")
  .load
  .groupBy(window($"timestamp", "5 seconds") as "group")
  .agg(count("value") as "value_count") // <-- creates an Aggregate logical operator
  .orderBy("group") // <-- makes for easier checking
assert(counts.isStreaming, "This should be a streaming query")
// Search for "checkpoint = <unknown>" in the following output
// Look for StateStoreSave and StateStoreRestore
scala> counts.explain
== Physical Plan ==
*(5) Sort [group#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#5 ASC NULLS FIRST, 1)
   +- *(4) HashAggregate(keys=[window#11], functions=[count(value#1L)])
      +- StateStoreSave [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], Append, 0, 2
         +- *(3) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], 2
               +- *(2) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#11, 1)
                     +- *(1) HashAggregate(keys=[window#11], functions=[partial_count(value#1L)])
                        +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#11, value#1L]
                           +- *(1) Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]
// Start the query to access lastExecution that has the checkpoint resolved
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val t = Trigger.ProcessingTime(1.hour) // should be enough time for exploration
val sq = counts
  .writeStream
  .format("console")
  .option("truncate", false)
  .option("checkpointLocation", "/tmp/spark-streams-state-checkpoint-root")
  .trigger(t)
  .outputMode(OutputMode.Complete)
  .start
// wait till the first batch which should happen right after start
import org.apache.spark.sql.execution.streaming._
val lastExecution = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
scala> println(lastExecution.checkpointLocation)
file:/tmp/spark-streams-state-checkpoint-root/state