Dataset API — Basic Actions

Basic actions are a group of operators (methods) of the Dataset API for transforming a Dataset into a session-scoped or global temporary view, caching and checkpointing it, and other basic operations (FIXME).

Note
Basic actions are the methods of the Dataset Scala class that are grouped under the basic group name, i.e. @group basic.
Table 1. Dataset API’s Basic Actions
Action Description

cache

cache(): this.type

Marks the Dataset to be persisted (cached); actually a synonym of the persist basic action

checkpoint

checkpoint(): Dataset[T]
checkpoint(eager: Boolean): Dataset[T]

Checkpoints the Dataset in a reliable way (using a reliable HDFS-compliant file system, e.g. Hadoop HDFS or Amazon S3)

columns

columns: Array[String]

createGlobalTempView

createGlobalTempView(viewName: String): Unit

createOrReplaceGlobalTempView

createOrReplaceGlobalTempView(viewName: String): Unit

createOrReplaceTempView

createOrReplaceTempView(viewName: String): Unit

createTempView

createTempView(viewName: String): Unit

dtypes

dtypes: Array[(String, String)]

explain

explain(): Unit
explain(extended: Boolean): Unit

Displays the logical and physical plans of the Dataset (with optional cost and codegen summaries) to the standard output

hint

hint(name: String, parameters: Any*): Dataset[T]

inputFiles

inputFiles: Array[String]

isEmpty

isEmpty: Boolean

(New in 2.4.0)

isLocal

isLocal: Boolean

localCheckpoint

localCheckpoint(): Dataset[T]
localCheckpoint(eager: Boolean): Dataset[T]

Checkpoints the Dataset locally on executors (and therefore unreliably)

persist

persist(): this.type (1)
persist(newLevel: StorageLevel): this.type
  1. Assumes the default storage level MEMORY_AND_DISK

Marks the Dataset to be persisted the next time an action is executed (see the caching demo below the table)

Internally, persist simply requests the CacheManager to cache the structured query.

Note
persist uses the CacheManager from the SharedState associated with the SparkSession (of the Dataset).

printSchema

printSchema(): Unit

rdd

rdd: RDD[T]

schema

schema: StructType

storageLevel

storageLevel: StorageLevel

toDF

toDF(): DataFrame
toDF(colNames: String*): DataFrame

unpersist

unpersist(): this.type
unpersist(blocking: Boolean): this.type

Unpersists the Dataset

write

write: DataFrameWriter[T]

Returns a DataFrameWriter for saving the content of the (non-streaming) Dataset out to an external storage
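
The caching-related basic actions above (cache, persist, storageLevel and unpersist) can be tried out together in spark-shell. A minimal sketch; the StorageLevel values shown in the comments may be rendered slightly differently across Spark versions:

val nums = spark.range(5)

// No storage level until the Dataset is cached
println(nums.storageLevel)   // StorageLevel(1 replicas), i.e. NONE

// cache() marks the Dataset for caching (MEMORY_AND_DISK by default);
// an action has to run before any data is actually cached
nums.cache()
nums.count()
println(nums.storageLevel)   // StorageLevel(disk, memory, deserialized, 1 replicas)

// unpersist() removes the cached data
nums.unpersist()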

Reliably Checkpointing Dataset — checkpoint Basic Action

checkpoint(): Dataset[T]  (1)
checkpoint(eager: Boolean): Dataset[T]  (2)
  1. eager and reliableCheckpoint flags enabled

  2. reliableCheckpoint flag enabled

Note
checkpoint is an experimental operator and the API is evolving towards becoming stable.

checkpoint simply requests the Dataset to checkpoint with the given eager flag and the reliableCheckpoint flag enabled.
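
A quick usage sketch (the checkpoint directory below is just an example); reliable checkpointing requires a checkpoint directory to be set on the SparkContext first:

// Required for reliable checkpointing
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

// Eagerly checkpoints the Dataset and truncates its lineage
val nums = spark.range(5).checkpoint()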

createTempView Basic Action

createTempView(viewName: String): Unit

createTempView…​FIXME

Note
createTempView is used when…​FIXME

createOrReplaceTempView Basic Action

createOrReplaceTempView(viewName: String): Unit

createOrReplaceTempView…​FIXME

Note
createOrReplaceTempView is used when…​FIXME
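
A usage sketch (the view name is just an example): createTempView fails if the view already exists, whereas createOrReplaceTempView replaces it. The session-scoped view is then available to SQL queries in the same SparkSession:

val nums = spark.range(5)

// Fails with an AnalysisException if the view already exists
nums.createTempView("nums")

// Replaces the view if it already exists
nums.createOrReplaceTempView("nums")

// Query the session-scoped view with SQL
spark.sql("SELECT * FROM nums").show()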

createGlobalTempView Basic Action

createGlobalTempView(viewName: String): Unit

createGlobalTempView…​FIXME

Note
createGlobalTempView is used when…​FIXME

createOrReplaceGlobalTempView Basic Action

createOrReplaceGlobalTempView(viewName: String): Unit

createOrReplaceGlobalTempView…​FIXME

Note
createOrReplaceGlobalTempView is used when…​FIXME
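
A usage sketch (the view name is just an example); global temporary views are registered in the global_temp database and are shared across the SparkSessions of a single Spark application:

val nums = spark.range(5)
nums.createOrReplaceGlobalTempView("nums_global")

// Global temporary views live in the global_temp database...
spark.sql("SELECT * FROM global_temp.nums_global").show()

// ...and are visible from other SparkSessions of the same application
spark.newSession().sql("SELECT * FROM global_temp.nums_global").show()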

createTempViewCommand Internal Method

createTempViewCommand(
  viewName: String,
  replace: Boolean,
  global: Boolean): CreateViewCommand

createTempViewCommand…​FIXME

Note
createTempViewCommand is used when the following Dataset operators are used: Dataset.createTempView, Dataset.createOrReplaceTempView, Dataset.createGlobalTempView and Dataset.createOrReplaceGlobalTempView.

Displaying Logical and Physical Plans, Their Cost and Codegen — explain Basic Action

explain(): Unit (1)
explain(extended: Boolean): Unit
  1. Turns the extended flag off (and so prints only the physical plan)

explain prints the physical plan and, with the extended flag enabled, the parsed, analyzed and optimized logical plans as well, to the console.

Tip
Use explain to review the structured queries and optimizations applied.

Internally, explain creates an ExplainCommand logical command and requests SessionState to execute it (to get a QueryExecution back).

Note
explain uses ExplainCommand logical command that, when executed, gives different text representations of QueryExecution (for the Dataset’s LogicalPlan) depending on the flags (e.g. extended, codegen, and cost which are disabled by default).

explain then requests QueryExecution for the optimized physical query plan and collects the records (as InternalRow objects).

Note

explain uses Dataset’s SparkSession to access the current SessionState.

In the end, explain goes over the InternalRow records and converts them to lines to display to the console.

Note
explain "converts" an InternalRow record to a line using getString at position 0.
Tip
If you are serious about query debugging you could also use the Debugging Query Execution facility.
scala> spark.range(10).explain(extended = true)
== Parsed Logical Plan ==
Range (0, 10, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint
Range (0, 10, step=1, splits=Some(8))

== Optimized Logical Plan ==
Range (0, 10, step=1, splits=Some(8))

== Physical Plan ==
*Range (0, 10, step=1, splits=Some(8))

Specifying Hint — hint Basic Action

hint(name: String, parameters: Any*): Dataset[T]

hint operator is part of Hint Framework to specify a hint (by name and parameters) for a Dataset.

Internally, hint simply attaches an UnresolvedHint unary logical operator to an "analyzed" Dataset (i.e. the analyzed logical plan of a Dataset).

val ds = spark.range(3)
val plan = ds.queryExecution.logical
scala> println(plan.numberedTreeString)
00 Range (0, 3, step=1, splits=Some(8))

// Attach a hint
val dsHinted = ds.hint("myHint", 100, true)
val plan = dsHinted.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'UnresolvedHint myHint, [100, true]
01 +- Range (0, 3, step=1, splits=Some(8))
Note
hint adds an UnresolvedHint unary logical operator to an analyzed logical plan. That indirectly triggers another round of analysis, which executes logical commands and their unions, and resolves any hints that have already been added to the logical plan.
// FIXME Demo with UnresolvedHint
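
For a concrete, well-known example, the broadcast hint asks the optimizer to broadcast one side of a join (the Datasets below are just examples):

val large = spark.range(100000)
val small = spark.range(100)

// Request that the smaller side of the join be broadcast
val joined = large.join(small.hint("broadcast"), "id")

// The physical plan should use a broadcast join
joined.explain()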

Locally Checkpointing Dataset — localCheckpoint Basic Action

localCheckpoint(): Dataset[T] (1)
localCheckpoint(eager: Boolean): Dataset[T]
  1. eager flag enabled

localCheckpoint simply uses the internal checkpoint method with the input eager flag and the reliableCheckpoint flag disabled (false).
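
Unlike checkpoint, localCheckpoint does not require a checkpoint directory since the checkpointed data is stored on the executors (a quick sketch):

// No spark.sparkContext.setCheckpointDir required
val nums = spark.range(5).localCheckpoint()  // eager by default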

checkpoint Internal Method

checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T]

checkpoint requests QueryExecution (of the Dataset) to generate an RDD of internal binary rows (aka internalRdd) and then requests the RDD to make a copy of all the rows (by adding a MapPartitionsRDD).

Depending on reliableCheckpoint flag, checkpoint marks the RDD for (reliable) checkpointing (true) or local checkpointing (false).

With the eager flag on, checkpoint counts the number of records in the RDD (by executing RDD.count), which gives the effect of immediate, eager checkpointing.

checkpoint requests QueryExecution (of the Dataset) for optimized physical query plan (the plan is used to get the outputPartitioning and outputOrdering for the result Dataset).

Note
checkpoint is used in the Dataset basic actions, i.e. checkpoint and localCheckpoint.
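
The RDD side of this flow can be sketched as follows (a simplified, hypothetical helper, not Spark's actual code; it omits rebuilding the result Dataset from the checkpointed RDD):

import org.apache.spark.sql.Dataset

// Hypothetical sketch of the steps described above
def checkpointRddSketch[T](ds: Dataset[T], eager: Boolean, reliableCheckpoint: Boolean): Unit = {
  // 1. RDD of internal binary rows with every row copied (adds a MapPartitionsRDD)
  val internalRdd = ds.queryExecution.toRdd.map(_.copy())
  // 2. Mark the RDD for reliable or local checkpointing
  if (reliableCheckpoint) internalRdd.checkpoint() else internalRdd.localCheckpoint()
  // 3. With eager enabled, run a count to materialize the checkpoint immediately
  if (eager) internalRdd.count()
}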

Generating RDD of Internal Binary Rows — rdd Basic Action

rdd: RDD[T]

Whenever you need to convert a Dataset into an RDD, the rdd method gives you an RDD of the proper input object type (not Row as with DataFrames) that sits behind the Dataset.

// Assuming a typed Dataset, e.g. (the case class is just an example)
// case class Token(name: String, productId: Int, score: Double)
// val tokens: Dataset[Token] = ...
scala> val rdd = tokens.rdd
rdd: org.apache.spark.rdd.RDD[Token] = MapPartitionsRDD[11] at rdd at <console>:30

Internally, rdd looks up the ExpressionEncoder (of the Dataset) and accesses its deserializer expression. That gives the DataType of the result of evaluating the expression.

Note
A deserializer expression is used to decode an InternalRow to an object of type T. See ExpressionEncoder.

It then executes a DeserializeToObject logical operator that produces an RDD[InternalRow], which is converted into the proper RDD[T] using the DataType and T.

Note
It is a lazy operation that "produces" an RDD[T].

Accessing Schema — schema Basic Action

A Dataset has a schema.

schema: StructType
Tip

You may also use the following methods to learn about the schema: printSchema, explain, dtypes and columns.
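
For example (the output is shown as comments and may differ slightly across Spark versions):

val schema = spark.range(1).schema
println(schema)            // StructType(StructField(id,LongType,false))
println(schema.treeString) // root
                           //  |-- id: long (nullable = false)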

Converting Typed Dataset to Untyped DataFrame — toDF Basic Action

toDF(): DataFrame
toDF(colNames: String*): DataFrame

toDF converts a Dataset into a DataFrame.

Internally, the empty-argument toDF creates a Dataset[Row] using the Dataset's SparkSession and QueryExecution with the encoder being RowEncoder.

Caution
FIXME Describe toDF(colNames: String*)
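
A usage sketch of both variants (the column name is just an example):

// Convert to a DataFrame, renaming the single column on the way
val df = spark.range(3).toDF("n")

scala> df.printSchema
root
 |-- n: long (nullable = false)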

Unpersisting Cached Dataset — unpersist Basic Action

unpersist(): this.type
unpersist(blocking: Boolean): this.type

unpersist uncaches the Dataset, optionally blocking until the cached data has been removed (per the blocking flag).

Internally, unpersist requests CacheManager to uncache the query.

Caution
FIXME

Accessing DataFrameWriter (to Describe Writing Dataset) — write Basic Action

write: DataFrameWriter[T]

write gives a DataFrameWriter for records of type T.

import org.apache.spark.sql.{DataFrameWriter, Dataset}
import spark.implicits._  // for toDS (already imported in spark-shell)

val ints: Dataset[Int] = (0 to 5).toDS
val writer: DataFrameWriter[Int] = ints.write
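
The writer can then be used to save the Dataset out, e.g. (the path and format are just examples):

// Save the Dataset as parquet files
writer.mode("overwrite").parquet("/tmp/ints.parquet")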

isEmpty Basic Action

isEmpty: Boolean

isEmpty…​FIXME

isLocal Basic Action

isLocal: Boolean

isLocal…​FIXME
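
Both can be illustrated in spark-shell (where spark.implicits._ is already in scope). isLocal is true for Datasets backed by a LocalRelation, i.e. when collect and take can be run without any Spark executors (a sketch):

println(spark.range(0).isEmpty)    // true
println(spark.range(5).isEmpty)    // false

// A Dataset created from a local collection is backed by a LocalRelation
println(Seq(1, 2, 3).toDS.isLocal) // true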
