StreamingSymmetricHashJoinExec Binary Physical Operator — Stream-Stream Joins

StreamingSymmetricHashJoinExec is a binary physical operator that represents a stream-stream equi-join at execution time.

Note

A binary physical operator (BinaryExecNode) is a physical operator with left and right child physical operators.

Read up on BinaryExecNode (and physical operators in general) in The Internals of Spark SQL online book.

StreamingSymmetricHashJoinExec supports Inner, LeftOuter, and RightOuter join types (with the left and the right keys using the exact same data types).

StreamingSymmetricHashJoinExec is created exclusively when StreamingJoinStrategy execution planning strategy is requested to plan a logical query plan with a Join logical operator of two streaming queries with equality predicates (EqualTo and EqualNullSafe).

StreamingSymmetricHashJoinExec is given execution-specific configuration (i.e. StatefulOperatorStateInfo, event-time watermark, and JoinStateWatermarkPredicates) when IncrementalExecution is requested to plan a streaming query for execution (and uses the state preparation rule).

StreamingSymmetricHashJoinExec uses two OneSideHashJoiners (for the left and right sides of the join) to manage join state when processing partitions of the left and right sides of a stream-stream join.

StreamingSymmetricHashJoinExec is a stateful physical operator that writes to a state store.

Creating StreamingSymmetricHashJoinExec Instance

StreamingSymmetricHashJoinExec takes the following to be created:

StreamingSymmetricHashJoinExec initializes the internal properties.

Output Schema — output Method

output: Seq[Attribute]
Note
output is part of the QueryPlan Contract to describe the attributes of (the schema of) the output.

output schema depends on the join type:

  • For Cross and Inner (InnerLike) joins, it is the output schema of the left and right operators

  • For LeftOuter joins, it is the output schema of the left operator with the attributes of the right operator with nullability flag enabled (true)

  • For RightOuter joins, it is the output schema of the right operator with the attributes of the left operator with nullability flag enabled (true)

output throws an IllegalArgumentException for other join types:

[className] should not take [joinType] as the JoinType

Output Partitioning — outputPartitioning Method

outputPartitioning: Partitioning
Note
outputPartitioning is part of the SparkPlan Contract to specify how data should be partitioned across different nodes in the cluster.

outputPartitioning depends on the join type:

  • For Cross and Inner (InnerLike) joins, it is a PartitioningCollection of the output partitioning of the left and right operators

  • For LeftOuter joins, it is a PartitioningCollection of the output partitioning of the left operator

  • For RightOuter joins, it is a PartitioningCollection of the output partitioning of the right operator

outputPartitioning throws an IllegalArgumentException for other join types:

[className] should not take [joinType] as the JoinType

Event-Time Watermark — eventTimeWatermark Internal Property

eventTimeWatermark: Option[Long]

When created, StreamingSymmetricHashJoinExec can be given the event-time watermark of the current streaming micro-batch.

eventTimeWatermark is an optional property that is specified only after IncrementalExecution was requested to apply the state preparation rule to a physical query plan of a streaming query (to optimize (prepare) the physical plan of the streaming query once for ContinuousExecution and every trigger for MicroBatchExecution in their queryPlanning phases).

Note

eventTimeWatermark is used when:

Watermark Predicates for State Removal — stateWatermarkPredicates Internal Property

stateWatermarkPredicates: JoinStateWatermarkPredicates

When created, StreamingSymmetricHashJoinExec is given a JoinStateWatermarkPredicates for the left and right join sides (using the StreamingSymmetricHashJoinHelper utility).

stateWatermarkPredicates contains the left and right predicates only when IncrementalExecution is requested to apply the state preparation rule to a physical query plan of a streaming query (to optimize (prepare) the physical plan of the streaming query once for ContinuousExecution and every trigger for MicroBatchExecution in their queryPlanning phases).

Note

stateWatermarkPredicates is used when StreamingSymmetricHashJoinExec is requested for the following:

Required Partition Requirements — requiredChildDistribution Method

requiredChildDistribution: Seq[Distribution]
Note

requiredChildDistribution is part of the SparkPlan Contract for the required partition requirements (aka required child distribution) of the input data, i.e. how the output of the children physical operators is split across partitions before this operator can be executed.

Read up on SparkPlan Contract in The Internals of Spark SQL online book.

requiredChildDistribution returns two HashClusteredDistributions for the left and right keys with the required number of partitions based on the StatefulOperatorStateInfo.

Note

requiredChildDistribution is used exclusively when EnsureRequirements physical query plan optimization is executed (and enforces partition requirements).

Note

HashClusteredDistribution becomes HashPartitioning at execution that distributes rows across partitions (generates partition IDs of rows) based on Murmur3Hash of the join expressions (separately for the left and right keys) modulo the required number of partitions.

Performance Metrics (SQLMetrics)

StreamingSymmetricHashJoinExec uses the performance metrics as other stateful physical operators that write to a state store.

StreamingSymmetricHashJoinExec webui query details.png
Figure 1. StreamingSymmetricHashJoinExec in web UI (Details for Query)

