InputProcessor Helper Class of FlatMapGroupsWithStateExec Physical Operator

InputProcessor is a helper class to manage state in the state store for every partition of a FlatMapGroupsWithStateExec physical operator.

InputProcessor is created exclusively when the FlatMapGroupsWithStateExec physical operator is requested to execute and generate a recipe for a distributed computation (as an RDD[InternalRow]). The operator uses InputProcessor in the storeUpdateFunction while processing the rows of every partition with the corresponding per-partition state store.

InputProcessor takes a single StateStore to be created. The StateStore manages the per-group state (and is used when processing new data and timed-out state data, and in the "all rows processed" callback).

Processing New Data (Creating Iterator of New Data Processed) — processNewData Method

processNewData(dataIter: Iterator[InternalRow]): Iterator[InternalRow]

processNewData creates a grouped iterator of pairs of per-group state keys and row values from the given data iterator (dataIter), using the grouping attributes and the output schema of the child operator (of the parent FlatMapGroupsWithStateExec physical operator).

For every per-group state key (in the grouped iterator), processNewData requests the StateManager (of the parent FlatMapGroupsWithStateExec physical operator) to get the state (from the StateStore) and then calls callFunctionAndUpdateState (with the hasTimedOut flag off, i.e. false).
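The grouping-then-lookup flow described above can be sketched in plain Scala. This is only an illustration under stated assumptions: ProcessNewDataSketch, groupedByKey, and the mutable Map standing in for the StateStore are hypothetical names, keys and values are simplified to String and Int, and the input is assumed to arrive already clustered by key. Unlike a lazy grouped iterator, this sketch buffers each group's values before handing them over.

```scala
// Hypothetical stand-ins for illustration only; these are not Spark classes.
object ProcessNewDataSketch {
  type Key = String
  type Value = Int

  // Groups consecutive elements that share a key, assuming the input is
  // already clustered by key. Each group's values are buffered eagerly.
  def groupedByKey(data: Iterator[(Key, Value)]): Iterator[(Key, Iterator[Value])] = {
    val buffered = data.buffered
    new Iterator[(Key, Iterator[Value])] {
      def hasNext: Boolean = buffered.hasNext
      def next(): (Key, Iterator[Value]) = {
        val key = buffered.head._1
        val values = scala.collection.mutable.ArrayBuffer.empty[Value]
        while (buffered.hasNext && buffered.head._1 == key) {
          values += buffered.next()._2
        }
        (key, values.iterator)
      }
    }
  }

  // For every group: look up the current state (if any) and call the
  // per-group function with the hasTimedOut flag off (false).
  def processNewData(
      data: Iterator[(Key, Value)],
      store: scala.collection.mutable.Map[Key, Int])(
      f: (Key, Iterator[Value], Option[Int], Boolean) => Iterator[String]): Iterator[String] =
    groupedByKey(data).flatMap { case (key, values) =>
      f(key, values, store.get(key), false)
    }
}
```

The per-group function here plays the role of callFunctionAndUpdateState; it receives the key, the group's values, the state found in the store (if any), and hasTimedOut = false.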

Note
processNewData is used exclusively when FlatMapGroupsWithStateExec physical operator is requested to execute and generate a recipe for a distributed computation (as an RDD[InternalRow]).

Processing Timed-Out State Data (Creating Iterator of Timed-Out State Data) — processTimedOutState Method

processTimedOutState(): Iterator[InternalRow]

processTimedOutState does nothing and simply returns an empty iterator for GroupStateTimeout.NoTimeout.

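With timeout enabled, processTimedOutState gets the current timeout threshold per GroupStateTimeout: the batch timestamp (batchTimestampMs) for ProcessingTimeTimeout and the event-time watermark for EventTimeTimeout.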

processTimedOutState creates an iterator of timed-out state data by requesting the StateManager for all the available state data (in the StateStore) and keeping only the state data that has a timeout timestamp defined and below the current timeout threshold.

In the end, for every timed-out state data, processTimedOutState calls callFunctionAndUpdateState (with the hasTimedOut flag enabled, i.e. true).
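The selection of timed-out state can be sketched as a simple filter. The names below (TimedOutStateSketch, StateEntry, the Timeout hierarchy) are hypothetical simplifications rather than Spark's own types. The sketch assumes that -1 marks "no timeout timestamp set" and that the threshold is the batch timestamp for processing-time timeout and the event-time watermark for event-time timeout.

```scala
// Hypothetical simplification for illustration only; not Spark classes.
object TimedOutStateSketch {
  // Assumption: -1 marks a state entry with no timeout timestamp set.
  val NoTimestamp: Long = -1L

  final case class StateEntry(key: String, value: Int, timeoutTimestamp: Long)

  sealed trait Timeout
  case object NoTimeout extends Timeout
  case object ProcessingTimeTimeout extends Timeout
  case object EventTimeTimeout extends Timeout

  def timedOut(
      allState: Iterator[StateEntry],
      timeout: Timeout,
      batchTimestampMs: Long,
      eventTimeWatermarkMs: Long): Iterator[StateEntry] = {
    // Keeps only entries with a timeout defined and below the threshold.
    def below(threshold: Long): Iterator[StateEntry] =
      allState.filter { s =>
        s.timeoutTimestamp != NoTimestamp && s.timeoutTimestamp < threshold
      }

    timeout match {
      case NoTimeout             => Iterator.empty // nothing can time out
      case ProcessingTimeTimeout => below(batchTimestampMs)
      case EventTimeTimeout      => below(eventTimeWatermarkMs)
    }
  }
}
```

Each entry returned by timedOut would then be passed to the per-group function with no new values and hasTimedOut = true.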

Note
processTimedOutState is used exclusively when FlatMapGroupsWithStateExec physical operator is requested to execute and generate a recipe for a distributed computation (as an RDD[InternalRow]).

callFunctionAndUpdateState Internal Method

callFunctionAndUpdateState(
  stateData: StateData,
  valueRowIter: Iterator[InternalRow],
  hasTimedOut: Boolean): Iterator[InternalRow]

Note
callFunctionAndUpdateState is used when InputProcessor is requested to process new data and timed-out state data.

When processing new data, hasTimedOut flag is off (false).

When processing timed-out state data, hasTimedOut flag is on (true).

callFunctionAndUpdateState creates a key object by requesting the given StateData for the UnsafeRow of the key (keyRow) and converts it to an object (using the internal state key converter).

callFunctionAndUpdateState creates value objects by taking every value row (from the given valueRowIter iterator) and converts them to objects (using the internal state value converter).

callFunctionAndUpdateState creates a new GroupStateImpl with the following:

  • The current state value (of the given StateData) that could possibly be null

  • The batchTimestampMs of the parent FlatMapGroupsWithStateExec operator (that could possibly be -1)

  • The event-time watermark of the parent FlatMapGroupsWithStateExec operator (that could possibly be -1)

  • The GroupStateTimeout of the parent FlatMapGroupsWithStateExec operator

  • The watermarkPresent flag of the parent FlatMapGroupsWithStateExec operator

  • The given hasTimedOut flag

callFunctionAndUpdateState then executes the user-defined state function (of the parent FlatMapGroupsWithStateExec operator) on the key object, value objects, and the newly-created GroupStateImpl.

For every output value from the user-defined state function, callFunctionAndUpdateState increments the numOutputRows performance metric and wraps the value in an internal row (using the internal output value converter).

In the end, callFunctionAndUpdateState returns an Iterator[InternalRow] that calls the completion function right after all the rows have been processed (i.e. once the iterator has been fully consumed).
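The idea of an iterator that fires a callback once it is fully consumed can be sketched in plain Scala. CompletionSketch and withCompletion are hypothetical names used only for this illustration; the wrapper is a stand-in and not the utility the operator itself uses.

```scala
// Hypothetical helper for illustration only.
object CompletionSketch {
  // Wraps an iterator so that `onCompletion` runs exactly once,
  // the first time hasNext reports that no elements are left.
  def withCompletion[A](underlying: Iterator[A])(onCompletion: => Unit): Iterator[A] =
    new Iterator[A] {
      private var completed = false

      def hasNext: Boolean = {
        val more = underlying.hasNext
        if (!more && !completed) {
          completed = true
          onCompletion
        }
        more
      }

      def next(): A = underlying.next()
    }
}
```

Because the callback runs only when the consumer reaches the end of the iterator, any state write placed in it happens after the user function's output has been fully drained, not when the iterator is created.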

"All Rows Processed" Callback — onIteratorCompletion Internal Method

onIteratorCompletion: Unit

onIteratorCompletion takes one of two branches, depending on whether the GroupStateImpl has been marked removed with no timeout timestamp specified.

When the GroupStateImpl has been marked removed and no timeout timestamp is specified, onIteratorCompletion does the following:

  1. Requests the StateManager (of the parent FlatMapGroupsWithStateExec operator) to remove the state (from the StateStore for the key row of the given StateData)

  2. Increments the numUpdatedStateRows performance metric

Otherwise, when the GroupStateImpl has not been marked removed or the timeout timestamp is specified, onIteratorCompletion checks whether the timeout timestamp has changed by comparing the timeout timestamps of the GroupStateImpl and the given StateData.

Only when the GroupStateImpl has been updated or removed, or the timeout timestamp has changed, onIteratorCompletion does the following:

  1. Requests the StateManager (of the parent FlatMapGroupsWithStateExec operator) to persist the state (in the StateStore with the key row of the given StateData, the updated state object, and the current timeout timestamp of the GroupStateImpl)

  2. Increments the numUpdatedStateRows performance metric
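The branching above can be summarized as a small decision function. This is a sketch under assumptions: CompletionDecisionSketch, Action, and decide are hypothetical names, the flags are passed in as plain Booleans rather than read from a GroupStateImpl, and -1 is assumed to mean "no timeout timestamp specified".

```scala
// Hypothetical decision model for illustration only; not Spark code.
object CompletionDecisionSketch {
  // Assumption: -1 means no timeout timestamp is specified.
  val NoTimestamp: Long = -1L

  sealed trait Action
  case object RemoveState extends Action // remove the state for the key
  case object PutState extends Action    // persist the state for the key
  case object NoWrite extends Action     // leave the state store untouched

  def decide(
      hasRemoved: Boolean,
      hasUpdated: Boolean,
      currentTimeout: Long,
      storedTimeout: Long): Action =
    if (hasRemoved && currentTimeout == NoTimestamp) {
      RemoveState
    } else {
      val timeoutChanged = currentTimeout != storedTimeout
      if (hasUpdated || hasRemoved || timeoutChanged) PutState else NoWrite
    }
}
```

In this model, both RemoveState and PutState correspond to the cases where numUpdatedStateRows is incremented, while NoWrite leaves the metric unchanged. A state that was removed but still has a timeout timestamp falls into the PutState branch.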

Note
onIteratorCompletion is used exclusively when InputProcessor is requested to callFunctionAndUpdateState (right after rows have been processed).

Internal Properties

Name Description

getKeyObj

A state key converter (of type InternalRow ⇒ Any) to deserialize a given row (for a per-group state key) to a key object

  • The deserialization expression for keys is specified as the key deserializer expression when the parent FlatMapGroupsWithStateExec operator is created

  • The data type of state keys is specified as the grouping attributes when the parent FlatMapGroupsWithStateExec operator is created

Used exclusively when InputProcessor is requested to callFunctionAndUpdateState.

getOutputRow

An output value converter (of type Any ⇒ InternalRow) to wrap a given output value (from the user-defined state function) in a row

  • The data type of the row is specified as the data type of the output object attribute when the parent FlatMapGroupsWithStateExec operator is created

Used exclusively when InputProcessor is requested to callFunctionAndUpdateState.

getValueObj

A state value converter (of type InternalRow ⇒ Any) to deserialize a given row (for a per-group state value) to a Scala value

  • The deserialization expression for values is specified as the value deserializer expression when the parent FlatMapGroupsWithStateExec operator is created

  • The data type of state values is specified as the data attributes when the parent FlatMapGroupsWithStateExec operator is created

Used exclusively when InputProcessor is requested to callFunctionAndUpdateState.

numOutputRows

numOutputRows performance metric
