Basic Aggregation — Typed and Untyped Grouping Operators
You can calculate aggregates over a group of rows in a Dataset using aggregate operators (possibly with aggregate functions).
Operator | Return Type | Description
---|---|---
agg | DataFrame | Aggregates with or without grouping (i.e. over an entire Dataset)
groupBy | RelationalGroupedDataset | Used for untyped aggregates using DataFrames. Grouping is described using column expressions or column names.
groupByKey | KeyValueGroupedDataset | Used for typed aggregates using Datasets with records grouped by a key-defining discriminator function.
Note: Aggregate functions without aggregate operators return a single value. If you want to find the aggregate values for each unique value (in a column), you should groupBy first (over this column) to build the groups.
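A minimal sketch of the difference (the even column below is made up for the example):

import org.apache.spark.sql.functions.sum
import spark.implicits._

// No grouping: a single aggregate value for the entire Dataset
spark.range(10).agg(sum('id)).show  // one row: 45

// groupBy first: one aggregate value per unique value of the grouping column
spark.range(10).
  withColumn("even", 'id % 2 === 0).
  groupBy('even).
  agg(sum('id)).
  show  // two rows: 25 (odd ids) and 20 (even ids)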
Note: You can also use SparkSession to execute good ol' SQL with GROUP BY.

val spark: SparkSession = ???
spark.sql("SELECT COUNT(*) FROM sales GROUP BY city")

SQL and the Dataset API's operators go through the same query planning and optimizations, and have the same performance characteristics in the end.
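A quick sketch to see the equivalence for yourself (the ids view name is made up for the example):

import org.apache.spark.sql.functions.sum
import spark.implicits._

// The same aggregation expressed in SQL and with the Dataset API
spark.range(10).createOrReplaceTempView("ids")
spark.sql("SELECT sum(id) AS sum FROM ids").show
spark.range(10).agg(sum('id) as "sum").show

// Compare the plans of both variants
spark.sql("SELECT sum(id) AS sum FROM ids").explain
spark.range(10).agg(sum('id) as "sum").explain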
Aggregates Over Subset Of or Whole Dataset — agg
Operator
agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
agg applies an aggregate function on a subset or the entire Dataset (i.e. considering the entire data set as one group).
Note: agg on a Dataset is simply a shortcut for groupBy().agg(…).
scala> spark.range(10).agg(sum('id) as "sum").show
+---+
|sum|
+---+
| 45|
+---+
agg can compute aggregate expressions on all the records in a Dataset.
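The Map- and pair-based variants from the signatures above express the same aggregation with column-name/function-name pairs; a short sketch:

import org.apache.spark.sql.functions.sum
import spark.implicits._

// All three compute the sum over id, i.e. 45 (only the output column names differ)
spark.range(10).agg(sum('id) as "sum").show    // Column variant
spark.range(10).agg(Map("id" -> "sum")).show   // Map[String, String] variant
spark.range(10).agg("id" -> "sum").show        // (String, String) pairs variant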
Untyped Grouping — groupBy
Operator
groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String, cols: String*): RelationalGroupedDataset
The groupBy operator groups the rows in a Dataset by columns (as Column expressions or names).

groupBy gives a RelationalGroupedDataset on which to execute aggregate functions or operators.
// 10^3-record large data set
val ints = 1 to math.pow(10, 3).toInt
val nms = ints.toDF("n").withColumn("m", 'n % 2)
scala> nms.count
res0: Long = 1000
val q = nms.
groupBy('m).
agg(sum('n) as "sum").
orderBy('m)
scala> q.show
+---+------+
| m| sum|
+---+------+
| 0|250500|
| 1|250000|
+---+------+
Internally, groupBy resolves column names (possibly quoted) and creates a RelationalGroupedDataset (with groupType being GroupByType).
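Both signatures resolve to the same grouping, so grouping by a column name (a String) and by a Column expression are interchangeable; e.g. with the nms dataset above:

// Equivalent groupings: by column name and by Column expression
nms.groupBy("m").sum("n").show
nms.groupBy('m).sum("n").show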
Note: The following examples use the data setup described in the Test Setup section below.
scala> tokens.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa| 100| 0.12|
| aaa| 200| 0.29|
| bbb| 200| 0.53|
| bbb| 300| 0.42|
+----+---------+-----+
scala> tokens.groupBy('name).avg().show
+----+--------------+----------+
|name|avg(productId)|avg(score)|
+----+--------------+----------+
| aaa| 150.0| 0.205|
| bbb| 250.0| 0.475|
+----+--------------+----------+
scala> tokens.groupBy('name, 'productId).agg(Map("score" -> "avg")).show
+----+---------+----------+
|name|productId|avg(score)|
+----+---------+----------+
| aaa| 200| 0.29|
| bbb| 200| 0.53|
| bbb| 300| 0.42|
| aaa| 100| 0.12|
+----+---------+----------+
scala> tokens.groupBy('name).count.show
+----+-----+
|name|count|
+----+-----+
| aaa| 2|
| bbb| 2|
+----+-----+
scala> tokens.groupBy('name).max("score").show
+----+----------+
|name|max(score)|
+----+----------+
| aaa| 0.29|
| bbb| 0.53|
+----+----------+
scala> tokens.groupBy('name).sum("score").show
+----+----------+
|name|sum(score)|
+----+----------+
| aaa| 0.41|
| bbb| 0.95|
+----+----------+
scala> tokens.groupBy('productId).sum("score").show
+---------+------------------+
|productId| sum(score)|
+---------+------------------+
| 300| 0.42|
| 100| 0.12|
| 200|0.8200000000000001|
+---------+------------------+
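A RelationalGroupedDataset also accepts several aggregate expressions in a single agg call; a short sketch over the same tokens dataset:

import org.apache.spark.sql.functions.{avg, max, min}

// Several aggregates over the same groups, computed in one pass
// Expected: aaa -> (0.12, 0.29, 0.205), bbb -> (0.42, 0.53, 0.475)
tokens.
  groupBy('name).
  agg(min('score) as "min", max('score) as "max", avg('score) as "avg").
  orderBy('name).
  show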
Typed Grouping — groupByKey
Operator
groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]
groupByKey groups records (of type T) by the input func and returns a KeyValueGroupedDataset to apply aggregation to.
Note: groupByKey is Dataset's experimental API.
scala> tokens.groupByKey(_.productId).count.orderBy($"value").show
+-----+--------+
|value|count(1)|
+-----+--------+
| 100| 1|
| 200| 2|
| 300| 1|
+-----+--------+
import org.apache.spark.sql.expressions.scalalang._
val q = tokens.
groupByKey(_.productId).
agg(typed.sum[Token](_.score)).
toDF("productId", "sum").
orderBy('productId)
scala> q.show
+---------+------------------+
|productId| sum|
+---------+------------------+
| 100| 0.12|
| 200|0.8200000000000001|
| 300| 0.42|
+---------+------------------+
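Beyond typed aggregate functions, a KeyValueGroupedDataset also supports arbitrary per-group processing; a minimal sketch using mapGroups (the maxScores and maxScore names are made up for the example):

import spark.implicits._

// Maximum score per name with an arbitrary per-group function.
// Unlike agg, mapGroups does not support partial aggregation,
// so prefer agg with typed aggregate functions where possible.
val maxScores = tokens.
  groupByKey(_.name).
  mapGroups { (name, ts) => (name, ts.map(_.score).max) }.
  toDF("name", "maxScore").
  orderBy('name)

The result matches the untyped max("score") example above.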
Test Setup
This is a setup for learning RelationalGroupedDataset. Paste it into the Spark shell using :paste.
import spark.implicits._
case class Token(name: String, productId: Int, score: Double)
val data = Seq(
Token("aaa", 100, 0.12),
Token("aaa", 200, 0.29),
Token("bbb", 200, 0.53),
Token("bbb", 300, 0.42))
val tokens = data.toDS.cache (1)
(1) Cache the dataset so the following queries won't load/recompute data over and over again.