DataSourceScanExec — Contract for Leaf Physical Operators with Java Code Generation

DataSourceScanExec is the contract of leaf physical operators that represent scans over a BaseRelation.

DataSourceScanExec supports Java code generation (aka codegen).

Note
The prefix for variable names of DataSourceScanExec operators in generated Java source code is scan.
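
To see the prefix, print the generated Java source of a query over a file-based data source. A minimal sketch (the parquet path is hypothetical):

import org.apache.spark.sql.execution.debug._
val q = spark.read.parquet("/tmp/demo.parquet")  // hypothetical dataset
q.debugCodegen
// scan-prefixed variables, e.g. scan_row, appear in the generated code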

The default node name prefix is an empty string (it is used at the very beginning of the simple node description).

Table 1. DataSourceScanExecs

| DataSourceScanExec | Description |
|--------------------|-------------|
| FileSourceScanExec | Leaf physical operator for scans over a HadoopFsRelation |
| RowDataSourceScanExec | Leaf physical operator for scans over a BaseRelation |

DataSourceScanExec Contract

package org.apache.spark.sql.execution

trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
  // only required vals and methods that have no implementation
  val metastoreTableIdentifier: Option[TableIdentifier]
  val relation: BaseRelation
  def metadata: Map[String, String]
}
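
Concrete DataSourceScanExecs are leaves of an executed physical plan, so you can collect them from a structured query. A minimal sketch (the input file is hypothetical):

import org.apache.spark.sql.execution.DataSourceScanExec
val q = spark.read.text("README.md")  // any file-based data source will do
val scans = q.queryExecution.executedPlan.collect {
  case scan: DataSourceScanExec => scan
}
scans.foreach(scan => println(scan.simpleString))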
Table 2. (Subset of) DataSourceScanExec Contract

| Method | Description |
|--------|-------------|
| metadata | Metadata (as a collection of key-value pairs) that describes the scan. Used for simpleString |
| metastoreTableIdentifier | Optional TableIdentifier of the table in a metastore (used in the node name of the scan operator) |
| relation | BaseRelation that the scan operator reads data from (also used in the node name) |
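
The metadata entries also surface in the simple node description. A sketch, reusing the basicDataSourceScanExec example defined below:

val scanExec = basicDataSourceScanExec
scanExec.metadata.foreach { case (key, value) => println(s"$key: $value") }
// PushedFilters: []
// ReadSchema: struct<>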

Simple Text Node Description — simpleString Method

simpleString: String
Note
simpleString is part of the QueryPlan Contract to give the simple text description of a TreeNode in a query plan tree.

Internally, simpleString redacts values in the metadata entries (based on the configured redaction pattern) and builds a text description of the metadata (with every key and its value separated by :).

simpleString then concatenates the nodeNamePrefix, the nodeName, the output schema and the metadata description.

val scanExec = basicDataSourceScanExec
scala> println(scanExec.simpleString)
Scan $line143.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1@57d94b26 [] PushedFilters: [], ReadSchema: struct<>
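
Metadata values matching the redaction pattern are replaced with *********(redacted) in the description. A minimal sketch, assuming the spark.sql.redaction.string.regex configuration property:

spark.conf.set("spark.sql.redaction.string.regex", "struct")
val scanExec = basicDataSourceScanExec
println(scanExec.simpleString)
// ReadSchema's value now has "struct" redacted, e.g.
// Scan ... [] PushedFilters: [], ReadSchema: *********(redacted)<>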

def basicDataSourceScanExec = {
  import org.apache.spark.sql.catalyst.expressions.AttributeReference
  val output = Seq.empty[AttributeReference]
  val requiredColumnsIndex = output.indices
  import org.apache.spark.sql.sources.Filter
  val filters, handledFilters = Set.empty[Filter]
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.UnsafeRow
  val row: InternalRow = new UnsafeRow(0)
  // import RDD before its first use (it is not auto-imported in spark-shell)
  import org.apache.spark.rdd.RDD
  val rdd: RDD[InternalRow] = sc.parallelize(row :: Nil)

  import org.apache.spark.sql.sources.{BaseRelation, TableScan}
  val baseRelation: BaseRelation = new BaseRelation with TableScan {
    import org.apache.spark.sql.SQLContext
    val sqlContext: SQLContext = spark.sqlContext

    // an empty schema is enough for simpleString
    import org.apache.spark.sql.types.StructType
    val schema: StructType = new StructType()

    // never executed here, so it can safely remain unimplemented
    import org.apache.spark.sql.Row
    def buildScan(): RDD[Row] = ???
  }

  val tableIdentifier = None
  import org.apache.spark.sql.execution.RowDataSourceScanExec
  RowDataSourceScanExec(
    output, requiredColumnsIndex, filters, handledFilters, rdd, baseRelation, tableIdentifier)
}
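
Note that buildScan is left unimplemented (???) on purpose: simpleString only needs the relation's schema and the scan's metadata, so the scan itself is never executed.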
