DataFrameReader — Loading Data From External Data Sources

DataFrameReader is the public interface to describe how to load data from an external data source (e.g. files, tables, JDBC or Dataset[String]).

Table 1. DataFrameReader API
Method Description

csv

csv(csvDataset: Dataset[String]): DataFrame
csv(path: String): DataFrame
csv(paths: String*): DataFrame

format

format(source: String): DataFrameReader

jdbc

jdbc(
  url: String,
  table: String,
  predicates: Array[String],
  connectionProperties: Properties): DataFrame
jdbc(
  url: String,
  table: String,
  properties: Properties): DataFrame
jdbc(
  url: String,
  table: String,
  columnName: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  connectionProperties: Properties): DataFrame

json

json(jsonDataset: Dataset[String]): DataFrame
json(path: String): DataFrame
json(paths: String*): DataFrame

load

load(): DataFrame
load(path: String): DataFrame
load(paths: String*): DataFrame

option

option(key: String, value: Boolean): DataFrameReader
option(key: String, value: Double): DataFrameReader
option(key: String, value: Long): DataFrameReader
option(key: String, value: String): DataFrameReader

options

options(options: scala.collection.Map[String, String]): DataFrameReader
options(options: java.util.Map[String, String]): DataFrameReader

orc

orc(path: String): DataFrame
orc(paths: String*): DataFrame

parquet

parquet(path: String): DataFrame
parquet(paths: String*): DataFrame

schema

schema(schemaString: String): DataFrameReader
schema(schema: StructType): DataFrameReader

table

table(tableName: String): DataFrame

text

text(path: String): DataFrame
text(paths: String*): DataFrame

textFile

textFile(path: String): Dataset[String]
textFile(paths: String*): Dataset[String]

DataFrameReader is available using SparkSession.read.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.DataFrameReader
val reader: DataFrameReader = spark.read

DataFrameReader supports many file formats natively and offers an interface to define custom formats.

Note
DataFrameReader uses parquet as the default data source format. You can change the default using the spark.sql.sources.default configuration property.
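The default format can be inspected and changed per session. The following is a minimal sketch that assumes a local SparkSession created only for illustration (the master URL and application name are not from the original text).

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session, for illustration only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("default-source")
  .getOrCreate()

// The format that load uses when format has not been called
val before = spark.conf.get("spark.sql.sources.default")

// Switch the default to json for this session
spark.conf.set("spark.sql.sources.default", "json")
val after = spark.conf.get("spark.sql.sources.default")
```

A DataFrameReader created after the change (using spark.read) should pick up json as its source unless format is called explicitly.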

After you have described the loading pipeline (i.e. the "Extract" part of ETL in Spark SQL), you eventually "trigger" the loading using format-agnostic load or format-specific (e.g. json, csv, jdbc) operators.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.DataFrame

// Using format-agnostic load operator
val csvs: DataFrame = spark
  .read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("*.csv")

// Using format-specific load operator
val jsons: DataFrame = spark
  .read
  .json("metrics/*.json")
Note
All methods of DataFrameReader merely describe the process of loading data and do not trigger a Spark job (until an action is called).

DataFrameReader can read text files using textFile methods that return typed Datasets.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
  .read
  .textFile("README.md")
Note
Loading datasets using textFile methods allows for additional preprocessing before final processing of the string values as json or csv lines.
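As a sketch of such preprocessing, the example below drops comment lines from a text file before parsing the remaining lines as CSV. The file content, the '#' comment convention, and the local SparkSession are assumptions made for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Assumption: a throwaway local session, for illustration only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("textfile-preprocessing")
  .getOrCreate()

// Hypothetical input: CSV lines mixed with comment lines that start with '#'
val path = Files.createTempFile("cities", ".csv")
Files.write(path, "# id,city,country\n0,Warsaw,Poland\n# a comment\n1,Toronto,Canada\n".getBytes("UTF-8"))

// Load raw lines, drop the comments, then parse what is left as CSV
val lines: Dataset[String] = spark.read.textFile(path.toString)
val cleaned: Dataset[String] = lines.filter(line => !line.startsWith("#"))
val cities: DataFrame = spark.read
  .schema("id LONG, city STRING, country STRING")
  .csv(cleaned)
```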

(New in Spark 2.2) DataFrameReader can load datasets from Dataset[String] (with lines being complete "files") using format-specific csv and json operators.

val csvLine = "0,Warsaw,Poland"

import org.apache.spark.sql.Dataset
val cities: Dataset[String] = Seq(csvLine).toDS
scala> cities.show
+---------------+
|          value|
+---------------+
|0,Warsaw,Poland|
+---------------+

// Define schema explicitly (as below)
// or
// option("header", true) + option("inferSchema", true)
import org.apache.spark.sql.types.StructType
val schema = new StructType()
  .add($"id".long.copy(nullable = false))
  .add($"city".string)
  .add($"country".string)
scala> schema.printTreeString
root
 |-- id: long (nullable = false)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)

import org.apache.spark.sql.DataFrame
val citiesDF: DataFrame = spark
  .read
  .schema(schema)
  .csv(cities)
scala> citiesDF.show
+---+------+-------+
| id|  city|country|
+---+------+-------+
|  0|Warsaw| Poland|
+---+------+-------+
Table 2. DataFrameReader’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

extraOptions

Used when…​FIXME

source

Name of the input data source (aka format or provider) with the default format per spark.sql.sources.default configuration property (default: parquet).

source can be changed using format method.

Used exclusively when DataFrameReader is requested to load.

userSpecifiedSchema

Optional user-specified schema (default: None, i.e. undefined)

Set when DataFrameReader is requested to set a schema, load data from an external data source, loadV1Source (when creating a DataSource), and load data using the json and csv file formats

Used when DataFrameReader is requested to assertNoSpecifiedSchema (while loading data using jdbc, table and textFile)

Specifying Format Of Input Data Source — format method

format(source: String): DataFrameReader

You use format to configure DataFrameReader to use the appropriate source format.

Supported data formats:

  • json

  • csv (since 2.0.0)

  • parquet (see Parquet)

  • orc

  • text

  • jdbc

  • libsvm — only when used in format("libsvm")

Note
Spark SQL allows for developing custom data source formats.

Specifying Schema — schema method

schema(schema: StructType): DataFrameReader

schema allows for specifying the schema of a data source (that the DataFrameReader is about to read a dataset from).

import org.apache.spark.sql.types.StructType
val schema = new StructType()
  .add($"id".long.copy(nullable = false))
  .add($"city".string)
  .add($"country".string)
scala> schema.printTreeString
root
 |-- id: long (nullable = false)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)

import org.apache.spark.sql.DataFrameReader
val r: DataFrameReader = spark.read.schema(schema)
Note
Some formats can infer the schema from the dataset, e.g. json (by default) or csv (with the inferSchema option enabled).
Tip
Read up on Schema.
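The schema can also be given as a DDL-formatted string using the schema(schemaString: String) variant. A minimal sketch, assuming a Spark version that ships that variant and a local SparkSession created for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session, for illustration only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("schema-ddl")
  .getOrCreate()
import spark.implicits._

val cities = Seq("0,Warsaw,Poland").toDS

// Column names and types come from the DDL string, not from the data
val citiesDF = spark.read
  .schema("id LONG, city STRING, country STRING")
  .csv(cities)
```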

Specifying Load Options — option and options Methods

option(key: String, value: String): DataFrameReader
option(key: String, value: Boolean): DataFrameReader
option(key: String, value: Long): DataFrameReader
option(key: String, value: Double): DataFrameReader

You can also use the options method to describe different options in a single Map.

options(options: scala.collection.Map[String, String]): DataFrameReader
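The following sketch passes several options at once with options. The sample lines and the local SparkSession are assumptions for illustration; header and inferSchema are the same CSV options used earlier on this page.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session, for illustration only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("reader-options")
  .getOrCreate()
import spark.implicits._

// All option values are strings when passed as a Map
val csvOptions = Map("header" -> "true", "inferSchema" -> "true")

val lines = Seq("id,city", "0,Warsaw").toDS
val df = spark.read.options(csvOptions).csv(lines)
```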

Loading Datasets from Files (into DataFrames) Using Format-Specific Load Operators

DataFrameReader supports the following file formats:

json method

json(path: String): DataFrame
json(paths: String*): DataFrame
json(jsonDataset: Dataset[String]): DataFrame
json(jsonRDD: RDD[String]): DataFrame

New in 2.0.0: prefersDecimal
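A minimal sketch of the prefersDecimal option, assuming it only affects how floating-point values are inferred (as a decimal type rather than a double). The sample record and the local SparkSession are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session, for illustration only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("json-prefers-decimal")
  .getOrCreate()
import spark.implicits._

// Hypothetical JSON record with a floating-point field
val prices = Seq("""{"item":"tea","price":1.5}""").toDS

// Schema inference with and without prefersDecimal
val asDouble = spark.read.json(prices)
val asDecimal = spark.read.option("prefersDecimal", true).json(prices)

val doubleType = asDouble.schema("price").dataType
val decimalType = asDecimal.schema("price").dataType
```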

csv method

csv(path: String): DataFrame
csv(paths: String*): DataFrame
csv(csvDataset: Dataset[String]): DataFrame

parquet method

parquet(path: String): DataFrame
parquet(paths: String*): DataFrame

The supported options:

New in 2.0.0: snappy is the default Parquet codec. See [SPARK-14482][SQL] Change default Parquet codec from gzip to snappy.

The compressions supported:

  • none or uncompressed

  • snappy - the default codec in Spark 2.0.0.

  • gzip - the default codec in Spark before 2.0.0

  • lzo

val tokens = Seq("hello", "henry", "and", "harry")
  .zipWithIndex
  .map(_.swap)
  .toDF("id", "token")

val parquetWriter = tokens.write
parquetWriter.option("compression", "none").save("hello-none")

// An unsupported codec fails with an IllegalArgumentException
// whose message lists the available codecs
// (the stack trace shows where the codec name is validated)
scala> parquetWriter.option("compression", "unsupported").save("hello-unsupported")
java.lang.IllegalArgumentException: Codec [unsupported] is not available. Available codecs are uncompressed, gzip, lzo, snappy, none.
  at org.apache.spark.sql.execution.datasources.parquet.ParquetOptions.<init>(ParquetOptions.scala:43)
  at org.apache.spark.sql.execution.datasources.parquet.DefaultSource.prepareWrite(ParquetRelation.scala:77)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
  at org.apache.spark.sql.execution.datasources.BaseWriterContainer.driverSideSetup(WriterContainer.scala:103)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:141)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:61)
  at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult(commands.scala:59)
  at org.apache.spark.sql.execution.command.ExecutedCommand.doExecute(commands.scala:73)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:137)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:134)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:65)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:65)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:390)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
  ... 48 elided

orc method

orc(path: String): DataFrame
orc(paths: String*): DataFrame

Optimized Row Columnar (ORC) file format is a highly efficient columnar format to store Hive data with more than 1,000 columns and improve performance. ORC format was introduced in Hive version 0.11 to use and retain the type information from the table definition.

Tip
Read ORC Files document to learn about the ORC file format.

text method

text method loads a text file.

text(path: String): DataFrame
text(paths: String*): DataFrame
Example
val lines: Dataset[String] = spark.read.text("README.md").as[String]

scala> lines.show
+--------------------+
|               value|
+--------------------+
|      # Apache Spark|
|                    |
|Spark is a fast a...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Spark Streami...|
|                    |
|<http://spark.apa...|
|                    |
|                    |
|## Online Documen...|
|                    |
|You can find the ...|
|guide, on the [pr...|
|and [project wiki...|
|This README file ...|
|                    |
|   ## Building Spark|
+--------------------+
only showing top 20 rows

Loading Table to DataFrame — table Method

table(tableName: String): DataFrame

table loads the content of the tableName table into an untyped DataFrame.

scala> spark.catalog.tableExists("t1")
res1: Boolean = true

// t1 exists in the catalog
// let's load it
val t1 = spark.read.table("t1")
Note
table simply passes the call to SparkSession.table after making sure that a user-defined schema has not been specified.
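If no table is at hand, a temporary view is enough to try table out. This is a sketch with made-up data and a local SparkSession created for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session, for illustration only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("read-table")
  .getOrCreate()
import spark.implicits._

// Register a small dataset under the name t1
Seq((0, "Warsaw"), (1, "Toronto"))
  .toDF("id", "city")
  .createOrReplaceTempView("t1")

// Load it back by name
val t1 = spark.read.table("t1")
```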

Loading Data From External Table using JDBC Data Source — jdbc Method

jdbc(url: String, table: String, properties: Properties): DataFrame
jdbc(
  url: String,
  table: String,
  predicates: Array[String],
  connectionProperties: Properties): DataFrame
jdbc(
  url: String,
  table: String,
  columnName: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  connectionProperties: Properties): DataFrame

jdbc loads data from an external table using the JDBC data source.

Internally, jdbc creates a JDBCOptions from the input url, table and extraOptions with connectionProperties.

jdbc then creates one JDBCPartition per predicate (in predicates).

In the end, jdbc requests the SparkSession to create a DataFrame for a JDBCRelation (with JDBCPartitions and JDBCOptions created earlier).

Note

jdbc does not support a custom schema and throws an AnalysisException if defined:

User specified schema not supported with `[jdbc]`
Note
jdbc method uses java.util.Properties (and appears overly Java-centric). Use format("jdbc") instead.
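A sketch of the format("jdbc") alternative follows. jdbcReader is a hypothetical helper, and the URL and table name in the usage comment are placeholders. url and dbtable are the option names of the JDBC data source.

```scala
import org.apache.spark.sql.{DataFrameReader, SparkSession}

// Hypothetical helper: describes a JDBC load with plain string options
// instead of java.util.Properties
def jdbcReader(spark: SparkSession, url: String, table: String): DataFrameReader =
  spark.read
    .format("jdbc")
    .option("url", url)
    .option("dbtable", table)

// Usage (needs a reachable database and its JDBC driver on the classpath):
// val people = jdbcReader(spark, "jdbc:postgresql://localhost/demo", "people").load()
```

Nothing connects to the database until load is called, so the reader can be built and passed around freely.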

Loading Datasets From Text Files — textFile Method

textFile(path: String): Dataset[String]
textFile(paths: String*): Dataset[String]

textFile loads one or many text files into a typed Dataset[String].

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
  .read
  .textFile("README.md")
Note
The textFile methods are similar to the text family of methods in that both read text files, but the text methods return an untyped DataFrame while the textFile methods return a typed Dataset[String].

Internally, textFile passes calls on to text method and selects the only value column before it applies Encoders.STRING encoder.
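The description above can be approximated by hand as follows. This is a sketch of the equivalent public calls, not the actual implementation. The temporary file and the local SparkSession are assumptions for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

// Assumption: a throwaway local session, for illustration only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("textfile-internals")
  .getOrCreate()

// Hypothetical two-line input file
val path = Files.createTempFile("lines", ".txt")
Files.write(path, "first line\nsecond line\n".getBytes("UTF-8"))

// text + select the only column + the string encoder
val viaText: Dataset[String] = spark.read
  .text(path.toString)
  .select("value")
  .as[String](Encoders.STRING)

// The shortcut
val viaTextFile: Dataset[String] = spark.read.textFile(path.toString)
```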

Creating DataFrameReader Instance

DataFrameReader takes a SparkSession when created.

Loading Dataset (Data Source API V1) — loadV1Source Internal Method

loadV1Source(paths: String*): DataFrame

loadV1Source creates a DataSource and requests it to resolve the underlying relation (as a BaseRelation).

In the end, loadV1Source requests SparkSession to create a DataFrame from the BaseRelation.

Note
loadV1Source is used when DataFrameReader is requested to load (and either the data source is not of DataSourceV2 type or a DataSourceReader could not be created).

Loading Dataset from Data Source — load Method

load(): DataFrame
load(path: String): DataFrame
load(paths: String*): DataFrame

load loads a dataset from a data source (with optional support for multiple paths) as an untyped DataFrame.

Internally, load looks up the data source for the source (using lookupDataSource). load then branches off per its type (i.e. whether it is of DataSourceV2 marker type or not).

For a "Data Source V2" data source, load…​FIXME

Otherwise, if the source is not a "Data Source V2" data source, load simply calls loadV1Source.

load throws an AnalysisException when the source format is hive.

Hive data source can only be used with tables, you can not read files of Hive data source directly.
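The check can be observed without a Hive installation, assuming it runs before the data source is looked up. A minimal sketch with a local SparkSession created for illustration:

```scala
import scala.util.Try
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session, for illustration only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("load-hive")
  .getOrCreate()

// load is expected to fail for the hive format
val hiveAttempt = Try(spark.read.format("hive").load())

// Print the error message if the load failed
hiveAttempt.failed.foreach(e => println(e.getMessage))
```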

assertNoSpecifiedSchema Internal Method

assertNoSpecifiedSchema(operation: String): Unit

assertNoSpecifiedSchema throws an AnalysisException if the userSpecifiedSchema is defined.

User specified schema not supported with `[operation]`
Note
assertNoSpecifiedSchema is used when DataFrameReader is requested to load data using jdbc, table and textFile.
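A sketch of how the assertion surfaces to a user: setting a schema and then calling textFile is expected to fail before any file is read. The schema and the local SparkSession are assumptions for illustration.

```scala
import scala.util.Try
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

// Assumption: a throwaway local session, for illustration only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("assert-no-schema")
  .getOrCreate()

val schema = new StructType().add("value", StringType)

// textFile does not accept a user-specified schema
val schemaAttempt = Try(spark.read.schema(schema).textFile("README.md"))

// Print the error message if the call failed
schemaAttempt.failed.foreach(e => println(e.getMessage))
```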

verifyColumnNameOfCorruptRecord Internal Method

verifyColumnNameOfCorruptRecord(
  schema: StructType,
  columnNameOfCorruptRecord: String): Unit

verifyColumnNameOfCorruptRecord…​FIXME

Note
verifyColumnNameOfCorruptRecord is used when DataFrameReader is requested to load data using json and csv.
