Dataset Operators

You can group the set of all operators to use with Datasets per their target, i.e. the part of a Dataset they are applied to.

Besides the operators that target a part of a Dataset, the following operators work with a Dataset as a whole.

Table 1. Dataset Operators

Operator          Description
as                Converting a Dataset to a Dataset
coalesce          Repartitioning a Dataset with shuffle disabled.
count             Counts the number of rows
explain           Explain logical and physical plans of a Dataset
foreach           Internally, foreach executes foreach action on the Dataset’s RDD.
foreachPartition  Internally, foreachPartition executes foreachPartition action on the Dataset’s RDD.
randomSplit       Randomly split a Dataset into two Datasets
reduce            Reduces the elements of a Dataset using the specified binary function. Internally, reduce executes reduce action on the Dataset’s RDD.
repartition       Repartitioning a Dataset with shuffle enabled.
toDF              Converts a Dataset to a DataFrame
transform         Transforms a Dataset
withWatermark     Creates a streaming Dataset with EventTimeWatermark logical operator. Used exclusively in Structured Streaming.

count Operator


toLocalIterator Operator


createTempViewCommand Internal Operator


createGlobalTempView Operator


createOrReplaceTempView Operator


createTempView Operator


Transforming Datasets — transform Operator

transform[U](t: Dataset[T] => Dataset[U]): Dataset[U]

transform applies t function to the source Dataset[T] to produce a result Dataset[U]. It is for chaining custom transformations.

val dataset = spark.range(5)

// Transformation t
import org.apache.spark.sql.Dataset
def withDoubled(longs: Dataset[java.lang.Long]) = longs.withColumn("doubled", 'id * 2)

scala> dataset.transform(withDoubled).show
+---+-------+
| id|doubled|
+---+-------+
|  0|      0|
|  1|      2|
|  2|      4|
|  3|      6|
|  4|      8|
+---+-------+

Internally, transform executes t function on the current Dataset[T].
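
Since transform is meant for chaining custom transformations, the following is a minimal sketch of two transformations applied one after the other. The names withDoubled and onlyEvenIds and the local SparkSession setup are assumptions made for illustration only (in spark-shell, spark already exists).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Assumption: a local session for the sketch; spark-shell provides `spark` already
val spark = SparkSession.builder().master("local[*]").appName("transform-sketch").getOrCreate()

// Two hypothetical custom transformations (illustrative names)
def withDoubled(df: DataFrame): DataFrame = df.withColumn("doubled", col("id") * 2)
def onlyEvenIds(df: DataFrame): DataFrame = df.where(col("id") % 2 === 0)

// Each transform call passes the current Dataset to the given function
val chained = spark.range(5).toDF().
  transform(withDoubled).
  transform(onlyEvenIds)

// Collect the (id, doubled) pairs to the driver for inspection
val rows = chained.collect().map(r => (r.getLong(0), r.getLong(1))).toSeq
```

Keeping every step as a plain function of type Dataset => Dataset lets you test the steps in isolation and reorder them freely.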

Converting "Typed" Dataset to "Untyped" DataFrame — toDF Methods

toDF(): DataFrame
toDF(colNames: String*): DataFrame

toDF converts a Dataset into a DataFrame.

Internally, the empty-argument toDF creates a Dataset[Row] using the Dataset's SparkSession and QueryExecution with the encoder being RowEncoder.
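
As a small illustration of the two toDF variants, the sketch below compares the column names of the resulting DataFrames. It assumes a local SparkSession and a made-up Dataset of tuples; the column names id and name are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for the sketch; spark-shell provides `spark` already
val spark = SparkSession.builder().master("local[*]").appName("toDF-sketch").getOrCreate()
import spark.implicits._

val ds = Seq((0, "zero"), (1, "one")).toDS()

// toDF() keeps the column names of the Dataset (tuple fields _1 and _2 here)
val defaultNames = ds.toDF().columns.toSeq

// toDF(colNames: String*) renames the columns positionally
val renamed = ds.toDF("id", "name").columns.toSeq
```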

FIXME Describe toDF(colNames: String*)

Converting to Dataset — as Method


Accessing DataFrameWriter — write Method

write: DataFrameWriter[T]

write method returns DataFrameWriter for records of type T.

import org.apache.spark.sql.{DataFrameWriter, Dataset}
val ints: Dataset[Int] = (0 to 5).toDS

val writer: DataFrameWriter[Int] = ints.write

Accessing DataStreamWriter — writeStream Method

writeStream: DataStreamWriter[T]

writeStream method returns DataStreamWriter for records of type T.

val papers = spark.readStream.text("papers").as[String]

import org.apache.spark.sql.streaming.DataStreamWriter
val writer: DataStreamWriter[String] = papers.writeStream

Display Records — show Methods

show(): Unit
show(numRows: Int): Unit
show(truncate: Boolean): Unit
show(numRows: Int, truncate: Boolean): Unit
show(numRows: Int, truncate: Int): Unit

Internally, show relays to a private showString to do the formatting. It turns the Dataset into a DataFrame (by calling toDF()) and takes first n records.

Taking First n Records — take Action

take(n: Int): Array[T]

take is an action on a Dataset that returns a collection of n records.

take loads all n records into the memory of the Spark application’s driver process and, for a large n, could result in an OutOfMemoryError.

Internally, take creates a new Dataset with a Limit logical plan for a Literal expression (of n) and the current LogicalPlan. It then runs the SparkPlan that produces an Array[InternalRow] that is in turn decoded to Array[T] using a bounded encoder.
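
The relationship between take and a limit followed by collect can be sketched as follows. This is an illustration under the assumption of a local SparkSession, not a description of the exact internal code path.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for the sketch; spark-shell provides `spark` already
val spark = SparkSession.builder().master("local[*]").appName("take-sketch").getOrCreate()

val ds = spark.range(10)

// take(n) brings the first n records to the driver as an array
val firstThree = ds.take(3).map(_.longValue).toSeq

// limit(n) followed by collect gives the same records for this Dataset
val viaLimit = ds.limit(3).collect().map(_.longValue).toSeq
```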

foreachPartition Action

foreachPartition(f: Iterator[T] => Unit): Unit

foreachPartition applies the f function to each partition of the Dataset.

case class Record(id: Int, city: String)
val ds = Seq(Record(0, "Warsaw"), Record(1, "London")).toDS

ds.foreachPartition { iter: Iterator[Record] => iter.foreach(println) }

foreachPartition is used to save a DataFrame to a JDBC table (indirectly through JdbcUtils.saveTable) and in ForeachSink.

mapPartitions Operator

mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]

mapPartitions returns a new Dataset (of type U) with the function func applied to each partition.
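
A possible example, assuming a local SparkSession and a made-up Dataset of integers: the function receives all records of one partition as an iterator and emits a single partial sum per partition.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for the sketch; spark-shell provides `spark` already
val spark = SparkSession.builder().master("local[*]").appName("mapPartitions-sketch").getOrCreate()
import spark.implicits._

// Spread six numbers over several partitions
val ds = Seq(1, 2, 3, 4, 5, 6).toDS().repartition(3)

// One output record per partition: the sum of the partition's records
val partialSums = ds.mapPartitions((records: Iterator[Int]) => Iterator(records.sum))

// Adding up the partial sums on the driver gives the overall total
val total = partialSums.collect().sum
```

mapPartitions is usually preferred over map when the function needs some per-partition setup (e.g. opening a connection) that would be too costly to repeat for every record.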

FIXME Example

Creating Zero or More Records — flatMap Operator

flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]

flatMap returns a new Dataset (of type U) with all records (of type T) mapped over using the function func and then flattening the results.

flatMap can create new records. It supersedes the deprecated explode operator.

final case class Sentence(id: Long, text: String)
val sentences = Seq(Sentence(0, "hello world"), Sentence(1, "witaj swiecie")).toDS

scala> sentences.flatMap(s => s.text.split("\\s+")).show
+-------+
|  value|
+-------+
|  hello|
|  world|
|  witaj|
|swiecie|
+-------+

Internally, flatMap calls mapPartitions with the partitions flatMap(ped).

Repartitioning Dataset with Shuffle Disabled — coalesce Operator

coalesce(numPartitions: Int): Dataset[T]

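coalesce operator repartitions the Dataset to exactly numPartitions partitions when fewer partitions are requested. If more partitions are requested than the Dataset currently has, the number of partitions stays unchanged.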

Internally, coalesce creates a Repartition logical operator with shuffle disabled (which is marked as false in the below explain's output).

scala> spark.range(5).coalesce(1).explain(extended = true)
== Parsed Logical Plan ==
Repartition 1, false
+- Range (0, 5, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint
Repartition 1, false
+- Range (0, 5, step=1, splits=Some(8))

== Optimized Logical Plan ==
Repartition 1, false
+- Range (0, 5, step=1, splits=Some(8))

== Physical Plan ==
Coalesce 1
+- *Range (0, 5, step=1, splits=Some(8))

Repartitioning Dataset (Shuffle Enabled) — repartition Operator

repartition(numPartitions: Int): Dataset[T]
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
repartition(partitionExprs: Column*): Dataset[T]

repartition operators repartition the Dataset to exactly numPartitions partitions or using partitionExprs expressions.

Internally, repartition creates a Repartition or RepartitionByExpression logical operator with shuffle enabled (which is true in the below explain's output beside Repartition).

scala> spark.range(5).repartition(1).explain(extended = true)
== Parsed Logical Plan ==
Repartition 1, true
+- Range (0, 5, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint
Repartition 1, true
+- Range (0, 5, step=1, splits=Some(8))

== Optimized Logical Plan ==
Repartition 1, true
+- Range (0, 5, step=1, splits=Some(8))

== Physical Plan ==
Exchange RoundRobinPartitioning(1)
+- *Range (0, 5, step=1, splits=Some(8))

repartition methods correspond to SQL’s DISTRIBUTE BY or CLUSTER BY clauses.
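
To see the practical difference between coalesce (shuffle disabled) and repartition (shuffle enabled), the following sketch compares the resulting partition counts. It assumes a local SparkSession and an input with 8 partitions; the numbers are chosen for illustration only.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for the sketch; spark-shell provides `spark` already
val spark = SparkSession.builder().master("local[*]").appName("partitions-sketch").getOrCreate()

// range(start, end, step, numPartitions) creates the input with 8 partitions
val ds = spark.range(0, 100, 1, 8)
val initial = ds.rdd.getNumPartitions

// Without a shuffle, coalesce can only reduce the number of partitions
val coalescedDown = ds.coalesce(2).rdd.getNumPartitions
val coalescedUp = ds.coalesce(16).rdd.getNumPartitions

// With a shuffle, repartition can also increase the number of partitions
val repartitioned = ds.repartition(16).rdd.getNumPartitions
```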

Projecting Columns — select Operator

select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1]
select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)]
select[U1, U2, U3](
  c1: TypedColumn[T, U1],
  c2: TypedColumn[T, U2],
  c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)]
select[U1, U2, U3, U4](
  c1: TypedColumn[T, U1],
  c2: TypedColumn[T, U2],
  c3: TypedColumn[T, U3],
  c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)]
select[U1, U2, U3, U4, U5](
  c1: TypedColumn[T, U1],
  c2: TypedColumn[T, U2],
  c3: TypedColumn[T, U3],
  c4: TypedColumn[T, U4],
  c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)]

filter Operator


where Operator

where(condition: Column): Dataset[T]
where(conditionExpr: String): Dataset[T]

where is a synonym for filter operator, i.e. it simply passes the parameters on to filter.
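
A quick way to convince yourself that where and filter are interchangeable is to run the same condition through both, as in the sketch below. It assumes a local SparkSession; the condition is made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for the sketch; spark-shell provides `spark` already
val spark = SparkSession.builder().master("local[*]").appName("where-sketch").getOrCreate()

val ds = spark.range(10)

// The same SQL condition given to where and to filter
val viaWhere = ds.where("id % 2 = 0").collect().map(_.longValue).toSeq
val viaFilter = ds.filter("id % 2 = 0").collect().map(_.longValue).toSeq
```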

Projecting Columns using Expressions — selectExpr Operator

selectExpr(exprs: String*): DataFrame

selectExpr is like select, but accepts SQL expressions exprs.

val ds = spark.range(5)

scala> ds.selectExpr("rand() as random").show
16/04/14 23:16:06 INFO HiveSqlParser: Parsing command: rand() as random
|             random|
|  0.887675894185651|
| 0.2700020856675186|
| 0.1489033635529543|
| 0.5862990791950973|

Internally, it executes select with every expression in exprs mapped to Column (using SparkSqlParser.parseExpression).

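import org.apache.spark.sql.functions.expr
scala> ds.select(expr("rand() as random")).show
// prints a single "random" column, as in the selectExpr output above

A new feature in Spark 2.0.0.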

Randomly Split Dataset — randomSplit Operator

randomSplit(weights: Array[Double]): Array[Dataset[T]]
randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]

randomSplit randomly splits the Dataset per weights.

weights doubles should sum up to 1 and will be normalized if they do not.

You can define seed and if you don’t, a random seed will be used.
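
The sketch below shows a seeded split and checks that the two parts are disjoint and together cover the whole Dataset. The local SparkSession, the 0.8/0.2 weights and the seed value are assumptions for illustration; the exact sizes of the two parts depend on the seed and are not asserted.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for the sketch; spark-shell provides `spark` already
val spark = SparkSession.builder().master("local[*]").appName("randomSplit-sketch").getOrCreate()

val ds = spark.range(100)

// A fixed seed makes the split reproducible across runs
val Array(training, validation) = ds.randomSplit(Array(0.8, 0.2), seed = 42L)

val trainingCount = training.count()
val validationCount = validation.count()

// No record should end up in both parts
val overlap = training.intersect(validation).count()
```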

It is used in TrainValidationSplit to split a dataset into training and validation datasets.

val ds = spark.range(10)
scala> ds.randomSplit(Array[Double](2, 3)).foreach(_.show)
+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+

+---+
| id|
+---+
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+

A new feature in Spark 2.0.0.

Displaying Logical and Physical Plans, Their Cost and Codegen — explain Operator

explain(): Unit
explain(extended: Boolean): Unit

explain prints the physical plan and, with the extended flag enabled, also the logical plans to the console.

Use explain to review the structured queries and optimizations applied.

Internally, explain creates an ExplainCommand logical command and requests SessionState to execute it (to get a QueryExecution back).

explain uses ExplainCommand logical command that, when executed, gives different text representations of QueryExecution (for the Dataset’s LogicalPlan) depending on the flags (e.g. extended, codegen, and cost which are disabled by default).

explain then requests QueryExecution for SparkPlan and collects the records (as InternalRow objects).


explain uses Dataset’s SparkSession to access the current SessionState.

In the end, explain goes over the InternalRow records and converts them to lines to display to console.

explain "converts" an InternalRow record to a line using getString at position 0.

If you are serious about query debugging, you could also use the Debugging Query Execution facility.

scala> spark.range(10).explain(extended = true)
== Parsed Logical Plan ==
Range (0, 10, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint
Range (0, 10, step=1, splits=Some(8))

== Optimized Logical Plan ==
Range (0, 10, step=1, splits=Some(8))

== Physical Plan ==
*Range (0, 10, step=1, splits=Some(8))

toJSON Method

toJSON maps the content of Dataset to a Dataset of JSON strings.

A new feature in Spark 2.0.0.

scala> val ds = Seq("hello", "world", "foo bar").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]

scala> ds.toJSON.show
+-------------------+
|              value|
+-------------------+
|  {"value":"hello"}|
|  {"value":"world"}|
|{"value":"foo bar"}|
+-------------------+

Internally, toJSON grabs the RDD[InternalRow] (of the QueryExecution of the Dataset) and maps the records (per RDD partition) into JSON.
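
For records with more than one field, each record becomes one JSON object with the column names as keys. A minimal sketch, assuming a local SparkSession and made-up name and age columns:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for the sketch; spark-shell provides `spark` already
val spark = SparkSession.builder().master("local[*]").appName("toJSON-sketch").getOrCreate()
import spark.implicits._

// Hypothetical two-column dataset
val people = Seq(("Ann", 30), ("Bob", 25)).toDF("name", "age")

// One JSON string per record, keys in schema order
val json = people.toJSON.collect().toSeq
```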

toJSON uses Jackson’s JSON parser — jackson-module-scala.

Accessing Schema — schema Method

A Dataset has a schema.

schema: StructType

You may also use the following methods to learn about the schema:

Generating RDD of Internal Binary Rows — rdd Attribute

rdd: RDD[T]

Whenever you need to convert a Dataset into an RDD, executing the rdd method gives you the RDD of the proper input object type (not Row as in DataFrames) that sits behind the Dataset.

scala> val rdd = tokens.rdd
rdd: org.apache.spark.rdd.RDD[Token] = MapPartitionsRDD[11] at rdd at <console>:30

Internally, it looks ExpressionEncoder (for the Dataset) up and accesses the deserializer expression. That gives the DataType of the result of evaluating the expression.

A deserializer expression is used to decode an InternalRow to an object of type T. See ExpressionEncoder.

It then executes a DeserializeToObject logical operator that will produce an RDD[InternalRow] that is converted into the proper RDD[T] using the DataType and T.

It is a lazy operation that "produces" an RDD[T].

Creating Streaming Dataset with EventTimeWatermark Logical Operator — withWatermark Operator

withWatermark(eventTime: String, delayThreshold: String): Dataset[T]

Internally, withWatermark creates a Dataset with EventTimeWatermark logical plan for streaming Datasets.

withWatermark uses EliminateEventTimeWatermark logical rule to eliminate EventTimeWatermark logical plan for non-streaming batch Datasets.

// Create a batch dataset
val events = spark.range(0, 50, 10).
  withColumn("timestamp", from_unixtime(unix_timestamp - 'id)).
  select('timestamp, 'id as "count")
scala> events.show
+-------------------+-----+
|          timestamp|count|
+-------------------+-----+
|2017-06-25 21:21:14|    0|
|2017-06-25 21:21:04|   10|
|2017-06-25 21:20:54|   20|
|2017-06-25 21:20:44|   30|
|2017-06-25 21:20:34|   40|
+-------------------+-----+

// the dataset is a non-streaming batch one...
scala> events.isStreaming
res1: Boolean = false

// EventTimeWatermark is not included in the logical plan
val watermarked = events.
  withWatermark(eventTime = "timestamp", delayThreshold = "20 seconds")
scala> println(watermarked.queryExecution.logical.numberedTreeString)
00 Project [timestamp#284, id#281L AS count#288L]
01 +- Project [id#281L, from_unixtime((unix_timestamp(current_timestamp(), yyyy-MM-dd HH:mm:ss, Some(America/Chicago)) - id#281L), yyyy-MM-dd HH:mm:ss, Some(America/Chicago)) AS timestamp#284]
02    +- Range (0, 50, step=10, splits=Some(8))

// Let's create a streaming Dataset
import org.apache.spark.sql.types.StructType
val schema = new StructType().
  add("timestamp", "timestamp").
  add("count", "long")
scala> schema.printTreeString
root
 |-- timestamp: timestamp (nullable = true)
 |-- count: long (nullable = true)

val events = spark.
  readStream.
  schema(schema).
  csv("events").
  withWatermark(eventTime = "timestamp", delayThreshold = "20 seconds")
scala> println(events.queryExecution.logical.numberedTreeString)
00 'EventTimeWatermark 'timestamp, interval 20 seconds
01 +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@75abcdd4,csv,List(),Some(StructType(StructField(timestamp,TimestampType,true), StructField(count,LongType,true))),List(),None,Map(path -> events),None), FileSource[events], [timestamp#329, count#330L]

delayThreshold is parsed using CalendarInterval.fromString with interval formatted as described in TimeWindow unary expression.

0 years 0 months 1 week 0 days 0 hours 1 minute 20 seconds 0 milliseconds 0 microseconds

delayThreshold must not be negative (milliseconds and months should both be greater than or equal to 0).
withWatermark is used when…​FIXME
