SortMergeJoinExec Binary Physical Operator for Sort Merge Join

SortMergeJoinExec is a binary physical operator to execute a sort merge join.
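
For orientation, the general sort merge join technique can be sketched in plain Scala on a single machine. This is only an illustration of the idea (sort both sides by the join key, then merge them in one pass) and not Spark's implementation; the object name SortMergeJoinSketch, the method sortMergeJoin and the sample data are hypothetical.

```scala
// A minimal, single-machine sketch of an inner sort merge join (not Spark's code).
object SortMergeJoinSketch {
  def sortMergeJoin[K: Ordering, A, B](
      left: Seq[(K, A)],
      right: Seq[(K, B)]): Vector[(K, A, B)] = {
    val ord = implicitly[Ordering[K]]
    // Phase 1: sort both sides by the join key (this is why keys must be orderable)
    val l = left.sortBy(_._1).toVector
    val r = right.sortBy(_._1).toVector
    val out = Vector.newBuilder[(K, A, B)]
    var i = 0
    var j = 0
    // Phase 2: merge, always advancing the side with the smaller key
    while (i < l.length && j < r.length) {
      val cmp = ord.compare(l(i)._1, r(j)._1)
      if (cmp < 0) i += 1
      else if (cmp > 0) j += 1
      else {
        val key = l(i)._1
        // Find the end of the run of right rows that share this key
        var jEnd = j
        while (jEnd < r.length && ord.equiv(r(jEnd)._1, key)) jEnd += 1
        // Pair every left row having this key with every right row in the run
        while (i < l.length && ord.equiv(l(i)._1, key)) {
          var k = j
          while (k < jEnd) { out += ((key, l(i)._2, r(k)._2)); k += 1 }
          i += 1
        }
        j = jEnd
      }
    }
    out.result()
  }
}

val joined = SortMergeJoinSketch.sortMergeJoin(
  Seq((1, "with"), (0, "playing"), (1, "again")),
  Seq((1, "x"), (2, "y"), (1, "z")))
// joined: Vector((1,with,x), (1,with,z), (1,again,x), (1,again,z))
println(joined)
```

The sketch buffers the run of matching right rows before pairing them with left rows, which is the part of the technique that needs extra memory when a key repeats many times.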

SortMergeJoinExec is selected to represent a Join logical operator when JoinSelection execution planning strategy is executed for joins with left join keys that are orderable, i.e. that can be ordered (sorted).

Note

A join key is orderable when it is of one of the following data types:

  • NullType

  • AtomicType (that represents all the available types except NullType, StructType, ArrayType, UserDefinedType, MapType, and ObjectType)

  • StructType with orderable fields

  • ArrayType of orderable type

  • UserDefinedType of orderable type

Therefore, a join key is not orderable when it is of one of the following data types:

  • MapType

  • ObjectType
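
The rules above can be checked in spark-shell. The sketch below assumes that the internal Catalyst helper RowOrdering.isOrderable(dataType) is available in your Spark version (it is an internal API and may move or change between releases); the data types passed in are examples chosen for illustration.

```scala
// Assumption: RowOrdering.isOrderable is an internal Catalyst helper, not a stable public API.
import org.apache.spark.sql.catalyst.expressions.RowOrdering
import org.apache.spark.sql.types._

// Atomic types are orderable
val intOrderable = RowOrdering.isOrderable(IntegerType)                        // true
// Arrays are orderable when their element type is
val arrayOrderable = RowOrdering.isOrderable(ArrayType(StringType))            // true
// Structs are orderable when all their fields are
val structOrderable =
  RowOrdering.isOrderable(StructType(Seq(StructField("id", LongType))))        // true
// Maps are never orderable, so a MapType column cannot be a sort merge join key
val mapOrderable = RowOrdering.isOrderable(MapType(StringType, IntegerType))   // false

println(Seq(intOrderable, arrayOrderable, structOrderable, mapOrderable))
```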

Note

spark.sql.join.preferSortMergeJoin is an internal configuration property and is enabled by default.

That means that JoinSelection execution planning strategy (and so Spark Planner) prefers sort merge join over shuffled hash join.

SortMergeJoinExec supports Java code generation (aka codegen) for inner and cross joins.

Tip

Enable DEBUG logging level for org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys logger to see the join condition and the left and right join keys.
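
One way to follow the tip from within spark-shell is to set the logger level programmatically. This is a sketch that assumes the log4j 1.x API bundled with Spark 2.x; newer Spark releases use a different logging backend, where this call may have no effect and the logging configuration file should be used instead.

```scala
// Assumption: log4j 1.x API (org.apache.log4j) is on the classpath, as in Spark 2.x.
import org.apache.log4j.{Level, Logger}

val loggerName = "org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys"

// Raise the level for this one logger only, leaving other loggers untouched
Logger.getLogger(loggerName).setLevel(Level.DEBUG)
```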

// Disable auto broadcasting so Broadcast Hash Join won't take precedence
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "SortMergeJoinExec")
).toDF("id", "token")

// all data types are orderable
scala> tokens.printSchema
root
 |-- id: integer (nullable = false)
 |-- token: string (nullable = true)

// Spark Planner prefers SortMergeJoin over Shuffled Hash Join
scala> println(spark.conf.get("spark.sql.join.preferSortMergeJoin"))
true

val q = tokens.join(tokens, Seq("id"), "inner")
scala> q.explain
== Physical Plan ==
*(3) Project [id#5, token#6, token#10]
+- *(3) SortMergeJoin [id#5], [id#9], Inner
   :- *(1) Sort [id#5 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#5, 200)
   :     +- LocalTableScan [id#5, token#6]
   +- *(2) Sort [id#9 ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#9, token#10], Exchange hashpartitioning(id#5, 200)

Table 1. SortMergeJoinExec’s Performance Metrics

Key            | Name (in web UI)      | Description
numOutputRows  | number of output rows |

Figure 1. SortMergeJoinExec in web UI (Details for Query)

Note

The prefix for variable names for SortMergeJoinExec operators in CodegenSupport-generated code is smj.

scala> q.queryExecution.debug.codegen
Found 3 WholeStageCodegen subtrees.
== Subtree 1 / 3 ==
*Project [id#5, token#6, token#11]
+- *SortMergeJoin [id#5], [id#10], Inner
   :- *Sort [id#5 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#5, 200)
   :     +- LocalTableScan [id#5, token#6]
   +- *Sort [id#10 ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#10, token#11], Exchange hashpartitioning(id#5, 200)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator smj_leftInput;
/* 009 */   private scala.collection.Iterator smj_rightInput;
/* 010 */   private InternalRow smj_leftRow;
/* 011 */   private InternalRow smj_rightRow;
/* 012 */   private int smj_value2;
/* 013 */   private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
/* 014 */   private int smj_value3;
/* 015 */   private int smj_value4;
/* 016 */   private UTF8String smj_value5;
/* 017 */   private boolean smj_isNull2;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric smj_numOutputRows;
/* 019 */   private UnsafeRow smj_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder smj_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter;
...

The output schema of a SortMergeJoinExec is…​FIXME

The outputPartitioning of a SortMergeJoinExec is…​FIXME

The outputOrdering of a SortMergeJoinExec is…​FIXME

The partitioning requirements of the input of a SortMergeJoinExec (aka child output distributions) are HashClusteredDistributions of left and right join keys.

Table 2. SortMergeJoinExec’s Required Child Output Distributions

Left Child                                                | Right Child
HashClusteredDistribution (per left join key expressions) | HashClusteredDistribution (per right join key expressions)
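
The requirements an operator places on its children can be inspected in spark-shell. The sketch below assumes a spark-shell session (so spark and toDF are available) and uses hypothetical datasets left and right; it looks up the SortMergeJoinExec in the physical plan before exchanges and sorts are inserted and prints its requiredChildDistribution and requiredChildOrdering. The exact output, including the distribution class names, depends on the Spark version, so none is shown.

```scala
import org.apache.spark.sql.execution.joins.SortMergeJoinExec

// Disable auto broadcasting so a broadcast hash join is not planned instead
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// Hypothetical datasets for illustration only
val left = Seq((0, "a"), (1, "b")).toDF("id", "l")
val right = Seq((0, "x"), (1, "y")).toDF("id", "r")
val joinQuery = left.join(right, Seq("id"), "inner")

// sparkPlan is the planned physical plan before Exchange and Sort operators are added
val smj = joinQuery.queryExecution.sparkPlan.collectFirst {
  case j: SortMergeJoinExec => j
}

smj.foreach { j =>
  // One required distribution and one required ordering per child (left, then right)
  j.requiredChildDistribution.foreach(println)
  j.requiredChildOrdering.foreach(println)
}
```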

The ordering requirements of the input of a SortMergeJoinExec (aka child output ordering) is…​FIXME

Note
JoinSelection execution planning strategy chooses the SortMergeJoinExec operator only after the BroadcastHashJoinExec and ShuffledHashJoinExec physical join operators have failed to meet their requirements.

Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method

doProduce(ctx: CodegenContext): String
Note
doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.

doProduce…​FIXME

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute…​FIXME
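
doExecute is not meant to be called directly. As a sketch of how to observe its result, assuming a spark-shell session and a small hypothetical dataset, QueryExecution.toRdd executes the physical plan (and with it every operator's doExecute) and gives the resulting RDD[InternalRow], whose lineage can then be printed. The lineage output depends on the Spark version and is therefore not shown.

```scala
// Disable auto broadcasting so the join is planned as a sort merge join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// Hypothetical dataset for illustration only
val words = Seq((0, "playing"), (1, "with")).toDF("id", "token")
val selfJoin = words.join(words, Seq("id"), "inner")

// toRdd executes the physical plan and returns an RDD of internal binary rows
val rdd = selfJoin.queryExecution.toRdd

// Print the RDD lineage that the physical operators produced
println(rdd.toDebugString)
```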

Creating SortMergeJoinExec Instance

SortMergeJoinExec takes the following when created:
