DataSource — Pluggable Data Provider Framework

DataSource is one of the main parts of Data Source API in Spark SQL (together with DataFrameReader for loading datasets, DataFrameWriter for saving datasets and StreamSourceProvider for creating streaming sources).

DataSource models a pluggable data provider framework with the extension points for Spark SQL integrators to expand the list of supported external data sources in Spark SQL.

DataSource takes a list of file system paths that hold data. The list is empty by default, but can be different per data source:

DataSource is created when:

Table 1. DataSource’s Provider (and Format) Contracts
Extension Point Description

CreatableRelationProvider

Data source that saves the result of a structured query per save mode and returns the schema

FileFormat

Used in:

  • sourceSchema for streamed reading

  • write for writing a DataFrame to a DataSource (as part of creating a table as select)

RelationProvider

Data source that supports schema inference and can be accessed using SQL’s USING clause

SchemaRelationProvider

Data source that requires a user-defined schema

StreamSourceProvider

Used in:

As a user, you interact with DataSource by DataFrameReader (when you execute spark.read or spark.readStream) or SQL’s CREATE TABLE USING.

// Batch reading
val people: DataFrame = spark.read
  .format("csv")
  .load("people.csv")

// Streamed reading
val messages: DataFrame = spark.readStream
  .format("kafka")
  .option("subscribe", "topic")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load

DataSource uses a SparkSession, a class name, a collection of paths, optional user-specified schema, a collection of partition columns, a bucket specification, and configuration options.

Note
Data source is also called a table provider.

When requested to resolve a batch (non-streaming) FileFormat, DataSource creates a HadoopFsRelation with the optional bucketing specification.

Table 2. DataSource’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

providingClass

The Java class (java.lang.Class) that…​FIXME

Used when…​FIXME

sourceInfo

SourceInfo

Used when…​FIXME

caseInsensitiveOptions

FIXME

Used when…​FIXME

equality

FIXME

Used when…​FIXME

backwardCompatibilityMap

FIXME

Used when…​FIXME

Writing Data to Data Source per Save Mode Followed by Reading Rows Back (as BaseRelation) — writeAndRead Method

writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation
Caution
FIXME
Note
writeAndRead is used exclusively when CreateDataSourceTableAsSelectCommand logical command is executed.

Writing DataFrame to Data Source Per Save Mode — write Method

write(mode: SaveMode, data: DataFrame): BaseRelation

write writes the result of executing a structured query (as DataFrame) to a data source per save mode.

Internally, write looks up the data source and branches off per providingClass.

Table 3. write’s Branches per Supported providingClass (in execution order)
providingClass Description

CreatableRelationProvider

Executes CreatableRelationProvider.createRelation

FileFormat

writeInFileFormat

others

Reports a RuntimeException

Note
write does not support the internal CalendarIntervalType in the schema of data DataFrame and throws a AnalysisException when there is one.
Note
write is used exclusively when SaveIntoDataSourceCommand is executed.

writeInFileFormat Internal Method

Caution
FIXME

For FileFormat data sources, write takes all paths and path option and makes sure that there is only one.

Note
write uses Hadoop’s Path to access the FileSystem and calculate the qualified output path.

write requests PartitioningUtils to validatePartitionColumn.

When appending to a table, …​FIXME

In the end, write (for a FileFormat data source) prepares a InsertIntoHadoopFsRelationCommand logical plan with executes it.

Caution
FIXME Is toRdd a job execution?

createSource Method

createSource(metadataPath: String): Source
Caution
FIXME

createSink Method

Caution
FIXME

sourceSchema Internal Method

sourceSchema(): SourceInfo

sourceSchema returns the name and schema of the data source for streamed reading.

Caution
FIXME Why is the method called? Why does this bother with streamed reading and data sources?!

It supports two class hierarchies, i.e. FileFormat and Structured Streaming’s StreamSourceProvider data sources.

Internally, sourceSchema first creates an instance of the data source and…​

Caution
FIXME Finish…​

For Structured Streaming’s StreamSourceProvider data sources, sourceSchema relays calls to StreamSourceProvider.sourceSchema.

For FileFormat data sources, sourceSchema makes sure that path option was specified.

Tip
path is looked up in a case-insensitive way so paTh and PATH and pAtH are all acceptable. Use the lower-case version of path, though.
Note
path can use glob pattern (not regex syntax), i.e. contain any of {}[]*?\ characters.

It checks whether the path exists if a glob pattern is not used. In case it did not exist you will see the following AnalysisException exception in the logs:

scala> spark.read.load("the.file.does.not.exist.parquet")
org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/jacek/dev/oss/spark/the.file.does.not.exist.parquet;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:375)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:364)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
  ... 48 elided

If spark.sql.streaming.schemaInference is disabled and the data source is different than TextFileFormat, and the input userSpecifiedSchema is not specified, the following IllegalArgumentException exception is thrown:

Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
Caution
FIXME I don’t think the exception will ever happen for non-streaming sources since the schema is going to be defined earlier. When?

Eventually, it returns a SourceInfo with FileSource[path] and the schema (as calculated using the inferFileFormatSchema internal method).

For any other data source, it throws UnsupportedOperationException exception:

