BroadcastHashJoinExec Binary Physical Operator for Broadcast Hash Join
BroadcastHashJoinExec is a binary physical operator that performs a broadcast hash join.
BroadcastHashJoinExec is created when the JoinSelection execution planning strategy is applied to ExtractEquiJoinKeys-destructurable logical query plans (i.e. INNER, CROSS, LEFT OUTER, LEFT SEMI and LEFT ANTI joins) whose right physical operator can be broadcast.
BroadcastHashJoinExec supports Java code generation (aka codegen).
In the following spark-shell session, the three-row tokens dataset is far below the default spark.sql.autoBroadcastJoinThreshold of 10485760 bytes (10 MB), so its self-join is planned as a BroadcastHashJoin that broadcasts the right side (BuildRight):
scala> val tokens = Seq(
     |   (0, "playing"),
     |   (1, "with"),
     |   (2, "BroadcastHashJoinExec")
     | ).toDF("id", "token")
scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res0: String = 10485760
scala> val q = tokens.join(tokens, Seq("id"), "inner")
scala> q.explain
== Physical Plan ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]
BroadcastHashJoinExec requires that its two child physical operators satisfy BroadcastDistribution (with a HashedRelationBroadcastMode broadcast mode) on the build side and UnspecifiedDistribution on the streamed side, i.e. BroadcastDistribution for the left child and UnspecifiedDistribution for the right child with BuildLeft, or vice versa with BuildRight.
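The two phases of a broadcast hash join can be sketched with plain Scala collections. This is a conceptual model only, assuming rows are (key, value) tuples and the hashed relation is an immutable Map; Spark itself builds a HashedRelation from the build side and ships it to the executors as a broadcast variable. BroadcastHashJoinSketch, buildHashedRelation and innerJoin are hypothetical names.

```scala
// Conceptual model of a broadcast hash join with plain Scala collections.
// Assumption: rows are (key, value) tuples and the hashed relation is a Map;
// Spark uses InternalRow, HashedRelation and a broadcast variable instead.
object BroadcastHashJoinSketch {
  type Row = (Long, String)

  // Build phase: turn the small (build) side into a hash table once.
  // In Spark, this table is what every executor receives via the broadcast.
  def buildHashedRelation(buildRows: Seq[Row]): Map[Long, Seq[Row]] =
    buildRows.groupBy(_._1)

  // Probe phase: scan one partition of the large (streamed) side and look up
  // every row's key in the hash table; the streamed side is never shuffled.
  def innerJoin(
      streamedPartition: Iterator[Row],
      relation: Map[Long, Seq[Row]]): Iterator[(Long, String, String)] =
    streamedPartition.flatMap { case (key, streamedValue) =>
      relation.getOrElse(key, Seq.empty[Row]).map { case (_, buildValue) =>
        (key, streamedValue, buildValue)
      }
    }
}
```

Applied to the tokens rows of the self-join q, every id finds exactly one build-side match, so each streamed row produces one joined row.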
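Key | Name (in web UI) | Description |
---|---|---|
numOutputRows | number of output rows | Number of rows the join produced |
avgHashProbe | avg hash probe | Average number of probes per key lookup in the hashed relation |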
Note: The prefix for variable names of BroadcastHashJoinExec operators in CodegenSupport-generated code is bhj (e.g. bhj_relation and bhj_numOutputRows in the generated code below).
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
/* 010 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation;
/* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
/* 012 */   private UnsafeRow bhj_result;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
...
BuildSide | Left Child | Right Child |
---|---|---|
BuildLeft | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of build join keys | UnspecifiedDistribution |
BuildRight | UnspecifiedDistribution | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of build join keys |
Executing Physical Operator (Generating RDD[InternalRow]): doExecute Method
doExecute(): RDD[InternalRow]
Note: doExecute is part of the SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
doExecute …FIXME
Generating Java Source Code for Inner Join: codegenInner Internal Method
codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String
codegenInner …FIXME
Note: codegenInner is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
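What the generated inner-join code computes for each streamed row can be sketched in plain Scala. This is a semantic model under simplifying assumptions (rows as (key, value) tuples, the broadcast hashed relation as a Map, the optional join condition as a Scala function), not the Java source that codegenInner emits; InnerJoinConsume and consume are hypothetical names.

```scala
// Semantic model of the per-row inner-join logic (not the generated Java).
// Assumptions: rows are (key, value) tuples, the broadcast hashed relation is
// a Map from key to matching build rows, and the join condition is a function.
object InnerJoinConsume {
  type Row = (Long, String)

  // Emits one joined pair per build-side match that passes the condition;
  // a streamed row without any match produces nothing.
  def consume(
      streamed: Row,
      relation: Map[Long, Seq[Row]],
      condition: Option[(Row, Row) => Boolean]): Seq[(Row, Row)] =
    relation
      .getOrElse(streamed._1, Seq.empty[Row])
      .filter(build => condition.forall(cond => cond(streamed, build)))
      .map(build => (streamed, build))
}
```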
Generating Java Source Code for Left or Right Outer Join: codegenOuter Internal Method
codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String
codegenOuter …FIXME
Note: codegenOuter is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
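In an outer join the streamed row is never dropped. The sketch below models that per-row behavior with plain Scala stand-ins (rows as (key, value) tuples, the hashed relation as a Map, the join condition as a function) and represents the null-padded build side as None; OuterJoinConsume and consume are hypothetical names, not the code codegenOuter generates.

```scala
// Semantic model of the per-row outer-join logic (not the generated Java).
// Assumptions: rows are (key, value) tuples, the hashed relation is a Map,
// and None stands for a build side padded with nulls.
object OuterJoinConsume {
  type Row = (Long, String)

  // The streamed row always survives: it is paired with every matching build
  // row that passes the condition, or with None when there is no such row.
  def consume(
      streamed: Row,
      relation: Map[Long, Seq[Row]],
      condition: Option[(Row, Row) => Boolean]): Seq[(Row, Option[Row])] = {
    val matches = relation
      .getOrElse(streamed._1, Seq.empty[Row])
      .filter(build => condition.forall(cond => cond(streamed, build)))
    if (matches.isEmpty) Seq((streamed, None))
    else matches.map(build => (streamed, Some(build)))
  }
}
```

Note that a key match rejected by the join condition counts as no match, so the streamed row is still emitted with a null-padded build side.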
Generating Java Source Code for Left Semi Join: codegenSemi Internal Method
codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String
codegenSemi …FIXME
Note: codegenSemi is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
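A left semi join outputs only streamed-side columns and each streamed row at most once. A minimal per-row sketch, assuming plain Scala stand-ins (rows as (key, value) tuples, the hashed relation as a Map, the join condition as a function); SemiJoinConsume and consume are hypothetical names:

```scala
// Semantic model of the per-row left semi join logic (not the generated Java).
// Assumptions: rows are (key, value) tuples and the hashed relation is a Map.
object SemiJoinConsume {
  type Row = (Long, String)

  // Emits the streamed row (and nothing from the build side) if at least one
  // build row with the same key passes the condition; exists stops at the
  // first such row, so the streamed row is never duplicated.
  def consume(
      streamed: Row,
      relation: Map[Long, Seq[Row]],
      condition: Option[(Row, Row) => Boolean]): Option[Row] = {
    val found = relation
      .getOrElse(streamed._1, Seq.empty[Row])
      .exists(build => condition.forall(cond => cond(streamed, build)))
    if (found) Some(streamed) else None
  }
}
```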
Generating Java Source Code for Anti Join: codegenAnti Internal Method
codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String
codegenAnti …FIXME
Note: codegenAnti is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
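An anti join keeps exactly the streamed rows that have no match. The sketch below models this per row with plain Scala stand-ins (the hashed relation as a Map, the join condition as a function) and additionally represents a NULL stream-side key as None: a NULL key can never be equal to a build key, so, under SQL equality semantics, such a row has no match and is kept. AntiJoinConsume and consume are hypothetical names, not the code codegenAnti generates.

```scala
// Semantic model of the per-row anti-join logic (not the generated Java).
// Assumptions: a streamed row's key is Option[Long] (None = SQL NULL), build
// rows are (key, value) tuples, and the hashed relation is a Map.
object AntiJoinConsume {
  type StreamRow = (Option[Long], String)
  type BuildRow = (Long, String)

  // Emits the streamed row only when no build row matches its key and passes
  // the condition; a NULL key cannot match anything, so that row is emitted.
  def consume(
      streamed: StreamRow,
      relation: Map[Long, Seq[BuildRow]],
      condition: Option[(StreamRow, BuildRow) => Boolean]): Option[StreamRow] = {
    val found = streamed._1 match {
      case None => false
      case Some(key) =>
        relation
          .getOrElse(key, Seq.empty[BuildRow])
          .exists(build => condition.forall(cond => cond(streamed, build)))
    }
    if (found) None else Some(streamed)
  }
}
```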
codegenExistence Internal Method
codegenExistence(ctx: CodegenContext, input: Seq[ExprCode]): String
codegenExistence …FIXME
Note: codegenExistence is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
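codegenExistence covers existence joins (ExistenceJoin), which Spark typically uses when rewriting EXISTS or IN subqueries that cannot be turned into a plain semi or anti join, e.g. inside a disjunction. Instead of filtering, an existence join emits every streamed row once, extended with a Boolean "exists" column. A minimal per-row sketch, assuming plain Scala stand-ins (rows as (key, value) tuples, the hashed relation as a Map, the join condition as a function); ExistenceJoinConsume and consume are hypothetical names:

```scala
// Semantic model of the per-row existence-join logic (not the generated Java).
// Assumptions: rows are (key, value) tuples and the hashed relation is a Map.
object ExistenceJoinConsume {
  type Row = (Long, String)

  // Every streamed row is emitted exactly once, paired with a flag that says
  // whether some build row with the same key passes the condition.
  def consume(
      streamed: Row,
      relation: Map[Long, Seq[Row]],
      condition: Option[(Row, Row) => Boolean]): (Row, Boolean) = {
    val exists = relation
      .getOrElse(streamed._1, Seq.empty[Row])
      .exists(build => condition.forall(cond => cond(streamed, build)))
    (streamed, exists)
  }
}
```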
genStreamSideJoinKey Internal Method
genStreamSideJoinKey(
  ctx: CodegenContext,
  input: Seq[ExprCode]): (ExprCode, String)
genStreamSideJoinKey …FIXME
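The physical plan above casts the int join key to bigint, and the generated code declares a LongHashedRelation, which suggests that a single long key is handled as a primitive long. The sketch below models that idea, assuming that a single long key is used directly together with its own null flag, while any other key shape is packed into a row whose null check is "any field is NULL". The returned pair loosely mirrors the (ExprCode, String) result of genStreamSideJoinKey (key code plus null-check code); StreamSideJoinKeySketch and all types in it are hypothetical stand-ins.

```scala
// Simplified model of how a stream-side join key may be represented.
// Assumption: all types below are hypothetical stand-ins; column values are
// Option[Any], with None standing for SQL NULL.
object StreamSideJoinKeySketch {
  sealed trait KeyType
  case object LongKeyType extends KeyType
  case object OtherKeyType extends KeyType

  sealed trait JoinKey
  // A single long key can be looked up as a primitive long.
  final case class LongValue(value: Option[Long]) extends JoinKey
  // Any other key shape is packed into a row (an UnsafeRow in Spark).
  final case class RowValue(fields: Seq[Option[Any]]) extends JoinKey

  // Returns the key plus a "contains NULL" flag.
  def streamSideJoinKey(
      keyTypes: Seq[KeyType],
      keyValues: Seq[Option[Any]]): (JoinKey, Boolean) =
    if (keyTypes == Seq(LongKeyType)) {
      val value = keyValues.head.map(_.asInstanceOf[Long])
      (LongValue(value), value.isEmpty)
    } else {
      (RowValue(keyValues), keyValues.exists(_.isEmpty))
    }
}
```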
Creating BroadcastHashJoinExec Instance
BroadcastHashJoinExec takes the following when created:
- Left join key expressions
- Right join key expressions
- Join type
- BuildSide
- Optional join condition expression
- Left physical operator
- Right physical operator
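BroadcastHashJoinExec broadcasts one child and streams the other, and its BuildSide (BuildLeft or BuildRight, as in the required-distribution table above) decides which of the left/right operators and join key expressions take the build role. The sketch below models that selection with hypothetical stand-in types (BuildSideSketch, JoinInputs, split), with plans and key expressions reduced to strings taken from the example's physical plan.

```scala
// Simplified model of how the BuildSide selects the build (broadcast) and
// streamed inputs. Assumption: all names here are hypothetical stand-ins, and
// plans and key expressions are reduced to plain strings.
object BuildSideSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  final case class JoinInputs[P, K](leftKeys: Seq[K], rightKeys: Seq[K], left: P, right: P)

  // Returns ((buildPlan, buildKeys), (streamedPlan, streamedKeys)).
  def split[P, K](buildSide: BuildSide, in: JoinInputs[P, K]): ((P, Seq[K]), (P, Seq[K])) =
    buildSide match {
      case BuildLeft  => ((in.left, in.leftKeys), (in.right, in.rightKeys))
      case BuildRight => ((in.right, in.rightKeys), (in.left, in.leftKeys))
    }
}
```

With BuildRight, as in the example's plan, the right LocalTableScan and its id#20 key land on the build side, which matches the BroadcastExchange over LocalTableScan [id#20, token#21].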