HashAggregateExec Aggregate Physical Operator for Hash-Based Aggregation

HashAggregateExec is a unary physical operator for hash-based aggregation that is created (indirectly through AggUtils.createAggregate) when:

  • Aggregation execution planning strategy selects the aggregate physical operator for an Aggregate logical operator

  • Structured Streaming’s StatefulAggregationStrategy strategy creates a plan for streaming EventTimeWatermark or Aggregate logical operators

HashAggregateExec is the preferred aggregate physical operator for Aggregation execution planning strategy (over ObjectHashAggregateExec and SortAggregateExec).
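The preference order can be observed by planning three small queries and collecting the names of the aggregate operators in their physical plans. This is only a sketch: the helper aggregateOperators and the local session settings are illustrative assumptions, and the operator that is actually selected may differ across Spark versions and configuration (e.g. spark.sql.execution.useObjectHashAggregateExec).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, max, sum}

// In spark-shell, getOrCreate simply returns the existing session
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("agg-selection-sketch")
  .getOrCreate()

// Hypothetical helper: names of the aggregate operators in the physical plan
def aggregateOperators(df: DataFrame): Seq[String] =
  df.queryExecution.sparkPlan.collect {
    case p if p.nodeName.endsWith("Aggregate") => p.nodeName
  }

val grouped = spark.range(10).groupBy((col("id") % 2).as("group"))

// sum over a Long column keeps a fixed-width, mutable aggregation buffer
val hashBased = aggregateOperators(grouped.agg(sum("id")))

// collect_list keeps a JVM object as its aggregation buffer
val objectHashBased = aggregateOperators(grouped.agg(collect_list("id")))

// max over a String column keeps a variable-length aggregation buffer
val sortBased = aggregateOperators(grouped.agg(max(col("id").cast("string"))))
```

Printing the three sequences should show HashAggregate for the first query. The other two are expected to fall back to ObjectHashAggregate and SortAggregate respectively, subject to the version and settings caveat above.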

HashAggregateExec supports code generation (aka codegen).

// HashAggregateExec selected due to:
// sum uses mutable types for aggregate expression
// just a single id column reference of LongType data type
val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#57L % 2)#69L], functions=[sum(id#57L)])
+- Exchange hashpartitioning((id#57L % 2)#69L, 200)
   +- *HashAggregate(keys=[(id#57L % 2) AS (id#57L % 2)#69L], functions=[partial_sum(id#57L)])
      +- *Range (0, 10, step=1, splits=8)

scala> println(q.queryExecution.sparkPlan.numberedTreeString)
00 HashAggregate(keys=[(id#57L % 2)#72L], functions=[sum(id#57L)], output=[group#60L, sum#64L])
01 +- HashAggregate(keys=[(id#57L % 2) AS (id#57L % 2)#72L], functions=[partial_sum(id#57L)], output=[(id#57L % 2)#72L, sum#74L])
02    +- Range (0, 10, step=1, splits=8)

// Going low level...watch your steps :)

import q.queryExecution.optimizedPlan
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggLog = optimizedPlan.asInstanceOf[Aggregate]
import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
val aggregateExpressions: Seq[AggregateExpression] = PhysicalAggregation.unapply(aggLog).get._2
val aggregateBufferAttributes = aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
// that's the exact reason why HashAggregateExec was selected
// Aggregation execution planning strategy prefers HashAggregateExec
scala> val useHash = HashAggregateExec.supportsAggregate(aggregateBufferAttributes)
useHash: Boolean = true

val execPlan = q.queryExecution.sparkPlan
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#39L % 2)#50L], functions=[sum(id#39L)], output=[group#42L, sum#46L])
01 +- HashAggregate(keys=[(id#39L % 2) AS (id#39L % 2)#50L], functions=[partial_sum(id#39L)], output=[(id#39L % 2)#50L, sum#52L])
02    +- Range (0, 10, step=1, splits=8)

val hashAggExecRDD = hashAggExec.execute // <-- calls doExecute
scala> println(hashAggExecRDD.toDebugString)
(8) MapPartitionsRDD[14] at execute at <console>:35 []
 |  MapPartitionsRDD[13] at execute at <console>:35 []
 |  MapPartitionsRDD[12] at execute at <console>:35 []
 |  ParallelCollectionRDD[11] at execute at <console>:35 []

Table 1. HashAggregateExec’s SQLMetrics (in alphabetical order)

  • aggregate time

  • number of output rows

  • peak memory

  • spill size

Figure 1. HashAggregateExec in web UI (Details for Query)

Table 2. HashAggregateExec’s Properties (in alphabetical order)
Name Description

aggregateBufferAttributes
Collection of AttributeReference references of the aggregate functions of the input AggregateExpressions

output
Output schema for the input NamedExpressions

requiredChildDistribution varies per the input required child distribution expressions.

Table 3. HashAggregateExec’s Required Child Output Distributions
requiredChildDistributionExpressions    Distribution

Defined, but empty                      AllTuples

Non-empty                               ClusteredDistribution(exprs)

Undefined (None)                        UnspecifiedDistribution
requiredChildDistributionExpressions is exactly requiredChildDistributionExpressions from AggUtils.createAggregate and is undefined by default.
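
How requiredChildDistribution depends on requiredChildDistributionExpressions can be sketched as a pattern match. The types below are simplified stand-ins written for this illustration (Spark's own Distribution classes live in org.apache.spark.sql.catalyst.plans.physical and work on Catalyst expressions, not strings), and the sketch assumes the three cases map to AllTuples, ClusteredDistribution and UnspecifiedDistribution.

```scala
// Stand-in types for illustration only, not Spark's classes
sealed trait Distribution
case object UnspecifiedDistribution extends Distribution
case object AllTuples extends Distribution
final case class ClusteredDistribution(clustering: Seq[String]) extends Distribution

// Hypothetical helper mirroring the three cases of requiredChildDistributionExpressions
def requiredChildDistribution(exprs: Option[Seq[String]]): List[Distribution] =
  exprs match {
    case Some(es) if es.isEmpty => AllTuples :: Nil                 // defined, but empty
    case Some(es)               => ClusteredDistribution(es) :: Nil // defined and non-empty
    case None                   => UnspecifiedDistribution :: Nil   // undefined
  }

val forFinalWithoutKeys = requiredChildDistribution(Some(Nil))
val forFinalWithKeys = requiredChildDistribution(Some(Seq("group")))
val forPartial = requiredChildDistribution(None)
```

Read this way, a partial aggregation places no requirement on how its input is distributed, while a final aggregation needs either all rows in a single partition (no grouping keys) or rows clustered by the grouping keys.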

(No distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions).

requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for final aggregations (i.e. mode is Final for aggregate expressions).

(One distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions).

requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for partial merge aggregations (i.e. mode is PartialMerge for aggregate expressions).

FIXME for the following two cases in aggregation with one distinct.

The prefix for variable names for HashAggregateExec operators in CodegenSupport-generated code is agg.

testFallbackStartsAt Internal Value


supportsAggregate Method

supportsAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean

supportsAggregate first builds the schema of the aggregation buffer (from the input aggregateBufferAttributes attributes) and then checks whether UnsafeFixedWidthAggregationMap supports it, i.e. whether the schema uses only mutable field data types, which have fixed length and can be mutated in place in an UnsafeRow.
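
The schema check can be tried directly against UnsafeFixedWidthAggregationMap. The two buffer schemas below are made up for illustration (they are not taken from any particular aggregate function), and the snippet assumes Spark SQL is on the classpath, as in spark-shell.

```scala
import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}

// Illustrative buffer schema made only of fixed-width, mutable types
val fixedWidthBuffer = StructType(Seq(
  StructField("sum", DoubleType),
  StructField("count", LongType)))

// Illustrative buffer schema with a variable-length field
val variableLengthBuffer = StructType(Seq(
  StructField("max", StringType)))

// true when every field can be updated in place in an UnsafeRow
val fixedOk =
  UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(fixedWidthBuffer)
val variableOk =
  UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(variableLengthBuffer)
```

The first schema should be accepted and the second rejected, since a StringType value cannot be overwritten in place within a fixed-width slot.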

supportsAggregate is used exclusively when AggUtils.createAggregate selects an aggregate physical operator given aggregate expressions.

Creating HashAggregateExec Instance

HashAggregateExec takes the following when created:

Executing HashAggregateExec — doExecute Method

doExecute(): RDD[InternalRow]

doExecute executes the input child SparkPlan (to produce InternalRow objects) and applies a calculation to every partition (using RDD.mapPartitions).

RDD.mapPartitions does not preserve partitioning and neither does HashAggregateExec when executed.
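
The per-partition calculation can be pictured with a plain Scala model of hash-based aggregation. This is a conceptual sketch only and not Spark's implementation: hashAggregatePartition is a hypothetical function, rows are simple (key, value) pairs, and a mutable HashMap stands in for the aggregation map.

```scala
import scala.collection.mutable

// Hypothetical model of one partition's work: look up the buffer for each
// row's grouping key in a hash map and update the running sum
def hashAggregatePartition(rows: Iterator[(Long, Long)]): Iterator[(Long, Long)] = {
  val buffers = mutable.HashMap.empty[Long, Long]
  rows.foreach { case (key, value) =>
    buffers.update(key, buffers.getOrElse(key, 0L) + value)
  }
  buffers.iterator
}

// Two "partitions" of (id % 2, id) pairs for ids 0 to 9
val partitions: Seq[Seq[(Long, Long)]] =
  (0L until 10L).map(id => (id % 2, id)).grouped(5).toSeq

// Partial aggregation: every partition is aggregated independently
val partialSums: Seq[Seq[(Long, Long)]] =
  partitions.map(p => hashAggregatePartition(p.iterator).toSeq)

// Final aggregation: partial sums are merged per grouping key
val finalSums: Map[Long, Long] =
  hashAggregatePartition(partialSums.flatten.iterator).toMap
```

The two steps correspond to the partial_sum and sum operators in the physical plan shown earlier, with the flatten standing in for the exchange between them.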

In the mapPartitions block, doExecute creates one of the following:

doExecute is part of the SparkPlan Contract to produce the result of a structured query as an RDD of InternalRow objects.

doProduce Method

doProduce(ctx: CodegenContext): String

doProduce executes doProduceWithoutKeys when no groupingExpressions were specified for the HashAggregateExec or doProduceWithKeys otherwise.
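
Which of the two paths applies depends only on whether the operator has grouping expressions, and that can be inspected on a planned query. The sketch below assumes a local SparkSession, and groupingKeyCounts is a hypothetical helper introduced for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.functions.{col, sum}

// In spark-shell, getOrCreate simply returns the existing session
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("doProduce-sketch")
  .getOrCreate()

// Hypothetical helper: number of grouping expressions of every
// HashAggregateExec operator in the physical plan
def groupingKeyCounts(df: DataFrame): Seq[Int] =
  df.queryExecution.sparkPlan.collect {
    case agg: HashAggregateExec => agg.groupingExpressions.size
  }

// Global aggregation: no grouping keys (the doProduceWithoutKeys case)
val withoutKeys = groupingKeyCounts(spark.range(10).agg(sum("id")))

// Grouped aggregation: one grouping key (the doProduceWithKeys case)
val withKeys = groupingKeyCounts(spark.range(10).groupBy(col("id") % 2).agg(sum("id")))
```

Both the partial and the final HashAggregateExec of a query share the same grouping keys, so each sequence should hold one value repeated for every aggregate operator in the plan.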

doProduce is part of the CodegenSupport Contract.
