Join Operators

From PostgreSQL’s 2.6. Joins Between Tables:

Queries can access multiple tables at once, or access the same table in such a way that multiple rows of the table are being processed at the same time. A query that accesses multiple rows of the same or different tables at one time is called a join query.

You can join multiple Datasets by using join operators.

Table 1. Join Operators (in alphabetical order)
Operator | Return Type | Description
join     | DataFrame   | Untyped, Row-based join
joinWith | Dataset     | Type-preserving join with two output columns holding the records for which the join condition holds

Note

You can also use SparkSession to execute good ol' SQL with joins.

val spark: SparkSession = ???
spark.sql("select * from t1, t2 where t1.id = t2.id")
Table 2. Join Types (in alphabetical order)
SQL Name    | joinType                                                                    | JoinType
CROSS       | cross                                                                       | Cross
INNER       | inner                                                                       | Inner
FULL OUTER  | outer, full, fullouter                                                      | FullOuter
LEFT ANTI   | leftanti                                                                    | LeftAnti
LEFT OUTER  | leftouter, left                                                             | LeftOuter
LEFT SEMI   | leftsemi                                                                    | LeftSemi
RIGHT OUTER | rightouter, right                                                           | RightOuter
NATURAL     | Special case of Inner, LeftOuter, RightOuter, FullOuter                     | NaturalJoin
USING       | Special case of Inner, LeftOuter, LeftSemi, RightOuter, FullOuter, LeftAnti | UsingJoin

Tip
Names are case-insensitive and can use the underscore (_) at any position, i.e. left_anti and LEFT_ANTI are equivalent.
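
For example (a quick sketch, assuming two DataFrames df1 and df2 that share an id column), the following two queries are equivalent:

df1.join(df2, Seq("id"), "left_anti")
df1.join(df2, Seq("id"), "LEFT_ANTI")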

You can specify the join expression as part of the join operator, or leave it out and specify it afterwards using the where operator.

df1.join(df2, $"df1Key" === $"df2Key")
df1.join(df2).where($"df1Key" === $"df2Key")

join Operators

join(right: Dataset[_]): DataFrame (1)
join(right: Dataset[_], usingColumn: String): DataFrame (2)
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame (3)
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame (4)
join(right: Dataset[_], joinExprs: Column): DataFrame (5)
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame (6)
  1. Condition-less inner join

  2. Inner join with a single column that exists on both sides

  3. Inner join with columns that exist on both sides

  4. Equi-join with explicit join type

  5. Inner join

  6. Join with explicit join type. Self-joins are acceptable.

join joins two Datasets.

val left = Seq((0, "zero"), (1, "one")).toDF("id", "left")
val right = Seq((0, "zero"), (2, "two"), (3, "three")).toDF("id", "right")

// Inner join
scala> left.join(right, "id").show
+---+----+-----+
| id|left|right|
+---+----+-----+
|  0|zero| zero|
+---+----+-----+

scala> left.join(right, "id").explain
== Physical Plan ==
*Project [id#50, left#51, right#61]
+- *BroadcastHashJoin [id#50], [id#60], Inner, BuildRight
   :- LocalTableScan [id#50, left#51]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#60, right#61]

// Full outer
scala> left.join(right, Seq("id"), "fullouter").show
+---+----+-----+
| id|left|right|
+---+----+-----+
|  1| one| null|
|  3|null|three|
|  2|null|  two|
|  0|zero| zero|
+---+----+-----+

scala> left.join(right, Seq("id"), "fullouter").explain
== Physical Plan ==
*Project [coalesce(id#50, id#60) AS id#85, left#51, right#61]
+- SortMergeJoin [id#50], [id#60], FullOuter
   :- *Sort [id#50 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#50, 200)
   :     +- LocalTableScan [id#50, left#51]
   +- *Sort [id#60 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#60, 200)
         +- LocalTableScan [id#60, right#61]

// Left anti
scala> left.join(right, Seq("id"), "leftanti").show
+---+----+
| id|left|
+---+----+
|  1| one|
+---+----+

scala> left.join(right, Seq("id"), "leftanti").explain
== Physical Plan ==
*BroadcastHashJoin [id#50], [id#60], LeftAnti, BuildRight
:- LocalTableScan [id#50, left#51]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [id#60]

Internally, join(right: Dataset[_]) creates a DataFrame with a condition-less Join logical operator (in the current SparkSession).

Note
join(right: Dataset[_]) creates a logical plan with a condition-less Join operator with two child logical plans of the both sides of the join.
Note
join(right: Dataset[_], usingColumns: Seq[String], joinType: String) creates a logical plan with a condition-less Join operator of the UsingJoin join type.
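
You can see the Join logical operator yourself by inspecting a query's logical plan (a quick check, reusing the left and right Datasets from the examples above):

val joined = left.join(right, Seq("id"))
// the logical plan contains a Join operator with the two child plans
joined.queryExecution.logical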
Note

join(right: Dataset[_], joinExprs: Column, joinType: String) accepts self-joins where joinExprs is of the form:

df("key") === df("key")

Such a condition would normally be considered trivially true and rejected.

With the spark.sql.selfJoinAutoResolveAmbiguity option enabled (which it is by default), join automatically resolves the ambiguous join condition so that it references the correct side of the self-join.
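
A minimal sketch of such a self-join (df is a hypothetical two-column DataFrame):

val df = Seq((0, "zero"), (1, "one")).toDF("id", "value")
// both column references point at the same Dataset
val selfJoined = df.join(df, df("id") === df("id"))
// with spark.sql.selfJoinAutoResolveAmbiguity enabled (the default),
// the right-hand reference is resolved against the right side of the join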

crossJoin Method

crossJoin(right: Dataset[_]): DataFrame

crossJoin joins two Datasets using the Cross join type with no condition.

Note
crossJoin creates an explicit cartesian join that can be very expensive without an extra filter (that can be pushed down).
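
A quick sketch, reusing the left and right Datasets from the examples above:

// every row of left paired with every row of right, i.e. 2 * 3 = 6 rows
val cartesian = left.crossJoin(right)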

Type-Preserving Joins — joinWith Operators

joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]  (1)
joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]
  1. Type-safe inner join

joinWith creates a Dataset with two columns _1 and _2 that hold the paired records for which condition holds.

case class Person(id: Long, name: String, cityId: Long)
case class City(id: Long, name: String)

val people = Seq(Person(0, "Agata", 0), Person(1, "Iweta", 0)).toDS
val cities = Seq(City(0, "Warsaw"), City(1, "Washington")).toDS

val joined = people.joinWith(cities, people("cityId") === cities("id"))

scala> joined.printSchema
root
 |-- _1: struct (nullable = false)
 |    |-- id: long (nullable = false)
 |    |-- name: string (nullable = true)
 |    |-- cityId: long (nullable = false)
 |-- _2: struct (nullable = false)
 |    |-- id: long (nullable = false)
 |    |-- name: string (nullable = true)

scala> joined.show
+-----------+----------+
|         _1|        _2|
+-----------+----------+
|[0,Agata,0]|[0,Warsaw]|
|[1,Iweta,0]|[0,Warsaw]|
+-----------+----------+
Note
joinWith preserves type-safety with the original object types.
Note
joinWith creates a Dataset with Join logical plan.

Broadcast Join (aka Map-Side Join)

Caution
FIXME: Review BroadcastNestedLoop.

You can use broadcast function to mark a Dataset to be broadcast when used in a join operator.
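
A minimal sketch (reusing the left and right Datasets from the examples above, and assuming right is the small side):

import org.apache.spark.sql.functions.broadcast

// mark the small side for broadcast so the planner can choose a broadcast join
val q = left.join(broadcast(right), "id")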

Note
According to the article Map-Side Join in Spark, broadcast join is also called a replicated join (in the distributed system community) or a map-side join (in the Hadoop community).
Note
At long last! I have always been wondering what a map-side join is and it appears I am close to uncovering the truth!

Later in the same article, you can find that a broadcast join lets you very effectively join a large table (fact) with relatively small tables (dimensions), i.e. perform a star-schema join without sending all the data of the large table over the network.

The CanBroadcast object matches a LogicalPlan whose output is small enough to be used in a broadcast join.

Note
Currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE [tableName] COMPUTE STATISTICS noscan has been run.

CanBroadcast uses the spark.sql.autoBroadcastJoinThreshold setting to control the maximum size of a table that will be broadcast to all worker nodes when performing a join.
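
You can inspect (and change) the threshold through the runtime configuration, e.g.:

// current threshold in bytes (10485760, i.e. 10 MB, by default)
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// -1 disables statistics-based broadcast joins altogether
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)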
