StatefulAggregationStrategy Execution Planning Strategy for EventTimeWatermark and Aggregate Logical Operators

StatefulAggregationStrategy is an execution planning strategy (i.e. Strategy) that IncrementalExecution uses to plan EventTimeWatermark and Aggregate logical operators in streaming structured queries (Datasets).

Note

EventTimeWatermark logical operator is the result of withWatermark operator.

Note
Aggregate logical operator represents groupBy and groupByKey aggregations (and SQL’s GROUP BY clause).

StatefulAggregationStrategy is available using SessionState.

spark.sessionState.planner.StatefulAggregationStrategy
Table 1. StatefulAggregationStrategy’s Logical to Physical Operator Conversions
Logical Operator Physical Operator

EventTimeWatermark

EventTimeWatermarkExec

Aggregate

In the order of preference:

  1. HashAggregateExec

  2. ObjectHashAggregateExec

  3. SortAggregateExec

Tip
Read Aggregation Execution Planning Strategy for Aggregate Physical Operators in Mastering Apache Spark 2 gitbook.
val counts = spark.
  readStream.
  format("rate").
  load.
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "count").
  orderBy("group")
scala> counts.explain
== Physical Plan ==
*Sort [group#6 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#6 ASC NULLS FIRST, 200)
   +- *HashAggregate(keys=[window#13], functions=[count(value#1L)])
      +- StateStoreSave [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0), Append, 0
         +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0)
               +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#13, 200)
                     +- *HashAggregate(keys=[window#13], functions=[partial_count(value#1L)])
                        +- *Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#13, value#1L]
                           +- *Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val consoleOutput = counts.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  queryName("counts").
  outputMode(OutputMode.Complete).  // <-- required for groupBy
  start

// Eventually...
consoleOutput.stop

Selecting Aggregate Physical Operator Given Aggregate Expressions — AggUtils.planStreamingAggregation Internal Method

planStreamingAggregation(
  groupingExpressions: Seq[NamedExpression],
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  child: SparkPlan): Seq[SparkPlan]

planStreamingAggregation takes the grouping attributes (from groupingExpressions).

Note
groupingExpressions corresponds to the grouping function in groupBy operator.

planStreamingAggregation creates an aggregate physical operator (called partialAggregate) with:

  • requiredChildDistributionExpressions undefined (i.e. None)

  • initialInputBufferOffset as 0

  • functionsWithoutDistinct in Partial mode

  • child operator as the input child

Note

planStreamingAggregation creates one of the following aggregate physical operators (in the order of preference):

  1. HashAggregateExec

  2. ObjectHashAggregateExec

  3. SortAggregateExec

planStreamingAggregation uses AggUtils.createAggregate method to select an aggregate physical operator that you can read about in Selecting Aggregate Physical Operator Given Aggregate Expressions — AggUtils.createAggregate Internal Method in Mastering Apache Spark 2 gitbook.

planStreamingAggregation creates an aggregate physical operator (called partialMerged1) with:

  • requiredChildDistributionExpressions based on the input groupingExpressions

  • initialInputBufferOffset as the length of groupingExpressions

  • functionsWithoutDistinct in PartialMerge mode

  • child operator as partialAggregate aggregate physical operator created above

planStreamingAggregation creates StateStoreRestoreExec with the grouping attributes, undefined StatefulOperatorStateInfo, and partialMerged1 aggregate physical operator created above.

planStreamingAggregation creates an aggregate physical operator (called partialMerged2) with:

Note
The only difference between partialMerged1 and partialMerged2 steps is the child physical operator.

planStreamingAggregation creates StateStoreSaveExec with:

  • the grouping attributes based on the input groupingExpressions

  • No stateInfo, outputMode and eventTimeWatermark

  • child operator as partialMerged2 aggregate physical operator created above

In the end, planStreamingAggregation creates the final aggregate physical operator (called finalAndCompleteAggregate) with:

  • requiredChildDistributionExpressions based on the input groupingExpressions

  • initialInputBufferOffset as the length of groupingExpressions

  • functionsWithoutDistinct in Final mode

  • child operator as StateStoreSaveExec physical operator created above

Note
planStreamingAggregation is used exclusively when StatefulAggregationStrategy plans a streaming aggregation.

results matching ""

    No results matching ""