BroadcastHashJoinExec Binary Physical Operator for Broadcast Hash Join

BroadcastHashJoinExec is a binary physical operator to perform a broadcast hash join.

BroadcastHashJoinExec is created when the JoinSelection execution planning strategy is applied to a logical query plan that ExtractEquiJoinKeys can destructure (i.e. an INNER, CROSS, LEFT OUTER, LEFT SEMI or LEFT ANTI join) and whose right physical operator can be broadcast.

BroadcastHashJoinExec supports Java code generation (aka codegen).

val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "BroadcastHashJoinExec")
).toDF("id", "token")

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res0: String = 10485760

val q = tokens.join(tokens, Seq("id"), "inner")
scala> q.explain
== Physical Plan ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]
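The plan above is chosen because the joined dataset is well below spark.sql.autoBroadcastJoinThreshold. The following is a minimal sketch, not a definitive recipe, of how the threshold and the broadcast hint interact. It assumes a throwaway local session (the master, the application name and the helper usesBroadcastHashJoin are illustrative and not part of Spark); in spark-shell the spark value already exists.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

// Assumption: a local session created only for this demo.
val spark = SparkSession.builder().master("local[*]").appName("bhj-demo").getOrCreate()
import spark.implicits._

val tokens = Seq((0, "playing"), (1, "with"), (2, "BroadcastHashJoinExec")).toDF("id", "token")

// Switch off size-based broadcasting so that only an explicit hint can request it.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val plain  = tokens.join(tokens, Seq("id"), "inner")
val hinted = tokens.join(broadcast(tokens), Seq("id"), "inner")

// Hypothetical helper: does the planned physical tree contain the operator?
def usesBroadcastHashJoin(df: DataFrame): Boolean =
  df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }.nonEmpty

println(s"plain:  ${usesBroadcastHashJoin(plain)}")
println(s"hinted: ${usesBroadcastHashJoin(hinted)}")
```

The sketch inspects queryExecution.sparkPlan (the plan before physical preparation rules) so that the check does not depend on how the final executed plan is wrapped.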

BroadcastHashJoinExec requires that its two child physical operators satisfy BroadcastDistribution (with a HashedRelationBroadcastMode) on the build side and UnspecifiedDistribution on the other side. Which child gets which requirement depends on the build side of the join (see Table 2).

Table 1. BroadcastHashJoinExec’s Performance Metrics

Key | Name (in web UI) | Description
numOutputRows | number of output rows |
avgHashProbe | avg hash probe |
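The metric keys and their web UI names can also be read off the operator itself. The sketch below assumes a local session (master and application name are illustrative) and uses a broadcast hint only to make sure the operator is planned regardless of the configured threshold. Which keys are registered may differ between Spark versions, so the loop prints whatever the operator reports.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

// Assumption: a local session created only for this demo.
val spark = SparkSession.builder().master("local[*]").appName("bhj-metrics").getOrCreate()
import spark.implicits._

val tokens = Seq((0, "playing"), (1, "with")).toDF("id", "token")
val q = tokens.join(broadcast(tokens), Seq("id"), "inner")

// Locate the operator in the planned physical tree.
val bhj = q.queryExecution.sparkPlan.collectFirst { case j: BroadcastHashJoinExec => j }.get

// metrics maps every key to a SQLMetric; the metric's name is the label shown in the web UI.
val metricNames: Map[String, String] =
  bhj.metrics.map { case (key, metric) => key -> metric.name.getOrElse("") }

metricNames.foreach { case (key, name) => println(s"$key -> $name") }
```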

Figure 1. BroadcastHashJoinExec in web UI (Details for Query)
Note
The prefix for variable names for BroadcastHashJoinExec operators in CodegenSupport-generated code is bhj.
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
/* 010 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation;
/* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
/* 012 */   private UnsafeRow bhj_result;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
...
Table 2. BroadcastHashJoinExec’s Required Child Output Distributions

BuildSide | Left Child | Right Child
BuildLeft | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of build join keys | UnspecifiedDistribution
BuildRight | UnspecifiedDistribution | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of build join keys
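The requirements in Table 2 can be checked against a planned operator through requiredChildDistribution. This is a sketch under assumptions: a local session (master and application name are illustrative) and a broadcast hint on the right side, which is expected to make the right child the build side.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.catalyst.plans.physical.BroadcastDistribution
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

// Assumption: a local session created only for this demo.
val spark = SparkSession.builder().master("local[*]").appName("bhj-dists").getOrCreate()
import spark.implicits._

val tokens = Seq((0, "playing"), (1, "with")).toDF("id", "token")

// Hint the right side so that it is the one to be broadcast.
val q = tokens.join(broadcast(tokens), Seq("id"), "inner")
val bhj = q.queryExecution.sparkPlan.collectFirst { case j: BroadcastHashJoinExec => j }.get

// One required distribution per child, in (left, right) order.
val dists = bhj.requiredChildDistribution

println(s"build side: ${bhj.buildSide}")
Seq("left", "right").zip(dists).foreach { case (side, d) => println(s"$side -> $d") }
```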

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute…​FIXME

Generating Java Source Code for Inner Join — codegenInner Internal Method

codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String

codegenInner…​FIXME

Note
codegenInner is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
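The generated Java code itself is not described here. As a conceptual model only, the inner-join behaviour it has to produce can be sketched with plain Scala collections: the build side is hashed once by join key, and every streamed row is probed against it. All names and data below are illustrative and are not Spark internals.

```scala
// Plain Scala collections stand in for Spark rows.
val streamed = Seq((0, "playing"), (1, "with"), (3, "unmatched")) // streamed side
val build    = Seq((0, "zero"), (1, "one"), (1, "uno"))           // broadcast (build) side

// Build phase: hash the build side by join key, once.
val relation: Map[Int, Seq[String]] =
  build.groupBy(_._1).map { case (key, rows) => key -> rows.map(_._2) }

// Probe phase: a streamed row emits one output row per matching build row
// and nothing at all when the key is missing.
val inner = for {
  (id, token) <- streamed
  other       <- relation.getOrElse(id, Seq.empty)
} yield (id, token, other)

inner.foreach(println)
```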

Generating Java Source Code for Left or Right Outer Join — codegenOuter Internal Method

codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String

codegenOuter…​FIXME

Note
codegenOuter is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
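For an outer join the difference from the inner case is what happens on a failed probe. The sketch below models a left outer join with the right side broadcast, using plain Scala collections and None in place of Spark's null padding. It is an illustration of the semantics, not of the generated code.

```scala
// Plain Scala collections stand in for Spark rows.
val streamed = Seq((0, "playing"), (1, "with"), (3, "unmatched")) // preserved (streamed) side
val build    = Seq((0, "zero"), (1, "one"))                       // broadcast (build) side

val relation: Map[Int, Seq[String]] =
  build.groupBy(_._1).map { case (key, rows) => key -> rows.map(_._2) }

// A streamed row without a match is still emitted, padded with None.
val leftOuter: Seq[(Int, String, Option[String])] =
  streamed.flatMap { case (id, token) =>
    relation.get(id) match {
      case Some(matches) => matches.map(m => (id, token, Option(m)))
      case None          => Seq((id, token, Option.empty[String]))
    }
  }

leftOuter.foreach(println)
```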

Generating Java Source Code for Left Semi Join — codegenSemi Internal Method

codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String

codegenSemi…​FIXME

Note
codegenSemi is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
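A left semi join only needs to know whether a key exists on the build side, so a set of keys is enough to model it. This is a sketch of the semantics with illustrative data, not of the generated code.

```scala
// Plain Scala collections stand in for Spark rows.
val streamed = Seq((0, "playing"), (1, "with"), (3, "unmatched"))
val build    = Seq((0, "zero"), (1, "one"), (1, "uno"))

// Only key membership matters, so duplicates on the build side collapse.
val buildKeys: Set[Int] = build.map(_._1).toSet

// Each streamed row is emitted at most once, with streamed-side columns only.
val semi = streamed.filter { case (id, _) => buildKeys.contains(id) }

semi.foreach(println)
```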

Generating Java Source Code for Anti Join — codegenAnti Internal Method

codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String

codegenAnti…​FIXME

Note
codegenAnti is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
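An anti join is the complement of a semi join: a streamed row survives only when its key is absent from the build side. The sketch below models that with plain Scala collections and illustrative data, and it ignores null keys and extra join conditions.

```scala
// Plain Scala collections stand in for Spark rows.
val streamed = Seq((0, "playing"), (1, "with"), (3, "unmatched"))
val build    = Seq((0, "zero"), (1, "one"))

val buildKeys: Set[Int] = build.map(_._1).toSet

// Keep the streamed rows whose key has no match on the build side.
val anti = streamed.filterNot { case (id, _) => buildKeys.contains(id) }

anti.foreach(println)
```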

codegenExistence Internal Method

codegenExistence(ctx: CodegenContext, input: Seq[ExprCode]): String

codegenExistence…​FIXME

Note
codegenExistence is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
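An existence join keeps every streamed row and appends a boolean column that records whether a match was found on the build side. As a sketch of that semantics only (plain Scala collections, illustrative data):

```scala
// Plain Scala collections stand in for Spark rows.
val streamed = Seq((0, "playing"), (1, "with"), (3, "unmatched"))
val build    = Seq((0, "zero"), (1, "one"))

val buildKeys: Set[Int] = build.map(_._1).toSet

// No row is dropped or duplicated; the extra flag carries the probe result.
val existence = streamed.map { case (id, token) => (id, token, buildKeys.contains(id)) }

existence.foreach(println)
```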

genStreamSideJoinKey Internal Method

genStreamSideJoinKey(
  ctx: CodegenContext,
  input: Seq[ExprCode]): (ExprCode, String)

genStreamSideJoinKey…​FIXME

Note
genStreamSideJoinKey is used when BroadcastHashJoinExec is requested to generate the Java source code for inner, outer, left semi, anti and existence joins (for the "consume" path in whole-stage code generation).

Creating BroadcastHashJoinExec Instance

BroadcastHashJoinExec takes the following when created:
