StateStoreHandler Internal Contract

StateStoreHandler is the internal base class of the state store handlers that manage a StateStore, i.e. commit it, abort it if needed, and report its metrics.

StateStoreHandler takes a single StateStoreType when created.

Note
StateStoreHandler is a private abstract Scala class and so cannot be created directly. It is created indirectly whenever one of the concrete StateStoreHandlers is created.
Table 1. StateStoreHandler Contract
Method        Description
stateStore    stateStore: StateStore (the StateStore that the handler operates on)
Table 2. StateStoreHandlers
StateStoreHandler           Description
KeyToNumValuesStore         StateStoreHandler of KeyToNumValuesType
KeyWithIndexToValueStore    StateStoreHandler of KeyWithIndexToValueType
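The relationship between the abstract base class and its two concrete handlers can be sketched in plain Scala. This is a simplified model, not Spark's code: SimpleStore is a hypothetical stand-in for StateStore, and the handler classes carry a Sketch suffix to mark them as illustrative. It shows that each concrete handler fixes its own StateStoreType in the call to the base constructor, which is why the abstract base never has to be created directly.

```scala
// Hypothetical, simplified model of the StateStoreHandler hierarchy (not Spark's code).
object HandlerHierarchySketch {

  // Minimal stand-in for the sealed StateStoreType.
  sealed trait StateStoreType
  case object KeyToNumValuesType extends StateStoreType
  case object KeyWithIndexToValueType extends StateStoreType

  // Hypothetical stand-in for StateStore: only an id is modelled.
  final case class SimpleStore(id: String)

  // The base class takes a single StateStoreType and declares the store it manages.
  // Being abstract, it can only be instantiated through a concrete subclass.
  abstract class StateStoreHandlerSketch(val stateStoreType: StateStoreType) {
    def stateStore: SimpleStore
  }

  // Each concrete handler fixes its StateStoreType in the call to the base constructor.
  final class KeyToNumValuesStoreSketch
      extends StateStoreHandlerSketch(KeyToNumValuesType) {
    val stateStore: SimpleStore = SimpleStore(s"store-$stateStoreType")
  }

  final class KeyWithIndexToValueStoreSketch
      extends StateStoreHandlerSketch(KeyWithIndexToValueType) {
    val stateStore: SimpleStore = SimpleStore(s"store-$stateStoreType")
  }
}
```

Calling new HandlerHierarchySketch.KeyToNumValuesStoreSketch() gives a handler whose stateStoreType is KeyToNumValuesType, while new HandlerHierarchySketch.StateStoreHandlerSketch(...) does not compile because the class is abstract.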

Tip

Enable ALL logging levels for org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.StateStoreHandler logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.StateStoreHandler=ALL

Refer to Logging.

Performance Metrics — metrics Method

metrics: StateStoreMetrics

metrics simply requests the StateStore for the StateStoreMetrics.

Note
metrics is used exclusively when SymmetricHashJoinStateManager is requested for the metrics.
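The delegation can be illustrated with a minimal sketch. StoreMetrics, SimpleStore, InMemoryStore and DemoHandler are hypothetical names; StoreMetrics is a trimmed-down stand-in for StateStoreMetrics that keeps only a key count. The point is that the handler holds no metrics of its own and simply forwards the request to the store it manages.

```scala
// Hypothetical sketch of metrics delegation (not Spark's code).
object MetricsDelegationSketch {

  // Trimmed-down, hypothetical stand-in for StateStoreMetrics: only the key count.
  final case class StoreMetrics(numKeys: Long)

  // Hypothetical stand-in for StateStore that can report metrics.
  trait SimpleStore {
    def metrics: StoreMetrics
  }

  final class InMemoryStore(data: Map[String, String]) extends SimpleStore {
    def metrics: StoreMetrics = StoreMetrics(data.size.toLong)
  }

  // The handler keeps no metrics of its own: it simply asks its store.
  abstract class HandlerSketch {
    protected def stateStore: SimpleStore
    def metrics: StoreMetrics = stateStore.metrics
  }

  final class DemoHandler(store: SimpleStore) extends HandlerSketch {
    protected def stateStore: SimpleStore = store
  }
}
```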

Committing State (Changes to State Store) — commit Method

commit(): Unit

commit…FIXME

Note
commit is used when…FIXME

abortIfNeeded Method

abortIfNeeded(): Unit

abortIfNeeded…FIXME

Note
abortIfNeeded is used when…FIXME

Loading State Store (By Key and Value Schemas) — getStateStore Method

getStateStore(
  keySchema: StructType,
  valueSchema: StructType): StateStore

getStateStore creates a new StateStoreProviderId (for the StatefulOperatorStateInfo of the owning SymmetricHashJoinStateManager, the partition ID from the execution context, and the name of the state store for the JoinSide and StateStoreType).

getStateStore uses the StateStore utility to look up a StateStore for the StateStoreProviderId.

In the end, getStateStore prints out the following INFO message to the logs:

Loaded store [storeId]

Note
getStateStore is used when KeyToNumValuesStore and KeyWithIndexToValueStore state store handlers are created (for SymmetricHashJoinStateManager).
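The lookup sequence can be modelled roughly as follows. Everything here is hypothetical: OperatorInfo and ProviderId stand in for StatefulOperatorStateInfo and StateStoreProviderId, a mutable map plays the role of the StateStore utility's cache of loaded providers, and both the store name format <joinSide>-<storeType> and the store id format are assumptions made for illustration (the text above only says the name is derived from the JoinSide and StateStoreType). A provider is created once per id and reused on later lookups, while each lookup returns a store for the requested version.

```scala
import scala.collection.mutable

// Hypothetical model of getStateStore (not Spark's API).
object GetStateStoreSketch {

  // Stand-ins for the operator state info and the provider id built from it.
  final case class OperatorInfo(checkpointLocation: String, operatorId: Long, storeVersion: Long)
  final case class ProviderId(operator: OperatorInfo, partitionId: Int, storeName: String)

  final case class LoadedStore(providerId: ProviderId, version: Long) {
    // Assumed id format, for illustration only.
    def id: String =
      s"${providerId.operator.operatorId}/${providerId.partitionId}/${providerId.storeName}"
  }

  final class Provider(providerId: ProviderId) {
    def getStore(version: Long): LoadedStore = LoadedStore(providerId, version)
  }

  // Plays the role of the StateStore utility: one provider per ProviderId, created on first use.
  private val loadedProviders = mutable.Map.empty[ProviderId, Provider]

  def providerCount: Int = loadedProviders.size

  def lookup(providerId: ProviderId, version: Long): LoadedStore =
    loadedProviders.getOrElseUpdate(providerId, new Provider(providerId)).getStore(version)

  // Assumed naming convention "<joinSide>-<storeType>", for illustration only.
  def storeName(joinSide: String, storeType: String): String = s"$joinSide-$storeType"

  def getStateStore(info: OperatorInfo, partitionId: Int,
                    joinSide: String, storeType: String): LoadedStore = {
    val providerId = ProviderId(info, partitionId, storeName(joinSide, storeType))
    val store = lookup(providerId, info.storeVersion)
    println(s"INFO Loaded store ${store.id}") // mirrors the INFO message above
    store
  }
}
```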

StateStoreType Contract (Sealed Trait)

StateStoreType is required to create a StateStoreHandler.

Table 3. StateStoreTypes
StateStoreType             toString               Description
KeyToNumValuesType         keyToNumValues         StateStoreType of KeyToNumValuesStore
KeyWithIndexToValueType    keyWithIndexToValue    StateStoreType of KeyWithIndexToValueStore

Note
StateStoreType is a private sealed Scala trait, which means that all its implementations are in the same compilation unit (a single source file).
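A sealed trait with case objects that override toString can be sketched as below. The object name StateStoreTypeSketch, the describe helper and its informal descriptions (inferred from the type names) are hypothetical; only the two toString values come from the table above. Because the trait is sealed, the compiler knows every subtype and can check pattern matches for exhaustiveness.

```scala
// Hypothetical sketch of a sealed StateStoreType hierarchy (mirrors the table above).
object StateStoreTypeSketch {

  // Sealed: every direct subtype must be declared in this same source file.
  sealed trait StateStoreType

  case object KeyToNumValuesType extends StateStoreType {
    override def toString: String = "keyToNumValues"
  }

  case object KeyWithIndexToValueType extends StateStoreType {
    override def toString: String = "keyWithIndexToValue"
  }

  // The compiler checks this match for exhaustiveness: adding a third subtype
  // without a matching case here produces a non-exhaustive match warning.
  def describe(t: StateStoreType): String = t match {
    case KeyToNumValuesType      => "key -> number of values"
    case KeyWithIndexToValueType => "(key, index) -> value"
  }

  val all: Seq[StateStoreType] = Seq(KeyToNumValuesType, KeyWithIndexToValueType)
}
```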
