Standard Functions — functions Object

The org.apache.spark.sql.functions object offers many built-in functions for processing values in the Columns of Datasets.

Note
The functions object is an experimental feature of Spark since version 1.3.0.

You can access the functions using the following import statement:

import org.apache.spark.sql.functions._

There are over 300 functions in the functions object. Most of them transform Column objects (or column names) into other Column objects. Some, such as broadcast, transform a Dataset into a Dataset.

The functions are grouped by functional areas:

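Table 1. Functions in Spark SQL

Window functions

Function                          Description
rank, dense_rank, percent_rank    Ranking records per window partition
ntile                             ntile group id (from 1 to n inclusive) in an ordered window partition
row_number                        Sequential numbering per window partition
cume_dist                         Cumulative distribution of records across window partitions
lag                               Value that is offset records before the current record
lead                              Value that is offset records after the current record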

explode

Caution
FIXME
scala> Seq(Array(0,1,2)).toDF("array").withColumn("num", explode('array)).show
+---------+---+
|    array|num|
+---------+---+
|[0, 1, 2]|  0|
|[0, 1, 2]|  1|
|[0, 1, 2]|  2|
+---------+---+

Ranking Records per Window Partition — rank functions

rank(): Column
dense_rank(): Column
percent_rank(): Column

rank functions assign the sequential rank of each distinct value per window partition. They are equivalent to RANK, DENSE_RANK and PERCENT_RANK functions in the good ol' SQL.

val dataset = spark.range(9).withColumn("bucket", 'id % 3)

import org.apache.spark.sql.expressions.Window
val byBucket = Window.partitionBy('bucket).orderBy('id)

scala> dataset.withColumn("rank", rank over byBucket).show
+---+------+----+
| id|bucket|rank|
+---+------+----+
|  0|     0|   1|
|  3|     0|   2|
|  6|     0|   3|
|  1|     1|   1|
|  4|     1|   2|
|  7|     1|   3|
|  2|     2|   1|
|  5|     2|   2|
|  8|     2|   3|
+---+------+----+

scala> dataset.withColumn("percent_rank", percent_rank over byBucket).show
+---+------+------------+
| id|bucket|percent_rank|
+---+------+------------+
|  0|     0|         0.0|
|  3|     0|         0.5|
|  6|     0|         1.0|
|  1|     1|         0.0|
|  4|     1|         0.5|
|  7|     1|         1.0|
|  2|     2|         0.0|
|  5|     2|         0.5|
|  8|     2|         1.0|
+---+------+------------+

rank assigns the same rank to duplicate rows and leaves a gap in the sequence after them (similarly to Olympic medal places). dense_rank also assigns the same rank to duplicate rows, but compacts the ranks so that there are no gaps.

// rank function with duplicates
// Note the missing/sparse ranks, i.e. 2 and 4
scala> dataset.union(dataset).withColumn("rank", rank over byBucket).show
+---+------+----+
| id|bucket|rank|
+---+------+----+
|  0|     0|   1|
|  0|     0|   1|
|  3|     0|   3|
|  3|     0|   3|
|  6|     0|   5|
|  6|     0|   5|
|  1|     1|   1|
|  1|     1|   1|
|  4|     1|   3|
|  4|     1|   3|
|  7|     1|   5|
|  7|     1|   5|
|  2|     2|   1|
|  2|     2|   1|
|  5|     2|   3|
|  5|     2|   3|
|  8|     2|   5|
|  8|     2|   5|
+---+------+----+

// dense_rank function with duplicates
// Note that the missing ranks are now filled in
scala> dataset.union(dataset).withColumn("dense_rank", dense_rank over byBucket).show
+---+------+----------+
| id|bucket|dense_rank|
+---+------+----------+
|  0|     0|         1|
|  0|     0|         1|
|  3|     0|         2|
|  3|     0|         2|
|  6|     0|         3|
|  6|     0|         3|
|  1|     1|         1|
|  1|     1|         1|
|  4|     1|         2|
|  4|     1|         2|
|  7|     1|         3|
|  7|     1|         3|
|  2|     2|         1|
|  2|     2|         1|
|  5|     2|         2|
|  5|     2|         2|
|  8|     2|         3|
|  8|     2|         3|
+---+------+----------+

// percent_rank function with duplicates
scala> dataset.union(dataset).withColumn("percent_rank", percent_rank over byBucket).show
+---+------+------------+
| id|bucket|percent_rank|
+---+------+------------+
|  0|     0|         0.0|
|  0|     0|         0.0|
|  3|     0|         0.4|
|  3|     0|         0.4|
|  6|     0|         0.8|
|  6|     0|         0.8|
|  1|     1|         0.0|
|  1|     1|         0.0|
|  4|     1|         0.4|
|  4|     1|         0.4|
|  7|     1|         0.8|
|  7|     1|         0.8|
|  2|     2|         0.0|
|  2|     2|         0.0|
|  5|     2|         0.4|
|  5|     2|         0.4|
|  8|     2|         0.8|
|  8|     2|         0.8|
+---+------+------------+
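The numbers in the three outputs above can be reproduced without Spark. The following is a minimal plain-Scala sketch for a single, already ordered window partition, assuming the usual SQL definitions of the three functions. The helper names ranks, denseRanks and percentRanks are hypothetical and are not part of the functions object.

```scala
// Plain-Scala sketch (no Spark required) of the three ranking functions
// for one window partition. All helper names here are hypothetical.

// Values of one window partition, already sorted by the ordering column
// (bucket 0 of dataset.union(dataset) above).
val partition = Seq(0L, 0L, 3L, 3L, 6L, 6L)

// rank: 1 + the number of rows that sort strictly before the current row.
def ranks(sorted: Seq[Long]): Seq[Int] =
  sorted.map(v => sorted.count(_ < v) + 1)

// dense_rank: 1 + the number of distinct values that sort strictly before.
def denseRanks(sorted: Seq[Long]): Seq[Int] =
  sorted.map(v => sorted.distinct.count(_ < v) + 1)

// percent_rank: (rank - 1) / (number of rows - 1), or 0.0 for a single row.
def percentRanks(sorted: Seq[Long]): Seq[Double] =
  if (sorted.size <= 1) sorted.map(_ => 0.0)
  else ranks(sorted).map(r => (r - 1).toDouble / (sorted.size - 1))

println(ranks(partition).mkString(", "))        // 1, 1, 3, 3, 5, 5
println(denseRanks(partition).mkString(", "))   // 1, 1, 2, 2, 3, 3
println(percentRanks(partition).mkString(", ")) // 0.0, 0.0, 0.4, 0.4, 0.8, 0.8
```

The sketch is quadratic in the partition size and is only meant to make the definitions explicit, not to mirror how Spark evaluates window functions.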

Cumulative Distribution of Records Across Window Partitions — cume_dist function

cume_dist(): Column

cume_dist computes the cumulative distribution of the records in window partitions. This is equivalent to SQL’s CUME_DIST function.

// No duplicates here: the output below shows 9 rows (3 per bucket)
val dataset = spark.range(9).withColumn("bucket", 'id % 3)

import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("cume_dist", cume_dist over windowSpec).show
+---+------+------------------+
| id|bucket|         cume_dist|
+---+------+------------------+
|  0|     0|0.3333333333333333|
|  3|     0|0.6666666666666666|
|  6|     0|               1.0|
|  1|     1|0.3333333333333333|
|  4|     1|0.6666666666666666|
|  7|     1|               1.0|
|  2|     2|0.3333333333333333|
|  5|     2|0.6666666666666666|
|  8|     2|               1.0|
+---+------+------------------+
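The values in the cume_dist column follow from the usual SQL definition: the number of rows whose ordering value is less than or equal to the current one, divided by the number of rows in the partition. A minimal plain-Scala sketch of that definition (the helper name cumeDist is hypothetical):

```scala
// Plain-Scala sketch (no Spark required) of cume_dist for one ordered
// window partition. `cumeDist` is a hypothetical helper name.
def cumeDist(sorted: Seq[Long]): Seq[Double] =
  sorted.map(v => sorted.count(_ <= v).toDouble / sorted.size)

// ids of bucket 0 in the output above
println(cumeDist(Seq(0L, 3L, 6L)).mkString(", "))
// 0.3333333333333333, 0.6666666666666666, 1.0

// With duplicates, tied rows share the same cumulative distribution value.
println(cumeDist(Seq(0L, 0L, 3L, 3L, 6L, 6L)).mkString(", "))
```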

lag functions

lag(e: Column, offset: Int): Column
lag(columnName: String, offset: Int): Column
lag(columnName: String, offset: Int, defaultValue: Any): Column
lag(e: Column, offset: Int, defaultValue: Any): Column

lag returns the value in e / columnName column that is offset records before the current record. If there are fewer than offset records before the current record in the window partition, lag returns defaultValue, or null when no defaultValue is given.

// No duplicates here: the outputs below show 9 rows (3 per bucket)
val dataset = spark.range(9).withColumn("bucket", 'id % 3)

import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("lag", lag('id, 1) over windowSpec).show
+---+------+----+
| id|bucket| lag|
+---+------+----+
|  0|     0|null|
|  3|     0|   0|
|  6|     0|   3|
|  1|     1|null|
|  4|     1|   1|
|  7|     1|   4|
|  2|     2|null|
|  5|     2|   2|
|  8|     2|   5|
+---+------+----+

scala> dataset.withColumn("lag", lag('id, 2, "<default_value>") over windowSpec).show
+---+------+----+
| id|bucket| lag|
+---+------+----+
|  0|     0|null|
|  3|     0|null|
|  6|     0|   0|
|  1|     1|null|
|  4|     1|null|
|  7|     1|   1|
|  2|     2|null|
|  5|     2|null|
|  8|     2|   2|
+---+------+----+
Caution
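FIXME It looks like lag with a default value has a bug: the default value is not used at all. A possible explanation (an assumption, not verified here) is that the string default cannot be converted to the numeric type of the id column and so ends up as null.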

lead functions

lead(columnName: String, offset: Int): Column
lead(e: Column, offset: Int): Column
lead(columnName: String, offset: Int, defaultValue: Any): Column
lead(e: Column, offset: Int, defaultValue: Any): Column

lead returns the value that is offset records after the current record. If there are fewer than offset records after the current record in the window partition, lead returns defaultValue, or null when no defaultValue is given.

val buckets = spark.range(9).withColumn("bucket", 'id % 3)
// Make duplicates
val dataset = buckets.union(buckets)

import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("lead", lead('id, 1) over windowSpec).show
+---+------+----+
| id|bucket|lead|
+---+------+----+
|  0|     0|   0|
|  0|     0|   3|
|  3|     0|   3|
|  3|     0|   6|
|  6|     0|   6|
|  6|     0|null|
|  1|     1|   1|
|  1|     1|   4|
|  4|     1|   4|
|  4|     1|   7|
|  7|     1|   7|
|  7|     1|null|
|  2|     2|   2|
|  2|     2|   5|
|  5|     2|   5|
|  5|     2|   8|
|  8|     2|   8|
|  8|     2|null|
+---+------+----+

scala> dataset.withColumn("lead", lead('id, 2, "<default_value>") over windowSpec).show
+---+------+----+
| id|bucket|lead|
+---+------+----+
|  0|     0|   3|
|  0|     0|   3|
|  3|     0|   6|
|  3|     0|   6|
|  6|     0|null|
|  6|     0|null|
|  1|     1|   4|
|  1|     1|   4|
|  4|     1|   7|
|  4|     1|   7|
|  7|     1|null|
|  7|     1|null|
|  2|     2|   5|
|  2|     2|   5|
|  5|     2|   8|
|  5|     2|   8|
|  8|     2|null|
|  8|     2|null|
+---+------+----+
Caution
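FIXME It looks like lead with a default value has a bug: the default value is not used at all. A possible explanation (an assumption, not verified here) is that the string default cannot be converted to the numeric type of the id column and so ends up as null.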
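The intended semantics of lag and lead, including the default value, can be sketched in plain Scala for a single ordered window partition. The helper names lagBy and leadBy are hypothetical, None stands in for SQL null, and the default is deliberately of the same type as the values.

```scala
// Plain-Scala sketch (no Spark required) of lag and lead over one ordered
// window partition. `lagBy` and `leadBy` are hypothetical helper names.

// Value `offset` rows before the current row, else the default (or None).
def lagBy[A](sorted: Seq[A], offset: Int, default: Option[A] = None): Seq[Option[A]] =
  sorted.indices.map(i => sorted.lift(i - offset).orElse(default))

// Value `offset` rows after the current row, else the default (or None).
def leadBy[A](sorted: Seq[A], offset: Int, default: Option[A] = None): Seq[Option[A]] =
  sorted.indices.map(i => sorted.lift(i + offset).orElse(default))

val bucket0 = Vector(0L, 3L, 6L)

println(lagBy(bucket0, 1).mkString(", "))             // None, Some(0), Some(3)
println(lagBy(bucket0, 2, Some(-1L)).mkString(", "))  // Some(-1), Some(-1), Some(0)
println(leadBy(bucket0, 1).mkString(", "))            // Some(3), Some(6), None
println(leadBy(bucket0, 2, Some(-1L)).mkString(", ")) // Some(6), Some(-1), Some(-1)
```

Using a default of the same type as the column (here a Long) sidesteps any question of how a string default would be converted.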

Sequential numbering per window partition — row_number functions

row_number(): Column

row_number returns a sequential number starting at 1 within a window partition.

val buckets = spark.range(9).withColumn("bucket", 'id % 3)
// Make duplicates
val dataset = buckets.union(buckets)

import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("row_number", row_number() over windowSpec).show
+---+------+----------+
| id|bucket|row_number|
+---+------+----------+
|  0|     0|         1|
|  0|     0|         2|
|  3|     0|         3|
|  3|     0|         4|
|  6|     0|         5|
|  6|     0|         6|
|  1|     1|         1|
|  1|     1|         2|
|  4|     1|         3|
|  4|     1|         4|
|  7|     1|         5|
|  7|     1|         6|
|  2|     2|         1|
|  2|     2|         2|
|  5|     2|         3|
|  5|     2|         4|
|  8|     2|         5|
|  8|     2|         6|
+---+------+----------+

ntile function

ntile(n: Int): Column

ntile computes the ntile group id (from 1 to n inclusive) in an ordered window partition.

val dataset = spark.range(7).select('*, 'id % 3 as "bucket")

import org.apache.spark.sql.expressions.Window
val byBuckets = Window.partitionBy('bucket).orderBy('id)
scala> dataset.select('*, ntile(3) over byBuckets as "ntile").show
+---+------+-----+
| id|bucket|ntile|
+---+------+-----+
|  0|     0|    1|
|  3|     0|    2|
|  6|     0|    3|
|  1|     1|    1|
|  4|     1|    2|
|  2|     2|    1|
|  5|     2|    2|
+---+------+-----+
Caution
FIXME How is ntile different from rank? What about performance?
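One way to see how ntile differs from rank: ntile depends only on a row's position in the ordered partition and on the partition size, not on the values themselves. The sketch below assumes the usual SQL NTILE convention, in which rows are spread over n groups as evenly as possible and the first groups receive the extra rows. The helper name ntileIds is hypothetical.

```scala
// Plain-Scala sketch (no Spark required) of ntile group ids for a window
// partition with `partitionSize` ordered rows. `ntileIds` is hypothetical.
def ntileIds(partitionSize: Int, n: Int): Seq[Int] = {
  require(n > 0, "n must be positive")
  val base = partitionSize / n  // minimum number of rows per group
  val extra = partitionSize % n // the first `extra` groups get one more row
  (1 to n).flatMap { group =>
    val size = if (group <= extra) base + 1 else base
    Seq.fill(size)(group)
  }
}

println(ntileIds(3, 3).mkString(", ")) // 1, 2, 3 (bucket 0 above)
println(ntileIds(2, 3).mkString(", ")) // 1, 2 (buckets 1 and 2 above)
println(ntileIds(7, 3).mkString(", ")) // 1, 1, 1, 2, 2, 3, 3
```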

Creating Columns — col and column methods

col(colName: String): Column
column(colName: String): Column

col and column methods create a Column that you can later use to reference a column in a dataset.

import org.apache.spark.sql.functions._

scala> val nameCol = col("name")
nameCol: org.apache.spark.sql.Column = name

scala> val cityCol = column("city")
cityCol: org.apache.spark.sql.Column = city

Defining UDFs (udf factories)

udf(f: FunctionN[...]): UserDefinedFunction

The udf family of functions allows you to create user-defined functions (UDFs) from Scala functions. udf accepts a function f of 0 to 10 arguments, and the input and output types are inferred automatically from the type signature of f.

import org.apache.spark.sql.functions._
val _length: String => Int = _.length
val _lengthUDF = udf(_length)

// define a dataframe
val df = sc.parallelize(0 to 3).toDF("num")

// apply the user-defined function to "num" column
scala> df.withColumn("len", _lengthUDF($"num")).show
+---+---+
|num|len|
+---+---+
|  0|  1|
|  1|  1|
|  2|  1|
|  3|  1|
+---+---+

Since Spark 2.0.0, there is another variant of udf function:

udf(f: AnyRef, dataType: DataType): UserDefinedFunction

udf(f: AnyRef, dataType: DataType) allows you to use a Scala closure for the function argument (as f) and to declare the output data type explicitly (as dataType).

// given the dataframe above

import org.apache.spark.sql.types.IntegerType
val byTwo = udf((n: Int) => n * 2, IntegerType)

scala> df.withColumn("len", byTwo($"num")).show
+---+---+
|num|len|
+---+---+
|  0|  0|
|  1|  2|
|  2|  4|
|  3|  6|
+---+---+

String functions

split function

split(str: Column, pattern: String): Column

split function splits the str column around matches of pattern, which is a regular expression. It returns a new Column.

val df = Seq((0, "hello|world"), (1, "witaj|swiecie")).toDF("num", "input")
val withSplit = df.withColumn("split", split($"input", "[|]"))

scala> withSplit.show
+---+-------------+----------------+
|num|        input|           split|
+---+-------------+----------------+
|  0|  hello|world|  [hello, world]|
|  1|witaj|swiecie|[witaj, swiecie]|
+---+-------------+----------------+
Note
.$|()[{^?*+\ are regular expression metacharacters. To match one of them literally, escape it or put it in a character class (as in "[|]" above).
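The effect of a metacharacter in the pattern can be tried out with plain Java regular expressions, without Spark. The sketch below assumes that the pattern passed to split is interpreted the same way as a java.util.regex pattern, and shows three ways to match the pipe character literally.

```scala
import java.util.regex.Pattern

val input = "hello|world"

// A bare "|" is the regex alternation operator, not a literal pipe,
// so the input is NOT split into the two expected tokens.
val unescaped = input.split("|")

// Three equivalent ways to match the pipe literally:
val viaCharClass = input.split("[|]")              // character class
val viaEscape    = input.split("\\|")              // backslash escape
val viaQuote     = input.split(Pattern.quote("|")) // quote the whole pattern

println(viaCharClass.mkString(", ")) // hello, world
println(viaEscape.mkString(", "))    // hello, world
println(viaQuote.mkString(", "))     // hello, world
```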

upper function

upper(e: Column): Column

upper function converts a string column into one with all letters in upper case. It returns a new Column.

Note
The following example uses two functions that accept a Column and return another to showcase how to chain them.
val df = Seq((0,1,"hello"), (2,3,"world"), (2,4, "ala")).toDF("id", "val", "name")
val withUpperReversed = df.withColumn("upper", reverse(upper($"name")))

scala> withUpperReversed.show
+---+---+-----+-----+
| id|val| name|upper|
+---+---+-----+-----+
|  0|  1|hello|OLLEH|
|  2|  3|world|DLROW|
|  2|  4|  ala|  ALA|
+---+---+-----+-----+

Non-aggregate functions

They are also called normal functions.

struct functions

struct(cols: Column*): Column
struct(colName: String, colNames: String*): Column

The struct family of functions allows you to create a new struct column from a collection of Columns or their names.

Note
Unlike the similar array function, struct allows the columns to be of different types.
scala> df.withColumn("struct", struct($"name", $"val")).show
+---+---+-----+---------+
| id|val| name|   struct|
+---+---+-----+---------+
|  0|  1|hello|[hello,1]|
|  2|  3|world|[world,3]|
|  2|  4|  ala|  [ala,4]|
+---+---+-----+---------+

broadcast function

broadcast[T](df: Dataset[T]): Dataset[T]

broadcast function marks the input Dataset as small enough to be used in a broadcast join.

Tip
Consult Broadcast Join document.
val left = Seq((0, "aa"), (0, "bb")).toDF("id", "token").as[(Int, String)]
val right = Seq(("aa", 0.99), ("bb", 0.57)).toDF("token", "prob").as[(String, Double)]

scala> left.join(broadcast(right), "token").explain(extended = true)
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(token))
:- Project [_1#123 AS id#126, _2#124 AS token#127]
:  +- LocalRelation [_1#123, _2#124]
+- BroadcastHint
   +- Project [_1#136 AS token#139, _2#137 AS prob#140]
      +- LocalRelation [_1#136, _2#137]

== Analyzed Logical Plan ==
token: string, id: int, prob: double
Project [token#127, id#126, prob#140]
+- Join Inner, (token#127 = token#139)
   :- Project [_1#123 AS id#126, _2#124 AS token#127]
   :  +- LocalRelation [_1#123, _2#124]
   +- BroadcastHint
      +- Project [_1#136 AS token#139, _2#137 AS prob#140]
         +- LocalRelation [_1#136, _2#137]

== Optimized Logical Plan ==
Project [token#127, id#126, prob#140]
+- Join Inner, (token#127 = token#139)
   :- Project [_1#123 AS id#126, _2#124 AS token#127]
   :  +- Filter isnotnull(_2#124)
   :     +- LocalRelation [_1#123, _2#124]
   +- BroadcastHint
      +- Project [_1#136 AS token#139, _2#137 AS prob#140]
         +- Filter isnotnull(_1#136)
            +- LocalRelation [_1#136, _2#137]

== Physical Plan ==
*Project [token#127, id#126, prob#140]
+- *BroadcastHashJoin [token#127], [token#139], Inner, BuildRight
   :- *Project [_1#123 AS id#126, _2#124 AS token#127]
   :  +- *Filter isnotnull(_2#124)
   :     +- LocalTableScan [_1#123, _2#124]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [_1#136 AS token#139, _2#137 AS prob#140]
         +- *Filter isnotnull(_1#136)
            +- LocalTableScan [_1#136, _2#137]

expr function

expr(expr: String): Column

expr function parses the input SQL expression string expr into the Column it represents.

val ds = Seq((0, "hello"), (1, "world"))
  .toDF("id", "token")
  .as[(Long, String)]

scala> ds.show
+---+-----+
| id|token|
+---+-----+
|  0|hello|
|  1|world|
+---+-----+

val filterExpr = expr("token = 'hello'")

scala> ds.filter(filterExpr).show
+---+-----+
| id|token|
+---+-----+
|  0|hello|
+---+-----+

Internally, expr uses the active session’s sqlParser or creates a new SparkSqlParser to call parseExpression method.

count

Caution
FIXME

Generating Tumbling Time Windows — window functions

window(
  timeColumn: Column,
  windowDuration: String): Column  (1)
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String): Column   (2)
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String,
  startTime: String): Column
  1. With slideDuration as windowDuration and 0 second for startTime

  2. With 0 second for startTime

window generates tumbling time windows of windowDuration duration over the timestamps in the timeColumn column. When slideDuration is given and is shorter than windowDuration, the windows slide and overlap.

scala> window('time, "5 seconds")
res0: org.apache.spark.sql.Column = timewindow(time, 5000000, 5000000, 0) AS `window`
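The three numbers in timewindow(time, 5000000, 5000000, 0) are the window duration, the slide duration and the start time in microseconds. For the tumbling case (slide duration equal to window duration), the window that a timestamp falls into can be sketched with plain arithmetic. The helper name tumblingWindow is hypothetical, and the formula is an illustration of the idea rather than Spark's actual implementation.

```scala
// Plain-Scala sketch (no Spark required). All values are microseconds
// since the epoch. `tumblingWindow` is a hypothetical helper name.
// Returns the window as (start inclusive, end exclusive).
def tumblingWindow(ts: Long, windowDuration: Long, startTime: Long = 0L): (Long, Long) = {
  // floorMod keeps the offset non-negative, also for timestamps before startTime
  val start = ts - Math.floorMod(ts - startTime, windowDuration)
  (start, start + windowDuration)
}

val fiveSeconds = 5000000L

// A timestamp 12 seconds after the epoch falls into the window [10s, 15s).
println(tumblingWindow(12000000L, fiveSeconds))

// With a start time of 1 second, the windows are shifted: [11s, 16s).
println(tumblingWindow(12000000L, fiveSeconds, startTime = 1000000L))
```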

timeColumn must be of TimestampType, i.e. java.sql.Timestamp values.

Tip
Use java.sql.Timestamp.from or java.sql.Timestamp.valueOf factory methods to create Timestamp instances.
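A short sketch of the factory methods mentioned in the tip. The date and time used are arbitrary example values.

```scala
import java.sql.Timestamp
import java.time.{Instant, LocalDateTime}

// From an Instant (a point on the UTC timeline)
val fromInstant: Timestamp = Timestamp.from(Instant.ofEpochSecond(0L))

// From a string in the "yyyy-[m]m-[d]d hh:mm:ss[.f...]" format,
// interpreted in the JVM's default time zone
val fromString: Timestamp = Timestamp.valueOf("2016-07-01 10:15:30")

// From a LocalDateTime, also interpreted in the JVM's default time zone
val fromLocal: Timestamp = Timestamp.valueOf(LocalDateTime.of(2016, 7, 1, 10, 15, 30))

println(fromInstant.getTime)     // 0 (milliseconds since the epoch)
println(fromString == fromLocal) // true
```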

windowDuration and slideDuration are strings that specify the width of the window and the sliding interval, respectively, e.g. "5 seconds".

Tip
See CalendarInterval for the valid duration identifiers.
Note
window is available as of Spark 2.0.0.

Internally, window creates a Column with TimeWindow expression.
