SymmetricHashJoinStateManager
SymmetricHashJoinStateManager is created for the left and right OneSideHashJoiners of a StreamingSymmetricHashJoinExec physical operator (one for each side when StreamingSymmetricHashJoinExec is requested to process partitions of the left and right sides of a stream-stream join).
SymmetricHashJoinStateManager manages join state using the KeyToNumValuesStore and KeyWithIndexToValueStore state store handlers and simply acts as a facade over them.
Creating SymmetricHashJoinStateManager Instance
SymmetricHashJoinStateManager takes the following to be created:
- Hadoop Configuration
SymmetricHashJoinStateManager initializes the internal properties.
KeyToNumValuesStore and KeyWithIndexToValueStore State Store Handlers — keyToNumValues and keyWithIndexToValue Internal Properties
Internally, SymmetricHashJoinStateManager uses a KeyToNumValuesStore (keyToNumValues) and a KeyWithIndexToValueStore (keyWithIndexToValue). Both are created as soon as SymmetricHashJoinStateManager itself is created (for a OneSideHashJoiner).
keyToNumValues and keyWithIndexToValue back the state operations SymmetricHashJoinStateManager is requested for, e.g. get, append, removeByKeyCondition, commit and metrics.
Join Side Marker — JoinSide Internal Enum
JoinSide can be one of two possible values, one for the left and one for the right side of a stream-stream join.
Both values are used exclusively when the StreamingSymmetricHashJoinExec binary physical operator is requested to execute (and process partitions of the left and right sides of a stream-stream join with a OneSideHashJoiner).
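As a rough sketch (not Spark's code), a two-valued marker like JoinSide is usually modeled in Scala as a sealed trait with two case objects. Because the trait is sealed, the compiler checks pattern matches over it for exhaustiveness. The names LeftSide and RightSide, their "left"/"right" string forms and the otherSide helper are assumptions made for this illustration.

```scala
// Hypothetical standalone model of a two-valued join side marker.
// LeftSide/RightSide and their string forms are assumptions for this sketch.
sealed trait JoinSide
case object LeftSide extends JoinSide { override def toString: String = "left" }
case object RightSide extends JoinSide { override def toString: String = "right" }

object JoinSideDemo {
  // Hypothetical helper: because JoinSide is sealed, a missing case
  // in this match would be reported by the compiler.
  def otherSide(side: JoinSide): JoinSide = side match {
    case LeftSide  => RightSide
    case RightSide => LeftSide
  }
}
```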
Performance Metrics — metrics Method
metrics: StateStoreMetrics
metrics returns the combined StateStoreMetrics of the KeyToNumValuesStore and the KeyWithIndexToValueStore state store handlers.
Note: metrics is used exclusively when OneSideHashJoiner is requested to commitStateAndGetMetrics.
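The page does not spell out how the two handlers' metrics are combined. The sketch below shows one plausible rule, using a hypothetical ToyStoreMetrics case class in place of StateStoreMetrics. The row count comes from the value store, so each buffered row is counted once, and the memory usage of both stores is summed. Treat this combination rule as an assumption, not as Spark's definitive behavior.

```scala
// Hypothetical, simplified stand-in for StateStoreMetrics
// (only two fields; custom metrics are left out).
final case class ToyStoreMetrics(numKeys: Long, memoryUsedBytes: Long)

object ToyMetrics {
  // Assumption: the value store holds one entry per buffered row, so its
  // numKeys is reported as the row count, while memory usage is summed.
  def combine(keyToNumValues: ToyStoreMetrics,
              keyWithIndexToValue: ToyStoreMetrics): ToyStoreMetrics =
    ToyStoreMetrics(
      numKeys = keyWithIndexToValue.numKeys,
      memoryUsedBytes = keyToNumValues.memoryUsedBytes + keyWithIndexToValue.memoryUsedBytes)
}
```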
removeByKeyCondition Method
removeByKeyCondition(
removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]
removeByKeyCondition creates an Iterator of UnsafeRowPairs that removes keys (and associated values) for which the given removalCondition predicate holds.
removeByKeyCondition requests the KeyToNumValuesStore for all state keys and their numbers of values (allKeyToNumValues) in the underlying state store.
Note: removeByKeyCondition is used exclusively when OneSideHashJoiner is requested to remove old state (for JoinStateKeyWatermarkPredicate).
getNext Internal Method (of removeByKeyCondition Method)
getNext(): UnsafeRowPair
getNext goes over the keys (and their numbers of values) in allKeyToNumValues. For every key for which the given removalCondition predicate holds, it removes the key (from the KeyToNumValuesStore) and its values (from the KeyWithIndexToValueStore), returning the removed key-value pairs one at a time.
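The following is a minimal in-memory sketch of this remove-as-you-iterate behavior. ToyJoinState, its String keys and values, and the two mutable maps are hypothetical stand-ins for the UnsafeRow-based KeyToNumValuesStore and KeyWithIndexToValueStore. The point is that nothing is removed until the returned iterator is consumed.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins for the two state store handlers.
class ToyJoinState {
  val keyToNumValues = mutable.Map.empty[String, Int]                 // key -> number of values
  val keyWithIndexToValue = mutable.Map.empty[(String, Int), String]  // (key, index) -> value

  def append(key: String, value: String): Unit = {
    val n = keyToNumValues.getOrElse(key, 0)
    keyWithIndexToValue((key, n)) = value
    keyToNumValues(key) = n + 1
  }

  // Lazily removes every key matching the condition together with all its
  // values, yielding the removed (key, value) pairs as the iterator is consumed.
  def removeByKeyCondition(removalCondition: String => Boolean): Iterator[(String, String)] = {
    // Snapshot the (key, numValues) entries so the map is not mutated while being iterated.
    val allKeyToNumValues = keyToNumValues.toList.iterator
    allKeyToNumValues
      .filter { case (key, _) => removalCondition(key) }
      .flatMap { case (key, numValues) =>
        keyToNumValues.remove(key) // drop the count entry for the key
        (0 until numValues).iterator.map { index =>
          // Drop each value row and hand it back to the caller.
          val value = keyWithIndexToValue.remove((key, index)).get
          (key, value)
        }
      }
  }
}
```

Because removal is a side effect of iteration, the caller has to drain the iterator for the state to actually be cleaned up.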
removeByValueCondition Method
removeByValueCondition(
removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]
removeByValueCondition creates an Iterator of UnsafeRowPairs that removes values (and associated keys if needed) for which the given removalCondition predicate holds.
Note: removeByValueCondition is used exclusively when OneSideHashJoiner is requested to remove old state (when JoinStateValueWatermarkPredicate is used).
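The sketch below shows one way to remove values while keeping each key's indices contiguous (0 until numValues). It uses hypothetical String-based maps in place of the two stores and is eager for brevity, unlike the lazy iterator described above. The compaction step is an assumption: the value at the last index is moved into the freed slot. The final per-key count update is a guess at the role of the internal updateNumValueForCurrentKey helper.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins for the two state store handlers.
class ToyJoinState {
  val keyToNumValues = mutable.Map.empty[String, Int]                 // key -> number of values
  val keyWithIndexToValue = mutable.Map.empty[(String, Int), String]  // (key, index) -> value

  def append(key: String, value: String): Unit = {
    val n = keyToNumValues.getOrElse(key, 0)
    keyWithIndexToValue((key, n)) = value
    keyToNumValues(key) = n + 1
  }

  // Removes every value matching the condition and returns the removed pairs.
  // Assumption: the last value of a key is moved into a freed slot so indices stay contiguous.
  def removeByValueCondition(removalCondition: String => Boolean): Iterator[(String, String)] = {
    val removed = mutable.ListBuffer.empty[(String, String)]
    for ((key, initialNumValues) <- keyToNumValues.toList) {
      var numValues = initialNumValues
      var index = 0
      while (index < numValues) {
        val value = keyWithIndexToValue((key, index))
        if (removalCondition(value)) {
          removed += ((key, value))
          val last = numValues - 1
          if (index != last) keyWithIndexToValue((key, index)) = keyWithIndexToValue((key, last))
          keyWithIndexToValue.remove((key, last))
          numValues -= 1 // re-check the same index, which now holds the moved value
        } else {
          index += 1
        }
      }
      // Persist the new count, or drop the key once it has no values left.
      if (numValues > 0) keyToNumValues(key) = numValues else keyToNumValues.remove(key)
    }
    removed.iterator
  }
}
```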
Appending New Value Row to Key — append Method
append(
key: UnsafeRow,
value: UnsafeRow): Unit
append requests the KeyToNumValuesStore for the number of value rows for the given key.
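In the end, append requests the stores for the following:
- the KeyWithIndexToValueStore to store the value row for the key at the index equal to that number (i.e. right after the existing value rows)
- the KeyToNumValuesStore to store the number of value rows for the key incremented by one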
Note: append is used exclusively when OneSideHashJoiner is requested to storeAndJoinWithOtherSide.
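The following is a minimal sketch of these append steps, with hypothetical String keys and values and two mutable maps standing in for the UnsafeRow-based KeyToNumValuesStore and KeyWithIndexToValueStore.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins for the two state store handlers.
class ToyJoinState {
  val keyToNumValues = mutable.Map.empty[String, Int]                 // key -> number of values
  val keyWithIndexToValue = mutable.Map.empty[(String, Int), String]  // (key, index) -> value

  def append(key: String, value: String): Unit = {
    // 1. Ask the count store how many values the key already has (0 if none).
    val numExistingValues = keyToNumValues.getOrElse(key, 0)
    // 2. Store the new value at the next free index...
    keyWithIndexToValue((key, numExistingValues)) = value
    // 3. ...and bump the key's count.
    keyToNumValues(key) = numExistingValues + 1
  }
}
```

Keeping the count in a separate store means a key's values can be addressed by index without scanning the value store.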
Retrieving Value Rows By Key — get Method
get(key: UnsafeRow): Iterator[UnsafeRow]
get requests the KeyToNumValuesStore for the number of value rows for the given key.
In the end, get requests the KeyWithIndexToValueStore for that many value rows of the given key and returns the value rows only (without their keys and indices).
Note: get is used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide and to retrieve value rows for a key.
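A minimal sketch of the two-step lookup in get follows. The String-based maps are hypothetical stand-ins for the two stores. The count comes from one store, and the indexed value rows come from the other.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins for the two state store handlers.
class ToyJoinState {
  val keyToNumValues = mutable.Map.empty[String, Int]                 // key -> number of values
  val keyWithIndexToValue = mutable.Map.empty[(String, Int), String]  // (key, index) -> value

  // Looks up the count, then fetches indices 0 until count and returns only the values.
  def get(key: String): Iterator[String] = {
    val numValues = keyToNumValues.getOrElse(key, 0)
    (0 until numValues).iterator.map(index => keyWithIndexToValue((key, index)))
  }
}
```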
Committing State (Changes) — commit Method
commit(): Unit
commit simply requests the keyToNumValues and keyWithIndexToValue state store handlers to commit state changes.
Note: commit is used exclusively when OneSideHashJoiner is requested to commit state changes and get performance metrics.
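Since commit only delegates, the facade role can be sketched with a hypothetical ToyStateHandler trait standing in for the two store handlers. RecordingHandler just logs calls so the delegation can be observed. This illustrates the delegation pattern only; it is not Spark's handler API.

```scala
import scala.collection.mutable

// Hypothetical minimal handler interface (Spark's handlers do much more).
trait ToyStateHandler {
  def commit(): Unit
}

// Records each commit so the delegation order can be checked.
class RecordingHandler(name: String, log: mutable.ListBuffer[String]) extends ToyStateHandler {
  override def commit(): Unit = log += s"$name committed"
}

// The manager acts as a facade: commit() simply forwards to both handlers.
class ToyStateManager(keyToNumValues: ToyStateHandler, keyWithIndexToValue: ToyStateHandler) {
  def commit(): Unit = {
    keyToNumValues.commit()
    keyWithIndexToValue.commit()
  }
}
```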
Aborting State (Changes) — abortIfNeeded Method
abortIfNeeded(): Unit
abortIfNeeded…FIXME
Note: abortIfNeeded is used when…FIXME
allStateStoreNames Object Method
allStateStoreNames(joinSides: JoinSide*): Seq[String]
allStateStoreNames simply returns the names of the state stores for all possible combinations of the given JoinSides and the two possible store types (i.e. keyToNumValues and keyWithIndexToValue).
Note: allStateStoreNames is used exclusively when the StreamingSymmetricHashJoinExec physical operator is requested to execute and generate the runtime representation (as an RDD[InternalRow]).
getStateStoreName Object Method
getStateStoreName(
joinSide: JoinSide,
storeType: StateStoreType): String
getStateStoreName simply returns a string of the following format:
[joinSide]-[storeType]
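The following is a standalone sketch of both naming helpers, with JoinSide and StateStoreType modeled as sealed traits. The case object names and the "left"/"right" string forms are assumptions. The keyToNumValues/keyWithIndexToValue forms and the [joinSide]-[storeType] format come from the descriptions above.

```scala
// Hypothetical standalone model; case object names and the
// "left"/"right" string forms are assumptions for this sketch.
sealed trait JoinSide
case object LeftSide extends JoinSide { override def toString: String = "left" }
case object RightSide extends JoinSide { override def toString: String = "right" }

sealed trait StateStoreType
case object KeyToNumValuesType extends StateStoreType {
  override def toString: String = "keyToNumValues"
}
case object KeyWithIndexToValueType extends StateStoreType {
  override def toString: String = "keyWithIndexToValue"
}

object StoreNames {
  val allStateStoreTypes: Seq[StateStoreType] = Seq(KeyToNumValuesType, KeyWithIndexToValueType)

  // Format: [joinSide]-[storeType]
  def getStateStoreName(joinSide: JoinSide, storeType: StateStoreType): String =
    s"$joinSide-$storeType"

  // Every combination of the given join sides and the two store types.
  def allStateStoreNames(joinSides: JoinSide*): Seq[String] =
    for (joinSide <- joinSides; storeType <- allStateStoreTypes)
      yield getStateStoreName(joinSide, storeType)
}
```

With both sides given, this yields four names, two per side.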
updateNumValueForCurrentKey Internal Method
updateNumValueForCurrentKey(): Unit
updateNumValueForCurrentKey…FIXME
Note: updateNumValueForCurrentKey is used exclusively when SymmetricHashJoinStateManager is requested to removeByValueCondition.
Internal Properties
| Name | Description |
|---|---|
| | Key attributes, i.e. … Used exclusively in … |
| | Key schema (…). Used when: … |