DataFrameWriter

DataFrameWriter is an interface to persist a Dataset to an external storage system in a batch fashion.

Tip
Read DataStreamWriter for streamed writing.

You use the write method on a Dataset to access a DataFrameWriter.

import org.apache.spark.sql.DataFrameWriter

val nums: Dataset[Long] = ...
val writer: DataFrameWriter[Row] = nums.write

DataFrameWriter has direct support for many file formats and JDBC databases, and an interface to plug in new formats. It assumes parquet as the default data source, which you can change using the spark.sql.sources.default setting or the format method.

// see above for writer definition

// Save dataset in Parquet format
writer.save(path = "nums")

// Save dataset in JSON format
writer.format("json").save(path = "nums-json")

In the end, you trigger the actual saving of the content of a Dataset using the save method.

writer.save
Note
Interestingly, a DataFrameWriter is really a type constructor in Scala. It keeps a reference to a source DataFrame during its lifecycle (starting right from the moment it was created).
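A minimal sketch of the idea, using a hypothetical SimpleWriter class (not Spark's actual implementation): the writer is parameterized by the row type and captures a reference to its source the moment it is created.

```scala
// Hypothetical sketch, not Spark's code: a generic writer (a type
// constructor) that keeps a reference to its source from construction.
final class SimpleWriter[T](source: Seq[T]) {
  private var format: String = "parquet" // mirrors the parquet default
  def withFormat(f: String): SimpleWriter[T] = { format = f; this }
  def describe: String = s"would save ${source.size} rows as $format"
}

val writer: SimpleWriter[Long] = new SimpleWriter(Seq(1L, 2L, 3L))
println(writer.withFormat("json").describe)
```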

Internal State

DataFrameWriter uses the following mutable attributes to build a properly-defined write specification for insertInto, saveAsTable, and save:

Table 1. Attributes and Corresponding Setters

| Attribute           | Setters               |
|---------------------|-----------------------|
| source              | format                |
| mode                | mode                  |
| extraOptions        | option, options, save |
| partitioningColumns | partitionBy           |
| bucketColumnNames   | bucketBy              |
| numBuckets          | bucketBy              |
| sortColumnNames     | sortBy                |
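Putting the setters together (the path, column names and option values below are illustrative), each call populates one of the attributes above:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder.master("local[*]").appName("writer-setters").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b"), (3, "a")).toDF("id", "letter")

df.write
  .format("parquet")               // source
  .mode(SaveMode.Overwrite)        // mode
  .option("compression", "snappy") // extraOptions
  .partitionBy("letter")           // partitioningColumns
  .save("/tmp/writer-setters")
```

All setters return the same DataFrameWriter, which is what makes the chained style above possible.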

saveAsTable Method

saveAsTable(tableName: String): Unit

saveAsTable saves the content of a DataFrame as the tableName table.

First, tableName is parsed to an internal table identifier. saveAsTable then checks whether the table exists or not and uses save mode to decide what to do.

saveAsTable uses the SessionCatalog for the current session.

Table 2. saveAsTable's Behaviour per Save Mode

| Does table exist? | Save Mode     | Behaviour |
|-------------------|---------------|-----------|
| yes               | Ignore        | Does nothing |
| yes               | ErrorIfExists | Throws an AnalysisException with the message "Table [tableIdent] already exists." |
| anything          | anything      | Creates a CatalogTable and executes the CreateTable plan |

val ints = (0 to 9).toDF
val options = Map("path" -> "/tmp/ints")
ints.write.options(options).saveAsTable("ints")
sql("show tables").show

Persisting DataFrame — save Method

save(): Unit

save saves the content of a Dataset to…​FIXME

Internally, save first checks that the DataFrame is not bucketed.

Caution
FIXME What does bucketing mean?

save then creates a DataSource (for the source) and calls write on it.

Note
save uses source, partitioningColumns, extraOptions, and mode internal attributes directly. They are specified through the API.

jdbc Method

jdbc(url: String, table: String, connectionProperties: Properties): Unit

jdbc method saves the content of the DataFrame to an external database table via JDBC.

You can use mode to control the save mode, i.e. what happens when the external table already exists when save is executed.

It is assumed that the jdbc save pipeline is neither partitioned nor bucketed.

All options are overridden by the input connectionProperties.

The required options are:

  • driver which is the class name of the JDBC driver (that is passed to Spark’s own DriverRegistry.register and later used to connect(url, properties)).

When the table exists and the Overwrite save mode is in use, DROP TABLE table is executed.

It creates the input table (using CREATE TABLE table (schema) where schema is the schema of the DataFrame).
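A sketch of the whole pipeline against an in-memory H2 database (this assumes the H2 driver is on the classpath; the URL and table name are illustrative):

```scala
import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder.master("local[*]").appName("jdbc-demo").getOrCreate()
import spark.implicits._

val url = "jdbc:h2:mem:demo;DB_CLOSE_DELAY=-1" // keep the in-memory DB alive
val props = new Properties()
props.setProperty("driver", "org.h2.Driver")   // the required driver option

val people = Seq((1, "abe"), (2, "bob")).toDF("id", "name")

// Overwrite drops the table if it exists, then CREATE TABLE from the schema
people.write.mode(SaveMode.Overwrite).jdbc(url, "people", props)
```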

bucketBy Method

Caution
FIXME
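bucketBy is only supported together with saveAsTable (a bucketed save to a plain path fails with an AnalysisException); a sketch with illustrative bucket count and table name:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("bucketby-demo").getOrCreate()
import spark.implicits._

val people = Seq((1, "abe"), (2, "bob"), (3, "cat")).toDF("id", "name")

people.write
  .bucketBy(4, "id")   // numBuckets and bucketColumnNames
  .sortBy("id")        // sortColumnNames (ordering within each bucket)
  .mode("overwrite")
  .saveAsTable("bucketed_people")
```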

partitionBy Method

partitionBy(colNames: String*): DataFrameWriter[T]
Caution
FIXME
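partitionBy lays the output out as one subdirectory per distinct value of the partition columns (path and column names below are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("partitionby-demo").getOrCreate()
import spark.implicits._

val events = Seq(("2016-01-01", 1), ("2016-01-02", 2)).toDF("day", "value")

// Produces subdirectories such as day=2016-01-01/ under the target path
events.write.partitionBy("day").mode("overwrite").parquet("/tmp/partitionby-demo")
```

Reading the path back restores the partition column from the directory names.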

Defining Write Behaviour Per Sink’s Existence (aka Save Mode) — mode Method

mode(saveMode: String): DataFrameWriter[T]
mode(saveMode: SaveMode): DataFrameWriter[T]

mode defines the behaviour of save when an external file or table (that Spark writes to) already exists, i.e. the SaveMode.

Table 3. Types of SaveMode (in alphabetical order)

| Name          | Description |
|---------------|-------------|
| Append        | Records are appended to the existing data. |
| ErrorIfExists | An exception is thrown. |
| Ignore        | The records are not saved and the existing data is left unchanged. |
| Overwrite     | Existing data is overwritten by the new records. |
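The save modes in action on one target path (path and data are illustrative):

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder.master("local[*]").appName("savemode-demo").getOrCreate()
import spark.implicits._

val nums = Seq(1, 2, 3).toDF("n")
val path = "/tmp/savemode-demo"

nums.write.mode(SaveMode.Overwrite).parquet(path) // replaces any existing data
nums.write.mode(SaveMode.Append).parquet(path)    // adds 3 more rows
nums.write.mode(SaveMode.Ignore).parquet(path)    // the path exists, so this is a no-op
```

mode also accepts the case-insensitive strings "append", "overwrite", "ignore" and "error".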

Writer Configuration — option and options Methods

Caution
FIXME
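option sets a single key-value pair and options takes a whole Map; both feed the extraOptions attribute. A sketch with a CSV sink (path and option values are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("options-demo").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "letter")

df.write
  .option("header", "true")   // one option at a time
  .options(Map("sep" -> ";")) // or many at once
  .mode("overwrite")
  .csv("/tmp/options-demo")
```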

Writing DataFrames to Files

Caution
FIXME

Specifying Alias or Fully-Qualified Class Name of DataSource — format Method

Caution
FIXME Compare to DataFrameReader.

Parquet

Caution
FIXME
Note
Parquet is the default data source format.

insertInto Method

Caution
FIXME
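Unlike saveAsTable, insertInto requires that the target table already exists and resolves columns by position, not by name; a sketch (table name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("insertinto-demo").getOrCreate()
import spark.implicits._

// Create the table first, then append to it by position
Seq((1, "a")).toDF("id", "letter").write.mode("overwrite").saveAsTable("letters")
Seq((2, "b")).toDF("id", "letter").write.insertInto("letters")
```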
