StateStoreSaveExec Unary Physical Operator — Saving State of Streaming Aggregates

StateStoreSaveExec is a unary physical operator that saves a streaming state to a state store with support for streaming watermark.

Note
A unary physical operator is a physical operator with a single child physical operator.

StateStoreSaveExec is created exclusively when StatefulAggregationStrategy execution planning strategy is executed (and plans Aggregate logical operators in a streaming structured query).

StateStoreSaveExec StatefulAggregationStrategy.png
Figure 1. StateStoreSaveExec and StatefulAggregationStrategy
Note

Aggregate logical operator is the result of:

  • RelationalGroupedDataset aggregations, i.e. agg and pivot operators

  • KeyValueGroupedDataset aggregations, i.e. mapGroups, flatMapGroups, mapGroupsWithState, flatMapGroupsWithState, reduceGroups, and agg, cogroup operators

  • SQL’s GROUP BY clause (possibly with WITH CUBE or WITH ROLLUP)

The optional properties, i.e. StatefulOperatorStateInfo, output mode, and event time watermark, are undefined when StateStoreSaveExec is created. StateStoreSaveExec is updated to hold their streaming batch-specific execution properties when IncrementalExecution prepares a streaming physical plan for execution (and state preparation rule is executed when StreamExecution plans a streaming query for a streaming batch).

StateStoreSaveExec IncrementalExecution.png
Figure 2. StateStoreSaveExec and IncrementalExecution
Note
Unlike StateStoreRestoreExec operator, StateStoreSaveExec takes output mode and event time watermark when created.

When executed, StateStoreSaveExec creates a StateStoreRDD to map over partitions with storeUpdateFunction that manages the StateStore.

StateStoreSaveExec StateStoreRDD.png
Figure 3. StateStoreSaveExec creates StateStoreRDD
// START: Only for easier debugging
// The state is then only for one partition
// which should make monitoring easier
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)

assert(spark.sessionState.conf.numShufflePartitions == 1)
// END: Only for easier debugging

val counts = spark
  .readStream
  .format("rate")
  .load
  .groupBy(window($"timestamp", "5 seconds") as "group")
  .agg(count("value") as "value_count") // <-- creates an Aggregate logical operator
  .orderBy("group")  // <-- makes for easier checking
