BroadcastHashJoinExec Binary Physical Operator for Broadcast Hash Join
BroadcastHashJoinExec is a binary physical operator that performs a broadcast hash join.
BroadcastHashJoinExec is created when the JoinSelection execution planning strategy is applied to ExtractEquiJoinKeys-destructurable logical query plans (i.e. INNER, CROSS, LEFT OUTER, LEFT SEMI and LEFT ANTI joins) whose right physical operator can be broadcast.
BroadcastHashJoinExec supports Java code generation (aka codegen).
In the following example, the tokens dataset is far smaller than the default spark.sql.autoBroadcastJoinThreshold of 10485760 bytes (10 MB), so the self-join is planned as a BroadcastHashJoin that broadcasts the right side (BuildRight):
```scala
val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "BroadcastHashJoinExec")
).toDF("id", "token")

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res0: String = 10485760

val q = tokens.join(tokens, Seq("id"), "inner")
scala> q.explain
== Physical Plan ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]
```
BroadcastHashJoinExec requires the output distributions of its two child physical operators to match BroadcastDistribution (with a HashedRelationBroadcastMode) on the build side and UnspecifiedDistribution on the streamed side, i.e. in that order for the left and right children with BuildLeft, and vice versa with BuildRight.
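The HashedRelationBroadcastMode above describes how the collected build-side rows are turned into a hashed relation keyed on the build join keys before they are broadcast. As a conceptual sketch only (not Spark's HashedRelation implementation), the idea can be modelled with plain Scala collections, assuming a single Int join key widened to Long as in the `cast(input[0, int, false] as bigint)` key of the plan above. `HashedRelationModel`, `Row`, `buildHashedRelation` and `lookup` are hypothetical names.

```scala
// Toy model of a broadcast hashed relation; NOT Spark's HashedRelation.
object HashedRelationModel {
  // Hypothetical row type mirroring the (id, token) columns of the example.
  final case class Row(id: Int, token: String)

  // Group the build-side rows by join key. The Int key is widened to Long,
  // and one key may map to several rows, so the values are sequences.
  def buildHashedRelation(buildRows: Seq[Row]): Map[Long, Seq[Row]] =
    buildRows.groupBy(row => row.id.toLong)

  // Probing with a streamed-side key returns all matching build rows (or none).
  def lookup(relation: Map[Long, Seq[Row]], key: Long): Seq[Row] =
    relation.getOrElse(key, Seq.empty)
}
```

Keeping every row per key (rather than one) matters because equi-join keys on the build side need not be unique.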
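BroadcastHashJoinExec reports the following performance metrics (shown in the web UI):
| Key | Name (in web UI) | Description |
|---|---|---|
| numOutputRows | number of output rows | |
| avgHashProbe | avg hash probe | |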
Note: The variable name prefix for BroadcastHashJoinExec operators in CodegenSupport-generated code is bhj (e.g. bhj_relation and bhj_numOutputRows in the generated code below).
```scala
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
/* 010 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation;
/* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
/* 012 */   private UnsafeRow bhj_result;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
...
```
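The required child output distributions depend on the build side:
| BuildSide | Left Child | Right Child |
|---|---|---|
| BuildLeft | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of build join keys | UnspecifiedDistribution |
| BuildRight | UnspecifiedDistribution | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of build join keys |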
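The build-side table can be read as a pattern match on the build side. The following sketch encodes it with local stand-in types named after Spark's (`BuildSide`, `BuildLeft`, `BuildRight`, `UnspecifiedDistribution`, `BroadcastDistribution`); they are defined here so the snippet compiles without Spark, they are not Spark's own classes, and the broadcast mode is only a descriptive string. `RequiredDistributionModel` and `requiredChildDistribution` are hypothetical names.

```scala
// Toy model of the required child distributions; NOT Spark's classes.
object RequiredDistributionModel {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  sealed trait Distribution
  // Streamed side: no requirement on how its rows are partitioned.
  case object UnspecifiedDistribution extends Distribution
  // Build side: broadcast whole to every task, in the given broadcast mode.
  final case class BroadcastDistribution(mode: String) extends Distribution

  private val hashedKeysMode: Distribution =
    BroadcastDistribution("HashedRelationBroadcastMode(build join keys)")

  // Returns (left child requirement, right child requirement).
  def requiredChildDistribution(side: BuildSide): (Distribution, Distribution) =
    side match {
      case BuildLeft  => (hashedKeysMode, UnspecifiedDistribution)
      case BuildRight => (UnspecifiedDistribution, hashedKeysMode)
    }
}
```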
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method
```scala
doExecute(): RDD[InternalRow]
```
Note: doExecute is part of the SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
doExecute…FIXME
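The details of doExecute are still marked FIXME. Purely as an illustration of the general broadcast hash join shape, assuming the build side is turned into a lookup table once and every streamed partition is then joined against that same table independently, the following sketch models partitions as plain Seqs. `DoExecuteModel`, `broadcastBuildSide`, `execute` and the `numOutputRows` counter are hypothetical; the counter only mimics the "number of output rows" metric and says nothing about how Spark updates it.

```scala
// Toy model: partitions are plain Seqs and "broadcast" is a shared value.
object DoExecuteModel {
  final case class Row(id: Int, token: String)

  // Build the lookup table once; every partition below reuses it.
  def broadcastBuildSide(build: Seq[Row]): Map[Long, Seq[Row]] =
    build.groupBy(_.id.toLong)

  // Join each streamed partition lazily against the same table, then
  // materialize it so the output-row counter is final when we return.
  def execute(
      streamedPartitions: Seq[Seq[Row]],
      build: Seq[Row]): (Seq[Seq[(Row, Row)]], Long) = {
    val relation = broadcastBuildSide(build)
    var numOutputRows = 0L
    val joined = streamedPartitions.map { partition =>
      partition.iterator.flatMap { streamed =>
        relation.getOrElse(streamed.id.toLong, Seq.empty).map { built =>
          numOutputRows += 1
          (streamed, built)
        }
      }.toList
    }
    (joined, numOutputRows)
  }
}
```

For the self-join of the three tokens split into partitions of two and one rows, each row matches only itself, so the partitions produce two and one output rows (three in total).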
Generating Java Source Code for Inner Join — codegenInner Internal Method
```scala
codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String
```
codegenInner…FIXME
Note: codegenInner is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
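codegenInner itself is not documented here. The sketch below only models the inner join semantics that such generated code has to implement for one streamed partition: each streamed row yields one output row per matching build row that also satisfies the optional join condition. `InnerJoinModel`, `Row` and `innerJoin` are hypothetical names, and rows are plain case classes rather than InternalRows.

```scala
// Toy model of inner join semantics; not the generated Java code.
object InnerJoinModel {
  final case class Row(id: Int, token: String)

  // One output row per (streamed row, matching build row) pair that passes
  // the optional extra condition; streamed rows without matches disappear.
  def innerJoin(
      streamed: Iterator[Row],
      relation: Map[Long, Seq[Row]],
      condition: Option[(Row, Row) => Boolean]): Iterator[(Row, Row)] =
    streamed.flatMap { s =>
      relation.getOrElse(s.id.toLong, Seq.empty).iterator
        .filter(b => condition.forall(cond => cond(s, b)))
        .map(b => (s, b))
    }
}
```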
Generating Java Source Code for Left or Right Outer Join — codegenOuter Internal Method
```scala
codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String
```
codegenOuter…FIXME
Note: codegenOuter is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
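The following sketch models outer join semantics from the streamed side, assuming the streamed side is the preserved one (e.g. the left side of a LEFT OUTER join that broadcasts the right side): every streamed row survives, and rows without a condition-passing match are padded with `None`, standing in for null build-side columns. `OuterJoinModel`, `Row` and `outerJoin` are hypothetical names.

```scala
// Toy model of outer join semantics (streamed side preserved).
object OuterJoinModel {
  final case class Row(id: Int, token: String)

  // Matched streamed rows are paired with each match; unmatched ones are
  // emitted once with None in place of the build-side columns.
  def outerJoin(
      streamed: Iterator[Row],
      relation: Map[Long, Seq[Row]],
      condition: Option[(Row, Row) => Boolean]): Iterator[(Row, Option[Row])] =
    streamed.flatMap { s =>
      val matches = relation.getOrElse(s.id.toLong, Seq.empty)
        .filter(b => condition.forall(cond => cond(s, b)))
      if (matches.isEmpty) Iterator.single((s, Option.empty[Row]))
      else matches.iterator.map(b => (s, Option(b)))
    }
}
```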
Generating Java Source Code for Left Semi Join — codegenSemi Internal Method
```scala
codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String
```
codegenSemi…FIXME
Note: codegenSemi is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
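A left semi join outputs only streamed-side columns and emits a streamed row at most once, as soon as one build row matches. The following hypothetical sketch (`SemiJoinModel`, `semiJoin`) models those semantics with Scala collections; it is not the generated code.

```scala
// Toy model of left semi join semantics.
object SemiJoinModel {
  final case class Row(id: Int, token: String)

  // Keep a streamed row (once) if at least one build row has the same key
  // and passes the optional condition; build columns are never output.
  def semiJoin(
      streamed: Iterator[Row],
      relation: Map[Long, Seq[Row]],
      condition: Option[(Row, Row) => Boolean]): Iterator[Row] =
    streamed.filter { s =>
      relation.getOrElse(s.id.toLong, Seq.empty)
        .exists(b => condition.forall(cond => cond(s, b)))
    }
}
```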
Generating Java Source Code for Anti Join — codegenAnti Internal Method
```scala
codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String
```
codegenAnti…FIXME
Note: codegenAnti is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
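A left anti join keeps exactly those streamed rows for which no build row matches the key and satisfies the optional condition. The following hypothetical sketch (`AntiJoinModel`, `antiJoin`) models those semantics, assuming non-null keys; null-key handling is deliberately not modelled.

```scala
// Toy model of left anti join semantics (null keys not modelled).
object AntiJoinModel {
  final case class Row(id: Int, token: String)

  // Keep a streamed row only if no build row with the same key passes the
  // optional condition; build columns are never output.
  def antiJoin(
      streamed: Iterator[Row],
      relation: Map[Long, Seq[Row]],
      condition: Option[(Row, Row) => Boolean]): Iterator[Row] =
    streamed.filterNot { s =>
      relation.getOrElse(s.id.toLong, Seq.empty)
        .exists(b => condition.forall(cond => cond(s, b)))
    }
}
```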
codegenExistence Internal Method
```scala
codegenExistence(ctx: CodegenContext, input: Seq[ExprCode]): String
```
codegenExistence…FIXME
Note: codegenExistence is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
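An existence join outputs every streamed row exactly once, together with a boolean flag recording whether a matching build row was found. The following sketch models those semantics with plain Scala collections; `ExistenceJoinModel`, `Row` and `existenceJoin` are hypothetical names, not Spark APIs.

```scala
// Toy model of existence join semantics.
object ExistenceJoinModel {
  final case class Row(id: Int, token: String)

  // Every streamed row is output once, paired with a flag telling whether
  // any build row with the same key passed the optional condition.
  def existenceJoin(
      streamed: Iterator[Row],
      relation: Map[Long, Seq[Row]],
      condition: Option[(Row, Row) => Boolean]): Iterator[(Row, Boolean)] =
    streamed.map { s =>
      val exists = relation.getOrElse(s.id.toLong, Seq.empty)
        .exists(b => condition.forall(cond => cond(s, b)))
      (s, exists)
    }
}
```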
genStreamSideJoinKey Internal Method
```scala
genStreamSideJoinKey(
  ctx: CodegenContext,
  input: Seq[ExprCode]): (ExprCode, String)
```
genStreamSideJoinKey…FIXME
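genStreamSideJoinKey is still marked FIXME. Judging only from the plans above, where the single int join key is widened with `cast(input[0, int, false] as bigint)` and the generated code uses a LongHashedRelation, one way to picture a stream-side join key is as either a single long value or a composite key. The sketch below is a hypothetical model of that distinction (`StreamSideKeyModel`, `LongKey`, `CompositeKey` and `streamSideJoinKey` are made-up names), not the method's actual logic.

```scala
// Toy model of stream-side join keys; not what genStreamSideJoinKey emits.
object StreamSideKeyModel {
  // A join key is either a single long (fast path) or a generic composite.
  sealed trait JoinKey
  final case class LongKey(value: Long) extends JoinKey
  final case class CompositeKey(values: Seq[Any]) extends JoinKey

  // A single Int key is widened to Long, echoing the cast to bigint in the
  // plan; a single Long is used as-is; anything else stays composite.
  def streamSideJoinKey(keyValues: Seq[Any]): JoinKey = keyValues match {
    case Seq(i: Int)  => LongKey(i.toLong)
    case Seq(l: Long) => LongKey(l)
    case other        => CompositeKey(other)
  }
}
```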
Creating BroadcastHashJoinExec Instance
BroadcastHashJoinExec takes the following when created:
- Left join key expressions
- Right join key expressions
- Join type
- BuildSide
- Optional join condition expression
- Left physical operator
- Right physical operator
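The following hypothetical sketch models the constructor arguments as a plain case class, together with the join type and build side that also appear in the physical plan output (`Inner, BuildRight`). It shows how the build side decides which child and which key list are broadcast and which are streamed. Expressions and plans are simplified to strings and labels; `BroadcastHashJoinModel`, `BroadcastHashJoinNode` and `Plan` are made-up names, not Spark's types.

```scala
// Toy model of the operator's arguments; NOT Spark's BroadcastHashJoinExec.
object BroadcastHashJoinModel {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Stand-in for a child physical operator.
  final case class Plan(name: String)

  final case class BroadcastHashJoinNode(
      leftKeys: Seq[String],
      rightKeys: Seq[String],
      joinType: String,
      buildSide: BuildSide,
      condition: Option[String],
      left: Plan,
      right: Plan) {
    require(leftKeys.size == rightKeys.size, "join key lists must have the same length")

    // The build side is broadcast; the other side is streamed.
    def buildPlan: Plan = if (buildSide == BuildLeft) left else right
    def streamedPlan: Plan = if (buildSide == BuildLeft) right else left
    def buildKeys: Seq[String] = if (buildSide == BuildLeft) leftKeys else rightKeys
    def streamedKeys: Seq[String] = if (buildSide == BuildLeft) rightKeys else leftKeys
  }
}
```

Modelled on the example plan, `BroadcastHashJoinNode(Seq("id#15"), Seq("id#20"), "Inner", BuildRight, None, Plan("left"), Plan("right"))` broadcasts the right child with keys `Seq("id#20")` and streams the left one.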