StreamingSymmetricHashJoinExec Binary Physical Operator — Stream-Stream Joins

StreamingSymmetricHashJoinExec is a binary physical operator that represents a Join logical operator of two streaming structured queries with equality predicates at execution time.

Note

Join logical operator represents Dataset.join operator.

Find out more in Join Logical Operator section in The Internals of Spark SQL book.

StreamingSymmetricHashJoinExec is created exclusively when StreamingJoinStrategy execution planning strategy is requested to plan a logical query plan with a Join logical operator of two streaming structured queries with equality predicates (i.e. EqualTo and EqualNullSafe).

// Reduce the number of shuffle partitions
// StreamingSymmetricHashJoin physical operator:
// 1. Requires the same number of partitions on the left and right sides
// 2. Uses that number as its own number of partitions
// A single partition makes the state stores easier to debug
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)
assert(spark.sessionState.conf.numShufflePartitions == 1)

val left = spark.readStream.format("rate").load
val right = spark.readStream.format("rate").load
val equiStreamingJoin = left.join(right, "value")
// You cannot use QueryExecution directly
// Since Structured Streaming uses its own "restrictive" QueryExecution
scala> equiStreamingJoin.explain
== Physical Plan ==
*(3) Project [value#190L, timestamp#189, timestamp#193]
+- StreamingSymmetricHashJoin [value#190L], [value#194L], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 2decf335-c6d4-4810-b95e-abf75181006a, opId = 0, ver = 0, numPartitions = 1], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(value#190L, 1)
   :  +- *(1) Filter isnotnull(value#190L)
   :     +- StreamingRelation rate, [timestamp#189, value#190L]
   +- ReusedExchange [timestamp#193, value#194L], Exchange hashpartitioning(value#190L, 1)

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val q = equiStreamingJoin
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(30.seconds))
  .queryName("stream-stream-join")
  .start

scala> q.explain
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@b807853
+- *(3) Project [value#364L, timestamp#363, timestamp#361]
   +- StreamingSymmetricHashJoin [value#364L], [value#362L], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-376c26ce-c6ae-4572-8e04-927c7a445b46/state, runId = bdfeae3b-3732-4dfe-8d1d-22a089e60fc1, opId = 0, ver = 3, numPartitions = 1], 0, state cleanup [ left = null, right = null ]
      :- Exchange hashpartitioning(value#364L, 1)
      :  +- *(1) Filter isnotnull(value#364L)
      :     +- *(1) Project [timestamp#363, value#364L]
      :        +- *(1) ScanV2 rate[timestamp#363, value#364L]
      +- ReusedExchange [timestamp#361, value#362L], Exchange hashpartitioning(value#364L, 1)

// In the end, stop the query
q.stop
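
The demo above joins on a column name. The two equality predicates mentioned earlier (EqualTo and EqualNullSafe) can also be written explicitly with the `===` and `<=>` column operators. The following is a minimal sketch, assuming a local Spark session (the `local[1]` master and the application name are assumptions for illustration). The rate source never produces null values, so the two joins differ here only in the predicate that ends up in the plan.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this returns the existing session; master and appName are assumptions
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("equi-join-predicates")
  .getOrCreate()

val left = spark.readStream.format("rate").load
val right = spark.readStream.format("rate").load

// EqualTo: rows match when both values are equal and non-null
val byEqualTo = left.join(right, left("value") === right("value"))

// EqualNullSafe: like EqualTo, but two nulls also count as equal
val byEqualNullSafe = left.join(right, left("value") <=> right("value"))

// Inspect the physical plans for the join operator
byEqualTo.explain
byEqualNullSafe.explain
```

Unlike `left.join(right, "value")`, a join with an explicit column condition keeps the columns of both sides, so each result has two timestamp and two value columns.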

StreamingSymmetricHashJoinExec is a stateful physical operator that writes to a state store.

StreamingSymmetricHashJoinExec uses the performance metrics of StateStoreWriter.
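
The operator's name refers to the symmetric hash join technique: every incoming row is buffered on its own side and then probed against the rows already buffered on the other side. The following plain-Scala sketch illustrates that general idea only. It is not Spark's implementation: in-memory maps stand in for state stores, only an inner join is covered, nothing is ever evicted, and all names (`Row`, `Side`, `SymmetricHashJoinSketch`, `onRow`) are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical types for illustration; not Spark classes
final case class Row(key: Long, payload: String)

sealed trait Side
case object LeftSide extends Side
case object RightSide extends Side

final class SymmetricHashJoinSketch {
  // One buffer of already-seen rows per side, grouped by join key
  // (plain maps stand in for state stores)
  private val leftState = mutable.Map.empty[Long, Vector[Row]]
  private val rightState = mutable.Map.empty[Long, Vector[Row]]

  // Stores the row on its own side, then probes the other side for matches.
  // Returns (left row, right row) pairs of an inner equi-join.
  def onRow(side: Side, row: Row): Seq[(Row, Row)] = side match {
    case LeftSide =>
      leftState.update(row.key, leftState.getOrElse(row.key, Vector.empty) :+ row)
      rightState.getOrElse(row.key, Vector.empty).map(r => (row, r))
    case RightSide =>
      rightState.update(row.key, rightState.getOrElse(row.key, Vector.empty) :+ row)
      leftState.getOrElse(row.key, Vector.empty).map(l => (l, row))
  }
}

val join = new SymmetricHashJoinSketch
val out1 = join.onRow(LeftSide, Row(1L, "l1"))   // nothing buffered on the right yet
val out2 = join.onRow(RightSide, Row(1L, "r1"))  // matches the buffered left row
val out3 = join.onRow(LeftSide, Row(1L, "l2"))   // matches the buffered right row
val out4 = join.onRow(RightSide, Row(2L, "r2"))  // no left row with key 2
```

Because both sides are buffered, a match is found regardless of which side's row arrives first, which is why both inputs have to be kept as state.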

When executed, StreamingSymmetricHashJoinExec…​FIXME

The output schema of StreamingSymmetricHashJoinExec is…​FIXME

The output partitioning of StreamingSymmetricHashJoinExec is…​FIXME

Executing Physical Operator — doExecute Method

doExecute(): RDD[InternalRow]

Note

doExecute is part of the SparkPlan contract to produce the result of a physical operator as an RDD of internal binary rows (i.e. InternalRow).

doExecute…​FIXME

Creating StreamingSymmetricHashJoinExec Instance

StreamingSymmetricHashJoinExec takes the following to be created:

  • Catalyst expressions of the keys on the left side

  • Catalyst expressions of the keys on the right side

  • JoinType

  • Join condition (JoinConditionSplitPredicates)

  • Optional StatefulOperatorStateInfo

  • Optional event-time watermark

  • State watermark (JoinStateWatermarkPredicates)

  • Physical operator on the left side (SparkPlan)

  • Physical operator on the right side (SparkPlan)

StreamingSymmetricHashJoinExec initializes the internal registries and counters.

processPartitions Internal Method

processPartitions(
  leftInputIter: Iterator[InternalRow],
  rightInputIter: Iterator[InternalRow]): Iterator[InternalRow]

processPartitions…​FIXME

Note
processPartitions is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to doExecute.
