StreamingRelationStrategy Execution Planning Strategy for StreamingRelation and StreamingExecutionRelation Logical Operators

StreamingRelationStrategy is a streaming execution planning strategy (i.e. Strategy) that converts StreamingRelation and StreamingExecutionRelation logical operators (in the logical query plan of a streaming Dataset) to the StreamingRelationExec physical operator.

Figure 1. StreamingRelationStrategy, StreamingRelation, StreamingExecutionRelation and StreamingRelationExec Operators
Note

StreamingRelation logical operator represents a streaming source in a logical plan and is created when DataStreamReader loads data from a streaming source (that gives a streaming Dataset).

StreamingExecutionRelation logical operator also represents a streaming source in a logical plan, but is used internally when StreamExecution (of a streaming Dataset) initializes the logical query plan.

StreamingRelationStrategy is used exclusively when IncrementalExecution plans the logical plan of a streaming Dataset for the explain operator.

StreamingRelationStrategy converts every StreamingRelation and StreamingExecutionRelation logical operator in a logical query plan to a StreamingRelationExec physical operator (that carries over the operator's sourceName and output schema), which gives the corresponding physical query plan.
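The conversion described above can be pictured as a pattern match over logical operators. The following is only a minimal sketch with toy stand-in types, not Spark's own classes: Attribute, LocalRelation, the plan method and the StreamingRelationStrategySketch object are hypothetical names introduced for illustration, and the real operators carry more fields than shown here. It assumes, as the text above says, that both logical operators hand over a source name and an output schema.

```scala
// Toy model only: simplified stand-ins, NOT Spark's actual classes.
object StreamingRelationStrategySketch {
  // An output attribute, reduced to its name for illustration (hypothetical).
  final case class Attribute(name: String)

  // Simplified logical operators of a query plan.
  sealed trait LogicalPlan { def output: Seq[Attribute] }
  final case class StreamingRelation(sourceName: String, output: Seq[Attribute])
    extends LogicalPlan
  final case class StreamingExecutionRelation(sourceName: String, output: Seq[Attribute])
    extends LogicalPlan
  // A non-streaming operator (hypothetical) that the strategy does not handle.
  final case class LocalRelation(output: Seq[Attribute]) extends LogicalPlan

  // Simplified physical operator that keeps the source name and output schema.
  final case class StreamingRelationExec(sourceName: String, output: Seq[Attribute])

  // A planning strategy returns candidate physical plans,
  // or an empty sequence when it does not apply to the given operator.
  def plan(logical: LogicalPlan): Seq[StreamingRelationExec] = logical match {
    case StreamingRelation(name, out)          => Seq(StreamingRelationExec(name, out))
    case StreamingExecutionRelation(name, out) => Seq(StreamingRelationExec(name, out))
    case _                                     => Seq.empty
  }

  def main(args: Array[String]): Unit = {
    val rate = StreamingRelation("rate", Seq(Attribute("timestamp"), Attribute("value")))
    println(plan(rate).head)          // the physical counterpart of the rate source
    println(plan(LocalRelation(Nil))) // empty: not a streaming source operator
  }
}
```

Returning an empty sequence for every other operator mirrors the general contract of a planning strategy: it leaves operators it does not recognize to the other strategies of the planner.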

StreamingRelationStrategy is available using SessionState (of a SparkSession).

spark.sessionState.planner.StreamingRelationStrategy

val rates = spark.
  readStream.
  format("rate").
  load // <-- gives a streaming Dataset with a logical plan with StreamingRelation logical operator

// StreamingRelation logical operator for the rate streaming source
scala> println(rates.queryExecution.logical.numberedTreeString)
00 StreamingRelation DataSource(org.apache.spark.sql.SparkSession@31ba0af0,rate,List(),None,List(),None,Map(),None), rate, [timestamp#0, value#1L]

// StreamingRelationExec physical operator (shown without "Exec" suffix)
scala> rates.explain
== Physical Plan ==
StreamingRelation rate, [timestamp#0, value#1L]

// Let's do the planning manually
import spark.sessionState.planner.StreamingRelationStrategy
val physicalPlan = StreamingRelationStrategy.apply(rates.queryExecution.logical).head
scala> println(physicalPlan.numberedTreeString)
00 StreamingRelation rate, [timestamp#0, value#1L]
