Project Unary Logical Operator

Project is a unary logical operator that is created to represent the following:

  1. Dataset operators, i.e. joinWith, select (incl. selectUntyped), unionByName

  2. KeyValueGroupedDataset operators, i.e. keys, mapValues

  3. CreateViewCommand.aliasPlan

  4. SQL’s SELECT queries with named expressions

Project can also appear in a logical plan after analysis or optimization phases.

// FIXME Add examples for the following operators
// Dataset.unionByName
// KeyValueGroupedDataset.mapValues
// KeyValueGroupedDataset.keys
// CreateViewCommand.aliasPlan

// joinWith operator
case class Person(id: Long, name: String, cityId: Long)
case class City(id: Long, name: String)
val family = Seq(
  Person(0, "Agata", 0),
  Person(1, "Iweta", 0),
  Person(2, "Patryk", 2),
  Person(3, "Maksym", 0)).toDS
val cities = Seq(
  City(0, "Warsaw"),
  City(1, "Washington"),
  City(2, "Sopot")).toDS
val q = family.joinWith(cities, family("cityId") === cities("id"), "inner")
scala> println(q.queryExecution.logical.numberedTreeString)
00 Join Inner, (_1#41.cityId =
01 :- Project [named_struct(id, id#32L, name, name#33, cityId, cityId#34L) AS _1#41]
02 :  +- LocalRelation [id#32L, name#33, cityId#34L]
03 +- Project [named_struct(id, id#38L, name, name#39) AS _2#42]
04    +- LocalRelation [id#38L, name#39]

// select operator
val qs = spark.range(10).select($"id")
scala> println(qs.queryExecution.logical.numberedTreeString)
00 'Project [unresolvedalias('id, None)]
01 +- Range (0, 10, step=1, splits=Some(8))

// select[U1](c1: TypedColumn[T, U1])
scala> :type q
org.apache.spark.sql.Dataset[(Person, City)]

val left = $"_1".as[Person]
val ql =
scala> println(ql.queryExecution.logical.numberedTreeString)
00 'SerializeFromObject [assertnotnull(assertnotnull(input[0, $line14.$read$$iw$$iw$Person, true])).id AS id#87L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line14.$read$$iw$$iw$Person, true])).name, true, false) AS name#88, assertnotnull(assertnotnull(input[0, $line14.$read$$iw$$iw$Person, true])).cityId AS cityId#89L]
01 +- 'MapElements <function1>, class scala.Tuple1, [StructField(_1,StructType(StructField(id,LongType,false), StructField(name,StringType,true), StructField(cityId,LongType,false)),true)], obj#86: $line14.$read$$iw$$iw$Person
02    +- 'DeserializeToObject unresolveddeserializer(newInstance(class scala.Tuple1)), obj#85: scala.Tuple1
03       +- Project [_1#44]
04          +- Join Inner, (_1#44.cityId =
05             :- Project [named_struct(id, id#32L, name, name#33, cityId, cityId#34L) AS _1#44]
06             :  +- LocalRelation [id#32L, name#33, cityId#34L]
07             +- Project [named_struct(id, id#38L, name, name#39) AS _2#45]
08                +- LocalRelation [id#38L, name#39]

// SQL
val qn = spark.sql("select * from nums")
scala> println(qn.queryExecution.logical.numberedTreeString)
00 'Project [*]
01 +- 'UnresolvedRelation `nums`

// Examples with Project that was added during analysis
// Examples with Project that was added during optimization
Nondeterministic expressions are allowed in Project logical operator and enforced by CheckAnalysis.

The output schema of a Project is…​FIXME





You can use Spark SQL’s Catalyst DSL (in org.apache.spark.sql.catalyst.dsl package object) for constructing Catalyst data structures using Scala implicit conversions if you are into Spark testing.

scala> spark.version
res0: String = 2.3.0-SNAPSHOT

import org.apache.spark.sql.catalyst.dsl.plans._  // <-- gives table and select
val plan = table("a").select(star())
scala> println(plan.numberedTreeString)
00 'Project [*]
01 +- 'UnresolvedRelation `a`

Creating Project Instance

Project takes the following when created:

Project initializes the internal registries and counters.

results matching ""

    No results matching ""