StreamingSymmetricHashJoinExec Binary Physical Operator — Stream-Stream Joins
StreamingSymmetricHashJoinExec is a binary physical operator that represents a stream-stream equi-join at execution time.
Note: A binary physical operator (BinaryExecNode) is a physical operator with two child physical operators, left and right. Read up on BinaryExecNode (and physical operators in general) in The Internals of Spark SQL online book.
StreamingSymmetricHashJoinExec supports Inner, LeftOuter, and RightOuter join types (with the left and the right keys using the exact same data types).
StreamingSymmetricHashJoinExec is created exclusively when the StreamingJoinStrategy execution planning strategy is requested to plan a logical query plan with a Join logical operator of two streaming queries with equality predicates (EqualTo and EqualNullSafe).
StreamingSymmetricHashJoinExec is given execution-specific configuration (i.e. StatefulOperatorStateInfo, event-time watermark, and JoinStateWatermarkPredicates) when IncrementalExecution is requested to plan a streaming query for execution (and uses the state preparation rule).
StreamingSymmetricHashJoinExec uses two OneSideHashJoiners (for the left and right sides of the join) to manage join state when processing partitions of the left and right sides of a stream-stream join.
StreamingSymmetricHashJoinExec is a stateful physical operator that writes to a state store.
Creating StreamingSymmetricHashJoinExec Instance
StreamingSymmetricHashJoinExec takes the following to be created:
StreamingSymmetricHashJoinExec initializes the internal properties.
Output Schema — output Method
output: Seq[Attribute]
Note: output is part of the QueryPlan Contract to describe the attributes of (the schema of) the output.
The output schema depends on the join type:
- For Cross and Inner (InnerLike) joins, it is the output schema of the left operator followed by the output schema of the right operator
- For LeftOuter joins, it is the output schema of the left operator followed by the attributes of the right operator with the nullability flag enabled (true)
- For RightOuter joins, it is the attributes of the left operator with the nullability flag enabled (true) followed by the output schema of the right operator
output throws an IllegalArgumentException for other join types:
[className] should not take [joinType] as the JoinType
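The join-type dispatch above can be sketched in plain Scala. This is a minimal model and not Spark's code. Attr and the JoinType case objects are hypothetical stand-ins for Catalyst's Attribute and JoinType, and the exception message only mimics the one quoted above.

```scala
// Hypothetical, simplified stand-ins for Catalyst's Attribute and JoinType.
case class Attr(name: String, nullable: Boolean) {
  def withNullability(n: Boolean): Attr = copy(nullable = n)
}

sealed trait JoinType
case object Inner extends JoinType
case object Cross extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType

// Sketch of how the output schema depends on the join type.
def joinOutput(joinType: JoinType, left: Seq[Attr], right: Seq[Attr]): Seq[Attr] =
  joinType match {
    case Inner | Cross => left ++ right
    // Unmatched left rows are padded with nulls, so right-side attributes become nullable
    case LeftOuter => left ++ right.map(_.withNullability(true))
    // Unmatched right rows are padded with nulls, so left-side attributes become nullable
    case RightOuter => left.map(_.withNullability(true)) ++ right
    case other => throw new IllegalArgumentException(
      s"StreamingSymmetricHashJoin should not take $other as the JoinType")
  }
```

Column order stays left-then-right for every supported join type. Only the nullability of the non-preserved side changes.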
Output Partitioning — outputPartitioning Method
outputPartitioning: Partitioning
Note: outputPartitioning is part of the SparkPlan Contract to specify how data should be partitioned across different nodes in the cluster.
outputPartitioning depends on the join type:
- For Cross and Inner (InnerLike) joins, it is a PartitioningCollection of the output partitioning of the left and right operators
- For LeftOuter joins, it is a PartitioningCollection of the output partitioning of the left operator
- For RightOuter joins, it is a PartitioningCollection of the output partitioning of the right operator
outputPartitioning throws an IllegalArgumentException for other join types:
[className] should not take [joinType] as the JoinType
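Here is a comparable plain-Scala sketch of the partitioning rules. HashPartitioned and PartitioningSet are hypothetical stand-ins for Spark's Partitioning and PartitioningCollection. One plausible reason an outer join reports only the preserved side's partitioning is that null-padded rows no longer carry the other side's key values.

```scala
// Hypothetical stand-ins for Spark's JoinType and Partitioning classes.
sealed trait JoinType
case object Inner extends JoinType
case object Cross extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType

case class HashPartitioned(keys: Seq[String], numPartitions: Int)
case class PartitioningSet(members: Seq[HashPartitioned])

def joinOutputPartitioning(
    joinType: JoinType,
    left: HashPartitioned,
    right: HashPartitioned): PartitioningSet = joinType match {
  // Every inner-join output row satisfies both children's partitionings
  case Inner | Cross => PartitioningSet(Seq(left, right))
  // Outer joins report only the preserved side's partitioning
  case LeftOuter => PartitioningSet(Seq(left))
  case RightOuter => PartitioningSet(Seq(right))
  case other => throw new IllegalArgumentException(
    s"StreamingSymmetricHashJoin should not take $other as the JoinType")
}
```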
Event-Time Watermark — eventTimeWatermark Internal Property
eventTimeWatermark: Option[Long]
When created, StreamingSymmetricHashJoinExec can be given the event-time watermark of the current streaming micro-batch.
eventTimeWatermark is an optional property that is specified only after IncrementalExecution is requested to apply the state preparation rule to the physical query plan of a streaming query. That rule prepares (optimizes) the physical plan once for ContinuousExecution and at every trigger for MicroBatchExecution, in their queryPlanning phases.
Watermark Predicates for State Removal — stateWatermarkPredicates Internal Property
stateWatermarkPredicates: JoinStateWatermarkPredicates
When created, StreamingSymmetricHashJoinExec is given a JoinStateWatermarkPredicates for the left and right join sides (using the StreamingSymmetricHashJoinHelper utility).
stateWatermarkPredicates contains the left and right predicates only after IncrementalExecution is requested to apply the state preparation rule to the physical query plan of a streaming query. That rule prepares (optimizes) the physical plan once for ContinuousExecution and at every trigger for MicroBatchExecution, in their queryPlanning phases.
Required Child Distribution — requiredChildDistribution Method
requiredChildDistribution: Seq[Distribution]
Note: requiredChildDistribution is part of the SparkPlan Contract. Read up on SparkPlan Contract in The Internals of Spark SQL online book.
requiredChildDistribution returns two HashClusteredDistributions, one for the left keys and one for the right keys, each with the required number of partitions based on the StatefulOperatorStateInfo.
Note: Read up on EnsureRequirements Physical Query Optimization in The Internals of Spark SQL online book.
Note: Read up on HashClusteredDistribution in The Internals of Spark SQL online book.
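This plain-Scala sketch shows why both children must be hash-clustered by their join keys into the same number of partitions. Equal keys from both sides land in the same partition, and therefore in the same task and state store. It is illustrative only. partitionFor and clusterByKey are hypothetical helpers. Spark's hash partitioning uses a Murmur3 hash of the key expressions (pmod numPartitions), not hashCode.

```scala
// Illustrative only: Spark uses Murmur3 over the key expressions, not hashCode.
def partitionFor(key: Any, numPartitions: Int): Int =
  Math.floorMod(key.hashCode(), numPartitions)

// Hypothetical helper: group the rows of one join side by the partition of their key.
def clusterByKey[K, V](rows: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
  rows.groupBy { case (k, _) => partitionFor(k, numPartitions) }

// numPartitions plays the role of StatefulOperatorStateInfo.numPartitions.
// Both sides must use the same value so that equal keys meet in one partition.
val numPartitions = 4
val leftRows = Seq("a" -> 1, "b" -> 2, "c" -> 3)
val rightRows = Seq("b" -> "x", "c" -> "y", "d" -> "z")

val leftByPartition = clusterByKey(leftRows, numPartitions)
val rightByPartition = clusterByKey(rightRows, numPartitions)
```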
Performance Metrics (SQLMetrics)
StreamingSymmetricHashJoinExec uses the same performance metrics as other stateful physical operators that write to a state store.
The following table shows how the performance metrics are computed (and so their exact meaning).
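Name (in web UI) | Description |
---|---|
total time to update rows | Time since processPartitions started (updateStartTimeNs), calculated in onOutputCompletion |
total time to remove rows | Time since the inner join output completed (innerOutputCompletionTimeNs) plus the time to remove old state per the join state watermark predicates |
time to commit changes | Time for the left and right OneSideHashJoiners to commit their state changes |
number of output rows | Number of rows of the join-specific output iterator of processPartitions |
number of total state rows | Sum of the number of keys in the KeyWithIndexToValueStore of the left and right sides |
number of updated state rows | Number of updated state rows of the left and right sides |
memory used by state | Sum of the memory used by the KeyToNumValuesStore and KeyWithIndexToValueStore of the left and right sides |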
Checking Out Whether Last Batch Execution Requires Another Non-Data Batch or Not — shouldRunAnotherBatch Method
shouldRunAnotherBatch(
  newMetadata: OffsetSeqMetadata): Boolean
Note: shouldRunAnotherBatch is part of the StateStoreWriter Contract to indicate whether MicroBatchExecution should run another non-data batch (based on the updated OffsetSeqMetadata with the current event-time watermark and the batch timestamp).
shouldRunAnotherBatch is positive (true) when both of the following hold:
- Either the left or the right join state watermark predicate is defined (in the JoinStateWatermarkPredicates)
- The event-time watermark threshold (of the StreamingSymmetricHashJoinExec operator) is defined, and the current event-time watermark threshold of the given OffsetSeqMetadata is above (greater than) it, i.e. the watermark has moved forward
shouldRunAnotherBatch is negative (false) otherwise.
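The two conditions can be sketched as a pure function. WatermarkPredicates and BatchMetadata are hypothetical, simplified stand-ins for JoinStateWatermarkPredicates and OffsetSeqMetadata. Only the batchWatermarkMs field name comes from the latter.

```scala
// Hypothetical stand-ins: per-side predicates are reduced to optional strings.
case class WatermarkPredicates(left: Option[String], right: Option[String])
case class BatchMetadata(batchWatermarkMs: Long)

def shouldRunAnotherBatch(
    stateWatermarkPredicates: WatermarkPredicates,
    eventTimeWatermark: Option[Long],
    newMetadata: BatchMetadata): Boolean = {
  // At least one join side removes old state based on the watermark
  val watermarkUsedForStateCleanup =
    stateWatermarkPredicates.left.nonEmpty || stateWatermarkPredicates.right.nonEmpty
  // The new watermark has moved past the one this plan was executed with
  val watermarkHasMoved =
    eventTimeWatermark.exists(current => newMetadata.batchWatermarkMs > current)
  watermarkUsedForStateCleanup && watermarkHasMoved
}
```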
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method
doExecute(): RDD[InternalRow]
Note: doExecute is part of the SparkPlan Contract to generate the runtime representation of a physical operator as a recipe for distributed computation over internal binary rows on Apache Spark (RDD[InternalRow]).
doExecute first requests the StreamingQueryManager for the StateStoreCoordinatorRef to the StateStoreCoordinator RPC endpoint (for the driver).
doExecute then uses the SymmetricHashJoinStateManager utility to get the names of the state stores for the left and right sides of the streaming join.
In the end, doExecute requests the left and right child physical operators to execute (generate an RDD) and zips their partitions with stateStoreAwareZipPartitions, using processPartitions as the partition-processing function (together with the StateStoreCoordinatorRef and the state store names).
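Conceptually, zipping partitions pairs partition i of the left RDD with partition i of the right RDD and runs one function over both iterators. The sketch below models partitions as plain Scala collections. zipPartitionsWith is a hypothetical helper. It ignores what makes stateStoreAwareZipPartitions state-store aware, which presumably means using the StateStoreCoordinatorRef to prefer executors that already host the state stores.

```scala
// Partitions are modeled as Seq[Seq[T]]; in Spark they are RDD partitions.
def zipPartitionsWith[L, R, O](
    left: Seq[Seq[L]],
    right: Seq[Seq[R]])(f: (Iterator[L], Iterator[R]) => Iterator[O]): Seq[Seq[O]] = {
  // As with RDD.zipPartitions, both sides must have the same number of partitions
  require(left.size == right.size, "both sides must have the same number of partitions")
  left.zip(right).map { case (l, r) => f(l.iterator, r.iterator).toList }
}
```

In doExecute's case, f corresponds to processPartitions. It receives the left and right input iterators of one partition and returns the joined rows of that partition.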
Processing Partitions of Left and Right Sides of Stream-Stream Join — processPartitions Internal Method
processPartitions(
  leftInputIter: Iterator[InternalRow],
  rightInputIter: Iterator[InternalRow]): Iterator[InternalRow]
processPartitions records the current time (as updateStartTimeNs for the total time to update rows performance metric in onOutputCompletion).
processPartitions creates a new predicate (postJoinFilter) based on the bothSides of the JoinConditionSplitPredicates if defined, or on a true literal otherwise.
processPartitions creates a OneSideHashJoiner for the LeftSide and all other properties for the left-hand join side (leftSideJoiner).
processPartitions creates a OneSideHashJoiner for the RightSide and all other properties for the right-hand join side (rightSideJoiner).
processPartitions requests the OneSideHashJoiner for the left-hand join side to storeAndJoinWithOtherSide with the right-hand side one (which creates a leftOutputIter row iterator). It then requests the OneSideHashJoiner for the right-hand join side to do the same with the left-hand side one (which creates a rightOutputIter row iterator).
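The left-then-right order lets a symmetric hash join emit every matching pair exactly once. Each new row is joined with the other side's state and then added to its own side's state. Below is a minimal in-memory sketch of that idea for an inner equi-join over one micro-batch. SideState and processBatch are hypothetical. Real OneSideHashJoiners use state stores, lazy iterators, watermark-based filtering and the postJoinFilter, none of which are modeled here.

```scala
import scala.collection.mutable

// Hypothetical in-memory "state" of one join side: every row seen so far, by key.
class SideState[K, V] {
  private val rows = mutable.Map.empty[K, mutable.Buffer[V]]
  // Returns a snapshot so later appends do not affect the caller
  def get(key: K): Seq[V] = rows.get(key).map(_.toList).getOrElse(Nil)
  def append(key: K, value: V): Unit = {
    rows.getOrElseUpdate(key, mutable.ArrayBuffer.empty[V]) += value
    ()
  }
}

// Processes one micro-batch of new rows from both sides (eagerly, for simplicity).
def processBatch[K, L, R](
    leftInput: Seq[(K, L)],
    rightInput: Seq[(K, R)],
    leftState: SideState[K, L],
    rightState: SideState[K, R]): Seq[(K, L, R)] = {
  // Left side first: join each new left row with the right state, then store it
  val leftOutput = leftInput.flatMap { case (k, l) =>
    val matches = rightState.get(k).map(r => (k, l, r))
    leftState.append(k, l)
    matches
  }
  // Right side second: the left state now holds this batch's left rows too,
  // so pairs that arrive in the same batch are emitted here, exactly once
  val rightOutput = rightInput.flatMap { case (k, r) =>
    val matches = leftState.get(k).map(l => (k, l, r))
    rightState.append(k, r)
    matches
  }
  leftOutput ++ rightOutput
}
```

Old rows never pair with each other again, because a row is only ever joined at the moment it arrives.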
processPartitions prepares an innerOutputCompletionTimeNs time marker that is set once the inner join output has been fully consumed (and is later used for the total time to remove rows performance metric in onOutputCompletion).
processPartitions creates a CompletionIterator over the left and right output iterators (with the rows of leftOutputIter first, followed by rightOutputIter). When no rows are left to process, the CompletionIterator records the completion time.
processPartitions creates a join-specific output Iterator[InternalRow] of the output rows based on the join type (of the StreamingSymmetricHashJoinExec):
- For Inner joins, processPartitions simply uses the output iterator of the left and right rows
- For LeftOuter joins, processPartitions …
- For RightOuter joins, processPartitions …
- For other joins, processPartitions simply throws an IllegalArgumentException.
processPartitions creates an UnsafeProjection from the output of the left and right child operators to the output schema. It maps every row of the join-specific output iterator through that projection and counts the rows as the numOutputRows metric.
In the end, processPartitions returns a CompletionIterator over the output iterator (with the rows counted as the numOutputRows metric) and with onOutputCompletion as the completion function.
Note: processPartitions is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to execute.
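The tail of processPartitions combines two iterator techniques. A lazy map counts rows as they flow out, and a completion callback fires once the output is exhausted. OnCompletionIterator is a hypothetical stand-in for Spark's internal CompletionIterator. toUpperCase stands in for the UnsafeProjection.

```scala
// Hypothetical stand-in for Spark's internal CompletionIterator: wraps an iterator
// and runs `completion` once, the first time hasNext reports no more rows.
class OnCompletionIterator[A](underlying: Iterator[A], completion: () => Unit)
    extends Iterator[A] {
  private var completed = false
  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) {
      completed = true
      completion()
    }
    more
  }
  override def next(): A = underlying.next()
}

// Counts every output row (like numOutputRows), then fires the callback
// (like onOutputCompletion) when the output is fully consumed.
var numOutputRows = 0L
var completionCalls = 0
val joined = Iterator("row1", "row2", "row3")
val outputIterWithMetrics = joined.map { row =>
  numOutputRows += 1
  row.toUpperCase // stands in for the output projection
}
val output = new OnCompletionIterator[String](outputIterWithMetrics, () => completionCalls += 1)
```

Because both wrappers are lazy, nothing is counted or completed until a downstream consumer pulls the rows.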
Calculating Performance Metrics (Output Completion Callback) — onOutputCompletion Internal Method
onOutputCompletion: Unit
onOutputCompletion calculates the total time to update rows performance metric (i.e. the time since processPartitions started, as recorded in updateStartTimeNs).
onOutputCompletion adds the time since the inner join output completed (the innerOutputCompletionTimeNs time marker) to the total time to remove rows performance metric.
onOutputCompletion records the time to remove old state (per the join state watermark predicates for the left and right streaming queries) and adds it to the total time to remove rows performance metric.
Note: onOutputCompletion triggers the old state removal eagerly by iterating over the state rows to be deleted.
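Eager iteration matters because removal is exposed as a lazy iterator, so nothing is removed until the iterator is consumed. The sketch below illustrates that pattern with a hypothetical state map and hypothetical removeExpired and drain helpers. They are not the OneSideHashJoiner API.

```scala
import scala.collection.mutable

// Hypothetical join-side state: buffered row id -> event time in milliseconds.
val state = mutable.LinkedHashMap("a" -> 1000L, "b" -> 5000L, "c" -> 2000L)

// Lazily removes rows whose event time is below the watermark;
// it does nothing until it is consumed.
def removeExpired(watermarkMs: Long): Iterator[String] =
  state.keys.toList.iterator
    .filter(key => state(key) < watermarkMs)
    .map { key =>
      state.remove(key) // the removal happens only when next() is called
      key
    }

// Draining the iterator is what actually performs the removal.
def drain[A](it: Iterator[A]): Int = {
  var removed = 0
  while (it.hasNext) { it.next(); removed += 1 }
  removed
}
```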
onOutputCompletion records the time for the left and right OneSideHashJoiners to commit any state changes, which becomes the time to commit changes performance metric.
onOutputCompletion calculates the number of updated state rows performance metric (as the number of updated state rows of the left and right streaming queries).
onOutputCompletion calculates the number of total state rows performance metric (as the sum of the number of keys in the KeyWithIndexToValueStore of the left and right streaming queries).
onOutputCompletion calculates the memory used by state performance metric (as the sum of the memory used by the KeyToNumValuesStore and KeyWithIndexToValueStore of the left and right streams).
In the end, onOutputCompletion calculates the custom metrics.
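The metric arithmetic above can be summarized as a small pure function. This is a simplified sketch. SideStateMetrics and JoinMetrics are hypothetical containers. The current time is passed in as nowNs so the result is deterministic. The old-state removal and commit durations are assumed to have been measured elsewhere.

```scala
import java.util.concurrent.TimeUnit.NANOSECONDS

// Hypothetical per-side numbers reported by the state of one join side.
case class SideStateMetrics(
    numUpdatedStateRows: Long,
    keyWithIndexToValueKeys: Long,  // number of keys in KeyWithIndexToValueStore
    keyToNumValuesBytes: Long,      // memory used by KeyToNumValuesStore
    keyWithIndexToValueBytes: Long) // memory used by KeyWithIndexToValueStore

case class JoinMetrics(
    allUpdatesTimeMs: Long,
    allRemovalsTimeMs: Long,
    commitTimeMs: Long,
    numUpdatedStateRows: Long,
    numTotalStateRows: Long,
    stateMemoryBytes: Long)

// Elapsed milliseconds between two System.nanoTime-style readings, never negative.
def elapsedMs(fromNs: Long, toNs: Long): Long =
  math.max(NANOSECONDS.toMillis(toNs - fromNs), 0L)

def onOutputCompletion(
    updateStartTimeNs: Long,
    innerOutputCompletionTimeNs: Long,
    oldStateRemovalMs: Long,
    commitMs: Long,
    left: SideStateMetrics,
    right: SideStateMetrics,
    nowNs: Long): JoinMetrics = {
  // In this sketch, an unset (0) marker contributes no removal time
  val afterInnerOutputMs =
    if (innerOutputCompletionTimeNs != 0L) elapsedMs(innerOutputCompletionTimeNs, nowNs) else 0L
  JoinMetrics(
    allUpdatesTimeMs = elapsedMs(updateStartTimeNs, nowNs),
    allRemovalsTimeMs = afterInnerOutputMs + oldStateRemovalMs,
    commitTimeMs = commitMs,
    numUpdatedStateRows = left.numUpdatedStateRows + right.numUpdatedStateRows,
    numTotalStateRows = left.keyWithIndexToValueKeys + right.keyWithIndexToValueKeys,
    stateMemoryBytes = left.keyToNumValuesBytes + left.keyWithIndexToValueBytes +
      right.keyToNumValuesBytes + right.keyWithIndexToValueBytes)
}
```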
Internal Properties
Name | Description |
---|---|
| Used exclusively to create a SymmetricHashJoinStateManager |
| Used when |
| Used exclusively to create a SymmetricHashJoinStateManager |