output: Seq[Attribute]
StreamingSymmetricHashJoinExec Binary Physical Operator — Stream-Stream Joins
StreamingSymmetricHashJoinExec is a binary physical operator that represents a stream-stream equi-join at execution time.
Note: A binary physical operator (BinaryExecNode) is a physical operator with two child physical operators (left and right). Read up on BinaryExecNode (and physical operators in general) in The Internals of Spark SQL online book.
StreamingSymmetricHashJoinExec supports Inner, LeftOuter, and RightOuter join types (with the left and the right keys using the exact same data types).
StreamingSymmetricHashJoinExec is created exclusively when StreamingJoinStrategy execution planning strategy is requested to plan a logical query plan with a Join logical operator of two streaming queries with equality predicates (EqualTo and EqualNullSafe).
StreamingSymmetricHashJoinExec is given execution-specific configuration (i.e. StatefulOperatorStateInfo, event-time watermark, and JoinStateWatermarkPredicates) when IncrementalExecution is requested to plan a streaming query for execution (and uses the state preparation rule).
StreamingSymmetricHashJoinExec uses two OneSideHashJoiners (for the left and right sides of the join) to manage join state when processing partitions of the left and right sides of a stream-stream join.
StreamingSymmetricHashJoinExec is a stateful physical operator that writes to a state store.
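To see the operator in a physical plan, a stream-stream equi-join can be built with the public Dataset API. The following is a minimal sketch and not part of the original text: the `rate` source, the column names, the 10-second watermarks and the 5-second time-range condition are illustrative assumptions, and the code needs the `spark-sql` library on the classpath. For such a query, `explain` is expected to print a plan with a StreamingSymmetricHashJoin node, although the exact output differs between Spark versions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

object StreamStreamJoinDemo {
  // Builds a left outer stream-stream equi-join over two "rate" sources.
  // All names and intervals are illustrative assumptions.
  def buildJoin(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val left = spark.readStream.format("rate").load()
      .select($"value" as "leftId", $"timestamp" as "leftTime")
      .withWatermark("leftTime", "10 seconds")

    val right = spark.readStream.format("rate").load()
      .select($"value" as "rightId", $"timestamp" as "rightTime")
      .withWatermark("rightTime", "10 seconds")

    // Equality predicate plus an event-time range constraint.
    val condition = expr(
      """leftId = rightId AND
        |rightTime BETWEEN leftTime AND leftTime + interval 5 seconds""".stripMargin)

    left.join(right, condition, "left_outer")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("stream-stream-join")
      .getOrCreate()
    // Prints the physical plan of the streaming join.
    buildJoin(spark).explain()
    spark.stop()
  }
}
```

The watermarks on both sides and the time-range constraint are what allow old join state to be dropped; without them the state of an inner join would grow without bound.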
Creating StreamingSymmetricHashJoinExec Instance
StreamingSymmetricHashJoinExec takes the following to be created:
StreamingSymmetricHashJoinExec initializes the internal properties.
Output Schema — output Method
Note: output is part of the QueryPlan Contract to describe the attributes of (the schema of) the output.
output schema depends on the join type:
- For Cross and Inner (InnerLike) joins, it is the output schema of the left and right operators
- For LeftOuter joins, it is the output schema of the left operator with the attributes of the right operator with nullability flag enabled (true)
- For RightOuter joins, it is the output schema of the right operator with the attributes of the left operator with nullability flag enabled (true)
output throws an IllegalArgumentException for other join types:
[className] should not take [joinType] as the JoinType
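The join-type rules for the output schema can be modelled without Spark. The sketch below is an assumption-laden illustration, not the operator's code: `Attr` and the `JoinType` objects are hypothetical stand-ins for Spark's Attribute and join types, and placing the left attributes before the right ones in every case is an assumption about column order.

```scala
object OutputSchemaSketch {
  // Hypothetical stand-in for Spark's Attribute.
  final case class Attr(name: String, nullable: Boolean) {
    def withNullability(n: Boolean): Attr = copy(nullable = n)
  }

  // Hypothetical stand-ins for Spark's join types.
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  def output(joinType: JoinType, left: Seq[Attr], right: Seq[Attr]): Seq[Attr] =
    joinType match {
      // InnerLike joins: both sides as they are.
      case Inner | Cross => left ++ right
      // Unmatched left rows are padded with nulls on the right.
      case LeftOuter => left ++ right.map(_.withNullability(true))
      // Unmatched right rows are padded with nulls on the left.
      case RightOuter => left.map(_.withNullability(true)) ++ right
      case other =>
        throw new IllegalArgumentException(
          s"OutputSchemaSketch should not take $other as the JoinType")
    }
}
```

The side that may be missing from an outer join result is the one whose attributes become nullable, which is why the flag flips between LeftOuter and RightOuter.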
Output Partitioning — outputPartitioning Method
outputPartitioning: Partitioning
Note: outputPartitioning is part of the SparkPlan Contract to specify how data should be partitioned across different nodes in the cluster.
outputPartitioning depends on the join type:
- For Cross and Inner (InnerLike) joins, it is a PartitioningCollection of the output partitioning of the left and right operators
- For LeftOuter joins, it is a PartitioningCollection of the output partitioning of the left operator
- For RightOuter joins, it is a PartitioningCollection of the output partitioning of the right operator
outputPartitioning throws an IllegalArgumentException for other join types:
[className] should not take [joinType] as the JoinType
Event-Time Watermark — eventTimeWatermark Internal Property
eventTimeWatermark: Option[Long]
When created, StreamingSymmetricHashJoinExec can be given the event-time watermark of the current streaming micro-batch.
eventTimeWatermark is an optional property that is specified only after IncrementalExecution was requested to apply the state preparation rule to a physical query plan of a streaming query (to optimize (prepare) the physical plan of the streaming query once for ContinuousExecution and every trigger for MicroBatchExecution in their queryPlanning phases).
Watermark Predicates for State Removal — stateWatermarkPredicates Internal Property
stateWatermarkPredicates: JoinStateWatermarkPredicates
When created, StreamingSymmetricHashJoinExec is given a JoinStateWatermarkPredicates for the left and right join sides (using the StreamingSymmetricHashJoinHelper utility).
stateWatermarkPredicates contains the left and right predicates only when IncrementalExecution is requested to apply the state preparation rule to a physical query plan of a streaming query (to optimize (prepare) the physical plan of the streaming query once for ContinuousExecution and every trigger for MicroBatchExecution in their queryPlanning phases).
Required Partition Requirements — requiredChildDistribution Method
requiredChildDistribution: Seq[Distribution]
Note: Read up on SparkPlan Contract in The Internals of Spark SQL online book.
requiredChildDistribution returns two HashClusteredDistributions for the left and right keys with the required number of partitions based on the StatefulOperatorStateInfo.
Note: Read up on EnsureRequirements Physical Query Optimization in The Internals of Spark SQL online book.
Note: Read up on HashClusteredDistribution in The Internals of Spark SQL online book.
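The point of requiring the same hash clustering on both sides is that rows with equal join keys end up in the same partition number, and so can be matched against the same state store. A minimal sketch of that idea follows; it is an illustration only, and using `hashCode` as the hash function is an assumption made for brevity (Spark applies its own hash expression to the join keys).

```scala
object HashClusteringSketch {
  // Maps a possibly negative hash to a partition id in [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Illustrative only: hashCode stands in for Spark's own hash function.
  def partitionOf[K](joinKey: K, numPartitions: Int): Int =
    nonNegativeMod(joinKey.hashCode, numPartitions)

  // Groups the rows of one join side by the partition assigned to their key.
  def cluster[K, V](rows: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    rows.groupBy { case (key, _) => partitionOf(key, numPartitions) }
}
```

Because both sides use the same function and the same number of partitions, partition `i` of the left side only ever needs to be zipped with partition `i` of the right side.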
Performance Metrics (SQLMetrics)
StreamingSymmetricHashJoinExec uses the same performance metrics as other stateful physical operators that write to a state store.
The following table shows how the performance metrics are computed (and so their exact meaning).
| Name (in web UI) | Description |
|---|---|
| total time to update rows | |
| total time to remove rows | |
| time to commit changes | |
| number of output rows | |
| number of total state rows | |
| number of updated state rows | Number of updated state rows of the left and right |
| memory used by state | |
Checking Out Whether Last Batch Execution Requires Another Non-Data Batch or Not — shouldRunAnotherBatch Method
shouldRunAnotherBatch(
newMetadata: OffsetSeqMetadata): Boolean
Note: shouldRunAnotherBatch is part of the StateStoreWriter Contract to indicate whether MicroBatchExecution should run another non-data batch (based on the updated OffsetSeqMetadata with the current event-time watermark and the batch timestamp).
shouldRunAnotherBatch is positive (true) when all of the following are positive:
- Either the left or right join state watermark predicates are defined (in the JoinStateWatermarkPredicates)
- Event-time watermark threshold (of the StreamingSymmetricHashJoinExec operator) is defined and the current event-time watermark threshold of the given OffsetSeqMetadata is above (greater than) it, i.e. moved above
shouldRunAnotherBatch is negative (false) otherwise.
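The two conditions translate into a short boolean check. The sketch below is a simplified model under stated assumptions: the case classes are hypothetical, cut-down stand-ins for the Spark types of the same names, and the watermark predicates are represented as plain optional strings.

```scala
object ShouldRunAnotherBatchSketch {
  // Hypothetical, simplified stand-ins for the Spark types named in the text.
  final case class OffsetSeqMetadata(batchWatermarkMs: Long, batchTimestampMs: Long)
  final case class JoinStateWatermarkPredicates(left: Option[String], right: Option[String])

  def shouldRunAnotherBatch(
      stateWatermarkPredicates: JoinStateWatermarkPredicates,
      eventTimeWatermark: Option[Long],
      newMetadata: OffsetSeqMetadata): Boolean = {
    // Condition 1: at least one side has a state watermark predicate.
    val hasStatePredicate =
      stateWatermarkPredicates.left.nonEmpty || stateWatermarkPredicates.right.nonEmpty
    // Condition 2: the operator's watermark is defined and the new one is greater.
    val watermarkMovedAbove =
      eventTimeWatermark.exists(current => newMetadata.batchWatermarkMs > current)
    hasStatePredicate && watermarkMovedAbove
  }
}
```

Intuitively, an extra non-data batch only pays off when there is state that could be removed and the watermark has advanced far enough to possibly remove it.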
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method
doExecute(): RDD[InternalRow]
Note: doExecute is part of SparkPlan Contract to generate the runtime representation of a physical operator as a recipe for distributed computation over internal binary rows on Apache Spark (RDD[InternalRow]).
doExecute first requests the StreamingQueryManager for the StateStoreCoordinatorRef to the StateStoreCoordinator RPC endpoint (for the driver).
doExecute then uses SymmetricHashJoinStateManager utility to get the names of the state stores for the left and right sides of the streaming join.
In the end, doExecute requests the left and right child physical operators to execute (generate an RDD) and then stateStoreAwareZipPartitions with processPartitions (and with the StateStoreCoordinatorRef and the state stores).
Processing Partitions of Left and Right Sides of Stream-Stream Join — processPartitions Internal Method
processPartitions(
leftInputIter: Iterator[InternalRow],
rightInputIter: Iterator[InternalRow]): Iterator[InternalRow]
processPartitions records the current time (as updateStartTimeNs for the total time to update rows performance metric in onOutputCompletion).
processPartitions creates a new predicate (postJoinFilter) based on the bothSides of the JoinConditionSplitPredicates if defined or true literal.
processPartitions creates a OneSideHashJoiner for the LeftSide and all other properties for the left-hand join side (leftSideJoiner).
processPartitions creates a OneSideHashJoiner for the RightSide and all other properties for the right-hand join side (rightSideJoiner).
processPartitions requests the OneSideHashJoiner for the left-hand join side to storeAndJoinWithOtherSide with the right-hand side one (that creates a leftOutputIter row iterator) and the OneSideHashJoiner for the right-hand join side to do the same with the left-hand side one (and creates a rightOutputIter row iterator).
processPartitions records the current time (as innerOutputCompletionTimeNs for the total time to remove rows performance metric in onOutputCompletion).
processPartitions creates a CompletionIterator with the left and right output iterators (with the rows of the leftOutputIter first followed by rightOutputIter). When no rows are left to process, the CompletionIterator records the completion time.
processPartitions creates a join-specific output Iterator[InternalRow] of the output rows based on the join type (of the StreamingSymmetricHashJoinExec):
- For Inner joins, processPartitions simply uses the output iterator of the left and right rows
- For LeftOuter joins, processPartitions …
- For RightOuter joins, processPartitions …
- For other joins, processPartitions simply throws an IllegalArgumentException.
processPartitions creates an UnsafeProjection for the output (and the output of the left and right child operators), counts every row of the join-specific output iterator (as the numOutputRows metric), and applies the projection to generate the output rows.
In the end, processPartitions returns a CompletionIterator with the output iterator with the rows counted (as numOutputRows metric) and the onOutputCompletion completion function.
Note: processPartitions is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to execute.
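The core of the symmetric hash join, namely that each side stores its own rows and probes the state of the other side, can be sketched in plain Scala for the inner-join case. This is a toy model under stated assumptions: `OneSideJoiner` is a hypothetical in-memory stand-in for OneSideHashJoiner, rows are simple `(key, value)` pairs, and watermark filtering, the post-join filter, outer joins and state stores are all left out.

```scala
import scala.collection.mutable

object SymmetricHashJoinSketch {
  type Row = (Int, String)            // (join key, payload)
  type Joined = (Int, String, String) // (join key, left payload, right payload)

  // Hypothetical in-memory stand-in for a OneSideHashJoiner and its state.
  final class OneSideJoiner {
    private val state = mutable.Map.empty[Int, Vector[String]]

    def lookup(key: Int): Vector[String] = state.getOrElse(key, Vector.empty)

    // Stores every input row in this side's state and joins it with
    // the rows the other side has stored so far.
    def storeAndJoinWithOtherSide(input: Iterator[Row], other: OneSideJoiner)(
        mkJoined: (Int, String, String) => Joined): Iterator[Joined] =
      input.flatMap { case (key, value) =>
        state.update(key, lookup(key) :+ value)
        other.lookup(key).iterator.map(otherValue => mkJoined(key, value, otherValue))
      }
  }

  def processPartitions(left: Iterator[Row], right: Iterator[Row]): Iterator[Joined] = {
    val leftSideJoiner = new OneSideJoiner
    val rightSideJoiner = new OneSideJoiner
    val leftOutputIter =
      leftSideJoiner.storeAndJoinWithOtherSide(left, rightSideJoiner)((k, l, r) => (k, l, r))
    val rightOutputIter =
      rightSideJoiner.storeAndJoinWithOtherSide(right, leftSideJoiner)((k, r, l) => (k, l, r))
    // Lazy concatenation: left rows are consumed first, then right rows.
    leftOutputIter ++ rightOutputIter
  }
}
```

Since this toy creates fresh state on every call, all matches are produced while the right side is consumed. In the real operator the state outlives a single micro-batch, so rows on either side can also match rows stored by earlier batches.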
Calculating Performance Metrics (Output Completion Callback) — onOutputCompletion Internal Method
onOutputCompletion: Unit
onOutputCompletion calculates the total time to update rows performance metric (that is the time since the processPartitions was executed).
onOutputCompletion adds the time for the inner join to complete (since innerOutputCompletionTimeNs time marker) to the total time to remove rows performance metric.
onOutputCompletion records the time to remove old state (per the join state watermark predicate for the left and the right streaming queries) and adds it to the total time to remove rows performance metric.
Note: onOutputCompletion triggers the old state removal eagerly by iterating over the state rows to be deleted.
onOutputCompletion records the time for the left and right OneSideHashJoiners to commit any state changes that becomes the time to commit changes performance metric.
onOutputCompletion calculates the number of updated state rows performance metric (as the number of updated state rows of the left and right streaming queries).
onOutputCompletion calculates the number of total state rows performance metric (as the sum of the number of keys in the KeyWithIndexToValueStore of the left and right streaming queries).
onOutputCompletion calculates the memory used by state performance metric (as the sum of the memory used by the KeyToNumValuesStore and KeyWithIndexToValueStore of the left and right streams).
In the end, onOutputCompletion calculates the custom metrics.
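How the three time metrics relate to the two time markers can be sketched as follows. This is an illustrative model, not the operator's code: the `Metrics` class, the injected `nanoTime` clock and the two callbacks are hypothetical, and the state-size metrics and custom metrics are omitted.

```scala
object OutputCompletionMetricsSketch {
  // Hypothetical holder for the three time metrics (in milliseconds).
  final class Metrics {
    var allUpdatesTimeMs = 0L
    var allRemovalsTimeMs = 0L
    var commitTimeMs = 0L
  }

  def onOutputCompletion(
      metrics: Metrics,
      updateStartTimeNs: Long,
      innerOutputCompletionTimeNs: Long,
      nanoTime: () => Long)(
      removeOldState: () => Unit,
      commitState: () => Unit): Unit = {
    def toMs(ns: Long): Long = math.max(ns / 1000000L, 0L)
    def timeTakenMs(body: () => Unit): Long = {
      val start = nanoTime()
      body()
      toMs(nanoTime() - start)
    }

    // Everything since processPartitions started counts as update time.
    metrics.allUpdatesTimeMs += toMs(nanoTime() - updateStartTimeNs)
    // Time since the inner join output completed counts as removal time.
    metrics.allRemovalsTimeMs += toMs(nanoTime() - innerOutputCompletionTimeNs)
    // Removing old state eagerly is timed and added to removal time.
    metrics.allRemovalsTimeMs += timeTakenMs(removeOldState)
    // Committing the state changes of both sides is timed separately.
    metrics.commitTimeMs += timeTakenMs(commitState)
  }
}
```

Passing the clock as a function keeps the sketch deterministic in tests; production code would simply pass `() => System.nanoTime()`.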
Internal Properties
| Name | Description |
|---|---|
| | Used exclusively to create a SymmetricHashJoinStateManager |
| | Used when |
| | |
| | |
| | Used exclusively to create a SymmetricHashJoinStateManager |