InputProcessor Helper Class of FlatMapGroupsWithStateExec Physical Operator
InputProcessor is a helper class that manages state in the state store for every partition of a FlatMapGroupsWithStateExec physical operator.
InputProcessor is created exclusively when the FlatMapGroupsWithStateExec physical operator is requested to execute and generate a recipe for a distributed computation (as an RDD[InternalRow]). The operator then uses the InputProcessor in its storeUpdateFunction to process the rows of every partition together with the corresponding per-partition state store.
InputProcessor takes a single StateStore when created. The StateStore manages the per-group state and is used when processing new data and timed-out state data, as well as in the "all rows processed" callback.
Processing New Data (Creating Iterator of New Data Processed) — processNewData Method
processNewData(dataIter: Iterator[InternalRow]): Iterator[InternalRow]
processNewData creates a grouped iterator of pairs of per-group state keys and their row values from the given data iterator (dataIter), using the grouping attributes and the output schema of the child operator (of the parent FlatMapGroupsWithStateExec physical operator).
For every per-group state key in the grouped iterator, processNewData requests the StateManager (of the parent FlatMapGroupsWithStateExec physical operator) to get the state for the key (from the StateStore) and then calls callFunctionAndUpdateState with that state, the key's value rows, and the hasTimedOut flag off (i.e. false).
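The grouping step above can be sketched in plain Scala. This is a minimal sketch under stated assumptions, not Spark's implementation: (String, Int) pairs stand in for InternalRow, StateData is a hypothetical case class that only borrows the name, and groupConsecutive is a hypothetical helper that groups consecutive rows sharing a key. It assumes rows arrive sorted by key and, for simplicity, buffers each group's values.

```scala
// Hypothetical stand-in for StateData: a key, its (possibly absent) state, and its timeout.
final case class StateData(key: String, state: Option[Int], timeoutTimestamp: Long)

// Groups consecutive rows that share a key. Assumes the rows arrive sorted by key;
// each group's values are buffered here for simplicity.
def groupConsecutive[K, V](rows: Iterator[(K, V)]): Iterator[(K, Iterator[V])] = {
  val buffered = rows.buffered
  new Iterator[(K, Iterator[V])] {
    def hasNext: Boolean = buffered.hasNext
    def next(): (K, Iterator[V]) = {
      val key = buffered.head._1
      val values = scala.collection.mutable.ArrayBuffer.empty[V]
      while (buffered.hasNext && buffered.head._1 == key) values += buffered.next()._2
      (key, values.iterator)
    }
  }
}

def processNewData(
    dataIter: Iterator[(String, Int)],
    getState: String => StateData,
    callFunctionAndUpdateState: (StateData, Iterator[Int], Boolean) => Iterator[String]
): Iterator[String] =
  groupConsecutive(dataIter).flatMap { case (key, values) =>
    // Look up the key's current state, then call the user function with hasTimedOut = false.
    callFunctionAndUpdateState(getState(key), values, false)
  }

// Usage: "a" already has state 10 in the (hypothetical) store; "b" has none.
val store = Map("a" -> StateData("a", Some(10), -1L))
val out = processNewData(
  Iterator("a" -> 1, "a" -> 2, "b" -> 5),
  key => store.getOrElse(key, StateData(key, None, -1L)),
  (sd, vs, timedOut) => Iterator(s"${sd.key}:${sd.state.getOrElse(0) + vs.sum}:$timedOut")
).toList
// out == List("a:13:false", "b:5:false")
```

Passing getState and callFunctionAndUpdateState as functions keeps the sketch free of any state store; in the operator, both are backed by the StateManager and the StateStore.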
Note: processNewData is used exclusively when the FlatMapGroupsWithStateExec physical operator is requested to execute and generate a recipe for a distributed computation (as an RDD[InternalRow]).
Processing Timed-Out State Data (Creating Iterator of Timed-Out State Data) — processTimedOutState Method
processTimedOutState(): Iterator[InternalRow]
processTimedOutState does nothing and simply returns an empty iterator for GroupStateTimeout.NoTimeout.
With timeout enabled, processTimedOutState uses the current timeout threshold that corresponds to the GroupStateTimeout:
- batchTimestampMs for ProcessingTimeTimeout
- The event-time watermark for EventTimeTimeout
processTimedOutState then creates an iterator of timed-out state data by requesting the StateManager for all the available state data (in the StateStore) and keeping only the entries whose timeout timestamp is set and below the current timeout threshold.
In the end, for every timed-out state data, processTimedOutState calls callFunctionAndUpdateState with no value rows and the hasTimedOut flag on (i.e. true).
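The selection of timed-out state can be sketched as follows. This is a minimal sketch, assuming plain Scala types: Timeout and its case objects are hypothetical stand-ins for the GroupStateTimeout variants, StateData is a hypothetical case class, and -1 marks an unset timeout timestamp. As in the description above, the threshold is the batch timestamp for processing-time timeouts and the event-time watermark for event-time timeouts; the sketch calls Option.get, so it throws if the relevant threshold is missing.

```scala
// Hypothetical stand-ins for the GroupStateTimeout variants.
sealed trait Timeout
case object NoTimeout extends Timeout
case object ProcessingTimeTimeout extends Timeout
case object EventTimeTimeout extends Timeout

// Hypothetical stand-in for StateData; -1 marks "no timeout timestamp set".
final case class StateData(key: String, state: Option[Int], timeoutTimestamp: Long)
val NoTimestamp = -1L

def processTimedOutState(
    timeout: Timeout,
    batchTimestampMs: Option[Long],
    eventTimeWatermark: Option[Long],
    allState: Iterator[StateData],
    callFunctionAndUpdateState: (StateData, Iterator[Int], Boolean) => Iterator[String]
): Iterator[String] = {
  // Keep entries with a timeout set that lies below the threshold, then call the
  // user function for each one with no value rows and hasTimedOut = true.
  def expire(threshold: Long): Iterator[String] =
    allState
      .filter(s => s.timeoutTimestamp != NoTimestamp && s.timeoutTimestamp < threshold)
      .flatMap(s => callFunctionAndUpdateState(s, Iterator.empty, true))

  timeout match {
    case NoTimeout             => Iterator.empty
    case ProcessingTimeTimeout => expire(batchTimestampMs.get)   // threshold: batch timestamp
    case EventTimeTimeout      => expire(eventTimeWatermark.get) // threshold: event-time watermark
  }
}

// Usage: with a processing-time threshold of 200, only "a" has timed out.
val states = List(
  StateData("a", Some(1), 100L), // timeout below the threshold: times out
  StateData("b", Some(2), -1L),  // no timeout set: skipped
  StateData("c", Some(3), 500L)) // timeout not reached yet: skipped
val f: (StateData, Iterator[Int], Boolean) => Iterator[String] =
  (sd, _, timedOut) => Iterator(s"${sd.key}:$timedOut")
val expired = processTimedOutState(ProcessingTimeTimeout, Some(200L), None, states.iterator, f).toList
// expired == List("a:true")
```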
Note: processTimedOutState is used exclusively when the FlatMapGroupsWithStateExec physical operator is requested to execute and generate a recipe for a distributed computation (as an RDD[InternalRow]).
callFunctionAndUpdateState Internal Method
callFunctionAndUpdateState(
stateData: StateData,
valueRowIter: Iterator[InternalRow],
hasTimedOut: Boolean): Iterator[InternalRow]
Note: callFunctionAndUpdateState is used when InputProcessor is requested to process new data and to process timed-out state data.
callFunctionAndUpdateState creates a key object by requesting the given StateData for the UnsafeRow of the key (keyRow) and converting it to an object (using the internal state key converter).
callFunctionAndUpdateState creates value objects by converting every value row (from the given valueRowIter iterator) to an object (using the internal state value converter).
callFunctionAndUpdateState creates a new GroupStateImpl with the following:
- The current state value (of the given StateData) that could possibly be null
- The batchTimestampMs of the parent FlatMapGroupsWithStateExec operator (that could possibly be -1)
- The event-time watermark of the parent FlatMapGroupsWithStateExec operator (that could possibly be -1)
- The GroupStateTimeout of the parent FlatMapGroupsWithStateExec operator
- The watermarkPresent flag of the parent FlatMapGroupsWithStateExec operator
- The given hasTimedOut flag
callFunctionAndUpdateState then executes the user-defined state function (of the parent FlatMapGroupsWithStateExec operator) on the key object, value objects, and the newly-created GroupStateImpl.
For every output value from the user-defined state function, callFunctionAndUpdateState increments the numOutputRows performance metric and converts the value to an internal row (using the internal output value converter).
In the end, callFunctionAndUpdateState returns an Iterator[InternalRow] that calls the onIteratorCompletion callback right after all rows have been processed (i.e. once the iterator is fully consumed).
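The pattern of calling the user function and deferring the callback until the output is fully consumed can be illustrated with a small Scala sketch. All names are hypothetical: SimpleGroupState stands in for GroupStateImpl (only its update/remove bookkeeping), Metric for the numOutputRows metric, and CallbackOnCompletionIterator is a local class that runs a callback once the wrapped iterator reports no more elements. The row-to-object conversions are left out.

```scala
// Hypothetical, simplified stand-in for GroupStateImpl: only the update/remove bookkeeping.
final class SimpleGroupState[S](initial: Option[S], val hasTimedOut: Boolean) {
  private var value: Option[S] = initial
  var updated = false
  var removed = false
  def getOption: Option[S] = value
  def update(s: S): Unit = { value = Some(s); updated = true; removed = false }
  def remove(): Unit = { value = None; removed = true; updated = false }
}

// Hypothetical stand-in for a performance metric such as numOutputRows.
final class Metric { var value: Long = 0L }

// Runs onCompletion exactly once, the first time the wrapped iterator reports no more elements.
final class CallbackOnCompletionIterator[A](underlying: Iterator[A], onCompletion: () => Unit)
    extends Iterator[A] {
  private var completed = false
  def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) { completed = true; onCompletion() }
    more
  }
  def next(): A = underlying.next()
}

def callFunctionAndUpdateState[K, V, S, O](
    key: K,
    currentState: Option[S],
    values: Iterator[V],
    hasTimedOut: Boolean,
    func: (K, Iterator[V], SimpleGroupState[S]) => Iterator[O],
    numOutputRows: Metric,
    onIteratorCompletion: SimpleGroupState[S] => Unit): Iterator[O] = {
  val groupState = new SimpleGroupState[S](currentState, hasTimedOut)
  // Call the user-defined function; count every output value as it is consumed.
  val output = func(key, values, groupState).map { o => numOutputRows.value += 1; o }
  // Fire the "all rows processed" callback only after the output is fully consumed.
  new CallbackOnCompletionIterator[O](output, () => onIteratorCompletion(groupState))
}

// Usage: a running sum per key.
val numOutputRows = new Metric
var completedWith: Option[Option[Int]] = None
val out = callFunctionAndUpdateState[String, Int, Int, String](
  "a", Some(10), Iterator(1, 2), hasTimedOut = false,
  (k, vs, st) => {
    val total = st.getOption.getOrElse(0) + vs.sum
    st.update(total)
    Iterator(s"$k=$total")
  },
  numOutputRows,
  st => { completedWith = Some(st.getOption) })
val before = completedWith // None: the output has not been consumed yet
val result = out.toList    // List("a=13"); the callback has now run
```

Deferring the callback to the end of iteration means state is written only after every output row has been handed downstream, which matches the "all rows processed" behavior described above.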
"All Rows Processed" Callback — onIteratorCompletion Internal Method
onIteratorCompletion: Unit
onIteratorCompletion branches off based on whether the GroupStateImpl has been marked removed and has no timeout timestamp specified.
When the GroupStateImpl has been marked removed and no timeout timestamp is specified, onIteratorCompletion does the following:
- Requests the StateManager (of the parent FlatMapGroupsWithStateExec operator) to remove the state (from the StateStore for the key row of the given StateData)
- Increments the numUpdatedStateRows performance metric
Otherwise, when the GroupStateImpl has not been marked removed or a timeout timestamp is specified, onIteratorCompletion checks whether the timeout timestamp has changed by comparing the timeout timestamps of the GroupStateImpl and the given StateData.
Only when the GroupStateImpl has been updated or removed, or the timeout timestamp has changed, onIteratorCompletion does the following:
- Requests the StateManager (of the parent FlatMapGroupsWithStateExec operator) to persist the state in the StateStore, with the key row of the given StateData, the updated state object, and the current timeout timestamp of the GroupStateImpl
- Increments the numUpdatedStateRows performance metric
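The branching above boils down to a small decision function. This is a minimal sketch, assuming a hypothetical GroupStateSnapshot case class for the flags onIteratorCompletion inspects and a hypothetical StoreAction result type instead of calls on a real StateManager; -1 marks an unset timeout timestamp. Both RemoveState and PutState correspond to the cases that increment numUpdatedStateRows.

```scala
// Marker for "no timeout timestamp set" (the -1 used in the text above).
val NoTimestamp = -1L

// Hypothetical snapshot of what onIteratorCompletion inspects on the group state.
final case class GroupStateSnapshot[S](
    removed: Boolean,
    updated: Boolean,
    state: Option[S],
    timeoutTimestamp: Long)

// Hypothetical outcomes; RemoveState and PutState would also increment numUpdatedStateRows.
sealed trait StoreAction[+S]
case object RemoveState extends StoreAction[Nothing]
final case class PutState[S](state: Option[S], timeoutTimestamp: Long) extends StoreAction[S]
case object NoChange extends StoreAction[Nothing]

def completionAction[S](gs: GroupStateSnapshot[S], previousTimeoutTimestamp: Long): StoreAction[S] =
  if (gs.removed && gs.timeoutTimestamp == NoTimestamp) {
    RemoveState // removed and no timeout: delete the key from the store
  } else {
    val hasTimeoutChanged = gs.timeoutTimestamp != previousTimeoutTimestamp
    if (gs.updated || gs.removed || hasTimeoutChanged) PutState(gs.state, gs.timeoutTimestamp)
    else NoChange // nothing changed: skip the write
  }

val afterRemove = completionAction(GroupStateSnapshot[Int](true, false, None, NoTimestamp), 100L)
val afterUpdate = completionAction(GroupStateSnapshot(false, true, Some(5), NoTimestamp), NoTimestamp)
val afterTimeoutChange = completionAction(GroupStateSnapshot(false, false, Some(5), 200L), 100L)
val afterNothing = completionAction(GroupStateSnapshot(false, false, Some(5), 100L), 100L)
```

Note the case of a removed state that still has a timeout set: it is not deleted but persisted with an empty state, so the timeout can still fire later.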
Note: onIteratorCompletion is used exclusively when InputProcessor is requested to callFunctionAndUpdateState (right after all rows have been processed).
Internal Properties
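| Property | Description |
|---|---|
| State key converter | Converts the UnsafeRow of a per-group state key to a key object. Used exclusively when InputProcessor is requested to callFunctionAndUpdateState |
| Output value converter | Converts an output value of the user-defined state function to an internal row. Used exclusively when InputProcessor is requested to callFunctionAndUpdateState |
| State value converter | Converts a value row to a value object. Used exclusively when InputProcessor is requested to callFunctionAndUpdateState |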