Standard Aggregate Functions

Table 1. Standard Aggregate Functions (Name and Description)

approx_count_distinct

approx_count_distinct(e: Column): Column
approx_count_distinct(columnName: String): Column
approx_count_distinct(e: Column, rsd: Double): Column
approx_count_distinct(columnName: String, rsd: Double): Column

avg

avg(e: Column): Column
avg(columnName: String): Column

collect_list

collect_list(e: Column): Column
collect_list(columnName: String): Column

collect_set

collect_set(e: Column): Column
collect_set(columnName: String): Column

corr

corr(column1: Column, column2: Column): Column
corr(columnName1: String, columnName2: String): Column

count

count(e: Column): Column
count(columnName: String): TypedColumn[Any, Long]

countDistinct

countDistinct(expr: Column, exprs: Column*): Column
countDistinct(columnName: String, columnNames: String*): Column

covar_pop

covar_pop(column1: Column, column2: Column): Column
covar_pop(columnName1: String, columnName2: String): Column

covar_samp

covar_samp(column1: Column, column2: Column): Column
covar_samp(columnName1: String, columnName2: String): Column

first

first(e: Column): Column
first(e: Column, ignoreNulls: Boolean): Column
first(columnName: String): Column
first(columnName: String, ignoreNulls: Boolean): Column

Returns the first value in a group. Returns the first non-null value when the ignoreNulls flag is on. If all values are null, returns null.

grouping

grouping(e: Column): Column
grouping(columnName: String): Column

Indicates whether a given column is aggregated or not

grouping_id

grouping_id(cols: Column*): Column
grouping_id(colName: String, colNames: String*): Column

Computes the level of grouping

kurtosis

kurtosis(e: Column): Column
kurtosis(columnName: String): Column

last

last(e: Column, ignoreNulls: Boolean): Column
last(columnName: String, ignoreNulls: Boolean): Column
last(e: Column): Column
last(columnName: String): Column

max

max(e: Column): Column
max(columnName: String): Column

mean

mean(e: Column): Column
mean(columnName: String): Column

min

min(e: Column): Column
min(columnName: String): Column

skewness

skewness(e: Column): Column
skewness(columnName: String): Column

stddev

stddev(e: Column): Column
stddev(columnName: String): Column

stddev_pop

stddev_pop(e: Column): Column
stddev_pop(columnName: String): Column

stddev_samp

stddev_samp(e: Column): Column
stddev_samp(columnName: String): Column

sum

sum(e: Column): Column
sum(columnName: String): Column

sumDistinct

sumDistinct(e: Column): Column
sumDistinct(columnName: String): Column

variance

variance(e: Column): Column
variance(columnName: String): Column

var_pop

var_pop(e: Column): Column
var_pop(columnName: String): Column

var_samp

var_samp(e: Column): Column
var_samp(columnName: String): Column
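
These functions are typically used with Dataset.groupBy followed by agg. A minimal sketch (the sales Dataset and all column and alias names below are made up for illustration):

import org.apache.spark.sql.functions._

val sales = Seq(
  ("Warsaw", 2016, 2),
  ("Warsaw", 2016, 2),
  ("Toronto", 2017, 1)).toDF("city", "year", "count")

val summary = sales
  .groupBy("city")
  .agg(
    avg("count") as "avg_count",
    countDistinct("year") as "years",
    collect_list("count") as "counts",
    first("count", ignoreNulls = true) as "first_count")

// summary.show would display one aggregated row per city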

grouping Aggregate Function

grouping(e: Column): Column
grouping(columnName: String): Column  (1)
  1. Calls the first grouping with columnName as a Column

grouping is an aggregate function that indicates whether a specified column is aggregated (i.e. rolled up into a subtotal) in a given result row:

  • returns 1 if the column’s NULL comes from a subtotal, i.e. the column is not part of the grouping for that row

  • returns 0 otherwise, i.e. the value comes from the data itself (even if that value happens to be NULL)

Note
grouping can only be used with cube, rollup or GROUPING SETS multi-dimensional aggregate operators (and this is verified when Analyzer performs check analysis).

From Hive’s documentation about the Grouping__ID function (which may help understand grouping):

When aggregates are displayed for a column its value is null. This may conflict in case the column itself has some null values. There needs to be some way to identify NULL in column, which means aggregate and NULL in column, which means value. GROUPING__ID function is the solution to that.

val tmpWorkshops = Seq(
  ("Warsaw", 2016, 2),
  ("Toronto", 2016, 4),
  ("Toronto", 2017, 1)).toDF("city", "year", "count")

// there seems to be a bug with nulls
// and so the need for the following union
val cityNull = Seq(
  (null.asInstanceOf[String], 2016, 2)).toDF("city", "year", "count")

val workshops = tmpWorkshops union cityNull

scala> workshops.show
+-------+----+-----+
|   city|year|count|
+-------+----+-----+
| Warsaw|2016|    2|
|Toronto|2016|    4|
|Toronto|2017|    1|
|   null|2016|    2|
+-------+----+-----+

val q = workshops
  .cube("city", "year")
  .agg(grouping("city"), grouping("year")) // <-- grouping here
  .sort($"city".desc_nulls_last, $"year".desc_nulls_last)

scala> q.show
+-------+----+--------------+--------------+
|   city|year|grouping(city)|grouping(year)|
+-------+----+--------------+--------------+
| Warsaw|2016|             0|             0|
| Warsaw|null|             0|             1|
|Toronto|2017|             0|             0|
|Toronto|2016|             0|             0|
|Toronto|null|             0|             1|
|   null|2017|             1|             0|
|   null|2016|             1|             0|
|   null|2016|             0|             0|  <-- null is city
|   null|null|             0|             1|  <-- null is city
|   null|null|             1|             1|
+-------+----+--------------+--------------+
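
Because the flag tells a subtotal-generated null apart from a null that comes from the data, it can be used to drop (or keep) subtotal rows. A sketch against the workshops Dataset above (the alias names are arbitrary):

// keep only rows grouped by both city and year,
// i.e. drop all subtotal and grand-total rows
val noSubtotals = workshops
  .cube("city", "year")
  .agg(
    sum("count") as "total",
    grouping("city") as "g_city",
    grouping("year") as "g_year")
  .where($"g_city" === 0 && $"g_year" === 0)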

Internally, grouping creates a Column with Grouping expression.

val q = workshops.cube("city", "year").agg(grouping("city"))
scala> println(q.queryExecution.logical)
'Aggregate [cube(city#182, year#183)], [city#182, year#183, grouping('city) AS grouping(city)#705]
+- Union
   :- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
   :  +- LocalRelation [_1#178, _2#179, _3#180]
   +- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
      +- LocalRelation [_1#192, _2#193, _3#194]

scala> println(q.queryExecution.analyzed)
Aggregate [city#724, year#725, spark_grouping_id#721], [city#724, year#725, cast((shiftright(spark_grouping_id#721, 1) & 1) as tinyint) AS grouping(city)#720]
+- Expand [List(city#182, year#183, count#184, city#722, year#723, 0), List(city#182, year#183, count#184, city#722, null, 1), List(city#182, year#183, count#184, null, year#723, 2), List(city#182, year#183, count#184, null, null, 3)], [city#182, year#183, count#184, city#724, year#725, spark_grouping_id#721]
   +- Project [city#182, year#183, count#184, city#182 AS city#722, year#183 AS year#723]
      +- Union
         :- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
         :  +- LocalRelation [_1#178, _2#179, _3#180]
         +- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
            +- LocalRelation [_1#192, _2#193, _3#194]

grouping_id Aggregate Function

grouping_id(cols: Column*): Column
grouping_id(colName: String, colNames: String*): Column (1)
  1. Calls the first grouping_id with colName and colNames as objects of type Column

grouping_id is an aggregate function that computes the level of grouping as a bitmask over the grouping columns, with the first grouping column mapped to the most significant bit:

  • 0 for rows grouped by all the columns

  • 1 when the last grouping column is subtotaled (aggregated away)

  • 2 when the column before it is subtotaled

  • And so on, up to 2^n - 1 (all bits set) for the grand total over n grouping columns

val tmpWorkshops = Seq(
  ("Warsaw", 2016, 2),
  ("Toronto", 2016, 4),
  ("Toronto", 2017, 1)).toDF("city", "year", "count")

// there seems to be a bug with nulls
// and so the need for the following union
val cityNull = Seq(
  (null.asInstanceOf[String], 2016, 2)).toDF("city", "year", "count")

val workshops = tmpWorkshops union cityNull

scala> workshops.show
+-------+----+-----+
|   city|year|count|
+-------+----+-----+
| Warsaw|2016|    2|
|Toronto|2016|    4|
|Toronto|2017|    1|
|   null|2016|    2|
+-------+----+-----+

val query = workshops
  .cube("city", "year")
  .agg(grouping_id()) // <-- all grouping columns used
  .sort($"city".desc_nulls_last, $"year".desc_nulls_last)
scala> query.show
+-------+----+-------------+
|   city|year|grouping_id()|
+-------+----+-------------+
| Warsaw|2016|            0|
| Warsaw|null|            1|
|Toronto|2017|            0|
|Toronto|2016|            0|
|Toronto|null|            1|
|   null|2017|            2|
|   null|2016|            2|
|   null|2016|            0|
|   null|null|            1|
|   null|null|            3|
+-------+----+-------------+
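
The computed level can be used to keep only a particular aggregation level, e.g. just the grand total (a sketch, reusing the workshops Dataset; the alias is arbitrary):

val grandTotal = workshops
  .cube("city", "year")
  .agg(sum("count") as "total", grouping_id() as "gid")
  .where($"gid" === 3) // both grouping columns aggregated away
  .select("total")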

scala> spark.catalog.listFunctions.filter(_.name.contains("grouping_id")).show(false)
+-----------+--------+-----------+----------------------------------------------------+-----------+
|name       |database|description|className                                           |isTemporary|
+-----------+--------+-----------+----------------------------------------------------+-----------+
|grouping_id|null    |null       |org.apache.spark.sql.catalyst.expressions.GroupingID|true       |
+-----------+--------+-----------+----------------------------------------------------+-----------+

// bin function gives the string representation of the binary value of the given long column
scala> query.withColumn("bitmask", bin($"grouping_id()")).show
+-------+----+-------------+-------+
|   city|year|grouping_id()|bitmask|
+-------+----+-------------+-------+
| Warsaw|2016|            0|      0|
| Warsaw|null|            1|      1|
|Toronto|2017|            0|      0|
|Toronto|2016|            0|      0|
|Toronto|null|            1|      1|
|   null|2017|            2|     10|
|   null|2016|            2|     10|
|   null|2016|            0|      0|  <-- null is city
|   null|null|            3|     11|
|   null|null|            1|      1|
+-------+----+-------------+-------+
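
The bitmask also shows how grouping_id relates to grouping: with the two grouping columns city and year, grouping_id() equals 2 * grouping(city) + grouping(year). A quick sanity check (a sketch against the workshops Dataset):

val check = workshops
  .cube("city", "year")
  .agg(
    grouping_id() as "gid",
    (grouping("city") * 2 + grouping("year")) as "from_grouping")

// every row is expected to satisfy gid == from_grouping
check.where($"gid" =!= $"from_grouping").count // expected: 0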

The list of columns passed to grouping_id has to match the grouping columns (of cube or rollup) exactly, or be empty, which means all the grouping columns (and is what the no-argument variant expects).
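
For example, listing the grouping columns explicitly should give the same values as the no-argument variant, while a partial or different list is rejected at analysis time (a sketch, again using the workshops Dataset):

// matches the cube columns exactly, so this is equivalent to grouping_id()
workshops.cube("city", "year").agg(grouping_id("city", "year"))

// only a subset of the grouping columns => rejected with an AnalysisException
// workshops.cube("city", "year").agg(grouping_id("city"))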

Note
grouping_id can only be used with cube, rollup or GROUPING SETS multi-dimensional aggregate operators (and this is verified when Analyzer performs check analysis).
Note
Spark SQL’s grouping_id function is known as grouping__id in Hive.

When aggregates are displayed for a column its value is null. This may conflict in case the column itself has some null values. There needs to be some way to identify NULL in column, which means aggregate and NULL in column, which means value. GROUPING__ID function is the solution to that.

Internally, grouping_id() creates a Column with GroupingID unevaluable expression.

Note
Unevaluable expressions are expressions replaced by some other expressions during analysis or optimization.

// workshops dataset was defined earlier
val q = workshops
  .cube("city", "year")
  .agg(grouping_id())

// grouping_id function is spark_grouping_id virtual column internally
// that is resolved during analysis - see Analyzed Logical Plan
scala> q.explain(true)
== Parsed Logical Plan ==
'Aggregate [cube(city#182, year#183)], [city#182, year#183, grouping_id() AS grouping_id()#742]
+- Union
   :- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
   :  +- LocalRelation [_1#178, _2#179, _3#180]
   +- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
      +- LocalRelation [_1#192, _2#193, _3#194]

== Analyzed Logical Plan ==
city: string, year: int, grouping_id(): int
Aggregate [city#757, year#758, spark_grouping_id#754], [city#757, year#758, spark_grouping_id#754 AS grouping_id()#742]
+- Expand [List(city#182, year#183, count#184, city#755, year#756, 0), List(city#182, year#183, count#184, city#755, null, 1), List(city#182, year#183, count#184, null, year#756, 2), List(city#182, year#183, count#184, null, null, 3)], [city#182, year#183, count#184, city#757, year#758, spark_grouping_id#754]
   +- Project [city#182, year#183, count#184, city#182 AS city#755, year#183 AS year#756]
      +- Union
         :- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
         :  +- LocalRelation [_1#178, _2#179, _3#180]
         +- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
            +- LocalRelation [_1#192, _2#193, _3#194]

== Optimized Logical Plan ==
Aggregate [city#757, year#758, spark_grouping_id#754], [city#757, year#758, spark_grouping_id#754 AS grouping_id()#742]
+- Expand [List(city#755, year#756, 0), List(city#755, null, 1), List(null, year#756, 2), List(null, null, 3)], [city#757, year#758, spark_grouping_id#754]
   +- Union
      :- LocalRelation [city#755, year#756]
      +- LocalRelation [city#755, year#756]

== Physical Plan ==
*HashAggregate(keys=[city#757, year#758, spark_grouping_id#754], functions=[], output=[city#757, year#758, grouping_id()#742])
+- Exchange hashpartitioning(city#757, year#758, spark_grouping_id#754, 200)
   +- *HashAggregate(keys=[city#757, year#758, spark_grouping_id#754], functions=[], output=[city#757, year#758, spark_grouping_id#754])
      +- *Expand [List(city#755, year#756, 0), List(city#755, null, 1), List(null, year#756, 2), List(null, null, 3)], [city#757, year#758, spark_grouping_id#754]
         +- Union
            :- LocalTableScan [city#755, year#756]
            +- LocalTableScan [city#755, year#756]
