Basic Aggregation — Typed and Untyped Grouping Operators

You can calculate aggregates over a group of rows in a Dataset using aggregate operators (possibly with aggregate functions).

Table 1. Aggregate Operators

Operator    | Return Type              | Description
agg         | DataFrame                | Aggregates with or without grouping (i.e. over an entire Dataset)
groupBy     | RelationalGroupedDataset | Used for untyped aggregates using DataFrames. Grouping is described using column expressions or column names.
groupByKey  | KeyValueGroupedDataset   | Used for typed aggregates using Datasets with records grouped by a key-defining discriminator function.

Note
Aggregate functions applied without a grouping operator return a single value computed over the entire Dataset. If you want the aggregate values for each unique value in a column, groupBy over that column first to build the groups.
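For example, the difference looks as follows (a minimal sketch with a made-up sales Dataset; the names sales, city and amount are illustrative only):

import org.apache.spark.sql.functions.sum
import spark.implicits._
// A hypothetical Dataset with city and amount columns
val sales = Seq(("Warsaw", 10.0), ("Warsaw", 5.0), ("Krakow", 7.0)).toDF("city", "amount")
sales.agg(sum('amount)).show             // one row -- the entire Dataset is one group
sales.groupBy('city).agg(sum('amount)).show  // one row per unique city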
Note
You can also use SparkSession to execute good ol' SQL with GROUP BY should you prefer.

val spark: SparkSession = ???
spark.sql("SELECT city, COUNT(*) FROM sales GROUP BY city")

SQL and Dataset API operators go through the same query planning and optimizations, and so have the same performance characteristics in the end.
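You can see this for yourself by comparing the plans with explain. A sketch using spark.range (the two physical plans should come out the same, modulo cosmetic differences):

// The Dataset API version
val q1 = spark.range(10).groupBy('id % 2 as "m").count
// The SQL version over an equivalent temporary view
spark.range(10).createOrReplaceTempView("ids")
val q2 = spark.sql("SELECT id % 2 AS m, COUNT(*) AS count FROM ids GROUP BY id % 2")
q1.explain  // prints the physical plan
q2.explain  // ...which should match q1's plan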

Aggregates Over Subset Of or Whole Dataset — agg Operator

agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame

agg applies an aggregate function to a subset of rows or to the entire Dataset (i.e. treating the entire data set as one group).

Note
agg on a Dataset is simply a shortcut for groupBy().agg(…​).
scala> spark.range(10).agg(sum('id) as "sum").show
+---+
|sum|
+---+
| 45|
+---+

agg can compute aggregate expressions on all the records in a Dataset.
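Besides Column expressions, agg also accepts a Map of column names to aggregate function names, as well as (String, String) pairs, per the other two signatures above. A minimal sketch:

val ds = spark.range(10)
ds.agg(Map("id" -> "max")).show            // the Map[String, String] variant
ds.agg("id" -> "avg", "id" -> "max").show  // the (String, String) pairs variant
// Per the Note above, both are equivalent to ds.groupBy().agg(...)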

Untyped Grouping — groupBy Operator

groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String, cols: String*): RelationalGroupedDataset

groupBy operator groups the rows in a Dataset by columns (as Column expressions or names).

groupBy gives a RelationalGroupedDataset to execute aggregate functions or operators.

// 10^3-record large data set
val ints = 1 to math.pow(10, 3).toInt
val nms = ints.toDF("n").withColumn("m", 'n % 2)
scala> nms.count
res0: Long = 1000

val q = nms.
  groupBy('m).
  agg(sum('n) as "sum").
  orderBy('m)
scala> q.show
+---+------+
|  m|   sum|
+---+------+
|  0|250500|
|  1|250000|
+---+------+

Internally, groupBy resolves column names (possibly quoted) and creates a RelationalGroupedDataset (with groupType being GroupByType).
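A RelationalGroupedDataset can compute several aggregates at once in a single agg call. A sketch reusing the nms Dataset above (min, max and avg come from org.apache.spark.sql.functions, which the Spark shell imports by default):

nms.
  groupBy('m).
  agg(min('n) as "min", max('n) as "max", avg('n) as "avg").
  orderBy('m).
  show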

Note
The following examples use the data setup described in the Test Setup section below.
scala> tokens.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa|      100| 0.12|
| aaa|      200| 0.29|
| bbb|      200| 0.53|
| bbb|      300| 0.42|
+----+---------+-----+

scala> tokens.groupBy('name).avg().show
+----+--------------+----------+
|name|avg(productId)|avg(score)|
+----+--------------+----------+
| aaa|         150.0|     0.205|
| bbb|         250.0|     0.475|
+----+--------------+----------+

scala> tokens.groupBy('name, 'productId).agg(Map("score" -> "avg")).show
+----+---------+----------+
|name|productId|avg(score)|
+----+---------+----------+
| aaa|      200|      0.29|
| bbb|      200|      0.53|
| bbb|      300|      0.42|
| aaa|      100|      0.12|
+----+---------+----------+

scala> tokens.groupBy('name).count.show
+----+-----+
|name|count|
+----+-----+
| aaa|    2|
| bbb|    2|
+----+-----+

scala> tokens.groupBy('name).max("score").show
+----+----------+
|name|max(score)|
+----+----------+
| aaa|      0.29|
| bbb|      0.53|
+----+----------+

scala> tokens.groupBy('name).sum("score").show
+----+----------+
|name|sum(score)|
+----+----------+
| aaa|      0.41|
| bbb|      0.95|
+----+----------+

scala> tokens.groupBy('productId).sum("score").show
+---------+------------------+
|productId|        sum(score)|
+---------+------------------+
|      300|              0.42|
|      100|              0.12|
|      200|0.8200000000000001|
+---------+------------------+

Typed Grouping — groupByKey Operator

groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]

groupByKey groups records (of type T) by the given func and returns a KeyValueGroupedDataset that you can apply aggregate functions to.

Note
groupByKey is Dataset's experimental API.
scala> tokens.groupByKey(_.productId).count.orderBy($"value").show
+-----+--------+
|value|count(1)|
+-----+--------+
|  100|       1|
|  200|       2|
|  300|       1|
+-----+--------+

import org.apache.spark.sql.expressions.scalalang._
val q = tokens.
  groupByKey(_.productId).
  agg(typed.sum[Token](_.score)).
  toDF("productId", "sum").
  orderBy('productId)
scala> q.show
+---------+------------------+
|productId|               sum|
+---------+------------------+
|      100|              0.12|
|      200|0.8200000000000001|
|      300|              0.42|
+---------+------------------+
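KeyValueGroupedDataset also enables typed aggregations beyond the typed functions, e.g. reduceGroups. A sketch that picks the highest-scoring Token per name:

val best = tokens.
  groupByKey(_.name).
  reduceGroups((t1, t2) => if (t1.score > t2.score) t1 else t2)
// best: Dataset[(String, Token)] -- one (name, Token) pair per group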

Test Setup

This is a setup for learning RelationalGroupedDataset and KeyValueGroupedDataset. Paste it into the Spark Shell using :paste.

import spark.implicits._

case class Token(name: String, productId: Int, score: Double)
val data = Seq(
  Token("aaa", 100, 0.12),
  Token("aaa", 200, 0.29),
  Token("bbb", 200, 0.53),
  Token("bbb", 300, 0.42))
val tokens = data.toDS.cache // (1)

1. Cache the Dataset so the following queries won't load or recompute data over and over again.
