spark.sessionState.planner.StatefulAggregationStrategy
StatefulAggregationStrategy Execution Planning Strategy — EventTimeWatermark and Aggregate Logical Operators
StatefulAggregationStrategy is an execution planning strategy that plans streaming queries with the following two logical operators:
- EventTimeWatermark logical operator (for the Dataset.withWatermark operator)
- Aggregate logical operator (for the Dataset.groupBy and Dataset.groupByKey operators, and the GROUP BY SQL clause)
Tip: Read up on Execution Planning Strategies in The Internals of Spark SQL book.
StatefulAggregationStrategy is used exclusively when IncrementalExecution is requested to plan a streaming query.
StatefulAggregationStrategy is available using SessionState (as the StatefulAggregationStrategy member of spark.sessionState.planner).
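| Logical Operator | Physical Operator |
|---|---|
| EventTimeWatermark | EventTimeWatermarkExec |
| Aggregate | In the order of preference: HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec (selected using AggUtils.planStreamingAggregation) |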
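The "order of preference" for the Aggregate logical operator can be read as a fallback chain. The following is a minimal sketch of that idea in plain Scala, not Spark's own code: the operator names are Spark SQL's aggregate physical operators, while `AggregateTraits`, its two flags, `objectHashEnabled` and `choose` are hypothetical stand-ins for the checks Spark SQL performs on the aggregation buffer and the aggregate functions.

```scala
object AggregateOperatorChoice {
  // Hypothetical summary of an aggregation (NOT a Spark SQL type).
  final case class AggregateTraits(
      bufferIsMutable: Boolean,      // assumed precondition for hash-based aggregation
      usesTypedImperative: Boolean)  // assumed precondition for object-hash aggregation

  // Walks the order of preference and returns the name of the first
  // operator whose (assumed) precondition holds.
  def choose(traits: AggregateTraits, objectHashEnabled: Boolean = true): String =
    if (traits.bufferIsMutable) "HashAggregateExec"
    else if (objectHashEnabled && traits.usesTypedImperative) "ObjectHashAggregateExec"
    else "SortAggregateExec"
}
```

Under these assumptions, a simple count (whose buffer is a single long) would take the first branch, which agrees with the HashAggregate operators in the physical plan of the example query.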
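import org.apache.spark.sql.functions.{count, window}
import spark.implicits._ // for the $"..." column syntax (already in scope in spark-shell)
val counts = spark.
  readStream.
  format("rate").
  load.
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "count").
  orderBy("group")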
scala> counts.explain
== Physical Plan ==
*Sort [group#6 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#6 ASC NULLS FIRST, 200)
+- *HashAggregate(keys=[window#13], functions=[count(value#1L)])
+- StateStoreSave [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0), Append, 0
+- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
+- StateStoreRestore [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0)
+- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
+- Exchange hashpartitioning(window#13, 200)
+- *HashAggregate(keys=[window#13], functions=[partial_count(value#1L)])
+- *Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#13, value#1L]
+- *Filter isnotnull(timestamp#0)
+- StreamingRelation rate, [timestamp#0, value#1L]
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val consoleOutput = counts.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  queryName("counts").
  outputMode(OutputMode.Complete). // <-- required for orderBy over a streaming aggregation
  start
// Eventually...
consoleOutput.stop
Selecting Aggregate Physical Operator Given Aggregate Expressions — AggUtils.planStreamingAggregation Internal Method
planStreamingAggregation(
groupingExpressions: Seq[NamedExpression],
functionsWithoutDistinct: Seq[AggregateExpression],
resultExpressions: Seq[NamedExpression],
child: SparkPlan): Seq[SparkPlan]
planStreamingAggregation takes the grouping attributes (from groupingExpressions).
Note: groupingExpressions corresponds to the grouping function in groupBy operator.
planStreamingAggregation creates an aggregate physical operator (called partialAggregate) with:
- requiredChildDistributionExpressions undefined (i.e. None)
- initialInputBufferOffset as 0
- functionsWithoutDistinct in Partial mode
- child operator as the input child
planStreamingAggregation creates an aggregate physical operator (called partialMerged1) with:
- requiredChildDistributionExpressions based on the input groupingExpressions
- initialInputBufferOffset as the length of groupingExpressions
- functionsWithoutDistinct in PartialMerge mode
- child operator as the partialAggregate aggregate physical operator created above
planStreamingAggregation creates StateStoreRestoreExec with the grouping attributes, undefined StatefulOperatorStateInfo, and partialMerged1 aggregate physical operator created above.
planStreamingAggregation creates an aggregate physical operator (called partialMerged2) with:
- child operator as the StateStoreRestoreExec physical operator created above
Note: The only difference between partialMerged1 and partialMerged2 steps is the child physical operator.
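To see why a second partial merge follows the state restore, it may help to trace a count aggregation by hand. The sketch below is a plain-Scala model and not Spark code: `Key`, `Buffer`, `partial`, `partialMerge` and `runBatch` are hypothetical names, an in-memory `Map` stands in for the state store, and the restore step is simplified to "add the previously saved buffer of every key present in the batch".

```scala
object StreamingCountSketch {
  type Key = String
  type Buffer = Long // aggregation buffer of a count

  // Partial mode: one partition turns its raw rows into one buffer per key.
  def partial(rows: Seq[Key]): Map[Key, Buffer] =
    rows.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }

  // PartialMerge mode: buffers that share a key are merged into one buffer.
  def partialMerge(buffers: Seq[(Key, Buffer)]): Map[Key, Buffer] =
    buffers.groupBy(_._1).map { case (k, bs) => k -> bs.map(_._2).sum }

  // One micro-batch; returns the state to keep for the next batch.
  def runBatch(state: Map[Key, Buffer], partitions: Seq[Seq[Key]]): Map[Key, Buffer] = {
    val partials = partitions.flatMap(p => partial(p).toSeq) // partialAggregate
    val merged1 = partialMerge(partials)                     // partialMerged1
    // Restore (simplified): add the saved buffers of the keys seen in this batch.
    val restored = merged1.toSeq ++ state.filter { case (k, _) => merged1.contains(k) }.toSeq
    val merged2 = partialMerge(restored)                     // partialMerged2
    state ++ merged2                                         // save
  }
}
```

In this model the first merge combines buffers of the current batch only, while the second merge folds in what earlier batches left behind. For example, running a batch with partitions `Seq("a", "a", "b")` and `Seq("a")` on empty state gives `a -> 3, b -> 1`; a following batch with partitions `Seq("b")` and `Seq("b", "c")` gives `a -> 3, b -> 3, c -> 1`.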
planStreamingAggregation creates StateStoreSaveExec with:
- the grouping attributes based on the input groupingExpressions
- No stateInfo, outputMode and eventTimeWatermark
- child operator as the partialMerged2 aggregate physical operator created above
In the end, planStreamingAggregation creates the final aggregate physical operator (called finalAndCompleteAggregate) with:
- requiredChildDistributionExpressions based on the input groupingExpressions
- initialInputBufferOffset as the length of groupingExpressions
- functionsWithoutDistinct in Final mode
- child operator as the StateStoreSaveExec physical operator created above
Note: planStreamingAggregation is used exclusively when StatefulAggregationStrategy plans a streaming aggregation.
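The six steps described above nest into a single chain of physical operators. The following is a rough sketch of that nesting using a toy plan model in plain Scala. None of the types (`Plan`, `Input`, `Agg`, `StateStoreRestore`, `StateStoreSave`) are Spark classes, and `plan` and `describe` are hypothetical helpers; only the operator labels and aggregation modes come from the description.

```scala
object StreamingAggregationPlanSketch {
  // Toy physical plan: every node has at most one child (NOT Spark's SparkPlan).
  sealed trait Plan { def child: Option[Plan] }
  final case class Input(name: String) extends Plan {
    val child: Option[Plan] = None
  }
  final case class Agg(label: String, mode: String, input: Plan) extends Plan {
    val child: Option[Plan] = Some(input)
  }
  final case class StateStoreRestore(input: Plan) extends Plan {
    val child: Option[Plan] = Some(input)
  }
  final case class StateStoreSave(input: Plan) extends Plan {
    val child: Option[Plan] = Some(input)
  }

  // Mirrors the order in which the operators are described as being created.
  def plan(child: Plan): Plan = {
    val partialAggregate = Agg("partialAggregate", "Partial", child)
    val partialMerged1 = Agg("partialMerged1", "PartialMerge", partialAggregate)
    val restored = StateStoreRestore(partialMerged1)
    val partialMerged2 = Agg("partialMerged2", "PartialMerge", restored)
    val saved = StateStoreSave(partialMerged2)
    Agg("finalAndCompleteAggregate", "Final", saved)
  }

  // Lists operator labels from the root of the plan down to the leaf.
  def describe(p: Plan): List[String] = {
    val label = p match {
      case Input(name)          => name
      case Agg(l, mode, _)      => s"$l ($mode)"
      case _: StateStoreRestore => "StateStoreRestoreExec"
      case _: StateStoreSave    => "StateStoreSaveExec"
    }
    label :: p.child.map(describe).getOrElse(Nil)
  }
}
```

Read from the root down, `describe(plan(Input("child")))` follows the same top-to-bottom order as the HashAggregate, StateStoreSave and StateStoreRestore operators in the physical plan of the example query.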