TungstenAggregationIterator — Iterator of UnsafeRows for HashAggregateExec Physical Operator

TungstenAggregationIterator is a AggregationIterator that the HashAggregateExec aggregate physical operator uses when executed (to process UnsafeRows per partition and calculate aggregations).

TungstenAggregationIterator prefers hash-based aggregation (before switching to sort-based aggregation).

val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
02    +- Range (0, 10, step=1, splits=8)

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute

// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark
object AccessPrivateSpark {
  import org.apache.spark.rdd.RDD
  def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
    import org.apache.spark.rdd.MapPartitionsRDD
    hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
  }
}
// END :paste -raw

import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)

import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?

When created, TungstenAggregationIterator gets SQL metrics from the HashAggregateExec aggregate physical operator being executed, i.e. numOutputRows, peakMemory, spillSize and avgHashProbe metrics.

The metrics are then displayed as part of HashAggregateExec aggregate physical operator (e.g. in web UI in Details for Query).

spark sql HashAggregateExec webui details for query.png
Figure 1. HashAggregateExec in web UI (Details for Query)
Table 1. TungstenAggregationIterator’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

aggregationBufferMapIterator

KVIterator[UnsafeRow, UnsafeRow]

Used when…​FIXME

hashMap

UnsafeFixedWidthAggregationMap with the following:

Used when TungstenAggregationIterator is requested for the next UnsafeRow, to outputForEmptyGroupingKeyWithoutInput, processInputs, to initialize the aggregationBufferMapIterator and every time a partition has been processed.

initialAggregationBuffer

UnsafeRow that is the aggregation buffer containing initial buffer values.

Used when…​FIXME

externalSorter

UnsafeKVExternalSorter used for sort-based aggregation

sortBased

Flag to indicate whether TungstenAggregationIterator uses sort-based aggregation (not hash-based aggregation).

sortBased flag is disabled (false) by default.

Enabled (true) when TungstenAggregationIterator is requested to switch to sort-based aggregation.

Used when…​FIXME

processInputs Internal Method

processInputs(fallbackStartsAt: (Int, Int)): Unit

processInputs…​FIXME

Note
processInputs is used exclusively when TungstenAggregationIterator is created (and sets the internal flags to indicate whether to use a hash-based aggregation or, in the worst case, a sort-based aggregation when there is not enough memory for groups and their buffers).

Switching to Sort-Based Aggregation (From Preferred Hash-Based Aggregation) — switchToSortBasedAggregation Internal Method

switchToSortBasedAggregation(): Unit

switchToSortBasedAggregation…​FIXME

Note
switchToSortBasedAggregation is used exclusively when TungstenAggregationIterator is requested to processInputs (and the externalSorter is used).

Getting Next UnsafeRow — next Method

next(): UnsafeRow
Note
next is part of Scala’s scala.collection.Iterator interface that returns the next element and discards it from the iterator.

next…​FIXME

hasNext Method

hasNext: Boolean
Note
hasNext is part of Scala’s scala.collection.Iterator interface that tests whether this iterator can provide another element.

hasNext…​FIXME

Creating TungstenAggregationIterator Instance

TungstenAggregationIterator takes the following when created:

Note
The SQL metrics (numOutputRows, peakMemory, spillSize and avgHashProbe) belong to the HashAggregateExec physical operator that created the TungstenAggregationIterator.

TungstenAggregationIterator initializes the internal registries and counters.

TungstenAggregationIterator starts processing input rows and pre-loads the first key-value pair from the UnsafeFixedWidthAggregationMap if did not switch to sort-based aggregation.

generateResultProjection Method

generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow
Note
generateResultProjection is part of the AggregationIterator Contract to…​FIXME.

generateResultProjection…​FIXME

Creating UnsafeRow — outputForEmptyGroupingKeyWithoutInput Method

outputForEmptyGroupingKeyWithoutInput(): UnsafeRow

outputForEmptyGroupingKeyWithoutInput…​FIXME

Note
outputForEmptyGroupingKeyWithoutInput is used when…​FIXME

TaskCompletionListener

TungstenAggregationIterator registers a TaskCompletionListener that is executed on task completion (for every task that processes a partition).

When executed (once per partition), the TaskCompletionListener updates the following metrics:

results matching ""

    No results matching ""