DataFrameWriter — Saving Data To External Data Sources

DataFrameWriter is the interface to describe how data (as the result of executing a structured query) should be saved to an external data source.

Table 1. DataFrameWriter API / Writing Operators
| Method | Signature | Description |
|---|---|---|
| bucketBy | bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T] | |
| csv | csv(path: String): Unit | |
| format | format(source: String): DataFrameWriter[T] | |
| insertInto | insertInto(tableName: String): Unit | Inserts (the results of) a structured query (DataFrame) into a table |
| jdbc | jdbc(url: String, table: String, connectionProperties: Properties): Unit | |
| json | json(path: String): Unit | |
| mode | mode(saveMode: SaveMode): DataFrameWriter[T] | |
| | mode(saveMode: String): DataFrameWriter[T] | |
| option | option(key: String, value: String): DataFrameWriter[T] | |
| | option(key: String, value: Boolean): DataFrameWriter[T] | |
| | option(key: String, value: Long): DataFrameWriter[T] | |
| | option(key: String, value: Double): DataFrameWriter[T] | |
| options | options(options: scala.collection.Map[String, String]): DataFrameWriter[T] | |
| orc | orc(path: String): Unit | |
| parquet | parquet(path: String): Unit | |
| partitionBy | partitionBy(colNames: String*): DataFrameWriter[T] | |
| save | save(): Unit | Saves a DataFrame (i.e. writes the result of executing a structured query) to the data source |
| | save(path: String): Unit | |
| saveAsTable | saveAsTable(tableName: String): Unit | |
| sortBy | sortBy(colName: String, colNames: String*): DataFrameWriter[T] | |
| text | text(path: String): Unit | |
DataFrameWriter is available using the Dataset.write operator.

scala> :type df
org.apache.spark.sql.DataFrame

val writer = df.write

scala> :type writer
org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row]

DataFrameWriter supports many file formats and JDBC databases. It also allows for plugging in new formats.

DataFrameWriter defaults to the parquet data source format. You can change the default format using the spark.sql.sources.default configuration property, or pick a format for a single write using format or one of the format-specific methods (e.g. json).

// see above for writer definition

// Save dataset in Parquet format
writer.save(path = "nums")

// Save dataset in JSON format
writer.format("json").save(path = "nums-json")

// Alternatively, use the format-specific method
// (a new path is used, since the default save mode fails when the path already exists)
writer.json(path = "nums-json-2")

In the end, you trigger the actual saving of the content of a Dataset (i.e. the result of executing a structured query) using the save method.

writer.save

DataFrameWriter uses internal mutable attributes to build a properly-defined "write specification" for insertInto, save and saveAsTable methods.

Table 2. Internal Attributes and Corresponding Setters
| Attribute | Setters |
|---|---|
| source | format |
| mode | mode |
| extraOptions | option, options, save |
| partitioningColumns | partitionBy |
| bucketColumnNames | bucketBy |
| numBuckets | bucketBy |
| sortColumnNames | sortBy |
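
The attribute/setter pairs in Table 2 can be pictured as a small mutable builder. What follows is a simplified model under stated assumptions, not Spark's source: WriteSpecSketch, MiniWriter and the saveMode field name are hypothetical, and save modes are kept as plain strings for brevity.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a writer that accumulates a
// "write specification" in mutable attributes (not Spark's actual class).
object WriteSpecSketch {
  final class MiniWriter {
    var source: String = "parquet"                         // set by format
    var saveMode: String = "errorifexists"                 // set by mode
    val extraOptions = mutable.Map.empty[String, String]   // set by option
    var partitioningColumns: Option[Seq[String]] = None    // set by partitionBy
    var numBuckets: Option[Int] = None                     // set by bucketBy
    var bucketColumnNames: Option[Seq[String]] = None      // set by bucketBy
    var sortColumnNames: Option[Seq[String]] = None        // set by sortBy

    // Every setter mutates the writer and returns it, so calls can be chained.
    def format(s: String): MiniWriter = { source = s; this }
    def mode(m: String): MiniWriter = { saveMode = m; this }
    def option(key: String, value: String): MiniWriter = { extraOptions(key) = value; this }
    def partitionBy(cols: String*): MiniWriter = { partitioningColumns = Some(cols); this }
    def bucketBy(n: Int, col: String, cols: String*): MiniWriter = {
      numBuckets = Some(n)
      bucketColumnNames = Some(col +: cols)
      this
    }
    def sortBy(col: String, cols: String*): MiniWriter = { sortColumnNames = Some(col +: cols); this }
  }
}
```

Because the setters mutate shared state, reusing one writer value for several writes carries earlier settings over to later ones.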

Note
DataFrameWriter is a type constructor in Scala that keeps an internal reference to the source DataFrame for the whole lifecycle (starting right from the moment it was created).
Note
Spark Structured Streaming’s DataStreamWriter is responsible for writing the content of streaming Datasets in a streaming fashion.

Executing Logical Command(s) — runCommand Internal Method

runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit

runCommand uses the input SparkSession to access the SessionState that is in turn requested to execute the logical command (that simply creates a QueryExecution).

runCommand records the current time (start time) and uses the SQLExecution helper object to execute the action (under a new execution id) that simply requests the QueryExecution for the RDD[InternalRow] (and triggers execution of logical commands).

Tip
Use web UI’s SQL tab to see the execution or a SparkListener to be notified when the execution is started and finished. The SparkListener should intercept SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events.

runCommand records the current time (end time).

In the end, runCommand uses the input SparkSession to access the ExecutionListenerManager and requests it to onSuccess (with the input name, the QueryExecution and the duration).

In case of any exceptions, runCommand requests the ExecutionListenerManager to onFailure (with the exception) and (re)throws it.
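
The control flow described above (record the start time, execute, record the end time, notify the listener, rethrow on failure) can be sketched in plain Scala. This is an illustration only: RunCommandSketch and its Listener trait are hypothetical stand-ins for ExecutionListenerManager, and the QueryExecution argument is left out.

```scala
// Hypothetical sketch of the flow: time an action, then notify a listener
// about success or failure (rethrowing the exception on failure).
object RunCommandSketch {
  // Stand-in for the listener manager; names are illustrative only.
  trait Listener {
    def onSuccess(name: String, durationNs: Long): Unit
    def onFailure(name: String, error: Throwable): Unit
  }

  def runCommand(listener: Listener, name: String)(action: => Unit): Unit = {
    val start = System.nanoTime()        // start time
    try {
      action                             // execute the command
      val end = System.nanoTime()        // end time
      listener.onSuccess(name, end - start)
    } catch {
      case e: Exception =>
        listener.onFailure(name, e)      // report the failure...
        throw e                          // ...and rethrow it to the caller
    }
  }
}
```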

Saving Rows of Structured Query (DataFrame) to Table — saveAsTable Method

saveAsTable(tableName: String): Unit
// PRIVATE API
saveAsTable(tableIdent: TableIdentifier): Unit

saveAsTable saves the content of a DataFrame to the tableName table.

val ids = spark.range(5)
ids.write.
  option("path", "/tmp/five_ids").
  saveAsTable("five_ids")

// Check whether saveAsTable created the five_ids table
val q = spark.catalog.listTables.filter($"name" === "five_ids")
scala> q.show
+--------+--------+-----------+---------+-----------+
|    name|database|description|tableType|isTemporary|
+--------+--------+-----------+---------+-----------+
|five_ids| default|       null| EXTERNAL|      false|
+--------+--------+-----------+---------+-----------+

Internally, saveAsTable requests the current ParserInterface to parse the input table name.

Note
saveAsTable uses the internal DataFrame to access the SparkSession that is used to access the SessionState and in the end the ParserInterface.

saveAsTable then requests the SessionCatalog to check whether the table exists or not.

Note
saveAsTable uses the internal DataFrame to access the SparkSession that is used to access the SessionState and in the end the SessionCatalog.

In the end, saveAsTable branches off per whether the table exists or not and the save mode.

Table 3. saveAsTable’s Behaviour per Save Mode
| Does table exist? | Save Mode | Behaviour |
|---|---|---|
| yes | Ignore | Does nothing |
| yes | ErrorIfExists | Reports an AnalysisException with "Table [tableIdent] already exists." error message |
| yes | Overwrite | FIXME |
| anything | anything | createTable |
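
Table 3 reads naturally as a pattern match on the pair (table exists, save mode). The sketch below is a hypothetical model of that dispatch, not Spark's code: all names are invented for illustration, and the Overwrite case is deliberately left as "not documented here" because the table above does not describe it.

```scala
// Hypothetical model of the saveAsTable dispatch summarised in Table 3.
object SaveAsTableSketch {
  sealed trait Mode
  case object Append extends Mode
  case object ErrorIfExists extends Mode
  case object Ignore extends Mode
  case object Overwrite extends Mode

  sealed trait Outcome
  case object DoNothing extends Outcome
  final case class Fail(message: String) extends Outcome
  case object NotDocumentedHere extends Outcome // the Overwrite row is a FIXME in Table 3
  case object CreateTable extends Outcome

  // Rows are tried top to bottom; the last one is the catch-all.
  def decide(tableExists: Boolean, mode: Mode, tableIdent: String): Outcome =
    (tableExists, mode) match {
      case (true, Ignore)        => DoNothing
      case (true, ErrorIfExists) => Fail(s"Table $tableIdent already exists.")
      case (true, Overwrite)     => NotDocumentedHere
      case _                     => CreateTable
    }
}
```

Note that the catch-all row also covers Append on an existing table, which therefore goes through createTable.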

Saving Rows of Structured Query (DataFrame) to Data Source — save Method

save(): Unit

save saves the rows of a structured query (a Dataset) to a data source.

Internally, save uses DataSource to look up the class of the requested data source (for the source option and the SQLConf).

Note

save uses SparkSession to access the SessionState that is in turn used to access the SQLConf.

val df: DataFrame = ???
df.sparkSession.sessionState.conf

If the class is a DataSourceV2…​FIXME

Otherwise, if not a DataSourceV2, save simply calls saveToV1Source.

save does not support saving to Hive (i.e. the source is hive) and throws an AnalysisException when requested so.

Hive data source can only be used with tables, you can not write files of Hive data source directly.

save does not support bucketing (i.e. when the numBuckets or sortColumnNames options are defined) and throws an AnalysisException when requested so.

'[operation]' does not support bucketing right now

Saving Data to Table Using JDBC Data Source — jdbc Method

jdbc(url: String, table: String, connectionProperties: Properties): Unit

jdbc method saves the content of the DataFrame to an external database table via JDBC.

You can use mode to control save mode, i.e. what happens when an external table exists when save is executed.

It is assumed that the jdbc save pipeline is neither partitioned nor bucketed.

All options are overridden by the input connectionProperties.
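
The override rule can be illustrated with a plain map merge in which connection properties win on a key clash. This is a minimal sketch, not the jdbc method itself: JdbcOptionsSketch and effectiveOptions are hypothetical names, and the driver class in the usage note is made up.

```scala
import java.util.Properties

// Hypothetical sketch: writer options merged with connection properties.
object JdbcOptionsSketch {
  // On a key clash the connection property replaces the writer option.
  def effectiveOptions(
      extraOptions: Map[String, String],
      connectionProperties: Properties): Map[String, String] = {
    var merged = extraOptions
    val names = connectionProperties.stringPropertyNames().iterator()
    while (names.hasNext) {
      val key = names.next()
      merged = merged.updated(key, connectionProperties.getProperty(key))
    }
    merged
  }
}
```

For example, with writer options user=bob and batchsize=500 and connection properties user=alice and driver=org.example.Driver, the merged options keep batchsize, add driver, and carry user=alice.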

The required options are:

  • driver which is the class name of the JDBC driver (that is passed to Spark’s own DriverRegistry.register and later used to connect(url, properties)).

When the table exists and the overwrite save mode is in use, DROP TABLE table is executed.

jdbc then creates the input table (using CREATE TABLE table (schema) where schema is the schema of the DataFrame).

bucketBy Method

bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]

bucketBy simply sets the internal numBuckets and bucketColumnNames to the input numBuckets and colName with colNames, respectively.

val df = spark.range(5)
import org.apache.spark.sql.DataFrameWriter
val writer: DataFrameWriter[java.lang.Long] = df.write

val bucketedTable = writer.bucketBy(numBuckets = 8, "col1", "col2")

scala> :type bucketedTable
org.apache.spark.sql.DataFrameWriter[Long]

partitionBy Method

partitionBy(colNames: String*): DataFrameWriter[T]
Caution
FIXME

Specifying Save Mode — mode Method

mode(saveMode: String): DataFrameWriter[T]
mode(saveMode: SaveMode): DataFrameWriter[T]

mode defines the behaviour of save when an external file or table (Spark writes to) already exists, i.e. SaveMode.

Table 4. Types of SaveMode
| Name | Description |
|---|---|
| Append | Records are appended to existing data. |
| ErrorIfExists | An exception is thrown if data already exists. |
| Ignore | Records are not saved and existing data is left unchanged. |
| Overwrite | Existing data is overwritten by new records. |
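
To make the four modes concrete, here is a toy model that treats the target as an optional in-memory sequence (None meaning it does not exist yet). It is a sketch of the semantics in Table 4 only: SaveModeSketch is a hypothetical name and IllegalStateException merely stands in for the exception Spark would report.

```scala
// Toy model of SaveMode semantics over an in-memory "target".
object SaveModeSketch {
  sealed trait Mode
  case object Append extends Mode
  case object ErrorIfExists extends Mode
  case object Ignore extends Mode
  case object Overwrite extends Mode

  // Returns the content of the target after the write.
  def write[A](existing: Option[Seq[A]], records: Seq[A], mode: Mode): Seq[A] =
    (existing, mode) match {
      case (None, _)                => records         // nothing there yet: just write
      case (Some(old), Append)      => old ++ records  // keep old data, add new records
      case (Some(_), ErrorIfExists) => throw new IllegalStateException("target already exists")
      case (Some(old), Ignore)      => old             // leave existing data untouched
      case (Some(_), Overwrite)     => records         // replace existing data
    }
}
```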

Specifying Sorting Columns — sortBy Method

sortBy(colName: String, colNames: String*): DataFrameWriter[T]

sortBy simply sets sorting columns to the input colName and colNames column names.

Note
sortBy must be used together with bucketBy or DataFrameWriter reports an IllegalArgumentException.

Note
assertNotBucketed asserts that bucketing is not in use for the methods that do not support it (save, insertInto and jdbc).

Specifying Writer Configuration — option Method

option(key: String, value: Boolean): DataFrameWriter[T]
option(key: String, value: Double): DataFrameWriter[T]
option(key: String, value: Long): DataFrameWriter[T]
option(key: String, value: String): DataFrameWriter[T]

option…​FIXME

Specifying Writer Configuration — options Method

options(options: scala.collection.Map[String, String]): DataFrameWriter[T]

options…​FIXME

Writing DataFrames to Files

Caution
FIXME

Specifying Data Source (by Alias or Fully-Qualified Class Name) — format Method

format(source: String): DataFrameWriter[T]

format simply sets the source internal property.

Parquet

Caution
FIXME
Note
Parquet is the default data source format.

Inserting Rows of Structured Query (DataFrame) into Table — insertInto Method

insertInto(tableName: String): Unit (1)
insertInto(tableIdent: TableIdentifier): Unit
  1. Parses tableName and calls the other insertInto with a TableIdentifier

insertInto inserts the content of the DataFrame to the specified tableName table.

Note
insertInto ignores column names and just uses a position-based resolution, i.e. the order (not the names!) of the columns in (the output of) the Dataset matters.
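
Position-based resolution is easy to get wrong, so a tiny model may help. The sketch below is hypothetical (PositionalInsertSketch and resolveByPosition are invented names, and no type checking or casting is modelled): it pairs each table column with the value at the same position and never consults the Dataset's column names.

```scala
// Hypothetical model of position-based column resolution.
object PositionalInsertSketch {
  // datasetColumns is deliberately unused: names play no role in the mapping.
  def resolveByPosition(
      tableColumns: Seq[String],
      datasetColumns: Seq[String],
      row: Seq[Any]): Map[String, Any] = {
    require(tableColumns.length == row.length, "column count mismatch")
    tableColumns.zip(row).toMap
  }
}
```

With table columns (id, name) and a Dataset whose columns are (name, id), the row ("alice", 1) ends up as id = "alice" and name = 1 in this model, which is why the column order should be aligned (e.g. with select) before calling insertInto.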

Internally, insertInto creates an InsertIntoTable logical operator (with UnresolvedRelation operator as the only child) and executes it right away (that submits a Spark job).

Figure 1. DataFrameWriter.insertInto Executes SQL Command (as a Spark job)

insertInto reports an AnalysisException for bucketed DataFrames, i.e. when numBuckets or sortColumnNames are defined.

'insertInto' does not support bucketing right now

val writeSpec = spark.range(4).
  write.
  bucketBy(numBuckets = 3, colName = "id")
scala> writeSpec.insertInto("t1")
org.apache.spark.sql.AnalysisException: 'insertInto' does not support bucketing right now;
  at org.apache.spark.sql.DataFrameWriter.assertNotBucketed(DataFrameWriter.scala:334)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:302)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:298)
  ... 49 elided

insertInto reports an AnalysisException for partitioned DataFrames, i.e. when partitioningColumns is defined.

insertInto() can't be used together with partitionBy(). Partition columns have already been defined for the table. It is not necessary to use partitionBy().

val writeSpec = spark.range(4).
  write.
  partitionBy("id")
scala> writeSpec.insertInto("t1")
org.apache.spark.sql.AnalysisException: insertInto() can't be used together with partitionBy(). Partition columns have already be defined for the table. It is not necessary to use partitionBy().;
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:305)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:298)
  ... 49 elided

getBucketSpec Internal Method

getBucketSpec: Option[BucketSpec]

getBucketSpec returns a new BucketSpec if numBuckets was defined (with bucketColumnNames and sortColumnNames).

getBucketSpec throws an IllegalArgumentException when sortColumnNames is defined but numBuckets is not.

sortBy must be used together with bucketBy

Note
getBucketSpec is used exclusively when DataFrameWriter is requested to create a table.
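
The two rules above (a BucketSpec only when numBuckets is defined, and a failure when only sorting columns are given) fit in a few lines. This is a sketch under assumptions, not Spark's implementation: BucketSpecSketch and its local BucketSpec case class are simplified stand-ins, and the attributes are passed in as parameters rather than read from the writer.

```scala
// Hypothetical sketch of the getBucketSpec logic.
object BucketSpecSketch {
  // Simplified stand-in for the catalog's BucketSpec.
  final case class BucketSpec(
      numBuckets: Int,
      bucketColumnNames: Seq[String],
      sortColumnNames: Seq[String])

  def getBucketSpec(
      numBuckets: Option[Int],
      bucketColumnNames: Option[Seq[String]],
      sortColumnNames: Option[Seq[String]]): Option[BucketSpec] = {
    // require throws IllegalArgumentException when the condition is false
    require(
      sortColumnNames.isEmpty || numBuckets.isDefined,
      "sortBy must be used together with bucketBy")
    numBuckets.map { n =>
      BucketSpec(n, bucketColumnNames.getOrElse(Nil), sortColumnNames.getOrElse(Nil))
    }
  }
}
```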

Creating Table — createTable Internal Method

createTable(tableIdent: TableIdentifier): Unit

createTable assumes CatalogTableType.EXTERNAL when location URI of CatalogStorageFormat is defined and CatalogTableType.MANAGED otherwise.
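
The table-type rule is a one-line decision on whether a location is present. The sketch below only models that decision: TableTypeSketch and its two case objects are hypothetical stand-ins for CatalogTableType.EXTERNAL and CatalogTableType.MANAGED.

```scala
import java.net.URI

// Hypothetical model of how the table type follows from the location URI.
object TableTypeSketch {
  sealed trait TableType
  case object External extends TableType
  case object Managed extends TableType

  // A defined location means the data lives outside the warehouse directory.
  def tableTypeFor(locationUri: Option[URI]): TableType =
    if (locationUri.isDefined) External else Managed
}
```

This matches the earlier saveAsTable example, where setting the path option produced a table listed as EXTERNAL.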

createTable creates a CatalogTable (with the bucketSpec per getBucketSpec).

In the end, createTable creates a CreateTable logical command (with the CatalogTable, mode and the logical query plan of the dataset) and runs it.

Note
createTable is used when DataFrameWriter is requested to saveAsTable.

assertNotBucketed Internal Method

assertNotBucketed(operation: String): Unit

assertNotBucketed simply throws an AnalysisException if either numBuckets or sortColumnNames internal property is defined:

'[operation]' does not support bucketing right now

Note
assertNotBucketed is used when DataFrameWriter is requested to save, insertInto and jdbc.
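
As a rough illustration of the guard, the following sketch takes the two attributes as parameters and builds the same message. It is not Spark's code: AssertNotBucketedSketch is a hypothetical name, and UnsupportedOperationException stands in for AnalysisException, which is not available outside Spark.

```scala
// Hypothetical sketch of the bucketing guard.
object AssertNotBucketedSketch {
  def assertNotBucketed(
      operation: String,
      numBuckets: Option[Int],
      sortColumnNames: Option[Seq[String]]): Unit =
    // Either attribute being defined means bucketing was requested.
    if (numBuckets.isDefined || sortColumnNames.isDefined)
      throw new UnsupportedOperationException(
        s"'$operation' does not support bucketing right now")
}
```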

Executing Logical Command for Writing to Data Source V1 — saveToV1Source Internal Method

saveToV1Source(): Unit

saveToV1Source creates a DataSource (for the source class name, the partitioningColumns and the extraOptions) and requests it for the logical command for writing (with the mode and the analyzed logical plan of the structured query).

Note
While requesting the analyzed logical plan of the structured query, saveToV1Source triggers execution of logical commands.

In the end, saveToV1Source runs the logical command for writing.

Note
saveToV1Source is used exclusively when DataFrameWriter is requested to save the rows of a structured query (a DataFrame) to a data source (for all but DataSourceV2 writers with WriteSupport).

assertNotPartitioned Internal Method

assertNotPartitioned(operation: String): Unit

assertNotPartitioned…​FIXME

Note
assertNotPartitioned is used when…​FIXME

csv Method

csv(path: String): Unit

csv…​FIXME

json Method

json(path: String): Unit

json…​FIXME

orc Method

orc(path: String): Unit

orc…​FIXME

parquet Method

parquet(path: String): Unit

parquet…​FIXME

text Method

text(path: String): Unit

text…​FIXME

partitionBy Method

partitionBy(colNames: String*): DataFrameWriter[T]

partitionBy simply sets the partitioningColumns internal property.
