Standard Functions — functions Object

org.apache.spark.sql.functions object defines built-in standard functions to work with (values produced by) columns.

You can access the standard functions using the following import statement in your Scala application:

import org.apache.spark.sql.functions._
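For example (a small sketch, assuming spark-shell with the implicits in scope), once imported the standard functions can be used directly in column expressions:

// with the import above in place, standard functions compose into column expressions
val words = Seq("hello", "world").toDF("word")

scala> words.select(upper($"word"), length($"word")).show
+-----------+------------+
|upper(word)|length(word)|
+-----------+------------+
|      HELLO|           5|
|      WORLD|           5|
+-----------+------------+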
Table 1. (Subset of) Standard Functions in Spark SQL

Aggregate functions

approx_count_distinct

approx_count_distinct(e: Column): Column
approx_count_distinct(columnName: String): Column
approx_count_distinct(e: Column, rsd: Double): Column
approx_count_distinct(columnName: String, rsd: Double): Column

avg

avg(e: Column): Column
avg(columnName: String): Column

collect_list

collect_list(e: Column): Column
collect_list(columnName: String): Column

collect_set

collect_set(e: Column): Column
collect_set(columnName: String): Column

corr

corr(column1: Column, column2: Column): Column
corr(columnName1: String, columnName2: String): Column

count

count(e: Column): Column
count(columnName: String): TypedColumn[Any, Long]

countDistinct

countDistinct(expr: Column, exprs: Column*): Column
countDistinct(columnName: String, columnNames: String*): Column

covar_pop

covar_pop(column1: Column, column2: Column): Column
covar_pop(columnName1: String, columnName2: String): Column

covar_samp

covar_samp(column1: Column, column2: Column): Column
covar_samp(columnName1: String, columnName2: String): Column

first

first(e: Column): Column
first(e: Column, ignoreNulls: Boolean): Column
first(columnName: String): Column
first(columnName: String, ignoreNulls: Boolean): Column

Returns the first value in a group. Returns the first non-null value when the ignoreNulls flag is on. If all values are null, then null is returned.

grouping

grouping(e: Column): Column
grouping(columnName: String): Column

Indicates whether a given column is aggregated or not

grouping_id

grouping_id(cols: Column*): Column
grouping_id(colName: String, colNames: String*): Column

Computes the level of grouping

kurtosis

kurtosis(e: Column): Column
kurtosis(columnName: String): Column

last

last(e: Column, ignoreNulls: Boolean): Column
last(columnName: String, ignoreNulls: Boolean): Column
last(e: Column): Column
last(columnName: String): Column

max

max(e: Column): Column
max(columnName: String): Column

mean

mean(e: Column): Column
mean(columnName: String): Column

min

min(e: Column): Column
min(columnName: String): Column

skewness

skewness(e: Column): Column
skewness(columnName: String): Column

stddev

stddev(e: Column): Column
stddev(columnName: String): Column

stddev_pop

stddev_pop(e: Column): Column
stddev_pop(columnName: String): Column

stddev_samp

stddev_samp(e: Column): Column
stddev_samp(columnName: String): Column

sum

sum(e: Column): Column
sum(columnName: String): Column

sumDistinct

sumDistinct(e: Column): Column
sumDistinct(columnName: String): Column

variance

variance(e: Column): Column
variance(columnName: String): Column

var_pop

var_pop(e: Column): Column
var_pop(columnName: String): Column

var_samp

var_samp(e: Column): Column
var_samp(columnName: String): Column

Collection functions

array_contains

array_contains(column: Column, value: Any): Column

array_distinct

array_distinct(e: Column): Column

(New in 2.4.0)

array_except

array_except(col1: Column, col2: Column): Column

(New in 2.4.0)

array_intersect

array_intersect(col1: Column, col2: Column): Column

(New in 2.4.0)

array_join

array_join(column: Column, delimiter: String): Column
array_join(column: Column, delimiter: String, nullReplacement: String): Column

(New in 2.4.0)

array_max

array_max(e: Column): Column

(New in 2.4.0)

array_min

array_min(e: Column): Column

(New in 2.4.0)

array_position

array_position(column: Column, value: Any): Column

(New in 2.4.0)

array_remove

array_remove(column: Column, element: Any): Column

(New in 2.4.0)

array_repeat

array_repeat(e: Column, count: Int): Column
array_repeat(left: Column, right: Column): Column

(New in 2.4.0)

array_sort

array_sort(e: Column): Column

(New in 2.4.0)

array_union

array_union(col1: Column, col2: Column): Column

(New in 2.4.0)

arrays_zip

arrays_zip(e: Column*): Column

(New in 2.4.0)

arrays_overlap

arrays_overlap(a1: Column, a2: Column): Column

(New in 2.4.0)

element_at

element_at(column: Column, value: Any): Column

(New in 2.4.0)

explode

explode(e: Column): Column

explode_outer

explode_outer(e: Column): Column

Creates a new row for each element in the given array or map column. If the array/map is null or empty then null is produced.

flatten

flatten(e: Column): Column

(New in 2.4.0)

from_json

from_json(e: Column, schema: Column): Column (1)
from_json(e: Column, schema: DataType): Column
from_json(e: Column, schema: DataType, options: Map[String, String]): Column
from_json(e: Column, schema: String, options: Map[String, String]): Column
from_json(e: Column, schema: StructType): Column
from_json(e: Column, schema: StructType, options: Map[String, String]): Column
  1. New in 2.4.0

Parses a column with a JSON string into a StructType or ArrayType of StructType elements with the specified schema.

map_concat

map_concat(cols: Column*): Column

(New in 2.4.0)

map_from_entries

map_from_entries(e: Column): Column

(New in 2.4.0)

map_keys

map_keys(e: Column): Column

map_values

map_values(e: Column): Column

posexplode

posexplode(e: Column): Column

posexplode_outer

posexplode_outer(e: Column): Column

reverse

reverse(e: Column): Column

Returns a reversed string or an array with reverse order of elements

Note
Support for reversing arrays is new in 2.4.0.

schema_of_json

schema_of_json(json: Column): Column
schema_of_json(json: String): Column

(New in 2.4.0)

sequence

sequence(start: Column, stop: Column): Column
sequence(start: Column, stop: Column, step: Column): Column

(New in 2.4.0)

shuffle

shuffle(e: Column): Column

(New in 2.4.0)

size

size(e: Column): Column

Returns the size of the given array or map. Returns -1 if null.

slice

slice(x: Column, start: Int, length: Int): Column

(New in 2.4.0)

Date and time functions

current_date

current_date(): Column

current_timestamp

current_timestamp(): Column

from_utc_timestamp

from_utc_timestamp(ts: Column, tz: String): Column
from_utc_timestamp(ts: Column, tz: Column): Column  (1)
  1. New in 2.4.0

months_between

months_between(end: Column, start: Column): Column
months_between(end: Column, start: Column, roundOff: Boolean): Column (1)
  1. New in 2.4.0

to_date

to_date(e: Column): Column
to_date(e: Column, fmt: String): Column

to_timestamp

to_timestamp(s: Column): Column
to_timestamp(s: Column, fmt: String): Column

to_utc_timestamp

to_utc_timestamp(ts: Column, tz: String): Column
to_utc_timestamp(ts: Column, tz: Column): Column (1)
  1. New in 2.4.0

unix_timestamp

Converts current or specified time to Unix timestamp (in seconds)

unix_timestamp(): Column
unix_timestamp(s: Column): Column
unix_timestamp(s: Column, p: String): Column

window

Generates time windows (tumbling or sliding)

window(
  timeColumn: Column,
  windowDuration: String): Column
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String): Column
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String,
  startTime: String): Column

Math functions

bin

Converts the value of a long column to binary format

Regular functions (Non-aggregate functions)

array

broadcast

coalesce

Gives the first non-null value among the given columns or null

col and column

Creating Columns

expr

lit

map

monotonically_increasing_id

Returns 64-bit integers that are guaranteed to be monotonically increasing and unique, but not consecutive.

struct

typedLit

when

String functions

split

upper

UDF functions

udf

Creating UDFs

callUDF

Executing a UDF by name with a variable-length list of columns

Window functions

cume_dist

cume_dist(): Column

Computes the cumulative distribution of records within window partitions

currentRow

currentRow(): Column

dense_rank

dense_rank(): Column

Computes the rank of records per window partition, with no gaps in the ranking sequence

lag

lag(e: Column, offset: Int): Column
lag(columnName: String, offset: Int): Column
lag(columnName: String, offset: Int, defaultValue: Any): Column

lead

lead(columnName: String, offset: Int): Column
lead(e: Column, offset: Int): Column
lead(columnName: String, offset: Int, defaultValue: Any): Column
lead(e: Column, offset: Int, defaultValue: Any): Column

ntile

ntile(n: Int): Column

Computes the ntile group id (from 1 to n inclusive) of records in an ordered window partition

percent_rank

percent_rank(): Column

Computes the relative rank (i.e. percentile) of records per window partition

rank

rank(): Column

Computes the rank of records per window partition

row_number

row_number(): Column

Computes the sequential numbering per window partition

unboundedFollowing

unboundedFollowing(): Column

unboundedPreceding

unboundedPreceding(): Column
Tip
This page gives only a brief overview of the many functions available in the functions object, so you should read the official documentation of the functions object.

Executing a UDF by Name with a Variable-Length Column List — callUDF Function

callUDF(udfName: String, cols: Column*): Column

callUDF executes a UDF by its name (udfName) with a variable-length list of columns (cols).
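A minimal sketch of using callUDF, assuming a UDF has first been registered under a name with spark.udf.register (the name plusOne below is just for illustration):

// register a Scala function as a UDF under a name
spark.udf.register("plusOne", (n: Long) => n + 1)

val df = spark.range(3)

// execute the registered UDF by name
scala> df.withColumn("incremented", callUDF("plusOne", $"id")).show
+---+-----------+
| id|incremented|
+---+-----------+
|  0|          1|
|  1|          2|
|  2|          3|
+---+-----------+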

Defining UDFs — udf Function

udf(f: FunctionN[...]): UserDefinedFunction

The udf family of functions allows you to create user-defined functions (UDFs) based on a Scala function. It accepts a function f of 0 to 10 arguments, and the input and output types are automatically inferred from the function's type signature.

import org.apache.spark.sql.functions._
val _length: String => Int = _.length
val _lengthUDF = udf(_length)

// create a DataFrame with a single Int column
val df = sc.parallelize(0 to 3).toDF("num")

// apply the UDF to the "num" column
// (the Int values are implicitly cast to String, hence every length is 1)
scala> df.withColumn("len", _lengthUDF($"num")).show
+---+---+
|num|len|
+---+---+
|  0|  1|
|  1|  1|
|  2|  1|
|  3|  1|
+---+---+

Since Spark 2.0.0, there is another variant of udf function:

udf(f: AnyRef, dataType: DataType): UserDefinedFunction

udf(f: AnyRef, dataType: DataType) allows you to use a Scala closure for the function argument (as f) and to explicitly declare the output data type (as dataType).

// given the dataframe above

import org.apache.spark.sql.types.IntegerType
val byTwo = udf((n: Int) => n * 2, IntegerType)

scala> df.withColumn("len", byTwo($"num")).show
+---+---+
|num|len|
+---+---+
|  0|  0|
|  1|  2|
|  2|  4|
|  3|  6|
+---+---+

split Function

split(str: Column, pattern: String): Column

split function splits the str column using the pattern regular expression. It returns a new Column.

val df = Seq((0, "hello|world"), (1, "witaj|swiecie")).toDF("num", "input")
val withSplit = df.withColumn("split", split($"input", "[|]"))

scala> withSplit.show
+---+-------------+----------------+
|num|        input|           split|
+---+-------------+----------------+
|  0|  hello|world|  [hello, world]|
|  1|witaj|swiecie|[witaj, swiecie]|
+---+-------------+----------------+
Note
.$|()[{^?*+\ are regular expression metacharacters and so are considered special.
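Alternatively (a small sketch using the same DataFrame as above), you can escape the metacharacter with a backslash rather than wrap it in a character class:

// equivalent to the [|] character class above
val withSplitEscaped = df.withColumn("split", split($"input", "\\|"))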

upper Function

upper(e: Column): Column

upper function converts a string column to upper case. It returns a new Column.

Note
The following example uses two functions that accept a Column and return another Column to showcase how to chain them.
val df = Seq((0,1,"hello"), (2,3,"world"), (2,4, "ala")).toDF("id", "val", "name")
val withUpperReversed = df.withColumn("upper", reverse(upper($"name")))

scala> withUpperReversed.show
+---+---+-----+-----+
| id|val| name|upper|
+---+---+-----+-----+
|  0|  1|hello|OLLEH|
|  2|  3|world|DLROW|
|  2|  4|  ala|  ALA|
+---+---+-----+-----+

Converting Long to Binary Format (in String Representation) — bin Function

bin(e: Column): Column
bin(columnName: String): Column (1)
  1. Calls the first bin with columnName as a Column

bin converts the long value in a column to its binary format (i.e. as an unsigned integer in base 2) with no extra leading 0s.

scala> spark.range(5).withColumn("binary", bin('id)).show
+---+------+
| id|binary|
+---+------+
|  0|     0|
|  1|     1|
|  2|    10|
|  3|    11|
|  4|   100|
+---+------+

val withBin = spark.range(5).withColumn("binary", bin('id))
scala> withBin.printSchema
root
 |-- id: long (nullable = false)
 |-- binary: string (nullable = false)

Internally, bin creates a Column with the Bin unary expression.

scala> withBin.queryExecution.logical
res2: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'Project [*, bin('id) AS binary#14]
+- Range (0, 5, step=1, splits=Some(8))
Note
Bin unary expression uses java.lang.Long.toBinaryString for the conversion.
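A quick way to see what that Java call produces (a sketch in the Scala REPL):

scala> java.lang.Long.toBinaryString(4)
res0: String = 100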
Note

Bin expression supports code generation (aka CodeGen).

val withBin = spark.range(5).withColumn("binary", bin('id))
scala> withBin.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [id#19L, bin(id#19L) AS binary#22]
+- *Range (0, 5, step=1, splits=Some(8))
...
/* 103 */           UTF8String project_value1 = null;
/* 104 */           project_value1 = UTF8String.fromString(java.lang.Long.toBinaryString(range_value));