Data source [className] does not support streamed reading
Note
sourceSchema is used exclusively when DataSource is requested for the sourceInfo.

inferFileFormatSchema Internal Method

inferFileFormatSchema(format: FileFormat): StructType

inferFileFormatSchema private method computes (aka infers) schema (as StructType). It returns userSpecifiedSchema if specified or uses FileFormat.inferSchema. It throws a AnalysisException when is unable to infer schema.

It uses path option for the list of directory paths.

Note
It is used by DataSource.sourceSchema and DataSource.createSource when FileFormat is processed.

Resolving Relation (Creating BaseRelation) — resolveRelation Method

resolveRelation(checkFilesExist: Boolean = true): BaseRelation

resolveRelation resolves (i.e. creates) a BaseRelation.

Internally, resolveRelation tries to create an instance of the providingClass and branches off per its type and whether the optional user-specified schema was specified or not.

Table 4. Resolving BaseRelation per Provider and User-Specified Schema
Provider Behaviour

SchemaRelationProvider

Executes SchemaRelationProvider.createRelation with the provided schema

RelationProvider

Executes RelationProvider.createRelation

FileFormat

Creates a HadoopFsRelation

Note

resolveRelation is used when:

buildStorageFormatFromOptions Method

buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat

buildStorageFormatFromOptions…​FIXME

Note
buildStorageFormatFromOptions is used when…​FIXME

Creating DataSource Instance

DataSource takes the following when created:

  • SparkSession

  • Name of the provider class (aka input data source format)

  • A list of file system paths that hold data (default: empty)

  • (optional) User-specified schema (default: None, i.e. undefined)

  • (optional) Names of the partition columns (default: empty)

  • Optional bucketing specification (default: None)

  • Options (default: empty)

  • (optional) CatalogTable (default: None)

DataSource initializes the internal registries and counters.

Looking Up Class By Name Of Data Source Provider — lookupDataSource Method

lookupDataSource(provider: String, conf: SQLConf): Class[_]

lookupDataSource looks up the class name in the backwardCompatibilityMap and then replaces the class name exclusively for the orc provider per spark.sql.orc.impl internal configuration property:

  • For hive (default), lookupDataSource uses org.apache.spark.sql.hive.orc.OrcFileFormat

  • For native, lookupDataSource uses the canonical class name of OrcFileFormat, i.e. org.apache.spark.sql.execution.datasources.orc.OrcFileFormat

With the provider’s class name (aka provider1 internally) lookupDataSource assumes another name variant of format [provider1].DefaultSource (aka provider2 internally).

lookupDataSource then uses Java’s ServiceLoader to find all DataSourceRegister provider classes on the CLASSPATH.

lookupDataSource filters out the DataSourceRegister provider classes (by their alias) that match the provider1 (case-insensitive), e.g. parquet or kafka.

If a single provider class was found for the alias, lookupDataSource simply returns the provider class.

If no DataSourceRegister could be found by the short name (alias), lookupDataSource considers the names of the format provider as the fully-qualified class names and tries to load them instead (using Java’s ClassLoader.loadClass).

Note
You can reference your own custom DataSource in your code by DataFrameWriter.format method which is the alias or a fully-qualified class name.
Caution
FIXME Describe the other cases (orc and avro)

If no provider class could be found, lookupDataSource throws a RuntimeException:

java.lang.ClassNotFoundException: Failed to find data source: [provider1]. Please find packages at http://spark.apache.org/third-party-projects.html

If however, lookupDataSource found multiple registered aliases for the provider name…​FIXME

Creating Logical Command for Writing (for CreatableRelationProvider and FileFormat Data Sources) — planForWriting Method

planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan

planForWriting creates an instance of the providingClass and branches off per its type as follows:

Note

planForWriting is used when:

Planning for Writing (Using FileFormat) — planForWritingFileFormat Internal Method

planForWritingFileFormat(
  format: FileFormat,
  mode: SaveMode,
  data: LogicalPlan): InsertIntoHadoopFsRelationCommand

planForWritingFileFormat takes the paths and the path option (from the caseInsensitiveOptions) together and (assuming that there is only one path available among the paths combined) creates a fully-qualified HDFS-compatible output path for writing.

Note
planForWritingFileFormat uses Hadoop HDFS’s Path to requests for the FileSystem that owns it (using Hadoop Configuration).

planForWritingFileFormat uses the PartitioningUtils helper object to validate partition columns in the partitionColumns.

In the end, planForWritingFileFormat returns a new InsertIntoHadoopFsRelationCommand.

When the number of the paths is different than 1, planForWritingFileFormat throws an IllegalArgumentException:

Expected exactly one path to be specified, but got: [allPaths]
Note
planForWritingFileFormat is used when DataSource is requested to write data to a data source per save mode followed by reading rows back (when CreateDataSourceTableAsSelectCommand logical command is executed) and for the logical command for writing.

getOrInferFileFormatSchema Internal Method

getOrInferFileFormatSchema(
  format: FileFormat,
  fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType)

getOrInferFileFormatSchema…​FIXME

Note
getOrInferFileFormatSchema is used when DataSource is requested for the sourceSchema and to resolveRelation.

results matching ""

    No results matching ""