Window Aggregate Operators — Windows

Window Aggregate Operators (aka Windows) are operators that perform a calculation on a group of records (called a window) that are in some relation to the current record. They calculate values for every records in a window.

Note
Window-based framework is available as an experimental feature since Spark 1.4.0.

Unlike the regular aggregation operators, window aggregates does not group records as a single record but rather work across the rows that fall into the same partition as the current row.

Window aggregates are supported in SQL queries, Column-based expressions and Scala API.

//
// Borrowed from 3.5. Window Functions in PostgreSQL documentation
// Example of window operators using Scala API
//
case class Salary(depName: String, empNo: Long, salary: Long)
val empsalary = Seq(
  Salary("sales", 1, 5000),
  Salary("personnel", 2, 3900),
  Salary("sales", 3, 4800),
  Salary("sales", 4, 4800),
  Salary("personnel", 5, 3500),
  Salary("develop", 7, 4200),
  Salary("develop", 8, 6000),
  Salary("develop", 9, 4500),
  Salary("develop", 10, 5200),
  Salary("develop", 11, 5200)).toDS

import org.apache.spark.sql.expressions.Window
// Windows are partitions of deptName
scala> val byDepName = Window.partitionBy('depName)
byDepName: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@1a711314

scala> empsalary.withColumn("avg", avg('salary) over byDepName).show
+---------+-----+------+-----------------+
|  depName|empNo|salary|              avg|
+---------+-----+------+-----------------+
|  develop|    7|  4200|           5020.0|
|  develop|    8|  6000|           5020.0|
|  develop|    9|  4500|           5020.0|
|  develop|   10|  5200|           5020.0|
|  develop|   11|  5200|           5020.0|
|    sales|    1|  5000|4866.666666666667|
|    sales|    3|  4800|4866.666666666667|
|    sales|    4|  4800|4866.666666666667|
|personnel|    2|  3900|           3700.0|
|personnel|    5|  3500|           3700.0|
+---------+-----+------+-----------------+

As is shown in the example above, you describe a window using the convenient factory methods in Window object that create a window specification that you can further refine with partitioning, ordering, and frame boundaries.

After you describe a window you can apply window aggregate functions like ranking functions (e.g. RANK), analytic functions (e.g. LAG), and the regular aggregate functions, e.g. SUM, AVG, MAX.

Window Aggregate Functions

A window aggregate function calculates a return value over a set of rows called window that are somehow related to the current row.

Note
Window functions are also called over functions due to how they are applied using Column’s over function.

Although similar to aggregate functions, a window function does not group rows into a single output row and retains their separate identities. A window function can access rows that are linked to the current row.

Tip
See Examples section in this document.

Spark SQL supports three kinds of window functions:

  • ranking functions

  • analytic functions

  • aggregate functions

Table 1. Window functions in Spark SQL (see Introducing Window Functions in Spark SQL)
SQL Function

Ranking functions

RANK

rank

DENSE_RANK

dense_rank

PERCENT_RANK

percent_rank

NTILE

ntile

ROW_NUMBER

row_number

Analytic functions

CUME_DIST

cume_dist

LAG

lag

LEAD

lead

For aggregate functions, you can use the existing aggregate functions as window functions, e.g. sum, avg, min, max and count.

You can mark a function window by OVER clause after a function in SQL, e.g. avg(revenue) OVER (…​) or over method on a function in the Dataset API, e.g. rank().over(…​).

When executed, a window function computes a value for each row in a window.

Note
Window functions belong to Window functions group in Spark’s Scala API.

WindowSpec — Window Specification

A window function needs a window specification which is an instance of WindowSpec class.

Note
WindowSpec class is marked as experimental since 1.4.0.

A window specification defines which rows are included in a window (aka a frame), i.e. set of rows, that is associated with a given input row. It does so by partitioning an entire data set and specifying frame boundary with ordering.

Note
Use static methods in Window object to create a WindowSpec.
import org.apache.spark.sql.expressions.Window

scala> val byHTokens = Window.partitionBy('token startsWith "h")
byHTokens: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@574985d8

A window specification includes three parts:

  1. Partitioning Specification defines which records are in the same partition. With no partition defined, all records belong to a single partition.

  2. Ordering Specification defines how records in a partition are ordered that in turn defines the position of a record in a partition. The ordering could be ascending (ASC in SQL or asc in Scala) or descending (DESC or desc).

  3. Frame Specification (unsupported in Hive; see Why do Window functions fail with "Window function X does not take a frame specification"?) defines the records to be included in the frame for the current input row, based on their relative position to the current row. For example, “the three rows preceding the current row to the current row” describes a frame including the current input row and three rows appearing before the current row.

Once WindowSpec instance has been created using Window object, you can further expand on window specification using the following methods to define frames:

  • rowsBetween(start: Long, end: Long): WindowSpec

  • rangeBetween(start: Long, end: Long): WindowSpec

Besides the two above, you can also use the following methods (that correspond to the methods in Window object):

  • partitionBy

  • orderBy

Window object

Window object provides functions to define windows (as WindowSpec instances).

Window object lives in org.apache.spark.sql.expressions package. Import it to use Window functions.

import org.apache.spark.sql.expressions.Window

There are two families of the functions available in Window object that create WindowSpec instance for one or many Column instances:

Partitioning Records — partitionBy Methods

partitionBy(colName: String, colNames: String*): WindowSpec
partitionBy(cols: Column*): WindowSpec

partitionBy creates an instance of WindowSpec with partition expression(s) defined for one or more columns.

// partition records into two groups
// * tokens starting with "h"
// * others
val byHTokens = Window.partitionBy('token startsWith "h")

// count the sum of ids in each group
val result = tokens.select('*, sum('id) over byHTokens as "sum over h tokens").orderBy('id)

scala> .show
+---+-----+-----------------+
| id|token|sum over h tokens|
+---+-----+-----------------+
|  0|hello|                4|
|  1|henry|                4|
|  2|  and|                2|
|  3|harry|                4|
+---+-----+-----------------+

Ordering in Windows — orderBy Methods

orderBy(colName: String, colNames: String*): WindowSpec
orderBy(cols: Column*): WindowSpec

orderBy allows you to control the order of records in a window.

import org.apache.spark.sql.expressions.Window
val byDepnameSalaryDesc = Window.partitionBy('depname).orderBy('salary desc)

// a numerical rank within the current row's partition for each distinct ORDER BY value
scala> val rankByDepname = rank().over(byDepnameSalaryDesc)
rankByDepname: org.apache.spark.sql.Column = RANK() OVER (PARTITION BY depname ORDER BY salary DESC UnspecifiedFrame)

scala> empsalary.select('*, rankByDepname as 'rank).show
+---------+-----+------+----+
|  depName|empNo|salary|rank|
+---------+-----+------+----+
|  develop|    8|  6000|   1|
|  develop|   10|  5200|   2|
|  develop|   11|  5200|   2|
|  develop|    9|  4500|   4|
|  develop|    7|  4200|   5|
|    sales|    1|  5000|   1|
|    sales|    3|  4800|   2|
|    sales|    4|  4800|   2|
|personnel|    2|  3900|   1|
|personnel|    5|  3500|   2|
+---------+-----+------+----+

Window Examples

Two samples from org.apache.spark.sql.expressions.Window scaladoc:

// PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
Window.partitionBy('country).orderBy('date).rowsBetween(Long.MinValue, 0)
// PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
Window.partitionBy('country).orderBy('date).rowsBetween(-3, 3)

Frame

At its core, a window function calculates a return value for every input row of a table based on a group of rows, called the frame. Every input row can have a unique frame associated with it.

When you define a frame you have to specify three components of a frame specification - the start and end boundaries, and the type.

Types of boundaries (two positions and three offsets):

  • UNBOUNDED PRECEDING - the first row of the partition

  • UNBOUNDED FOLLOWING - the last row of the partition

  • CURRENT ROW

  • <value> PRECEDING

  • <value> FOLLOWING

Offsets specify the offset from the current input row.

Types of frames:

  • ROW - based on physical offsets from the position of the current input row

  • RANGE - based on logical offsets from the position of the current input row

In the current implementation of WindowSpec you can use two methods to define a frame:

  • rowsBetween

  • rangeBetween

See WindowSpec for their coverage.

Window Operators in SQL Queries

The grammar of windows operators in SQL accepts the following:

  1. CLUSTER BY or PARTITION BY or DISTRIBUTE BY for partitions,

  2. ORDER BY or SORT BY for sorting order,

  3. RANGE, ROWS, RANGE BETWEEN, and ROWS BETWEEN for window frame types,

  4. UNBOUNDED PRECEDING, UNBOUNDED FOLLOWING, CURRENT ROW for frame bounds.

Examples

Top N per Group

Top N per Group is useful when you need to compute the first and second best-sellers in category.

Note
This example is borrowed from an excellent article Introducing Window Functions in Spark SQL.
Table 2. Table PRODUCT_REVENUE
product category revenue

Thin

cell phone

6000

Normal

tablet

1500

Mini

tablet

5500

Ultra thin

cell phone

5000

Very thin

cell phone

6000

Big

tablet

2500

Bendable

cell phone

3000

Foldable

cell phone

3000

Pro

tablet

4500

Pro2

tablet

6500

Question: What are the best-selling and the second best-selling products in every category?

val dataset = Seq(
  ("Thin",       "cell phone", 6000),
  ("Normal",     "tablet",     1500),
  ("Mini",       "tablet",     5500),
  ("Ultra thin", "cell phone", 5000),
  ("Very thin",  "cell phone", 6000),
  ("Big",        "tablet",     2500),
  ("Bendable",   "cell phone", 3000),
  ("Foldable",   "cell phone", 3000),
  ("Pro",        "tablet",     4500),
  ("Pro2",       "tablet",     6500))
  .toDF("product", "category", "revenue")

scala> dataset.show
+----------+----------+-------+
|   product|  category|revenue|
+----------+----------+-------+
|      Thin|cell phone|   6000|
|    Normal|    tablet|   1500|
|      Mini|    tablet|   5500|
|Ultra thin|cell phone|   5000|
| Very thin|cell phone|   6000|
|       Big|    tablet|   2500|
|  Bendable|cell phone|   3000|
|  Foldable|cell phone|   3000|
|       Pro|    tablet|   4500|
|      Pro2|    tablet|   6500|
+----------+----------+-------+

scala> data.where('category === "tablet").show
+-------+--------+-------+
|product|category|revenue|
+-------+--------+-------+
| Normal|  tablet|   1500|
|   Mini|  tablet|   5500|
|    Big|  tablet|   2500|
|    Pro|  tablet|   4500|
|   Pro2|  tablet|   6500|
+-------+--------+-------+

The question boils down to ranking products in a category based on their revenue, and to pick the best selling and the second best-selling products based the ranking.

import org.apache.spark.sql.expressions.Window
val overCategory = Window.partitionBy('category).orderBy('revenue.desc)

val ranked = data.withColumn("rank", dense_rank.over(overCategory))

scala> ranked.show
+----------+----------+-------+----+
|   product|  category|revenue|rank|
+----------+----------+-------+----+
|      Pro2|    tablet|   6500|   1|
|      Mini|    tablet|   5500|   2|
|       Pro|    tablet|   4500|   3|
|       Big|    tablet|   2500|   4|
|    Normal|    tablet|   1500|   5|
|      Thin|cell phone|   6000|   1|
| Very thin|cell phone|   6000|   1|
|Ultra thin|cell phone|   5000|   2|
|  Bendable|cell phone|   3000|   3|
|  Foldable|cell phone|   3000|   3|
+----------+----------+-------+----+

scala> ranked.where('rank <= 2).show
+----------+----------+-------+----+
|   product|  category|revenue|rank|
+----------+----------+-------+----+
|      Pro2|    tablet|   6500|   1|
|      Mini|    tablet|   5500|   2|
|      Thin|cell phone|   6000|   1|
| Very thin|cell phone|   6000|   1|
|Ultra thin|cell phone|   5000|   2|
+----------+----------+-------+----+

Revenue Difference per Category

Note
This example is the 2nd example from an excellent article Introducing Window Functions in Spark SQL.
import org.apache.spark.sql.expressions.Window
val reveDesc = Window.partitionBy('category).orderBy('revenue.desc)
val reveDiff = max('revenue).over(reveDesc) - 'revenue

scala> data.select('*, reveDiff as 'revenue_diff).show
+----------+----------+-------+------------+
|   product|  category|revenue|revenue_diff|
+----------+----------+-------+------------+
|      Pro2|    tablet|   6500|           0|
|      Mini|    tablet|   5500|        1000|
|       Pro|    tablet|   4500|        2000|
|       Big|    tablet|   2500|        4000|
|    Normal|    tablet|   1500|        5000|
|      Thin|cell phone|   6000|           0|
| Very thin|cell phone|   6000|           0|
|Ultra thin|cell phone|   5000|        1000|
|  Bendable|cell phone|   3000|        3000|
|  Foldable|cell phone|   3000|        3000|
+----------+----------+-------+------------+

Difference on Column

Compute a difference between values in rows in a column.

val pairs = for {
  x <- 1 to 5
  y <- 1 to 2
} yield (x, 10 * x * y)
val ds = pairs.toDF("ns", "tens")

scala> ds.show
+---+----+
| ns|tens|
+---+----+
|  1|  10|
|  1|  20|
|  2|  20|
|  2|  40|
|  3|  30|
|  3|  60|
|  4|  40|
|  4|  80|
|  5|  50|
|  5| 100|
+---+----+

import org.apache.spark.sql.expressions.Window
val overNs = Window.partitionBy('ns).orderBy('tens)
val diff = lead('tens, 1).over(overNs)

scala> ds.withColumn("diff", diff - 'tens).show
+---+----+----+
| ns|tens|diff|
+---+----+----+
|  1|  10|  10|
|  1|  20|null|
|  3|  30|  30|
|  3|  60|null|
|  5|  50|  50|
|  5| 100|null|
|  4|  40|  40|
|  4|  80|null|
|  2|  20|  20|
|  2|  40|null|
+---+----+----+

The key here is to remember that DataFrames are RDDs under the covers and hence aggregation like grouping by a key in DataFrames is RDD’s groupBy (or worse, reduceByKey or aggregateByKey transformations).

Running Total

The running total is the sum of all previous lines including the current one.

val sales = Seq(
  (0, 0, 0, 5),
  (1, 0, 1, 3),
  (2, 0, 2, 1),
  (3, 1, 0, 2),
  (4, 2, 0, 8),
  (5, 2, 2, 8))
  .toDF("id", "orderID", "prodID", "orderQty")

scala> sales.show
+---+-------+------+--------+
| id|orderID|prodID|orderQty|
+---+-------+------+--------+
|  0|      0|     0|       5|
|  1|      0|     1|       3|
|  2|      0|     2|       1|
|  3|      1|     0|       2|
|  4|      2|     0|       8|
|  5|      2|     2|       8|
+---+-------+------+--------+

val orderedByID = Window.orderBy('id)

val totalQty = sum('orderQty).over(orderedByID).as('running_total)
val salesTotalQty = sales.select('*, totalQty).orderBy('id)

scala> salesTotalQty.show
16/04/10 23:01:52 WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+-------+------+--------+-------------+
| id|orderID|prodID|orderQty|running_total|
+---+-------+------+--------+-------------+
|  0|      0|     0|       5|            5|
|  1|      0|     1|       3|            8|
|  2|      0|     2|       1|            9|
|  3|      1|     0|       2|           11|
|  4|      2|     0|       8|           19|
|  5|      2|     2|       8|           27|
+---+-------+------+--------+-------------+

val byOrderId = orderedByID.partitionBy('orderID)
val totalQtyPerOrder = sum('orderQty).over(byOrderId).as('running_total_per_order)
val salesTotalQtyPerOrder = sales.select('*, totalQtyPerOrder).orderBy('id)

scala> salesTotalQtyPerOrder.show
+---+-------+------+--------+-----------------------+
| id|orderID|prodID|orderQty|running_total_per_order|
+---+-------+------+--------+-----------------------+
|  0|      0|     0|       5|                      5|
|  1|      0|     1|       3|                      8|
|  2|      0|     2|       1|                      9|
|  3|      1|     0|       2|                      2|
|  4|      2|     0|       8|                      8|
|  5|      2|     2|       8|                     16|
+---+-------+------+--------+-----------------------+

Calculate rank of row

See "Explaining" Query Plans of Windows for an elaborate example.

Interval data type for Date and Timestamp types

With the Interval data type, you could use intervals as values specified in <value> PRECEDING and <value> FOLLOWING for RANGE frame. It is specifically suited for time-series analysis with window functions.

Accessing values of earlier rows

FIXME What’s the value of rows before current one?

Moving Average

Cumulative Aggregates

Eg. cumulative sum

User-defined aggregate functions

With the window function support, you could use user-defined aggregate functions as window functions.

"Explaining" Query Plans of Windows

import org.apache.spark.sql.expressions.Window
val byDepnameSalaryDesc = Window.partitionBy('depname).orderBy('salary desc)

scala> val rankByDepname = rank().over(byDepnameSalaryDesc)
rankByDepname: org.apache.spark.sql.Column = RANK() OVER (PARTITION BY depname ORDER BY salary DESC UnspecifiedFrame)

// empsalary defined at the top of the page
scala> empsalary.select('*, rankByDepname as 'rank).explain(extended = true)
== Parsed Logical Plan ==
'Project [*, rank() windowspecdefinition('depname, 'salary DESC, UnspecifiedFrame) AS rank#9]
+- LocalRelation [depName#5, empNo#6L, salary#7L]

== Analyzed Logical Plan ==
depName: string, empNo: bigint, salary: bigint, rank: int
Project [depName#5, empNo#6L, salary#7L, rank#9]
+- Project [depName#5, empNo#6L, salary#7L, rank#9, rank#9]
   +- Window [rank(salary#7L) windowspecdefinition(depname#5, salary#7L DESC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#9], [depname#5], [salary#7L DESC]
      +- Project [depName#5, empNo#6L, salary#7L]
         +- LocalRelation [depName#5, empNo#6L, salary#7L]

== Optimized Logical Plan ==
Window [rank(salary#7L) windowspecdefinition(depname#5, salary#7L DESC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#9], [depname#5], [salary#7L DESC]
+- LocalRelation [depName#5, empNo#6L, salary#7L]

== Physical Plan ==
Window [rank(salary#7L) windowspecdefinition(depname#5, salary#7L DESC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#9], [depname#5], [salary#7L DESC]
+- *Sort [depname#5 ASC, salary#7L DESC], false, 0
   +- Exchange hashpartitioning(depname#5, 200)
      +- LocalTableScan [depName#5, empNo#6L, salary#7L]

results matching ""

    No results matching ""