DataFrameReader — Loading Data From External Data Sources
DataFrameReader is a fluent API to describe the input data source that will be used to "load" data from an external data source (e.g. files, tables, JDBC or Dataset[String]).
DataFrameReader is created (available) exclusively using SparkSession.read.
import org.apache.spark.sql.SparkSession
assert(spark.isInstanceOf[SparkSession])
import org.apache.spark.sql.DataFrameReader
val reader = spark.read
assert(reader.isInstanceOf[DataFrameReader])
DataFrameReader supports many file formats natively and offers the interface to define custom formats.
| Note | DataFrameReader assumes the parquet data source file format by default, which you can change using the spark.sql.sources.default configuration property. | 
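As a minimal sketch (assuming a SparkSession in scope as spark), you can check and change the default format at runtime:
// Check the current default data source format (parquet unless changed)
assert(spark.conf.get("spark.sql.sources.default") == "parquet")
// Change the default format for this session, e.g. to csv
spark.conf.set("spark.sql.sources.default", "csv")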
After you have described the loading pipeline (i.e. the "Extract" part of ETL in Spark SQL), you eventually "trigger" the loading using format-agnostic load or format-specific (e.g. json, csv, jdbc) operators.
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
import org.apache.spark.sql.DataFrame
// Using format-agnostic load operator
val csvs: DataFrame = spark
  .read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("*.csv")
// Using format-specific load operator
val jsons: DataFrame = spark
  .read
  .json("metrics/*.json")| Note | All methods of DataFrameReadermerely describe a process of loading a data and do not trigger a Spark job (until an action is called). | 
DataFrameReader can read text files using textFile methods that return typed Datasets.
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
  .read
  .textFile("README.md")| Note | Loading datasets using textFile methods allows for additional preprocessing before final processing of the string values as json or csv lines. | 
DataFrameReader can load datasets from Dataset[String] (with lines being complete "files") using format-specific csv and json operators.
val csvLine = "0,Warsaw,Poland"
import org.apache.spark.sql.Dataset
import spark.implicits._ // needed outside spark-shell for toDS and the $"..." schema DSL below
val cities: Dataset[String] = Seq(csvLine).toDS
scala> cities.show
+---------------+
|          value|
+---------------+
|0,Warsaw,Poland|
+---------------+
// Define schema explicitly (as below)
// or
// option("header", true) + option("inferSchema", true)
import org.apache.spark.sql.types.StructType
val schema = new StructType()
  .add($"id".long.copy(nullable = false))
  .add($"city".string)
  .add($"country".string)
scala> schema.printTreeString
root
 |-- id: long (nullable = false)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
import org.apache.spark.sql.DataFrame
val citiesDF: DataFrame = spark
  .read
  .schema(schema)
  .csv(cities)
scala> citiesDF.show
+---+------+-------+
| id|  city|country|
+---+------+-------+
|  0|Warsaw| Poland|
+---+------+-------+
Specifying Format Of Input Data Source — format method
format(source: String): DataFrameReader
You use format to configure DataFrameReader to use the appropriate source format.
Supported data formats:
- json
- csv
- parquet
- orc
- text
- jdbc
| Note | Spark SQL allows for developing custom data source formats. | 
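As a minimal sketch, format accepts both built-in short names and fully-qualified class names of custom data sources (org.example.MyDataSource below is a hypothetical name for illustration):
// Built-in data sources are referenced by their short names
val csvReader = spark.read.format("csv")
// Custom data sources are referenced by their fully-qualified class name
// (resolution happens later, when load is called)
val customReader = spark.read.format("org.example.MyDataSource")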
 Specifying Schema — schema method
schema(schema: StructType): DataFrameReader
schema allows for specifying the schema of a data source (that the DataFrameReader is about to read a dataset from).
import org.apache.spark.sql.types.StructType
val schema = new StructType()
  .add($"id".long.copy(nullable = false))
  .add($"city".string)
  .add($"country".string)
scala> schema.printTreeString
root
 |-- id: long (nullable = false)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
import org.apache.spark.sql.DataFrameReader
val r: DataFrameReader = spark.read.schema(schema)
| Note | Some formats can infer the schema from datasets (e.g. csv or json) using the inferSchema option. | 
| Tip | Read up on Schema. | 
 Specifying Load Options — option and options Methods
option(key: String, value: String): DataFrameReader
option(key: String, value: Boolean): DataFrameReader
option(key: String, value: Long): DataFrameReader
option(key: String, value: Double): DataFrameReader
You can also use the options method to describe different options in a single Map.
options(options: scala.collection.Map[String, String]): DataFrameReader
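The following minimal sketch shows two equivalent ways to configure the same options, either individually with option or all at once with options:
// Options given one by one...
val readerWithOptions = spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
// ...or all at once as a Map (values are Strings)
val readerWithOptionsMap = spark.read
  .format("csv")
  .options(Map("header" -> "true", "inferSchema" -> "true"))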
Loading Datasets from Files (into DataFrames) Using Format-Specific Load Operators
DataFrameReader supports the following file formats:
- json
- csv
- parquet
- orc
- text
 json method
json(path: String): DataFrame
json(paths: String*): DataFrame
json(jsonDataset: Dataset[String]): DataFrame
json(jsonRDD: RDD[String]): DataFrame
New in 2.0.0: prefersDecimal
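A minimal sketch of loading JSON files with the prefersDecimal option enabled (the path reuses the metrics/*.json glob from the earlier example):
import org.apache.spark.sql.DataFrame
// prefersDecimal makes inferred floating-point numbers use DecimalType
// instead of DoubleType during schema inference
val metrics: DataFrame = spark
  .read
  .option("prefersDecimal", true)
  .json("metrics/*.json")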
 csv method
csv(path: String): DataFrame
csv(paths: String*): DataFrame
csv(csvDataset: Dataset[String]): DataFrame
parquet method
parquet(path: String): DataFrame
parquet(paths: String*): DataFrame
The supported options:
- compression (default: snappy)
  New in 2.0.0: snappy is the default Parquet codec. See [SPARK-14482][SQL] Change default Parquet codec from gzip to snappy.
  Supported values:
  - none or uncompressed
  - snappy - the default codec in Spark 2.0.0.
  - gzip - the default codec in Spark before 2.0.0
  - lzo
val tokens = Seq("hello", "henry", "and", "harry")
  .zipWithIndex
  .map(_.swap)
  .toDF("id", "token")
val parquetWriter = tokens.write
parquetWriter.option("compression", "none").save("hello-none")
// The exception is mostly for my learning purposes
// so I know where and how to find the trace to the compressions
// Sorry...
scala> parquetWriter.option("compression", "unsupported").save("hello-unsupported")
java.lang.IllegalArgumentException: Codec [unsupported] is not available. Available codecs are uncompressed, gzip, lzo, snappy, none.
  at org.apache.spark.sql.execution.datasources.parquet.ParquetOptions.<init>(ParquetOptions.scala:43)
  at org.apache.spark.sql.execution.datasources.parquet.DefaultSource.prepareWrite(ParquetRelation.scala:77)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
  at org.apache.spark.sql.execution.datasources.BaseWriterContainer.driverSideSetup(WriterContainer.scala:103)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:141)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:61)
  at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult(commands.scala:59)
  at org.apache.spark.sql.execution.command.ExecutedCommand.doExecute(commands.scala:73)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:137)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:134)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:65)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:65)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:390)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
  ... 48 elided
orc method
orc(path: String): DataFrame
orc(paths: String*): DataFrame
Optimized Row Columnar (ORC) file format is a highly efficient columnar format to store Hive data with more than 1,000 columns and improve performance. ORC format was introduced in Hive version 0.11 to use and retain the type information from the table definition.
| Tip | Read ORC Files document to learn about the ORC file format. | 
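A minimal sketch of loading an ORC dataset (the path is an assumption for illustration):
import org.apache.spark.sql.DataFrame
// Read an ORC file (or a directory of ORC files) into an untyped DataFrame
val events: DataFrame = spark
  .read
  .orc("events.orc")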
 text method
text method loads a text file.
text(path: String): DataFrame
text(paths: String*): DataFrame
Example
val lines: Dataset[String] = spark.read.text("README.md").as[String]
scala> lines.show
+--------------------+
|               value|
+--------------------+
|      # Apache Spark|
|                    |
|Spark is a fast a...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Spark Streami...|
|                    |
|<http://spark.apa...|
|                    |
|                    |
|## Online Documen...|
|                    |
|You can find the ...|
|guide, on the [pr...|
|and [project wiki...|
|This README file ...|
|                    |
|   ## Building Spark|
+--------------------+
only showing top 20 rows
Loading Table to DataFrame — table Method
table(tableName: String): DataFrame
table loads the content of the tableName table into an untyped DataFrame.
scala> spark.catalog.tableExists("t1")
res1: Boolean = true
// t1 exists in the catalog
// let's load it
val t1 = spark.read.table("t1")
| Note | table simply passes the call to SparkSession.table after making sure that a user-defined schema has not been specified. | 
 Loading Data From External Table using JDBC Data Source — jdbc Method
jdbc(url: String, table: String, properties: Properties): DataFrame
jdbc(
  url: String,
  table: String,
  predicates: Array[String],
  connectionProperties: Properties): DataFrame
jdbc(
  url: String,
  table: String,
  columnName: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  connectionProperties: Properties): DataFrame
jdbc loads data from an external table using the JDBC data source.
Internally, jdbc creates a JDBCOptions from the input url, table and extraOptions with connectionProperties.
jdbc then creates one JDBCPartition per predicate.
In the end, jdbc requests the SparkSession to create a DataFrame for a JDBCRelation (with JDBCPartitions and JDBCOptions created earlier).
| Note | jdbc method uses java.util.Properties (and appears overly Java-centric). Use format("jdbc") instead. | 
| Tip | Review the exercise Creating DataFrames from Tables using JDBC and PostgreSQL. | 
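A minimal sketch of the format("jdbc") alternative mentioned in the note above (the URL, table name and credentials are assumptions for illustration; the matching JDBC driver has to be on the classpath):
import org.apache.spark.sql.DataFrame
val people: DataFrame = spark
  .read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "people")
  .option("user", "user")
  .option("password", "password")
  .load()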
 Loading Datasets From Text Files — textFile Method
textFile(path: String): Dataset[String]
textFile(paths: String*): Dataset[String]
textFile loads one or many text files into a typed Dataset[String].
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
  .read
  .textFile("README.md")| Note | textFileare similar to text family of methods in that they both read text files buttextmethods return untypedDataFramewhiletextFilereturn typedDataset[String]. | 
Creating DataFrameReader Instance
DataFrameReader takes the following to be created:
- SparkSession
DataFrameReader initializes the internal properties.
 Loading Dataset (Data Source API V1) — loadV1Source Internal Method
loadV1Source(paths: String*): DataFrame
loadV1Source creates a DataSource and requests it to resolve the underlying relation (as a BaseRelation).
In the end, loadV1Source requests SparkSession to create a DataFrame from the BaseRelation.
| Note | loadV1Source is used when DataFrameReader is requested to load (and the data source is not of DataSourceV2 type or a DataSourceReader could not be created). | 
 "Loading" Data As DataFrame — load Method
load(): DataFrame
load(path: String): DataFrame
load(paths: String*): DataFrame
load loads a dataset from a data source (with optional support for multiple paths) as an untyped DataFrame.
Internally, load looks up the data source class for the source (using lookupDataSource). load then branches off per its type (i.e. whether it is of the DataSourceV2 marker type or not).
For a "Data Source V2" data source, load…FIXME
Otherwise, if the source is not a "Data Source V2" data source, load simply delegates to loadV1Source.
load throws an AnalysisException when the source format is hive.
Hive data source can only be used with tables, you can not read files of Hive data source directly.
assertNoSpecifiedSchema Internal Method
assertNoSpecifiedSchema(operation: String): Unit
assertNoSpecifiedSchema throws an AnalysisException if the userSpecifiedSchema is defined.
User specified schema not supported with `[operation]`
verifyColumnNameOfCorruptRecord Internal Method
verifyColumnNameOfCorruptRecord(
  schema: StructType,
  columnNameOfCorruptRecord: String): Unit
verifyColumnNameOfCorruptRecord…FIXME
 Input Data Source — source Internal Property
source: String
source is the name of the input data source (aka format or provider) that will be used to "load" data (as a DataFrame).
In other words, the DataFrameReader fluent API is simply to describe the input data source.
The default data source is parquet per spark.sql.sources.default configuration property.
source is usually specified using format method.
Once defined explicitly (using format method) or implicitly (spark.sql.sources.default configuration property), source is resolved using DataSource utility.
| Note | source is used exclusively when DataFrameReader is requested to "load" data (as a DataFrame) (explicitly or using loadV1Source). | 
Internal Properties
| Name | Description | 
|---|---|
|  | Used when…FIXME |
| userSpecifiedSchema | Optional user-specified schema (default: None). Set when… Used when… |