```
scala> :type spark
org.apache.spark.sql.SparkSession

// structured query with count aggregate function
val q = spark
  .range(5)
  .groupBy($"id" % 2 as "group")
  .agg(count("id") as "count")

val plan = q.queryExecution.optimizedPlan
scala> println(plan.numberedTreeString)
00 Aggregate [(id#0L % 2)], [(id#0L % 2) AS group#3L, count(1) AS count#8L]
01 +- Range (0, 5, step=1, splits=Some(8))

import spark.sessionState.planner.Aggregation
val physicalPlan = Aggregation.apply(plan)

// HashAggregateExec selected
scala> println(physicalPlan.head.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#12L], functions=[count(1)], output=[group#3L, count#8L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_count(1)], output=[(id#0L % 2)#12L, count#14L])
02 +- PlanLater Range (0, 5, step=1, splits=Some(8))
```
Aggregation Execution Planning Strategy for Aggregate Physical Operators
Aggregation is an execution planning strategy that SparkPlanner uses to select an aggregate physical operator for an Aggregate logical operator in a logical query plan.
Aggregation can select the following aggregate physical operators (in order of preference):

1. HashAggregateExec
2. ObjectHashAggregateExec
3. SortAggregateExec
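The preference order above can be sketched in plain Scala. This is a simplified model, not Spark's actual code: the real decision lives inside Spark's AggUtils, and the case objects and the helper `chooseAggregateExec` below are illustrative stand-ins.

```scala
// Simplified model of the aggregate-operator preference order.
// These names mirror Spark's physical operators but are stand-in case objects,
// and chooseAggregateExec is a hypothetical helper, not Spark's API.
sealed trait AggExec
case object HashAggregateExec extends AggExec
case object ObjectHashAggregateExec extends AggExec
case object SortAggregateExec extends AggExec

def chooseAggregateExec(
    buffersAreMutableTypes: Boolean, // aggregation buffers use only mutable primitive types
    objectHashEnabled: Boolean,      // object hash aggregation enabled by configuration
    objectHashSupported: Boolean     // aggregate functions support object hash aggregation
): AggExec =
  if (buffersAreMutableTypes) HashAggregateExec
  else if (objectHashEnabled && objectHashSupported) ObjectHashAggregateExec
  else SortAggregateExec

// count(1) over primitive columns => hash aggregation is preferred
assert(chooseAggregateExec(true, true, true) == HashAggregateExec)
// non-primitive buffers with object hash support => object hash aggregation
assert(chooseAggregateExec(false, true, true) == ObjectHashAggregateExec)
// otherwise => sort-based aggregation as the fallback
assert(chooseAggregateExec(false, false, false) == SortAggregateExec)
```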
Applying Aggregation Strategy to Logical Plan (Executing Aggregation) —
apply(plan: LogicalPlan): Seq[SparkPlan]
apply uses the PhysicalAggregation extractor to match Aggregate logical operators and creates a single aggregate physical operator for every Aggregate logical operator found.

apply uses PhysicalAggregation to destructure an Aggregate logical operator into a four-element tuple (grouping expressions, aggregate expressions, result expressions, and the child logical plan) and splits the aggregate expressions by whether they are distinct or not (using their isDistinct flag).
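The isDistinct split described above amounts to a partition over the extracted aggregate expressions. A minimal sketch in plain Scala, where `AggregateExpression` is a simplified stand-in for Spark's class of the same name:

```scala
// Simplified stand-in for Spark's AggregateExpression (illustrative only).
case class AggregateExpression(name: String, isDistinct: Boolean)

val aggExpressions = Seq(
  AggregateExpression("count(id)", isDistinct = false),
  AggregateExpression("count(DISTINCT name)", isDistinct = true))

// Split the aggregate expressions per their isDistinct flag
val (distinctAggs, regularAggs) = aggExpressions.partition(_.isDistinct)

assert(distinctAggs.map(_.name) == Seq("count(DISTINCT name)"))
assert(regularAggs.map(_.name) == Seq("count(id)"))
```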
apply then creates a physical operator using one of the following helper methods:
AggUtils.planAggregateWithoutDistinct when no distinct aggregate expression is used.
AggUtils.planAggregateWithOneDistinct when at least one distinct aggregate expression is used.
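The dispatch between the two helper methods can be sketched as follows. `planAggregate` and the `AggregateExpression` case class are hypothetical stand-ins used only to illustrate the branching; the helper-method names in the returned strings are Spark's.

```scala
// Illustrative stand-in for Spark's AggregateExpression.
case class AggregateExpression(name: String, isDistinct: Boolean)

// Sketch of apply's dispatch on distinct aggregate expressions.
// planAggregate is a hypothetical helper; it returns the name of the
// AggUtils method that apply would use for the given expressions.
def planAggregate(aggs: Seq[AggregateExpression]): String =
  if (aggs.exists(_.isDistinct)) "AggUtils.planAggregateWithOneDistinct"
  else "AggUtils.planAggregateWithoutDistinct"

assert(planAggregate(Seq(AggregateExpression("count(1)", isDistinct = false))) ==
  "AggUtils.planAggregateWithoutDistinct")
assert(planAggregate(Seq(AggregateExpression("count(DISTINCT id)", isDistinct = true))) ==
  "AggUtils.planAggregateWithOneDistinct")
```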