FileScanRDD — Input RDD of FileSourceScanExec Physical Operator

FileScanRDD is an RDD of internal binary rows (i.e. RDD[InternalRow]) that is the input RDD of a FileSourceScanExec physical operator (in Whole-Stage Java Code Generation).

FileScanRDD is created exclusively when the FileSourceScanExec physical operator is requested to createBucketedReadRDD or createNonBucketedReadRDD (i.e. when FileSourceScanExec is requested for the input RDD while the WholeStageCodegenExec physical operator is executed).

val q = spark.read.text("README.md")

val sparkPlan = q.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkPlan.collectFirst { case exec: FileSourceScanExec => exec }.get
val inputRDD = scan.inputRDDs.head

import org.apache.spark.sql.execution.datasources.FileScanRDD
assert(inputRDD.isInstanceOf[FileScanRDD])

val rdd = scan.execute
scala> println(rdd.toDebugString)
(1) MapPartitionsRDD[1] at execute at <console>:27 []
 |  FileScanRDD[0] at inputRDDs at <console>:26 []

val fileScanRDD = rdd.dependencies.head.rdd
assert(fileScanRDD.isInstanceOf[FileScanRDD])

When created, FileScanRDD is given FilePartitions that are custom RDD partitions with PartitionedFiles (file blocks).
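Continuing the demo session above, you can look at the FilePartitions and their PartitionedFiles directly (field names per the Spark 2.4 sources):

import org.apache.spark.sql.execution.datasources.FilePartition
fileScanRDD.partitions.map(_.asInstanceOf[FilePartition]).foreach { fp =>
  println(s"Partition ${fp.index}:")
  // Every FilePartition holds one or more PartitionedFiles (file blocks)
  fp.files.foreach(f => println(s"  ${f.filePath} (start=${f.start}, length=${f.length})"))
}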

FileScanRDD uses the following properties when requested to compute a partition:

- spark.sql.files.ignoreCorruptFiles
- spark.sql.files.ignoreMissingFiles

FileScanRDD takes the following to be created:

- SparkSession
- Read function that takes a PartitionedFile and gives internal rows back ((PartitionedFile) => Iterator[InternalRow])
- FilePartitions (file blocks)
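For reference, the constructor signature looks roughly like this (per the Spark 2.4 sources; exact modifiers may differ):

class FileScanRDD(
    @transient private val sparkSession: SparkSession,
    readFunction: (PartitionedFile) => Iterator[InternalRow],
    @transient val filePartitions: Seq[FilePartition])
  extends RDD[InternalRow](sparkSession.sparkContext, Nil)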

Tip

Enable ALL logging level for org.apache.spark.sql.execution.datasources.FileScanRDD logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.datasources.FileScanRDD=ALL

Refer to Logging.

Placement Preferences of Partition (Preferred Locations) — getPreferredLocations Method

getPreferredLocations(split: RDDPartition): Seq[String]
Note

getPreferredLocations is part of the RDD Contract to specify placement preferences (aka preferred locations) of a partition.

Find out more in The Internals of Apache Spark.

getPreferredLocations requests the given FilePartition (split) for PartitionedFiles.

For every PartitionedFile, getPreferredLocations adds the size of the file block to every host (location) the block is available at.

In the end, getPreferredLocations gives the top 3 hosts with the most data available (file blocks).
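Here is a minimal, self-contained sketch of that host-selection logic (a hypothetical helper, not the actual Spark sources): sum up the bytes of the file blocks per host and keep the three hosts with the most data.

// blocks: (hosts a file block is available at, size of the block in bytes)
def preferredLocations(blocks: Seq[(Seq[String], Long)]): Seq[String] = {
  val bytesPerHost = scala.collection.mutable.Map.empty[String, Long].withDefaultValue(0L)
  for ((hosts, bytes) <- blocks; host <- hosts)
    bytesPerHost(host) += bytes
  // Top 3 hosts with the most data available
  bytesPerHost.toSeq.sortBy { case (_, bytes) => -bytes }.take(3).map(_._1)
}

preferredLocations(Seq((Seq("host1", "host2"), 100L), (Seq("host2"), 50L)))
// Seq(host2, host1)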

RDD Partitions — getPartitions Method

getPartitions: Array[RDDPartition]
Note

getPartitions is part of the RDD Contract to specify the partitions of a distributed computation.

Find out more in The Internals of Apache Spark.

getPartitions simply returns the FilePartitions (the FileScanRDD was created with).
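The implementation is a one-liner (per the Spark 2.4 sources):

override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray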

Computing Partition (in TaskContext) — compute Method

compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow]
Note
compute is part of Spark Core’s RDD Contract to compute a partition (in a TaskContext).

compute creates a Scala Iterator (of Java Objects) over the PartitionedFiles of the given FilePartition that uses the read function to open and read every file block, one at a time.

Note
The given RDDPartition is actually a FilePartition with one or more PartitionedFiles (file blocks).

compute then requests the input TaskContext to register a completion listener (addTaskCompletionListener) that simply closes the iterator when the task completes.

In the end, compute returns the iterator.
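The following self-contained sketch illustrates the pattern (plain Scala with a hypothetical String-based read function standing in for readFunction; no metrics, completion listener or error handling):

// Iterate over the elements of all file blocks of a partition, one file at a time
def computeSketch[T](files: Seq[String], readFile: String => Iterator[T]): Iterator[T] =
  new Iterator[T] {
    private val pending = files.iterator
    private var current: Iterator[T] = Iterator.empty

    // Exhaust the current file block, then advance to the next one (cf. nextIterator)
    def hasNext: Boolean = current.hasNext || nextIterator()

    def next(): T = current.next() // the real next() also updates read metrics

    private def nextIterator(): Boolean =
      if (pending.hasNext) { current = readFile(pending.next()); hasNext }
      else false
  }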

Getting Next Element — next Method

next(): Object
Note
next is part of the Iterator Contract (https://www.scala-lang.org/api/2.12.x/scala/collection/Iterator.html#next) to produce the next element of this iterator.

next takes the next element of the current iterator over elements of a file block (PartitionedFile).

next increments the metrics of bytes read and the number of rows read (which, for vectorized reads, is the number of rows in a ColumnarBatch).
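A hedged sketch of that row-count bookkeeping (the helper is hypothetical; ColumnarBatch is org.apache.spark.sql.vectorized.ColumnarBatch as of Spark 2.4):

import org.apache.spark.sql.vectorized.ColumnarBatch

// One element can carry many rows when the scan is vectorized
def rowsInElement(element: Object): Long = element match {
  case batch: ColumnarBatch => batch.numRows().toLong // vectorized read
  case _ => 1L // a single InternalRow
}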

Getting Next Iterator — nextIterator Internal Method

nextIterator(): Boolean

nextIterator advances to the next file block: if the FilePartition has more PartitionedFiles, nextIterator takes the next one and requests readCurrentFile for a new iterator (subject to the ignoreMissingFiles and ignoreCorruptFiles flags above) and returns true. Otherwise, nextIterator returns false.

Getting Iterator (of Elements) of Current File Block — readCurrentFile Internal Method

readCurrentFile(): Iterator[InternalRow]

readCurrentFile requests the read function to read the current PartitionedFile. A FileNotFoundException is re-thrown with an additional hint that the underlying files may have been updated (and the involved table or Dataset may have to be refreshed or recreated).
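A minimal sketch of that behavior (hypothetical String-based stand-ins again; the actual hint message is longer):

import java.io.FileNotFoundException

def readCurrentFileSketch[T](readFile: String => Iterator[T], currentFile: String): Iterator[T] =
  try readFile(currentFile)
  catch {
    case e: FileNotFoundException =>
      // Re-throw with a hint that the underlying files may have changed
      throw new FileNotFoundException(
        e.getMessage + "\nIt is possible the underlying files have been updated.")
  }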