scala> counts.explain
== Physical Plan ==
*(5) Sort [group#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#5 ASC NULLS FIRST, 1)
   +- *(4) HashAggregate(keys=[window#11], functions=[count(value#1L)])
      +- StateStoreSave [window#11], state info [ checkpoint = <unknown>, runId = ba5e4345-6d7a-4aca-b480-231ae9268916, opId = 0, ver = 0, numPartitions = 1], Append, 0, 2
         +- *(3) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#11], state info [ checkpoint = <unknown>, runId = ba5e4345-6d7a-4aca-b480-231ae9268916, opId = 0, ver = 0, numPartitions = 1], 2
               +- *(2) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#11, 1)
                     +- *(1) HashAggregate(keys=[window#11], functions=[partial_count(value#1L)])
                        +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#11, value#1L]
                           +- *(1) Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]

// Start the query and hence execute StateStoreSaveExec
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val t = Trigger.ProcessingTime(1.hour) // should be enough time for exploration
val sq = counts
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(t)
  .outputMode(OutputMode.Complete)
  .start

// wait till the first batch which should happen right after start

import org.apache.spark.sql.execution.streaming._
val lastExecution = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
scala> println(lastExecution.logical.numberedTreeString)
00 WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4d8749e7
01 +- Sort [group#5 ASC NULLS FIRST], true
02    +- Aggregate [window#11], [window#11 AS group#5, count(value#1L) AS value_count#10L]
03       +- Filter isnotnull(timestamp#0)
04          +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0) + 5000000), LongType, TimestampType)) AS window#11, timestamp#0, value#1L]
05             +- Project [timestamp#211 AS timestamp#0, value#212L AS value#1L]
06                +- Streaming RelationV2 rate[timestamp#211, value#212L]
StateStoreSaveExec StateStoreRDD count.png
Figure 4. StateStoreSaveExec and StateStoreRDD (after streamingBatch.toRdd.count)
Note

The number of partitions of StateStoreRDD (and hence the number of Spark tasks) is what was defined for the child physical plan.

There will be that many StateStores as there are partitions in StateStoreRDD.

Note
StateStoreSaveExec behaves differently per output mode.

StateStoreSaveExec uses the performance metrics of StateStoreWriter.

Table 1. StateStoreSaveExec’s Performance Metrics
Key Usage

allUpdatesTimeMs

allRemovalsTimeMs

commitTimeMs

numOutputRows

numTotalStateRows

Number of the state keys in the state store

Corresponds to numRowsTotal in stateOperators in StreamingQueryProgress (and is available as sq.lastProgress.stateOperators(0).numRowsTotal for 0th operator).

numUpdatedStateRows

Number of the state keys that were stored as updates in the state store in a trigger and for the keys in the result rows of the upstream physical operator.

  • In Complete output mode, numUpdatedStateRows is the number of input rows (which should be exactly the number of output rows from the upstream operator)

Caution
FIXME
  • In Append output mode, numUpdatedStateRows is the number of input rows with keys that have not expired yet (per required watermark)

  • In Update output mode, numUpdatedStateRows is exactly number of output rows, i.e. the number of keys that have not expired yet if watermark has been defined at all (which is optional).

Caution
FIXME
Note
You can see the current value as numRowsUpdated attribute in stateOperators in StreamingQueryProgress (that is available as StreamingQuery.lastProgress.stateOperators(n).numRowsUpdated for nth operator).

stateMemory

Memory used by the StateStore

StateStoreSaveExec webui query details.png
Figure 5. StateStoreSaveExec in web UI (Details for Query)

When executed, StateStoreSaveExec executes the child physical operator and creates a StateStoreRDD (with storeUpdateFunction specific to the output mode).

The output schema of StateStoreSaveExec is exactly the child's output schema.

The output partitioning of StateStoreSaveExec is exactly the child's output partitioning.

StateStoreRestoreExec uses a StreamingAggregationStateManager (that is created for the keyExpressions, the output of the child physical operator and the stateFormatVersion).

Tip

Enable INFO logging level for org.apache.spark.sql.execution.streaming.StateStoreSaveExec to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.StateStoreSaveExec=INFO

Refer to Logging.

Executing StateStoreSaveExec — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is a part of SparkPlan contract to produce the result of a physical operator as an RDD of internal binary rows (i.e. InternalRow).

Internally, doExecute initializes metrics.

Note
doExecute requires that the optional outputMode is at this point defined (that should have happened when IncrementalExecution had prepared a streaming aggregation for execution).

doExecute executes child physical operator and creates a StateStoreRDD with storeUpdateFunction that:

  1. Generates an unsafe projection to access the key field (using keyExpressions and the output schema of child).

  2. Branches off per output mode.

Table 2. doExecute’s Behaviour per Output Mode
Output Mode doExecute’s Behaviour

Append

Note
Append is the default output mode when unspecified.
Note
Append output mode requires that a streaming query defines event time watermark (using withWatermark operator) on the event time column that is used in aggregation (directly or using window function).
  1. Finds late (aggregate) rows from child physical operator (that have expired per watermark)

  2. Stores the late rows in the state store (and increments numUpdatedStateRows metric)

  3. Gets all the added (late) rows from the state store

  4. Creates an iterator that removes the late rows from the state store when requested the next row and in the end commits the state updates

Note
numUpdatedStateRows metric is the number of rows that…​FIXME
Tip
Refer to Demo: StateStoreSaveExec with Append Output Mode for an example of StateStoreSaveExec in Append output mode.
Caution
FIXME When is "Filtering state store on:" printed out?
Caution
FIXME Track numUpdatedStateRows metric

  1. Uses watermarkPredicateForData predicate to exclude matching rows and (like in Complete output mode) stores all the remaining rows in StateStore.

  2. (like in Complete output mode) While storing the rows, increments numUpdatedStateRows metric (for every row) and records the total time in allUpdatesTimeMs metric.

  3. Takes all the rows from StateStore and returns a NextIterator that:

Complete

  1. Takes all UnsafeRow rows (from the parent iterator)

  2. Stores the rows by key in the state store eagerly (i.e. all rows that are available in the parent iterator before proceeding)

  3. Commits the state updates

  4. In the end, doExecute reads the key-row pairs from the state store and passes the rows along (i.e. to the following physical operator)

The number of keys stored in the state store is recorded in numUpdatedStateRows metric.

Note
In Complete output mode numOutputRows metric is exactly numTotalStateRows metric.
Tip
Refer to Demo: StateStoreSaveExec with Complete Output Mode for an example of StateStoreSaveExec in Complete output mode.

  1. Stores all rows (as UnsafeRow) in StateStore.

  2. While storing the rows, increments numUpdatedStateRows metric (for every row) and records the total time in allUpdatesTimeMs metric.

  3. Records 0 in allRemovalsTimeMs metric.

  4. Commits the state updates to StateStore and records the time in commitTimeMs metric.

  5. Records StateStore metrics.

  6. In the end, takes all the rows stored in StateStore and increments numOutputRows metric.

Update

Returns an iterator that filters out late aggregate rows (per watermark if defined) and stores the "young" rows in the state store (one by one, i.e. every next). With no more rows available, that removes the late rows from the state store (all at once) and commits the state updates.

Tip
Refer to Demo: StateStoreSaveExec with Update Output Mode for an example of StateStoreSaveExec in Update output mode.

Returns Iterator of rows that uses watermarkPredicateForData predicate to filter out late rows.

In hasNext, when rows are no longer available:

  1. Records the total time to iterate over all the rows in allUpdatesTimeMs metric.

  2. removeKeysOlderThanWatermark and records the time in allRemovalsTimeMs metric.

  3. Commits the updates to StateStore and records the time in commitTimeMs metric.

  4. Records StateStore metrics.

In next, stores a row in StateStore and increments numOutputRows and numUpdatedStateRows metrics.

doExecute reports a UnsupportedOperationException when executed with an invalid output mode.

Invalid output mode: [outputMode]

Creating StateStoreSaveExec Instance

StateStoreSaveExec takes the following when created:

results matching ""

    No results matching ""