 
SymmetricHashJoinStateManager
SymmetricHashJoinStateManager is created for the left and right OneSideHashJoiners of a StreamingSymmetricHashJoinExec physical operator (one for each side when StreamingSymmetricHashJoinExec is requested to process partitions of the left and right sides of a stream-stream join).
 
SymmetricHashJoinStateManager manages join state using the KeyToNumValuesStore and the KeyWithIndexToValueStore state store handlers (and simply acts like their facade).
Creating SymmetricHashJoinStateManager Instance
SymmetricHashJoinStateManager takes the following to be created:
- 
Hadoop Configuration 
SymmetricHashJoinStateManager initializes the internal properties.
 KeyToNumValuesStore and KeyWithIndexToValueStore State Store Handlers — keyToNumValues and keyWithIndexToValue Internal Properties
SymmetricHashJoinStateManager uses a KeyToNumValuesStore (keyToNumValues) and a KeyWithIndexToValueStore (keyWithIndexToValue) internally that are created immediately when SymmetricHashJoinStateManager is created (for a OneSideHashJoiner).
keyToNumValues and keyWithIndexToValue are used when SymmetricHashJoinStateManager is requested for the following:
 Join Side Marker — JoinSide Internal Enum
JoinSide can be one of the two possible values:
They are both used exclusively when StreamingSymmetricHashJoinExec binary physical operator is requested to execute (and process partitions of the left and right sides of a stream-stream join with an OneSideHashJoiner).
 Performance Metrics — metrics Method
metrics: StateStoreMetricsmetrics returns the combined StateStoreMetrics of the KeyToNumValuesStore and the KeyWithIndexToValueStore state store handlers.
| Note | metricsis used exclusively whenOneSideHashJoineris requested to commitStateAndGetMetrics. | 
 removeByKeyCondition Method
removeByKeyCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]removeByKeyCondition creates an Iterator of UnsafeRowPairs that removes keys (and associated values) for which the given removalCondition predicate holds.
removeByKeyCondition uses the KeyToNumValuesStore for all state keys and values (in the underlying state store).
| Note | removeByKeyConditionis used exclusively whenOneSideHashJoineris requested to remove an old state (for JoinStateKeyWatermarkPredicate). | 
 getNext Internal Method (of removeByKeyCondition Method)
getNext(): UnsafeRowPairgetNext goes over the keys and values in the allKeyToNumValues sequence and removes keys (from the KeyToNumValuesStore) and the corresponding values (from the KeyWithIndexToValueStore) for which the given removalCondition predicate holds.
 removeByValueCondition Method
removeByValueCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]removeByValueCondition creates an Iterator of UnsafeRowPairs that removes values (and associated keys if needed) for which the given removalCondition predicate holds.
| Note | removeByValueConditionis used exclusively whenOneSideHashJoineris requested to remove an old state (when JoinStateValueWatermarkPredicate is used). | 
 Appending New Value Row to Key — append Method
append(
  key: UnsafeRow,
  value: UnsafeRow): Unitappend requests the KeyToNumValuesStore for the number of value rows for the given key.
In the end, append requests the stores for the following:
| Note | appendis used exclusively whenOneSideHashJoineris requested to storeAndJoinWithOtherSide. | 
 Retrieving Value Rows By Key — get Method
get(key: UnsafeRow): Iterator[UnsafeRow]get requests the KeyToNumValuesStore for the number of value rows for the given key.
In the end, get requests the KeyWithIndexToValueStore to retrieve that number of value rows for the given key and leaves value rows only.
| Note | getis used whenOneSideHashJoineris requested to storeAndJoinWithOtherSide and retrieving value rows for a key. | 
 Committing State (Changes) — commit Method
commit(): Unitcommit simply requests the keyToNumValues and keyWithIndexToValue state store handlers to commit state changes.
| Note | commitis used exclusively whenOneSideHashJoineris requested to commit state changes and get performance metrics. | 
 Aborting State (Changes) — abortIfNeeded Method
abortIfNeeded(): UnitabortIfNeeded…FIXME
| Note | abortIfNeededis used when…FIXME | 
 allStateStoreNames Object Method
allStateStoreNames(joinSides: JoinSide*): Seq[String]allStateStoreNames simply returns the names of the state stores for all possible combinations of the given JoinSides and the two possible store types (e.g. keyToNumValues and keyWithIndexToValue).
| Note | allStateStoreNamesis used exclusively whenStreamingSymmetricHashJoinExecphysical operator is requested to execute and generate the runtime representation (as aRDD[InternalRow]). | 
 getStateStoreName Object Method
getStateStoreName(
  joinSide: JoinSide,
  storeType: StateStoreType): StringgetStateStoreName simply returns a string of the following format:
[joinSide]-[storeType]| Note | 
 
 | 
 updateNumValueForCurrentKey Internal Method
updateNumValueForCurrentKey(): UnitupdateNumValueForCurrentKey…FIXME
| Note | updateNumValueForCurrentKeyis used exclusively whenSymmetricHashJoinStateManageris requested to removeByValueCondition. | 
Internal Properties
| Name | Description | 
|---|---|
| 
 |  Key attributes, i.e.  Used exclusively in  | 
| 
 |  Key schema ( Used when: 
 |