Aggregation Execution Planning Strategy for Aggregate Physical Operators

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
// structured query with count aggregate function
val q = spark.range(5).
  groupBy($"id" % 2 as "group").
  agg(count("id") as "count")
import q.queryExecution.optimizedPlan
scala> println(optimizedPlan.numberedTreeString)
00 Aggregate [(id#0L % 2)], [(id#0L % 2) AS group#3L, count(1) AS count#8L]
01 +- Range (0, 5, step=1, splits=Some(8))

import spark.sessionState.planner.Aggregation
val physicalPlan = Aggregation.apply(optimizedPlan)

// HashAggregateExec selected
scala> println(physicalPlan.head.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#12L], functions=[count(1)], output=[group#3L, count#8L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_count(1)], output=[(id#0L % 2)#12L, count#14L])
02    +- PlanLater Range (0, 5, step=1, splits=Some(8))

Aggregation can select the following aggregate physical operators (in order of preference):

AggUtils.planAggregateWithOneDistinct Method


Executing Planning Strategy — apply Method

apply(plan: LogicalPlan): Seq[SparkPlan]

apply finds Aggregate logical operators and creates a single aggregate physical operator for every Aggregate logical operator.

Internally, apply destructures a Aggregate logical operator (into a four-element tuple) and splits aggregate expressions per whether they are distinct or not (using their isDistinct flag).

apply then creates a physical operator using the following helper methods:

apply is a part of GenericStrategy Contract to execute a planning strategy.

Selecting Aggregate Physical Operator Given Aggregate Expressions — AggUtils.createAggregate Internal Method

  requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
  groupingExpressions: Seq[NamedExpression] = Nil,
  aggregateExpressions: Seq[AggregateExpression] = Nil,
  aggregateAttributes: Seq[Attribute] = Nil,
  initialInputBufferOffset: Int = 0,
  resultExpressions: Seq[NamedExpression] = Nil,
  child: SparkPlan): SparkPlan

Internally, createAggregate selects and creates a physical operator given the input aggregateExpressions aggregate expressions.

Table 1. createAggregate’s Aggregate Physical Operator Selection Criteria (in execution order)
Aggregate Physical Operator Selection Criteria


HashAggregateExec supports all aggBufferAttributes of the input aggregateExpressions aggregate expressions.


  1. spark.sql.execution.useObjectHashAggregateExec internal flag enabled (it is by default)

  2. ObjectHashAggregateExec supports the input aggregateExpressions aggregate expressions.


When all the above requirements could not be met.


createAggregate is used in:

Creating Physical Plan with Two Aggregate Physical Operators for Partial and Final Aggregations — AggUtils.planAggregateWithoutDistinct Method

  groupingExpressions: Seq[NamedExpression],
  aggregateExpressions: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  child: SparkPlan): Seq[SparkPlan]

planAggregateWithoutDistinct is a two-step physical operator generator.

planAggregateWithoutDistinct first creates an aggregate physical operator with aggregateExpressions in Partial mode (for partial aggregations).

requiredChildDistributionExpressions for the aggregate physical operator for partial aggregation "stage" is empty.

In the end, planAggregateWithoutDistinct creates another aggregate physical operator (of the same type as before), but aggregateExpressions are now in Final mode (for final aggregations). The aggregate physical operator becomes the parent of the first aggregate operator.

requiredChildDistributionExpressions for the parent aggregate physical operator for final aggregation "stage" are the attributes of groupingExpressions.
planAggregateWithoutDistinct is used exclusively when Aggregation execution planning strategy is executed (with no AggregateExpressions being distinct).

Destructuring Aggregate Logical Operator — PhysicalAggregation.unapply Method

unapply(a: Any): Option[ReturnType]

unapply destructures the input a Aggregate logical operator into a four-element ReturnType.


ReturnType is a type alias (aka type synonym) for a four-element tuple with grouping, aggregate and result Catalyst expressions, and child logical operator.

type ReturnType =
  (Seq[NamedExpression], Seq[AggregateExpression], Seq[NamedExpression], LogicalPlan)
PhysicalAggregation is a Scala extractor object with a single unapply method.

