// Disable auto broadcasting so Broadcast Hash Join won't take precedence
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val tokens = Seq(
(0, "playing"),
(1, "with"),
(2, "SortMergeJoinExec")
).toDF("id", "token")
// all data types are orderable
scala> tokens.printSchema
|-- id: integer (nullable = false)
|-- token: string (nullable = true)
// Spark Planner prefers SortMergeJoin over Shuffled Hash Join
scala> println(spark.conf.get("spark.sql.join.preferSortMergeJoin"))
val q = tokens.join(tokens, Seq("id"), "inner")
scala> q.explain
== Physical Plan ==
*(3) Project [id#5, token#6, token#10]
+- *(3) SortMergeJoin [id#5], [id#9], Inner
   :- *(1) Sort [id#5 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#5, 200)
   :     +- LocalTableScan [id#5, token#6]
   +- *(2) Sort [id#9 ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#9, token#10], Exchange hashpartitioning(id#5, 200)
SortMergeJoinExec Binary Physical Operator for Sort Merge Join
SortMergeJoinExec is a binary physical operator to execute a sort merge join.
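The following is a minimal, single-threaded sketch of the general sort merge join idea in plain Scala: sort both inputs by the join key, then advance two cursors and emit a row for every pair of equal keys. It is not Spark's implementation. The names SortMergeJoinSketch and sortMergeJoin are hypothetical, and the sketch ignores partitioning, spilling and code generation. The buffered run of matching right-side rows only loosely corresponds to the smj_matches buffer that shows up in the generated code.

```scala
// A simplified model of an inner sort merge join (an illustration, not Spark's code).
object SortMergeJoinSketch {

  /** Inner-joins two inputs on an orderable key by sorting both sides and merging them. */
  def sortMergeJoin[K: Ordering, A, B](
      left: Seq[(K, A)],
      right: Seq[(K, B)]): Seq[(K, A, B)] = {
    val ord = implicitly[Ordering[K]]
    // Step 1: sort both sides by the join key (this is why the key must be orderable).
    val l = left.sortBy(_._1).toVector
    val r = right.sortBy(_._1).toVector
    val out = Vector.newBuilder[(K, A, B)]
    var i = 0
    var j = 0
    // Step 2: merge by advancing whichever side has the smaller key.
    while (i < l.length && j < r.length) {
      val cmp = ord.compare(l(i)._1, r(j)._1)
      if (cmp < 0) i += 1
      else if (cmp > 0) j += 1
      else {
        val key = l(i)._1
        // Find the end of the run of right rows that share this key.
        var jEnd = j
        while (jEnd < r.length && ord.equiv(r(jEnd)._1, key)) jEnd += 1
        // Pair every left row with this key against the whole right run.
        while (i < l.length && ord.equiv(l(i)._1, key)) {
          var k = j
          while (k < jEnd) {
            out += ((key, l(i)._2, r(k)._2))
            k += 1
          }
          i += 1
        }
        j = jEnd
      }
    }
    out.result()
  }
}
```

Joining the tokens data from the example with itself, i.e. SortMergeJoinSketch.sortMergeJoin(rows, rows) for rows = Seq((0, "playing"), (1, "with"), (2, "SortMergeJoinExec")), gives one output row per id.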
SortMergeJoinExec is selected to represent a Join logical operator when the JoinSelection execution planning strategy is executed for joins with left join keys that are orderable, i.e. that can be ordered (sorted).
A join key is orderable when it is of one of the following data types:
Therefore, a join key is not orderable when it is of the following data type:
spark.sql.join.preferSortMergeJoin is an internal configuration property that is enabled by default. That means that the JoinSelection execution planning strategy (and so Spark Planner) prefers sort merge join over shuffled hash join.
SortMergeJoinExec supports Java code generation (aka codegen) for inner and cross joins.
Enable |
Key | Name (in web UI) | Description |
numOutputRows | number of output rows | |

The prefix for variable names for SortMergeJoinExec operators in CodegenSupport-generated code is smj.
scala> q.queryExecution.debug.codegen
Found 3 WholeStageCodegen subtrees.
== Subtree 1 / 3 ==
*Project [id#5, token#6, token#11]
+- *SortMergeJoin [id#5], [id#10], Inner
   :- *Sort [id#5 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#5, 200)
   :     +- LocalTableScan [id#5, token#6]
   +- *Sort [id#10 ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#10, token#11], Exchange hashpartitioning(id#5, 200)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private scala.collection.Iterator smj_leftInput;
/* 009 */ private scala.collection.Iterator smj_rightInput;
/* 010 */ private InternalRow smj_leftRow;
/* 011 */ private InternalRow smj_rightRow;
/* 012 */ private int smj_value2;
/* 013 */ private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
/* 014 */ private int smj_value3;
/* 015 */ private int smj_value4;
/* 016 */ private UTF8String smj_value5;
/* 017 */ private boolean smj_isNull2;
/* 018 */ private org.apache.spark.sql.execution.metric.SQLMetric smj_numOutputRows;
/* 019 */ private UnsafeRow smj_result;
/* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder smj_holder;
/* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter;
The output schema of a SortMergeJoinExec
The outputPartitioning of a SortMergeJoinExec
The outputOrdering of a SortMergeJoinExec
The partitioning requirements of the input of a SortMergeJoinExec (aka child output distributions) are HashClusteredDistributions of left and right join keys.
Left Child | Right Child |
HashClusteredDistribution of left join keys | HashClusteredDistribution of right join keys |
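To see why both children have to be clustered by their join keys, here is a small plain-Scala sketch, assuming a simplified hash partitioner. CoPartitioningSketch, partitionOf and partition are hypothetical names, and the non-negative modulus of hashCode is only a stand-in for the hash function Spark actually uses. The point is that when both sides use the same function and the same number of partitions (200 in the physical plan above), rows with equal keys land in the same partition index, so each partition can be sorted and merged independently.

```scala
// A simplified model of hash-clustering both join inputs (an illustration only).
object CoPartitioningSketch {

  /** Hypothetical partitioner: non-negative modulus of the key's hashCode. */
  def partitionOf[K](key: K, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  /** Groups rows by the partition index of their join key. */
  def partition[K, V](rows: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    rows.groupBy { case (key, _) => partitionOf(key, numPartitions) }
}
```

With this model, a left row and a right row with the same key always receive the same result from partitionOf, regardless of which side they came from.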
The ordering requirements of the input of a SortMergeJoinExec (aka child output ordering) is…FIXME
SortMergeJoinExec operator is chosen in JoinSelection execution planning strategy (after BroadcastHashJoinExec and ShuffledHashJoinExec physical join operators have not met the requirements).
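The order of preference described above can be modelled as a simple fall-through, sketched below in plain Scala. This is a deliberately simplified model and not the JoinSelection code: JoinSelectionSketch, JoinFacts and its three boolean fields are hypothetical placeholders for the real requirements, which depend on size statistics, join types, hints and configuration properties.

```scala
// A toy model of the fall-through order among physical join operators (an illustration only).
object JoinSelectionSketch {
  sealed trait JoinOperator
  case object BroadcastHashJoin extends JoinOperator
  case object ShuffledHashJoin extends JoinOperator
  case object SortMergeJoin extends JoinOperator

  /** Hypothetical summary of whether each operator's requirements are met. */
  final case class JoinFacts(
      broadcastRequirementsMet: Boolean,
      shuffledHashRequirementsMet: Boolean,
      leftKeysOrderable: Boolean)

  /** Returns the first operator whose requirements hold, or None if this model has no match. */
  def select(facts: JoinFacts): Option[JoinOperator] =
    if (facts.broadcastRequirementsMet) Some(BroadcastHashJoin)
    else if (facts.shuffledHashRequirementsMet) Some(ShuffledHashJoin)
    else if (facts.leftKeysOrderable) Some(SortMergeJoin)
    else None // other join strategies are outside the scope of this sketch
}
```

In the example at the top of the page, auto broadcasting is disabled and sort merge join is preferred over shuffled hash join, which in this model corresponds to JoinFacts(false, false, true).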
Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce
doProduce(ctx: CodegenContext): String
doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute
doExecute(): RDD[InternalRow]
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
Creating SortMergeJoinExec Instance
SortMergeJoinExec takes the following when created:
Left join key expressions
Right join key expressions
Optional join condition expression
Left physical operator
Right physical operator