```scala
// Disable auto broadcasting so Broadcast Hash Join won't take precedence
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "SortMergeJoinExec")
).toDF("id", "token")

// all data types are orderable
scala> tokens.printSchema
root
 |-- id: integer (nullable = false)
 |-- token: string (nullable = true)

// Spark Planner prefers SortMergeJoin over Shuffled Hash Join
scala> println(spark.conf.get("spark.sql.join.preferSortMergeJoin"))
true

val q = tokens.join(tokens, Seq("id"), "inner")
scala> q.explain
== Physical Plan ==
*(3) Project [id#5, token#6, token#10]
+- *(3) SortMergeJoin [id#5], [id#9], Inner
   :- *(1) Sort [id#5 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#5, 200)
   :     +- LocalTableScan [id#5, token#6]
   +- *(2) Sort [id#9 ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#9, token#10], Exchange hashpartitioning(id#5, 200)
```
SortMergeJoinExec Binary Physical Operator for Sort Merge Join
SortMergeJoinExec is a binary physical operator to execute a sort merge join.
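The merge phase at the heart of a sort merge join can be sketched in plain Scala. This is a minimal, single-partition illustration under stated assumptions, not SortMergeJoinExec's own code. Rows are hypothetical (key, value) pairs with Int keys. Both inputs are assumed to be already sorted ascending by key, which is what the Sort operators under SortMergeJoin in a physical plan provide. Only the inner join is covered. Like the real operator, the sketch streams the left side and buffers the run of right-side rows that share the current key.

```scala
// Minimal sketch of the merge phase of an inner sort merge join (single partition).
// Assumption: both inputs are already sorted ascending by their Int key.
object SortMergeJoinSketch {

  /** Inner-joins two key-sorted sequences, emitting one output row per matching pair. */
  def innerJoin[V, W](left: IndexedSeq[(Int, V)], right: IndexedSeq[(Int, W)]): Vector[(Int, V, W)] = {
    val out = Vector.newBuilder[(Int, V, W)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val lk = left(i)._1
      val rk = right(j)._1
      if (lk < rk) i += 1        // left key is behind: advance the streamed (left) side
      else if (lk > rk) j += 1   // right key is behind: advance the buffered (right) side
      else {
        // Buffer the run of right rows sharing this key (the real operator keeps such
        // matches in an ExternalAppendOnlyUnsafeRowArray, smj_matches in generated code).
        val runStart = j
        while (j < right.length && right(j)._1 == lk) j += 1
        val matches = right.slice(runStart, j)
        // Every left row with the same key is paired with the whole buffered run.
        while (i < left.length && left(i)._1 == lk) {
          matches.foreach { case (_, w) => out += ((lk, left(i)._2, w)) }
          i += 1
        }
      }
    }
    out.result()
  }
}
```

Because equal keys are adjacent after sorting, each input is scanned once, and only the current run of matching right-side rows has to be held in memory.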
SortMergeJoinExec is selected to represent a Join logical operator when the JoinSelection execution planning strategy is executed for joins whose left join keys are orderable, i.e. can be ordered (sorted).
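Note: A join key is orderable when it is of one of the following data types:

- NullType
- any AtomicType (e.g. numeric, string, binary, boolean, date and timestamp types)
- StructType whose fields are all of orderable types
- ArrayType whose element type is orderable
- UserDefinedType whose underlying SQL type is orderable

Therefore, a join key is not orderable when it is of any other data type, e.g. MapType.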
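Spark performs this check with RowOrdering.isOrderable. The recursive rule can be sketched over a hypothetical, cut-down data type hierarchy. DType and its cases below are illustrative stand-ins, not Spark's DataType classes. IntT and StringT represent atomic types, and UserDefinedType is left out for brevity.

```scala
// Hypothetical, cut-down stand-ins for Spark SQL data types (not Spark's classes).
object OrderableSketch {
  sealed trait DType
  case object NullT extends DType
  case object IntT extends DType      // stands in for an atomic type
  case object StringT extends DType   // stands in for an atomic type
  final case class StructT(fields: Seq[DType]) extends DType
  final case class ArrayT(element: DType) extends DType
  final case class MapT(key: DType, value: DType) extends DType

  /** Recursive orderability rule: structs and arrays are orderable if their parts are. */
  def isOrderable(dt: DType): Boolean = dt match {
    case NullT | IntT | StringT => true
    case StructT(fields)        => fields.forall(isOrderable)
    case ArrayT(element)        => isOrderable(element)
    case MapT(_, _)             => false   // maps have no ordering
  }

  /** A join qualifies for sort merge join only if every (left) join key is orderable. */
  def allKeysOrderable(keyTypes: Seq[DType]): Boolean = keyTypes.forall(isOrderable)
}
```

In the example at the top, the join key id is an integer column, so the join qualifies.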
Note: spark.sql.join.preferSortMergeJoin is an internal configuration property that is enabled by default. This means the JoinSelection execution planning strategy (and hence the Spark Planner) prefers sort merge join over shuffled hash join.
SortMergeJoinExec supports Java code generation (aka codegen) for inner and cross joins.
Tip: Enable …
| Key | Name (in web UI) | Description |
|---|---|---|
| numOutputRows | number of output rows | |
Note: The prefix for variable names of SortMergeJoinExec operators in CodegenSupport-generated code is smj.
```scala
scala> q.queryExecution.debug.codegen
Found 3 WholeStageCodegen subtrees.
== Subtree 1 / 3 ==
*Project [id#5, token#6, token#11]
+- *SortMergeJoin [id#5], [id#10], Inner
   :- *Sort [id#5 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#5, 200)
   :     +- LocalTableScan [id#5, token#6]
   +- *Sort [id#10 ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#10, token#11], Exchange hashpartitioning(id#5, 200)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator smj_leftInput;
/* 009 */   private scala.collection.Iterator smj_rightInput;
/* 010 */   private InternalRow smj_leftRow;
/* 011 */   private InternalRow smj_rightRow;
/* 012 */   private int smj_value2;
/* 013 */   private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
/* 014 */   private int smj_value3;
/* 015 */   private int smj_value4;
/* 016 */   private UTF8String smj_value5;
/* 017 */   private boolean smj_isNull2;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric smj_numOutputRows;
/* 019 */   private UnsafeRow smj_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder smj_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter;
...
```
The output schema of a SortMergeJoinExec is…FIXME

The outputPartitioning of a SortMergeJoinExec is…FIXME

The outputOrdering of a SortMergeJoinExec is…FIXME
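The partitioning requirements of the input of a SortMergeJoinExec (aka child output distributions) are HashClusteredDistributions of the left and right join keys.

| Left Child | Right Child |
|---|---|
| HashClusteredDistribution (per left join key expressions) | HashClusteredDistribution (per right join key expressions) |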
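The purpose of this requirement is co-location. Both children are shuffled with the same partitioning function over the join keys and the same number of partitions. In the example plan at the top, this is Exchange hashpartitioning(id#5, 200), where 200 is the default of spark.sql.shuffle.partitions. As a result, rows with equal keys end up in partitions with the same index, and each pair of partitions can be joined on its own. The sketch below is simplified and assumes Scala's hashCode in place of the Murmur3 hash that Spark's HashPartitioning uses. HashPartitioningSketch and its methods are hypothetical names.

```scala
// Simplified hash partitioning by join key. Spark hashes the key expressions with
// Murmur3 and takes a positive modulo; Scala's hashCode stands in for Murmur3 here.
object HashPartitioningSketch {
  /** Partition index for a key; floorMod keeps it non-negative for negative hashes. */
  def partitionOf(key: Any, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  /** Groups (key, value) rows by the partition index their key hashes to. */
  def partition[V](rows: Seq[(Int, V)], numPartitions: Int): Map[Int, Seq[(Int, V)]] =
    rows.groupBy { case (k, _) => partitionOf(k, numPartitions) }
}
```

Since both sides use the same function and the same partition count, a key found in right partition p can only match left rows in left partition p.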
The ordering requirements of the input of a SortMergeJoinExec (aka child output ordering) are…FIXME
Note: The JoinSelection execution planning strategy chooses the SortMergeJoinExec operator only after the BroadcastHashJoinExec and ShuffledHashJoinExec physical join operators have failed to meet their requirements.
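This order of preference can be pictured as a chain of checks. The function below is a hypothetical, heavily simplified model of the equi-join branch of JoinSelection in Spark 2.x. It ignores join hints, join types and which side can be built. It also folds Spark's canBuildLocalHashMap and muchSmaller checks into a single hypothetical smallSideFitsLocalHashMap flag.

```scala
// Hypothetical, simplified model of JoinSelection's order of preference for equi-joins.
object JoinSelectionSketch {
  def choose(
      smallerSideSizeInBytes: Long,
      autoBroadcastJoinThreshold: Long,   // spark.sql.autoBroadcastJoinThreshold (-1 disables)
      preferSortMergeJoin: Boolean,       // spark.sql.join.preferSortMergeJoin
      smallSideFitsLocalHashMap: Boolean, // hypothetical stand-in for the hash-map size checks
      leftKeysOrderable: Boolean): String =
    if (smallerSideSizeInBytes >= 0 && smallerSideSizeInBytes <= autoBroadcastJoinThreshold)
      "BroadcastHashJoinExec"
    // Shuffled hash join: preference switched off and a side is small enough,
    // or the keys cannot be sorted at all.
    else if ((!preferSortMergeJoin && smallSideFitsLocalHashMap) || !leftKeysOrderable)
      "ShuffledHashJoinExec"
    // Only orderable keys reach this point.
    else
      "SortMergeJoinExec"
}
```

The example at the top uses autoBroadcastJoinThreshold = -1, preferSortMergeJoin = true and an orderable key. With those settings the model picks SortMergeJoinExec, which matches the physical plan shown there.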
Generating Java Source Code for Produce Path in Whole-Stage Code Generation: doProduce Method

```scala
doProduce(ctx: CodegenContext): String
```
Note: doProduce is part of the CodegenSupport Contract to generate the Java source code for the produce path in Whole-Stage Code Generation.
doProduce …FIXME
Executing Physical Operator (Generating RDD[InternalRow]): doExecute Method

```scala
doExecute(): RDD[InternalRow]
```
Note: doExecute is part of the SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
doExecute …FIXME
Creating SortMergeJoinExec Instance
SortMergeJoinExec takes the following when created:

- Left join key expressions
- Right join key expressions
- Join type
- Optional join condition expression
- Left physical operator
- Right physical operator