
SymmetricHashJoinStateManager is created for the left and right OneSideHashJoiners of a StreamingSymmetricHashJoinExec physical operator (one for each side when StreamingSymmetricHashJoinExec is requested to process partitions of the left and right sides of a stream-stream join).

Figure 1. SymmetricHashJoinStateManager and Stream-Stream Join

SymmetricHashJoinStateManager manages join state using the KeyToNumValuesStore and the KeyWithIndexToValueStore state store handlers (and simply acts as a facade over them).

Creating SymmetricHashJoinStateManager Instance

SymmetricHashJoinStateManager takes the following to be created:

SymmetricHashJoinStateManager initializes the internal properties.

KeyToNumValuesStore and KeyWithIndexToValueStore State Store Handlers — keyToNumValues and keyWithIndexToValue Internal Properties

SymmetricHashJoinStateManager uses a KeyToNumValuesStore (keyToNumValues) and a KeyWithIndexToValueStore (keyWithIndexToValue) internally that are created immediately when SymmetricHashJoinStateManager is created (for a OneSideHashJoiner).

keyToNumValues and keyWithIndexToValue are used when SymmetricHashJoinStateManager is requested for the following:

Join Side Marker — JoinSide Internal Enum

JoinSide can be one of the two possible values:

  • LeftSide (alias: left)

  • RightSide (alias: right)

They are both used exclusively when the StreamingSymmetricHashJoinExec binary physical operator is requested to execute (and process partitions of the left and right sides of a stream-stream join with a OneSideHashJoiner).

Performance Metrics — metrics Method

metrics: StateStoreMetrics

metrics returns the combined StateStoreMetrics of the KeyToNumValuesStore and the KeyWithIndexToValueStore state store handlers.

metrics is used exclusively when OneSideHashJoiner is requested to commitStateAndGetMetrics.

removeByKeyCondition Method

removeByKeyCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]

removeByKeyCondition creates an Iterator of UnsafeRowPairs that removes keys (and associated values) for which the given removalCondition predicate holds.

removeByKeyCondition is used exclusively when OneSideHashJoiner is requested to remove an old state (for JoinStateKeyWatermarkPredicate).
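
The lazy, remove-while-iterating behaviour described above can be sketched in plain Scala. This is a simplified model, not Spark's code: mutable maps stand in for the two state stores, String stands in for UnsafeRow, and RemoveByKeySketch and RowPair are hypothetical names introduced only for this illustration.

```scala
import scala.collection.mutable

// Hypothetical model: maps replace the state stores, String replaces UnsafeRow.
object RemoveByKeySketch {
  final case class RowPair(key: String, value: String)

  def removeByKeyCondition(
      keyToNumValues: mutable.Map[String, Long],
      keyWithIndexToValue: mutable.Map[(String, Long), String])(
      removalCondition: String => Boolean): Iterator[RowPair] = {
    // Snapshot the matching keys so that removal does not disturb the iteration
    val matching = keyToNumValues.toList.filter { case (key, _) => removalCondition(key) }
    // Nothing is removed until the returned iterator is consumed
    matching.iterator.flatMap { case (key, numValues) =>
      // Remove every (key, index) entry, then the per-key counter
      val removed = (0L until numValues).map { index =>
        RowPair(key, keyWithIndexToValue.remove((key, index)).get)
      }
      keyToNumValues.remove(key)
      removed
    }
  }
}
```

In this sketch the stores stay untouched until the caller drains the iterator, and every removed key and value pair is handed back to the caller.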

getNext Internal Method (of removeByKeyCondition Method)

getNext(): UnsafeRowPair

getNext goes over the keys and values in the allKeyToNumValues sequence and removes keys (from the KeyToNumValuesStore) and the corresponding values (from the KeyWithIndexToValueStore) for which the given removalCondition predicate holds.

removeByValueCondition Method

removeByValueCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]

removeByValueCondition creates an Iterator of UnsafeRowPairs that removes values (and associated keys if needed) for which the given removalCondition predicate holds.

removeByValueCondition is used exclusively when OneSideHashJoiner is requested to remove an old state (when JoinStateValueWatermarkPredicate is used).
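
A rough sketch of value-based removal follows, again over plain Scala maps rather than real state stores. The swap-with-last compaction that keeps indices contiguous is an assumption of this sketch, as are the names RemoveByValueSketch and RowPair. String stands in for UnsafeRow.

```scala
import scala.collection.mutable

// Hypothetical model: maps replace the state stores, String replaces UnsafeRow.
object RemoveByValueSketch {
  final case class RowPair(key: String, value: String)

  def removeByValueCondition(
      keyToNumValues: mutable.Map[String, Long],
      keyWithIndexToValue: mutable.Map[(String, Long), String])(
      removalCondition: String => Boolean): Iterator[RowPair] = {
    // Snapshot the keys; the work per key happens lazily as the iterator is consumed
    keyToNumValues.toList.iterator.flatMap { case (key, originalNumValues) =>
      var numValues = originalNumValues
      var index = 0L
      val removed = mutable.ArrayBuffer.empty[RowPair]
      while (index < numValues) {
        val value = keyWithIndexToValue((key, index))
        if (removalCondition(value)) {
          val last = numValues - 1
          if (index != last) {
            // Assumption: fill the hole with the last value so indices stay contiguous
            keyWithIndexToValue((key, index)) = keyWithIndexToValue((key, last))
          }
          keyWithIndexToValue.remove((key, last))
          numValues -= 1
          removed += RowPair(key, value)
          // Do not advance: the swapped-in value still has to be checked
        } else {
          index += 1
        }
      }
      // Update the per-key counter, or drop the key when no values are left
      if (numValues > 0) {
        keyToNumValues(key) = numValues
      } else {
        keyToNumValues.remove(key)
      }
      removed
    }
  }
}
```

The final step per key (update the counter or remove the key) corresponds to the "associated keys if needed" part of the description.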

getNext Internal Method (of removeByValueCondition Method)

getNext(): UnsafeRowPair


Appending New Value Row to Key — append Method

append(
  key: UnsafeRow,
  value: UnsafeRow): Unit

In the end, append requests the stores for the following:

append is used exclusively when OneSideHashJoiner is requested to storeAndJoinWithOtherSide.
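
One way to picture append on top of the two stores is the sketch below. It assumes that a new value is written at the next free index for the key and that the per-key counter is then incremented. AppendSketch is a hypothetical name, the maps stand in for the state stores, and String stands in for UnsafeRow.

```scala
import scala.collection.mutable

// Hypothetical model of the two stores behind append.
final class AppendSketch {
  // Stand-in for keyToNumValues: join key -> number of value rows
  val keyToNumValues = mutable.Map.empty[String, Long]
  // Stand-in for keyWithIndexToValue: (join key, index) -> value row
  val keyWithIndexToValue = mutable.Map.empty[(String, Long), String]

  def append(key: String, value: String): Unit = {
    // Assumption: the current count doubles as the next free index
    val numExistingValues = keyToNumValues.getOrElse(key, 0L)
    keyWithIndexToValue((key, numExistingValues)) = value
    keyToNumValues(key) = numExistingValues + 1
  }
}
```

Keeping the counter separate from the values means a key's values can be addressed by index without scanning the whole store.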

Retrieving Value Rows By Key — get Method

get(key: UnsafeRow): Iterator[UnsafeRow]

In the end, get requests the KeyWithIndexToValueStore to retrieve that number of value rows for the given key (i.e. the number of values recorded for the key) and keeps the value rows only.

get is used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide and to retrieve value rows for a key.
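
The lookup can be sketched as a two-step read: first the number of values for the key, then the values at indices 0 until that number. This is a simplified model over immutable Scala maps. GetSketch is a hypothetical name and String stands in for UnsafeRow.

```scala
// Hypothetical model: maps replace the state stores, String replaces UnsafeRow.
object GetSketch {
  def get(
      keyToNumValues: Map[String, Long],
      keyWithIndexToValue: Map[(String, Long), String])(
      key: String): Iterator[String] = {
    // Step 1: how many value rows are recorded for the key (none if unknown)
    val numValues = keyToNumValues.getOrElse(key, 0L)
    // Step 2: read the rows by (key, index) and keep only the values
    (0L until numValues).iterator.map(index => keyWithIndexToValue((key, index)))
  }
}
```

A key that was never appended simply yields an empty iterator in this model.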

Committing State (Changes) — commit Method

commit(): Unit

commit simply requests the keyToNumValues and keyWithIndexToValue state store handlers to commit state changes.

commit is used exclusively when OneSideHashJoiner is requested to commit state changes and get performance metrics.

Aborting State (Changes) — abortIfNeeded Method

abortIfNeeded(): Unit


abortIfNeeded is used when…​FIXME

allStateStoreNames Object Method

allStateStoreNames(joinSides: JoinSide*): Seq[String]

allStateStoreNames simply returns the names of the state stores for all possible combinations of the given JoinSides and the two possible store types (i.e. keyToNumValues and keyWithIndexToValue).

allStateStoreNames is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to execute and generate the runtime representation (as an RDD[InternalRow]).
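
The "all combinations" behaviour can be sketched as a nested iteration over join sides and store types. The types below are simplified stand-ins defined only for this example, and the side-storeType name format used by getStateStoreName here is an assumption of the sketch.

```scala
// Hypothetical stand-ins for the join side and store type markers.
object StateStoreNamesSketch {
  sealed trait JoinSide
  case object LeftSide extends JoinSide { override def toString: String = "left" }
  case object RightSide extends JoinSide { override def toString: String = "right" }

  sealed trait StateStoreType
  case object KeyToNumValuesType extends StateStoreType {
    override def toString: String = "keyToNumValues"
  }
  case object KeyWithIndexToValueType extends StateStoreType {
    override def toString: String = "keyWithIndexToValue"
  }

  // Assumption: names follow a "<side>-<storeType>" format
  def getStateStoreName(joinSide: JoinSide, storeType: StateStoreType): String =
    s"$joinSide-$storeType"

  // One name per (join side, store type) combination, sides first
  def allStateStoreNames(joinSides: JoinSide*): Seq[String] = {
    val allStateStoreTypes: Seq[StateStoreType] =
      Seq(KeyToNumValuesType, KeyWithIndexToValueType)
    for (joinSide <- joinSides; storeType <- allStateStoreTypes)
      yield getStateStoreName(joinSide, storeType)
  }
}
```

Under these assumptions, allStateStoreNames(LeftSide, RightSide) yields four names: left-keyToNumValues, left-keyWithIndexToValue, right-keyToNumValues and right-keyWithIndexToValue.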

getStateStoreName Object Method

getStateStoreName(
  joinSide: JoinSide,
  storeType: StateStoreType): String

getStateStoreName simply returns a string of the following format:


getStateStoreName is used when:

updateNumValueForCurrentKey Internal Method

updateNumValueForCurrentKey(): Unit


updateNumValueForCurrentKey is used exclusively when SymmetricHashJoinStateManager is requested to removeByValueCondition.

Internal Properties

Name Description


Key attributes, i.e. AttributeReferences of the key schema

Used exclusively in KeyWithIndexToValueStore when requested for the keyWithIndexExprs, indexOrdinalInKeyWithIndexRow, keyWithIndexRowGenerator and keyRowGenerator


Key schema (StructType) based on the join keys with the names in the format of field and their ordinals (index)

Used when:
