TungstenAggregationIterator — Iterator of UnsafeRows for HashAggregateExec Physical Operator

TungstenAggregationIterator is a AggregationIterator that the HashAggregateExec aggregate physical operator uses when executed (to process UnsafeRows per partition and calculate aggregations).

TungstenAggregationIterator prefers hash-based aggregation (before switching to sort-based aggregation).

val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
02    +- Range (0, 10, step=1, splits=8)

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute

// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark
object AccessPrivateSpark {
  import org.apache.spark.rdd.RDD
  def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
    import org.apache.spark.rdd.MapPartitionsRDD
    hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
// END :paste -raw

import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)

import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?

When created, TungstenAggregationIterator gets SQL metrics from the HashAggregateExec aggregate physical operator being executed, i.e. numOutputRows, peakMemory, spillSize and avgHashProbe metrics.

The metrics are then displayed as part of HashAggregateExec aggregate physical operator (e.g. in web UI in Details for Query).

spark sql HashAggregateExec webui details for query.png
Figure 1. HashAggregateExec in web UI (Details for Query)
Table 1. TungstenAggregationIterator’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description


KVIterator[UnsafeRow, UnsafeRow]

Used when…​FIXME


UnsafeFixedWidthAggregationMap with the following:

Used when TungstenAggregationIterator is requested for the next UnsafeRow, to outputForEmptyGroupingKeyWithoutInput, processInputs, to initialize the aggregationBufferMapIterator and every time a partition has been processed.


UnsafeRow that is the aggregation buffer containing initial buffer values.

Used when…​FIXME


UnsafeKVExternalSorter used for sort-based aggregation


Flag to indicate whether TungstenAggregationIterator uses sort-based aggregation (not hash-based aggregation).

sortBased flag is disabled (false) by default.

Enabled (true) when TungstenAggregationIterator is requested to switch to sort-based aggregation.

Used when…​FIXME

processInputs Internal Method

processInputs(fallbackStartsAt: (Int, Int)): Unit


processInputs is used exclusively when TungstenAggregationIterator is created (and sets the internal flags to indicate whether to use a hash-based aggregation or, in the worst case, a sort-based aggregation when there is not enough memory for groups and their buffers).

Switching to Sort-Based Aggregation (From Preferred Hash-Based Aggregation) — switchToSortBasedAggregation Internal Method

switchToSortBasedAggregation(): Unit


switchToSortBasedAggregation is used exclusively when TungstenAggregationIterator is requested to processInputs (and the externalSorter is used).

Getting Next UnsafeRow — next Method

next(): UnsafeRow
next is part of Scala’s scala.collection.Iterator interface that returns the next element and discards it from the iterator.


hasNext Method

hasNext: Boolean
hasNext is part of Scala’s scala.collection.Iterator interface that tests whether this iterator can provide another element.


Creating TungstenAggregationIterator Instance

TungstenAggregationIterator takes the following when created:

The SQL metrics (numOutputRows, peakMemory, spillSize and avgHashProbe) belong to the HashAggregateExec physical operator that created the TungstenAggregationIterator.

TungstenAggregationIterator initializes the internal registries and counters.

TungstenAggregationIterator starts processing input rows and pre-loads the first key-value pair from the UnsafeFixedWidthAggregationMap if did not switch to sort-based aggregation.

generateResultProjection Method

generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow
generateResultProjection is part of the AggregationIterator Contract to…​FIXME.


Creating UnsafeRow — outputForEmptyGroupingKeyWithoutInput Method

outputForEmptyGroupingKeyWithoutInput(): UnsafeRow


outputForEmptyGroupingKeyWithoutInput is used when…​FIXME


TungstenAggregationIterator registers a TaskCompletionListener that is executed on task completion (for every task that processes a partition).

When executed (once per partition), the TaskCompletionListener updates the following metrics:

results matching ""

    No results matching ""