HashAggregateExec — Aggregate Physical Operator for Hash-Based Aggregation

HashAggregateExec is a unary physical operator for hash-based aggregation that is created (indirectly through AggUtils.createAggregate) when:

  • Aggregation execution planning strategy selects the aggregate physical operator for an Aggregate logical operator

  • Structured Streaming’s StatefulAggregationStrategy execution planning strategy creates a physical plan for streaming EventTimeWatermark or Aggregate logical operators

Note
HashAggregateExec is the preferred aggregate physical operator for Aggregation execution planning strategy (over ObjectHashAggregateExec and SortAggregateExec).

HashAggregateExec supports code generation (aka codegen).

val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")

// HashAggregateExec selected due to:
// 1. sum uses mutable types for aggregate expression
// 2. just a single id column reference of LongType data type
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#0L % 2)#12L], functions=[sum(id#0L)])
+- Exchange hashpartitioning((id#0L % 2)#12L, 200)
   +- *HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_sum(id#0L)])
      +- *Range (0, 10, step=1, splits=8)

val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#15L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#15L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#15L, sum#17L])
02    +- Range (0, 10, step=1, splits=8)

// Going low level...watch your steps :)

import q.queryExecution.optimizedPlan
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggLog = optimizedPlan.asInstanceOf[Aggregate]
import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
val aggregateExpressions: Seq[AggregateExpression] = PhysicalAggregation.unapply(aggLog).get._2
val aggregateBufferAttributes = aggregateExpressions.
 flatMap(_.aggregateFunction.aggBufferAttributes)
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
// that's the exact reason why HashAggregateExec was selected
// Aggregation execution planning strategy prefers HashAggregateExec
scala> val useHash = HashAggregateExec.supportsAggregate(aggregateBufferAttributes)
useHash: Boolean = true

val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#15L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#15L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#15L, sum#17L])
02    +- Range (0, 10, step=1, splits=8)

val hashAggExecRDD = hashAggExec.execute // <-- calls doExecute
scala> println(hashAggExecRDD.toDebugString)
(8) MapPartitionsRDD[3] at execute at <console>:30 []
 |  MapPartitionsRDD[2] at execute at <console>:30 []
 |  MapPartitionsRDD[1] at execute at <console>:30 []
 |  ParallelCollectionRDD[0] at execute at <console>:30 []
Table 1. HashAggregateExec’s Performance Metrics (in alphabetical order)

  • aggTime (web UI: aggregate time)

  • avgHashProbe (web UI: avg hash probe): average number of hash map probes per looked-up key (i.e. numProbes / numKeyLookups)

Note
numProbes and numKeyLookups are used in the BytesToBytesMap append-only hash map for the number of iterations to look up a single key and the total number of lookups, respectively.

  • numOutputRows (web UI: number of output rows): number of groups output per partition, which depends on the number of partitions and the side of the ShuffleExchange operator the operator sits on:

      • 0 for no input with a grouping expression, e.g. spark.range(0).groupBy($"id").count.show

      • 1 for no grouping expression and no input, e.g. spark.range(0).groupBy().count.show

Tip
Use different numbers of elements and partitions in the range operator to observe the difference in the numOutputRows metric, e.g.

spark.
  range(0, 10, 1, numPartitions = 1).
  groupBy($"id" % 5 as "gid").
  count.
  show

spark.
  range(0, 10, 1, numPartitions = 5).
  groupBy($"id" % 5 as "gid").
  count.
  show

  • peakMemory (web UI: peak memory)

  • spillSize (web UI: spill size)

Figure 1. HashAggregateExec in web UI (Details for Query)
Table 2. HashAggregateExec’s Properties (in alphabetical order)

  • aggregateBufferAttributes: collection of AttributeReference references to the aggregate functions of the input AggregateExpressions

  • output: output schema for the input NamedExpressions

requiredChildDistribution varies per the input required child distribution expressions.

Table 3. HashAggregateExec’s Required Child Output Distributions (requiredChildDistributionExpressions determines the Distribution)

  • Defined, but empty: AllTuples

  • Non-empty: ClusteredDistribution(exprs)

  • Undefined (None): UnspecifiedDistribution
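The mapping above can be pictured as a pattern match over the optional expressions. The following is a simplified sketch with stub stand-ins for the Distribution types, not Spark’s actual classes:

```scala
// Stub stand-ins for Spark's Distribution hierarchy (assumptions for illustration).
sealed trait Distribution
case object AllTuples extends Distribution
final case class ClusteredDistribution(exprs: Seq[String]) extends Distribution
case object UnspecifiedDistribution extends Distribution

// Mirrors how requiredChildDistribution derives from requiredChildDistributionExpressions.
def requiredChildDistribution(exprs: Option[Seq[String]]): Distribution =
  exprs match {
    case Some(es) if es.isEmpty => AllTuples                 // defined, but empty
    case Some(es)               => ClusteredDistribution(es) // non-empty
    case None                   => UnspecifiedDistribution   // undefined
  }
```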

Note

requiredChildDistributionExpressions is exactly requiredChildDistributionExpressions from AggUtils.createAggregate and is undefined by default.


(No distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions).

requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for final aggregations (i.e. mode is Final for aggregate expressions).


(one distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions) with one distinct in aggregation.

requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for partial merge aggregations (i.e. mode is PartialMerge for aggregate expressions).

FIXME for the following two cases in aggregation with one distinct.

Note
The prefix for variable names for HashAggregateExec operators in CodegenSupport-generated code is agg.
Table 4. HashAggregateExec’s Internal Registries and Counters (in alphabetical order)
Name Description

testFallbackStartsAt

Optional pair of numbers for controlled fall-back to a sort-based aggregation when the hash-based approach is unable to acquire enough memory.

Note

HashAggregateExec uses TungstenAggregationIterator that can (theoretically) switch to a sort-based aggregation when the hash-based approach is unable to acquire enough memory.

Search logs for the following INFO message to know whether the switch has happened.

INFO TungstenAggregationIterator: falling back to sort based aggregation.
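The controlled fall-back can be pictured with a toy sketch (not TungstenAggregationIterator itself): hash-aggregate input rows up to a threshold, then handle the rest by sorting and merging. Note that testFallbackStartsAt is actually a pair of thresholds; this sketch uses a single one for brevity.

```scala
// Toy sketch of controlled fall-back to sort-based aggregation (an assumption-laden
// simplification, not Spark's TungstenAggregationIterator): hash-aggregate until
// `fallbackStartsAt` input rows, then sort the remaining rows and merge them in.
def sumByKey(rows: Seq[(String, Long)], fallbackStartsAt: Int): Map[String, Long] = {
  val (hashed, spilled) = rows.splitAt(fallbackStartsAt)
  val partial = scala.collection.mutable.Map.empty[String, Long]
  // hash-based phase
  hashed.foreach { case (k, v) => partial(k) = partial.getOrElse(k, 0L) + v }
  // sort-based phase: process the remaining rows in key order and merge
  spilled.sortBy(_._1).foreach { case (k, v) => partial(k) = partial.getOrElse(k, 0L) + v }
  partial.toMap
}
```

Either way the result is the same; only the memory profile of the two phases differs.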

supportsAggregate Method

supportsAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean

supportsAggregate first builds the schema of the aggregation buffer (from the input aggregateBufferAttributes attributes) and checks if UnsafeFixedWidthAggregationMap supports it (i.e. the schema uses mutable field data types only that have fixed length and can be mutated in place in an UnsafeRow).

Note
supportsAggregate is used exclusively when AggUtils.createAggregate selects an aggregate physical operator given aggregate expressions.
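The mutability check can be approximated as follows. This is a simplified sketch in which a whitelist of fixed-width types stands in for UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema; the DataType stubs are assumptions, not Catalyst’s classes:

```scala
// Simplified stand-ins for Catalyst data types (assumptions for illustration).
sealed trait DataType
case object LongType extends DataType
case object DoubleType extends DataType
case object StringType extends DataType

// Fixed-width types can be mutated in place in an UnsafeRow; variable-length
// types (e.g. strings) cannot.
val mutableFixedWidthTypes: Set[DataType] = Set(LongType, DoubleType)

// Mirrors the spirit of HashAggregateExec.supportsAggregate: every field of the
// aggregation buffer must use a mutable, fixed-length data type.
def supportsAggregate(bufferTypes: Seq[DataType]): Boolean =
  bufferTypes.forall(mutableFixedWidthTypes.contains)
```

This is why sum over a LongType column in the earlier example selects HashAggregateExec, while an aggregation buffer holding variable-length state falls through to another operator.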

Executing HashAggregateExec — doExecute Method

doExecute(): RDD[InternalRow]

doExecute executes the input child SparkPlan (to produce InternalRow objects) and applies calculation over partitions (using RDD.mapPartitions).

Important
RDD.mapPartitions does not preserve partitioning and neither does HashAggregateExec when executed.

In the mapPartitions block, doExecute returns an empty iterator for an empty partition with grouping expressions, and otherwise creates a TungstenAggregationIterator (which, with no grouping expressions and no input, gives a single row for the empty grouping key).

Note
doExecute is a part of SparkPlan Contract to produce the result of a structured query as an RDD of InternalRow objects.
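The per-partition flow can be sketched in plain Scala. This is a toy mapPartitions-style sum, not Spark’s doExecute, but it mirrors the special cases: an empty partition with grouping produces no rows, while a global (no-grouping) aggregation always produces exactly one row, even for empty input.

```scala
// Toy per-partition aggregation in the spirit of RDD.mapPartitions (illustration only):
// sums values, grouping by `value % 2` when `grouped` is set.
def aggregatePartition(rows: Iterator[Long], grouped: Boolean): Iterator[(Long, Long)] =
  if (grouped) {
    val sums = scala.collection.mutable.Map.empty[Long, Long]
    rows.foreach(v => sums(v % 2) = sums.getOrElse(v % 2, 0L) + v)
    sums.iterator // empty input => empty iterator
  } else {
    var sum = 0L
    rows.foreach(sum += _)
    Iterator((0L, sum)) // single row for the empty grouping key
  }
```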

doProduce Method

doProduce(ctx: CodegenContext): String

doProduce executes doProduceWithoutKeys when no groupingExpressions were specified for the HashAggregateExec or doProduceWithKeys otherwise.

Note
doProduce is a part of CodegenSupport Contract.

Creating HashAggregateExec Instance

HashAggregateExec takes the following when created:

HashAggregateExec initializes the internal registries and counters.
