FileSourceScanExec Leaf Physical Operator

FileSourceScanExec is a leaf physical operator (as a DataSourceScanExec) that represents a scan over collections of files (incl. Hive tables).

FileSourceScanExec is created exclusively for a LogicalRelation logical operator with a HadoopFsRelation when FileSourceStrategy execution planning strategy is executed.

// Create a bucketed data source table
// It is one of the most complex examples of a LogicalRelation with a HadoopFsRelation
val tableName = "bucketed_4_id"
spark
  .range(100)
  .withColumn("part", $"id" % 2)
  .write
  .partitionBy("part")
  .bucketBy(4, "id")
  .sortBy("id")
  .mode("overwrite")
  .saveAsTable(tableName)
val q = spark.table(tableName)
val sparkPlan = q.queryExecution.executedPlan

scala> println(sparkPlan.numberedTreeString)
00 *(1) FileScan parquet default.bucketed_4_id[id#7L,part#8L] Batched: true, Format: Parquet, Location: CatalogFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 4 out of 4

import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkPlan.collectFirst { case exec: FileSourceScanExec => exec }.get

scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec

scala> scan.metadata.toSeq.sortBy(_._1).map { case (k, v) => s"$k -> $v" }.foreach(println)
Batched -> true
Format -> Parquet
Location -> CatalogFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id]
PartitionCount -> 2
PartitionFilters -> []
PushedFilters -> []
ReadSchema -> struct<id:bigint>
SelectedBucketsCount -> 4 out of 4

FileSourceScanExec supports bucket pruning so it only scans the bucket files required for a query.

scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec

import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]

import org.apache.spark.sql.execution.datasources.FilePartition
val bucketFiles = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"Bucket $bucketId => $f"

scala> println(bucketFiles.size)
51

scala> bucketFiles.foreach(println)
Bucket 0 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=0/part-00004-5301d371-01c3-47d4-bb6b-76c3c94f3699_00000.c000.snappy.parquet, range: 0-423, partition values: [0]
Bucket 0 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=0/part-00001-5301d371-01c3-47d4-bb6b-76c3c94f3699_00000.c000.snappy.parquet, range: 0-423, partition values: [0]
...
Bucket 3 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=1/part-00005-5301d371-01c3-47d4-bb6b-76c3c94f3699_00003.c000.snappy.parquet, range: 0-423, partition values: [1]
Bucket 3 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=1/part-00000-5301d371-01c3-47d4-bb6b-76c3c94f3699_00003.c000.snappy.parquet, range: 0-431, partition values: [1]
Bucket 3 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=1/part-00007-5301d371-01c3-47d4-bb6b-76c3c94f3699_00003.c000.snappy.parquet, range: 0-423, partition values: [1]

FileSourceScanExec uses a HashPartitioning or the default UnknownPartitioning as the output partitioning scheme.

FileSourceScanExec is a ColumnarBatchScan and supports batch decoding only when the FileFormat (of the HadoopFsRelation) supports it.

FileSourceScanExec always gives the single inputRDD as the only RDD of internal rows (in Whole-Stage Java Code Generation).

FileSourceScanExec supports data source filters that are printed out to the console (at INFO logging level) and available as metadata (e.g. in web UI or explain).

Pushed Filters: [pushedDownFilters]
Table 1. FileSourceScanExec’s Performance Metrics

Key             Name (in web UI)         Description
metadataTime    metadata time (ms)
numFiles        number of files
numOutputRows   number of output rows
scanTime        scan time

As a DataSourceScanExec, FileSourceScanExec uses Scan for the prefix of the node name.

val fileScanExec: FileSourceScanExec = ... // see the example earlier
assert(fileScanExec.nodeName startsWith "Scan")
Figure 1. FileSourceScanExec in web UI (Details for Query)

FileSourceScanExec uses File for nodeNamePrefix (that is used for the simple node description in query plans).

val fileScanExec: FileSourceScanExec = ... // see the example earlier
assert(fileScanExec.nodeNamePrefix == "File")

scala> println(fileScanExec.simpleString)
FileScan csv [id#20,name#21,city#22] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/datasets/people.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,name:string,city:string>

Table 2. FileSourceScanExec’s Internal Properties (e.g. Registries, Counters and Flags)

Name: metadata

  metadata: Map[String, String]

  Metadata

  Note
  metadata is part of DataSourceScanExec Contract to..FIXME.

Name: pushedDownFilters

  Tip
  Enable INFO logging level to see pushedDownFilters printed out to the console.

  Pushed Filters: [pushedDownFilters]

  Used when FileSourceScanExec is requested for the metadata and input RDD.

Tip

Enable ALL logging level for org.apache.spark.sql.execution.FileSourceScanExec logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.FileSourceScanExec=ALL

Refer to Logging.

Creating RDD for Non-Bucketed Reads — createNonBucketedReadRDD Internal Method

createNonBucketedReadRDD(
  readFile: (PartitionedFile) => Iterator[InternalRow],
  selectedPartitions: Seq[PartitionDirectory],
  fsRelation: HadoopFsRelation): RDD[InternalRow]

createNonBucketedReadRDD calculates the maximum size of partitions (maxSplitBytes) based on the spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes configuration properties, as follows.

createNonBucketedReadRDD sums up the size of all the files (with the extra spark.sql.files.openCostInBytes) for the given selectedPartitions and divides the sum by the "default parallelism" (i.e. number of CPU cores assigned to a Spark application) that gives bytesPerCore.

The maximum size of partitions is then the minimum of spark.sql.files.maxPartitionBytes and the bigger of spark.sql.files.openCostInBytes and the bytesPerCore.
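
The calculation above can be sketched in plain Scala. This is a minimal model and not the Spark source: the object name, the maxSplitBytes helper, its parameters and the sample values are all assumptions made for illustration (128 MB and 4 MB are assumed here as the values of spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes).

```scala
object MaxSplitBytesSketch {
  val MB: Long = 1024L * 1024L

  // Hypothetical helper that follows the description above (not Spark's own code)
  def maxSplitBytes(
      fileSizes: Seq[Long],
      maxPartitionBytes: Long,
      openCostInBytes: Long,
      defaultParallelism: Int): Long = {
    // Every file is charged its size plus the cost of opening it
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    // min(maxPartitionBytes, max(openCostInBytes, bytesPerCore))
    math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  }

  // Two files of 100 MB and 200 MB scanned by an application with 8 cores
  val example: Long = maxSplitBytes(
    fileSizes = Seq(100 * MB, 200 * MB),
    maxPartitionBytes = 128 * MB,
    openCostInBytes = 4 * MB,
    defaultParallelism = 8)
}
```

With these inputs bytesPerCore is 38.5 MB, which lies between the open cost and the maximum partition size, so it becomes the split size. Very small inputs fall back to the open cost, and very large inputs are capped at the maximum partition size.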

createNonBucketedReadRDD prints out the following INFO message to the logs:

Planning scan with bin packing, max size: [maxSplitBytes] bytes, open cost is considered as scanning [openCostInBytes] bytes.

For every file (as Hadoop’s FileStatus) in every partition (as PartitionDirectory in the given selectedPartitions), createNonBucketedReadRDD gets the HDFS block locations to create PartitionedFiles (possibly split per the maximum size of partitions if the FileFormat of the HadoopFsRelation is splittable). The partitioned files are then sorted by number of bytes to read (aka split size) in decreasing order (from the largest to the smallest).
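
A rough model of the splitting and sorting step is shown below. FileSplit is a simplified, hypothetical stand-in for PartitionedFile (block locations and partition values are left out), and splitFile and sortedSplits are illustrative names that do not come from Spark.

```scala
object FileSplitSketch {
  // Simplified stand-in for PartitionedFile: only path, offset and length
  final case class FileSplit(path: String, start: Long, length: Long)

  // Cuts a file into chunks of at most maxSplitBytes when the format is splittable
  def splitFile(
      path: String,
      fileLength: Long,
      maxSplitBytes: Long,
      splittable: Boolean): Seq[FileSplit] =
    if (splittable) {
      (0L until fileLength by maxSplitBytes).map { offset =>
        FileSplit(path, offset, math.min(maxSplitBytes, fileLength - offset))
      }
    } else {
      // A non-splittable file is always read as a whole
      Seq(FileSplit(path, 0L, fileLength))
    }

  // Splits every (path, length) pair and orders the splits from largest to smallest
  def sortedSplits(
      files: Seq[(String, Long)],
      maxSplitBytes: Long,
      splittable: Boolean): Seq[FileSplit] =
    files
      .flatMap { case (path, length) => splitFile(path, length, maxSplitBytes, splittable) }
      .sortBy(_.length)(Ordering[Long].reverse)
}
```

For example, a splittable 250-byte file with a 100-byte maximum yields splits of 100, 100 and 50 bytes, while the same file in a non-splittable format stays a single 250-byte split.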

createNonBucketedReadRDD "compresses" multiple splits per partition if together they are smaller than the maxSplitBytes ("Next Fit Decreasing") that gives the necessary partitions (FilePartition).

In the end, createNonBucketedReadRDD creates a FileScanRDD (with the given (PartitionedFile) ⇒ Iterator[InternalRow] read function and the partitions).
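
The "Next Fit Decreasing" packing can be illustrated with a small, self-contained sketch. It works on plain split lengths rather than PartitionedFiles, and the pack helper is a hypothetical name. Charging the open cost for every split added to a partition is an assumption of this sketch, based on the log message above.

```scala
import scala.collection.mutable.ArrayBuffer

object BinPackingSketch {
  // Packs split lengths into partitions of roughly maxSplitBytes each
  def pack(
      splitLengths: Seq[Long],
      maxSplitBytes: Long,
      openCostInBytes: Long): Seq[Seq[Long]] = {
    val partitions = ArrayBuffer.empty[Seq[Long]]
    val current = ArrayBuffer.empty[Long]
    var currentSize = 0L

    // Finishes the partition being built (if any) and starts a new one
    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current.toList
      current.clear()
      currentSize = 0L
    }

    // Largest splits first; a split that does not fit closes the current partition
    splitLengths.sortBy(-_).foreach { length =>
      if (currentSize + length > maxSplitBytes) closePartition()
      currentSize += length + openCostInBytes
      current += length
    }
    closePartition()
    partitions.toList
  }
}
```

Calling BinPackingSketch.pack(Seq(10L, 70L, 30L, 50L, 20L), 100L, 5L) groups the splits as (70), (50, 30) and (20, 10). "Next fit" means that only the partition currently being built is considered, so earlier partitions are never revisited even if a later split would fit.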

Note
createNonBucketedReadRDD is used exclusively when FileSourceScanExec physical operator is requested for the input RDD (and either the optional bucketing specification of the HadoopFsRelation is not defined or bucketing is disabled).

selectedPartitions Internal Lazy-Initialized Property

selectedPartitions: Seq[PartitionDirectory]

selectedPartitions…​FIXME

Note

selectedPartitions is used when FileSourceScanExec is requested for the following:

Creating FileSourceScanExec Instance

FileSourceScanExec takes the following when created:

FileSourceScanExec initializes the internal registries and counters.

Output Partitioning Scheme — outputPartitioning Attribute

outputPartitioning: Partitioning
Note
outputPartitioning is part of the SparkPlan Contract to specify output data partitioning.

outputPartitioning can be one of the following: HashPartitioning or UnknownPartitioning (the default).

Creating FileScanRDD with Bucketing Support — createBucketedReadRDD Internal Method

createBucketedReadRDD(
  bucketSpec: BucketSpec,
  readFile: (PartitionedFile) => Iterator[InternalRow],
  selectedPartitions: Seq[PartitionDirectory],
  fsRelation: HadoopFsRelation): RDD[InternalRow]

createBucketedReadRDD prints the following INFO message to the logs:

Planning with [numBuckets] buckets

createBucketedReadRDD maps the available files of the input selectedPartitions into PartitionedFiles. For every file, createBucketedReadRDD calls getBlockLocations and getBlockHosts.

createBucketedReadRDD then groups the PartitionedFiles by bucket ID.

Note
Bucket ID is of the format _0000n, i.e. the bucket ID prefixed with up to four 0s.
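
As an illustration of this naming scheme, the sketch below formats a bucket ID as a file-name suffix and parses it back. Both helpers and the regular expression are assumptions chosen to fit the file names listed earlier (e.g. ..._00003.c000.snappy.parquet); they are not taken from Spark.

```scala
object BucketIdSketch {
  // Formats a bucket ID as an underscore followed by five zero-padded digits
  def bucketIdToString(id: Int): String = f"_$id%05d"

  // The digits after the last underscore, optionally followed by file extensions
  private val BucketedFileName = """.*_(\d+)(?:\..*)?$""".r

  // Extracts the bucket ID from a file name, if the name carries one
  def bucketIdOf(fileName: String): Option[Int] = fileName match {
    case BucketedFileName(id) => Some(id.toInt)
    case _                    => None
  }
}
```

BucketIdSketch.bucketIdToString(3) gives "_00003", and parsing a file name that ends with _00003.c000.snappy.parquet gives Some(3). A file name without an underscore-delimited numeric suffix gives None.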

createBucketedReadRDD prunes (filters out) the bucket files for the bucket IDs that are not listed in the bucket IDs for bucket pruning.

createBucketedReadRDD creates a FilePartition for every bucket ID and the (pruned) bucket PartitionedFiles.

In the end, createBucketedReadRDD creates a FileScanRDD (with the input readFile as the read function and the FilePartitions, one per bucket ID, as the partitions).
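
The grouping and pruning steps can be modelled in a few lines of plain Scala. BucketFile and BucketPartition are simplified, hypothetical stand-ins for PartitionedFile and FilePartition, and bucketsToScan is an illustrative name for the optional set of bucket IDs kept by bucket pruning. Keeping an empty partition for a pruned bucket is an assumption of this sketch.

```scala
object BucketedReadSketch {
  // Simplified stand-ins for PartitionedFile and FilePartition
  final case class BucketFile(bucketId: Int, path: String)
  final case class BucketPartition(index: Int, files: Seq[BucketFile])

  def bucketedPartitions(
      files: Seq[BucketFile],
      numBuckets: Int,
      bucketsToScan: Option[Set[Int]]): Seq[BucketPartition] = {
    // Group the files by bucket ID
    val filesByBucket = files.groupBy(_.bucketId)
    // Drop the buckets that are not selected (no pruning when None)
    val pruned = bucketsToScan match {
      case Some(keep) => filesByBucket.filter { case (id, _) => keep.contains(id) }
      case None       => filesByBucket
    }
    // One partition per bucket ID, possibly with no files
    (0 until numBuckets).map { id =>
      BucketPartition(id, pruned.getOrElse(id, Seq.empty))
    }
  }
}
```

The number of partitions always equals the number of buckets in this model, which is consistent with the (4) partitions of the FileScanRDD in the lineage of the four-bucket table shown in this section.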

Tip

Use RDD.toDebugString to see FileScanRDD in the RDD execution plan (aka RDD lineage).

// Create a bucketed table
spark.range(8).write.bucketBy(4, "id").saveAsTable("b1")

scala> sql("desc extended b1").where($"col_name" like "%Bucket%").show
+--------------+---------+-------+
|      col_name|data_type|comment|
+--------------+---------+-------+
|   Num Buckets|        4|       |
|Bucket Columns|   [`id`]|       |
+--------------+---------+-------+

val bucketedTable = spark.table("b1")

val lineage = bucketedTable.queryExecution.toRdd.toDebugString
scala> println(lineage)
(4) MapPartitionsRDD[26] at toRdd at <console>:26 []
 |  FileScanRDD[25] at toRdd at <console>:26 []
Note
createBucketedReadRDD is used exclusively when FileSourceScanExec physical operator is requested for the inputRDD (and the optional bucketing specification of the HadoopFsRelation is defined and bucketing is enabled).

supportsBatch Attribute

supportsBatch: Boolean
Note
supportsBatch is part of the ColumnarBatchScan Contract to enable vectorized decoding.

supportsBatch is enabled (i.e. true) only when the FileFormat (of the HadoopFsRelation) supports vectorized decoding.

Otherwise, supportsBatch is disabled (i.e. false).

FileSourceScanExec As ColumnarBatchScan

FileSourceScanExec is a ColumnarBatchScan and supports batch decoding only when the FileFormat (of the HadoopFsRelation) supports it.

FileSourceScanExec has needsUnsafeRowConversion flag enabled for ParquetFileFormat data sources exclusively.

FileSourceScanExec has vectorTypes…​FIXME

needsUnsafeRowConversion Flag

needsUnsafeRowConversion: Boolean
Note
needsUnsafeRowConversion is part of ColumnarBatchScan Contract to control the name of the variable for an input row while generating the Java source code to consume generated columns or row from a physical operator.

needsUnsafeRowConversion is enabled (i.e. true) when the following conditions all hold:

  1. FileFormat of the HadoopFsRelation is ParquetFileFormat

  2. spark.sql.parquet.enableVectorizedReader configuration property is enabled (default: true)

Otherwise, needsUnsafeRowConversion is disabled (i.e. false).

Note
needsUnsafeRowConversion is used when FileSourceScanExec is executed (and supportsBatch flag is off).

Fully-Qualified Class Names (Types) of Concrete ColumnVectors — vectorTypes Method

vectorTypes: Option[Seq[String]]
Note
vectorTypes is part of ColumnarBatchScan Contract to..FIXME.

vectorTypes simply requests the FileFormat of the HadoopFsRelation for vectorTypes.

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is part of the SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute branches off per supportsBatch flag.

If supportsBatch is on, doExecute creates a WholeStageCodegenExec (with codegenStageId as 0) and executes it right after.

If supportsBatch is off, doExecute creates an unsafeRows RDD to scan over which is different per needsUnsafeRowConversion flag.

If needsUnsafeRowConversion flag is on, doExecute takes the inputRDD and creates a new RDD by applying a function to each partition (using RDD.mapPartitionsWithIndexInternal):

  1. Creates an UnsafeProjection for the schema

  2. Initializes the UnsafeProjection

  3. Maps over the rows in a partition iterator using the UnsafeProjection projection

Otherwise, doExecute simply takes the inputRDD as the unsafeRows RDD (with no changes).

doExecute takes the numOutputRows metric and creates a new RDD by mapping every element in the unsafeRows and incrementing the numOutputRows metric.
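
The branching and the row counting described above can be summarized with a small model. It uses no Spark classes: RowSource and its three cases, rowSource and countingRows are hypothetical names, and an AtomicLong stands in for the numOutputRows metric.

```scala
import java.util.concurrent.atomic.AtomicLong

object DoExecuteSketch {
  // The three ways rows can be produced, per the two flags
  sealed trait RowSource
  case object WholeStageCodegen extends RowSource
  case object ProjectedToUnsafeRows extends RowSource
  case object InputRddAsIs extends RowSource

  def rowSource(supportsBatch: Boolean, needsUnsafeRowConversion: Boolean): RowSource =
    if (supportsBatch) WholeStageCodegen
    else if (needsUnsafeRowConversion) ProjectedToUnsafeRows
    else InputRddAsIs

  // Wraps an iterator so that every row pulled through it increments the counter
  def countingRows[T](rows: Iterator[T], numOutputRows: AtomicLong): Iterator[T] =
    rows.map { row =>
      numOutputRows.incrementAndGet()
      row
    }
}
```

Because the counting is attached to the iterator, the counter only advances as rows are consumed, not when the wrapper is created.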

Tip

Use RDD.toDebugString to review the RDD lineage and "reverse-engineer" the values of the supportsBatch and needsUnsafeRowConversion flags given the number of RDDs.

With supportsBatch off and needsUnsafeRowConversion on you should see two more RDDs in the RDD lineage.

Creating Input RDD of Internal Rows — inputRDD Internal Property

inputRDD: RDD[InternalRow]
Note
inputRDD is a Scala lazy value which is computed once when accessed and cached afterwards.
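
For readers less familiar with Scala, the following toy example shows what "computed once when accessed and cached afterwards" means. The Scan class and the computations counter are made up for the illustration and have nothing to do with Spark.

```scala
object LazyValSketch {
  // Counts how many times the lazy value's body has run
  var computations = 0

  final class Scan {
    // The body runs on first access only; later accesses reuse the result
    lazy val inputRDD: Seq[Int] = {
      computations += 1
      Seq(1, 2, 3)
    }
  }
}
```

Creating a Scan does not run the body. The first access to inputRDD runs it once, and any further access returns the cached value.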

inputRDD is an input RDD of internal binary rows (i.e. InternalRow) that is used when FileSourceScanExec physical operator is requested for inputRDDs and execution.

When created, inputRDD requests HadoopFsRelation to get the underlying FileFormat that is in turn requested to build a data reader with partition column values appended (with the input parameters from the properties of HadoopFsRelation and pushedDownFilters).

In case HadoopFsRelation has bucketing specification defined and bucketing support is enabled, inputRDD creates a FileScanRDD with bucketing (with the bucketing specification, the reader, selectedPartitions and the HadoopFsRelation itself). Otherwise, inputRDD uses createNonBucketedReadRDD.

Note
createBucketedReadRDD accepts a bucketing specification while createNonBucketedReadRDD does not.

Output Data Ordering — outputOrdering Attribute

outputOrdering: Seq[SortOrder]
Note
outputOrdering is part of the SparkPlan Contract to specify output data ordering.

outputOrdering is a SortOrder expression for every sort column in Ascending order only when all the following hold:

Otherwise, outputOrdering is simply empty (Nil).

updateDriverMetrics Internal Method

updateDriverMetrics(): Unit

updateDriverMetrics updates the following performance metrics:

In the end, updateDriverMetrics requests the SQLMetrics object to post the metric updates.

Note
updateDriverMetrics is used exclusively when FileSourceScanExec physical operator is requested for the input RDD (the very first time).
