Nested loop join in Apache Spark SQL

on waitingforcode.com

Nested loop join in Apache Spark SQL

In programming a simple is often the synonymous of understandable and maintainable. However it doesn't always mean efficient. One of examples of this thesis is nested loop join that is also present in Apache Spark SQL.

This post through 3 sections tries to explain nested loop join. The first part introduces this join algorithm from its vendor-independent point of view. The second section shows its use in Apache Spark SQL. The final one gives some examples of this method in some learning tests.

Nested loop join

Nested loop join is often defined as the most basic joining algorithm in SQL. In pseudo-code it can be implemented as:

O_SET  # outer query set
I_SET # inner query set
PREDICATE # join predicate

foreach o_row in O_SET:
  foreach i_row in I_SET:
    if i_row matches PREDICATE:
      return (o_row, i_row)

As you can see, the algorithm iterates first through all rows from one table (O_SET) and for each of them it iterates later through all rows from another table (I_SET). If one of the inner rows matches the join predicate, it's returned together with the corresponding outer row in the result table. Presented like that we can be pretty sure that the nested loop join will perform poorly for the most of queries. Fortunately, it's rarely chosen as a default join algorithm and most of the time it's executed only for very small datasets.

BroadcastNestedLoopJoinExec

One of the places where nested loop join is used independently on the dataset size is cross join resulting on cartesian product. In this situation each row from the left table is returned together with every row from the right table, if there is no predicate defined. Apache Spark provides a support for such type of queries with org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec physical operator. It's used when neither broadcast hash join nor shuffled hash join nor sort merge join can be used to execute the join statement, as shown in org.apache.spark.sql.execution.SparkStrategies.JoinSelection#apply(plan: LogicalPlan) method:

// --- Without joining keys ------------------------------------------------------------

