OneSideHashJoiner
OneSideHashJoiner manages the join state of one side of a stream-stream join (using SymmetricHashJoinStateManager).
OneSideHashJoiner is created exclusively by the StreamingSymmetricHashJoinExec physical operator (when requested to process partitions of the left and right sides of a stream-stream join).
The StreamingSymmetricHashJoinExec physical operator uses two OneSideHashJoiners, one per side of the stream-stream join (left and right).
OneSideHashJoiner uses an optional join state watermark predicate to remove old state.
Note
OneSideHashJoiner is a private inner class of StreamingSymmetricHashJoinExec and so has full access to the StreamingSymmetricHashJoinExec properties.
Creating OneSideHashJoiner Instance
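OneSideHashJoiner takes the following to be created:
- Join side (JoinSide)
- Input attributes
- Join keys
- Input rows (Iterator[InternalRow])
- Optional pre-join filter expression
- Post-join filter (InternalRow => Boolean)
- Optional JoinStateWatermarkPredicate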
OneSideHashJoiner initializes the internal registries and counters.
SymmetricHashJoinStateManager: joinStateManager Internal Property
joinStateManager: SymmetricHashJoinStateManager
joinStateManager is a SymmetricHashJoinStateManager that is created for a OneSideHashJoiner (with the join side, the input attributes, the join keys, and the StatefulOperatorStateInfo of the owning StreamingSymmetricHashJoinExec).
joinStateManager is used when OneSideHashJoiner is requested for the following:
- storeAndJoinWithOtherSide
- removeOldState
- get
- commitStateAndGetMetrics
Number of Updated State Rows: updatedStateRowsCount Internal Counter
updatedStateRowsCount is the number of join keys and associated rows that were persisted as join state, i.e. how many times storeAndJoinWithOtherSide requested the SymmetricHashJoinStateManager to append a join key and an input row (to the join state).
updatedStateRowsCount is available via the numUpdatedStateRows method and is used for the numUpdatedStateRows performance metric.
numUpdatedStateRows: Long
Note
numUpdatedStateRows is used exclusively when the StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join (and completes).
Optional Join State Watermark Predicate: stateWatermarkPredicate Internal Property
stateWatermarkPredicate: Option[JoinStateWatermarkPredicate]
When created, OneSideHashJoiner is given an optional JoinStateWatermarkPredicate.
stateWatermarkPredicate is used to create the stateKeyWatermarkPredicateFunc (for a JoinStateKeyWatermarkPredicate) and the stateValueWatermarkPredicateFunc (for a JoinStateValueWatermarkPredicate). Both functions are used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide and removeOldState.
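The way one optional predicate turns into two predicate functions can be sketched as follows. This is a simplified model, not Spark code: StateWatermarkPredicate, KeyPredicate and ValuePredicate are hypothetical stand-ins for JoinStateWatermarkPredicate and its two variants, and rows are modelled as Map[String, Long]. The assumption is that a function whose predicate kind is absent simply never matches.

```scala
// Hypothetical, simplified model (no Spark): rows are Map[String, Long].
object WatermarkPredicates {
  type Row = Map[String, Long]

  // Stand-ins for JoinStateWatermarkPredicate and its key/value variants
  sealed trait StateWatermarkPredicate
  final case class KeyPredicate(isOld: Row => Boolean) extends StateWatermarkPredicate
  final case class ValuePredicate(isOld: Row => Boolean) extends StateWatermarkPredicate

  // Key predicate function: active only for a key-based predicate, otherwise always false
  def keyPredicateFunc(p: Option[StateWatermarkPredicate]): Row => Boolean = p match {
    case Some(KeyPredicate(f)) => f
    case _                     => (_: Row) => false
  }

  // Value predicate function: active only for a value-based predicate, otherwise always false
  def valuePredicateFunc(p: Option[StateWatermarkPredicate]): Row => Boolean = p match {
    case Some(ValuePredicate(f)) => f
    case _                       => (_: Row) => false
  }
}
```

Defaulting the inactive function to "always false" lets callers apply both functions unconditionally, without checking which kind of predicate was given.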
storeAndJoinWithOtherSide Method
storeAndJoinWithOtherSide(
otherSideJoiner: OneSideHashJoiner)(
generateJoinedRow: (InternalRow, InternalRow) => JoinedRow): Iterator[InternalRow]
storeAndJoinWithOtherSide tries to find the watermark attribute among the input attributes.
storeAndJoinWithOtherSide creates a watermark expression (for the watermark attribute and the current event-time watermark).
If the watermark attribute is found, storeAndJoinWithOtherSide generates a new predicate for the watermark expression and the input attributes that is then used to filter out (exclude) late rows from the input. Otherwise, the input rows are left unchanged (i.e. no rows are considered late).
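The late-row filtering step can be sketched on plain Scala collections. This is a hypothetical model, not the Spark implementation: LateRowFilter, findWatermarkAttribute and nonLateRows are illustrative names, event-time columns are passed as a Set of names (Spark detects them differently), and a row is assumed to be late when its event time is at or below the watermark.

```scala
// Hypothetical, simplified model (no Spark): a row is a Map from attribute name to a Long value.
object LateRowFilter {
  type Row = Map[String, Long]

  // Step 1: find the watermark (event-time) attribute among the input attributes.
  // Assumption: event-time columns are given as a plain Set of names.
  def findWatermarkAttribute(inputAttributes: Seq[String], eventTimeColumns: Set[String]): Option[String] =
    inputAttributes.find(eventTimeColumns.contains)

  // Step 2: with a watermark attribute, drop late rows; without one, pass rows through unchanged.
  def nonLateRows(input: Iterator[Row], watermarkAttribute: Option[String], eventTimeWatermark: Long): Iterator[Row] =
    watermarkAttribute match {
      case Some(attr) =>
        // Assumption: a row is late when its event time is at or below the watermark
        input.filterNot(row => row(attr) <= eventTimeWatermark)
      case None =>
        input // no watermark attribute, so no row is considered late
    }
}
```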
For every input row that survives the watermark filter, storeAndJoinWithOtherSide applies the preJoinFilter predicate and branches off per result (true or false).
Note
storeAndJoinWithOtherSide is used when the StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join.
preJoinFilter Predicate Positive (true)
When the preJoinFilter predicate succeeds on an input row, storeAndJoinWithOtherSide extracts the join key (using the keyGenerator) and requests the given OneSideHashJoiner (otherSideJoiner) for its SymmetricHashJoinStateManager, which is in turn requested for the state values for the extracted join key. The values are then mapped over using the given generateJoinedRow function and filtered by the post-join filter.
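The join with the other side's state can be sketched as below. It is a hypothetical model rather than Spark's API: OtherSideState stands in for the other side's SymmetricHashJoinStateManager, join keys are Longs, rows are Map[String, Long], and Joined is an illustrative pair in place of JoinedRow.

```scala
// Hypothetical, simplified model (no Spark): join keys are Longs, rows are Map[String, Long].
object JoinWithOtherSide {
  type Row = Map[String, Long]
  final case class Joined(left: Row, right: Row) // stand-in for JoinedRow

  // Stand-in for the other side's join state: join key -> buffered rows
  final class OtherSideState(state: Map[Long, Seq[Row]]) {
    def get(key: Long): Iterator[Row] = state.getOrElse(key, Seq.empty).iterator
  }

  def joinRow(
      thisRow: Row,
      keyGenerator: Row => Long,
      otherSide: OtherSideState,
      generateJoinedRow: (Row, Row) => Joined,
      postJoinFilter: Joined => Boolean): Iterator[Joined] = {
    val key = keyGenerator(thisRow)                             // extract the join key
    otherSide.get(key)                                          // state values of the other side for the key
      .map(otherRow => generateJoinedRow(thisRow, otherRow))    // build joined rows
      .filter(postJoinFilter)                                   // keep rows passing the post-join filter
  }
}
```

Since everything stays an Iterator, joined rows are produced lazily as the caller consumes them.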
storeAndJoinWithOtherSide then uses the stateKeyWatermarkPredicateFunc (on the extracted join key) and the stateValueWatermarkPredicateFunc (on the current input row) to decide whether to request the SymmetricHashJoinStateManager to append the key and the input row (to the join state). The row is appended only when neither predicate holds. If so, storeAndJoinWithOtherSide increments the updatedStateRowsCount counter.
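The store decision and the counter can be sketched like this. It is a hypothetical model: JoinState and Side are illustrative names, not Spark classes, join keys are Longs and rows are Map[String, Long]. It assumes, as described above, that a row is stored only when neither watermark predicate matches.

```scala
// Hypothetical, simplified model (no Spark): join keys are Longs, rows are Map[String, Long].
object StoreInState {
  type Row = Map[String, Long]

  // Stand-in for the join state: join key -> rows appended so far
  final class JoinState {
    private val buffer = scala.collection.mutable.Map.empty[Long, Vector[Row]]
    def append(key: Long, row: Row): Unit =
      buffer.update(key, buffer.getOrElse(key, Vector.empty[Row]) :+ row)
    def get(key: Long): Vector[Row] = buffer.getOrElse(key, Vector.empty[Row])
  }

  final class Side(keyPredicateFunc: Long => Boolean, valuePredicateFunc: Row => Boolean) {
    val state = new JoinState
    private var updatedStateRowsCount = 0L

    def storeIfNeeded(key: Long, row: Row): Unit = {
      // Append only when neither watermark predicate marks the key or the row as old
      val shouldAddToState = !keyPredicateFunc(key) && !valuePredicateFunc(row)
      if (shouldAddToState) {
        state.append(key, row)
        updatedStateRowsCount += 1 // one more row persisted as join state
      }
    }

    def numUpdatedStateRows: Long = updatedStateRowsCount
  }
}
```

Skipping rows that the watermark predicates already consider old avoids adding state that the next removeOldState would evict anyway.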
preJoinFilter Predicate Negative (false)
When the preJoinFilter predicate fails on an input row, storeAndJoinWithOtherSide creates a new Iterator[InternalRow] of joined rows per join side and type:
- For LeftSide and LeftOuter, the joined row is the current row with all-null values for the right side (nullRight)
- For RightSide and RightOuter, the joined row is the current row with all-null values for the left side (nullLeft)
- For all other combinations, the iterator is simply empty (and so contributes nothing to the output of the outer nonLateRows.flatMap)
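The per-side and per-join-type branching above can be sketched as a pattern match. This is a hypothetical model: the JoinSide and JoinType objects here are illustrative stand-ins for Spark's types, a joined row is modelled as a pair of optional rows, and None plays the role of an all-null side (nullLeft or nullRight).

```scala
// Hypothetical, simplified model (no Spark): rows are Map[String, Long].
object PreJoinFilterMiss {
  type Row = Map[String, Long]

  sealed trait JoinSide
  case object LeftSide extends JoinSide
  case object RightSide extends JoinSide

  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType

  // Joined row as (left, right); None stands for an all-null side
  type Joined = (Option[Row], Option[Row])

  def onPreJoinFilterFailed(side: JoinSide, joinType: JoinType, row: Row): Iterator[Joined] =
    (side, joinType) match {
      case (LeftSide, LeftOuter)   => Iterator.single((Some(row), None)) // row with nullRight
      case (RightSide, RightOuter) => Iterator.single((None, Some(row))) // nullLeft with row
      case _                       => Iterator.empty                     // contributes nothing
    }

  // The outer flatMap drops the empty iterators from the output
  def process(side: JoinSide, joinType: JoinType, rows: Iterator[Row]): Iterator[Joined] =
    rows.flatMap(onPreJoinFilterFailed(side, joinType, _))
}
```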
Removing Old State: removeOldState Method
removeOldState(): Iterator[UnsafeRowPair]
removeOldState branches off per the JoinStateWatermarkPredicate:
- For JoinStateKeyWatermarkPredicate, removeOldState requests the SymmetricHashJoinStateManager to removeByKeyCondition (with the stateKeyWatermarkPredicateFunc)
- For JoinStateValueWatermarkPredicate, removeOldState requests the SymmetricHashJoinStateManager to removeByValueCondition (with the stateValueWatermarkPredicateFunc)
- For any other predicates, removeOldState returns an empty iterator (no rows to process)
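The three branches of removeOldState can be sketched as follows. This is a hypothetical model: JoinState, KeyPredicate and ValuePredicate are illustrative stand-ins, and unlike the lazy iterator one might expect from a state store, this sketch removes matching entries eagerly and returns the removed (key, row) pairs.

```scala
// Hypothetical, simplified model (no Spark): join keys are Longs, rows are Map[String, Long].
object RemoveOldState {
  type Row = Map[String, Long]

  sealed trait StateWatermarkPredicate
  final case class KeyPredicate(isOld: Long => Boolean) extends StateWatermarkPredicate
  final case class ValuePredicate(isOld: Row => Boolean) extends StateWatermarkPredicate

  // Stand-in for the join state; removal here is eager, not lazy
  final class JoinState(initial: Map[Long, Vector[Row]]) {
    private var state = initial

    // Remove every key (with all its rows) that the predicate marks as old
    def removeByKeyCondition(isOld: Long => Boolean): Iterator[(Long, Row)] = {
      val (old, kept) = state.partition { case (k, _) => isOld(k) }
      state = kept
      old.iterator.flatMap { case (k, rows) => rows.map(k -> _) }
    }

    // Remove individual rows that the predicate marks as old; drop keys left without rows
    def removeByValueCondition(isOld: Row => Boolean): Iterator[(Long, Row)] = {
      val removed = for ((k, rows) <- state.toVector; r <- rows if isOld(r)) yield k -> r
      state = state.map { case (k, rows) => k -> rows.filterNot(isOld) }.filter(_._2.nonEmpty)
      removed.iterator
    }

    def size: Int = state.values.map(_.size).sum
  }

  def removeOldState(state: JoinState, p: Option[StateWatermarkPredicate]): Iterator[(Long, Row)] =
    p match {
      case Some(KeyPredicate(f))   => state.removeByKeyCondition(f)
      case Some(ValuePredicate(f)) => state.removeByValueCondition(f)
      case _                       => Iterator.empty // nothing to remove
    }
}
```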
Note
removeOldState is used exclusively when the StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join.
Retrieving Value Rows For Key: get Method
get(key: UnsafeRow): Iterator[UnsafeRow]
get simply requests the SymmetricHashJoinStateManager to retrieve the value rows for the key.
Note
get is used exclusively when the StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join.
Committing State (Changes) and Requesting Performance Metrics: commitStateAndGetMetrics Method
commitStateAndGetMetrics(): StateStoreMetrics
commitStateAndGetMetrics simply requests the SymmetricHashJoinStateManager to commit (the state changes) and then for its performance metrics.
Note
commitStateAndGetMetrics is used exclusively when the StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join.
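Internal Properties
Name | Description |
---|---|
keyGenerator | Function to project (extract) join keys from an input row. Used in storeAndJoinWithOtherSide. |
preJoinFilter | Predicate applied to every non-late input row. Used in storeAndJoinWithOtherSide. |
stateKeyWatermarkPredicateFunc | Predicate for late rows based on the stateWatermarkPredicate (for a JoinStateKeyWatermarkPredicate). Used in storeAndJoinWithOtherSide and removeOldState. |
stateValueWatermarkPredicateFunc | Predicate for late rows based on the stateWatermarkPredicate (for a JoinStateValueWatermarkPredicate). Used in storeAndJoinWithOtherSide and removeOldState. |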