ParquetFileFormat

ParquetFileFormat is the FileFormat for the parquet data source (i.e. it registers itself to handle files in parquet format and converts them to Spark SQL rows).

Note
parquet is the default data source format in Spark SQL.
Note
Apache Parquet is a columnar storage format for the Apache Hadoop ecosystem with support for efficient storage and encoding of data.
// All the following queries are equivalent
// schema has to be specified manually
import org.apache.spark.sql.types.StructType
val schema = StructType($"id".int :: Nil) // $"id" needs spark.implicits._ (auto-imported in spark-shell)

spark.read.schema(schema).format("parquet").load("parquet-datasets")

// The above is equivalent to the following shortcut
// Implicitly does format("parquet").load
spark.read.schema(schema).parquet("parquet-datasets")

// parquet is the default data source format
spark.read.schema(schema).load("parquet-datasets")

ParquetFileFormat is splitable, i.e. FIXME

ParquetFileFormat supports vectorized parquet decoding in whole-stage code generation when all of the following hold (see the configuration check after the list):

  1. spark.sql.parquet.enableVectorizedReader configuration property is enabled

  2. spark.sql.codegen.wholeStage internal configuration property is enabled

  3. The number of fields in the schema is at most spark.sql.codegen.maxFields internal configuration property

  4. All the fields in the output schema are of AtomicType
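All of these settings can be inspected at runtime with spark.conf, e.g. in spark-shell (the values shown as comments are the usual defaults):

// Properties that control vectorized parquet decoding in whole-stage code generation
spark.conf.get("spark.sql.parquet.enableVectorizedReader") // true
spark.conf.get("spark.sql.codegen.wholeStage")             // true
spark.conf.get("spark.sql.codegen.maxFields")              // 100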

ParquetFileFormat supports filter predicate push-down optimization (via createFilter) as per the following table.

Table 1. Spark Data Source Filters to Parquet Filter Predicates Conversions (aka ParquetFilters.createFilter)
Data Source Filter    Parquet FilterPredicate
------------------    -----------------------
IsNull                FilterApi.eq
IsNotNull             FilterApi.notEq
EqualTo               FilterApi.eq
Not EqualTo           FilterApi.notEq
EqualNullSafe         FilterApi.eq
Not EqualNullSafe     FilterApi.notEq
LessThan              FilterApi.lt
LessThanOrEqual       FilterApi.ltEq
GreaterThan           FilterApi.gt
GreaterThanOrEqual    FilterApi.gtEq
And                   FilterApi.and
Or                    FilterApi.or
Not                   FilterApi.not
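The conversions can be observed in the physical plan of a filtered parquet query: the data source filters that were successfully converted (and handed over to the parquet reader) are listed as PushedFilters of the FileScan parquet node. A spark-shell example, reusing the schema and path from the opening example (explain output abbreviated):

// PushedFilters lists the filters pushed down to the parquet data source
spark.read.schema(schema).parquet("parquet-datasets")
  .where($"id" > 0)
  .explain
// == Physical Plan ==
// ... FileScan parquet [id#...] ... PushedFilters: [IsNotNull(id), GreaterThan(id,0)] ...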

Tip

Enable ALL logging level for org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat=ALL

Refer to Logging.

Preparing Write Job — prepareWrite Method

prepareWrite(
  sparkSession: SparkSession,
  job: Job,
  options: Map[String, String],
  dataSchema: StructType): OutputWriterFactory
Note
prepareWrite is part of the FileFormat Contract to prepare a write job.

prepareWrite…FIXME
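For illustration, writing a Dataset out in parquet format is what makes Spark SQL use prepareWrite for the write job (the output path below is just for demonstration):

// A parquet write goes through ParquetFileFormat.prepareWrite
spark.range(5).write.mode("overwrite").parquet("/tmp/parquet-prepareWrite-demo")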

inferSchema Method

inferSchema(
  sparkSession: SparkSession,
  parameters: Map[String, String],
  files: Seq[FileStatus]): Option[StructType]
Note
inferSchema is part of the FileFormat Contract to…FIXME.

inferSchema…FIXME
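For illustration, inferSchema is what resolves the schema when none is specified explicitly for a parquet read (reusing the path from the opening example):

// With no user-specified schema, the schema is inferred from the parquet files
val df = spark.read.parquet("parquet-datasets")
df.printSchema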

vectorTypes Method

vectorTypes(
  requiredSchema: StructType,
  partitionSchema: StructType,
  sqlConf: SQLConf): Option[Seq[String]]
Note
vectorTypes is part of the FileFormat Contract to define the concrete column vector class names for each column used in a columnar batch when enabled.

vectorTypes creates a collection of the names of OffHeapColumnVector or OnHeapColumnVector when spark.sql.columnVector.offheap.enabled property is enabled or disabled, respectively.

Note
spark.sql.columnVector.offheap.enabled property is disabled (false) by default.

The size of the collection is the total number of fields in the given requiredSchema and partitionSchema schemas.
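A minimal sketch of that behaviour (not the actual Spark source; the off-heap flag is passed in as a plain boolean for simplicity):

import org.apache.spark.sql.types.StructType

// One column vector class name per field of requiredSchema and partitionSchema,
// chosen by whether off-heap column vectors are enabled
def vectorTypesSketch(
    requiredSchema: StructType,
    partitionSchema: StructType,
    offHeapEnabled: Boolean): Option[Seq[String]] = {
  val vectorClass =
    if (offHeapEnabled) "org.apache.spark.sql.execution.vectorized.OffHeapColumnVector"
    else "org.apache.spark.sql.execution.vectorized.OnHeapColumnVector"
  Some(Seq.fill(requiredSchema.length + partitionSchema.length)(vectorClass))
}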

Building Data Reader With Partition Column Values Appended — buildReaderWithPartitionValues Method

buildReaderWithPartitionValues(
  sparkSession: SparkSession,
  dataSchema: StructType,
  partitionSchema: StructType,
  requiredSchema: StructType,
  filters: Seq[Filter],
  options: Map[String, String],
  hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow]
Note
buildReaderWithPartitionValues is part of the FileFormat Contract to build a data reader with the partition column values appended.

buildReaderWithPartitionValues sets the configuration options in the input hadoopConf.

Table 2. Hadoop Configuration Options
Name                                                  Value
----                                                  -----
parquet.read.support.class                            ParquetReadSupport
org.apache.spark.sql.parquet.row.requested_schema     JSON representation of requiredSchema
org.apache.spark.sql.parquet.row.attributes           JSON representation of requiredSchema
spark.sql.session.timeZone                            spark.sql.session.timeZone
spark.sql.parquet.binaryAsString                      spark.sql.parquet.binaryAsString
spark.sql.parquet.int96AsTimestamp                    spark.sql.parquet.int96AsTimestamp
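The JSON representation of a schema mentioned above is available as StructType.json, e.g.:

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// The JSON form of the required schema (as stored under
// org.apache.spark.sql.parquet.row.requested_schema)
val requiredSchema = StructType(StructField("id", IntegerType) :: Nil)
println(requiredSchema.json)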

buildReaderWithPartitionValues requests ParquetWriteSupport to setSchema.

buildReaderWithPartitionValues tries to push filters down to create a Parquet FilterPredicate (aka pushed).

Note
Filter predicate push-down optimization for parquet data sources uses spark.sql.parquet.filterPushdown configuration property (default: enabled).

With spark.sql.parquet.filterPushdown configuration property enabled, buildReaderWithPartitionValues takes the input Spark data source filters and converts them to Parquet filter predicates if possible (as described in Table 1). Otherwise, no Parquet filter predicate is specified.

Note
buildReaderWithPartitionValues creates filter predicates for the following types: BooleanType, IntegerType, LongType, FloatType, DoubleType, StringType, BinaryType.

buildReaderWithPartitionValues broadcasts the input hadoopConf Hadoop Configuration.

In the end, buildReaderWithPartitionValues gives a function that takes a PartitionedFile and does the following:

  1. Creates a Hadoop FileSplit for the input PartitionedFile

  2. Creates a Parquet ParquetInputSplit for the Hadoop FileSplit created

  3. Gets the broadcast Hadoop Configuration

  4. Creates a flag that says whether to apply timezone conversions to int96 timestamps or not (aka convertTz)

  5. Creates a Hadoop TaskAttemptContextImpl (with the broadcast Hadoop Configuration and a Hadoop TaskAttemptID for a map task)

  6. Sets the Parquet FilterPredicate (only when the spark.sql.parquet.filterPushdown configuration property is enabled, which it is by default)

The function then branches off on whether Parquet vectorized reader is enabled or not.

Note
Parquet vectorized reader is enabled by default.

With Parquet vectorized reader enabled, the function does the following:

  1. Creates a VectorizedParquetRecordReader and a RecordReaderIterator

  2. Requests VectorizedParquetRecordReader to initialize (with the Parquet ParquetInputSplit and the Hadoop TaskAttemptContextImpl)

  3. Prints out the following DEBUG message to the logs:

    Appending [partitionSchema] [partitionValues]
  4. Requests VectorizedParquetRecordReader to initBatch

  5. (only with supportBatch enabled) Requests VectorizedParquetRecordReader to enableReturningBatches

  6. In the end, the function gives the RecordReaderIterator (over the VectorizedParquetRecordReader) as the Iterator[InternalRow]

With Parquet vectorized reader disabled, the function does the following:

  1. FIXME (since Parquet vectorized reader is enabled by default it’s of less interest currently)
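For experimentation, the vectorized reader can be switched off at runtime so that a parquet query takes the non-vectorized branch (reusing the schema and path from the opening example):

// Disable the vectorized reader to exercise the non-vectorized branch
spark.conf.set("spark.sql.parquet.enableVectorizedReader", false)
spark.read.schema(schema).parquet("parquet-datasets").show
// Restore the default
spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)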

mergeSchemasInParallel Method

mergeSchemasInParallel(
  filesToTouch: Seq[FileStatus],
  sparkSession: SparkSession): Option[StructType]

mergeSchemasInParallel…FIXME

Note
mergeSchemasInParallel is used when…FIXME
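For context only (no claim about the exact code path, per the FIXME above): merging schemas of multiple parquet files is the user-facing scenario behind the mergeSchema option (or the spark.sql.parquet.mergeSchema property). The paths below are illustrative:

// Two parquet datasets with different but compatible schemas
spark.range(3).selectExpr("id", "id AS a").write.mode("overwrite").parquet("/tmp/pq/key=1")
spark.range(3).selectExpr("id", "id * 2 AS b").write.mode("overwrite").parquet("/tmp/pq/key=2")

// mergeSchema combines the schemas of all the part-files (plus the partition column key)
spark.read.option("mergeSchema", true).parquet("/tmp/pq").printSchema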
