OneSideHashJoiner

OneSideHashJoiner manages join state of one side of a stream-stream join (using SymmetricHashJoinStateManager).

OneSideHashJoiner is created exclusively for StreamingSymmetricHashJoinExec physical operator (when requested to process partitions of the left and right sides of a stream-stream join).

Figure 1. OneSideHashJoiner and StreamingSymmetricHashJoinExec

StreamingSymmetricHashJoinExec physical operator uses two OneSideHashJoiners, one per side of the stream-stream join (left and right).

Note
OneSideHashJoiner is a private inner class of StreamingSymmetricHashJoinExec and so has full access to the properties of the enclosing StreamingSymmetricHashJoinExec.

Creating OneSideHashJoiner Instance

OneSideHashJoiner takes the following to be created:

  • JoinSide

  • Input attributes (Seq[Attribute])

  • Join keys (Seq[Expression])

  • Input rows (Iterator[InternalRow])

  • Optional pre-join filter Catalyst expression

  • Post-join filter ((InternalRow) ⇒ Boolean)

  • Optional JoinStateWatermarkPredicate

OneSideHashJoiner initializes the internal registries and counters.

SymmetricHashJoinStateManager — joinStateManager Internal Property

joinStateManager: SymmetricHashJoinStateManager

joinStateManager is a SymmetricHashJoinStateManager that is created for a OneSideHashJoiner (with the join side, the input attributes, the join keys, and the StatefulOperatorStateInfo of the owning StreamingSymmetricHashJoinExec).

joinStateManager is used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide, removeOldState, get, and commitStateAndGetMetrics.

Number of Updated State Rows — updatedStateRowsCount Internal Counter

updatedStateRowsCount is the number of join keys and associated rows that were persisted as join state, i.e. how many times storeAndJoinWithOtherSide requested the SymmetricHashJoinStateManager to append a join key and an input row (to a join state).

updatedStateRowsCount is available via the numUpdatedStateRows method and is reported as the numUpdatedStateRows performance metric.

numUpdatedStateRows: Long
Note
numUpdatedStateRows is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join (and completes).

Optional Join State Watermark Predicate — stateWatermarkPredicate Internal Property

stateWatermarkPredicate: Option[JoinStateWatermarkPredicate]

When created, OneSideHashJoiner is given an optional JoinStateWatermarkPredicate.

stateWatermarkPredicate is used for the stateKeyWatermarkPredicateFunc (when a JoinStateKeyWatermarkPredicate) and the stateValueWatermarkPredicateFunc (when a JoinStateValueWatermarkPredicate) that are both used when OneSideHashJoiner is requested to removeOldState.
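How the two predicate functions could be derived from the optional stateWatermarkPredicate can be sketched as follows. This is a minimal model rather than Spark's code: Row and the function-typed expr field are hypothetical stand-ins for Catalyst rows and expressions, and falling back to an always-false function when the predicate does not apply is an assumption.

```scala
// Hypothetical, simplified stand-ins for Spark's internal types.
final case class Row(key: String, eventTime: Long)

sealed trait JoinStateWatermarkPredicate { def expr: Row => Boolean }
final case class JoinStateKeyWatermarkPredicate(expr: Row => Boolean)
  extends JoinStateWatermarkPredicate
final case class JoinStateValueWatermarkPredicate(expr: Row => Boolean)
  extends JoinStateWatermarkPredicate

// Active only for a key-based watermark predicate.
def stateKeyWatermarkPredicateFunc(
    predicate: Option[JoinStateWatermarkPredicate]): Row => Boolean =
  predicate match {
    case Some(JoinStateKeyWatermarkPredicate(expr)) => expr
    case _ => (_: Row) => false // assumption: nothing is considered late
  }

// Active only for a value-based watermark predicate.
def stateValueWatermarkPredicateFunc(
    predicate: Option[JoinStateWatermarkPredicate]): Row => Boolean =
  predicate match {
    case Some(JoinStateValueWatermarkPredicate(expr)) => expr
    case _ => (_: Row) => false // assumption: nothing is considered late
  }

// Usage: a value-based predicate that flags rows older than event time 10.
val lateBefore10 = Some(JoinStateValueWatermarkPredicate(row => row.eventTime < 10L))
val keyFunc = stateKeyWatermarkPredicateFunc(lateBefore10)
val valueFunc = stateValueWatermarkPredicateFunc(lateBefore10)
```

With a value-based predicate only valueFunc can flag a row, while keyFunc rejects nothing.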

storeAndJoinWithOtherSide Method

storeAndJoinWithOtherSide(
  otherSideJoiner: OneSideHashJoiner)(
  generateJoinedRow: (InternalRow, InternalRow) => JoinedRow): Iterator[InternalRow]

storeAndJoinWithOtherSide tries to find the watermark attribute among the input attributes.

storeAndJoinWithOtherSide creates a watermark expression (for the watermark attribute and the current event-time watermark).

With the watermark attribute found, storeAndJoinWithOtherSide generates a new predicate for the watermark expression and the input attributes that is then used to filter out (exclude) late rows from the input. Otherwise, the input rows are left unchanged (i.e. no rows are considered late and excluded).
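The late-row filtering described above can be illustrated with a small sketch. Event, nonLateRows as a standalone function, and the "event time at or below the watermark means late" comparison are assumptions made for the example, not Spark's actual expressions.

```scala
// Hypothetical input row with an event-time column (in milliseconds).
final case class Event(id: String, eventTimeMs: Long)

// Drops late rows when a watermark attribute is available;
// otherwise returns the input unchanged.
def nonLateRows(
    input: Iterator[Event],
    watermarkAttribute: Option[Event => Long],
    watermarkMs: Long): Iterator[Event] =
  watermarkAttribute match {
    case Some(eventTimeOf) =>
      // Assumption: a row is late when its event time is at or below the watermark.
      val isLate: Event => Boolean = e => eventTimeOf(e) <= watermarkMs
      input.filterNot(isLate)
    case None =>
      input // no watermark attribute: no row is considered late
  }

// Usage
val events = List(Event("a", 5L), Event("b", 10L), Event("c", 11L))
val kept = nonLateRows(events.iterator, Some((e: Event) => e.eventTimeMs), 10L).toList
val untouched = nonLateRows(events.iterator, None, 10L).toList
```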

For every input row (possibly watermarked), storeAndJoinWithOtherSide applies the preJoinFilter predicate and branches off per result (true or false).

Note
storeAndJoinWithOtherSide is used when StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join.

preJoinFilter Predicate Positive (true)

When the preJoinFilter predicate succeeds on an input row, storeAndJoinWithOtherSide extracts the join key (using the keyGenerator) and requests the given OneSideHashJoiner (otherSideJoiner) for the SymmetricHashJoinStateManager that is in turn requested for the state values for the extracted join key. The values are then processed (mapped over) using the given generateJoinedRow function and then filtered by the post-join filter.

storeAndJoinWithOtherSide uses the stateKeyWatermarkPredicateFunc (on the extracted join key) and the stateValueWatermarkPredicateFunc (on the current input row) to determine whether to request the SymmetricHashJoinStateManager to append the key and the input row (to a join state). If so, storeAndJoinWithOtherSide increments the updatedStateRowsCount counter.
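The positive branch can be sketched with in-memory state. Everything named here (Row, JoinedRow, JoinState, OneSide) is a hypothetical simplification, and the rule "append only when neither watermark predicate matches" is an assumption about how the two functions are combined.

```scala
import scala.collection.mutable

final case class Row(key: String, value: String)
final case class JoinedRow(left: Row, right: Row)

// Hypothetical stand-in for SymmetricHashJoinStateManager: key -> buffered rows.
final class JoinState {
  private val rows = mutable.Map.empty[String, Vector[Row]]
  def get(key: String): Iterator[Row] = rows.getOrElse(key, Vector.empty).iterator
  def append(key: String, row: Row): Unit =
    rows.update(key, rows.getOrElse(key, Vector.empty) :+ row)
}

final class OneSide(
    val state: JoinState,
    isLateKey: String => Boolean,
    isLateRow: Row => Boolean,
    postJoinFilter: JoinedRow => Boolean) {

  var updatedStateRowsCount = 0L

  def storeAndJoin(thisRow: Row, other: OneSide)(
      generateJoinedRow: (Row, Row) => JoinedRow): Iterator[JoinedRow] = {
    val key = thisRow.key // stands in for keyGenerator(thisRow)
    // Join this row with the matching rows buffered by the other side.
    val joined = other.state.get(key)
      .map(thatRow => generateJoinedRow(thisRow, thatRow))
      .filter(postJoinFilter)
    // Assumption: store the row only when neither predicate flags it.
    if (!isLateKey(key) && !isLateRow(thisRow)) {
      state.append(key, thisRow)
      updatedStateRowsCount += 1
    }
    joined
  }
}

// Usage: the left row arrives first, then the matching right row.
val neverLateKey = (_: String) => false
val neverLateRow = (_: Row) => false
val keepAll = (_: JoinedRow) => true
val left = new OneSide(new JoinState, neverLateKey, neverLateRow, keepAll)
val right = new OneSide(new JoinState, neverLateKey, neverLateRow, keepAll)

val first = left.storeAndJoin(Row("k1", "L1"), right)((l, r) => JoinedRow(l, r)).toList
val second = right.storeAndJoin(Row("k1", "R1"), left)((r, l) => JoinedRow(l, r)).toList
```

The first call finds nothing to join with but buffers the left row, so the second call can produce the joined row.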

preJoinFilter Predicate Negative (false)

When the preJoinFilter predicate fails on an input row, storeAndJoinWithOtherSide creates a new Iterator[InternalRow] of joined rows per join side and type:

  • For LeftSide and LeftOuter, the join row is the current row with the values of the right side all null (nullRight)

  • For RightSide and RightOuter, the join row is the current row with the values of the left side all null (nullLeft)

  • For all other combinations, the iterator is empty, so the row contributes nothing to the output (the outer nonLateRows.flatMap flattens empty iterators away).
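The three cases above map naturally onto a pattern match over the join side and the join type. The sketch below uses hypothetical simplified types (Joined with optional sides instead of null-padded rows) and a made-up function name.

```scala
sealed trait JoinSide
case object LeftSide extends JoinSide
case object RightSide extends JoinSide

sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType

// None models "all values null" for that side of the joined row.
final case class Joined(left: Option[String], right: Option[String])

def rowsWhenPreJoinFilterFails(
    side: JoinSide,
    joinType: JoinType,
    thisRow: String): Iterator[Joined] =
  (side, joinType) match {
    case (LeftSide, LeftOuter)   => Iterator.single(Joined(Some(thisRow), None))
    case (RightSide, RightOuter) => Iterator.single(Joined(None, Some(thisRow)))
    case _                       => Iterator.empty
  }

// Usage: flatMap drops the empty iterators from the output.
val outerOut = List("a", "b").iterator
  .flatMap(r => rowsWhenPreJoinFilterFails(LeftSide, LeftOuter, r)).toList
val innerOut = List("a", "b").iterator
  .flatMap(r => rowsWhenPreJoinFilterFails(LeftSide, Inner, r)).toList
```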

Removing Old State — removeOldState Method

removeOldState(): Iterator[UnsafeRowPair]

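removeOldState branches off per the JoinStateWatermarkPredicate:

  • For a JoinStateKeyWatermarkPredicate, removeOldState requests the SymmetricHashJoinStateManager to removeByKeyCondition (with the stateKeyWatermarkPredicateFunc)

  • For a JoinStateValueWatermarkPredicate, removeOldState requests the SymmetricHashJoinStateManager to removeByValueCondition (with the stateValueWatermarkPredicateFunc)

  • Otherwise (no predicate), removeOldState returns an empty iterator and no state is removed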
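The branching in removeOldState can be sketched over a toy state store. The JoinState class, its in-memory layout, and returning plain rows (rather than key-value pairs) are assumptions for illustration only.

```scala
import scala.collection.mutable

final case class Row(key: String, eventTime: Long)

sealed trait JoinStateWatermarkPredicate
final case class JoinStateKeyWatermarkPredicate(isLate: String => Boolean)
  extends JoinStateWatermarkPredicate
final case class JoinStateValueWatermarkPredicate(isLate: Row => Boolean)
  extends JoinStateWatermarkPredicate

// Hypothetical in-memory stand-in for SymmetricHashJoinStateManager.
final class JoinState {
  private val rows = mutable.LinkedHashMap.empty[String, Vector[Row]]
  def append(key: String, row: Row): Unit =
    rows.update(key, rows.getOrElse(key, Vector.empty) :+ row)
  def size: Int = rows.valuesIterator.map(_.size).sum

  // Removes all rows of every key that matches the condition.
  def removeByKeyCondition(cond: String => Boolean): Iterator[Row] = {
    val keys = rows.keys.filter(cond).toList
    keys.flatMap(k => rows.remove(k).getOrElse(Vector.empty)).iterator
  }

  // Removes only the individual rows that match the condition.
  def removeByValueCondition(cond: Row => Boolean): Iterator[Row] = {
    val removed = rows.values.flatMap(_.filter(cond)).toList
    for (k <- rows.keys.toList) {
      val kept = rows(k).filterNot(cond)
      if (kept.isEmpty) rows.remove(k) else rows.update(k, kept)
    }
    removed.iterator
  }
}

def removeOldState(
    state: JoinState,
    predicate: Option[JoinStateWatermarkPredicate]): Iterator[Row] =
  predicate match {
    case Some(JoinStateKeyWatermarkPredicate(isLate))   => state.removeByKeyCondition(isLate)
    case Some(JoinStateValueWatermarkPredicate(isLate)) => state.removeByValueCondition(isLate)
    case None                                           => Iterator.empty
  }

// Usage: evict rows whose event time is at or below 10.
val state = new JoinState
state.append("k1", Row("k1", 5L))
state.append("k1", Row("k1", 20L))
state.append("k2", Row("k2", 7L))
val removed =
  removeOldState(state, Some(JoinStateValueWatermarkPredicate(_.eventTime <= 10L))).toList
```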

Note
removeOldState is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join.

Retrieving Value Rows For Key — get Method

get(key: UnsafeRow): Iterator[UnsafeRow]
Note
get is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join.

Committing State (Changes) and Requesting Performance Metrics — commitStateAndGetMetrics Method

commitStateAndGetMetrics(): StateStoreMetrics

commitStateAndGetMetrics requests the SymmetricHashJoinStateManager to commit the state changes and then requests it for the performance metrics.
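The ordering matters: metrics are read only after the commit, so they reflect the committed state. A minimal sketch, assuming a hypothetical JoinState with pending and committed rows and a simplified Metrics type (Spark's StateStoreMetrics carries more fields):

```scala
// Hypothetical, simplified metrics type.
final case class Metrics(numRows: Long)

// Hypothetical state holder that separates pending from committed rows.
final class JoinState {
  private var pending = Vector.empty[String]
  private var committed = Vector.empty[String]
  def append(row: String): Unit = pending :+= row
  def commit(): Unit = { committed ++= pending; pending = Vector.empty }
  def metrics: Metrics = Metrics(committed.size.toLong)
}

def commitStateAndGetMetrics(state: JoinState): Metrics = {
  state.commit()  // first make the pending changes durable
  state.metrics   // then report metrics of the committed state
}

// Usage
val joinState = new JoinState
joinState.append("r1")
joinState.append("r2")
val before = joinState.metrics
val after = commitStateAndGetMetrics(joinState)
```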

Note
commitStateAndGetMetrics is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to process partitions of the left and right sides of a stream-stream join.

Internal Properties

Name Description

keyGenerator

keyGenerator: UnsafeProjection

Function to project (extract) join keys from an input row

Used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide

preJoinFilter

preJoinFilter: InternalRow => Boolean

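Predicate applied to every input row before joining

Used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide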

stateKeyWatermarkPredicateFunc

stateKeyWatermarkPredicateFunc: InternalRow => Boolean

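Predicate for late join keys based on the stateWatermarkPredicate (when a JoinStateKeyWatermarkPredicate)

Used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide and removeOldState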

stateValueWatermarkPredicateFunc

stateValueWatermarkPredicateFunc: InternalRow => Boolean

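Predicate for late rows based on the stateWatermarkPredicate (when a JoinStateValueWatermarkPredicate)

Used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide and removeOldState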
