SymmetricHashJoinStateManager

SymmetricHashJoinStateManager is created for the left and the right OneSideHashJoiner of a StreamingSymmetricHashJoinExec physical operator, one per side, when StreamingSymmetricHashJoinExec is requested to process partitions of the left and right sides of a stream-stream join.

Figure 1. SymmetricHashJoinStateManager and Stream-Stream Join

SymmetricHashJoinStateManager manages join state using the KeyToNumValuesStore and the KeyWithIndexToValueStore state store handlers, acting as a simple facade over them.

Creating SymmetricHashJoinStateManager Instance

SymmetricHashJoinStateManager takes the following to be created:

SymmetricHashJoinStateManager initializes the internal properties.

KeyToNumValuesStore and KeyWithIndexToValueStore State Store Handlers — keyToNumValues and keyWithIndexToValue Internal Properties

Internally, SymmetricHashJoinStateManager uses a KeyToNumValuesStore (keyToNumValues) and a KeyWithIndexToValueStore (keyWithIndexToValue). Both are created as soon as SymmetricHashJoinStateManager is created (for a OneSideHashJoiner).
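To make the two-store layout concrete, the following Scala sketch models both stores as plain immutable Maps, with Strings standing in for UnsafeRows. This is a simplification: the Map-based model and the 0-based indices are assumptions of the sketch, not the actual state store implementation. keyToNumValues keeps how many value rows are buffered per key, and keyWithIndexToValue keeps each row under its key and an index.

```scala
// Hypothetical, simplified model of the two state stores.
// Strings stand in for the UnsafeRows that the real stores hold.

// keyToNumValues: join key -> number of value rows buffered for that key
val keyToNumValues: Map[String, Long] = Map(
  "k1" -> 2L,
  "k2" -> 1L
)

// keyWithIndexToValue: (join key, index) -> value row
// Assumption of this sketch: indices run from 0 until the key's count.
val keyWithIndexToValue: Map[(String, Long), String] = Map(
  ("k1", 0L) -> "row-a",
  ("k1", 1L) -> "row-b",
  ("k2", 0L) -> "row-c"
)

// Every index below a key's count must point at a stored value row.
val consistent: Boolean = keyToNumValues.forall { case (key, n) =>
  (0L until n).forall(index => keyWithIndexToValue.contains((key, index)))
}

println(s"layout consistent: $consistent")  // layout consistent: true
```

Splitting the state this way lets a key's count be looked up without touching its value rows, and lets individual rows be addressed by (key, index).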

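keyToNumValues and keyWithIndexToValue are used when SymmetricHashJoinStateManager is requested for the following:

  • metrics

  • removeByKeyCondition and removeByValueCondition

  • append and get

  • commit and abortIfNeeded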

Join Side Marker — JoinSide Internal Enum

JoinSide can be one of the two possible values:

  • LeftSide (alias: left)

  • RightSide (alias: right)

Both values are used exclusively when the StreamingSymmetricHashJoinExec binary physical operator is requested to execute (and process the partitions of the left and right sides of a stream-stream join with a OneSideHashJoiner).
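A two-valued marker like JoinSide maps naturally onto a Scala sealed trait with case objects. The sketch below is an approximation, not the Spark definition: it assumes the left and right aliases are exposed through toString, and otherSide is a hypothetical helper added only to show that matching on a sealed trait is checked for exhaustiveness.

```scala
// Sketch of a two-valued join side marker.
sealed trait JoinSide

// The lowercase aliases are exposed through toString (an assumption of this sketch).
case object LeftSide extends JoinSide {
  override def toString: String = "left"
}

case object RightSide extends JoinSide {
  override def toString: String = "right"
}

// Hypothetical helper: because JoinSide is sealed, the compiler warns
// if a case is missing from this match.
def otherSide(side: JoinSide): JoinSide = side match {
  case LeftSide  => RightSide
  case RightSide => LeftSide
}

println(s"$LeftSide joins with ${otherSide(LeftSide)}")  // left joins with right
```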

Performance Metrics — metrics Method

metrics: StateStoreMetrics

metrics returns the combined StateStoreMetrics of the KeyToNumValuesStore and the KeyWithIndexToValueStore state store handlers.

Note
metrics is used exclusively when OneSideHashJoiner is requested to commitStateAndGetMetrics.
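One way to combine the metrics of the two handlers is sketched below with a hypothetical, simplified Metrics class (it is not Spark's StateStoreMetrics, which carries more fields). The combination rules are assumptions of this sketch: memory usage is summed across both stores, and the row count is taken from the keyWithIndexToValue side so that each buffered row is counted once.

```scala
// Hypothetical simplified metrics: only the fields needed to show the idea.
final case class Metrics(numRows: Long, memoryUsedBytes: Long)

// Combines the metrics of the two store handlers into one for a join side.
// Assumptions: memory is summed, and the row count comes from
// keyWithIndexToValue (one entry per buffered value row).
def combine(keyToNumValues: Metrics, keyWithIndexToValue: Metrics): Metrics =
  Metrics(
    numRows = keyWithIndexToValue.numRows,
    memoryUsedBytes = keyToNumValues.memoryUsedBytes + keyWithIndexToValue.memoryUsedBytes
  )

val combined = combine(
  Metrics(numRows = 2, memoryUsedBytes = 100),
  Metrics(numRows = 3, memoryUsedBytes = 400)
)
println(combined)  // Metrics(3,500)
```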

removeByKeyCondition Method

removeByKeyCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]

removeByKeyCondition returns an Iterator of UnsafeRowPairs that removes the keys (and their associated values) for which the given removalCondition predicate holds. The removal happens lazily, as the iterator is consumed.

Note
removeByKeyCondition is used exclusively when OneSideHashJoiner is requested to remove old state (when JoinStateKeyWatermarkPredicate is used).

getNext Internal Method (of removeByKeyCondition Method)

getNext(): UnsafeRowPair

getNext walks over the key-count entries in allKeyToNumValues (an iterator over all the entries of the KeyToNumValuesStore) and removes the keys (from the KeyToNumValuesStore) and their corresponding values (from the KeyWithIndexToValueStore) for which the given removalCondition predicate holds. Every call returns the next removed key-value pair.
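The lazy, key-driven removal can be sketched with hypothetical in-memory stores (mutable Maps, Strings instead of UnsafeRows, and plain tuples instead of UnsafeRowPair). The function shares its name with the method above but has a simplified, hypothetical signature. Nothing is removed until the returned iterator is advanced past a matching key.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the two stores: Strings instead of UnsafeRows.
val keyToNumValues = mutable.Map[String, Int]("k1" -> 2, "k2" -> 1, "k3" -> 1)
val keyWithIndexToValue = mutable.Map[(String, Int), String](
  ("k1", 0) -> "a", ("k1", 1) -> "b", ("k2", 0) -> "c", ("k3", 0) -> "d")

// Returns an iterator that removes each key matching removalCondition, along with
// all of its values, only when the iterator is advanced past that key.
def removeByKeyCondition(removalCondition: String => Boolean): Iterator[(String, String)] = {
  // Snapshot of the (key, numValues) entries, so the maps can change during iteration.
  val allKeyToNumValues = keyToNumValues.toList.iterator
  allKeyToNumValues
    .filter { case (key, _) => removalCondition(key) }
    .flatMap { case (key, numValues) =>
      keyToNumValues.remove(key)
      // Remove and emit every value row stored for this key, by index.
      (0 until numValues).iterator.map { index =>
        (key, keyWithIndexToValue.remove((key, index)).get)
      }
    }
}

val removed = removeByKeyCondition(key => key != "k3")
val sizeBeforeConsuming = keyToNumValues.size  // 3: the iterator has not been consumed yet
val pairs = removed.toList                     // consuming the iterator performs the removal
println(pairs.sorted)  // List((k1,a), (k1,b), (k2,c))
```

Returning an iterator instead of a collection lets the caller stream removed rows (for example, to emit outer-join results) without materializing all of them first.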

removeByValueCondition Method

removeByValueCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]

removeByValueCondition returns an Iterator of UnsafeRowPairs that removes the values for which the given removalCondition predicate holds (and their associated keys, if needed).

Note
removeByValueCondition is used exclusively when OneSideHashJoiner is requested to remove old state (when JoinStateValueWatermarkPredicate is used).
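Removing individual values leaves holes in a key's index range. The sketch below uses one way to keep the indices contiguous: move the key's last value into the freed slot, decrement the count, and drop the key once its count reaches zero. This technique is an assumption of the sketch rather than something stated above. The stores are hypothetical mutable Maps, Int values stand in for value rows (for example, event times), and removeValuesOfKey is a hypothetical helper. Removal here is lazy per key but eager within a key.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the two stores (Int values instead of value rows).
val keyToNumValues = mutable.Map[String, Int]("k1" -> 3, "k2" -> 1)
val keyWithIndexToValue = mutable.Map[(String, Int), Int](
  ("k1", 0) -> 5, ("k1", 1) -> 20, ("k1", 2) -> 7, ("k2", 0) -> 3)

// Removes the values of one key that match removalCondition and returns them.
// Indices stay contiguous because the key's last value fills each freed slot.
def removeValuesOfKey(key: String, removalCondition: Int => Boolean): List[Int] = {
  var numValues = keyToNumValues(key)
  val removed = mutable.ListBuffer.empty[Int]
  var index = 0
  while (index < numValues) {
    val value = keyWithIndexToValue((key, index))
    if (removalCondition(value)) {
      removed += value
      val lastIndex = numValues - 1
      // Move the last value into the freed slot (a self-copy when index == lastIndex).
      keyWithIndexToValue((key, index)) = keyWithIndexToValue((key, lastIndex))
      keyWithIndexToValue.remove((key, lastIndex))
      numValues -= 1
      // Do not advance: the value moved into `index` still has to be checked.
    } else {
      index += 1
    }
  }
  // Store the new count, or drop the key once no values are left.
  if (numValues == 0) {
    keyToNumValues.remove(key)
  } else {
    keyToNumValues(key) = numValues
  }
  removed.toList
}

// Keys are processed one at a time as the iterator is consumed.
def removeByValueCondition(removalCondition: Int => Boolean): Iterator[(String, Int)] =
  keyToNumValues.keys.toList.iterator.flatMap { key =>
    removeValuesOfKey(key, removalCondition).map(value => (key, value))
  }

val removedPairs = removeByValueCondition(_ < 10).toList
println(removedPairs.sorted)  // List((k1,5), (k1,7), (k2,3))
println(keyToNumValues("k1"))  // 1
```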

getNext Internal Method (of removeByValueCondition Method)

getNext(): UnsafeRowPair

getNext…FIXME

Appending New Value Row to Key — append Method

append(
  key: UnsafeRow,
  value: UnsafeRow): Unit

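append first requests the KeyToNumValuesStore for the number of value rows already stored for the key.

In the end, append requests the stores for the following:

  • KeyWithIndexToValueStore to store the value row under the key, with the current number of values as its index

  • KeyToNumValuesStore to store the key with the number of values incremented by one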

Note
append is used exclusively when OneSideHashJoiner is requested to storeAndJoinWithOtherSide.
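The append steps can be sketched against hypothetical in-memory stores (mutable Maps, Strings instead of UnsafeRows). The function shares its name with the method above but has a simplified, hypothetical signature. The key idea is that the current count doubles as the index of the new value row.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the two stores (Strings instead of UnsafeRows).
val keyToNumValues = mutable.Map.empty[String, Long]
val keyWithIndexToValue = mutable.Map.empty[(String, Long), String]

// Appends a value row to a key: the current count is used as the new row's index,
// and then the count is incremented.
def append(key: String, value: String): Unit = {
  val numExistingValues = keyToNumValues.getOrElse(key, 0L)
  keyWithIndexToValue((key, numExistingValues)) = value
  keyToNumValues(key) = numExistingValues + 1
}

append("k1", "a")
append("k1", "b")
append("k2", "c")
println(keyToNumValues("k1"))             // 2
println(keyWithIndexToValue(("k1", 1L)))  // b
```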

Retrieving Value Rows By Key — get Method

get(key: UnsafeRow): Iterator[UnsafeRow]

get first requests the KeyToNumValuesStore for the number of value rows stored for the given key.

In the end, get requests the KeyWithIndexToValueStore to retrieve that number of value rows for the key and returns the value rows only.

Note
get is used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide and to retrieve the value rows for a key.
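The two-step lookup can be sketched with hypothetical immutable Maps in place of the stores (Strings instead of UnsafeRows; the signature is a simplified, hypothetical one). The count is read first, then exactly that many rows are fetched by index, and only the value rows are returned.

```scala
// Hypothetical stand-ins for the two stores.
val keyToNumValues: Map[String, Long] = Map("k1" -> 2L)
val keyWithIndexToValue: Map[(String, Long), String] =
  Map(("k1", 0L) -> "a", ("k1", 1L) -> "b")

// Reads the count first, then fetches that many value rows by index.
// Only the value rows are returned; the key and index are dropped.
def get(key: String): Iterator[String] = {
  val numValues = keyToNumValues.getOrElse(key, 0L)
  (0L until numValues).iterator.map(index => keyWithIndexToValue((key, index)))
}

println(get("k1").toList)       // List(a, b)
println(get("missing").toList)  // List()
```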

Committing State (Changes) — commit Method

commit(): Unit

commit simply requests the keyToNumValues and keyWithIndexToValue state store handlers to commit state changes.

Note
commit is used exclusively when OneSideHashJoiner is requested to commit state changes and get performance metrics.
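The facade role is easiest to see in commit. The sketch below is illustrative only: StoreHandler, RecordingHandler and JoinStateManagerSketch are hypothetical names, and the handlers merely record that they were committed instead of committing any state.

```scala
import scala.collection.mutable

// Hypothetical interface standing in for the two state store handlers.
trait StoreHandler {
  def name: String
  def commit(): Unit
}

// Records the names of committed handlers, for illustration only.
val committed = mutable.ListBuffer.empty[String]

final class RecordingHandler(val name: String) extends StoreHandler {
  def commit(): Unit = committed += name
}

// A thin facade: commit simply delegates to both handlers.
final class JoinStateManagerSketch(keyToNumValues: StoreHandler, keyWithIndexToValue: StoreHandler) {
  def commit(): Unit = {
    keyToNumValues.commit()
    keyWithIndexToValue.commit()
  }
}

new JoinStateManagerSketch(
  new RecordingHandler("keyToNumValues"),
  new RecordingHandler("keyWithIndexToValue")
).commit()
println(committed.mkString(", "))  // keyToNumValues, keyWithIndexToValue
```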

Aborting State (Changes) — abortIfNeeded Method

abortIfNeeded(): Unit

abortIfNeeded…FIXME

Note
abortIfNeeded is used when…FIXME

allStateStoreNames Object Method

allStateStoreNames(joinSides: JoinSide*): Seq[String]

allStateStoreNames simply returns the names of the state stores for all combinations of the given JoinSides and the two store types (i.e. keyToNumValues and keyWithIndexToValue).

Note
allStateStoreNames is used exclusively when the StreamingSymmetricHashJoinExec physical operator is requested to execute and generate the runtime representation (as an RDD[InternalRow]).
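Enumerating every combination of join side and store type is a plain cross product. The sketch below uses Strings in place of JoinSide and StateStoreType values (a simplification; the real methods take those types), and builds each name in the [joinSide]-[storeType] format that getStateStoreName is documented to return.

```scala
// Hypothetical string stand-ins for the two store types.
val storeTypes = Seq("keyToNumValues", "keyWithIndexToValue")

// Builds a name in the [joinSide]-[storeType] format.
def getStateStoreName(joinSide: String, storeType: String): String =
  s"$joinSide-$storeType"

// One name per combination of the given join sides and the two store types.
def allStateStoreNames(joinSides: String*): Seq[String] =
  for {
    joinSide <- joinSides
    storeType <- storeTypes
  } yield getStateStoreName(joinSide, storeType)

println(allStateStoreNames("left", "right").mkString(", "))
// left-keyToNumValues, left-keyWithIndexToValue, right-keyToNumValues, right-keyWithIndexToValue
```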

getStateStoreName Object Method

getStateStoreName(
  joinSide: JoinSide,
  storeType: StateStoreType): String

getStateStoreName simply returns a string of the following format:

[joinSide]-[storeType]

Note

getStateStoreName is used when:

updateNumValueForCurrentKey Internal Method

updateNumValueForCurrentKey(): Unit

updateNumValueForCurrentKey…FIXME

Note
updateNumValueForCurrentKey is used exclusively when SymmetricHashJoinStateManager is requested to removeByValueCondition.

Internal Properties

Name           Description

keyAttributes  Key attributes, i.e. AttributeReferences of the key schema.
               Used exclusively in KeyWithIndexToValueStore when requested for the keyWithIndexExprs, indexOrdinalInKeyWithIndexRow, keyWithIndexRowGenerator and keyRowGenerator.

keySchema      Key schema (StructType) built from the join keys, with one field per join key named fieldN, where N is the ordinal (index) of the join key.
               Used when:
