Aggregation Execution Planning Strategy for Aggregate Physical Operators

scala> :type spark

// structured query with count aggregate function
val q = spark
  .groupBy($"id" % 2 as "group")
  .agg(count("id") as "count")
val plan = q.queryExecution.optimizedPlan
scala> println(plan.numberedTreeString)
00 Aggregate [(id#0L % 2)], [(id#0L % 2) AS group#3L, count(1) AS count#8L]
01 +- Range (0, 5, step=1, splits=Some(8))

import spark.sessionState.planner.Aggregation
val physicalPlan = Aggregation.apply(plan)

// HashAggregateExec selected
scala> println(physicalPlan.head.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#12L], functions=[count(1)], output=[group#3L, count#8L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_count(1)], output=[(id#0L % 2)#12L, count#14L])
02    +- PlanLater Range (0, 5, step=1, splits=Some(8))

Aggregation can select the following aggregate physical operators (in the order of preference):

Applying Aggregation Strategy to Logical Plan (Executing Aggregation) — apply Method

apply(plan: LogicalPlan): Seq[SparkPlan]
apply is part of GenericStrategy Contract to generate a collection of SparkPlans for a given logical plan.

apply requests PhysicalAggregation extractor for Aggregate logical operators and creates a single aggregate physical operator for every Aggregate logical operator found.

Internally, apply requests PhysicalAggregation to destructure a Aggregate logical operator (into a four-element tuple) and splits aggregate expressions per whether they are distinct or not (using their isDistinct flag).

apply then creates a physical operator using the following helper methods:

