// Batch reading
val people: DataFrame = spark.read
.format("csv")
.load("people.csv")
// Streamed reading
val messages: DataFrame = spark.readStream
.format("kafka")
.option("subscribe", "topic")
.option("kafka.bootstrap.servers", "localhost:9092")
.load
DataSource — Pluggable Data Provider Framework
DataSource paves the way for Pluggable Data Provider Framework (Data Source API) in Spark SQL.
Together with the provider interfaces, DataSource allows Spark SQL integrators to use external data systems as data sources and sinks in structured queries in Spark SQL (incl. Spark Structured Streaming).
|
Note
|
Data source is also called a table provider. |
DataSource requires an alias or a fully-qualified class name of the data source provider (among other optional parameters). DataSource uses the name to load the Java class (available as providingClass internally). Eventually, DataSource uses the Java class to resolve a relation (the BaseRelation) to represent the data source in logical plans (using LogicalRelation leaf logical operator).
DataSource also requires a SparkSession for the configuration properties to resolve the data source provider.
DataSource is created when:
-
HiveMetastoreCatalogis requested to convert a HiveTableRelation to a LogicalRelation over a HadoopFsRelation -
DataFrameReaderis requested to load data from a data source (Data Source V1) -
DataFrameWriteris requested to save to a data source (Data Source V1) -
CreateDataSourceTableCommand, CreateDataSourceTableAsSelectCommand, InsertIntoDataSourceDirCommand, CreateTempViewUsing commands are executed
-
FindDataSourceTable and ResolveSQLOnFile logical evaluation rules are executed
-
For Spark Structured Streaming’s
FileStreamSource,DataStreamReaderandDataStreamWriter
DataSource takes a list of file system paths that hold data. The list is empty by default, but can be different per data source:
-
The location URI of a HiveTableRelation (when
HiveMetastoreCatalogis requested to convert a HiveTableRelation to a LogicalRelation over a HadoopFsRelation) -
The table name of a UnresolvedRelation (when ResolveSQLOnFile logical evaluation rule is executed)
-
The files in a directory when Spark Structured Streaming’s
FileStreamSourceis requested for batches
As a Spark SQL developer (user), you interact with DataSource by DataFrameReader (when you execute spark.read or spark.readStream) or SQL’s CREATE TABLE USING.
When requested to resolve a batch (non-streaming) FileFormat, DataSource creates a HadoopFsRelation with the optional bucketing specification.
Creating DataSource Instance
DataSource takes the following to be created:
-
Fully-qualified class name or an alias of the data source provider (aka data source format)
-
(optional) User-specified schema (default: undefined)
-
(optional) Bucketing specification (default: undefined)
-
(optional) CatalogTable (default: undefined)
DataSource initializes the internal properties.
|
Note
|
Only the SparkSession and the fully-qualified class name of the data source provider are required to create an instance of DataSource.
|
Loading Java Class Of Data Source Provider — lookupDataSource Utility
lookupDataSource(
provider: String,
conf: SQLConf): Class[_]
lookupDataSource first finds the given provider in the backwardCompatibilityMap internal registry, and falls back to the provider name itself when not found.
|
Note
|
The provider argument can be either an alias (a simple name, e.g. parquet) or a fully-qualified class name (e.g. org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat).
|
lookupDataSource then uses the given SQLConf to decide on the class name of the provider for ORC and Avro data sources as follows:
-
For
orcprovider and native,lookupDataSourceuses the new ORC file format OrcFileFormat (based on Apache ORC) -
For
orcprovider and hive,lookupDataSourceusesorg.apache.spark.sql.hive.orc.OrcFileFormat -
For
com.databricks.spark.avroand spark.sql.legacy.replaceDatabricksSparkAvro.enabled configuration enabled (default),lookupDataSourceuses the built-in (but external) Avro data source module
lookupDataSource uses DefaultSource as the class name (in the provider1 package) as another provider name variant, i.e. [provider1].DefaultSource.
lookupDataSource uses Java’s ServiceLoader service-provider loading facility to find all data source providers of type DataSourceRegister on the Spark CLASSPATH.
|
Note
|
DataSourceRegister is used to register a data source provider by a short name (alias). |
lookupDataSource tries to find the DataSourceRegister provider classes (by their alias) that match the provider1 name (case-insensitive, e.g. parquet or kafka).
If a single DataSourceRegister provider class is found, lookupDataSource simply returns the instance of the data source provider.
If no DataSourceRegister provider class could be found by the short name (alias), lookupDataSource tries to load the provider1 name to be a fully-qualified class name. If not successful, lookupDataSource tries to load the provider2 name (aka DefaultSource) instead.
|
Note
|
DataFrameWriter.format and DataFrameReader.format methods accept the name of the data source provider to use as an alias or a fully-qualified class name. |
import org.apache.spark.sql.execution.datasources.DataSource
val source = "parquet"
val cls = DataSource.lookupDataSource(source, spark.sessionState.conf)
|
Caution
|
FIXME Describe error paths (case Failure(error) and case sources).
|
|
Note
|
|
createSource Method
createSource(
metadataPath: String): Source
createSource…FIXME
|
Note
|
createSource is used when…FIXME
|
createSink Method
createSink(
outputMode: OutputMode): Sink
createSink…FIXME
|
Note
|
createSink is used when…FIXME
|
sourceSchema Internal Method
sourceSchema(): SourceInfo
sourceSchema returns the name and schema of the data source for streamed reading.
|
Caution
|
FIXME Why is the method called? Why does this bother with streamed reading and data sources?! |
It supports two class hierarchies, i.e. FileFormat and Structured Streaming’s StreamSourceProvider data sources.
Internally, sourceSchema first creates an instance of the data source and…
|
Caution
|
FIXME Finish… |
For Structured Streaming’s StreamSourceProvider data sources, sourceSchema relays calls to StreamSourceProvider.sourceSchema.
For FileFormat data sources, sourceSchema makes sure that path option was specified.
|
Tip
|
path is looked up in a case-insensitive way so paTh and PATH and pAtH are all acceptable. Use the lower-case version of path, though.
|
|
Note
|
path can use glob pattern (not regex syntax), i.e. contain any of {}[]*?\ characters.
|
It checks whether the path exists if a glob pattern is not used. In case it did not exist you will see the following AnalysisException exception in the logs:
scala> spark.read.load("the.file.does.not.exist.parquet")
org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/jacek/dev/oss/spark/the.file.does.not.exist.parquet;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:375)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:364)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
... 48 elided
If spark.sql.streaming.schemaInference is disabled and the data source is different than TextFileFormat, and the input userSpecifiedSchema is not specified, the following IllegalArgumentException exception is thrown:
Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
|
Caution
|
FIXME I don’t think the exception will ever happen for non-streaming sources since the schema is going to be defined earlier. When? |
Eventually, it returns a SourceInfo with FileSource[path] and the schema (as calculated using the inferFileFormatSchema internal method).
For any other data source, it throws UnsupportedOperationException exception:
Data source [className] does not support streamed reading
|
Note
|
sourceSchema is used exclusively when DataSource is requested for the sourceInfo.
|
Resolving Relation (Creating BaseRelation) — resolveRelation Method
resolveRelation(
checkFilesExist: Boolean = true): BaseRelation
resolveRelation resolves (i.e. creates) a BaseRelation.
Internally, resolveRelation tries to create an instance of the providingClass and branches off per its type and whether the optional user-specified schema was specified or not.
| Provider | Behaviour |
|---|---|
Executes SchemaRelationProvider.createRelation with the provided schema |
|
Executes RelationProvider.createRelation |
|
Creates a HadoopFsRelation |
|
Note
|
|
buildStorageFormatFromOptions Utility
buildStorageFormatFromOptions(
options: Map[String, String]): CatalogStorageFormat
buildStorageFormatFromOptions…FIXME
|
Note
|
buildStorageFormatFromOptions is used when…FIXME
|
Creating Logical Command for Writing (for CreatableRelationProvider and FileFormat Data Sources) — planForWriting Method
planForWriting(
mode: SaveMode,
data: LogicalPlan): LogicalPlan
planForWriting creates an instance of the providingClass and branches off per its type as follows:
-
For a CreatableRelationProvider,
planForWritingcreates a SaveIntoDataSourceCommand (with the inputdataandmode, theCreatableRelationProviderdata source and the caseInsensitiveOptions) -
For a FileFormat,
planForWritingplanForWritingFileFormat (with theFileFormatformat and the inputmodeanddata) -
For other types,
planForWritingsimply throws aRuntimeException:[providingClass] does not allow create table as select.
|
Note
|
|
Writing Data to Data Source (per Save Mode) Followed by Reading Rows Back (as BaseRelation) — writeAndRead Method
writeAndRead(
mode: SaveMode,
data: LogicalPlan,
outputColumnNames: Seq[String],
physicalPlan: SparkPlan): BaseRelation
writeAndRead…FIXME
|
Note
|
writeAndRead is also knows as Create Table As Select (CTAS) query.
|
|
Note
|
writeAndRead is used when CreateDataSourceTableAsSelectCommand logical command is executed.
|
Planning for Writing (to FileFormat-Based Data Source) — planForWritingFileFormat Internal Method
planForWritingFileFormat(
format: FileFormat,
mode: SaveMode,
data: LogicalPlan): InsertIntoHadoopFsRelationCommand
planForWritingFileFormat takes the paths and the path option (from the caseInsensitiveOptions) together and (assuming that there is only one path available among the paths combined) creates a fully-qualified HDFS-compatible output path for writing.
|
Note
|
planForWritingFileFormat uses Hadoop HDFS’s Path to requests for the FileSystem that owns it (using Hadoop Configuration).
|
planForWritingFileFormat uses the PartitioningUtils helper object to validate partition columns in the partitionColumns.
In the end, planForWritingFileFormat returns a new InsertIntoHadoopFsRelationCommand.
When the number of the paths is different than 1, planForWritingFileFormat throws an IllegalArgumentException:
Expected exactly one path to be specified, but got: [allPaths]
|
Note
|
|
getOrInferFileFormatSchema Internal Method
getOrInferFileFormatSchema(
format: FileFormat,
fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType)
getOrInferFileFormatSchema…FIXME
|
Note
|
getOrInferFileFormatSchema is used when DataSource is requested for the sourceSchema and to resolve a non-streaming FileFormat-based relation.
|
checkAndGlobPathIfNecessary Internal Method
checkAndGlobPathIfNecessary(
checkEmptyGlobPath: Boolean,
checkFilesExist: Boolean): Seq[Path]
checkAndGlobPathIfNecessary…FIXME
|
Note
|
checkAndGlobPathIfNecessary is used when…FIXME
|
createInMemoryFileIndex Internal Method
createInMemoryFileIndex(
globbedPaths: Seq[Path]): InMemoryFileIndex
createInMemoryFileIndex…FIXME
|
Note
|
createInMemoryFileIndex is used when DataSource is requested to getOrInferFileFormatSchema and resolve a non-streaming FileFormat-based relation.
|
Internal Properties
| Name | Description |
|---|---|
|
java.lang.Class that was loaded for the given data source provider Used when:
|
|
Used when…FIXME |
|
Used when…FIXME |
|
Used when…FIXME |
|
Names of the data sources that are no longer available but should still be accepted (resolvable) for backward-compatibility |