// All the following queries are equivalent
// The schema has to be specified manually
import org.apache.spark.sql.types.StructType
import spark.implicits._ // for the $-notation (already available in spark-shell)
val schema = StructType($"id".int :: Nil)
spark.read.schema(schema).format("parquet").load("parquet-datasets")
// The above is equivalent to the following shortcut
// Implicitly does format("parquet").load
spark.read.schema(schema).parquet("parquet-datasets")
// parquet is the default data source format
spark.read.schema(schema).load("parquet-datasets")
ParquetFileFormat
ParquetFileFormat is the FileFormat for the parquet data source (i.e. it registers itself to handle files in parquet format and converts them to Spark SQL rows).
Note
|
parquet is the default data source format in Spark SQL.
|
Note
|
Apache Parquet is a columnar storage format for the Apache Hadoop ecosystem with support for efficient storage and encoding of data. |
ParquetFileFormat is splitable, i.e. FIXME
ParquetFileFormat supports vectorized parquet decoding in whole-stage code generation when all of the following hold:

- spark.sql.parquet.enableVectorizedReader configuration property is enabled
- spark.sql.codegen.wholeStage internal configuration property is enabled
- The number of fields in the schema is at most the value of the spark.sql.codegen.maxFields internal configuration property
- All the fields in the output schema are of AtomicType
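A minimal spark-shell sketch for inspecting these settings (the defaults in the comments are assumptions based on recent Spark versions):

// Settings that control vectorized parquet decoding in whole-stage code generation
spark.conf.get("spark.sql.parquet.enableVectorizedReader") // assumed default: true
spark.conf.get("spark.sql.codegen.wholeStage")             // assumed default: true
spark.conf.get("spark.sql.codegen.maxFields")              // assumed default: 100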
ParquetFileFormat supports filter predicate push-down optimization (via createFilter) as per the following table.
Data Source Filter | Parquet FilterPredicate |
---|---|
IsNull | FilterApi.eq (with a null value) |
IsNotNull | FilterApi.notEq (with a null value) |
EqualTo | FilterApi.eq |
Not EqualTo | FilterApi.notEq |
EqualNullSafe | FilterApi.eq |
Not EqualNullSafe | FilterApi.notEq |
LessThan | FilterApi.lt |
LessThanOrEqual | FilterApi.ltEq |
GreaterThan | FilterApi.gt |
GreaterThanOrEqual | FilterApi.gtEq |
And | FilterApi.and |
Or | FilterApi.or |
Not | FilterApi.not |
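For illustration, a minimal sketch of building such a Parquet FilterPredicate directly with Parquet's FilterApi (the column name id and the value are examples, not from the original text):

import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
// roughly what createFilter produces for the data source filter GreaterThan("id", 0)
val pred: FilterPredicate = FilterApi.gt(FilterApi.intColumn("id"), java.lang.Integer.valueOf(0))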
Tip
|
Enable DEBUG logging level for org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat=DEBUG

Refer to Logging. |
Preparing Write Job — prepareWrite Method
prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory
Note
|
prepareWrite is part of the FileFormat Contract to prepare a write job.
|
prepareWrite…FIXME
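For reference, a parquet write like the one below is what eventually goes through prepareWrite (the path is just an example):

// Writing a Dataset in parquet format uses ParquetFileFormat.prepareWrite under the covers
spark.range(5).write.mode("overwrite").parquet("parquet-datasets")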
inferSchema Method
inferSchema(
sparkSession: SparkSession,
parameters: Map[String, String],
files: Seq[FileStatus]): Option[StructType]
Note
|
inferSchema is part of FileFormat Contract to…FIXME.
|
inferSchema…FIXME
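A quick way to see schema inference in action is to read a parquet dataset with no user-specified schema (the path is just an example):

// With no user-specified schema, the schema is inferred from the parquet files
spark.read.parquet("parquet-datasets").printSchema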
vectorTypes Method
vectorTypes(
requiredSchema: StructType,
partitionSchema: StructType,
sqlConf: SQLConf): Option[Seq[String]]
Note
|
vectorTypes is part of FileFormat Contract to define the concrete column vector class names for each column used in a columnar batch when enabled.
|
vectorTypes creates a collection of the names of OffHeapColumnVector or OnHeapColumnVector when spark.sql.columnVector.offheap.enabled property is enabled or disabled, respectively.
Note
|
spark.sql.columnVector.offheap.enabled property is disabled (false) by default.
|
The size of the collection is the total number of fields in the given requiredSchema and partitionSchema schemas.
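A minimal sketch for checking which column vector class will be used:

// "false" (the default) => OnHeapColumnVector, "true" => OffHeapColumnVector
spark.conf.get("spark.sql.columnVector.offheap.enabled")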
Building Data Reader With Partition Column Values Appended — buildReaderWithPartitionValues Method
buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow]
Note
|
buildReaderWithPartitionValues is part of FileFormat Contract to build a data reader with the partition column values appended.
|
buildReaderWithPartitionValues sets the following configuration options in the input hadoopConf.
Name | Value |
---|---|
parquet.read.support.class | ParquetReadSupport |
org.apache.spark.sql.parquet.row.requested_schema | JSON representation of requiredSchema |
org.apache.spark.sql.parquet.row.attributes | JSON representation of requiredSchema |
spark.sql.session.timeZone | The current value of spark.sql.session.timeZone |
spark.sql.parquet.binaryAsString | The current value of spark.sql.parquet.binaryAsString |
spark.sql.parquet.int96AsTimestamp | The current value of spark.sql.parquet.int96AsTimestamp |
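The JSON representation of a schema comes from StructType.json, e.g. (output shown approximately):

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
val requiredSchema = StructType(StructField("id", IntegerType) :: Nil)
requiredSchema.json
// {"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}}]}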
buildReaderWithPartitionValues requests ParquetWriteSupport to setSchema.
buildReaderWithPartitionValues tries to push filters down to create a Parquet FilterPredicate (aka pushed).
Note
|
Filter predicate push-down optimization for parquet data sources uses spark.sql.parquet.filterPushdown configuration property (default: enabled). |
With spark.sql.parquet.filterPushdown configuration property enabled, buildReaderWithPartitionValues takes the input Spark data source filters and converts them to Parquet filter predicates if possible (as described in the table above). Otherwise, the Parquet filter predicate is not specified.
Note
|
buildReaderWithPartitionValues creates filter predicates for the following types: BooleanType, IntegerType, LongType, FloatType, DoubleType, StringType, BinaryType.
|
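The pushed filters can be seen in the physical plan of a filtered query over a parquet dataset (a sketch; the path and column name are examples):

val q = spark.read.parquet("parquet-datasets").where($"id" > 0)
q.explain
// Look for the FileScan parquet node, e.g.
// PushedFilters: [IsNotNull(id), GreaterThan(id,0)]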
buildReaderWithPartitionValues broadcasts the input hadoopConf Hadoop Configuration.
In the end, buildReaderWithPartitionValues gives a function that takes a PartitionedFile and does the following:

- Creates a Hadoop FileSplit for the input PartitionedFile
- Creates a Parquet ParquetInputSplit for the Hadoop FileSplit created
- Gets the broadcast Hadoop Configuration
- Creates a flag that says whether to apply timezone conversions to int96 timestamps or not (aka convertTz); see the sketch after this list
- Creates a Hadoop TaskAttemptContextImpl (with the broadcast Hadoop Configuration and a Hadoop TaskAttemptID for a map task)
- Sets the Parquet FilterPredicate (only when spark.sql.parquet.filterPushdown configuration property is enabled, which it is by default)
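The convertTz flag is assumed to be controlled by the spark.sql.parquet.int96TimestampConversion configuration property; a minimal sketch for inspecting it:

// Assumption: controls whether timezone conversion is applied to int96 timestamps
spark.conf.get("spark.sql.parquet.int96TimestampConversion") // assumed default: false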
The function then branches off depending on whether the Parquet vectorized reader is enabled or not.
Note
|
Parquet vectorized reader is enabled by default. |
With Parquet vectorized reader enabled, the function does the following:
- Creates a VectorizedParquetRecordReader and a RecordReaderIterator
- Requests the VectorizedParquetRecordReader to initialize (with the Parquet ParquetInputSplit and the Hadoop TaskAttemptContextImpl)
- Prints out the following DEBUG message to the logs:
Appending [partitionSchema] [partitionValues]
- Requests the VectorizedParquetRecordReader to initBatch
- (only with supportBatch enabled) Requests the VectorizedParquetRecordReader to enableReturningBatches
- In the end, the function gives the RecordReaderIterator (over the VectorizedParquetRecordReader) as the Iterator[InternalRow]
With Parquet vectorized reader disabled, the function does the following:
- FIXME (since Parquet vectorized reader is enabled by default, it is of less interest currently)