val spark: SparkSession = ...
spark.sql("select * from t1, t2 where t1.id = t2.id")
Dataset Join Operators
From PostgreSQL’s 2.6. Joins Between Tables:
Queries can access multiple tables at once, or access the same table in such a way that multiple rows of the table are being processed at the same time. A query that accesses multiple rows of the same or different tables at one time is called a join query.
You can join two datasets using the join operators with an optional join condition.
Operator | Return Type | Description |
---|---|---|
crossJoin | DataFrame | Untyped |
join | DataFrame | Untyped |
joinWith | Dataset[(T, U)] | Used for a type-preserving join with two output columns for records for which a join condition holds |
You can also use SQL mode to join datasets using good ol' SQL.
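As a minimal sketch of SQL mode, the snippet below registers two small DataFrames as temporary views and joins them with plain SQL. The sample data and the local SparkSession setup are assumptions made for illustration; the table names t1 and t2 simply mirror a typical two-table query. It is written as a script to paste into spark-shell.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("sql-join-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data registered as temporary views t1 and t2.
Seq((0, "zero"), (1, "one")).toDF("id", "left").createOrReplaceTempView("t1")
Seq((0, "zero"), (2, "two")).toDF("id", "right").createOrReplaceTempView("t2")

// Implicit (comma) join with the join condition in the WHERE clause.
val implicitJoin = spark.sql("select * from t1, t2 where t1.id = t2.id")

// The same join written with explicit JOIN ... ON syntax.
val explicitJoin = spark.sql("select * from t1 join t2 on t1.id = t2.id")

implicitJoin.show()
explicitJoin.show()
```

Both queries keep the id column of each side, since select * does not merge the join keys.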
You can specify a join condition (aka join expression) as part of join operators or using where or filter operators.
df1.join(df2, $"df1Key" === $"df2Key")
df1.join(df2).where($"df1Key" === $"df2Key")
df1.join(df2).filter($"df1Key" === $"df2Key")
You can specify the join type as part of join operators (using the optional joinType parameter).
df1.join(df2, $"df1Key" === $"df2Key", "inner")
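SQL | Name (joinType) | JoinType |
---|---|---|
CROSS | cross | Cross |
INNER | inner | Inner |
FULL OUTER | outer, full, fullouter | FullOuter |
LEFT ANTI | leftanti | LeftAnti |
LEFT OUTER | leftouter, left | LeftOuter |
LEFT SEMI | leftsemi | LeftSemi |
RIGHT OUTER | rightouter, right | RightOuter |
NATURAL | Special case for Inner, LeftOuter, RightOuter, FullOuter | NaturalJoin |
USING | Special case for Inner, LeftOuter, LeftSemi, RightOuter, FullOuter, LeftAnti | UsingJoin |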
ExistenceJoin is an artificial join type used to express an existential sub-query, which is often referred to as an existential join.
Note: LeftAnti and ExistenceJoin are special cases of LeftOuter.
You can also find that Spark SQL uses the following two families of joins:
- InnerLike with Inner and Cross
- LeftExistence with LeftSemi, LeftAnti and ExistenceJoin
Tip: Names are case-insensitive and can use the underscore (_) at any position, e.g. left_anti and LEFT_ANTI are equivalent.
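The equivalence of differently spelled join type names can be pictured as a normalization step applied before the name is looked up. The sketch below assumes the name is lower-cased and stripped of underscores; normalizeJoinType is a hypothetical helper written for illustration, not part of Spark's public API.

```scala
import java.util.Locale

// Hypothetical helper (an assumption, not Spark's API):
// lower-case the name and drop every underscore.
def normalizeJoinType(name: String): String =
  name.toLowerCase(Locale.ROOT).replace("_", "")

// Different spellings of the same join type name.
val names = Seq("left_anti", "LEFT_ANTI", "leftanti", "Left_Anti")

// All spellings collapse to a single normalized name.
val normalized = names.map(normalizeJoinType).distinct
println(normalized)
```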
Note: Spark SQL offers different join strategies with Broadcast Joins (aka Map-Side Joins) among them that are supposed to optimize your join queries over large distributed datasets.
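One way to ask for a broadcast join explicitly is the broadcast function from org.apache.spark.sql.functions, which marks a dataset as small enough to be shipped to every executor. The sketch below is illustrative only: the sample data and local session are assumptions, and the planner still decides the final physical operator, so inspect the output of explain rather than relying on a particular plan.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Assumption: a local session; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("broadcast-hint-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data.
val left = Seq((0, "zero"), (1, "one")).toDF("id", "left")
val right = Seq((0, "zero"), (2, "two"), (3, "three")).toDF("id", "right")

// Hint that the right side should be broadcast for this join.
val hinted = left.join(broadcast(right), "id")

hinted.explain()
hinted.show()
```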
join Operators
join(right: Dataset[_]): DataFrame (1)
join(right: Dataset[_], usingColumn: String): DataFrame (2)
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame (3)
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame (4)
join(right: Dataset[_], joinExprs: Column): DataFrame (5)
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame (6)
(1) Condition-less inner join
(2) Inner join with a single column that exists on both sides
(3) Inner join with columns that exist on both sides
(4) Equi-join with explicit join type
(5) Inner join
(6) Join with explicit join type. Self-joins are acceptable.
join joins two Datasets.
val left = Seq((0, "zero"), (1, "one")).toDF("id", "left")
val right = Seq((0, "zero"), (2, "two"), (3, "three")).toDF("id", "right")
// Inner join
scala> left.join(right, "id").show
+---+----+-----+
| id|left|right|
+---+----+-----+
|  0|zero| zero|
+---+----+-----+
scala> left.join(right, "id").explain
== Physical Plan ==
*Project [id#50, left#51, right#61]
+- *BroadcastHashJoin [id#50], [id#60], Inner, BuildRight
   :- LocalTableScan [id#50, left#51]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#60, right#61]
// Full outer
scala> left.join(right, Seq("id"), "fullouter").show
+---+----+-----+
| id|left|right|
+---+----+-----+
|  1| one| null|
|  3|null|three|
|  2|null|  two|
|  0|zero| zero|
+---+----+-----+
scala> left.join(right, Seq("id"), "fullouter").explain
== Physical Plan ==
*Project [coalesce(id#50, id#60) AS id#85, left#51, right#61]
+- SortMergeJoin [id#50], [id#60], FullOuter
   :- *Sort [id#50 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#50, 200)
   :     +- LocalTableScan [id#50, left#51]
   +- *Sort [id#60 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#60, 200)
         +- LocalTableScan [id#60, right#61]
// Left anti
scala> left.join(right, Seq("id"), "leftanti").show
+---+----+
| id|left|
+---+----+
|  1| one|
+---+----+
scala> left.join(right, Seq("id"), "leftanti").explain
== Physical Plan ==
*BroadcastHashJoin [id#50], [id#60], LeftAnti, BuildRight
:- LocalTableScan [id#50, left#51]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [id#60]
Internally, join(right: Dataset[_]) creates a DataFrame with a condition-less Join logical operator (in the current SparkSession).
Note: join(right: Dataset[_]) creates a logical plan with a condition-less Join operator with two child logical plans, one for each side of the join.
Note: join(right: Dataset[_], usingColumns: Seq[String], joinType: String) creates a logical plan with a condition-less Join operator with UsingJoin join type.
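A visible consequence of joining by column names rather than by a join expression is the shape of the result: a using-columns join keeps a single copy of the join column, while an expression-based join keeps the column from both sides. The sketch below illustrates this with assumed sample data and a local session.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("using-vs-expr-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data.
val left = Seq((0, "zero"), (1, "one")).toDF("id", "left")
val right = Seq((0, "zero"), (2, "two"), (3, "three")).toDF("id", "right")

// Join by column name: the result has one id column.
val byName = left.join(right, Seq("id"), "inner")

// Join by expression: the result keeps id from both sides.
val byExpr = left.join(right, left("id") === right("id"), "inner")

println(byName.columns.mkString(", "))
println(byExpr.columns.mkString(", "))
```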
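Note: In a self-join, a join condition such as df("key") === df("key") is usually considered trivially true and is not acceptable as a join condition. With the spark.sql.selfJoinAutoResolveAmbiguity option enabled (which it is by default), join automatically resolves such a condition so that the two sides of the equality refer to the left and right sides of the join, respectively.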
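A common way to keep a self-join condition unambiguous, regardless of configuration, is to give each side of the join its own alias and refer to columns through those aliases. The sketch below assumes a hypothetical people table with a managerId column; the data, column names, and local session are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("self-join-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical data: every person points at a manager by id (0 means no manager).
val people = Seq((1, "Ann", 0), (2, "Bob", 1), (3, "Cid", 1))
  .toDF("id", "name", "managerId")

// Alias each side so that column references are never ambiguous.
val employees = people.as("e")
val managers = people.as("m")

val pairs = employees
  .join(managers, $"e.managerId" === $"m.id")
  .select($"e.name".as("employee"), $"m.name".as("manager"))

pairs.show()
```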
crossJoin Method
crossJoin(right: Dataset[_]): DataFrame
Note: crossJoin creates an explicit cartesian join that can be very expensive without an extra filter (that can be pushed down).
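To make the cost concrete, the sketch below cross-joins two three-row datasets, which yields every pairing, and then narrows the result with a filter over columns from both sides. The data and local session are assumptions for illustration; whether the filter becomes part of the join depends on the optimizer.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cross-join-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data.
val as = Seq(1, 2, 3).toDF("a")
val bs = Seq(1, 2, 3).toDF("b")

// Cartesian product: 3 * 3 = 9 rows.
val all = as.crossJoin(bs)

// The extra filter keeps only the pairs with a < b: (1,2), (1,3), (2,3).
val ordered = all.where($"a" < $"b")

println(all.count())
println(ordered.count())
```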
Type-Preserving Joins: joinWith Operators
joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] (1)
joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]
(1) Inner equi-join
joinWith creates a Dataset with two columns _1 and _2 that each contain records for which condition holds.
case class Person(id: Long, name: String, cityId: Long)
case class City(id: Long, name: String)
val family = Seq(
Person(0, "Agata", 0),
Person(1, "Iweta", 0),
Person(2, "Patryk", 2),
Person(3, "Maksym", 0)).toDS
val cities = Seq(
City(0, "Warsaw"),
City(1, "Washington"),
City(2, "Sopot")).toDS
val joined = family.joinWith(cities, family("cityId") === cities("id"))
scala> joined.printSchema
root
 |-- _1: struct (nullable = false)
 |    |-- id: long (nullable = false)
 |    |-- name: string (nullable = true)
 |    |-- cityId: long (nullable = false)
 |-- _2: struct (nullable = false)
 |    |-- id: long (nullable = false)
 |    |-- name: string (nullable = true)
scala> joined.show
+------------+----------+
|          _1|        _2|
+------------+----------+
| [0,Agata,0]|[0,Warsaw]|
| [1,Iweta,0]|[0,Warsaw]|
|[2,Patryk,2]| [2,Sopot]|
|[3,Maksym,0]|[0,Warsaw]|
+------------+----------+
Note: joinWith preserves type-safety with the original object types.
Note: joinWith creates a Dataset with Join logical plan.
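Because joinWith keeps the original object types, the joined pairs can be processed with ordinary typed Scala code. The following sketch passes the join type explicitly and maps each (Person, City) pair to a string. It reuses the Person and City shapes from the example above; the local session is an assumption, and the snippet is meant to be pasted into spark-shell (where case classes defined at the prompt get encoders).

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Assumption: a local session; in spark-shell, getOrCreate returns the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("joinwith-sketch")
  .getOrCreate()
import spark.implicits._

case class Person(id: Long, name: String, cityId: Long)
case class City(id: Long, name: String)

val family = Seq(
  Person(0, "Agata", 0),
  Person(1, "Iweta", 0),
  Person(2, "Patryk", 2),
  Person(3, "Maksym", 0)).toDS
val cities = Seq(
  City(0, "Warsaw"),
  City(1, "Washington"),
  City(2, "Sopot")).toDS

// Explicit join type; the result is still typed as pairs of the original objects.
val joined: Dataset[(Person, City)] =
  family.joinWith(cities, family("cityId") === cities("id"), "inner")

// Typed processing: pattern match on the pair, no Row access needed.
val labels: Dataset[String] =
  joined.map { case (person, city) => s"${person.name} lives in ${city.name}" }

labels.show(truncate = false)
```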