StreamingAggregationStateManager Contract — State Managers for Streaming Aggregation

StreamingAggregationStateManager is the abstraction of state managers that act as middlemen between state stores and the physical operators used in Streaming Aggregation (e.g. StateStoreSaveExec and StateStoreRestoreExec).

Table 1. StreamingAggregationStateManager Contract
Method Description

commit

commit(
  store: StateStore): Long

Commits all updates (changes) to the given state store and returns the new version.

Used exclusively when StateStoreSaveExec physical operator is executed.

get

get(store: StateStore, key: UnsafeRow): UnsafeRow

Looks up the value for the given (non-null) key in the given state store.

Used exclusively when StateStoreRestoreExec physical operator is executed.

getKey

getKey(row: UnsafeRow): UnsafeRow

Extracts the columns for the key from the input row

Used when:

getStateValueSchema

getStateValueSchema: StructType

Gets the schema of the values in a state store

Used when StateStoreRestoreExec and StateStoreSaveExec physical operators are executed

iterator

iterator(
  store: StateStore): Iterator[UnsafeRowPair]

Returns all UnsafeRow key-value pairs in the given state store

Used exclusively when StateStoreSaveExec physical operator is executed.

keys

keys(store: StateStore): Iterator[UnsafeRow]

Returns all the keys in the state store

Used exclusively when physical operators with WatermarkSupport are requested to removeKeysOlderThanWatermark (i.e. exclusively when StateStoreSaveExec physical operator is executed).

put

put(
  store: StateStore,
  row: UnsafeRow): Unit

Stores (puts) the given row in the given state store

Used exclusively when StateStoreSaveExec physical operator is executed.

remove

remove(
  store: StateStore,
  key: UnsafeRow): Unit

Removes the key-value pair for the given key from the given state store.

Used exclusively when StateStoreSaveExec physical operator is executed (directly or indirectly as a WatermarkSupport).

values

values(
  store: StateStore): Iterator[UnsafeRow]

Returns all the values in the given state store

Used exclusively when StateStoreSaveExec physical operator is executed.
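
The contract in Table 1 can be illustrated with a minimal, self-contained sketch. Everything in it is a hypothetical stand-in rather than Spark code: ToyRow replaces UnsafeRow, ToyStateStore replaces StateStore, and ToyStateManager mirrors only part of the contract (getStateValueSchema and iterator are left out). The sketch also assumes that the leading columns of a row form the key and that the whole row is stored as the value, which is only one possible layout. Unlike the real get, the toy get returns an Option.

```scala
import scala.collection.mutable

// Hypothetical stand-in for UnsafeRow: just an ordered list of column values.
final case class ToyRow(columns: Vector[Any])

// Hypothetical stand-in for StateStore: an in-memory map with a version counter.
final class ToyStateStore {
  private val data = mutable.LinkedHashMap.empty[ToyRow, ToyRow]
  private var version = 0L

  def get(key: ToyRow): Option[ToyRow] = data.get(key)
  def put(key: ToyRow, value: ToyRow): Unit = data.update(key, value)
  def remove(key: ToyRow): Unit = { data.remove(key); () }
  def iterator: Iterator[(ToyRow, ToyRow)] = data.iterator
  // Each commit bumps the version and returns the new one.
  def commit(): Long = { version += 1; version }
}

// Simplified mirror of the methods in Table 1 (not the real trait).
trait ToyStateManager {
  def getKey(row: ToyRow): ToyRow
  def get(store: ToyStateStore, key: ToyRow): Option[ToyRow]
  def put(store: ToyStateStore, row: ToyRow): Unit
  def remove(store: ToyStateStore, key: ToyRow): Unit
  def keys(store: ToyStateStore): Iterator[ToyRow]
  def values(store: ToyStateStore): Iterator[ToyRow]
  def commit(store: ToyStateStore): Long
}

// Assumption: the first `keyWidth` columns are the grouping key,
// and the whole input row is stored as the value.
final class WholeRowStateManager(keyWidth: Int) extends ToyStateManager {
  def getKey(row: ToyRow): ToyRow = ToyRow(row.columns.take(keyWidth))
  def get(store: ToyStateStore, key: ToyRow): Option[ToyRow] = store.get(key)
  def put(store: ToyStateStore, row: ToyRow): Unit = store.put(getKey(row), row)
  def remove(store: ToyStateStore, key: ToyRow): Unit = store.remove(key)
  def keys(store: ToyStateStore): Iterator[ToyRow] = store.iterator.map(_._1)
  def values(store: ToyStateStore): Iterator[ToyRow] = store.iterator.map(_._2)
  def commit(store: ToyStateStore): Long = store.commit()
}
```

In this sketch the manager, not the operator, decides how a row is split into key and value, which is the "middleman" role described at the top of this page.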

StreamingAggregationStateManager supports two versions of state managers for streaming aggregations. The version is selected by the spark.sql.streaming.aggregation.stateFormatVersion internal configuration property.

Note
StreamingAggregationStateManagerBaseImpl is the only known direct implementation of the StreamingAggregationStateManager Contract in Spark Structured Streaming.

Note
StreamingAggregationStateManager is a Scala sealed trait, which means that all of its direct implementations are in the same compilation unit (a single file).
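
What the sealed modifier buys can be seen in a small sketch. The names SketchManager, SketchManagerV1, SketchManagerV2 and DescribeManager are hypothetical and exist only for this illustration. Because every direct subtype of a sealed trait has to be declared in the same file, the compiler knows the full set of subtypes and can check a pattern match for exhaustiveness.

```scala
// Hypothetical sealed hierarchy; all direct subtypes must live in this file.
sealed trait SketchManager
final class SketchManagerV1 extends SketchManager
final class SketchManagerV2 extends SketchManager

object DescribeManager {
  // The compiler can verify that both subtypes are covered,
  // since no other file can add a new direct subtype.
  def apply(manager: SketchManager): String = manager match {
    case _: SketchManagerV1 => "format version 1"
    case _: SketchManagerV2 => "format version 2"
  }
}
```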

Creating StreamingAggregationStateManager Instance — createStateManager Factory Method

createStateManager(
  keyExpressions: Seq[Attribute],
  inputRowAttributes: Seq[Attribute],
  stateFormatVersion: Int): StreamingAggregationStateManager

createStateManager creates a new StreamingAggregationStateManager for the given stateFormatVersion.

createStateManager throws an IllegalArgumentException for any unsupported stateFormatVersion:

Version [stateFormatVersion] is invalid

Note
createStateManager is used when StateStoreRestoreExec and StateStoreSaveExec physical operators are created.
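
The dispatch described above can be sketched as a pattern match on the version number. This is a simplified illustration and not the Spark source: FormatManager, FormatManagerV1, FormatManagerV2 and FormatManagerFactory are hypothetical names, the keyExpressions and inputRowAttributes parameters are omitted, and the sketch assumes that the two supported versions are numbered 1 and 2. Only the error message follows the text above.

```scala
// Hypothetical stand-ins for the two versions of the state manager.
sealed trait FormatManager { def formatVersion: Int }
final class FormatManagerV1 extends FormatManager { val formatVersion = 1 }
final class FormatManagerV2 extends FormatManager { val formatVersion = 2 }

object FormatManagerFactory {
  // Assumption: versions 1 and 2 are the supported ones; anything else is rejected.
  def create(stateFormatVersion: Int): FormatManager = stateFormatVersion match {
    case 1 => new FormatManagerV1
    case 2 => new FormatManagerV2
    case _ =>
      throw new IllegalArgumentException(s"Version $stateFormatVersion is invalid")
  }
}
```

Keeping the version check in a single factory means the physical operators only need to pass the configured version through and never branch on it themselves.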
