Standard Aggregate Functions

| Name | Description |
|---|---|
| `approx_count_distinct(e: Column): Column`<br>`approx_count_distinct(columnName: String): Column`<br>`approx_count_distinct(e: Column, rsd: Double): Column`<br>`approx_count_distinct(columnName: String, rsd: Double): Column` | Returns the approximate number of distinct items in a group |
| `first` | Returns the first value in a group. Returns the first non-null value when the `ignoreNulls` flag is on |
| `grouping` | Indicates whether a given column is aggregated or not |
| `grouping_id` | Computes the level of grouping |
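A minimal sketch of two of the functions above in a spark-shell session (the tiny sales dataset is made up for illustration):

import org.apache.spark.sql.functions._
val sales = Seq(("a", 1), ("a", 2), ("b", 2), ("b", 2)).toDF("key", "value")
// estimate the number of distinct values; rsd caps the relative standard deviation of the estimate
sales.agg(
  approx_count_distinct("value"),
  approx_count_distinct("value", 0.05)).show
// first value per group; with ignoreNulls enabled null values are skipped
sales.groupBy("key").agg(first("value", ignoreNulls = true)).show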
grouping Aggregate Function
grouping(e: Column): Column
grouping(columnName: String): Column (1)

1. Calls the first `grouping` with `columnName` as a `Column`
grouping is an aggregate function that indicates whether a specified column is aggregated or not and:

- returns `1` if the column is in a subtotal and is `NULL`
- returns `0` if the underlying value is `NULL` or any other value
Note: grouping can only be used with cube, rollup or GROUPING SETS multi-dimensional aggregate operators (and is verified when Analyzer does check analysis).
From Hive’s documentation about the Grouping__ID function (that can somehow help to understand grouping):

> When aggregates are displayed for a column its value is `null`. This may conflict in case the column itself has some `null` values. There needs to be some way to identify `NULL` in column, which means aggregate, and `NULL` in column, which means value. `GROUPING__ID` function is the solution to that.
val tmpWorkshops = Seq(
("Warsaw", 2016, 2),
("Toronto", 2016, 4),
("Toronto", 2017, 1)).toDF("city", "year", "count")
// there seems to be a bug with nulls
// and so the need for the following union
val cityNull = Seq(
(null.asInstanceOf[String], 2016, 2)).toDF("city", "year", "count")
val workshops = tmpWorkshops union cityNull
scala> workshops.show
+-------+----+-----+
| city|year|count|
+-------+----+-----+
| Warsaw|2016| 2|
|Toronto|2016| 4|
|Toronto|2017| 1|
| null|2016| 2|
+-------+----+-----+
val q = workshops
.cube("city", "year")
.agg(grouping("city"), grouping("year")) // <-- grouping here
.sort($"city".desc_nulls_last, $"year".desc_nulls_last)
scala> q.show
+-------+----+--------------+--------------+
| city|year|grouping(city)|grouping(year)|
+-------+----+--------------+--------------+
| Warsaw|2016| 0| 0|
| Warsaw|null| 0| 1|
|Toronto|2017| 0| 0|
|Toronto|2016| 0| 0|
|Toronto|null| 0| 1|
| null|2017| 1| 0|
| null|2016| 1| 0|
| null|2016| 0| 0| <-- null is city
| null|null| 0| 1| <-- null is city
| null|null| 1| 1|
+-------+----+--------------+--------------+
Internally, grouping creates a Column with Grouping expression.
val q = workshops.cube("city", "year").agg(grouping("city"))
scala> println(q.queryExecution.logical)
'Aggregate [cube(city#182, year#183)], [city#182, year#183, grouping('city) AS grouping(city)#705]
+- Union
:- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
: +- LocalRelation [_1#178, _2#179, _3#180]
+- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
+- LocalRelation [_1#192, _2#193, _3#194]
scala> println(q.queryExecution.analyzed)
Aggregate [city#724, year#725, spark_grouping_id#721], [city#724, year#725, cast((shiftright(spark_grouping_id#721, 1) & 1) as tinyint) AS grouping(city)#720]
+- Expand [List(city#182, year#183, count#184, city#722, year#723, 0), List(city#182, year#183, count#184, city#722, null, 1), List(city#182, year#183, count#184, null, year#723, 2), List(city#182, year#183, count#184, null, null, 3)], [city#182, year#183, count#184, city#724, year#725, spark_grouping_id#721]
+- Project [city#182, year#183, count#184, city#182 AS city#722, year#183 AS year#723]
+- Union
:- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
: +- LocalRelation [_1#178, _2#179, _3#180]
+- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
+- LocalRelation [_1#192, _2#193, _3#194]
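Since grouping returns `1` exactly for subtotal rows, it can be used to keep (or drop) subtotals after a multi-dimensional aggregation. A minimal sketch, reusing the workshops dataset defined above (the names subtotals, gcity and gyear are made up):

import org.apache.spark.sql.functions.{grouping, sum}
// keep only the rows that are subtotals over city or year
val subtotals = workshops
  .cube("city", "year")
  .agg(sum("count") as "total", grouping("city") as "gcity", grouping("year") as "gyear")
  .where($"gcity" === 1 || $"gyear" === 1)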
Note: grouping was added to Spark SQL in [SPARK-12706] support grouping/grouping_id function together group set.
grouping_id Aggregate Function
grouping_id(cols: Column*): Column
grouping_id(colName: String, colNames: String*): Column (1)

1. Calls the first `grouping_id` with `colName` and `colNames` as objects of type `Column`
grouping_id is an aggregate function that computes the level of grouping:

- `0` for combinations of each column
- `1` for subtotals of column 1
- `2` for subtotals of column 2
- And so on…

For n grouping columns, the result is a bit vector of the grouping results: `grouping_id = grouping(c1) * 2^(n-1) + grouping(c2) * 2^(n-2) + … + grouping(cn)`.
val tmpWorkshops = Seq(
("Warsaw", 2016, 2),
("Toronto", 2016, 4),
("Toronto", 2017, 1)).toDF("city", "year", "count")
// there seems to be a bug with nulls
// and so the need for the following union
val cityNull = Seq(
(null.asInstanceOf[String], 2016, 2)).toDF("city", "year", "count")
val workshops = tmpWorkshops union cityNull
scala> workshops.show
+-------+----+-----+
| city|year|count|
+-------+----+-----+
| Warsaw|2016| 2|
|Toronto|2016| 4|
|Toronto|2017| 1|
| null|2016| 2|
+-------+----+-----+
val query = workshops
.cube("city", "year")
.agg(grouping_id()) // <-- all grouping columns used
.sort($"city".desc_nulls_last, $"year".desc_nulls_last)
scala> query.show
+-------+----+-------------+
| city|year|grouping_id()|
+-------+----+-------------+
| Warsaw|2016| 0|
| Warsaw|null| 1|
|Toronto|2017| 0|
|Toronto|2016| 0|
|Toronto|null| 1|
| null|2017| 2|
| null|2016| 2|
| null|2016| 0|
| null|null| 1|
| null|null| 3|
+-------+----+-------------+
scala> spark.catalog.listFunctions.filter(_.name.contains("grouping_id")).show(false)
+-----------+--------+-----------+----------------------------------------------------+-----------+
|name |database|description|className |isTemporary|
+-----------+--------+-----------+----------------------------------------------------+-----------+
|grouping_id|null |null |org.apache.spark.sql.catalyst.expressions.GroupingID|true |
+-----------+--------+-----------+----------------------------------------------------+-----------+
// bin function gives the string representation of the binary value of the given long column
scala> query.withColumn("bitmask", bin($"grouping_id()")).show
+-------+----+-------------+-------+
| city|year|grouping_id()|bitmask|
+-------+----+-------------+-------+
| Warsaw|2016| 0| 0|
| Warsaw|null| 1| 1|
|Toronto|2017| 0| 0|
|Toronto|2016| 0| 0|
|Toronto|null| 1| 1|
| null|2017| 2| 10|
| null|2016| 2| 10|
| null|2016| 0| 0| <-- null is city
| null|null| 3| 11|
| null|null| 1| 1|
+-------+----+-------------+-------+
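The bitmask makes the relationship between grouping_id and grouping explicit: with the two grouping columns above, grouping_id() equals grouping(city) * 2 + grouping(year). A sketch to verify that, reusing the workshops dataset (the names gid and from_bits are made up):

import org.apache.spark.sql.functions.{grouping, grouping_id}
val verified = workshops
  .cube("city", "year")
  .agg(
    grouping_id() as "gid",
    (grouping("city") * 2 + grouping("year")) as "from_bits")
// every row satisfies gid == from_bits, so the count is expected to be 0
verified.where($"gid" =!= $"from_bits").count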
The list of columns passed to grouping_id should match the grouping columns (in cube or rollup) exactly, or be empty, which means all the grouping columns (and is exactly what the function expects).
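For example (a sketch; the exact analysis error message may differ across Spark versions):

import org.apache.spark.sql.functions.grouping_id
// OK: the columns match the grouping columns exactly
workshops.cube("city", "year").agg(grouping_id("city", "year"))
// Fails analysis with an AnalysisException, since (city) does not match (city, year)
workshops.cube("city", "year").agg(grouping_id("city"))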
Note: grouping_id can only be used with cube, rollup or GROUPING SETS multi-dimensional aggregate operators (and is verified when Analyzer does check analysis).
Note: Spark SQL’s grouping_id function is known as grouping__id in Hive. From Hive’s documentation:

> When aggregates are displayed for a column its value is `null`. This may conflict in case the column itself has some `null` values. There needs to be some way to identify `NULL` in column, which means aggregate, and `NULL` in column, which means value. `GROUPING__ID` function is the solution to that.
Internally, grouping_id() creates a Column with GroupingID unevaluable expression.
Note: Unevaluable expressions are expressions replaced by some other expressions during analysis or optimization.
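Because GroupingID is unevaluable, it must be replaced during analysis, and using it outside of a multi-dimensional aggregation fails check analysis. A sketch (the exact message may vary by Spark version):

import org.apache.spark.sql.functions.grouping_id
// Fails check analysis with an AnalysisException:
// grouping_id() can only be used with cube, rollup or GROUPING SETS
workshops.groupBy("city").agg(grouping_id())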
// workshops dataset was defined earlier
val q = workshops
.cube("city", "year")
.agg(grouping_id())
// grouping_id function is spark_grouping_id virtual column internally
// that is resolved during analysis - see Analyzed Logical Plan
scala> q.explain(true)
== Parsed Logical Plan ==
'Aggregate [cube(city#182, year#183)], [city#182, year#183, grouping_id() AS grouping_id()#742]
+- Union
:- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
: +- LocalRelation [_1#178, _2#179, _3#180]
+- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
+- LocalRelation [_1#192, _2#193, _3#194]
== Analyzed Logical Plan ==
city: string, year: int, grouping_id(): int
Aggregate [city#757, year#758, spark_grouping_id#754], [city#757, year#758, spark_grouping_id#754 AS grouping_id()#742]
+- Expand [List(city#182, year#183, count#184, city#755, year#756, 0), List(city#182, year#183, count#184, city#755, null, 1), List(city#182, year#183, count#184, null, year#756, 2), List(city#182, year#183, count#184, null, null, 3)], [city#182, year#183, count#184, city#757, year#758, spark_grouping_id#754]
+- Project [city#182, year#183, count#184, city#182 AS city#755, year#183 AS year#756]
+- Union
:- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
: +- LocalRelation [_1#178, _2#179, _3#180]
+- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
+- LocalRelation [_1#192, _2#193, _3#194]
== Optimized Logical Plan ==
Aggregate [city#757, year#758, spark_grouping_id#754], [city#757, year#758, spark_grouping_id#754 AS grouping_id()#742]
+- Expand [List(city#755, year#756, 0), List(city#755, null, 1), List(null, year#756, 2), List(null, null, 3)], [city#757, year#758, spark_grouping_id#754]
+- Union
:- LocalRelation [city#755, year#756]
+- LocalRelation [city#755, year#756]
== Physical Plan ==
*HashAggregate(keys=[city#757, year#758, spark_grouping_id#754], functions=[], output=[city#757, year#758, grouping_id()#742])
+- Exchange hashpartitioning(city#757, year#758, spark_grouping_id#754, 200)
+- *HashAggregate(keys=[city#757, year#758, spark_grouping_id#754], functions=[], output=[city#757, year#758, spark_grouping_id#754])
+- *Expand [List(city#755, year#756, 0), List(city#755, null, 1), List(null, year#756, 2), List(null, null, 3)], [city#757, year#758, spark_grouping_id#754]
+- Union
:- LocalTableScan [city#755, year#756]
+- LocalTableScan [city#755, year#756]
Note: grouping_id was added to Spark SQL in [SPARK-12706] support grouping/grouping_id function together group set.