OneSideHashJoiner
OneSideHashJoiner manages join state of one side of a stream-stream join (using SymmetricHashJoinStateManager).
OneSideHashJoiner is created exclusively for StreamingSymmetricHashJoinExec physical operator (when requested to process partitions of the left and right sides of a stream-stream join).
StreamingSymmetricHashJoinExec physical operator uses two OneSideHashJoiners, one for each side of the stream-stream join (left and right).
OneSideHashJoiner uses an optional join state watermark predicate to remove old state.
Note: OneSideHashJoiner is a private inner Scala class of StreamingSymmetricHashJoinExec and so has full access to the properties of StreamingSymmetricHashJoinExec.
Creating OneSideHashJoiner Instance
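OneSideHashJoiner takes the following to be created:
- Join side (LeftSide or RightSide)
- Input attributes
- Join keys
- Input rows (iterator)
- Optional pre-join filter expression (for the preJoinFilter predicate)
- Post-join filter
- Optional join state watermark predicate (stateWatermarkPredicate)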
OneSideHashJoiner initializes the internal registries and counters.
SymmetricHashJoinStateManager — joinStateManager Internal Property
joinStateManager: SymmetricHashJoinStateManager
joinStateManager is a SymmetricHashJoinStateManager that is created for a OneSideHashJoiner (with the join side, the input attributes, the join keys, and the StatefulOperatorStateInfo of the owning StreamingSymmetricHashJoinExec).
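To make the shape of this join state concrete, here is a minimal, Spark-free sketch of what a SymmetricHashJoinStateManager conceptually keeps: a multimap from a join key to every input row stored under that key. SimpleJoinState is a hypothetical name, plain Scala values stand in for UnsafeRows, and only append and get are modeled; the real manager is backed by state stores and does considerably more.

```scala
import scala.collection.mutable

// Hypothetical, Spark-free stand-in for the join state of one side:
// a multimap from join key to all rows stored under that key.
// Plain Scala values replace UnsafeRows; no state store is involved.
final class SimpleJoinState[K, V] {
  private val state = mutable.LinkedHashMap.empty[K, mutable.ArrayBuffer[V]]

  // Store one more row under its join key (a key can map to many rows).
  def append(key: K, value: V): Unit =
    state.getOrElseUpdate(key, mutable.ArrayBuffer.empty[V]) += value

  // All rows stored for a key, in insertion order (empty if the key is unknown).
  def get(key: K): Iterator[V] =
    state.get(key).map(_.iterator).getOrElse(Iterator.empty)
}
```

Keeping every row per key (rather than only the latest) matters because a stream-stream join must match a new row against all previously seen rows of the other side with the same key.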
joinStateManager is used when OneSideHashJoiner is requested for the following:
- storeAndJoinWithOtherSide
- removeOldState
- get
- commitStateAndGetMetrics
Number of Updated State Rows — updatedStateRowsCount Internal Counter
updatedStateRowsCount is the number of join keys and associated rows that were persisted as join state, i.e. how many times storeAndJoinWithOtherSide requested the SymmetricHashJoinStateManager to append a join key and an input row (to the join state).
updatedStateRowsCount is available via the numUpdatedStateRows method and is used for the numUpdatedStateRows performance metric.
numUpdatedStateRows: Long
Note: numUpdatedStateRows is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join (and completes).
Optional Join State Watermark Predicate — stateWatermarkPredicate Internal Property
stateWatermarkPredicate: Option[JoinStateWatermarkPredicate]
When created, OneSideHashJoiner is given an optional JoinStateWatermarkPredicate.
stateWatermarkPredicate is used to create the stateKeyWatermarkPredicateFunc (for a JoinStateKeyWatermarkPredicate) and the stateValueWatermarkPredicateFunc (for a JoinStateValueWatermarkPredicate). Both functions are used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide and removeOldState.
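How one optional predicate turns into two removal functions can be sketched in plain Scala. This is a simplified model, not Spark's code: the two case classes are stand-ins that wrap a Scala function instead of a Catalyst expression, keys and values are reduced to Long event times, and the sketch assumes that the function that does not apply defaults to "never remove" (always false).

```scala
// Simplified, Spark-free model: one optional watermark predicate yields
// two removal functions, one for join keys and one for stored values.
object WatermarkPredicateFuncs {
  // Stand-ins for Spark's predicate classes; here they wrap a plain
  // function over an event time (Long) instead of a Catalyst expression.
  sealed trait JoinStateWatermarkPredicate
  final case class JoinStateKeyWatermarkPredicate(f: Long => Boolean)
    extends JoinStateWatermarkPredicate
  final case class JoinStateValueWatermarkPredicate(f: Long => Boolean)
    extends JoinStateWatermarkPredicate

  // Assumption: a function that does not apply never marks anything for removal.
  private val never: Long => Boolean = _ => false

  def stateKeyWatermarkPredicateFunc(p: Option[JoinStateWatermarkPredicate]): Long => Boolean =
    p match {
      case Some(JoinStateKeyWatermarkPredicate(f)) => f
      case _                                       => never
    }

  def stateValueWatermarkPredicateFunc(p: Option[JoinStateWatermarkPredicate]): Long => Boolean =
    p match {
      case Some(JoinStateValueWatermarkPredicate(f)) => f
      case _                                         => never
    }
}
```

At most one of the two functions is ever "real" for a given joiner, so callers can apply both unconditionally without checking which kind of predicate was given.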
storeAndJoinWithOtherSide Method
storeAndJoinWithOtherSide(
    otherSideJoiner: OneSideHashJoiner)(
    generateJoinedRow: (InternalRow, InternalRow) => JoinedRow): Iterator[InternalRow]
storeAndJoinWithOtherSide tries to find the watermark attribute among the input attributes.
storeAndJoinWithOtherSide creates a watermark expression (for the watermark attribute and the current event-time watermark).
If the watermark attribute is found, storeAndJoinWithOtherSide generates a new predicate for the watermark expression and the input attributes, and uses it to filter out (exclude) late rows from the input. Otherwise, the input rows are left unchanged (no rows are considered late).
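The late-row filtering step can be sketched as follows. This is a simplified model under stated assumptions: rows are maps from column name to a Long value, watermarkColumn is a hypothetical stand-in for the watermark attribute, and a row is assumed to be late when its event time is at or below the current event-time watermark.

```scala
// Simplified model of dropping late rows before the join.
// Rows are maps from column name to value; "watermarkColumn" is a
// hypothetical stand-in for the input attribute carrying the watermark.
object LateRows {
  type Row = Map[String, Long]

  def nonLateRows(
      input: Iterator[Row],
      watermarkColumn: Option[String],
      eventTimeWatermarkMs: Long): Iterator[Row] =
    watermarkColumn match {
      case Some(col) =>
        // Assumption: a row is late when its event time is at or below the watermark.
        input.filter(row => row(col) > eventTimeWatermarkMs)
      case None =>
        // No watermark attribute: no row is considered late.
        input
    }
}
```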
For every input row (possibly watermarked), storeAndJoinWithOtherSide applies the preJoinFilter predicate and branches off per result (true or false).
Note: storeAndJoinWithOtherSide is used when StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join.
preJoinFilter Predicate Positive (true)
When the preJoinFilter predicate holds for an input row, storeAndJoinWithOtherSide extracts the join key (using the keyGenerator) and requests the SymmetricHashJoinStateManager of the given OneSideHashJoiner (otherSideJoiner) for the state values of that join key. Each value is combined with the input row using the given generateJoinedRow function, and the joined rows are then filtered with the post-join filter.
storeAndJoinWithOtherSide then uses the stateKeyWatermarkPredicateFunc (on the extracted join key) and the stateValueWatermarkPredicateFunc (on the current input row) to decide whether to request the SymmetricHashJoinStateManager to append the key and the input row (to the join state). The row is appended only when neither predicate holds. For every appended row, storeAndJoinWithOtherSide increments the updatedStateRowsCount counter.
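The positive branch can be modeled in a few lines of plain Scala. Everything here is hypothetical and simplified: Row, Side and joinMatchingRow are illustrative names, the join key is read from a field instead of being computed with the keyGenerator, a tuple stands in for generateJoinedRow, and each side's state is a mutable map rather than a SymmetricHashJoinStateManager.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the preJoinFilter-positive branch.
object PositiveBranch {
  // A row is just (join key, event time) here.
  final case class Row(key: String, time: Long)

  // One join side: its state, its two removal functions and its counter.
  final class Side(
      val keyWatermarkFunc: String => Boolean,
      val valueWatermarkFunc: Row => Boolean) {
    val state = mutable.Map.empty[String, mutable.ArrayBuffer[Row]]
    var updatedStateRowsCount = 0L
  }

  def joinMatchingRow(
      thisSide: Side,
      otherSide: Side,
      thisRow: Row,
      postJoinFilter: ((Row, Row)) => Boolean): Iterator[(Row, Row)] = {
    val key = thisRow.key // stands in for keyGenerator
    // Look up the other side's state for the key and join lazily.
    val output = otherSide.state.get(key).map(_.iterator).getOrElse(Iterator.empty)
      .map(thatRow => (thisRow, thatRow)) // stands in for generateJoinedRow
      .filter(postJoinFilter)
    // Append to this side's state only if neither removal function matches.
    if (!thisSide.keyWatermarkFunc(key) && !thisSide.valueWatermarkFunc(thisRow)) {
      thisSide.state.getOrElseUpdate(key, mutable.ArrayBuffer.empty[Row]) += thisRow
      thisSide.updatedStateRowsCount += 1
    }
    output
  }
}
```

Note that the joined rows are produced lazily while the state update happens as soon as the row is processed, so a row joins with what the other side already holds and becomes visible to rows the other side processes later.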
preJoinFilter Predicate Negative (false)
When the preJoinFilter predicate fails on an input row, storeAndJoinWithOtherSide creates a new Iterator[InternalRow] of joined rows per join side and type:
- For LeftSide and LeftOuter, the joined row is the current row with all values of the right side null (nullRight)
- For RightSide and RightOuter, the joined row is the current row with all values of the left side null (nullLeft)
- For all other combinations, the iterator is empty (so the row contributes nothing to the output of the outer nonLateRows.flatMap)
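The per-side, per-join-type choice above can be sketched as a single pattern match. JoinSide and JoinType are simplified stand-ins defined locally (not Spark's classes), onPreJoinFilterFailure is a hypothetical name, and None stands in for the all-null row of the other side (nullLeft or nullRight).

```scala
// Hypothetical, simplified model of the preJoinFilter-negative branch.
object NegativeBranch {
  sealed trait JoinSide
  case object LeftSide extends JoinSide
  case object RightSide extends JoinSide

  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType

  // Joined row as (left, right); None stands in for an all-null side.
  type Joined = (Option[String], Option[String])

  def onPreJoinFilterFailure(side: JoinSide, joinType: JoinType, row: String): Iterator[Joined] =
    (side, joinType) match {
      case (LeftSide, LeftOuter)   => Iterator((Some(row), None)) // right side all null
      case (RightSide, RightOuter) => Iterator((None, Some(row))) // left side all null
      case _                       => Iterator.empty              // no output for this row
    }
}
```

Since such a row can never satisfy the full join condition, it is never added to the join state; an outer join can emit its null-padded row right away.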
Removing Old State — removeOldState Method
removeOldState(): Iterator[UnsafeRowPair]
removeOldState branches off per the JoinStateWatermarkPredicate:
- For JoinStateKeyWatermarkPredicate, removeOldState requests the SymmetricHashJoinStateManager to removeByKeyCondition (with the stateKeyWatermarkPredicateFunc)
- For JoinStateValueWatermarkPredicate, removeOldState requests the SymmetricHashJoinStateManager to removeByValueCondition (with the stateValueWatermarkPredicateFunc)
- For any other predicate, removeOldState returns an empty iterator (no rows to process)
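A simplified sketch of this branching follows. OldStateRemoval and the local predicate classes are hypothetical stand-ins that wrap Scala functions instead of expressions, the state is a plain mutable map from join key to values (both reduced to Long), and removal here happens eagerly before the removed pairs are returned (the real implementation may instead remove lazily as the returned iterator is consumed).

```scala
import scala.collection.mutable

// Simplified, Spark-free model of removeOldState over a key -> values map.
object OldStateRemoval {
  // Stand-ins for Spark's predicate classes, wrapping plain functions.
  sealed trait JoinStateWatermarkPredicate
  final case class JoinStateKeyWatermarkPredicate(f: Long => Boolean)
    extends JoinStateWatermarkPredicate
  final case class JoinStateValueWatermarkPredicate(f: Long => Boolean)
    extends JoinStateWatermarkPredicate

  // Removes matching state eagerly and returns the removed (key, value) pairs.
  def removeOldState(
      state: mutable.Map[Long, List[Long]],
      predicate: Option[JoinStateWatermarkPredicate]): Iterator[(Long, Long)] =
    predicate match {
      case Some(JoinStateKeyWatermarkPredicate(f)) =>
        // Like removeByKeyCondition: drop whole keys with all their values.
        val removed = state.toList.filter { case (k, _) => f(k) }
        removed.foreach { case (k, _) => state.remove(k) }
        removed.iterator.flatMap { case (k, vs) => vs.map(v => (k, v)) }
      case Some(JoinStateValueWatermarkPredicate(f)) =>
        // Like removeByValueCondition: drop individual values; drop keys left empty.
        val removed = for ((k, vs) <- state.toList; v <- vs if f(v)) yield (k, v)
        for ((k, vs) <- state.toList) {
          val kept = vs.filterNot(f)
          if (kept.isEmpty) state.remove(k) else state.update(k, kept)
        }
        removed.iterator
      case _ =>
        // Any other predicate (or none): nothing to remove.
        Iterator.empty
    }
}
```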
Note: removeOldState is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join.
Retrieving Value Rows For Key — get Method
get(key: UnsafeRow): Iterator[UnsafeRow]
get simply requests the SymmetricHashJoinStateManager to retrieve value rows for the key.
Note: get is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join.
Committing State (Changes) and Requesting Performance Metrics — commitStateAndGetMetrics Method
commitStateAndGetMetrics(): StateStoreMetrics
commitStateAndGetMetrics simply requests the SymmetricHashJoinStateManager to commit followed by requesting for the performance metrics.
Note: commitStateAndGetMetrics is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join.
Internal Properties
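| Name | Description |
|---|---|
| keyGenerator | Function to project (extract) join keys from an input row. Used when…FIXME |
| preJoinFilter | Used when…FIXME |
| stateKeyWatermarkPredicateFunc | Predicate for late rows based on the stateWatermarkPredicate. Used in storeAndJoinWithOtherSide (to decide whether to append a row to the join state) and removeOldState |
| stateValueWatermarkPredicateFunc | Predicate for late rows based on the stateWatermarkPredicate. Used in storeAndJoinWithOtherSide (to decide whether to append a row to the join state) and removeOldState |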