FileScanRDD — Input RDD of FileSourceScanExec Physical Operator

FileScanRDD is an RDD of internal binary rows (i.e. RDD[InternalRow]) that is the input RDD of a FileSourceScanExec physical operator (in Whole-Stage Java Code Generation).

FileScanRDD is created exclusively when a FileSourceScanExec physical operator is requested to createBucketedReadRDD or createNonBucketedReadRDD (i.e. when FileSourceScanExec is requested for the input RDD while a WholeStageCodegenExec physical operator is executed).

When created, FileScanRDD is given FilePartitions that are custom RDD partitions with PartitionedFiles (file blocks).

The following spark-shell session demonstrates that the input RDD of a FileSourceScanExec operator is a FileScanRDD:

val q = spark.read.text("README.md")
val sparkPlan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkPlan.collectFirst { case exec: FileSourceScanExec => exec }.get

val inputRDD = scan.inputRDDs.head

import org.apache.spark.sql.execution.datasources.FileScanRDD
assert(inputRDD.isInstanceOf[FileScanRDD])

val rdd = scan.execute
println(rdd.toDebugString)
// (1) MapPartitionsRDD[1] at execute at <console>:27 []
//  |  FileScanRDD[0] at inputRDDs at <console>:26 []

val fileScanRDD = rdd.dependencies.head.rdd
assert(fileScanRDD.isInstanceOf[FileScanRDD])
FileScanRDD uses the spark.sql.files.ignoreCorruptFiles and spark.sql.files.ignoreMissingFiles configuration properties when requested to compute a partition.
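For reference, both flags can be toggled through the runtime configuration. This is generic Spark SQL configuration rather than anything specific to FileScanRDD, and the availability of ignoreMissingFiles depends on the Spark version in use:

// Skip corrupt or missing files instead of failing the query
// (assumption: the Spark version in use supports both properties).
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")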
FileScanRDD takes the following to be created:

- Read function that takes a PartitionedFile and gives internal rows back ((PartitionedFile) ⇒ Iterator[InternalRow]) (see the sketch after this list)
- FilePartitions (file blocks)
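As a rough sketch of the first argument, assuming Spark SQL's internal classes are on the classpath, a read function maps a file block to the rows it contains. The body below is only a placeholder (it returns no rows), not what any real file format does:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Hypothetical read function: a real one would open file.filePath at offset
// file.start, read file.length bytes and decode them into InternalRows.
val readFunction: PartitionedFile => Iterator[InternalRow] =
  (file: PartitionedFile) => Iterator.empty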
Tip: Enable ALL logging level for org.apache.spark.sql.execution.datasources.FileScanRDD logger to see what happens inside. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.datasources.FileScanRDD=ALL

Refer to Logging.
Placement Preferences of Partition (Preferred Locations) — getPreferredLocations Method
getPreferredLocations(split: RDDPartition): Seq[String]
Note: Find out more in The Internals of Apache Spark.
getPreferredLocations requests the given FilePartition (split) for PartitionedFiles.
For every PartitionedFile, getPreferredLocations adds the size of the file block to every host (location) it is available at.
In the end, getPreferredLocations gives the top 3 hosts with the most data available (file blocks).
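A minimal, self-contained sketch of that computation, with a hypothetical FileBlock case class standing in for PartitionedFile (this is not FileScanRDD's actual code):

case class FileBlock(length: Long, locations: Seq[String])

// Sum the bytes of every file block per host, then keep the three hosts
// with the most data available.
def preferredLocations(blocks: Seq[FileBlock]): Seq[String] = {
  val bytesPerHost = scala.collection.mutable.Map.empty[String, Long]
  for (block <- blocks; host <- block.locations) {
    bytesPerHost(host) = bytesPerHost.getOrElse(host, 0L) + block.length
  }
  bytesPerHost.toSeq.sortBy { case (_, bytes) => -bytes }.take(3).map(_._1)
}

preferredLocations(Seq(
  FileBlock(64, Seq("host1", "host2")),
  FileBlock(128, Seq("host2", "host3"))))
// Seq(host2, host3, host1): host2 holds 192 bytes, host3 128, host1 64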
RDD Partitions — getPartitions Method
getPartitions: Array[RDDPartition]
Note: Find out more in The Internals of Apache Spark.
getPartitions simply returns the FilePartitions the FileScanRDD was created with.
Computing Partition (in TaskContext) — compute Method
compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow]
Note: compute is part of Spark Core's RDD Contract to compute a partition (in a TaskContext).
compute creates a Scala Iterator (of Java Objects) over the PartitionedFiles of the given FilePartition that uses the read function to read each file block.
Note: The given RDDPartition is actually a FilePartition with one or more PartitionedFiles (file blocks).
compute then requests the input TaskContext to register a completion listener (addTaskCompletionListener) that simply closes the iterator when the task completes.
In the end, compute returns the iterator.
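A minimal, self-contained sketch of that flow, with hypothetical names (plain strings for file blocks, a generic readFile function, and an onTaskCompletion callback standing in for TaskContext.addTaskCompletionListener). The real compute also maintains input metrics and other bookkeeping that the sketch leaves out:

// Chain per-file iterators lazily and make sure the whole thing can be
// closed when the task completes.
def computeSketch[Row](
    files: Seq[String],
    readFile: String => Iterator[Row],
    onTaskCompletion: (() => Unit) => Unit): Iterator[Row] = {

  val iterator = new Iterator[Row] with AutoCloseable {
    private val remaining = files.iterator
    private var current: Iterator[Row] = Iterator.empty

    override def hasNext: Boolean = current.hasNext || advance()
    override def next(): Row = current.next()
    override def close(): Unit = ()  // release any open readers here

    // Move on to the next file block once the current one is exhausted.
    private def advance(): Boolean =
      remaining.hasNext && { current = readFile(remaining.next()); hasNext }
  }

  // Register a completion listener that simply closes the iterator.
  onTaskCompletion(() => iterator.close())
  iterator
}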
Getting Next Element — next Method
next(): Object
Note: next is part of Scala's Iterator contract (https://www.scala-lang.org/api/2.12.x/scala/collection/Iterator.html#next) to produce the next element of this iterator.
next takes the next element of the current iterator over elements of a file block (PartitionedFile).
next increments the metrics for the bytes and the number of rows read (which, for vectorized reads, is the number of rows in a ColumnarBatch).
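A self-contained sketch of that accounting, with a hypothetical Batch type and a plain counter instead of FileScanRDD's actual SQL metrics:

// Stand-in for a columnar batch produced by a vectorized reader.
final case class Batch(numRows: Int)

var rowsRead = 0L

// Count a whole batch at once for vectorized reads, one row otherwise.
def recordRead(element: Any): Any = {
  element match {
    case batch: Batch => rowsRead += batch.numRows
    case _            => rowsRead += 1
  }
  element
}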