approx_count_distinct(e: Column): Column
approx_count_distinct(columnName: String): Column
approx_count_distinct(e: Column, rsd: Double): Column
approx_count_distinct(columnName: String, rsd: Double): Column

Standard Aggregate Functions

Name | Description |
---|---|
approx_count_distinct | Returns the approximate number of distinct items in a group |
first | Returns the first value in a group. Returns the first non-null value when the ignoreNulls flag is enabled |
grouping | Indicates whether a given column is aggregated or not |
grouping_id | Computes the level of grouping |
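For example, the optional rsd parameter of approx_count_distinct controls the maximum estimation error allowed (it defaults to 0.05). A minimal sketch to try in spark-shell (the nums Dataset is made up for illustration):

import org.apache.spark.sql.functions.approx_count_distinct
val nums = (1 to 1000).toDF("n")
// the smaller the rsd, the more accurate (and more expensive) the estimate
nums.agg(
  approx_count_distinct($"n"),       // default rsd of 0.05
  approx_count_distinct($"n", 0.01)  // tighter error bound
).show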
grouping Aggregate Function

grouping(e: Column): Column
grouping(columnName: String): Column  (1)

1. Calls the first grouping with columnName as a Column
grouping is an aggregate function that indicates whether a specified column is aggregated or not and:

- returns 1 if the column is in a subtotal and is NULL
- returns 0 if the underlying value is NULL or any other value
Note
|
grouping can only be used with cube, rollup or GROUPING SETS multi-dimensional aggregate operators (and this is verified during the Analyzer's check-analysis phase).
|
From Hive’s documentation about the GROUPING__ID function (that can somehow help to understand grouping):

When aggregates are displayed for a column its value is null. This may conflict in case the column itself has some null values. There needs to be some way to identify NULL in column, which means aggregate and NULL in column, which means value. GROUPING__ID function is the solution to that.
val tmpWorkshops = Seq(
("Warsaw", 2016, 2),
("Toronto", 2016, 4),
("Toronto", 2017, 1)).toDF("city", "year", "count")
// there seems to be a bug with nulls
// and so the need for the following union
val cityNull = Seq(
(null.asInstanceOf[String], 2016, 2)).toDF("city", "year", "count")
val workshops = tmpWorkshops union cityNull
scala> workshops.show
+-------+----+-----+
| city|year|count|
+-------+----+-----+
| Warsaw|2016| 2|
|Toronto|2016| 4|
|Toronto|2017| 1|
| null|2016| 2|
+-------+----+-----+
val q = workshops
.cube("city", "year")
.agg(grouping("city"), grouping("year")) // <-- grouping here
.sort($"city".desc_nulls_last, $"year".desc_nulls_last)
scala> q.show
+-------+----+--------------+--------------+
| city|year|grouping(city)|grouping(year)|
+-------+----+--------------+--------------+
| Warsaw|2016| 0| 0|
| Warsaw|null| 0| 1|
|Toronto|2017| 0| 0|
|Toronto|2016| 0| 0|
|Toronto|null| 0| 1|
| null|2017| 1| 0|
| null|2016| 1| 0|
| null|2016| 0| 0| <-- null is city
| null|null| 0| 1| <-- null is city
| null|null| 1| 1|
+-------+----+--------------+--------------+
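Because grouping returns 1 exactly for the subtotal rows, it can tell the placeholder nulls apart from the real null in city. A minimal sketch that keeps only the detail rows of the cube (detailOnly, gc and gy are made-up names):

import org.apache.spark.sql.functions.{grouping, sum}
val detailOnly = workshops
  .cube("city", "year")
  .agg(sum("count") as "total", grouping("city") as "gc", grouping("year") as "gy")
  .where($"gc" === 0 && $"gy" === 0) // drop subtotal and grand-total rows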
Internally, grouping creates a Column with a Grouping expression.
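That is easy to verify directly (a minimal sketch; Grouping is a Catalyst expression class):

import org.apache.spark.sql.catalyst.expressions.Grouping
import org.apache.spark.sql.functions.grouping
assert(grouping($"city").expr.isInstanceOf[Grouping])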
val q = workshops.cube("city", "year").agg(grouping("city"))
scala> println(q.queryExecution.logical)
'Aggregate [cube(city#182, year#183)], [city#182, year#183, grouping('city) AS grouping(city)#705]
+- Union
:- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
: +- LocalRelation [_1#178, _2#179, _3#180]
+- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
+- LocalRelation [_1#192, _2#193, _3#194]
scala> println(q.queryExecution.analyzed)
Aggregate [city#724, year#725, spark_grouping_id#721], [city#724, year#725, cast((shiftright(spark_grouping_id#721, 1) & 1) as tinyint) AS grouping(city)#720]
+- Expand [List(city#182, year#183, count#184, city#722, year#723, 0), List(city#182, year#183, count#184, city#722, null, 1), List(city#182, year#183, count#184, null, year#723, 2), List(city#182, year#183, count#184, null, null, 3)], [city#182, year#183, count#184, city#724, year#725, spark_grouping_id#721]
+- Project [city#182, year#183, count#184, city#182 AS city#722, year#183 AS year#723]
+- Union
:- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
: +- LocalRelation [_1#178, _2#179, _3#180]
+- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
+- LocalRelation [_1#192, _2#193, _3#194]
Note
|
grouping was added to Spark SQL in [SPARK-12706] support grouping/grouping_id function together group set.
|
grouping_id Aggregate Function

grouping_id(cols: Column*): Column
grouping_id(colName: String, colNames: String*): Column  (1)

1. Calls the first grouping_id with colName and colNames as objects of type Column
grouping_id is an aggregate function that computes the level of grouping:

- 0 for combinations of each column
- 1 for subtotals of column 1
- 2 for subtotals of column 2
- And so on…
val tmpWorkshops = Seq(
("Warsaw", 2016, 2),
("Toronto", 2016, 4),
("Toronto", 2017, 1)).toDF("city", "year", "count")
// there seems to be a bug with nulls
// and so the need for the following union
val cityNull = Seq(
(null.asInstanceOf[String], 2016, 2)).toDF("city", "year", "count")
val workshops = tmpWorkshops union cityNull
scala> workshops.show
+-------+----+-----+
| city|year|count|
+-------+----+-----+
| Warsaw|2016| 2|
|Toronto|2016| 4|
|Toronto|2017| 1|
| null|2016| 2|
+-------+----+-----+
val query = workshops
.cube("city", "year")
.agg(grouping_id()) // <-- all grouping columns used
.sort($"city".desc_nulls_last, $"year".desc_nulls_last)
scala> query.show
+-------+----+-------------+
| city|year|grouping_id()|
+-------+----+-------------+
| Warsaw|2016| 0|
| Warsaw|null| 1|
|Toronto|2017| 0|
|Toronto|2016| 0|
|Toronto|null| 1|
| null|2017| 2|
| null|2016| 2|
| null|2016| 0|
| null|null| 1|
| null|null| 3|
+-------+----+-------------+
scala> spark.catalog.listFunctions.filter(_.name.contains("grouping_id")).show(false)
+-----------+--------+-----------+----------------------------------------------------+-----------+
|name |database|description|className |isTemporary|
+-----------+--------+-----------+----------------------------------------------------+-----------+
|grouping_id|null |null |org.apache.spark.sql.catalyst.expressions.GroupingID|true |
+-----------+--------+-----------+----------------------------------------------------+-----------+
// bin function gives the string representation of the binary value of the given long column
scala> query.withColumn("bitmask", bin($"grouping_id()")).show
+-------+----+-------------+-------+
| city|year|grouping_id()|bitmask|
+-------+----+-------------+-------+
| Warsaw|2016| 0| 0|
| Warsaw|null| 1| 1|
|Toronto|2017| 0| 0|
|Toronto|2016| 0| 0|
|Toronto|null| 1| 1|
| null|2017| 2| 10|
| null|2016| 2| 10|
| null|2016| 0| 0| <-- null is city
| null|null| 3| 11|
| null|null| 1| 1|
+-------+----+-------------+-------+
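In other words, grouping_id() is the bitmask assembled from grouping() of every grouping column, with the first column in the most significant bit. A minimal sketch that double-checks this on the workshops Dataset (gid and manual are made-up names):

import org.apache.spark.sql.functions.{grouping, grouping_id}
val check = workshops
  .cube("city", "year")
  .agg(grouping_id() as "gid",
       (grouping("city") * 2 + grouping("year")) as "manual")
check.where($"gid" =!= $"manual").count // 0, i.e. the two columns always agree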
The list of columns of grouping_id should match the grouping columns (in cube or rollup) exactly, or be empty, which means all the grouping columns (which is exactly what the function expects).
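Passing only a subset of the grouping columns does not pass analysis. A minimal sketch of what to expect (wrapped in Try so the shell session survives the failure):

import scala.util.Try
import org.apache.spark.sql.functions.grouping_id
// grouping_id("city") does not cover all of cube("city", "year"),
// so the query is rejected with an AnalysisException
val failed = Try(workshops.cube("city", "year").agg(grouping_id("city")).show)
assert(failed.isFailure)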
Note
|
grouping_id can only be used with cube, rollup or GROUPING SETS multi-dimensional aggregate operators (and this is verified during the Analyzer's check-analysis phase).
|
Note
|
Spark SQL’s grouping_id function is known as grouping__id in Hive.
|
When aggregates are displayed for a column its value is null. This may conflict in case the column itself has some null values. There needs to be some way to identify NULL in column, which means aggregate and NULL in column, which means value. GROUPING__ID function is the solution to that.
Internally, grouping_id() creates a Column with a GroupingID unevaluable expression.
Note
|
Unevaluable expressions are expressions replaced by some other expressions during analysis or optimization. |
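This is easy to verify directly (a minimal sketch; GroupingID is the class shown in the listFunctions output above):

import org.apache.spark.sql.catalyst.expressions.GroupingID
import org.apache.spark.sql.functions.grouping_id
assert(grouping_id().expr.isInstanceOf[GroupingID])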
// workshops dataset was defined earlier
val q = workshops
.cube("city", "year")
.agg(grouping_id())
// grouping_id function is spark_grouping_id virtual column internally
// that is resolved during analysis - see Analyzed Logical Plan
scala> q.explain(true)
== Parsed Logical Plan ==
'Aggregate [cube(city#182, year#183)], [city#182, year#183, grouping_id() AS grouping_id()#742]
+- Union
:- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
: +- LocalRelation [_1#178, _2#179, _3#180]
+- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
+- LocalRelation [_1#192, _2#193, _3#194]
== Analyzed Logical Plan ==
city: string, year: int, grouping_id(): int
Aggregate [city#757, year#758, spark_grouping_id#754], [city#757, year#758, spark_grouping_id#754 AS grouping_id()#742]
+- Expand [List(city#182, year#183, count#184, city#755, year#756, 0), List(city#182, year#183, count#184, city#755, null, 1), List(city#182, year#183, count#184, null, year#756, 2), List(city#182, year#183, count#184, null, null, 3)], [city#182, year#183, count#184, city#757, year#758, spark_grouping_id#754]
+- Project [city#182, year#183, count#184, city#182 AS city#755, year#183 AS year#756]
+- Union
:- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
: +- LocalRelation [_1#178, _2#179, _3#180]
+- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
+- LocalRelation [_1#192, _2#193, _3#194]
== Optimized Logical Plan ==
Aggregate [city#757, year#758, spark_grouping_id#754], [city#757, year#758, spark_grouping_id#754 AS grouping_id()#742]
+- Expand [List(city#755, year#756, 0), List(city#755, null, 1), List(null, year#756, 2), List(null, null, 3)], [city#757, year#758, spark_grouping_id#754]
+- Union
:- LocalRelation [city#755, year#756]
+- LocalRelation [city#755, year#756]
== Physical Plan ==
*HashAggregate(keys=[city#757, year#758, spark_grouping_id#754], functions=[], output=[city#757, year#758, grouping_id()#742])
+- Exchange hashpartitioning(city#757, year#758, spark_grouping_id#754, 200)
+- *HashAggregate(keys=[city#757, year#758, spark_grouping_id#754], functions=[], output=[city#757, year#758, spark_grouping_id#754])
+- *Expand [List(city#755, year#756, 0), List(city#755, null, 1), List(null, year#756, 2), List(null, null, 3)], [city#757, year#758, spark_grouping_id#754]
+- Union
:- LocalTableScan [city#755, year#756]
+- LocalTableScan [city#755, year#756]
Note
|
grouping_id was added to Spark SQL in [SPARK-12706] support grouping/grouping_id function together group set.
|