// Pick BroadcastNestedLoopJoin if one side could be broadcast
case j @ logical.Join(left, right, joinType, condition)
    if canBroadcastByHints(joinType, left, right) =>
  val buildSide = broadcastSideByHints(joinType, left, right)
  joins.BroadcastNestedLoopJoinExec(
    planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

case j @ logical.Join(left, right, joinType, condition)
    if canBroadcastBySizes(joinType, left, right) =>
  val buildSide = broadcastSideBySizes(joinType, left, right)
  joins.BroadcastNestedLoopJoinExec(
    planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

// Pick CartesianProduct for InnerJoin
case logical.Join(left, right, _: InnerLike, condition) =>
  joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

case logical.Join(left, right, joinType, condition) =>
  val buildSide = broadcastSide(
    left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
  // This join could be very slow or OOM
  joins.BroadcastNestedLoopJoinExec(
    planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

As you can see, the main reason to use nested loop comes from the lack of joining keys. In such situation the query is qualified as broadcastable or not, according to the data statistics (size or broadcast hint). If neither of them is evaluated as true and the join type is inner, the query is executed with CartesianProductExec. In other cases BroadcastNestedLoopJoinExec is taken. The execution consists on detecting the join type and using a correct execution implementation among inner, outer, semi, anti or existence join. They're detailed in the post about join types in Spark SQL. Just to give a quick insight, we can analyze the implementation for left semi and left anti joins, placed as cases inside defaultJoin(relation: Broadcast[Array[InternalRow]]) method:

// LeftSemi with BuildLeft
joinType match {
  case LeftSemi =>
    assert(buildSide == BuildLeft)
    val buf: CompactBuffer[InternalRow] = new CompactBuffer()
    var i = 0
    val rel = relation.value
    while (i < rel.length) {
      if (matchedBroadcastRows.get(i)) {
        buf += rel(i).copy()
      }
      i += 1
    }
    return sparkContext.makeRDD(buf)
  case LeftAnti =>
    val notMatched: CompactBuffer[InternalRow] = new CompactBuffer()
    var i = 0
    val rel = relation.value
    while (i < rel.length) {
      if (!matchedBroadcastRows.get(i)) {
        notMatched += rel(i).copy()
      }
      i += 1
    }
    return sparkContext.makeRDD(notMatched)

As you can see, one takes the rows from the left relation having their correspondence in the right side while the other focuses on the rows without this equivalence.

Another point related to BroadcastNestedLoopJoinExec operator is the broadcast variable. One of both query sides will be broadcasted to all executors in order to generate the partition results. Spark decides which side to broadcast with broadcastSideByHints() or broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) method. The choice depends on the join type and either the place where broadcast hint is placed or dataset size. If it's unable to decide on these criteria it will take smaller side. It's shown at the beginning of BroadcastNestedLoopJoinExec:

/** BuildRight means the right relation <=> the broadcast relation. */
private val (streamed, broadcast) = buildSide match {
  case BuildRight => (left, right)
  case BuildLeft => (right, left)
}
// ...

protected override def doExecute(): RDD[InternalRow] = {
  val broadcastedRelation = broadcast.executeBroadcast[Array[InternalRow]]()

  val resultRdd = (joinType, buildSide) match {
    case (_: InnerLike, _) =>
      innerJoin(broadcastedRelation)
    case (LeftOuter, BuildRight) | (RightOuter, BuildLeft) =>
      outerJoin(broadcastedRelation)
    // ..

Example of nested loop join in Spark SQL

To illustrate nested loop join the below tests take an example of dataset composed of consumers and orders:

private val SparkLocalSession = SparkSession.builder().appName("Correlated subquery test")
  .master("local[*]")
  .config("spark.sql.crossJoin.enabled", "true")
  .getOrCreate()

import SparkLocalSession.implicits._
private val CustomersDataFrame = (1 to 2).map(id => (id, s"Customer#${id}")).toDF("customerId", "customerName")
private val OrdersDataFrame = (1 to 3).map(orderId => (orderId, s"Order#${orderId}")).toDF("orderId", "orderName")

behavior of "broadcast nested loop join"

it should "be executed in cross join" in {
  val customersWithOrders = CustomersDataFrame.crossJoin(OrdersDataFrame)

  val mappedCustomersWithOrders =
    customersWithOrders.collect().map(row => s"${row.getAs[String]("customerName")};${row.getAs[String]("orderName")}")

  mappedCustomersWithOrders should have size 6
  mappedCustomersWithOrders should contain allOf("Customer#1;Order#1", "Customer#1;Order#2", "Customer#1;Order#3",
    "Customer#2;Order#1", "Customer#2;Order#2", "Customer#2;Order#3")
  // Whole physical plan looks like:
  // == Physical Plan ==
  //  BroadcastNestedLoopJoin BuildLeft, Cross
  // :- BroadcastExchange IdentityBroadcastMode
  //  :  +- LocalTableScan [customerId#5, customerName#6]
  // +- LocalTableScan [orderId#14, orderName#15]
  customersWithOrders.explain(true)
  val queryExecutionPlan = customersWithOrders.queryExecution.toString
  queryExecutionPlan should include("== Physical Plan ==\nBroadcastNestedLoopJoin BuildLeft, Cross")
}

it should "be executed in outer left join" in {
  val customersWithOrders = CustomersDataFrame.join(OrdersDataFrame, Seq.empty, "leftouter")

  val mappedCustomersWithOrders =
    customersWithOrders.collect().map(row => s"${row.getAs[String]("customerName")};${row.getAs[String]("orderName")}")

  mappedCustomersWithOrders should have size 6
  mappedCustomersWithOrders should contain allOf("Customer#1;Order#1", "Customer#1;Order#2", "Customer#1;Order#3",
    "Customer#2;Order#1", "Customer#2;Order#2", "Customer#2;Order#3")
  // Whole physical plan looks like:
  // == Physical Plan ==
  // BroadcastNestedLoopJoin BuildRight, LeftOuter
  // :- LocalTableScan [customerId#5, customerName#6]
  // +- BroadcastExchange IdentityBroadcastMode
  // +- LocalTableScan [orderId#14, orderName#15]
  val queryExecutionPlan = customersWithOrders.queryExecution.toString
  queryExecutionPlan should include("== Physical Plan ==\nBroadcastNestedLoopJoin BuildRight, LeftOuter")
}

it should "be executed in left semi join" in {
  val customersWithOrders = CustomersDataFrame.join(OrdersDataFrame, Seq.empty, "leftsemi")

  // Since semi-join includes only the columns from one side, the mapping here is different than in above code
  val mappedCustomersWithOrders =
    customersWithOrders.collect().map(row => s"${row.getAs[String]("customerName")}")

  mappedCustomersWithOrders should have size 2
  mappedCustomersWithOrders should contain allOf("Customer#1", "Customer#2")
  // Whole physical plan looks like:
  // == Physical Plan ==
  // BroadcastNestedLoopJoin BuildRight, LeftSemi
  // :- LocalTableScan [customerId#5, customerName#6]
  // +- BroadcastExchange IdentityBroadcastMode
  // +- LocalTableScan
  val queryExecutionPlan = customersWithOrders.queryExecution.toString
  queryExecutionPlan should include("== Physical Plan ==\nBroadcastNestedLoopJoin BuildRight, LeftSemi")
}

it should "be executed in left anti join" in {
  val customersWithOrders = CustomersDataFrame.join(OrdersDataFrame, Seq.empty, "leftanti")

  // The left anti join returns the rows from the left side without the corresponding
  // rows in the right side
  // However in this particular case (LeftSemi with BuildRight and without condition), Spark returns no rows
  // if (condition.isDefined) { // ... } else if (buildRows.nonEmpty == exists) { // ... }
  // else { Iterator.empty }
  // org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec#leftExistenceJoin
  val mappedCustomersWithOrders =
    customersWithOrders.collect().map(row => s"${row.getAs[String]("customerName")}")

  mappedCustomersWithOrders shouldBe empty
  // Whole physical plan looks like:
  // == Physical Plan ==
  // BroadcastNestedLoopJoin BuildRight, LeftAnti
  // :- LocalTableScan [customerId#5, customerName#6]
  // +- BroadcastExchange IdentityBroadcastMode
  // +- LocalTableScan
  val queryExecutionPlan = customersWithOrders.queryExecution.toString
  queryExecutionPlan should include("== Physical Plan ==\nBroadcastNestedLoopJoin BuildRight, LeftAnti")
}

it should "be executed in full outer join" in {
  val customersWithOrders = CustomersDataFrame.join(OrdersDataFrame, Seq.empty, "fullouter")

  val mappedCustomersWithOrders =
    customersWithOrders.collect().map(row => s"${row.getAs[String]("customerName")};${row.getAs[String]("orderName")}")

  mappedCustomersWithOrders should have size 6
  mappedCustomersWithOrders should contain allOf("Customer#1;Order#1", "Customer#1;Order#2", "Customer#1;Order#3",
    "Customer#2;Order#1", "Customer#2;Order#2", "Customer#2;Order#3")
  // Whole physical plan looks like:
  // == Physical Plan ==
  // BroadcastNestedLoopJoin BuildLeft, FullOuter
    // :- BroadcastExchange IdentityBroadcastMode
  //   :  +- LocalTableScan [customerId#5, customerName#6]
  // +- LocalTableScan [orderId#14, orderName#15]
  val queryExecutionPlan = customersWithOrders.queryExecution.toString
  queryExecutionPlan should include("== Physical Plan ==\nBroadcastNestedLoopJoin BuildLeft, FullOuter")
}

it should "not be executed when the size is bigger than the broadcast join threshold" in {
  val lotOfCustomersDataFrame = (1 to 440000).map(id => (id, s"Customer#${id}")).toDF("customerId", "customerName")
  val lotOfOrdersDataFrame = (1 to 440000).map(orderId => (orderId, s"Order#${orderId}")).toDF("orderId", "orderName")
  val customersWithOrders = lotOfCustomersDataFrame.crossJoin(lotOfOrdersDataFrame)
  // Here we don't collect all rows since it brings 193 600 000 000 rows that don't fit in memory
  // We only assert on the execution plan

  // Whole physical plan looks like:
  // == Physical Plan ==
  // CartesianProduct
  // :- LocalTableScan [customerId#23, customerName#24]
  // +- LocalTableScan [orderId#32, orderName#33]
  val queryExecutionPlan = customersWithOrders.queryExecution.toString
  queryExecutionPlan should include("== Physical Plan ==\nCartesianProduct")
}

Spark SQL isn't called so only because of structured data processing. It also implements a lot of concepts we can retrieve in more classical RDBMS. One of them is nested loop join where a subquery is executed for every row from the outer query. In Spark SQL is handled by the physical operator called BroadcastNestedLoopJoinExec that broadcasts appropriate side of the query to all executors and most of time returns a cartesian product.

Read also about Nested loop join in Apache Spark SQL here: Nested loop algorithms , Nested Loops Join – SQL Server Graphical Execution Plan .

Share, like or comment this post on Twitter: