// Batch reading
val people: DataFrame = spark.read
.format("csv")
.load("people.csv")
// Streamed reading
val messages: DataFrame = spark.readStream
.format("kafka")
.option("subscribe", "topic")
.option("kafka.bootstrap.servers", "localhost:9092")
.load()
DataSource — Pluggable Data Provider Framework
DataSource paves the way for the Pluggable Data Provider Framework (Data Source API) in Spark SQL.
Together with the provider interfaces, DataSource allows Spark SQL integrators to use external data systems as data sources and sinks in structured queries (incl. Spark Structured Streaming).
Note
|
Data source is also called a table provider. |
DataSource requires an alias or a fully-qualified class name of the data source provider (among other optional parameters). DataSource uses the name to load the Java class (available internally as providingClass). Eventually, DataSource uses the Java class to resolve a relation (a BaseRelation) that represents the data source in logical plans (using the LogicalRelation leaf logical operator).
DataSource also requires a SparkSession for the configuration properties to resolve the data source provider.
DataSource is created when:
- HiveMetastoreCatalog is requested to convert a HiveTableRelation to a LogicalRelation over a HadoopFsRelation
- DataFrameReader is requested to load data from a data source (Data Source V1)
- DataFrameWriter is requested to save to a data source (Data Source V1)
- CreateDataSourceTableCommand, CreateDataSourceTableAsSelectCommand, InsertIntoDataSourceDirCommand and CreateTempViewUsing commands are executed
- FindDataSourceTable and ResolveSQLOnFile logical evaluation rules are executed
- For Spark Structured Streaming’s FileStreamSource, DataStreamReader and DataStreamWriter
DataSource takes a list of file system paths that hold data. The list is empty by default, but can be different per data source:
- The location URI of a HiveTableRelation (when HiveMetastoreCatalog is requested to convert a HiveTableRelation to a LogicalRelation over a HadoopFsRelation)
- The table name of an UnresolvedRelation (when the ResolveSQLOnFile logical evaluation rule is executed)
- The files in a directory when Spark Structured Streaming’s FileStreamSource is requested for batches
As a Spark SQL developer (user), you interact with DataSource through DataFrameReader (when you execute spark.read or spark.readStream) or SQL’s CREATE TABLE USING.
When requested to resolve a batch (non-streaming) FileFormat, DataSource creates a HadoopFsRelation with the optional bucketing specification.
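As a sketch of the SQL route (the table name and path below are hypothetical), a CREATE TABLE ... USING statement is executed as CreateDataSourceTableCommand, one of the commands listed above, and so ends up creating a DataSource:

spark.sql(
  """
    |CREATE TABLE people
    |USING csv
    |OPTIONS (path 'people.csv', header 'true')
  """.stripMargin)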
Creating DataSource Instance
DataSource takes the following to be created:
- SparkSession
- Fully-qualified class name or an alias of the data source provider (aka data source format)
- File system paths that hold data (default: empty)
- (optional) User-specified schema (default: undefined)
- (optional) Names of the partition columns (default: empty)
- (optional) Bucketing specification (default: undefined)
- (optional) Options (default: empty)
- (optional) CatalogTable (default: undefined)
DataSource initializes the internal properties.
Note
|
Only the SparkSession and the fully-qualified class name of the data source provider are required to create an instance of DataSource.
|
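DataSource is an internal class, but for illustration a minimal sketch of creating one directly could look as follows (the parameter names assume the Spark 2.4 sources and may differ across versions; the path is hypothetical):

import org.apache.spark.sql.execution.datasources.DataSource

// Only the SparkSession and the provider name are required; the rest use defaults.
val ds = DataSource(
  sparkSession = spark,
  className = "parquet",              // alias or fully-qualified class name
  paths = Seq("/tmp/people.parquet")) // hypothetical path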
Loading Java Class Of Data Source Provider — lookupDataSource Utility

lookupDataSource(
  provider: String,
  conf: SQLConf): Class[_]
lookupDataSource first finds the given provider in the backwardCompatibilityMap internal registry, and falls back to the provider name itself when not found.
Note
|
The provider argument can be either an alias (a simple name, e.g. parquet) or a fully-qualified class name (e.g. org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat).
|
lookupDataSource then uses the given SQLConf to decide on the class name of the provider for the ORC and Avro data sources as follows (see the sketch after this list):
- For the orc provider with the native ORC implementation (spark.sql.orc.impl is native), lookupDataSource uses the new ORC file format OrcFileFormat (based on Apache ORC)
- For the orc provider with the hive ORC implementation (spark.sql.orc.impl is hive), lookupDataSource uses org.apache.spark.sql.hive.orc.OrcFileFormat
- For com.databricks.spark.avro with the spark.sql.legacy.replaceDatabricksSparkAvro.enabled configuration property enabled (default), lookupDataSource uses the built-in (but external) Avro data source module
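A minimal sketch of the ORC branch (class names assume Spark 2.4; spark.sql.orc.impl is a runtime-settable SQL property):

import org.apache.spark.sql.execution.datasources.DataSource

spark.conf.set("spark.sql.orc.impl", "native")
val orcClass = DataSource.lookupDataSource("orc", spark.sessionState.conf)
// Expect: class org.apache.spark.sql.execution.datasources.orc.OrcFileFormat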
lookupDataSource also considers a second provider name variant with DefaultSource as the class name in the provider1 package, i.e. [provider1].DefaultSource.
lookupDataSource uses Java’s ServiceLoader service-provider loading facility to find all data source providers of type DataSourceRegister on the Spark CLASSPATH.
Note
|
DataSourceRegister is used to register a data source provider by a short name (alias). |
lookupDataSource tries to find the DataSourceRegister provider classes (by their alias) that match the provider1 name (case-insensitive, e.g. parquet or kafka).
If a single DataSourceRegister provider class is found, lookupDataSource simply returns the class of that data source provider.
If no DataSourceRegister provider class could be found by the short name (alias), lookupDataSource tries to load the provider1 name as a fully-qualified class name. If not successful, lookupDataSource tries to load the provider2 name (aka DefaultSource) instead.
Note
|
DataFrameWriter.format and DataFrameReader.format methods accept the name of the data source provider to use as an alias or a fully-qualified class name. |
import org.apache.spark.sql.execution.datasources.DataSource
val source = "parquet"
val cls = DataSource.lookupDataSource(source, spark.sessionState.conf)
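The fully-qualified class name from the note above should resolve as well:

val clsByName = DataSource.lookupDataSource(
  "org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat",
  spark.sessionState.conf)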
Caution
|
FIXME Describe error paths (case Failure(error) and case sources).
|
createSource Method

createSource(
  metadataPath: String): Source

createSource …FIXME
Note
|
createSource is used when…FIXME
|
createSink Method

createSink(
  outputMode: OutputMode): Sink

createSink …FIXME
Note
|
createSink is used when…FIXME
|
sourceSchema Internal Method

sourceSchema(): SourceInfo

sourceSchema returns the name and schema of the data source for streamed reading.
Caution
|
FIXME Why is the method called? Why does this bother with streamed reading and data sources?! |
It supports two class hierarchies, i.e. FileFormat and Structured Streaming’s StreamSourceProvider data sources.
Internally, sourceSchema first creates an instance of the data source and…
Caution
|
FIXME Finish… |
For Structured Streaming’s StreamSourceProvider data sources, sourceSchema relays calls to StreamSourceProvider.sourceSchema.
For FileFormat data sources, sourceSchema makes sure that the path option was specified.
Tip
|
path is looked up in a case-insensitive way, so paTh and PATH and pAtH are all acceptable. Use the lower-case version of path, though.
|
Note
|
path can use glob patterns (not regex syntax), i.e. contain any of the {}[]*?\ characters.
|
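For instance, a glob in the path option could look as follows (a batch-read sketch with a hypothetical directory layout; the same globbing applies to the streaming path option):

// Reads all JSON files across the matching directories.
val logs = spark.read
  .format("json")
  .option("path", "/tmp/logs/2019-*/day=*")
  .load()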
If the path does not use a glob pattern, sourceSchema checks whether the path exists. If it does not exist, you will see the following AnalysisException in the logs:
scala> spark.read.load("the.file.does.not.exist.parquet")
org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/jacek/dev/oss/spark/the.file.does.not.exist.parquet;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:375)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:364)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
... 48 elided
If spark.sql.streaming.schemaInference is disabled, the data source is not TextFileFormat, and the input userSpecifiedSchema is not specified, the following IllegalArgumentException is thrown:
Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
Caution
|
FIXME I don’t think the exception will ever happen for non-streaming sources since the schema is going to be defined earlier. When? |
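A sketch of a FileFormat streaming source with an explicit schema (the column names and path are hypothetical):

import org.apache.spark.sql.types.{LongType, StringType, StructType}

val schema = new StructType()
  .add("id", LongType)
  .add("name", StringType)

// Without .schema(...) (and with spark.sql.streaming.schemaInference disabled),
// the IllegalArgumentException above would be thrown.
val people = spark.readStream
  .schema(schema)
  .format("csv")
  .option("path", "/tmp/incoming")
  .load()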
Eventually, sourceSchema returns a SourceInfo with FileSource[path] and the schema (as calculated using the inferFileFormatSchema internal method).
For any other data source, sourceSchema throws an UnsupportedOperationException:
Data source [className] does not support streamed reading
Note
|
sourceSchema is used exclusively when DataSource is requested for the sourceInfo.
|
Resolving Relation (Creating BaseRelation) — resolveRelation Method

resolveRelation(
  checkFilesExist: Boolean = true): BaseRelation

resolveRelation resolves (i.e. creates) a BaseRelation.
Internally, resolveRelation tries to create an instance of the providingClass and branches off per its type and whether the optional user-specified schema was specified or not:
Provider | Behaviour |
---|---|
SchemaRelationProvider | Executes SchemaRelationProvider.createRelation with the provided schema |
RelationProvider | Executes RelationProvider.createRelation |
FileFormat | Creates a HadoopFsRelation |
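As a sketch (the path is hypothetical and must exist, or the AnalysisException shown earlier is thrown), resolving a FileFormat-based data source in the Data Source V1 code path ends up as a LogicalRelation over a HadoopFsRelation:

val people = spark.read.format("parquet").load("/tmp/people.parquet")
// Expect a LogicalRelation (printed as Relation[...] parquet) backed by a HadoopFsRelation.
println(people.queryExecution.logical.numberedTreeString)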
buildStorageFormatFromOptions Utility

buildStorageFormatFromOptions(
  options: Map[String, String]): CatalogStorageFormat

buildStorageFormatFromOptions …FIXME
Note
|
buildStorageFormatFromOptions is used when…FIXME
|
Creating Logical Command for Writing (for CreatableRelationProvider and FileFormat Data Sources) — planForWriting Method

planForWriting(
  mode: SaveMode,
  data: LogicalPlan): LogicalPlan
planForWriting creates an instance of the providingClass and branches off per its type as follows (see the sketch after this list):
- For a CreatableRelationProvider, planForWriting creates a SaveIntoDataSourceCommand (with the input data and mode, the CreatableRelationProvider data source and the caseInsensitiveOptions)
- For a FileFormat, planForWriting calls planForWritingFileFormat (with the FileFormat format and the input mode and data)
- For any other type, planForWriting simply throws a RuntimeException:
[providingClass] does not allow create table as select.
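As a sketch (hypothetical output path), in the Data Source V1 code path DataFrameWriter.save goes through planForWriting; with a FileFormat such as parquet the write is planned as an InsertIntoHadoopFsRelationCommand:

import org.apache.spark.sql.SaveMode

spark.range(5).write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .save("/tmp/ids.parquet")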
Writing Data to Data Source (per Save Mode) Followed by Reading Rows Back (as BaseRelation) — writeAndRead Method

writeAndRead(
  mode: SaveMode,
  data: LogicalPlan,
  outputColumnNames: Seq[String],
  physicalPlan: SparkPlan): BaseRelation

writeAndRead …FIXME
Note
|
writeAndRead is also known as a Create Table As Select (CTAS) query.
|
Note
|
writeAndRead is used when CreateDataSourceTableAsSelectCommand logical command is executed.
|
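As a sketch (the table name is hypothetical), a CTAS statement over a data source table is executed as CreateDataSourceTableAsSelectCommand and hence uses writeAndRead:

spark.sql("CREATE TABLE ids USING parquet AS SELECT id FROM range(5)")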
Planning for Writing (to FileFormat-Based Data Source) — planForWritingFileFormat Internal Method

planForWritingFileFormat(
  format: FileFormat,
  mode: SaveMode,
  data: LogicalPlan): InsertIntoHadoopFsRelationCommand

planForWritingFileFormat takes the paths and the path option (from the caseInsensitiveOptions) together and (assuming that there is exactly one path among them) creates a fully-qualified, HDFS-compatible output path for writing.
Note
|
planForWritingFileFormat uses Hadoop HDFS’s Path to request the FileSystem that owns it (using a Hadoop Configuration).
|
planForWritingFileFormat uses the PartitioningUtils helper object to validate the partition columns (partitionColumns).
In the end, planForWritingFileFormat returns a new InsertIntoHadoopFsRelationCommand.
When the number of paths is not exactly one, planForWritingFileFormat throws an IllegalArgumentException:
Expected exactly one path to be specified, but got: [allPaths]
getOrInferFileFormatSchema Internal Method

getOrInferFileFormatSchema(
  format: FileFormat,
  fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType)

getOrInferFileFormatSchema …FIXME
Note
|
getOrInferFileFormatSchema is used when DataSource is requested for the sourceSchema and to resolve a non-streaming FileFormat-based relation.
|
checkAndGlobPathIfNecessary Internal Method

checkAndGlobPathIfNecessary(
  checkEmptyGlobPath: Boolean,
  checkFilesExist: Boolean): Seq[Path]

checkAndGlobPathIfNecessary …FIXME
Note
|
checkAndGlobPathIfNecessary is used when…FIXME
|
createInMemoryFileIndex Internal Method

createInMemoryFileIndex(
  globbedPaths: Seq[Path]): InMemoryFileIndex

createInMemoryFileIndex …FIXME
Note
|
createInMemoryFileIndex is used when DataSource is requested to getOrInferFileFormatSchema and resolve a non-streaming FileFormat-based relation.
|
Internal Properties
Name | Description |
---|---|
providingClass | The java.lang.Class that was loaded for the given data source provider |
| Used when…FIXME |
| Used when…FIXME |
| Used when…FIXME |
backwardCompatibilityMap | Names of the data sources that are no longer available but should still be accepted (resolvable) for backward-compatibility |