Aggregation — Typed and Untyped Grouping

You can group the records in a Dataset by a grouping function or columns and then calculate aggregates over each collection of grouped records.

Table 1. Aggregate Operators (in alphabetical order)

| Operator | Return Type | Description |
|---|---|---|
| agg | DataFrame | Aggregates with or without grouping (i.e. over the entire Dataset) |
| groupBy | RelationalGroupedDataset | Used for untyped aggregations with DataFrames. Grouping is described using Column-based functions or column names. |
| groupByKey | KeyValueGroupedDataset | Used for typed aggregations where the data is grouped by a given key function. |
| grouping | FIXME | FIXME (referred to as grouping sets) |
| grouping_id | FIXME | FIXME (referred to as grouping sets) |

grouping Operator

Caution
FIXME

grouping_id Operator

Caution
FIXME
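
Until the FIXMEs above are resolved, here is a minimal sketch of how grouping and grouping_id are typically used together with cube (it assumes the Token dataset from the Test Setup section below):

import org.apache.spark.sql.functions.{grouping, grouping_id, sum}

// grouping('name) is 1 on subtotal rows where name was aggregated away, 0 otherwise
// grouping_id() is the bit vector over all grouping columns (here just name)
scala> dataset.cube('name).agg(grouping('name), grouping_id(), sum('score)).show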

Aggregates Without Grouping (i.e. Entire Dataset) — agg Operator

agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame

agg applies aggregate functions without explicit grouping, i.e. it treats the entire Dataset as a single group.

Note
agg on a Dataset is simply a shortcut for groupBy().agg(…).
scala> spark.range(10).agg(sum('id) as "sum").show
+---+
|sum|
+---+
| 45|
+---+
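
That is, the query above can be spelled out with an explicit (empty) grouping and gives the same result:

scala> spark.range(10).groupBy().agg(sum('id) as "sum").show
+---+
|sum|
+---+
| 45|
+---+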

agg can compute aggregate expressions on all the records in a Dataset.
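
The Map-based variant takes the aggregates as pairs of column name and aggregate function name. A minimal sketch with the same spark.range dataset as above:

scala> spark.range(10).agg(Map("id" -> "max")).show
+-------+
|max(id)|
+-------+
|      9|
+-------+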

Untyped Grouping — groupBy Operators

groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String, cols: String*): RelationalGroupedDataset

The groupBy operators group the records in a Dataset by the specified columns (given as Column expressions or column names) and return a RelationalGroupedDataset to apply aggregations to.

// A data set with 10^3 records
val ints = 1 to math.pow(10, 3).toInt

scala> val dataset = ints.toDF("n").withColumn("m", 'n % 2)
dataset: org.apache.spark.sql.DataFrame = [n: int, m: int]

scala> dataset.count
res0: Long = 1000

scala> dataset.groupBy('m).agg(sum('n)).show
+---+------+
|  m|sum(n)|
+---+------+
|  1|250000|
|  0|250500|
+---+------+

Internally, groupBy first resolves the grouping columns and then builds a RelationalGroupedDataset.
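
The String-based variant behaves the same way. A quick sketch with the n/m dataset above:

// Grouping and aggregate columns given by name;
// same result as dataset.groupBy('m).agg(sum('n)) above
scala> dataset.groupBy("m").agg(sum("n")).show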

Note
The following session uses the data setup as described in the Test Setup section below.
scala> dataset.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa|      100| 0.12|
| aaa|      200| 0.29|
| bbb|      200| 0.53|
| bbb|      300| 0.42|
+----+---------+-----+

scala> dataset.groupBy('name).avg().show
+----+--------------+----------+
|name|avg(productId)|avg(score)|
+----+--------------+----------+
| aaa|         150.0|     0.205|
| bbb|         250.0|     0.475|
+----+--------------+----------+

scala> dataset.groupBy('name, 'productId).agg(Map("score" -> "avg")).show
+----+---------+----------+
|name|productId|avg(score)|
+----+---------+----------+
| aaa|      200|      0.29|
| bbb|      200|      0.53|
| bbb|      300|      0.42|
| aaa|      100|      0.12|
+----+---------+----------+

scala> dataset.groupBy('name).count.show
+----+-----+
|name|count|
+----+-----+
| aaa|    2|
| bbb|    2|
+----+-----+

scala> dataset.groupBy('name).max("score").show
+----+----------+
|name|max(score)|
+----+----------+
| aaa|      0.29|
| bbb|      0.53|
+----+----------+

scala> dataset.groupBy('name).sum("score").show
+----+----------+
|name|sum(score)|
+----+----------+
| aaa|      0.41|
| bbb|      0.95|
+----+----------+

scala> dataset.groupBy('productId).sum("score").show
+---------+------------------+
|productId|        sum(score)|
+---------+------------------+
|      300|              0.42|
|      100|              0.12|
|      200|0.8200000000000001|
+---------+------------------+

Typed Grouping — groupByKey Operator

groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]

groupByKey groups records (of type T) by the input func. It returns a KeyValueGroupedDataset to apply aggregation to.

Note
groupByKey is an experimental API of Dataset.
scala> dataset.groupByKey(_.productId).count.show
+-----+--------+
|value|count(1)|
+-----+--------+
|  300|       1|
|  100|       1|
|  200|       2|
+-----+--------+

import org.apache.spark.sql.expressions.scalalang._
scala> dataset.groupByKey(_.productId).agg(typed.sum[Token](_.score)).toDF("productId", "sum").orderBy('productId).show
+---------+------------------+
|productId|               sum|
+---------+------------------+
|      100|              0.12|
|      200|0.8200000000000001|
|      300|              0.42|
+---------+------------------+

RelationalGroupedDataset

RelationalGroupedDataset is the result of executing the untyped grouping operators groupBy, rollup and cube.

RelationalGroupedDataset is also the result of executing the pivot operator on an already-grouped RelationalGroupedDataset.

It offers the following operators to work on a grouped collection of records:

  • agg

  • count

  • mean

  • max

  • avg

  • min

  • sum

  • pivot
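
Of these, pivot deserves a sketch, as it turns the distinct values of a column into columns of the result (it assumes the Token dataset from the Test Setup section below):

// Each distinct productId (100, 200, 300) becomes a column holding
// sum(score) per name; missing (name, productId) combinations are null
scala> dataset.groupBy('name).pivot("productId").sum("score").show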

KeyValueGroupedDataset

KeyValueGroupedDataset is an experimental interface that represents the result of executing the strongly-typed operator groupByKey.

scala> val tokensByName = dataset.groupByKey(_.name)
tokensByName: org.apache.spark.sql.KeyValueGroupedDataset[String,Token] = org.apache.spark.sql.KeyValueGroupedDataset@1e3aad46

It holds the keys that the records were grouped by.

scala> tokensByName.keys.show
+-----+
|value|
+-----+
|  aaa|
|  bbb|
+-----+

The following methods are available on any KeyValueGroupedDataset to work on groups of records (see the mapGroups sketch after this list):

  1. agg (with 1 to 4 typed aggregate columns)

  2. mapGroups

  3. flatMapGroups

  4. reduceGroups

  5. count, a special case of agg with the count aggregate function applied

  6. cogroup
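
As a sketch of the typed side, mapGroups applies a function to every key and the iterator over its records (it assumes tokensByName from above):

// Sum the scores per name by hand; gives the same figures as
// dataset.groupBy('name).sum("score") earlier (result columns are _1 and _2)
scala> tokensByName.mapGroups { (name, tokens) => (name, tokens.map(_.score).sum) }.show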

Test Setup

This is a setup for learning RelationalGroupedDataset and KeyValueGroupedDataset. Paste it into the Spark shell using :paste.

import spark.implicits._

case class Token(name: String, productId: Int, score: Double)
val data = Token("aaa", 100, 0.12) ::
  Token("aaa", 200, 0.29) ::
  Token("bbb", 200, 0.53) ::
  Token("bbb", 300, 0.42) :: Nil
val dataset = data.toDS.cache  (1)
  1. Cache the dataset so the following queries won’t load/recompute data over and over again.
