Dataset — Strongly-Typed Structured Query with Encoder

Dataset is a strongly-typed data structure in Spark SQL that represents a structured query with encoders.

Figure 1. Dataset’s Internals
Given the picture above, one could say that a Dataset is a pair of an Encoder and a QueryExecution (which in turn is a LogicalPlan in a SparkSession).

Datasets are lazy: structured query expressions are only executed when an action is invoked. Internally, a Dataset represents a logical plan that describes the computation required to produce the data (in a given session).

A Dataset is a result of executing a query expression against data storage like files, Hive tables or JDBC databases. The structured query expression can be described by a SQL query, a Column-based SQL expression or a Scala/Java lambda function. And that is why Dataset operations are available in three variants.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

scala> val dataset = spark.range(5)
dataset: org.apache.spark.sql.Dataset[Long] = [id: bigint]

// Variant 1: filter operator accepts a Scala function
dataset.filter(n => n % 2 == 0).count

// Variant 2: filter operator accepts a Column-based SQL expression
// (the only column of spark.range is id, not value)
dataset.filter('id % 2 === 0).count

// Variant 3: filter operator accepts a SQL expression (as a string)
dataset.filter("id % 2 = 0").count

The Dataset API offers declarative and type-safe operators that make for an improved data processing experience (compared to DataFrames that were a set of index- or column name-based Rows).


Dataset was first introduced in Apache Spark 1.6.0 as an experimental feature, and has since turned itself into a fully supported API.

As of Spark 2.0.0, DataFrame (the flagship data abstraction of previous versions of Spark SQL) is a mere type alias for Dataset[Row]:

type DataFrame = Dataset[Row]
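
A minimal sketch of what the alias means in practice, assuming a spark-shell session or a script with Spark SQL on the classpath (the application name is arbitrary). A DataFrame can be assigned to a Dataset[Row] without any conversion because both names denote the same type.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Reuses the active session in spark-shell, creates a local one otherwise
val spark = SparkSession.builder().master("local[*]").appName("dataframe-alias").getOrCreate()

val df: DataFrame = spark.range(3).toDF("id")

// No conversion needed: DataFrame and Dataset[Row] are the same type
val rows: Dataset[Row] = df

// Row fields are accessed by position and read with an explicit type
val firstId: Long = rows.first().getLong(0)
```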

Dataset offers the convenience of RDDs with the performance optimizations of DataFrames and the strong static type-safety of Scala. It is the last feature, bringing strong type-safety to DataFrame, that makes Dataset so appealing. All the features together give you a more functional programming interface to work with structured data.

scala> spark.range(1).filter('id === 0).explain(true)
== Parsed Logical Plan ==
'Filter ('id = 0)
+- Range (0, 1, splits=8)

== Analyzed Logical Plan ==
id: bigint
Filter (id#51L = cast(0 as bigint))
+- Range (0, 1, splits=8)

== Optimized Logical Plan ==
Filter (id#51L = 0)
+- Range (0, 1, splits=8)

== Physical Plan ==
*Filter (id#51L = 0)
+- *Range (0, 1, splits=8)

scala> spark.range(1).filter(_ == 0).explain(true)
== Parsed Logical Plan ==
'TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], unresolveddeserializer(newInstance(class java.lang.Long))
+- Range (0, 1, splits=8)

== Analyzed Logical Plan ==
id: bigint
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 1, splits=8)

== Optimized Logical Plan ==
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 1, splits=8)

== Physical Plan ==
*Filter <function1>.apply
+- *Range (0, 1, splits=8)

Only Datasets give you syntax and analysis checks at compile time (which was not possible using DataFrame, regular SQL queries or even RDDs).

Using Dataset objects turns DataFrames of Row instances into Datasets of case classes with proper names and types (following their equivalents in the case classes). Instead of using indices to access the respective fields in a DataFrame and casting them to a type, all of this is handled automatically by Datasets and checked by the Scala compiler.
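
The difference can be sketched as follows, assuming a spark-shell session (where case classes can be defined at the prompt). Person is a hypothetical case class introduced only for this illustration.

```scala
import org.apache.spark.sql.SparkSession

// Person is a hypothetical case class used only for illustration
case class Person(name: String, age: Int)

val spark = SparkSession.builder().master("local[*]").appName("rows-vs-case-classes").getOrCreate()
import spark.implicits._

val people = Seq(Person("Ann", 30), Person("Bob", 17)).toDS()

// Untyped: fields are read by position and the type is chosen by hand
val agesFromRows = people.toDF().collect().map(row => row.getInt(1)).toSeq

// Typed: fields are read by name and checked by the Scala compiler
val agesFromObjects = people.map(_.age).collect().toSeq
```

A typo such as people.map(_.agee) is rejected by the compiler, while row.getInt(0) on the name column only fails at runtime.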

Datasets use the Catalyst Query Optimizer and Tungsten to optimize query performance.

A Dataset object requires a SparkSession, a QueryExecution plan, and an Encoder (for fast serialization to and deserialization from InternalRow).

If however a LogicalPlan is used to create a Dataset, the logical plan is first executed (using the current SessionState in the SparkSession), which yields the QueryExecution plan.

A Dataset is Queryable and Serializable, i.e. can be saved to a persistent storage.

SparkSession and QueryExecution are transient attributes of a Dataset and therefore do not participate in Dataset serialization. The only firmly-tied feature of a Dataset is the Encoder.
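
As a rough illustration of these parts, the sketch below reaches the SparkSession and the QueryExecution from a Dataset and inspects a stand-alone Encoder. It assumes a spark-shell session or Spark SQL on the classpath and only uses public members (sparkSession, queryExecution, Encoders.scalaLong); it does not show how Dataset wires them together internally.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("dataset-parts").getOrCreate()

val ds = spark.range(3)

// The session the Dataset is bound to
val session = ds.sparkSession

// The QueryExecution holds the plans of the structured query
val analyzedPlan = ds.queryExecution.analyzed
val physicalPlan = ds.queryExecution.executedPlan

// An Encoder describes how objects map to columns;
// a single Long is encoded as one column called "value"
val longEncoderColumns = Encoders.scalaLong.schema.fieldNames.toSeq
```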

You can convert a type-safe Dataset to an "untyped" DataFrame or access the RDD that is generated after executing the query. This is supposed to give you a more pleasant experience while transitioning from the legacy RDD-based or DataFrame-based APIs you may have used in earlier versions of Spark SQL, and to encourage migrating from Spark Core’s RDD API to Spark SQL’s Dataset API.
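
A short sketch of both conversions, assuming a spark-shell session or Spark SQL on the classpath. The column name n is arbitrary.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("dataset-conversions").getOrCreate()
import spark.implicits._

val ds = Seq(1, 2, 3).toDS()

// Typed Dataset[Int] to "untyped" DataFrame (with the column renamed)
val df = ds.toDF("n")
val columnNames = df.columns.toSeq

// Typed Dataset[Int] to RDD[Int] of JVM objects
val rdd = ds.rdd
val numbers = rdd.collect().sorted.toSeq
```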

The default storage level for Datasets is MEMORY_AND_DISK because recomputing the in-memory columnar representation of the underlying table is expensive. You can however persist a Dataset with a different storage level.
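
The following sketch checks the default level and then picks another one. It assumes Spark 2.1 or later (for Dataset.storageLevel) and a spark-shell session or Spark SQL on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().master("local[*]").appName("dataset-persist").getOrCreate()

val ds = spark.range(5)

// cache() registers the Dataset for caching with the default storage level
ds.cache()
val defaultLevel = ds.storageLevel

// Drop the cached entry before choosing a different level
ds.unpersist(blocking = true)
ds.persist(StorageLevel.MEMORY_ONLY)
val customLevel = ds.storageLevel

ds.unpersist(blocking = true)
```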

Spark 2.0 introduced a new query model called Structured Streaming for continuous incremental execution of structured queries. That made it possible to consider Datasets as static and bounded as well as streaming and unbounded data sets, with a single unified API for different execution models.

A Dataset is local if it was created from local collections using SparkSession.emptyDataset or SparkSession.createDataset methods and their derivatives like toDF. If so, the queries on the Dataset can be optimized and run locally, i.e. without using Spark executors.

Dataset makes sure that the underlying QueryExecution is analyzed and checked.

Table 1. Dataset’s Properties
Name Description



Used when…​FIXME


Deserializer expression to convert internal rows to objects of type T

Created lazily by requesting the ExpressionEncoder to resolveAndBind

Used when:


Implicit ExpressionEncoder

Used when…​FIXME


Logical plan


(lazily-created) RDD of JVM objects of type T (as converted from rows in Dataset in the internal binary row format).

rdd: RDD[T]
rdd gives an RDD with an extra execution step that converts rows from their internal binary row format to JVM objects. This impacts JVM memory as the objects are then inside the JVM (while they were outside before). You should not use rdd directly.

Internally, rdd first creates a new logical plan that deserializes the Dataset’s logical plan.

val dataset = spark.range(5).withColumn("group", 'id % 2)
scala> dataset.rdd.toDebugString
res1: String =
(8) MapPartitionsRDD[8] at rdd at <console>:26 [] // <-- extra deserialization step
 |  MapPartitionsRDD[7] at rdd at <console>:26 []
 |  MapPartitionsRDD[6] at rdd at <console>:26 []
 |  MapPartitionsRDD[5] at rdd at <console>:26 []
 |  ParallelCollectionRDD[4] at rdd at <console>:26 []

scala> dataset.queryExecution.toRdd.toDebugString
res2: String =
(8) MapPartitionsRDD[11] at toRdd at <console>:26 []
 |  MapPartitionsRDD[10] at toRdd at <console>:26 []
 |  ParallelCollectionRDD[9] at toRdd at <console>:26 []

rdd then requests SessionState to execute the logical plan to get the corresponding RDD of binary rows.

rdd uses SparkSession to access SessionState.

rdd then requests the Dataset’s ExpressionEncoder for the data type of the rows (using deserializer expression) and maps over them (per partition) to create records of the expected type T.

rdd is at the "boundary" between the internal binary row format and the JVM type of the dataset. Avoid the extra deserialization step to lower JVM memory requirements of your Spark application.


Lazily-created SQLContext

Used when…​FIXME

Getting Input Files of Relations (in Structured Query) — inputFiles Method

inputFiles: Array[String]

inputFiles requests QueryExecution for optimized logical plan and collects the following logical operators:

inputFiles then requests the logical operators for their underlying files:

resolve Internal Method

resolve(colName: String): NamedExpression

Creating Dataset Instance

Dataset takes the following when created: a SparkSession, a QueryExecution and an Encoder (for the type T of the records).

You can also create a Dataset using LogicalPlan that is immediately executed using SessionState.

Internally, Dataset requests QueryExecution to analyze itself.

Dataset initializes the internal registries and counters.

Is Dataset Local? — isLocal Method

isLocal: Boolean

isLocal flag is enabled (i.e. true) when operators like collect or take could be run locally, i.e. without using executors.

Internally, isLocal checks whether the logical query plan of a Dataset is LocalRelation.
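
A quick way to see the flag in action, assuming a spark-shell session or Spark SQL on the classpath: Datasets built from local collections are local, while spark.range is planned as a Range operator and is not.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("dataset-islocal").getOrCreate()
import spark.implicits._

// Created from a local collection, so the logical plan is a LocalRelation
val fromSeq = Seq(1, 2, 3).toDS()
val empty = spark.emptyDataset[Int]

// Planned as a Range logical operator, not a LocalRelation
val ranged = spark.range(3)

val flags = Seq(fromSeq.isLocal, empty.isLocal, ranged.isLocal)
```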

Is Dataset Streaming? — isStreaming method

isStreaming: Boolean

isStreaming is enabled (i.e. true) when the logical plan is streaming.

Internally, isStreaming takes the Dataset’s logical plan and gives whether the plan is streaming or not.
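
The flag can be compared for a batch and a streaming Dataset as sketched below. The example assumes Spark 2.2 or later, where the built-in rate streaming source is available; no streaming query is started, the Dataset is only described.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("dataset-isstreaming").getOrCreate()

// A regular (batch) Dataset
val batch = spark.range(3)

// A streaming Dataset over the built-in "rate" source (assumes Spark 2.2+)
val stream = spark.readStream.format("rate").load()

val batchIsStreaming = batch.isStreaming
val streamIsStreaming = stream.isStreaming
```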

Implicit Type Conversions to Datasets — toDS and toDF methods

DatasetHolder case class offers three methods that do the conversions from Seq[T] or RDD[T] types to a Dataset[T]:

  • toDS(): Dataset[T]

  • toDF(): DataFrame

  • toDF(colNames: String*): DataFrame

DataFrame is a mere type alias for Dataset[Row] since Spark 2.0.0.

DatasetHolder is used by SQLImplicits that is available to use after importing implicits object of SparkSession.

val spark: SparkSession = ...
import spark.implicits._

scala> val ds = Seq("I am a shiny Dataset!").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val df = Seq("I am an old grumpy DataFrame!").toDF
df: org.apache.spark.sql.DataFrame = [value: string]

scala> val df = Seq("I am an old grumpy DataFrame!").toDF("text")
df: org.apache.spark.sql.DataFrame = [text: string]

scala> val ds = sc.parallelize(Seq("hello")).toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]

This import of implicits object’s values is automatically executed in Spark Shell and so you don’t need to do anything but use the conversions.

scala> spark.version
res11: String = 2.0.0

scala> :imports
 1) import spark.implicits._  (59 terms, 38 are implicit)
 2) import spark.sql          (1 terms)

val spark: SparkSession = ...
import spark.implicits._

case class Token(name: String, productId: Int, score: Double)
val data = Seq(
  Token("aaa", 100, 0.12),
  Token("aaa", 200, 0.29),
  Token("bbb", 200, 0.53),
  Token("bbb", 300, 0.42))

// Transform data to a Dataset[Token]
// It doesn't work with type annotation
val ds = data.toDS

// ds: org.apache.spark.sql.Dataset[Token] = [name: string, productId: int ... 1 more field]

// Transform data into a DataFrame with no explicit schema
val df = data.toDF

// Transform DataFrame into a Dataset
val ds = df.as[Token]

scala> ds.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa|      100| 0.12|
| aaa|      200| 0.29|
| bbb|      200| 0.53|
| bbb|      300| 0.42|
+----+---------+-----+

scala> ds.printSchema
root
 |-- name: string (nullable = true)
 |-- productId: integer (nullable = false)
 |-- score: double (nullable = false)

// In DataFrames we work with Row instances
|value                                                         |

// In Datasets we work with case class instances
|value                      |

Internals of toDS

Internally, the Scala compiler makes toDS implicitly available to any Seq[T] (using SQLImplicits.localSeqToDatasetHolder implicit method).

This and other implicit methods are in scope whenever you do import spark.implicits._.

The input Seq[T] is converted into Dataset[T] by means of SQLContext.createDataset that in turn passes all calls on to SparkSession.createDataset. Once created, the Dataset[T] is wrapped in DatasetHolder[T] with toDS that just returns the input ds.
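
The chain described above can be approximated by spelling out each step by hand. This is only a sketch, assuming a spark-shell session or Spark SQL on the classpath; it calls the implicit conversion explicitly and then SparkSession.createDataset directly to show that all three routes produce the same data.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("tods-internals").getOrCreate()
import spark.implicits._

val words = Seq("hello", "world")

// What you normally write: the compiler inserts the implicit conversion
val viaImplicit = words.toDS()

// The same conversion written out explicitly
val viaHolder = spark.implicits.localSeqToDatasetHolder(words).toDS()

// The call the conversion eventually ends up in
val viaSession = spark.createDataset(words)

val results = Seq(viaImplicit, viaHolder, viaSession).map(_.collect().toSeq)
```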



withNewRDDExecutionId Internal Method

withNewRDDExecutionId[U](body: => U): U

withNewRDDExecutionId executes the input body action under new execution id.

FIXME What’s the difference between withNewRDDExecutionId and withNewExecutionId?

withNewRDDExecutionId is used when Dataset executes foreach and foreachPartition actions.

Creating DataFrame — ofRows Internal Method

ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame

ofRows is a private[sql] operator that can only be accessed from code in org.apache.spark.sql package. It is not part of Dataset public API.

ofRows returns DataFrame (which is the type alias for Dataset[Row]). ofRows uses RowEncoder to convert the schema (based on the input logicalPlan logical plan).

Internally, ofRows prepares the input logicalPlan for execution and creates a Dataset[Row] with the current SparkSession, the QueryExecution and RowEncoder.

Tracking Multi-Job Structured Query Execution (PySpark) — withNewExecutionId Internal Method

withNewExecutionId[U](body: => U): U

withNewExecutionId executes the input body action under new execution id.

withNewExecutionId sets a unique execution id so that all Spark jobs belong to the Dataset action execution.

withNewExecutionId is used exclusively when Dataset is executing Python-based actions (i.e. collectToPython, collectAsArrowToPython and toPythonIterator) that are not of much interest in this gitbook.

Feel free to contact me if you think I should reconsider my decision.

Tracking Multi-Job Action Execution Under New Execution Id — withAction Internal Method

withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U)

withAction requests QueryExecution for the optimized physical query plan and resets the metrics of every physical operator (in the physical plan).

withAction requests SQLExecution for executing the input action with the executable physical plan (tracked under a new execution id).

In the end, withAction notifies ExecutionListenerManager that the name action has finished successfully or with an exception.

withAction uses SparkSession to access ExecutionListenerManager.

withAction is used when Dataset is requested for the following:

Creating Dataset For SparkSession and LogicalPlan — apply Method

apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T]


apply is used when…​FIXME

Collecting All Rows From Spark Plan — collectFromPlan Internal Method

collectFromPlan(plan: SparkPlan): Array[T]


collectFromPlan is used for Dataset.head, Dataset.collect and Dataset.collectAsList operators.

selectUntyped Internal Method

selectUntyped(columns: TypedColumn[_, _]*): Dataset[_]


selectUntyped is used exclusively when a typed operator is used.

Further Reading and Watching
