StreamingJoinStrategy Execution Planning Strategy — Stream-Stream Equi-Joins

StreamingJoinStrategy is an execution planning strategy that plans an equi-join of two streaming queries (a Join logical operator whose left and right sides are both streaming) as a StreamingSymmetricHashJoinExec physical operator.

StreamingJoinStrategy is used exclusively when IncrementalExecution is requested to plan a streaming query.

Demo: Using StreamingJoinStrategy

scala> :type spark
org.apache.spark.sql.SparkSession

scala> :type spark.sessionState.planner
org.apache.spark.sql.execution.SparkPlanner

// FIXME How to use the strategy in the demo
val strategy = spark.sessionState.planner.StreamingJoinStrategy

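// Two streaming Datasets over the rate source (columns: timestamp, value)
val left = spark.readStream.format("rate").load
val right = spark.readStream.format("rate").load
// Stream-stream inner equi-join on the value column
val q = left.join(right, "value")
scala> q.explain(true)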
...
== Optimized Logical Plan ==
Project [value#52L, timestamp#51, timestamp#55]
+- Join Inner, (value#52L = value#56L)
   :- Filter isnotnull(value#52L)
   :  +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@60b0015, rate, [timestamp#51, value#52L]
   +- Filter isnotnull(value#56L)
      +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@24d92ffc, rate, [timestamp#55, value#56L]

== Physical Plan ==
*(3) Project [value#52L, timestamp#51, timestamp#55]
+- StreamingSymmetricHashJoin [value#52L], [value#56L], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = f254d136-d903-4b1c-9fd5-861b541848ab, opId = 0, ver = 0, numPartitions = 200], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(value#52L, 200)
   :  +- *(1) Filter isnotnull(value#52L)
   :     +- StreamingRelation rate, [timestamp#51, value#52L]
   +- ReusedExchange [timestamp#55, value#56L], Exchange hashpartitioning(value#52L, 200)
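
One way to exercise the strategy object directly is to hand it the Join node of the query. The following is only a sketch, assuming a spark-shell session (so that `spark` is in scope) and a Spark version in which `StreamingJoinStrategy` is reachable through `spark.sessionState.planner`, as in the demo above. It uses the analyzed plan, on the assumption that the equality condition is already present there. The names `joinNode` and `planned` are introduced here for illustration only.

```scala
// Sketch only: assumes spark-shell, where `spark` is a SparkSession.
import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec

val strategy = spark.sessionState.planner.StreamingJoinStrategy

// Same stream-stream equi-join as in the demo.
val left = spark.readStream.format("rate").load
val right = spark.readStream.format("rate").load
val q = left.join(right, "value")

// The plan is rooted at a Project, so locate the Join node first.
val joinNode = q.queryExecution.analyzed.collectFirst { case j: Join => j }.get

// A strategy maps a logical plan to zero or more candidate physical plans.
val planned = strategy(joinNode)

// Print the physical operator class names the strategy produced.
planned.foreach(p => println(p.getClass.getSimpleName))
```

Because only this one strategy has run, the children of any returned operator are planning placeholders rather than fully planned subtrees, so the result is not the executable plan that `q.explain` prints.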