The following table shows how the performance metrics are computed (and so their exact meaning).

Name (in web UI) Description

total time to update rows

Processing time of all rows

total time to remove rows

time to commit changes

number of output rows

Total number of output rows

number of total state rows

number of updated state rows

Number of updated state rows of the left and right OneSideHashJoiners

memory used by state

Checking Out Whether Last Batch Execution Requires Another Non-Data Batch or Not — shouldRunAnotherBatch Method

shouldRunAnotherBatch(
  newMetadata: OffsetSeqMetadata): Boolean
Note
shouldRunAnotherBatch is part of the StateStoreWriter Contract to indicate whether MicroBatchExecution should run another non-data batch (based on the updated OffsetSeqMetadata with the current event-time watermark and the batch timestamp).

shouldRunAnotherBatch is positive (true) when all of the following are positive:

shouldRunAnotherBatch is negative (false) otherwise.

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a physical operator as a recipe for distributed computation over internal binary rows on Apache Spark (RDD[InternalRow]).

doExecute first requests the StreamingQueryManager for the StateStoreCoordinatorRef to the StateStoreCoordinator RPC endpoint (for the driver).

doExecute then uses SymmetricHashJoinStateManager utility to get the names of the state stores for the left and right sides of the streaming join.

In the end, doExecute requests the left and right child physical operators to execute (generate an RDD) and then stateStoreAwareZipPartitions with processPartitions (and with the StateStoreCoordinatorRef and the state stores).

Processing Partitions of Left and Right Sides of Stream-Stream Join — processPartitions Internal Method

processPartitions(
  leftInputIter: Iterator[InternalRow],
  rightInputIter: Iterator[InternalRow]): Iterator[InternalRow]

processPartitions records the current time (as updateStartTimeNs for the total time to update rows performance metric in onOutputCompletion).

processPartitions creates a new predicate (postJoinFilter) based on the bothSides of the JoinConditionSplitPredicates if defined or true literal.

processPartitions creates a OneSideHashJoiner for the LeftSide and all other properties for the left-hand join side (leftSideJoiner).

processPartitions creates a OneSideHashJoiner for the RightSide and all other properties for the right-hand join side (rightSideJoiner).

processPartitions requests the OneSideHashJoiner for the left-hand join side to storeAndJoinWithOtherSide with the right-hand side one (that creates a leftOutputIter row iterator) and the OneSideHashJoiner for the right-hand join side to do the same with the left-hand side one (and creates a rightOutputIter row iterator).

processPartitions records the current time (as innerOutputCompletionTimeNs for the total time to remove rows performance metric in onOutputCompletion).

processPartitions creates a CompletionIterator with the left and right output iterators (with the rows of the leftOutputIter first followed by rightOutputIter). When no rows are left to process, the CompletionIterator records the completion time.

processPartitions creates a join-specific output Iterator[InternalRow] of the output rows based on the join type (of the StreamingSymmetricHashJoinExec):

  • For Inner joins, processPartitions simply uses the output iterator of the left and right rows

  • For LeftOuter joins, processPartitions…​

  • For RightOuter joins, processPartitions…​

  • For other joins, processPartitions simply throws an IllegalArgumentException.

processPartitions creates an UnsafeProjection for the output (and the output of the left and right child operators) that counts all the rows of the join-specific output iterator (as the numOutputRows metric) and generate an output projection.

In the end, processPartitions returns a CompletionIterator with with the output iterator with the rows counted (as numOutputRows metric) and onOutputCompletion completion function.

Note
processPartitions is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to execute.

Calculating Performance Metrics (Output Completion Callback) — onOutputCompletion Internal Method

onOutputCompletion: Unit

onOutputCompletion calculates the total time to update rows performance metric (that is the time since the processPartitions was executed).

onOutputCompletion adds the time for the inner join to complete (since innerOutputCompletionTimeNs time marker) to the total time to remove rows performance metric.

onOutputCompletion records the time to remove old state (per the join state watermark predicate for the left and the right streaming queries) and adds it to the total time to remove rows performance metric.

Note
onOutputCompletion triggers the old state removal eagerly by iterating over the state rows to be deleted.

onOutputCompletion records the time for the left and right OneSideHashJoiners to commit any state changes that becomes the time to commit changes performance metric.

onOutputCompletion calculates the number of updated state rows performance metric (as the number of updated state rows of the left and right streaming queries).

onOutputCompletion calculates the number of total state rows performance metric (as the sum of the number of keys in the KeyWithIndexToValueStore of the left and right streaming queries).

onOutputCompletion calculates the memory used by state performance metric (as the sum of the memory used by the KeyToNumValuesStore and KeyWithIndexToValueStore of the left and right streams).

In the end, onOutputCompletion calculates the custom metrics.

Internal Properties

Name Description

hadoopConfBcast

Hadoop Configuration broadcast (to the Spark cluster)

joinStateManager

nullLeft

GenericInternalRow of the size of the output schema of the left physical operator

nullRight

GenericInternalRow of the size of the output schema of the right physical operator

storeConf

results matching ""

    No results matching ""