VectorizedParquetRecordReader

VectorizedParquetRecordReader is a concrete SpecificParquetRecordReaderBase for the Parquet file format that supports Vectorized Parquet Decoding.

VectorizedParquetRecordReader is created exclusively when ParquetFileFormat is requested for a data reader (with the spark.sql.parquet.enableVectorizedReader property enabled and a read schema that contains AtomicType data types only).

Note
spark.sql.parquet.enableVectorizedReader configuration property is enabled (true) by default.
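
The two conditions above (the property enabled and an all-atomic read schema) can be sketched as a simple predicate. This is a minimal, Spark-free model: the VectorizedSupportSketch class, the Kind enum and the useVectorizedReader helper are hypothetical names for illustration only, not Spark's API.

```java
import java.util.Arrays;
import java.util.List;

// Toy model (no Spark dependency); all names are illustrative assumptions.
final class VectorizedSupportSketch {
    // ATOMIC stands for types such as int or string,
    // COMPLEX for types such as array, map or struct.
    enum Kind { ATOMIC, COMPLEX }

    // The vectorized reader is an option only when the property is enabled
    // and every field of the read schema is of an atomic type.
    static boolean useVectorizedReader(boolean enableVectorizedReader, List<Kind> schemaKinds) {
        return enableVectorizedReader
            && schemaKinds.stream().allMatch(kind -> kind == Kind.ATOMIC);
    }

    public static void main(String[] args) {
        System.out.println(useVectorizedReader(true, Arrays.asList(Kind.ATOMIC, Kind.ATOMIC)));
        System.out.println(useVectorizedReader(true, Arrays.asList(Kind.ATOMIC, Kind.COMPLEX)));
        System.out.println(useVectorizedReader(false, Arrays.asList(Kind.ATOMIC)));
    }
}
```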

VectorizedParquetRecordReader takes the following to be created:

VectorizedParquetRecordReader uses the capacity attribute for the following:

VectorizedParquetRecordReader uses OFF_HEAP memory mode when spark.sql.columnVector.offheap.enabled internal configuration property is enabled (true).

Note
spark.sql.columnVector.offheap.enabled configuration property is disabled (false) by default.
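
The memory mode choice described above can be sketched as follows. This is a minimal model with no Spark dependency: the MemoryMode enum here is a stand-in, and the memoryModeFor helper is a hypothetical name introduced for illustration.

```java
// Toy model (no Spark dependency); all names are illustrative assumptions.
final class MemoryModeSketch {
    // Stand-in for the two memory modes mentioned in the text.
    enum MemoryMode { ON_HEAP, OFF_HEAP }

    // Maps the value of spark.sql.columnVector.offheap.enabled to a memory mode.
    static MemoryMode memoryModeFor(boolean offHeapEnabled) {
        return offHeapEnabled ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
    }

    public static void main(String[] args) {
        // The property is false by default, so ON_HEAP is the default mode.
        System.out.println(memoryModeFor(false));
        System.out.println(memoryModeFor(true));
    }
}
```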
Table 1. VectorizedParquetRecordReader’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

batchIdx

Current batch index that is the index of an InternalRow in the ColumnarBatch. Used when VectorizedParquetRecordReader is requested to getCurrentValue with the returnColumnarBatch flag disabled

Starts at 0

Increments every nextKeyValue

Reset to 0 when reading next rows into a columnar batch

columnarBatch

ColumnarBatch

columnReaders

VectorizedColumnReaders (one reader per column) to read rows as batches

Initialized in checkEndOfRowGroup (when requested to read next rows into a columnar batch)

columnVectors

Allocated WritableColumnVectors

MEMORY_MODE

Memory mode of the ColumnarBatch

Used exclusively when VectorizedParquetRecordReader is requested to initBatch.

missingColumns

Bitmap of columns (per index) that are missing (or simply the ones that the reader should not read)

numBatched

returnColumnarBatch

Optimization flag to control whether VectorizedParquetRecordReader offers rows as the ColumnarBatch or one row at a time only

Default: false

Enabled (true) when VectorizedParquetRecordReader is requested to enable returning batches

Used in nextKeyValue (to read next rows into a columnar batch) and getCurrentValue (to return the internal ColumnarBatch not a single InternalRow)

rowsReturned

Number of rows read already

totalCountLoadedSoFar

totalRowCount

Total number of rows to be read

nextKeyValue Method

boolean nextKeyValue() throws IOException
Note
nextKeyValue is part of Hadoop’s RecordReader to read (key, value) pairs from a Hadoop InputSplit to present a record-oriented view.

nextKeyValue…​FIXME

Note

nextKeyValue is used when:

resultBatch Method

ColumnarBatch resultBatch()

resultBatch returns the internal columnarBatch if available. Otherwise, resultBatch calls initBatch first.

Note
resultBatch is used exclusively when VectorizedParquetRecordReader is requested to nextKeyValue.
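
The lazy creation of the batch described above can be sketched as follows. This is a minimal, Spark-free model: the Batch class is a stand-in for ColumnarBatch, and the initCalls counter exists only for the demonstration.

```java
// Toy model (no Spark dependency) of lazy batch creation; all names are illustrative.
final class ResultBatchSketch {
    // Stand-in for ColumnarBatch.
    static final class Batch {}

    private Batch columnarBatch;   // null until first requested
    int initCalls = 0;             // counts how often initBatch ran (demo only)

    private void initBatch() {
        columnarBatch = new Batch();
        initCalls++;
    }

    // Returns the existing batch, creating it on first use.
    Batch resultBatch() {
        if (columnarBatch == null) {
            initBatch();
        }
        return columnarBatch;
    }

    public static void main(String[] args) {
        ResultBatchSketch reader = new ResultBatchSketch();
        Batch first = reader.resultBatch();
        Batch second = reader.resultBatch();
        System.out.println(first == second);   // the same instance is reused
        System.out.println(reader.initCalls);  // initBatch ran once
    }
}
```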

Initializing — initialize Method

void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
Note
initialize is part of SpecificParquetRecordReaderBase Contract to…​FIXME.

initialize…​FIXME

enableReturningBatches Method

void enableReturningBatches()

enableReturningBatches simply turns the returnColumnarBatch internal flag on.

Note
enableReturningBatches is used exclusively when ParquetFileFormat is requested for a data reader (for vectorized parquet decoding in whole-stage codegen).

Initializing Columnar Batch — initBatch Method

void initBatch(StructType partitionColumns, InternalRow partitionValues) (1)
// private
private void initBatch() (2)
private void initBatch(
  MemoryMode memMode,
  StructType partitionColumns,
  InternalRow partitionValues)
  1. Uses MEMORY_MODE

  2. Uses MEMORY_MODE and no partitionColumns and no partitionValues

initBatch creates the batch schema, which is sparkSchema followed by the input partitionColumns schema.

initBatch requests OffHeapColumnVector or OnHeapColumnVector to allocate column vectors per the input memMode, i.e. OFF_HEAP or ON_HEAP memory modes, respectively. initBatch records the allocated column vectors as the internal WritableColumnVectors.

Note

spark.sql.columnVector.offheap.enabled configuration property selects the memory mode: OFF_HEAP when true and ON_HEAP when false.

spark.sql.columnVector.offheap.enabled is disabled by default which means that OnHeapColumnVector is used.

initBatch creates a ColumnarBatch (with the allocated WritableColumnVectors) and records it as the internal ColumnarBatch.

initBatch creates new slots in the allocated WritableColumnVectors for the input partitionColumns and sets the input partitionValues as constants.

initBatch initializes missing columns with nulls.
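
The initBatch steps above can be sketched with plain Java collections. This is a toy model under stated assumptions: a schema is a list of column names, a column vector is an Object[] of capacity slots, and missingColumns is a boolean array. None of these are Spark's actual types, and the method signature is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (no Spark dependency) of the initBatch steps; all names are illustrative.
final class InitBatchSketch {
    // Builds one "column vector" (a plain Object[]) per field of the batch schema.
    static List<Object[]> initBatch(
            List<String> sparkSchema,
            List<String> partitionColumns,
            List<Object> partitionValues,
            boolean[] missingColumns,
            int capacity) {
        // 1. Batch schema = data columns followed by partition columns.
        List<String> batchSchema = new ArrayList<>(sparkSchema);
        batchSchema.addAll(partitionColumns);

        // 2. Allocate one vector of `capacity` slots per field.
        List<Object[]> columnVectors = new ArrayList<>();
        for (int i = 0; i < batchSchema.size(); i++) {
            columnVectors.add(new Object[capacity]);
        }

        // 3. Partition columns hold the same constant value in every slot.
        int partitionOffset = sparkSchema.size();
        for (int i = 0; i < partitionColumns.size(); i++) {
            Arrays.fill(columnVectors.get(partitionOffset + i), partitionValues.get(i));
        }

        // 4. Missing data columns hold nulls only.
        for (int i = 0; i < missingColumns.length; i++) {
            if (missingColumns[i]) {
                Arrays.fill(columnVectors.get(i), null);
            }
        }
        return columnVectors;
    }

    public static void main(String[] args) {
        List<Object[]> vectors = initBatch(
            Arrays.asList("id", "name"),
            Arrays.asList("year"),
            Arrays.<Object>asList(2020),
            new boolean[] {false, true},
            4);
        System.out.println(vectors.size());                   // 2 data + 1 partition column
        System.out.println(Arrays.toString(vectors.get(2)));  // the constant partition column
    }
}
```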

Note

initBatch is used when:

Reading Next Rows Into Columnar Batch — nextBatch Method

boolean nextBatch() throws IOException

nextBatch reads at most capacity rows and returns true when there are rows available. Otherwise, nextBatch returns false (to "announce" there are no rows available).

Internally, nextBatch firstly requests every WritableColumnVector (in the columnVectors internal registry) to reset itself.

nextBatch requests the ColumnarBatch to specify the number of rows (in batch) as 0 (effectively resetting the batch and making it available for reuse).

When the rowsReturned is greater than or equal to the totalRowCount, nextBatch finishes with (returns) false (to "announce" there are no rows available).

nextBatch then calls checkEndOfRowGroup.

nextBatch calculates the number of rows left to be returned as a minimum of the capacity and the totalCountLoadedSoFar reduced by the rowsReturned.

nextBatch requests every VectorizedColumnReader to readBatch (with the number of rows left to be returned and associated WritableColumnVector).

Note
VectorizedColumnReaders use their own WritableColumnVectors for storing values read. The number of VectorizedColumnReaders equals the number of WritableColumnVectors.
Note
The number of rows in the internal ColumnarBatch matches the number of rows that VectorizedColumnReaders decoded and stored in corresponding WritableColumnVectors.

In the end, nextBatch registers the progress as follows:

nextBatch finishes with (returns) true (to "announce" there are rows available).

Note
nextBatch is used exclusively when VectorizedParquetRecordReader is requested to nextKeyValue.
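
The row accounting in nextBatch described above can be sketched without Spark. This is a toy model under stated assumptions: the rowGroupSize field and the body of checkEndOfRowGroup are inventions of this sketch (the text leaves that method unspecified), actual column decoding is omitted, and the progress bookkeeping at the end (advance rowsReturned, record numBatched, reset batchIdx) is how this sketch chooses to register progress.

```java
// Toy model (no Spark dependency) of the nextBatch bookkeeping; all names are illustrative.
final class NextBatchSketch {
    final int capacity;            // maximum number of rows per batch
    final long totalRowCount;      // total number of rows to be read
    final long rowGroupSize;       // rows per row group (an assumption of this model)
    long rowsReturned = 0;
    long totalCountLoadedSoFar = 0;
    int numBatched = 0;
    int batchIdx = 0;

    NextBatchSketch(int capacity, long totalRowCount, long rowGroupSize) {
        this.capacity = capacity;
        this.totalRowCount = totalRowCount;
        this.rowGroupSize = rowGroupSize;
    }

    // Assumed behaviour: "load" the next row group once all loaded rows were returned.
    private void checkEndOfRowGroup() {
        if (rowsReturned != totalCountLoadedSoFar) return;
        long remaining = totalRowCount - totalCountLoadedSoFar;
        totalCountLoadedSoFar += Math.min(rowGroupSize, remaining);
    }

    boolean nextBatch() {
        numBatched = 0;                                   // reset the batch for reuse
        if (rowsReturned >= totalRowCount) return false;  // no rows left
        checkEndOfRowGroup();
        // Rows in this batch: min(capacity, rows loaded but not yet returned).
        int num = (int) Math.min((long) capacity, totalCountLoadedSoFar - rowsReturned);
        // A real reader would ask every column reader to decode `num` values here.
        rowsReturned += num;
        numBatched = num;
        batchIdx = 0;
        return true;
    }

    public static void main(String[] args) {
        NextBatchSketch reader = new NextBatchSketch(4, 10, 6);
        while (reader.nextBatch()) {
            System.out.println("batch of " + reader.numBatched + " rows");
        }
    }
}
```

With a capacity of 4, 10 rows in total and row groups of 6 rows, the sketch yields batches of 4, 2 and 4 rows. A batch never spans two row groups, which is why the second batch is smaller than the capacity.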

checkEndOfRowGroup Internal Method

void checkEndOfRowGroup() throws IOException

checkEndOfRowGroup…​FIXME

Note
checkEndOfRowGroup is used exclusively when VectorizedParquetRecordReader is requested to read next rows into a columnar batch.

Getting Current Value (as Columnar Batch or Single InternalRow) — getCurrentValue Method

Object getCurrentValue()
Note
getCurrentValue is part of the Hadoop RecordReader Contract to break the data into key/value pairs for input to a Hadoop Mapper.

getCurrentValue returns the entire ColumnarBatch when the returnColumnarBatch flag is enabled (true). Otherwise, getCurrentValue requests the ColumnarBatch for a single InternalRow (per batchIdx).

Note

getCurrentValue is used when:

  • NewHadoopRDD is requested to compute a partition (compute)

  • RecordReaderIterator is requested for the next internal row
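
The batch-or-row behaviour of getCurrentValue (together with enableReturningBatches) can be sketched as follows. This is a toy model with no Spark dependency: a batch is a plain list of strings, the advance method is a hypothetical stand-in for the batchIdx increment done by nextKeyValue, and reading the row at batchIdx - 1 is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.List;

// Toy model (no Spark dependency); all names are illustrative assumptions.
final class CurrentValueSketch {
    final List<String> columnarBatch;     // stand-in: a batch is a list of rows
    boolean returnColumnarBatch = false;  // disabled by default
    int batchIdx = 0;

    CurrentValueSketch(List<String> rows) {
        this.columnarBatch = rows;
    }

    void enableReturningBatches() {
        returnColumnarBatch = true;
    }

    // Hypothetical stand-in for the batchIdx increment in nextKeyValue.
    void advance() {
        batchIdx++;
    }

    Object getCurrentValue() {
        if (returnColumnarBatch) {
            return columnarBatch;              // the whole batch at once
        }
        // batchIdx was already advanced, so the current row sits at batchIdx - 1
        // (an assumption of this sketch).
        return columnarBatch.get(batchIdx - 1);
    }

    public static void main(String[] args) {
        CurrentValueSketch reader = new CurrentValueSketch(Arrays.asList("r0", "r1", "r2"));
        reader.advance();
        System.out.println(reader.getCurrentValue());  // a single row
        reader.enableReturningBatches();
        System.out.println(reader.getCurrentValue());  // the entire batch
    }
}
```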
