FileScanRDD — Input RDD of FileSourceScanExec Physical Operator

FileScanRDD is an RDD of internal binary rows (i.e. RDD[InternalRow]) that is the input RDD of a FileSourceScanExec physical operator (in Whole-Stage Java Code Generation).

val q = spark.read.text("README.md")

val sparkPlan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkPlan.collectFirst { case exec: FileSourceScanExec => exec }.get

val inputRDD = scan.inputRDDs.head

import org.apache.spark.sql.execution.datasources.FileScanRDD
assert(inputRDD.isInstanceOf[FileScanRDD])

val rdd = scan.execute

scala> println(rdd.toDebugString)
(1) MapPartitionsRDD[1] at execute at <console>:27 []
 |  FileScanRDD[0] at inputRDDs at <console>:26 []

val fileScanRDD = rdd.dependencies.head.rdd
assert(fileScanRDD.isInstanceOf[FileScanRDD])
FileScanRDD is created exclusively when the FileSourceScanExec physical operator is requested to createBucketedReadRDD or createNonBucketedReadRDD (i.e. when FileSourceScanExec is requested for the input RDD while the WholeStageCodegenExec physical operator is executed).
When created, FileScanRDD is given FilePartitions that are custom RDD partitions with PartitionedFiles (file blocks).
FileScanRDD uses the spark.sql.files.ignoreCorruptFiles and spark.sql.files.ignoreMissingFiles properties when requested to compute a partition.
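For example, both properties can be set on the current session through the runtime configuration (a minimal sketch, assuming the implicit spark session of spark-shell):

// Hypothetical usage: skip corrupt or missing file blocks instead of failing the task.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", true)
spark.conf.set("spark.sql.files.ignoreMissingFiles", true)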
FileScanRDD takes the following to be created:

- Read function that takes a PartitionedFile and gives internal rows back ((PartitionedFile) ⇒ Iterator[InternalRow])
- FilePartitions (file blocks)
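A minimal type-level sketch of those two inputs (the read function below is a hypothetical no-op used only to show the expected shapes, not taken from Spark's sources):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}

// Hypothetical read function that yields no rows; it only illustrates the type.
val readFunction: PartitionedFile => Iterator[InternalRow] =
  (_: PartitionedFile) => Iterator.empty

// File partitions (file blocks) a FileScanRDD would be created with.
val filePartitions: Seq[FilePartition] = Seq.empty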
Tip: Enable logging for org.apache.spark.sql.execution.datasources.FileScanRDD to see what happens inside. Refer to Logging.
Placement Preferences of Partition (Preferred Locations) — getPreferredLocations Method
getPreferredLocations(split: RDDPartition): Seq[String]
Note: getPreferredLocations is part of Spark Core's RDD Contract to specify placement preferences (preferred locations) of a partition. Find out more in The Internals of Apache Spark.
getPreferredLocations requests the given FilePartition (split) for PartitionedFiles.
For every PartitionedFile, getPreferredLocations adds the size of the file to every host (location) it is available at.
In the end, getPreferredLocations gives the top 3 hosts with the most data available (file blocks).
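A minimal sketch of that host-ranking logic (standalone Scala; FileBlock is a hypothetical stand-in for PartitionedFile, not the actual Spark source):

// Hypothetical stand-in for PartitionedFile: the hosts a file block lives on and its size in bytes.
case class FileBlock(hosts: Seq[String], length: Long)

def preferredLocations(blocks: Seq[FileBlock]): Seq[String] = {
  // Sum the bytes available on every host across all file blocks of the partition.
  val bytesPerHost = scala.collection.mutable.Map.empty[String, Long].withDefaultValue(0L)
  for (block <- blocks; host <- block.hosts) {
    bytesPerHost(host) += block.length
  }
  // Keep the three hosts with the most data available.
  bytesPerHost.toSeq.sortBy { case (_, bytes) => -bytes }.take(3).map { case (host, _) => host }
}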
RDD Partitions — getPartitions Method
getPartitions: Array[RDDPartition]
Note: getPartitions is part of Spark Core's RDD Contract to specify the partitions of this RDD. Find out more in The Internals of Apache Spark.
getPartitions simply returns the FilePartitions the FileScanRDD was created with.
Computing Partition (in TaskContext) — compute Method
compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow]
Note: compute is part of Spark Core's RDD Contract to compute a partition (in a TaskContext).
compute creates a Scala Iterator (of Java Objects) that…FIXME
Note: The given RDDPartition is actually a FilePartition with one or more PartitionedFiles (file blocks).
compute then requests the input TaskContext to register a completion listener (i.e. addTaskCompletionListener) that simply closes the iterator when the task completes.
In the end, compute returns the iterator.
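A minimal sketch of that lifecycle (assuming a TaskContext is in scope, e.g. inside an RDD's compute; CloseableRowIterator is a hypothetical placeholder, not Spark's actual file reader):

import org.apache.spark.TaskContext
import org.apache.spark.util.TaskCompletionListener

// Hypothetical closeable iterator standing in for the iterator compute builds over the file blocks.
class CloseableRowIterator(rows: Iterator[AnyRef]) extends Iterator[AnyRef] with AutoCloseable {
  def hasNext: Boolean = rows.hasNext
  def next(): AnyRef = rows.next()
  def close(): Unit = ()  // would release the underlying file readers
}

def computeSketch(rows: Iterator[AnyRef], context: TaskContext): Iterator[AnyRef] = {
  val iterator = new CloseableRowIterator(rows)
  // Close the iterator once the task completes (successfully or not).
  context.addTaskCompletionListener(new TaskCompletionListener {
    override def onTaskCompletion(context: TaskContext): Unit = iterator.close()
  })
  iterator
}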
Getting Next Element — next Method
next(): Object
Note: next is part of the Iterator contract (https://www.scala-lang.org/api/2.12.x/scala/collection/Iterator.html#next) to produce the next element of this iterator.
next takes the next element of the current iterator over elements of a file block (PartitionedFile).
next increments the metrics of bytes read and number of rows read (which could be the number of rows in a ColumnarBatch for vectorized reads).
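A minimal sketch of that bookkeeping (a plain Scala wrapper, not the actual metric code):

// Wraps an iterator and counts every element handed out by next(),
// mirroring the "number of rows read" metric described above.
class CountingIterator[A](underlying: Iterator[A]) extends Iterator[A] {
  private var rowsRead: Long = 0L
  def numRowsRead: Long = rowsRead
  def hasNext: Boolean = underlying.hasNext
  def next(): A = {
    rowsRead += 1
    underlying.next()
  }
}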