InputProcessor Helper Class of FlatMapGroupsWithStateExec Physical Operator
InputProcessor is a helper class to manage state in the state store for every partition of a FlatMapGroupsWithStateExec physical operator.
InputProcessor is created exclusively when the FlatMapGroupsWithStateExec physical operator is requested to execute and generate a recipe for a distributed computation (as an RDD[InternalRow]). The operator uses InputProcessor in the storeUpdateFunction while processing rows per partition with a corresponding per-partition state store.
InputProcessor takes a single StateStore when created. The StateStore manages the per-group state (and is used when processing new data and timed-out state data, and in the "all rows processed" callback).
Processing New Data (Creating Iterator of New Data Processed): processNewData Method
processNewData(dataIter: Iterator[InternalRow]): Iterator[InternalRow]
processNewData creates a grouped iterator of pairs of per-group state keys and row values from the given data iterator (dataIter), using the grouping attributes and the output schema of the child operator (of the parent FlatMapGroupsWithStateExec physical operator).
For every per-group state key (in the grouped iterator), processNewData requests the StateManager (of the parent FlatMapGroupsWithStateExec physical operator) to get the state (from the StateStore) and calls callFunctionAndUpdateState (with the hasTimedOut flag off, i.e. false).
Note
|
processNewData is used exclusively when FlatMapGroupsWithStateExec physical operator is requested to execute and generate a recipe for a distributed computation (as an RDD[InternalRow]).
|
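The processNewData flow described above can be illustrated with a small, Spark-free model. This is only a sketch under stated assumptions: the object ProcessNewDataSketch, the simplified StateData case class, the groupConsecutive helper, and the use of a plain Map in place of the StateStore and StateManager are all hypothetical stand-ins, and rows are modeled as (key, value) pairs that are assumed to arrive already clustered by key.

```scala
object ProcessNewDataSketch {
  // Hypothetical, simplified stand-in for the state data of a single group.
  final case class StateData(key: String, state: Option[Int])

  // Groups consecutive rows that share a key.
  // Assumption: the input is already clustered by key.
  def groupConsecutive[K, V](rows: Iterator[(K, V)]): Iterator[(K, Iterator[V])] = {
    val buffered = rows.buffered
    new Iterator[(K, Iterator[V])] {
      def hasNext: Boolean = buffered.hasNext
      def next(): (K, Iterator[V]) = {
        val key = buffered.head._1
        val values = Vector.newBuilder[V]
        // Collect all values of the current group before moving on.
        while (buffered.hasNext && buffered.head._1 == key) values += buffered.next()._2
        (key, values.result().iterator)
      }
    }
  }

  // For every group: look up the state, then call the state function
  // with the hasTimedOut flag off.
  def processNewData(
      dataIter: Iterator[(String, Int)],
      store: Map[String, Int], // hypothetical stand-in for the StateStore
      callFunctionAndUpdateState: (StateData, Iterator[Int], Boolean) => Iterator[String]
  ): Iterator[String] =
    groupConsecutive(dataIter).flatMap { case (key, values) =>
      val stateData = StateData(key, store.get(key))
      callFunctionAndUpdateState(stateData, values, false) // hasTimedOut = false
    }
}
```

The sketch materializes each group's values eagerly to keep the code short; a streaming implementation would more likely hand out a lazy per-group iterator.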
Processing Timed-Out State Data (Creating Iterator of Timed-Out State Data): processTimedOutState Method
processTimedOutState(): Iterator[InternalRow]
processTimedOutState does nothing and simply returns an empty iterator for GroupStateTimeout.NoTimeout.
With timeout enabled, processTimedOutState gets the current timeout threshold per GroupStateTimeout: the batch timestamp (batchTimestampMs) for ProcessingTimeTimeout, or the event-time watermark for EventTimeTimeout.
processTimedOutState creates an iterator of timed-out state data by requesting the StateManager for all the available state data (in the StateStore) and keeping only the state data whose timeout is defined and below the current timeout threshold.
In the end, for every timed-out state data, processTimedOutState calls callFunctionAndUpdateState (with the hasTimedOut flag enabled).
Note
|
processTimedOutState is used exclusively when FlatMapGroupsWithStateExec physical operator is requested to execute and generate a recipe for a distributed computation (as an RDD[InternalRow]).
|
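The selection of timed-out state can be modeled in a few lines of plain Scala. This is a sketch, not Spark's code: the TimedOutStateSketch object, its Timeout hierarchy, the simplified StateData case class, and the NoTimestamp sentinel (-1) are hypothetical. It also assumes that the threshold is the batch timestamp for processing-time timeouts and the event-time watermark for event-time timeouts.

```scala
object TimedOutStateSketch {
  // Hypothetical model of the timeout configuration.
  sealed trait Timeout
  case object NoTimeout extends Timeout
  case object ProcessingTimeTimeout extends Timeout
  case object EventTimeTimeout extends Timeout

  // Assumed sentinel meaning "no timeout timestamp defined".
  val NoTimestamp: Long = -1L

  // Hypothetical, simplified state data with its timeout timestamp.
  final case class StateData(key: String, state: Int, timeoutTimestamp: Long)

  def timedOutState(
      timeoutConf: Timeout,
      batchTimestampMs: Long,
      eventTimeWatermarkMs: Long,
      allState: Iterator[StateData]): Iterator[StateData] =
    timeoutConf match {
      case NoTimeout => Iterator.empty // nothing can time out
      case other =>
        // Pick the threshold that matches the timeout mode.
        val threshold = other match {
          case ProcessingTimeTimeout => batchTimestampMs
          case _                     => eventTimeWatermarkMs
        }
        // Keep only state with a timeout defined and below the threshold.
        allState.filter { s =>
          s.timeoutTimestamp != NoTimestamp && s.timeoutTimestamp < threshold
        }
    }
}
```

Each element of the resulting iterator would then be passed to the state function with the hasTimedOut flag on.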
callFunctionAndUpdateState Internal Method
callFunctionAndUpdateState(
stateData: StateData,
valueRowIter: Iterator[InternalRow],
hasTimedOut: Boolean): Iterator[InternalRow]
Note
|
When processing new data, the hasTimedOut flag is off (false). When processing timed-out state data, the hasTimedOut flag is on (true).
|
callFunctionAndUpdateState creates a key object by requesting the given StateData for the UnsafeRow of the key (keyRow) and converting it to an object (using the internal state key converter).
callFunctionAndUpdateState creates value objects by taking every value row (from the given valueRowIter iterator) and converting it to an object (using the internal state value converter).
callFunctionAndUpdateState creates a new GroupStateImpl with the following:
- The current state value (of the given StateData), which could be null
- The batchTimestampMs of the parent FlatMapGroupsWithStateExec operator (which could be -1)
- The event-time watermark of the parent FlatMapGroupsWithStateExec operator (which could be -1)
- The GroupStateTimeout of the parent FlatMapGroupsWithStateExec operator
- The watermarkPresent flag of the parent FlatMapGroupsWithStateExec operator
- The given hasTimedOut flag
callFunctionAndUpdateState then executes the user-defined state function (of the parent FlatMapGroupsWithStateExec operator) on the key object, the value objects, and the newly-created GroupStateImpl.
For every output value from the user-defined state function, callFunctionAndUpdateState increments the numOutputRows performance metric and wraps the value in an internal row (using the internal output value converter).
In the end, callFunctionAndUpdateState returns an Iterator[InternalRow] that calls the completion function once all rows have been processed (that is, once the iterator is fully consumed).
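The "call a completion function once the iterator is fully consumed" idea can be sketched with a small wrapper iterator. The CompletionIteratorSketch object and its withCompletion helper are hypothetical names used for illustration only; the sketch assumes the callback should run exactly once, the first time hasNext reports that no rows are left.

```scala
object CompletionIteratorSketch {
  // Wraps an iterator and runs `onCompletion` exactly once,
  // the first time the underlying iterator reports it is exhausted.
  def withCompletion[A](underlying: Iterator[A])(onCompletion: () => Unit): Iterator[A] =
    new Iterator[A] {
      private var completed = false

      def hasNext: Boolean = {
        val more = underlying.hasNext
        if (!more && !completed) {
          completed = true
          onCompletion() // e.g. persist or remove the per-group state
        }
        more
      }

      def next(): A = underlying.next()
    }
}
```

With this design, the state update is deferred until the caller has pulled every output row, so a state function that mutates its state lazily while producing output still has its final state observed.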
"All Rows Processed" Callback: onIteratorCompletion Internal Method
onIteratorCompletion: Unit
onIteratorCompletion branches on whether the GroupStateImpl has been marked removed and has no timeout timestamp specified.
When the GroupStateImpl has been marked removed and no timeout timestamp is specified, onIteratorCompletion does the following:
- Requests the StateManager (of the parent FlatMapGroupsWithStateExec operator) to remove the state (from the StateStore for the key row of the given StateData)
- Increments the numUpdatedStateRows performance metric
Otherwise, when the GroupStateImpl has not been marked removed or the timeout timestamp is specified, onIteratorCompletion checks whether the timeout timestamp has changed by comparing the timeout timestamps of the GroupStateImpl and the given StateData.
Only when the GroupStateImpl has been updated or removed, or the timeout timestamp has changed, onIteratorCompletion does the following:
- Requests the StateManager (of the parent FlatMapGroupsWithStateExec operator) to persist the state (in the StateStore with the key row of the given StateData, the updated state object, and the current timeout timestamp)
- Increments the numUpdatedStateRows performance metric
Note
|
onIteratorCompletion is used exclusively when InputProcessor is requested to callFunctionAndUpdateState (right after rows have been processed)
|
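The branching in onIteratorCompletion can be summarized as a small decision function. This is a minimal sketch assuming that "no timeout timestamp" is encoded as -1; the OnCompletionSketch object, the Action type, and the decide function are hypothetical and stand in for the actual calls to the StateManager.

```scala
object OnCompletionSketch {
  // Assumed sentinel meaning "no timeout timestamp specified".
  val NoTimestamp: Long = -1L

  // Hypothetical outcome of the callback for a single group.
  sealed trait Action
  case object RemoveState extends Action // remove the state from the store
  case object PutState extends Action    // persist state and timeout timestamp
  case object NoOp extends Action        // nothing changed, nothing to write

  def decide(
      hasRemoved: Boolean,
      hasUpdated: Boolean,
      currentTimeoutTimestamp: Long, // from the group state after the function ran
      storedTimeoutTimestamp: Long   // from the state data read before the call
  ): Action =
    if (hasRemoved && currentTimeoutTimestamp == NoTimestamp) RemoveState
    else {
      val timeoutChanged = currentTimeoutTimestamp != storedTimeoutTimestamp
      if (hasUpdated || hasRemoved || timeoutChanged) PutState else NoOp
    }
}
```

In this model, both RemoveState and PutState correspond to the cases where the numUpdatedStateRows metric is incremented, while NoOp leaves the store and the metric untouched.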
Internal Properties
Name | Description |
---|---|
|
A state key converter (of type
Used exclusively when |
|
An output value converter (of type
Used exclusively when |
|
A state value converter (of type
Used exclusively when |
|