LogicalPlan Contract — Logical Relational Operator with Children and Expressions / Logical Query Plan

LogicalPlan is an extension of the QueryPlan contract for logical operators to build a logical query plan (i.e. a tree of logical operators).

Note
A logical query plan is a tree of nodes of logical operators that in turn can have (trees of) Catalyst expressions. In other words, there are at least two trees at every level (operator).

LogicalPlan can be resolved.

In order to get the logical plan of a structured query you should use the QueryExecution.

scala> :type q
org.apache.spark.sql.Dataset[Long]

val plan = q.queryExecution.logical
scala> :type plan
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

LogicalPlan goes through execution stages (as a QueryExecution). In order to convert a LogicalPlan to a QueryExecution you should use SessionState and request it to "execute" the plan.

scala> :type spark
org.apache.spark.sql.SparkSession

// You could use Catalyst DSL to create a logical query plan
scala> :type plan
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

val qe = spark.sessionState.executePlan(plan)
scala> :type qe
org.apache.spark.sql.execution.QueryExecution
Note

A common idiom in Spark SQL to make sure that a logical plan can be analyzed is to request a SparkSession for the SessionState that is in turn requested to execute the logical plan (which simply creates a QueryExecution).

scala> :type plan
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

val qe = sparkSession.sessionState.executePlan(plan)
qe.assertAnalyzed()
// the following gives the analyzed logical plan
// no exceptions are expected since analysis went fine
val analyzedPlan = qe.analyzed
Note

Another common idiom in Spark SQL to convert a LogicalPlan into a Dataset is to use Dataset.ofRows internal method that executes the logical plan followed by creating a Dataset with the QueryExecution and a RowEncoder.

A logical operator is considered partially resolved when its child operators are resolved (aka children resolved).

A logical operator is (fully) resolved to a specific schema when all expressions and the children are resolved.

scala> plan.resolved
res2: Boolean = true

A logical plan knows the size of objects that are results of query operators, like join, through Statistics object.

scala> val stats = plan.statistics
stats: org.apache.spark.sql.catalyst.plans.logical.Statistics = Statistics(8,false)

A logical plan knows the maximum number of records it can compute.

scala> val maxRows = plan.maxRows
maxRows: Option[Long] = None

LogicalPlan can be streaming if it contains one or more structured streaming sources.

Note
LogicalPlan is in the end transformed to a physical query plan.
Table 1. Logical Operators / Specialized Logical Plans
LogicalPlan Description

LeafNode

Logical operator with no child operators

UnaryNode

Logical plan with a single child logical operator

BinaryNode

Logical operator with two child logical operators

Command

RunnableCommand

Table 2. LogicalPlan’s Internal Registries and Counters
Name Description

statsCache

Cached plan statistics (as Statistics) of the LogicalPlan

Computed and cached in stats.

Used in stats and verboseStringWithSuffix.

Reset in invalidateStatsCache

Getting Cached or Calculating Estimated Statistics — stats Method

stats(conf: CatalystConf): Statistics

stats returns the cached plan statistics or computes a new one (and caches it as statsCache).

Note

stats is used when:

invalidateStatsCache method

Caution
FIXME

verboseStringWithSuffix method

Caution
FIXME

setAnalyzed method

Caution
FIXME

Is Logical Plan Streaming? — isStreaming method

isStreaming: Boolean

isStreaming is part of the public API of LogicalPlan and is enabled (i.e. true) when a logical plan is a streaming source.

By default, it walks over subtrees and calls itself, i.e. isStreaming, on every child node to find a streaming source.

val spark: SparkSession = ...

// Regular dataset
scala> val ints = spark.createDataset(0 to 9)
ints: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ints.queryExecution.logical.isStreaming
res1: Boolean = false

// Streaming dataset
scala> val logs = spark.readStream.format("text").load("logs/*.out")
logs: org.apache.spark.sql.DataFrame = [value: string]

scala> logs.queryExecution.logical.isStreaming
res2: Boolean = true
Note
Streaming Datasets are part of Structured Streaming.

Refreshing Child Logical Plans — refresh Method

refresh(): Unit

refresh calls itself recursively for every child logical operator.

Note
refresh is overriden by LogicalRelation only (that refreshes the location of HadoopFsRelation relations only).
Note

refresh is used when:

resolveQuoted Method

resolveQuoted(
  name: String,
  resolver: Resolver): Option[NamedExpression]

resolveQuoted…​FIXME

Note
resolveQuoted is used when…​FIXME

Resolving Column Attributes to References in Query Plan — resolve Method

resolve(
  schema: StructType,
  resolver: Resolver): Seq[Attribute]
resolve(
  nameParts: Seq[String],
  resolver: Resolver): Option[NamedExpression]
resolve(
  nameParts: Seq[String],
  input: Seq[Attribute],
  resolver: Resolver): Option[NamedExpression]  (1)
  1. A protected method

resolve…​FIXME

Note
resolve is used when…​FIXME

results matching ""

    No results matching ""