SymmetricHashJoinStateManager
SymmetricHashJoinStateManager is created for the left and right OneSideHashJoiners of a StreamingSymmetricHashJoinExec physical operator (one for each side when StreamingSymmetricHashJoinExec is requested to process partitions of the left and right sides of a stream-stream join).
SymmetricHashJoinStateManager manages join state using the KeyToNumValuesStore and the KeyWithIndexToValueStore state store handlers, and simply acts as a facade over them.
Creating SymmetricHashJoinStateManager Instance
SymmetricHashJoinStateManager takes the following to be created:
- Hadoop Configuration
SymmetricHashJoinStateManager initializes the internal properties.
KeyToNumValuesStore and KeyWithIndexToValueStore State Store Handlers: keyToNumValues and keyWithIndexToValue Internal Properties
SymmetricHashJoinStateManager uses a KeyToNumValuesStore (keyToNumValues) and a KeyWithIndexToValueStore (keyWithIndexToValue) internally. Both are created immediately when SymmetricHashJoinStateManager is created (for a OneSideHashJoiner).
keyToNumValues and keyWithIndexToValue are used when SymmetricHashJoinStateManager is requested for metrics, removeByKeyCondition, append, get and commit (among others).
Join Side Marker: JoinSide Internal Enum
JoinSide can be one of the two possible values: LeftSide and RightSide.
Both are used exclusively when the StreamingSymmetricHashJoinExec binary physical operator is requested to execute (and process partitions of the left and right sides of a stream-stream join with a OneSideHashJoiner).
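A two-valued marker like JoinSide maps naturally onto a Scala sealed trait with case objects. The following is a minimal sketch under that assumption; the lowercase toString values and the otherSide helper are hypothetical and only illustrate how a sealed hierarchy gives exhaustive pattern matching.

```scala
// Sketch: a two-valued join side marker as a sealed trait with case objects
sealed trait JoinSide
case object LeftSide extends JoinSide { override def toString(): String = "left" }
case object RightSide extends JoinSide { override def toString(): String = "right" }

// Hypothetical helper: the match is exhaustive because JoinSide is sealed
def otherSide(side: JoinSide): JoinSide = side match {
  case LeftSide  => RightSide
  case RightSide => LeftSide
}
```

Because the trait is sealed, the compiler can warn when a match forgets one of the two sides.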
Performance Metrics: metrics Method
metrics: StateStoreMetrics
metrics returns the combined StateStoreMetrics of the KeyToNumValuesStore and the KeyWithIndexToValueStore state store handlers.
Note: metrics is used exclusively when OneSideHashJoiner is requested to commitStateAndGetMetrics.
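One way to picture "combined" metrics is a function that merges the per-handler figures into one record. The sketch below uses a hypothetical, simplified Metrics case class (not Spark's StateStoreMetrics) and assumes that the row count is taken from keyWithIndexToValue (where each buffered row is stored exactly once) while memory usage is summed over both stores.

```scala
// Hypothetical, simplified stand-in for StateStoreMetrics (two fields only)
final case class Metrics(numKeys: Long, memoryUsedBytes: Long)

// Assumption: each buffered value row appears once in keyWithIndexToValue,
// so its key count is reported as the row count; memory is summed over both stores
def combine(keyToNumValues: Metrics, keyWithIndexToValue: Metrics): Metrics =
  Metrics(
    numKeys = keyWithIndexToValue.numKeys,
    memoryUsedBytes = keyToNumValues.memoryUsedBytes + keyWithIndexToValue.memoryUsedBytes)
```

Summing numKeys instead would count each join key twice in different senses, which is why this sketch avoids it.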
removeByKeyCondition Method
removeByKeyCondition(
removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]
removeByKeyCondition creates an Iterator of UnsafeRowPairs that removes keys (and their associated values) for which the given removalCondition predicate holds.
removeByKeyCondition uses the KeyToNumValuesStore to go over all state keys (with their numbers of values) in the underlying state store.
Note: removeByKeyCondition is used exclusively when OneSideHashJoiner is requested to remove an old state (for JoinStateKeyWatermarkPredicate).
getNext Internal Method (of removeByKeyCondition Method)
getNext(): UnsafeRowPair
getNext goes over the keys and values in the allKeyToNumValues sequence and removes keys (from the KeyToNumValuesStore) and the corresponding values (from the KeyWithIndexToValueStore) for which the given removalCondition predicate holds.
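The lazy, remove-as-you-iterate behaviour of removeByKeyCondition can be sketched with two in-memory maps standing in for the state store handlers. Everything here is hypothetical: String keys and values replace UnsafeRow, a (key, value) tuple replaces UnsafeRowPair, and JoinState is an illustrative class, not a Spark API.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins for KeyToNumValuesStore and KeyWithIndexToValueStore
final class JoinState {
  val keyToNumValues = mutable.LinkedHashMap.empty[String, Long]
  val keyWithIndexToValue = mutable.Map.empty[(String, Long), String]

  def append(key: String, value: String): Unit = {
    val n = keyToNumValues.getOrElse(key, 0L)
    keyWithIndexToValue((key, n)) = value
    keyToNumValues(key) = n + 1
  }

  // Removes every key matching removalCondition together with all of its values.
  // Nothing is removed until the returned iterator is consumed.
  def removeByKeyCondition(removalCondition: String => Boolean): Iterator[(String, String)] = {
    // Snapshot of all (key, numValues) entries so removal does not disturb iteration
    val allKeyToNumValues = keyToNumValues.toList
    allKeyToNumValues.iterator
      .filter { case (key, _) => removalCondition(key) }
      .flatMap { case (key, numValues) =>
        keyToNumValues.remove(key)
        // Remove (and emit) each value stored under indices 0 until numValues
        (0L until numValues).iterator.map { index =>
          (key, keyWithIndexToValue.remove((key, index)).get)
        }
      }
  }
}
```

Returning an iterator lets the caller drive the removal, which matches the description of removeByKeyCondition creating an Iterator rather than returning a finished collection.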
removeByValueCondition Method
removeByValueCondition(
removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]
removeByValueCondition creates an Iterator of UnsafeRowPairs that removes values (and the associated keys if needed) for which the given removalCondition predicate holds.
Note: removeByValueCondition is used exclusively when OneSideHashJoiner is requested to remove an old state (when JoinStateValueWatermarkPredicate is used).
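Removing individual values is harder than removing whole keys because the remaining values must stay addressable by index 0 until numValues. The sketch below assumes one compaction strategy: the last value of a key is moved into the freed slot and the count is decremented; a key whose count drops to zero is removed. JoinState and the String-based types are hypothetical stand-ins, not Spark APIs.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins for the two state store handlers
final class JoinState {
  val keyToNumValues = mutable.LinkedHashMap.empty[String, Long]
  val keyWithIndexToValue = mutable.Map.empty[(String, Long), String]

  def append(key: String, value: String): Unit = {
    val n = keyToNumValues.getOrElse(key, 0L)
    keyWithIndexToValue((key, n)) = value
    keyToNumValues(key) = n + 1
  }

  // Keys are processed lazily, one at a time, as the iterator is consumed
  def removeByValueCondition(removalCondition: String => Boolean): Iterator[(String, String)] =
    keyToNumValues.keys.toList.iterator.flatMap(key => removeMatchingValues(key, removalCondition))

  private def removeMatchingValues(key: String, cond: String => Boolean): List[(String, String)] = {
    var numValues = keyToNumValues(key)
    var index = 0L
    val removed = List.newBuilder[(String, String)]
    while (index < numValues) {
      val value = keyWithIndexToValue((key, index))
      if (cond(value)) {
        removed += ((key, value))
        val lastIndex = numValues - 1
        // Fill the hole with the last value so indices stay contiguous
        if (index != lastIndex) keyWithIndexToValue((key, index)) = keyWithIndexToValue((key, lastIndex))
        keyWithIndexToValue.remove((key, lastIndex))
        numValues -= 1
        // Do not advance: the value moved into this slot still has to be checked
      } else index += 1
    }
    // Drop the key once it has no values left, otherwise record the new count
    if (numValues == 0) keyToNumValues.remove(key) else keyToNumValues(key) = numValues
    removed.result()
  }
}
```

Moving the last value into the gap keeps each removal O(1) at the cost of not preserving the original order of a key's values.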
Appending New Value Row to Key: append Method
append(
key: UnsafeRow,
value: UnsafeRow): Unit
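append requests the KeyToNumValuesStore for the number of value rows for the given key.
In the end, append requests the stores for the following:
- KeyWithIndexToValueStore to store the given value row for the key at the index equal to that number of value rows
- KeyToNumValuesStore to store the number of value rows for the key incremented by one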
Note: append is used exclusively when OneSideHashJoiner is requested to storeAndJoinWithOtherSide.
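The two-step append (store the value at the next free index, then bump the count) can be sketched with plain maps. JoinState and the String keys and values are hypothetical simplifications of the UnsafeRow-based state store handlers.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins for KeyToNumValuesStore and KeyWithIndexToValueStore
final class JoinState {
  val keyToNumValues = mutable.Map.empty[String, Long]
  val keyWithIndexToValue = mutable.Map.empty[(String, Long), String]

  def append(key: String, value: String): Unit = {
    // Number of value rows already stored for the key (0 for a new key)
    val numExistingValues = keyToNumValues.getOrElse(key, 0L)
    // The new value row goes at the next free index...
    keyWithIndexToValue((key, numExistingValues)) = value
    // ...and the count for the key is incremented by one
    keyToNumValues(key) = numExistingValues + 1
  }
}
```

Using the current count as the next index keeps a key's values at indices 0 until numValues with no gaps.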
Retrieving Value Rows By Key: get Method
get(key: UnsafeRow): Iterator[UnsafeRow]
get requests the KeyToNumValuesStore for the number of value rows for the given key.
In the end, get requests the KeyWithIndexToValueStore to retrieve that number of value rows for the given key and returns the value rows only (without the key).
Note: get is used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide and to retrieve value rows for a key.
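Reading back a key's values is the mirror image of append: look up the count, then fetch indices 0 until count. The sketch below uses hypothetical in-memory maps with String keys and values in place of the UnsafeRow-based handlers, and treats a missing key as having zero values.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins for the two state store handlers
final class JoinState {
  val keyToNumValues = mutable.Map.empty[String, Long]
  val keyWithIndexToValue = mutable.Map.empty[(String, Long), String]

  def get(key: String): Iterator[String] = {
    // Assumption: an unknown key has zero value rows
    val numValues = keyToNumValues.getOrElse(key, 0L)
    // Fetch each value by (key, index) and keep only the value part
    (0L until numValues).iterator.map(index => keyWithIndexToValue((key, index)))
  }
}
```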
Committing State (Changes): commit Method
commit(): Unit
commit simply requests the keyToNumValues and keyWithIndexToValue state store handlers to commit state changes.
Note: commit is used exclusively when OneSideHashJoiner is requested to commit state changes and get performance metrics.
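Since SymmetricHashJoinStateManager acts as a facade over its two handlers, commit is plain delegation. The sketch below shows that shape with a hypothetical StateStoreHandler trait and a recording test double; neither is a Spark API.

```scala
// Hypothetical minimal handler interface (the real handlers wrap a state store)
trait StateStoreHandler {
  def commit(): Unit
}

// Test double that records whether commit was called
final class RecordingHandler(val name: String) extends StateStoreHandler {
  var committed = false
  def commit(): Unit = committed = true
}

// Facade: commit is forwarded to both handlers
final class StateManagerFacade(keyToNumValues: StateStoreHandler,
                               keyWithIndexToValue: StateStoreHandler) {
  def commit(): Unit = {
    keyToNumValues.commit()
    keyWithIndexToValue.commit()
  }
}
```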
Aborting State (Changes): abortIfNeeded Method
abortIfNeeded(): Unit
abortIfNeeded …FIXME
Note: abortIfNeeded is used when…FIXME
allStateStoreNames Object Method
allStateStoreNames(joinSides: JoinSide*): Seq[String]
allStateStoreNames simply returns the names of the state stores for all possible combinations of the given JoinSides and the two possible store types (i.e. keyToNumValues and keyWithIndexToValue).
Note: allStateStoreNames is used exclusively when the StreamingSymmetricHashJoinExec physical operator is requested to execute and generate the runtime representation (as an RDD[InternalRow]).
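The naming scheme combines every join side with every store type, using the [joinSide]-[storeType] format of getStateStoreName. In this sketch the store types are plain strings named after the two handlers, and JoinSide renders as "left" or "right"; both are assumptions made for illustration.

```scala
// Assumption: join sides render as "left" and "right"
sealed trait JoinSide
case object LeftSide extends JoinSide { override def toString(): String = "left" }
case object RightSide extends JoinSide { override def toString(): String = "right" }

// Assumption: the two store types, named after the state store handlers
val storeTypes = Seq("keyToNumValues", "keyWithIndexToValue")

// [joinSide]-[storeType]
def getStateStoreName(joinSide: JoinSide, storeType: String): String = s"$joinSide-$storeType"

// Every combination of the given join sides and the two store types
def allStateStoreNames(joinSides: JoinSide*): Seq[String] =
  for {
    side <- joinSides
    storeType <- storeTypes
  } yield getStateStoreName(side, storeType)
```

For both sides this gives four names, such as left-keyToNumValues and right-keyWithIndexToValue.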
getStateStoreName Object Method
getStateStoreName(
joinSide: JoinSide,
storeType: StateStoreType): String
getStateStoreName simply returns a string of the following format:
[joinSide]-[storeType]
updateNumValueForCurrentKey Internal Method
updateNumValueForCurrentKey(): Unit
updateNumValueForCurrentKey …FIXME
Note: updateNumValueForCurrentKey is used exclusively when SymmetricHashJoinStateManager is requested to removeByValueCondition.
Internal Properties
Name | Description |
---|---|
 | Key attributes, i.e. … Used exclusively in … |
 | Key schema (… Used when: … |