Aggregator — Contract for User-Defined Typed Aggregate Functions (UDAFs)

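Aggregator is the contract for user-defined typed aggregate functions (also known as user-defined typed aggregations, or UDAFs for short). IN is the type of the input records, BUF is the type of the intermediate aggregation buffer, and OUT is the type of the final result.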

package org.apache.spark.sql.expressions

abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
  // only required methods that have no implementation
  def bufferEncoder: Encoder[BUF]
  def finish(reduction: BUF): OUT
  def merge(b1: BUF, b2: BUF): BUF
  def outputEncoder: Encoder[OUT]
  def reduce(b: BUF, a: IN): BUF
  def zero: BUF
}

After you create a custom Aggregator, use the toColumn method to convert it to a TypedColumn that can be used with the Dataset.select and KeyValueGroupedDataset.agg typed operators.

// From Spark MLlib's org.apache.spark.ml.recommendation.ALSModel
// Step 1. Create Aggregator
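// The input type has to match the Dataset's element type, i.e. (Int, Int, Float)
val topKAggregator: Aggregator[(Int, Int, Float), _, Array[(Int, Float)]] = ???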
val recs = ratings
  .as[(Int, Int, Float)]
  .groupByKey(_._1)
  .agg(topKAggregator.toColumn) // <-- use the custom Aggregator
  .toDF("id", "recommendations")

Note

Use the org.apache.spark.sql.expressions.scalalang.typed object to access the built-in type-safe aggregate functions: avg, count, sum and sumLong.

import org.apache.spark.sql.expressions.scalalang.typed

// Example 1
ds.groupByKey(_._1).agg(typed.sum(_._2))

// Example 2
ds.select(typed.sum((i: Int) => i))

Note

Aggregator is marked as an Experimental and Evolving contract: it is moving towards becoming a stable API, but is not one yet and can change from one feature release to another.

In other words, using the contract is like treading on thin ice.

Aggregator is used when:

Table 1. Aggregator Contract
Method         Description
bufferEncoder  Used when…FIXME
finish         Used when…FIXME
merge          Used when…FIXME
outputEncoder  Used when…FIXME
reduce         Used when…FIXME
zero           Used when…FIXME
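The six contract methods can be illustrated with a minimal sketch of a custom Aggregator that averages a numeric field. The names Rating, SumCount, AverageScore and AverageScoreDemo are hypothetical and not part of Spark. In this sketch, zero is the empty buffer, reduce folds one input record into a buffer, merge combines two buffers, finish turns the final buffer into the output value, and the two encoders describe the buffer and output types. The averageOf helper only mimics by hand how partial results could be combined; it is not how Spark itself drives the aggregation.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical input record and intermediate buffer (not part of Spark)
case class Rating(user: Int, score: Double)
case class SumCount(sum: Double, count: Long)

// Hypothetical aggregator: IN = Rating, BUF = SumCount, OUT = Double
object AverageScore extends Aggregator[Rating, SumCount, Double] {
  // Empty buffer: merging it with any buffer leaves that buffer unchanged
  def zero: SumCount = SumCount(0.0, 0L)

  // Fold a single input record into the buffer
  def reduce(b: SumCount, a: Rating): SumCount =
    SumCount(b.sum + a.score, b.count + 1)

  // Combine two intermediate buffers
  def merge(b1: SumCount, b2: SumCount): SumCount =
    SumCount(b1.sum + b2.sum, b1.count + b2.count)

  // Turn the final buffer into the output value
  def finish(reduction: SumCount): Double =
    if (reduction.count == 0) Double.NaN else reduction.sum / reduction.count

  // Encoders for the buffer and the output types
  def bufferEncoder: Encoder[SumCount] = Encoders.product[SumCount]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object AverageScoreDemo {
  // Assumption for illustration only: reduce within each group of records,
  // merge the partial buffers, then finish
  def averageOf(partitions: Seq[Seq[Rating]]): Double = {
    val partials = partitions.map(_.foldLeft(AverageScore.zero)(AverageScore.reduce))
    AverageScore.finish(partials.foldLeft(AverageScore.zero)(AverageScore.merge))
  }
}
```

Keeping the sum and the count in the buffer (rather than a running average) is what makes merge straightforward, since two partial averages cannot be combined without knowing how many records each one covers.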

Table 2. Aggregators
Aggregator            Description
ParameterizedTypeSum
ReduceAggregator
TopByKeyAggregator    Used exclusively in Spark MLlib
TypedAverage
TypedCount
TypedSumDouble
TypedSumLong

Converting Aggregator to TypedColumn — toColumn Method

toColumn: TypedColumn[IN, OUT]

toColumn…​FIXME

Note
toColumn is used when…​FIXME
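
As a usage sketch, the following shows toColumn producing a TypedColumn that is passed to both Dataset.select and KeyValueGroupedDataset.agg. SumValues and ToColumnDemo are hypothetical names introduced for illustration, and the SparkSession is assumed to be supplied by the caller (e.g. the spark value in spark-shell).

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical aggregator: sums the second element of (key, value) pairs
object SumValues extends Aggregator[(String, Long), Long, Long] {
  def zero: Long = 0L
  def reduce(b: Long, a: (String, Long)): Long = b + a._2
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(reduction: Long): Long = reduction
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

object ToColumnDemo {
  // Returns the overall total and the per-key totals of a small sample Dataset
  def run(spark: SparkSession): (Long, Map[String, Long]) = {
    import spark.implicits._
    val ds = Seq(("a", 1L), ("a", 2L), ("b", 5L)).toDS()

    // toColumn gives a TypedColumn[(String, Long), Long]
    val sumColumn = SumValues.toColumn

    // Dataset.select with a TypedColumn aggregates over the whole Dataset
    val total = ds.select(sumColumn).head()

    // KeyValueGroupedDataset.agg aggregates per key
    val perKey = ds.groupByKey(_._1).agg(sumColumn).collect().toMap

    (total, perKey)
  }
}
```

The same TypedColumn value is reused for both operators, since its input type matches the element type of the Dataset in either case.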
