StreamingAggregationStateManager Contract — State Managers for Streaming Aggregation

StreamingAggregationStateManager is the contract of state managers that are used in streaming aggregations (with StateStoreSaveExec and StateStoreRestoreExec physical operators).

Table 1. StreamingAggregationStateManager Contract
Method Description

commit

commit(store: StateStore): Long

Commits all updates made to a state store and returns the new version of the store

Used exclusively when StateStoreSaveExec physical operator is executed.
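The Long returned by commit is the state store version produced by the commit. The sketch below shows that contract in plain Scala, not Spark's API. ToyStateStore and its methods are hypothetical stand-ins: the store buffers updates and bumps its version number on commit, and the manager-level commit simply delegates to the store.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a versioned state store (not Spark's StateStore).
final class ToyStateStore(startVersion: Long) {
  private val committed = mutable.Map.empty[String, Long]
  private val pending = mutable.Map.empty[String, Long]
  private var version: Long = startVersion

  // Buffers an update until the next commit.
  def put(key: String, value: Long): Unit = pending(key) = value

  // Reads uncommitted updates first, then falls back to the committed state.
  def get(key: String): Option[Long] = pending.get(key).orElse(committed.get(key))

  // Applies the buffered updates and returns the new version number.
  def commit(): Long = {
    committed ++= pending
    pending.clear()
    version += 1
    version
  }

  def currentVersion: Long = version
}

// Toy analogue of the contract's commit(store): Long, which delegates to the store.
def commit(store: ToyStateStore): Long = store.commit()

val store = new ToyStateStore(startVersion = 4L)
store.put("user-1", 3L)
val newVersion = commit(store)
println(s"committed version $newVersion, user-1 -> ${store.get("user-1")}")
```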

get

get(store: StateStore, key: UnsafeRow): UnsafeRow

Gets the saved state for a non-null key from a state store

Used exclusively when StateStoreRestoreExec physical operator is executed.
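A state store lookup yields null when there is no saved state for a key. A restore step therefore has to treat "no state yet" as a normal case. The sketch below is a plain-Scala illustration, loosely modeled on what a restore operator does: it emits the saved row (if any) followed by the current input row. Two parts are assumptions here: a java.util.HashMap stands in for the state store, and the restore function is a hypothetical helper.

```scala
// Hypothetical stand-in for a state store: java.util.HashMap.get returns null
// for a missing key, like a state store lookup with no saved state.
val savedState = new java.util.HashMap[String, (String, Long)]()
savedState.put("spark", ("spark", 10L))

// Restore-style step: look up the saved row for the input row's key and emit
// it (when present) before the input row itself.
def restore(inputRow: (String, Long)): List[(String, Long)] = {
  val key = inputRow._1                // analogue of getKey
  val restored = savedState.get(key)   // may be null
  Option(restored).toList :+ inputRow  // wrap null safely, then append the input row
}

println(restore(("spark", 3L)))
println(restore(("kafka", 1L)))
```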

getKey

getKey(row: UnsafeRow): UnsafeRow

Extracts the key (the grouping key columns) from an input row

Used when:

getStateValueSchema

getStateValueSchema: StructType

The schema for the state values

Used when StateStoreRestoreExec and StateStoreSaveExec physical operators are executed.
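The state value schema is where the two state format versions (1 and 2) differ. Version 1 stores the entire input row, key columns included, as the value. Version 2 stores only the non-key columns and rebuilds the full row from key and value on get. The plain-Scala model below illustrates that difference. Layout, ToyStateManager, ToyV1 and ToyV2 are hypothetical names, and rows are Vector[Any] rather than UnsafeRow. A list of column names stands in for a StructType.

```scala
import scala.collection.mutable

type Store = mutable.Map[Vector[Any], Vector[Any]]

// Hypothetical column layout: a row is a Vector of values aligned with inputColumns.
final case class Layout(inputColumns: Vector[String], keyColumns: Vector[String]) {
  private val keyIdx = keyColumns.map(inputColumns.indexOf(_))
  val valueColumns: Vector[String] = inputColumns.filterNot(keyColumns.contains)
  private val valueIdx = valueColumns.map(inputColumns.indexOf(_))

  // Analogue of getKey: project the key columns out of an input row.
  def getKey(row: Vector[Any]): Vector[Any] = keyIdx.map(row)
  def getValue(row: Vector[Any]): Vector[Any] = valueIdx.map(row)

  // Rebuild a full input row from separately stored key and value parts.
  def restore(key: Vector[Any], value: Vector[Any]): Vector[Any] = {
    val byName = (keyColumns.zip(key) ++ valueColumns.zip(value)).toMap
    inputColumns.map(byName)
  }
}

sealed trait ToyStateManager {
  def layout: Layout
  def stateValueColumns: Vector[String] // analogue of getStateValueSchema
  def put(store: Store, row: Vector[Any]): Unit
  def get(store: Store, key: Vector[Any]): Option[Vector[Any]]
}

// Version 1 style: the whole input row (key columns included) is the stored value.
final case class ToyV1(layout: Layout) extends ToyStateManager {
  def stateValueColumns: Vector[String] = layout.inputColumns
  def put(store: Store, row: Vector[Any]): Unit = store(layout.getKey(row)) = row
  def get(store: Store, key: Vector[Any]): Option[Vector[Any]] = store.get(key)
}

// Version 2 style: only non-key columns are stored; get re-joins key and value.
final case class ToyV2(layout: Layout) extends ToyStateManager {
  def stateValueColumns: Vector[String] = layout.valueColumns
  def put(store: Store, row: Vector[Any]): Unit = store(layout.getKey(row)) = layout.getValue(row)
  def get(store: Store, key: Vector[Any]): Option[Vector[Any]] =
    store.get(key).map(value => layout.restore(key, value))
}

val layout = Layout(Vector("window", "word", "count"), Vector("window", "word"))
val row: Vector[Any] = Vector(0L, "spark", 7L)

val v1Store: Store = mutable.Map.empty
val v2Store: Store = mutable.Map.empty
ToyV1(layout).put(v1Store, row)
ToyV2(layout).put(v2Store, row)

println(ToyV1(layout).stateValueColumns) // Vector(window, word, count)
println(ToyV2(layout).stateValueColumns) // Vector(count)
println(ToyV2(layout).get(v2Store, Vector(0L, "spark")))
```

The trade-off is storage versus work on read: version 2 avoids duplicating the key columns in every stored value, at the cost of re-joining key and value when the state is read back.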

iterator

iterator(store: StateStore): Iterator[UnsafeRowPair]

Returns all the UnsafeRow key-value pairs in a state store

Used exclusively when StateStoreSaveExec physical operator is executed.
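Walking all key-value pairs lets a save step both inspect keys and emit values. As a rough illustration of an Append-mode style pass, the sketch below iterates over every pair and keeps those whose window ended at or before the watermark. It evicts them from the store and emits their values. The code is plain Scala. RowPair is a hypothetical stand-in for UnsafeRowPair, and ToyStore and emitExpired are hypothetical helpers.

```scala
import scala.collection.mutable

// Hypothetical stand-in for UnsafeRowPair: a (windowEnd, word) key and a running count.
final case class RowPair(key: (Long, String), value: Long)

// Hypothetical toy store; LinkedHashMap keeps insertion order for predictable output.
final class ToyStore {
  private val data = mutable.LinkedHashMap.empty[(Long, String), Long]
  def put(key: (Long, String), value: Long): Unit = data(key) = value
  def remove(key: (Long, String)): Unit = data -= key
  // Analogue of iterator(store): all key-value pairs in the store.
  def iterator: Iterator[RowPair] = data.iterator.map { case (k, v) => RowPair(k, v) }
}

// Collect expired pairs first (toList forces the lazy iterator), then remove them,
// so the map is never modified while it is being iterated.
def emitExpired(store: ToyStore, watermark: Long): List[RowPair] = {
  val expired = store.iterator.filter(_.key._1 <= watermark).toList
  expired.foreach(pair => store.remove(pair.key))
  expired
}

val store = new ToyStore
store.put((10L, "spark"), 3L)
store.put((20L, "spark"), 5L)
store.put((10L, "kafka"), 1L)

val emitted = emitExpired(store, watermark = 15L)
println(emitted)
println(store.iterator.toList)
```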

keys

keys(store: StateStore): Iterator[UnsafeRow]

Returns all keys in a state store (as an iterator)

Used exclusively when physical operators with WatermarkSupport are requested to removeKeysOlderThanWatermark, which in practice happens only when the StateStoreSaveExec physical operator is executed.
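Together, keys and remove are enough to evict expired state without reading any values. The following is a minimal plain-Scala sketch. It assumes a mutable map as the state store and Long event-time keys in milliseconds. ToyManager and evictKeysOlderThan are hypothetical names; the latter is a toy analogue of removeKeysOlderThanWatermark, not its actual code.

```scala
import scala.collection.mutable

// Hypothetical toy state manager: key = event-time bucket (ms), value = count.
final class ToyManager {
  val store = mutable.Map.empty[Long, Long]
  def keys: Iterator[Long] = store.keysIterator
  def remove(key: Long): Unit = store -= key
}

// Watermark-based eviction using only keys and remove. The expired keys are
// materialized first because a mutable Map must not be modified while an
// iterator over it is still in use. Returns the number of evicted keys.
def evictKeysOlderThan(manager: ToyManager, watermarkMs: Long): Int = {
  val expired = manager.keys.filter(_ <= watermarkMs).toList
  expired.foreach(manager.remove)
  expired.size
}

val manager = new ToyManager
manager.store ++= Seq(1000L -> 4L, 2000L -> 1L, 3000L -> 9L)
val evicted = evictKeysOlderThan(manager, watermarkMs = 2000L)
println(s"evicted $evicted keys, remaining: ${manager.store.keys.toList.sorted}")
```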

put

put(store: StateStore, row: UnsafeRow): Unit

Stores a row in a state store

Used exclusively when StateStoreSaveExec physical operator is executed.

remove

remove(store: StateStore, key: UnsafeRow): Unit

Removes a key (and its saved state) from a state store

Used exclusively when StateStoreSaveExec physical operator is executed, either directly or indirectly through WatermarkSupport (removeKeysOlderThanWatermark).

values

values(store: StateStore): Iterator[UnsafeRow]

Returns all the values in a state store

Used exclusively when StateStoreSaveExec physical operator is executed.
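The contract above can be pictured as a small trait over a key-value store. The sketch below is plain Scala with generic rows instead of UnsafeRow, and getStateValueSchema is omitted because the toy model has no schema. MemStore, ToyAggStateManager, WordCount and WordCountManager are all hypothetical. It ends with a Complete-mode style pass: upsert every aggregated row, commit, then emit all values.

```scala
import scala.collection.mutable

// Hypothetical in-memory store standing in for StateStore.
final class MemStore[K, V] {
  val data = mutable.LinkedHashMap.empty[K, V]
  var version = 0L
}

// Toy mirror of the contract's methods (not Spark's trait).
trait ToyAggStateManager[Row, K] {
  def getKey(row: Row): K
  def commit(store: MemStore[K, Row]): Long = { store.version += 1; store.version }
  def get(store: MemStore[K, Row], key: K): Option[Row] = store.data.get(key)
  def put(store: MemStore[K, Row], row: Row): Unit = store.data(getKey(row)) = row
  def remove(store: MemStore[K, Row], key: K): Unit = store.data -= key
  def keys(store: MemStore[K, Row]): Iterator[K] = store.data.keysIterator
  def values(store: MemStore[K, Row]): Iterator[Row] = store.data.valuesIterator
  def iterator(store: MemStore[K, Row]): Iterator[(K, Row)] = store.data.iterator
}

final case class WordCount(word: String, count: Long)

object WordCountManager extends ToyAggStateManager[WordCount, String] {
  def getKey(row: WordCount): String = row.word
}

// Complete-mode style pass: later rows for the same key overwrite earlier state.
val store = new MemStore[String, WordCount]
Seq(WordCount("spark", 2), WordCount("kafka", 1), WordCount("spark", 5))
  .foreach(WordCountManager.put(store, _))
val version = WordCountManager.commit(store)
val output = WordCountManager.values(store).toList
println(s"version $version: $output")
```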

StreamingAggregationStateManager supports two versions of state managers for streaming aggregations:

  • 1 (legacy)

  • 2 (default)

Note
The version of a state manager is controlled using spark.sql.streaming.aggregation.stateFormatVersion internal configuration property.
Note
StreamingAggregationStateManagerBaseImpl is the one and only known base implementation of the StreamingAggregationStateManager Contract in Spark Structured Streaming.
Note
StreamingAggregationStateManager is a Scala sealed trait, which means that all its implementations must be defined in the same compilation unit (a single source file).
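Because a sealed trait's subtypes are all known at compile time, the compiler can check pattern matches over them for exhaustiveness. A small plain-Scala illustration follows, using hypothetical names rather than the Spark classes.

```scala
// Hypothetical sealed hierarchy: every subtype must live in this same file.
sealed trait ToyManager { def version: Int }
final case class ToyManagerV1() extends ToyManager { val version = 1 }
final case class ToyManagerV2() extends ToyManager { val version = 2 }

// Since ToyManager is sealed, the compiler can warn if a subtype is missing here.
def describe(m: ToyManager): String = m match {
  case ToyManagerV1() => "version 1 (legacy)"
  case ToyManagerV2() => "version 2 (default)"
}

println(describe(ToyManagerV1()))
println(describe(ToyManagerV2()))
```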

Creating StreamingAggregationStateManager Instance — createStateManager Factory Method

createStateManager(
  keyExpressions: Seq[Attribute],
  inputRowAttributes: Seq[Attribute],
  stateFormatVersion: Int): StreamingAggregationStateManager

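createStateManager creates a new StreamingAggregationStateManager for a given stateFormatVersion:

  • 1 creates a StreamingAggregationStateManagerImplV1

  • 2 creates a StreamingAggregationStateManagerImplV2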

createStateManager throws an IllegalArgumentException for any other stateFormatVersion:

Version [stateFormatVersion] is invalid
Note
createStateManager is used when StateStoreRestoreExec and StateStoreSaveExec physical operators are created.
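The factory amounts to a match on the version number with a catch-all that rejects unknown versions. Below is a plain-Scala sketch of that shape, with one simplification: the key expressions and input row attributes are modeled as column-name lists rather than Seq[Attribute]. ToyStateManager, ToyStateManagerV1 and ToyStateManagerV2 are hypothetical placeholder types. The error message follows the format shown above.

```scala
import scala.util.Try

// Hypothetical placeholder implementations (not the Spark classes).
sealed trait ToyStateManager
final case class ToyStateManagerV1(keys: Seq[String], input: Seq[String]) extends ToyStateManager
final case class ToyStateManagerV2(keys: Seq[String], input: Seq[String]) extends ToyStateManager

// Pick an implementation by state format version; reject anything else.
def createStateManager(
    keyExpressions: Seq[String],
    inputRowAttributes: Seq[String],
    stateFormatVersion: Int): ToyStateManager =
  stateFormatVersion match {
    case 1 => ToyStateManagerV1(keyExpressions, inputRowAttributes)
    case 2 => ToyStateManagerV2(keyExpressions, inputRowAttributes)
    case _ => throw new IllegalArgumentException(s"Version $stateFormatVersion is invalid")
  }

val manager = createStateManager(Seq("word"), Seq("word", "count"), stateFormatVersion = 2)
println(manager)

// An unsupported version fails fast with the invalid-version message.
val error = Try(createStateManager(Seq("word"), Seq("word", "count"), 3)).failed.get
println(error.getMessage)
```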
