Column Expressions and Operators

Column represents a column in a Dataset. It holds a Catalyst expression that produces a value for every row.

Note
A Column is a value generator for every row in a Dataset.

With the implicit conversions imported, you can create "free" column references using Scala’s symbols.

val spark: SparkSession = ...
import spark.implicits._

import org.apache.spark.sql.Column
scala> val nameCol: Column = 'name
nameCol: org.apache.spark.sql.Column = name
Note
"Free" column references are Columns with no association to a Dataset.

You can also create free column references from $-prefixed strings.

// Note that $ alone creates a ColumnName
scala> val idCol = $"id"
idCol: org.apache.spark.sql.ColumnName = id

import org.apache.spark.sql.Column

// The target type triggers the implicit conversion to Column
scala> val idCol: Column = $"id"
idCol: org.apache.spark.sql.Column = id

Besides the implicit conversions, you can create columns using the col and column functions.

import org.apache.spark.sql.functions._

scala> val nameCol = col("name")
nameCol: org.apache.spark.sql.Column = name

scala> val cityCol = column("city")
cityCol: org.apache.spark.sql.Column = city

Finally, you can create a bound Column using the Dataset the column is supposed to be part of, with the Dataset.apply factory method or the Dataset.col operator.

Note
You can use bound Column references only with the Datasets they have been created from.
scala> val textCol = dataset.col("text")
textCol: org.apache.spark.sql.Column = text

scala> val idCol = dataset.apply("id")
idCol: org.apache.spark.sql.Column = id

scala> val idCol = dataset("id")
idCol: org.apache.spark.sql.Column = id
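
A bound column is then used with operators of the very Dataset it was created from, e.g. select. The following is a minimal sketch (the sample data is an assumption made up for the illustration):

// the Dataset the bound column belongs to
val words = Seq((0L, "hello"), (1L, "spark")).toDF("id", "text")
val textCol = words.col("text")

// works because textCol was created from words
words.select(textCol).show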

You can reference nested columns using . (dot).
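
For example (an illustrative sketch; the case classes below are made up for this example):

// a Dataset with a nested struct column
case class Name(first: String, last: String)
case class Person(id: Long, name: Name)
val people = Seq(Person(0, Name("Jacek", "Laskowski"))).toDS

scala> people.select($"name.first").show
+-----+
|first|
+-----+
|Jacek|
+-----+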

Note

Column has a reference to the Catalyst Expression it was created for, available using the expr method.

scala> window('time, "5 seconds").expr
res0: org.apache.spark.sql.catalyst.expressions.Expression = timewindow('time, 5000000, 5000000, 0) AS window#1
Tip
Read about typed column references in TypedColumn Expressions.

Adding Column to Dataset — withColumn Method

withColumn(colName: String, col: Column): DataFrame

withColumn method returns a new DataFrame with the column col added under the name colName.

Note
withColumn can replace an existing colName column.
scala> val df = Seq((1, "jeden"), (2, "dwa")).toDF("number", "polish")
df: org.apache.spark.sql.DataFrame = [number: int, polish: string]

scala> df.show
+------+------+
|number|polish|
+------+------+
|     1| jeden|
|     2|   dwa|
+------+------+

scala> df.withColumn("polish", lit(1)).show
+------+------+
|number|polish|
+------+------+
|     1|     1|
|     2|     1|
+------+------+

You can add new columns to a Dataset using the withColumn method.

val spark: SparkSession = ...
val dataset = spark.range(5)

// Add a new column called "group"
scala> dataset.withColumn("group", 'id % 2).show
+---+-----+
| id|group|
+---+-----+
|  0|    0|
|  1|    1|
|  2|    0|
|  3|    1|
|  4|    0|
+---+-----+

Referencing Column — apply Method

val spark: SparkSession = ...
case class Word(id: Long, text: String)
val dataset = Seq(Word(0, "hello"), Word(1, "spark")).toDS

scala> val idCol = dataset.apply("id")
idCol: org.apache.spark.sql.Column = id

// or using Scala's magic a little bit
// the following is equivalent to the above explicit apply call
scala> val idCol = dataset("id")
idCol: org.apache.spark.sql.Column = id

Creating Column — col Method

val spark: SparkSession = ...
case class Word(id: Long, text: String)
val dataset = Seq(Word(0, "hello"), Word(1, "spark")).toDS

scala> val textCol = dataset.col("text")
textCol: org.apache.spark.sql.Column = text

like Operator

Caution
FIXME
scala> df("id") like "0"
res0: org.apache.spark.sql.Column = id LIKE 0

scala> df.filter('id like "0").show
+---+-----+
| id| text|
+---+-----+
|  0|hello|
+---+-----+
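
like follows SQL LIKE semantics, so the % and _ wildcards work as well. A hedged sketch, assuming the two-row id/text DataFrame defined in the Symbols As Column Names section below:

scala> df.filter('text like "h%").show
+---+-----+
| id| text|
+---+-----+
|  0|hello|
+---+-----+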

Symbols As Column Names

scala> val df = Seq((0, "hello"), (1, "world")).toDF("id", "text")
df: org.apache.spark.sql.DataFrame = [id: int, text: string]

scala> df.select('id)
res0: org.apache.spark.sql.DataFrame = [id: int]

scala> df.select('id).show
+---+
| id|
+---+
|  0|
|  1|
+---+

Defining Windowing Column (Analytic Clause) — over Operator

over(): Column
over(window: WindowSpec): Column

over creates a windowing column (aka analytic clause) that allows you to execute an aggregate function over a window (i.e. a group of records that are in some relation to the current record).

Tip
Read up on windowed aggregation in Spark SQL in Window Aggregate Functions.
scala> val overUnspecifiedFrame = $"someColumn".over()
overUnspecifiedFrame: org.apache.spark.sql.Column = someColumn OVER (UnspecifiedFrame)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.WindowSpec
val spec: WindowSpec = Window.rangeBetween(Window.unboundedPreceding, Window.currentRow)
scala> val overRange = $"someColumn" over spec
overRange: org.apache.spark.sql.Column = someColumn OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
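
Combined with an aggregate or ranking function, over computes a value per row within that row's window. Below is an illustrative sketch (the sales data and column names are assumptions made up for the example):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank

val sales = Seq(
  ("books", 10), ("books", 30),
  ("games", 20), ("games", 5)).toDF("category", "amount")

// rank rows within each category by amount (highest first)
val byCategory = Window.partitionBy('category).orderBy('amount.desc)

// every row gets its rank inside its own category
sales.withColumn("rank", rank() over byCategory).show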

cast Operator

The cast method casts a column to a data type. It makes for type-safe map operations with Row objects of the proper type (not Any).

cast(to: String): Column
cast(to: DataType): Column

cast uses CatalystSqlParser to parse the data type from its canonical string representation.

cast Example

scala> val df = Seq((0f, "hello")).toDF("label", "text")
df: org.apache.spark.sql.DataFrame = [label: float, text: string]

scala> df.printSchema
root
 |-- label: float (nullable = false)
 |-- text: string (nullable = true)

// without cast
import org.apache.spark.sql.Row
scala> df.select("label").map { case Row(label) => label.getClass.getName }.show(false)
+---------------+
|value          |
+---------------+
|java.lang.Float|
+---------------+

// with cast
import org.apache.spark.sql.types.DoubleType
scala> df.select(col("label").cast(DoubleType)).map { case Row(label) => label.getClass.getName }.show(false)
+----------------+
|value           |
+----------------+
|java.lang.Double|
+----------------+
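
The String variant behaves the same, with the target type given by its canonical name (as parsed by CatalystSqlParser):

// equivalent to cast(DoubleType) above
df.select(col("label").cast("double"))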

results matching ""

    No results matching ""