FlatMapGroupsWithStateExec Unary Physical Operator
FlatMapGroupsWithStateExec is a unary physical operator that represents the FlatMapGroupsWithState logical operator at execution time.
Note
|
A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator. Read up on UnaryExecNode (and physical operators in general) in The Internals of Spark SQL book. |
Note
|
The FlatMapGroupsWithState unary logical operator represents the KeyValueGroupedDataset.mapGroupsWithState and KeyValueGroupedDataset.flatMapGroupsWithState operators in a logical query plan. |
FlatMapGroupsWithStateExec is created exclusively when the FlatMapGroupsWithStateStrategy execution planning strategy is requested to plan a FlatMapGroupsWithState logical operator for execution.
FlatMapGroupsWithStateExec is an ObjectProducerExec physical operator and so produces a single output object.
Tip
|
Read up on ObjectProducerExec (Physical Operators With Single Object Output) in The Internals of Spark SQL book. |
Note
|
FlatMapGroupsWithStateExec is given an OutputMode when created, but it does not seem to be used at all. Check out the question "What’s the purpose of OutputMode in flatMapGroupsWithState? How/where is it used?" on StackOverflow. |
Tip
|
Enable ALL logging level for the org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec=ALL
Refer to Logging. |
Creating FlatMapGroupsWithStateExec Instance
FlatMapGroupsWithStateExec takes the following to be created:
- User-defined state function that is applied to every group (of type (Any, Iterator[Any], LogicalGroupState[Any]) ⇒ Iterator[Any])
- Grouping attributes (as used for grouping in KeyValueGroupedDataset for the mapGroupsWithState or flatMapGroupsWithState operators)
- Output object attribute (that is the reference to the single object field this operator outputs)
FlatMapGroupsWithStateExec initializes the internal properties.
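The shape of the user-defined state function (key, the group's values, the group's state, returning output objects) can be illustrated with a small Spark-free Scala sketch. `SimpleGroupState` is a hypothetical stand-in for LogicalGroupState/GroupState with only `getOption` and `update`, and `countPerKey` is an illustrative function that keeps a running count of values per key.

```scala
// Hypothetical stand-in for Spark's GroupState (illustration only).
final class SimpleGroupState[S](private var value: Option[S]) {
  def getOption: Option[S] = value
  def update(newValue: S): Unit = value = Some(newValue)
}

object StateFunctionSketch {
  // Same shape as the operator's state function:
  // (key, values of the group, state of the group) => output objects
  type StateFunc = (Any, Iterator[Any], SimpleGroupState[Any]) => Iterator[Any]

  // Adds the number of new values to the stored count and emits (key, count).
  val countPerKey: StateFunc = (key, values, state) => {
    val previous = state.getOption.map(_.asInstanceOf[Long]).getOrElse(0L)
    val updated = previous + values.size
    state.update(updated)
    Iterator((key, updated))
  }
}
```

Calling `countPerKey("a", Iterator(1, 2, 3), state)` with an empty state emits `("a", 3L)` and leaves `3L` in the state, so the next call for the same key continues from there.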
Performance Metrics (SQLMetrics)
FlatMapGroupsWithStateExec uses the performance metrics of StateStoreWriter.
FlatMapGroupsWithStateExec as StateStoreWriter
FlatMapGroupsWithStateExec is a stateful physical operator that can write to a state store (and MicroBatchExecution requests whether to run another batch or not based on the GroupStateTimeout).
FlatMapGroupsWithStateExec uses the GroupStateTimeout (and possibly the updated metadata) when asked whether to run another batch or not (when MicroBatchExecution is requested to construct the next streaming micro-batch while running the activated streaming query).
FlatMapGroupsWithStateExec with Streaming Event-Time Watermark Support (WatermarkSupport)
FlatMapGroupsWithStateExec is a physical operator that supports streaming event-time watermark.
FlatMapGroupsWithStateExec is given the optional event-time watermark when created.
The event-time watermark is initially undefined (None) when planned for execution (in the FlatMapGroupsWithStateStrategy execution planning strategy).
Note
|
The event-time watermark is only set (to the current event-time watermark of the given OffsetSeqMetadata), along with the StatefulOperatorStateInfo and the batchTimestampMs, when the IncrementalExecution query execution pipeline is requested to apply the state preparation rule (as part of the preparations rules). |
Note
|
The preparations rules are executed (applied to a physical query plan) at the executedPlan phase of Structured Query Execution Pipeline. Read up on Structured Query Execution Pipeline in The Internals of Spark SQL book. |
IncrementalExecution is used as the lastExecution of the available streaming query execution engines. It is created in the queryPlanning phase (of the MicroBatchExecution and ContinuousExecution execution engines) based on the current OffsetSeqMetadata.
Note
|
The optional event-time watermark can only be defined when the state preparation rule is executed, i.e. at the executedPlan phase of Structured Query Execution Pipeline, which is also part of the queryPlanning phase. |
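The two-step life of the watermark (planned as None, then filled in by the state preparation rule) can be modeled with a copy of an immutable value. This is a simplified sketch: `OffsetMetadataSketch`, `OperatorSketch` and `prepareForExecution` are hypothetical names, and the real rule in IncrementalExecution rewrites physical plan nodes rather than these case classes.

```scala
// Hypothetical, simplified stand-ins (not Spark classes).
final case class OffsetMetadataSketch(batchWatermarkMs: Long, batchTimestampMs: Long)

final case class OperatorSketch(
    eventTimeWatermark: Option[Long],
    batchTimestampMs: Option[Long])

object StatePreparationSketch {
  // At planning time (FlatMapGroupsWithStateStrategy) nothing is known yet.
  val planned: OperatorSketch =
    OperatorSketch(eventTimeWatermark = None, batchTimestampMs = None)

  // Models the state preparation rule: returns a copy of the operator with
  // the values taken from the current offset metadata.
  def prepareForExecution(op: OperatorSketch, meta: OffsetMetadataSketch): OperatorSketch =
    op.copy(
      eventTimeWatermark = Some(meta.batchWatermarkMs),
      batchTimestampMs = Some(meta.batchTimestampMs))
}
```

Using a copy rather than mutation mirrors how physical operators are treated as immutable tree nodes: every micro-batch can prepare its own instance from the same planned operator.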
FlatMapGroupsWithStateExec and StateManager: stateManager Property
stateManager: StateManager
While being created, FlatMapGroupsWithStateExec creates a StateManager (with the state encoder and the isTimeoutEnabled flag).
A StateManager is created per state format version that is given while creating a FlatMapGroupsWithStateExec (to choose between the available implementations).
The state format version is controlled by the spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion internal configuration property (default: 2).
Note
|
StateManagerImplV2 is the default StateManager. |
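Choosing a StateManager by state format version amounts to a match on the version number. The following is a minimal sketch assuming two implementations, selected by versions 1 and 2, with any other value rejected; `StateManagerSketch`, `StateManagerV1Sketch`, `StateManagerV2Sketch` and `StateManagerFactorySketch.create` are hypothetical names, not Spark's API.

```scala
// Hypothetical model of choosing a StateManager implementation by version.
sealed trait StateManagerSketch { def version: Int }
final case class StateManagerV1Sketch(timeoutEnabled: Boolean) extends StateManagerSketch { val version = 1 }
final case class StateManagerV2Sketch(timeoutEnabled: Boolean) extends StateManagerSketch { val version = 2 }

object StateManagerFactorySketch {
  // Mirrors the documented default of the
  // spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion property.
  val DefaultStateFormatVersion = 2

  def create(stateFormatVersion: Int, isTimeoutEnabled: Boolean): StateManagerSketch =
    stateFormatVersion match {
      case 1     => StateManagerV1Sketch(isTimeoutEnabled)
      case 2     => StateManagerV2Sketch(isTimeoutEnabled)
      case other => throw new IllegalArgumentException(s"Unsupported state format version: $other")
    }
}
```

Failing fast on an unknown version is a reasonable choice here, since silently falling back to another layout could misread state written by an earlier run.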
The StateManager is used exclusively when the FlatMapGroupsWithStateExec physical operator is executed (to generate a recipe for a distributed computation as an RDD[InternalRow]) for the following:
- State schema (for the value schema of a StateStoreRDD)
- State data for a key in a StateStore while processing new data
- All state data (for all keys) in a StateStore while processing timed-out state data
- Removing the state for a key from a StateStore when all rows have been processed
- Persisting the state for a key in a StateStore when all rows have been processed
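The per-key duties listed above (get one key, get all keys, persist, remove) can be pictured with an in-memory model. This sketch is an assumption-laden simplification: `InMemoryStateManagerSketch` and `StateData` are hypothetical, a mutable Map replaces the versioned StateStore of UnsafeRows, the state schema duty is left out, and `-1L` is used as an illustrative "no timeout" timestamp.

```scala
import scala.collection.mutable

// Hypothetical value stored per key: the user state plus its timeout timestamp.
final case class StateData[S](state: S, timeoutTimestamp: Long)

// Hypothetical, in-memory model of the StateManager duties (not Spark's API).
final class InMemoryStateManagerSketch[K, S] {
  private val store = mutable.Map.empty[K, StateData[S]]

  // State data for a key (used while processing new data).
  def getState(key: K): Option[StateData[S]] = store.get(key)

  // All state data for all keys (used while processing timed-out state).
  def getAllState: Iterator[(K, StateData[S])] = store.iterator

  // Persists the state for a key once the key's rows have been processed.
  def putState(key: K, state: S, timeoutTimestamp: Long): Unit =
    store.update(key, StateData(state, timeoutTimestamp))

  // Removes the state for a key once the key's rows have been processed.
  def removeState(key: K): Unit = store.remove(key)
}
```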
keyExpressions Method
keyExpressions: Seq[Attribute]
Note
|
keyExpressions is part of the WatermarkSupport Contract to…FIXME. |
keyExpressions simply returns the grouping attributes.
Executing Physical Operator (Generating RDD[InternalRow]): doExecute Method
doExecute(): RDD[InternalRow]
Note
|
doExecute is part of the SparkPlan Contract to generate the runtime representation of a physical operator as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]). |
doExecute first initializes the metrics (which happens on the driver).
doExecute then requests the child physical operator to execute and generate an RDD[InternalRow].
doExecute uses StateStoreOps to create a StateStoreRDD with a storeUpdateFunction that does the following (for a partition):
- Creates an InputProcessor for a given StateStore
- (only when the GroupStateTimeout is EventTimeTimeout) Filters out late data based on the event-time watermark, i.e. rows from a given Iterator[InternalRow] that are older than the event-time watermark are excluded from the steps that follow
- Requests the InputProcessor to create an iterator of the new data processed from the (possibly filtered) iterator
- Requests the InputProcessor to create an iterator of the timed-out state data
- Creates an iterator by concatenating the above iterators (with the new data processed first)
- In the end, creates a CompletionIterator that executes a completion function (completionFunction) after it has successfully iterated through all the elements (i.e. when a client has consumed all the rows). The completion function requests the given StateStore to commit changes followed by setting the store-specific metrics.
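The per-partition steps of the storeUpdateFunction can be sketched with plain Scala iterators. Everything here is a hypothetical model: rows are `Row(key, eventTimeMs)` pairs instead of InternalRows, the InputProcessor is replaced by two functions, `commit` stands in for committing the StateStore, and `completionIterator` only imitates what CompletionIterator does. Treating an event time at or below the watermark as late is an assumption of this sketch.

```scala
// Hypothetical, simplified model of the per-partition pipeline (not Spark code).
object StoreUpdateSketch {
  final case class Row(key: String, eventTimeMs: Long)

  // Wraps an iterator so that `onComplete` runs once, right after the last
  // element has been consumed (the role CompletionIterator plays).
  def completionIterator[A](underlying: Iterator[A])(onComplete: => Unit): Iterator[A] =
    new Iterator[A] {
      private var completed = false
      def hasNext: Boolean = {
        val more = underlying.hasNext
        if (!more && !completed) { completed = true; onComplete }
        more
      }
      def next(): A = underlying.next()
    }

  def storeUpdate(
      rows: Iterator[Row],
      eventTimeTimeout: Boolean,
      watermarkMs: Option[Long],
      processNewData: Iterator[Row] => Iterator[String],
      processTimedOutState: () => Iterator[String],
      commit: () => Unit): Iterator[String] = {
    // Late data is only filtered out with EventTimeTimeout and a defined watermark.
    val filtered = (eventTimeTimeout, watermarkMs) match {
      case (true, Some(wm)) => rows.filter(_.eventTimeMs > wm)
      case _                => rows
    }
    val newDataOutput = processNewData(filtered)
    // Iterator.++ takes its argument by name, so timed-out state is only
    // processed after the new data has been exhausted.
    completionIterator(newDataOutput ++ processTimedOutState())(commit())
  }
}
```

Nothing is committed until the caller drains the returned iterator, which matches the description of the completion function running only after all rows have been consumed.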
Checking Out Whether Last Batch Execution Requires Another Non-Data Batch or Not: shouldRunAnotherBatch Method
shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean
Note
|
shouldRunAnotherBatch is part of the StateStoreWriter Contract to indicate whether MicroBatchExecution should run another non-data batch (based on the updated OffsetSeqMetadata with the current event-time watermark and the batch timestamp). |
shouldRunAnotherBatch uses the GroupStateTimeout as follows:
- With EventTimeTimeout, shouldRunAnotherBatch is positive (true) only when the event-time watermark is defined and is older than (below) the event-time watermark of the given OffsetSeqMetadata
- With NoTimeout (and other GroupStateTimeouts if there were any), shouldRunAnotherBatch is always negative (false)
- With ProcessingTimeTimeout, shouldRunAnotherBatch is always positive (true)
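The three rules above can be expressed as one pattern match. This is a sketch with hypothetical stand-ins (`TimeoutSketch` and its case objects) for the GroupStateTimeout values; `eventTimeWatermark` is the watermark the operator was prepared with and `newBatchWatermarkMs` stands for the event-time watermark of the new OffsetSeqMetadata.

```scala
// Hypothetical stand-ins for GroupStateTimeout values (not Spark's classes).
sealed trait TimeoutSketch
case object NoTimeoutSketch extends TimeoutSketch
case object ProcessingTimeTimeoutSketch extends TimeoutSketch
case object EventTimeTimeoutSketch extends TimeoutSketch

object ShouldRunAnotherBatchSketch {
  def shouldRunAnotherBatch(
      timeout: TimeoutSketch,
      eventTimeWatermark: Option[Long],
      newBatchWatermarkMs: Long): Boolean =
    timeout match {
      // Processing-time timeouts may expire at any time, so always run.
      case ProcessingTimeTimeoutSketch => true
      // Run only if the watermark is defined and has advanced.
      case EventTimeTimeoutSketch =>
        eventTimeWatermark.exists(current => current < newBatchWatermarkMs)
      // Without timeouts there is nothing to expire.
      case NoTimeoutSketch => false
    }
}
```

For example, with EventTimeTimeout a watermark of `Some(5L)` and a new watermark of `10L` gives `true`, while an unchanged watermark (`Some(10L)` and `10L`) gives `false`.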
Internal Properties
Name | Description |
---|---|
| | Flag that says whether the GroupStateTimeout is not NoTimeout. Used when: |
| | Flag that says whether the child physical operator has a watermark attribute (among the output attributes). Used exclusively when |