Join types in Spark SQL

on waitingforcode.com

Spark SQL mirrors as many relational database concepts as possible. One of them is the join, which can be defined in one of 7 forms.

This post describes all of these types. Each of them, beginning with the most obvious one (inner), is presented in a separate section. In each section, a short description of the join is accompanied by a simple test case.

All available joins are represented by classes extending the sealed abstract class org.apache.spark.sql.catalyst.plans.JoinType. The implementations are characterized by the property sql: String, which holds the SQL name used in generated SQL queries. It is used, for instance, when a logical plan is translated to a SQL query string (org.apache.spark.sql.catalyst.SQLBuilder class).

Inner joins are a special case. Unlike the other types, they expose another property called explicitCartesian: Boolean. This flag indicates that the query contains a CROSS JOIN and will therefore generate a Cartesian product. The "inner" type has 2 implementations: inner join and cross join.
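The hierarchy described above can be pictured with a simplified model. The sketch below is not Spark's source code: the intermediate class name InnerLike, the render helper and the exact sql strings are assumptions made for illustration. Only JoinType, sql and explicitCartesian come from the description above.

```scala
// Simplified, hypothetical model of the join type hierarchy (NOT the Spark source).
object JoinTypeSketch {
  sealed abstract class JoinType {
    def sql: String // name used when a plan is rendered back to SQL text
  }

  // Assumed intermediate class: inner-like joins also tell whether
  // a Cartesian product was requested explicitly.
  sealed abstract class InnerLike extends JoinType {
    def explicitCartesian: Boolean
  }

  case object Inner extends InnerLike {
    val explicitCartesian = false
    val sql = "INNER"
  }
  case object Cross extends InnerLike {
    val explicitCartesian = true
    val sql = "CROSS"
  }
  case object LeftOuter extends JoinType { val sql = "LEFT OUTER" }
  case object RightOuter extends JoinType { val sql = "RIGHT OUTER" }
  case object FullOuter extends JoinType { val sql = "FULL OUTER" }
  case object LeftSemi extends JoinType { val sql = "LEFT SEMI" }
  case object LeftAnti extends JoinType { val sql = "LEFT ANTI" }

  // Hypothetical helper: builds the join clause a SQL generator might emit.
  def render(left: String, right: String, joinType: JoinType): String =
    s"$left ${joinType.sql} JOIN $right"
}
```

Because the base class is sealed, a pattern match over JoinType can be checked for exhaustiveness by the compiler, which is one plausible reason for modelling the 7 types this way.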

All tests in the sections below use the following data:

private val customers = Seq(
  (1, "Customer_1"), (2, "Customer_2"), (3, "Customer_3")
).toDF("id", "login")

private val orders = Seq(
  (1, 1, 50.0d), (2, 2, 10d),
  (3, 2, 10d), (4, 2, 10d),
  (5, 1000, 19d)
).toDF("id", "customers_id", "amount")

INNER JOIN

The inner join is one of the simplest to understand. It joins rows only if they have a match in both DataFrames. Thus, in the case of our customer-order datasets, Customer_3 and Order#5 won't be joined:

"inner join" should "collect only matching rows from both datasets" in {
  val ordersByCustomer = orders
    .join(customers, orders("customers_id") === customers("id"), "inner")
    .map(Mapper.mapJoinedRow(_))
    .collect()

  ordersByCustomer.size shouldEqual(4)
  ordersByCustomer should contain allOf("1,1,50.0,1,Customer_1", "2,2,10.0,2,Customer_2", "3,2,10.0,2,Customer_2",
    "4,2,10.0,2,Customer_2")
}
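To make the semantics concrete without a Spark session, the same rule can be modelled on plain Scala collections. This is only a sketch of what the join returns, not of how Spark executes it. The case classes Customer and Order and the innerJoin helper are hypothetical names that mirror the test data.

```scala
// Plain-collections model of an inner join (illustration only, no Spark involved).
object InnerJoinSketch {
  final case class Customer(id: Int, login: String)
  final case class Order(id: Int, customerId: Int, amount: Double)

  val customers = Seq(Customer(1, "Customer_1"), Customer(2, "Customer_2"), Customer(3, "Customer_3"))
  val orders = Seq(
    Order(1, 1, 50.0), Order(2, 2, 10.0), Order(3, 2, 10.0),
    Order(4, 2, 10.0), Order(5, 1000, 19.0)
  )

  // Keep an (order, customer) pair only when the join keys are equal.
  def innerJoin(left: Seq[Order], right: Seq[Customer]): Seq[(Order, Customer)] =
    for {
      order <- left
      customer <- right
      if order.customerId == customer.id
    } yield (order, customer)

  val joined = innerJoin(orders, customers)
}
```

In this model, Order 5 (customer 1000) and Customer_3 never appear in joined, which matches the 4 rows asserted in the test.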

CROSS JOIN

The second type, the cross join, is more permissive than the previous one: it produces a Cartesian product, pairing every row of one dataset with every row of the other. It also differs from the other types in how it is defined. To use a cross join, we must omit the condition on the join columns, so the join is defined as dataset1.join(dataset2). There is one more prerequisite to meet: the configuration entry spark.sql.crossJoin.enabled must be set to true. Otherwise, an exception like the following one is thrown:

org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [_1#14 AS id#19, _2#15 AS customers_id#20, _3#16 AS amount#21, _4#17L AS date#22L]
  +- LocalRelation [_1#14, _2#15, _3#16, _4#17L]
and
Project [_1#2 AS id#5, _2#3 AS login#6]
  +- LocalRelation [_1#2, _2#3]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.

The test below shows a cross join with the spark.sql.crossJoin.enabled property set to true:

"cross join with enabled crossJoin property" should "create a Cartesian Product" in {
  val ordersByCustomer = orders.join(customers)
    .map(Mapper.mapJoinedRow(_))
    .collect()

  ordersByCustomer.size shouldEqual(15)
  ordersByCustomer should contain allOf("1,1,50.0,1,Customer_1", "1,1,50.0,2,Customer_2", "1,1,50.0,3,Customer_3",
    "2,2,10.0,1,Customer_1", "2,2,10.0,2,Customer_2", "2,2,10.0,3,Customer_3",
    "3,2,10.0,1,Customer_1", "3,2,10.0,2,Customer_2", "3,2,10.0,3,Customer_3",
    "4,2,10.0,1,Customer_1", "4,2,10.0,2,Customer_2", "4,2,10.0,3,Customer_3",
    "5,1000,19.0,1,Customer_1", "5,1000,19.0,2,Customer_2", "5,1000,19.0,3,Customer_3"
  )
}
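The 15 rows asserted above are simply 5 orders multiplied by 3 customers. A minimal model of that product on plain Scala collections could look as follows. The crossJoin helper and the reduced sample values are hypothetical, and nothing here reflects Spark's execution strategy.

```scala
// Plain-collections model of a cross join (Cartesian product), illustration only.
object CrossJoinSketch {
  // Pair every left element with every right element; there is no join condition.
  def crossJoin[L, R](left: Seq[L], right: Seq[R]): Seq[(L, R)] =
    for {
      l <- left
      r <- right
    } yield (l, r)

  val orderIds = Seq(1, 2, 3, 4, 5)
  val logins = Seq("Customer_1", "Customer_2", "Customer_3")

  val product = crossJoin(orderIds, logins)
}
```

The result size is always left.size * right.size, which is why this join becomes costly as soon as both sides grow.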

LEFT OUTER JOIN

In a left outer join, all rows from the left dataset are present in the joined dataset. Rows that have matches in the right dataset are enriched with the corresponding information, while those without matches have this information set to null:

"left outer join" should "collect all rows from the left dataset" in {
  val ordersByCustomer = orders
    .join(customers, orders("customers_id") === customers("id"), "leftouter")
    .map(Mapper.mapJoinedRow(_))
    .collect()

  ordersByCustomer.size shouldEqual(5)
  ordersByCustomer should contain allOf("1,1,50.0,1,Customer_1", "2,2,10.0,2,Customer_2", "3,2,10.0,2,Customer_2",
    "4,2,10.0,2,Customer_2", "5,1000,19.0,null,null")
}
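The null-filling behaviour can be sketched on plain Scala collections, with Option standing in for the nullable right-side columns. The names Customer, Order and leftOuterJoin are hypothetical, and the code models only the result, not Spark's implementation.

```scala
// Plain-collections model of a left outer join; None plays the role of null columns.
object LeftOuterJoinSketch {
  final case class Customer(id: Int, login: String)
  final case class Order(id: Int, customerId: Int, amount: Double)

  val customers = Seq(Customer(1, "Customer_1"), Customer(2, "Customer_2"), Customer(3, "Customer_3"))
  val orders = Seq(
    Order(1, 1, 50.0), Order(2, 2, 10.0), Order(3, 2, 10.0),
    Order(4, 2, 10.0), Order(5, 1000, 19.0)
  )

  // Every order is kept: once per matching customer, or once with None.
  def leftOuterJoin(left: Seq[Order], right: Seq[Customer]): Seq[(Order, Option[Customer])] =
    left.flatMap { order =>
      val matches = right.filter(_.id == order.customerId)
      if (matches.isEmpty) Seq((order, Option.empty[Customer]))
      else matches.map(customer => (order, Option(customer)))
    }

  val joined = leftOuterJoin(orders, customers)
}
```

Here the last element of joined pairs Order 5 with None, the counterpart of the "5,1000,19.0,null,null" row in the test.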

RIGHT OUTER JOIN

The right outer join is the mirror image of the left outer join. It keeps all rows from the right dataset and matches them to entries from the left dataset; when a match is missing, the left-side columns are set to null:

"right outer join" should "collect all rows from the right dataset" in {
  val ordersByCustomer = orders
    .join(customers, orders("customers_id") === customers("id"), "rightouter")
    .map(Mapper.mapJoinedRow(_))
    .collect()

  ordersByCustomer.size shouldEqual(5)
  ordersByCustomer should contain allOf("null,null,null,3,Customer_3",
    "1,1,50.0,1,Customer_1", "2,2,10.0,2,Customer_2", "3,2,10.0,2,Customer_2",
    "4,2,10.0,2,Customer_2")
}
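One way to see the symmetry is to express the right outer join as a left outer join with the two sides swapped. The generic helpers below are hypothetical and work on plain Scala collections; they illustrate the relationship between the two types and say nothing about how Spark plans the query.

```scala
// Right outer join modelled as a left outer join with swapped sides (illustration only).
object RightOuterJoinSketch {
  // Left outer join: every left row, paired with each match or with None.
  def leftOuter[L, R](left: Seq[L], right: Seq[R])(matches: (L, R) => Boolean): Seq[(L, Option[R])] =
    left.flatMap { l =>
      val hits = right.filter(r => matches(l, r))
      if (hits.isEmpty) Seq((l, Option.empty[R]))
      else hits.map(r => (l, Option(r)))
    }

  // Swap the inputs, run the left outer join, then swap each pair back.
  def rightOuter[L, R](left: Seq[L], right: Seq[R])(matches: (L, R) => Boolean): Seq[(Option[L], R)] =
    leftOuter(right, left)((r, l) => matches(l, r)).map { case (r, l) => (l, r) }

  // Same shape as the test data: (id, login) and (id, customers_id, amount).
  val customers = Seq((1, "Customer_1"), (2, "Customer_2"), (3, "Customer_3"))
  val orders = Seq((1, 1, 50.0), (2, 2, 10.0), (3, 2, 10.0), (4, 2, 10.0), (5, 1000, 19.0))

  val joined = rightOuter(orders, customers)((order, customer) => order._2 == customer._1)
}
```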

FULL OUTER JOIN

The full outer join is the solution when we need to match all corresponding data and also include the rows without matches from both datasets. In other words, it contains all the data, merged where a match exists:

"full outer join" should "collect all rows from both datasets" in {
  val ordersByCustomer = orders
    .join(customers, orders("customers_id") === customers("id"), "fullouter")
    .map(Mapper.mapJoinedRow(_))
    .collect()

  ordersByCustomer.size shouldEqual(6)
  ordersByCustomer should contain allOf("null,null,null,3,Customer_3",
    "1,1,50.0,1,Customer_1", "2,2,10.0,2,Customer_2", "3,2,10.0,2,Customer_2",
    "4,2,10.0,2,Customer_2", "5,1000,19.0,null,null")
}
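The result can be thought of as the left outer join plus the right rows that nothing matched. The sketch below models this on plain Scala collections, with Option standing in for null columns. The fullOuter helper is a hypothetical name and the approach is a naive illustration, not Spark's algorithm.

```scala
// Plain-collections model of a full outer join (illustration only).
object FullOuterJoinSketch {
  def fullOuter[L, R](left: Seq[L], right: Seq[R])(matches: (L, R) => Boolean): Seq[(Option[L], Option[R])] = {
    // Left rows: paired with every match, or with None when nothing matches.
    val fromLeft = left.flatMap { l =>
      val hits = right.filter(r => matches(l, r))
      if (hits.isEmpty) Seq((Option(l), Option.empty[R]))
      else hits.map(r => (Option(l), Option(r)))
    }
    // Right rows that no left row matched.
    val unmatchedRight = right
      .filterNot(r => left.exists(l => matches(l, r)))
      .map(r => (Option.empty[L], Option(r)))
    fromLeft ++ unmatchedRight
  }

  // Same shape as the test data: (id, login) and (id, customers_id, amount).
  val customers = Seq((1, "Customer_1"), (2, "Customer_2"), (3, "Customer_3"))
  val orders = Seq((1, 1, 50.0), (2, 2, 10.0), (3, 2, 10.0), (4, 2, 10.0), (5, 1000, 19.0))

  val joined = fullOuter(orders, customers)((order, customer) => order._2 == customer._1)
}
```

With the sample data, joined holds 4 matched pairs, one order without a customer and one customer without an order, hence the 6 rows expected by the test.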

LEFT SEMI JOIN

When the left semi join is used, all rows from the left dataset that have a match in the right dataset are returned in the final result. However, unlike the left outer join, the result doesn't contain merged data from both datasets. Instead, it contains only the columns brought by the left dataset:

"left semi join" should "return only the matching rows with left table columns exclusively" in {
  val ordersByCustomer = orders
    .join(customers, orders("customers_id") === customers("id"), "leftsemi")
    .map(Mapper.halfMapJoinedRow(_))
    .collect()

  ordersByCustomer.size shouldEqual(4)
  ordersByCustomer should contain allOf("1,1,50.0", "2,2,10.0", "3,2,10.0", "4,2,10.0")
}
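On plain Scala collections, a left semi join amounts to a filter with an existence check. The leftSemi helper below is a hypothetical name, and the duplicated customer is made-up data added only to show one consequence of that definition.

```scala
// Plain-collections model of a left semi join (illustration only).
object LeftSemiJoinSketch {
  // Keep a left row when at least one right row matches; right columns are never returned.
  def leftSemi[L, R](left: Seq[L], right: Seq[R])(matches: (L, R) => Boolean): Seq[L] =
    left.filter(l => right.exists(r => matches(l, r)))

  // Same shape as the test data: (id, login) and (id, customers_id, amount).
  val customers = Seq((1, "Customer_1"), (2, "Customer_2"), (3, "Customer_3"))
  val orders = Seq((1, 1, 50.0), (2, 2, 10.0), (3, 2, 10.0), (4, 2, 10.0), (5, 1000, 19.0))

  val matchedOrders = leftSemi(orders, customers)((order, customer) => order._2 == customer._1)

  // Hypothetical extra customer sharing id 1: order 1 now has two matches.
  val withDuplicate =
    leftSemi(orders, customers :+ ((1, "Customer_1_bis")))((order, customer) => order._2 == customer._1)
}
```

Because the check is only about existence, withDuplicate still contains order 1 a single time, whereas an inner join on the same input would emit it twice.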

LEFT ANTI JOIN

The last described type is the left anti join. It takes all rows from the left dataset that have no match in the right dataset. In our case, it returns only the row with Order#5:

"left anti join" should "return only left dataset information without matching" in {
  val ordersByCustomer = orders
    .join(customers, orders("customers_id") === customers("id"), "leftanti")
    .map(Mapper.halfMapJoinedRow(_))
    .collect()

  ordersByCustomer.size shouldEqual(1)
  ordersByCustomer(0) shouldEqual("5,1000,19.0")
}
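The left anti join is the complement of the left semi join: together they split the left dataset into rows with and without a match. The sketch below checks this on plain Scala collections. The helpers leftAnti, leftSemi and sameCustomer are hypothetical names, and the code models the result only.

```scala
// Plain-collections model of a left anti join and its relation to left semi (illustration only).
object LeftAntiJoinSketch {
  // Keep a left row only when no right row matches it.
  def leftAnti[L, R](left: Seq[L], right: Seq[R])(matches: (L, R) => Boolean): Seq[L] =
    left.filterNot(l => right.exists(r => matches(l, r)))

  // Keep a left row when at least one right row matches it.
  def leftSemi[L, R](left: Seq[L], right: Seq[R])(matches: (L, R) => Boolean): Seq[L] =
    left.filter(l => right.exists(r => matches(l, r)))

  // Same shape as the test data: (id, login) and (id, customers_id, amount).
  val customers = Seq((1, "Customer_1"), (2, "Customer_2"), (3, "Customer_3"))
  val orders = Seq((1, 1, 50.0), (2, 2, 10.0), (3, 2, 10.0), (4, 2, 10.0), (5, 1000, 19.0))

  // Join condition: the order's customers_id equals the customer's id.
  val sameCustomer: ((Int, Int, Double), (Int, String)) => Boolean =
    (order, customer) => order._2 == customer._1

  val unmatched = leftAnti(orders, customers)(sameCustomer)
  val matched = leftSemi(orders, customers)(sameCustomer)
}
```

Every order lands in exactly one of the two results, so matched.size + unmatched.size equals orders.size.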

Spark SQL offers plenty of ways to join datasets. Some of them, such as inner, left semi and left anti join, are strict and help to limit the size of joined datasets. The others are more permissive since they return more data: either all rows from one side along with their matches, or every row from both sides, matched where possible. But as we saw in the second section, Spark SQL also has built-in protection against the most dangerous join in terms of performance, the cross join. To use it, the programmer must enable it explicitly.
