In programming, simple is often synonymous with understandable and maintainable. However, it doesn't always mean efficient. One example of this thesis is the nested loop join, which is also present in Apache Spark SQL.
This post tries to explain nested loop join through 3 sections. The first one introduces this join algorithm from a vendor-independent point of view. The second section shows its use in Apache Spark SQL. The final one gives some examples of this method in learning tests.
Nested loop join
Nested loop join is often defined as the most basic joining algorithm in SQL. In pseudo-code it can be implemented as:
O_SET       # outer query set
I_SET       # inner query set
PREDICATE   # join predicate

foreach o_row in O_SET:
    foreach i_row in I_SET:
        if i_row matches PREDICATE:
            emit (o_row, i_row)
As you can see, the algorithm iterates first through all rows from one table (O_SET) and for each of them it iterates through all rows from the other table (I_SET). If one of the inner rows matches the join predicate, it's returned together with the corresponding outer row in the result table. Presented like that, we can be pretty sure that the nested loop join will perform poorly for most queries. Fortunately, it's rarely chosen as a default join algorithm and most of the time it's executed only for very small datasets.
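To make the pseudo-code more concrete, here is a minimal Scala sketch of the same algorithm over plain in-memory collections. The case classes and the predicate are only illustrative and have nothing to do with Spark's internal code:

// Illustrative only: a nested loop join over Scala collections
case class Customer(id: Int, name: String)
case class Order(customerId: Int, name: String)

def nestedLoopJoin(customers: Seq[Customer], orders: Seq[Order]): Seq[(Customer, Order)] =
  for {
    customer <- customers                    // outer loop: every row of the outer set
    order <- orders                          // inner loop: every row of the inner set
    if order.customerId == customer.id       // the join predicate
  } yield (customer, order)

Every outer row is compared with every inner row, hence the O(|outer| * |inner|) cost that explains the poor performance on bigger datasets.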
BroadcastNestedLoopJoinExec
One of the places where nested loop join is used independently of the dataset size is the cross join resulting in a cartesian product. In this situation each row from the left table is returned together with every row from the right table, if there is no predicate defined. Apache Spark supports such queries with the org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec physical operator. It's used when neither broadcast hash join nor shuffled hash join nor sort merge join can be used to execute the join statement, as shown in the org.apache.spark.sql.execution.SparkStrategies.JoinSelection#apply(plan: LogicalPlan) method:
// --- Without joining keys ------------------------------------------------------------

// Pick BroadcastNestedLoopJoin if one side could be broadcast
case j @ logical.Join(left, right, joinType, condition)
    if canBroadcastByHints(joinType, left, right) =>
  val buildSide = broadcastSideByHints(joinType, left, right)
  joins.BroadcastNestedLoopJoinExec(
    planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

case j @ logical.Join(left, right, joinType, condition)
    if canBroadcastBySizes(joinType, left, right) =>
  val buildSide = broadcastSideBySizes(joinType, left, right)
  joins.BroadcastNestedLoopJoinExec(
    planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

// Pick CartesianProduct for InnerJoin
case logical.Join(left, right, _: InnerLike, condition) =>
  joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

case logical.Join(left, right, joinType, condition) =>
  val buildSide = broadcastSide(
    left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
  // This join could be very slow or OOM
  joins.BroadcastNestedLoopJoinExec(
    planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
As you can see, the main reason to use nested loop join comes from the lack of joining keys. In such a situation the query is qualified as broadcastable or not, according to the data statistics (size or broadcast hint). If neither of them is evaluated as true and the join type is inner, the query is executed with CartesianProductExec. In other cases BroadcastNestedLoopJoinExec is taken. The execution consists of detecting the join type and using the correct execution implementation among inner, outer, semi, anti or existence join. They're detailed in the post about join types in Spark SQL. Just to give a quick insight, we can analyze the implementation for left semi and left anti joins, placed as cases inside the defaultJoin(relation: Broadcast[Array[InternalRow]]) method:
// LeftSemi with BuildLeft
joinType match {
  case LeftSemi =>
    assert(buildSide == BuildLeft)
    val buf: CompactBuffer[InternalRow] = new CompactBuffer()
    var i = 0
    val rel = relation.value
    while (i < rel.length) {
      if (matchedBroadcastRows.get(i)) {
        buf += rel(i).copy()
      }
      i += 1
    }
    return sparkContext.makeRDD(buf)
  case LeftAnti =>
    val notMatched: CompactBuffer[InternalRow] = new CompactBuffer()
    var i = 0
    val rel = relation.value
    while (i < rel.length) {
      if (!matchedBroadcastRows.get(i)) {
        notMatched += rel(i).copy()
      }
      i += 1
    }
    return sparkContext.makeRDD(notMatched)
As you can see, the left semi join takes the rows from the left relation having a match on the right side, while the left anti join keeps the rows without such a match.
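To picture the difference between the two, here is a hedged plain-Scala sketch of the same semantics over in-memory collections. It only mimics the semi/anti behavior and is not Spark's internal code; the Row case class and the key field are illustrative:

// Illustrative only: semi vs anti semantics over Scala collections
case class Row(key: Int)

def leftSemi(left: Seq[Row], right: Seq[Row]): Seq[Row] =
  left.filter(l => right.exists(r => r.key == l.key))     // keep left rows with a match

def leftAnti(left: Seq[Row], right: Seq[Row]): Seq[Row] =
  left.filterNot(l => right.exists(r => r.key == l.key))  // keep left rows without a match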
Another point related to the BroadcastNestedLoopJoinExec operator is the broadcast variable. One of the two query sides will be broadcast to all executors in order to generate the partition results. Spark decides which side to broadcast with the broadcastSideByHints() or broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) method. The choice depends on the join type and either the place where the broadcast hint is defined or the dataset size. If it's unable to decide on these criteria, it takes the smaller side. It's shown at the beginning of BroadcastNestedLoopJoinExec:
/** BuildRight means the right relation <=> the broadcast relation. */
private val (streamed, broadcast) = buildSide match {
  case BuildRight => (left, right)
  case BuildLeft => (right, left)
}

// ...

protected override def doExecute(): RDD[InternalRow] = {
  val broadcastedRelation = broadcast.executeBroadcast[Array[InternalRow]]()

  val resultRdd = (joinType, buildSide) match {
    case (_: InnerLike, _) =>
      innerJoin(broadcastedRelation)
    case (LeftOuter, BuildRight) | (RightOuter, BuildLeft) =>
      outerJoin(broadcastedRelation)
    // ..
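The hint-based choice can be exercised directly from the DataFrame API with the broadcast function from org.apache.spark.sql.functions. The snippet below is only a sketch, assuming the small CustomersDataFrame and OrdersDataFrame defined in the tests of the next section; with the hint on the right side, a keyless join should resolve to BroadcastNestedLoopJoin with BuildRight:

import org.apache.spark.sql.functions.broadcast

// Hint Spark to broadcast the right side of the keyless join
val hintedJoin = CustomersDataFrame.crossJoin(broadcast(OrdersDataFrame))
// The physical plan is expected to show BroadcastNestedLoopJoin BuildRight, Cross
hintedJoin.explain(true)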
Example of nested loop join in Spark SQL
To illustrate nested loop join, the below tests take an example dataset composed of customers and orders:
private val SparkLocalSession = SparkSession.builder().appName("Correlated subquery test")
  .master("local[*]")
  .config("spark.sql.crossJoin.enabled", "true")
  .getOrCreate()

import SparkLocalSession.implicits._

private val CustomersDataFrame = (1 to 2).map(id => (id, s"Customer#${id}")).toDF("customerId", "customerName")
private val OrdersDataFrame = (1 to 3).map(orderId => (orderId, s"Order#${orderId}")).toDF("orderId", "orderName")

behavior of "broadcast nested loop join"

it should "be executed in cross join" in {
  val customersWithOrders = CustomersDataFrame.crossJoin(OrdersDataFrame)

  val mappedCustomersWithOrders = customersWithOrders.collect().map(row =>
    s"${row.getAs[String]("customerName")};${row.getAs[String]("orderName")}")

  mappedCustomersWithOrders should have size 6
  mappedCustomersWithOrders should contain allOf("Customer#1;Order#1", "Customer#1;Order#2", "Customer#1;Order#3",
    "Customer#2;Order#1", "Customer#2;Order#2", "Customer#2;Order#3")
  // Whole physical plan looks like:
  // == Physical Plan ==
  // BroadcastNestedLoopJoin BuildLeft, Cross
  // :- BroadcastExchange IdentityBroadcastMode
  // :  +- LocalTableScan [customerId#5, customerName#6]
  // +- LocalTableScan [orderId#14, orderName#15]
  customersWithOrders.explain(true)
  val queryExecutionPlan = customersWithOrders.queryExecution.toString
  queryExecutionPlan should include("== Physical Plan ==\nBroadcastNestedLoopJoin BuildLeft, Cross")
}

it should "be executed in outer left join" in {
  val customersWithOrders = CustomersDataFrame.join(OrdersDataFrame, Seq.empty, "leftouter")

  val mappedCustomersWithOrders = customersWithOrders.collect().map(row =>
    s"${row.getAs[String]("customerName")};${row.getAs[String]("orderName")}")

  mappedCustomersWithOrders should have size 6
  mappedCustomersWithOrders should contain allOf("Customer#1;Order#1", "Customer#1;Order#2", "Customer#1;Order#3",
    "Customer#2;Order#1", "Customer#2;Order#2", "Customer#2;Order#3")
  // Whole physical plan looks like:
  // == Physical Plan ==
  // BroadcastNestedLoopJoin BuildRight, LeftOuter
  // :- LocalTableScan [customerId#5, customerName#6]
  // +- BroadcastExchange IdentityBroadcastMode
  //    +- LocalTableScan [orderId#14, orderName#15]
  val queryExecutionPlan = customersWithOrders.queryExecution.toString
  queryExecutionPlan should include("== Physical Plan ==\nBroadcastNestedLoopJoin BuildRight, LeftOuter")
}

it should "be executed in left semi join" in {
  val customersWithOrders = CustomersDataFrame.join(OrdersDataFrame, Seq.empty, "leftsemi")

  // Since semi-join includes only the columns from one side, the mapping here is different than in above code
  val mappedCustomersWithOrders = customersWithOrders.collect().map(row =>
    s"${row.getAs[String]("customerName")}")

  mappedCustomersWithOrders should have size 2
  mappedCustomersWithOrders should contain allOf("Customer#1", "Customer#2")
  // Whole physical plan looks like:
  // == Physical Plan ==
  // BroadcastNestedLoopJoin BuildRight, LeftSemi
  // :- LocalTableScan [customerId#5, customerName#6]
  // +- BroadcastExchange IdentityBroadcastMode
  //    +- LocalTableScan
  val queryExecutionPlan = customersWithOrders.queryExecution.toString
  queryExecutionPlan should include("== Physical Plan ==\nBroadcastNestedLoopJoin BuildRight, LeftSemi")
}

it should "be executed in left anti join" in {
  val customersWithOrders = CustomersDataFrame.join(OrdersDataFrame, Seq.empty, "leftanti")

  // The left anti join returns the rows from the left side without the corresponding
  // rows in the right side
  // However in this particular case (LeftAnti with BuildRight and without condition), Spark returns no rows
  // if (condition.isDefined) { // ... } else if (buildRows.nonEmpty == exists) { // ... }
  // else { Iterator.empty }
  // org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec#leftExistenceJoin
  val mappedCustomersWithOrders = customersWithOrders.collect().map(row =>
    s"${row.getAs[String]("customerName")}")

  mappedCustomersWithOrders shouldBe empty
  // Whole physical plan looks like:
  // == Physical Plan ==
  // BroadcastNestedLoopJoin BuildRight, LeftAnti
  // :- LocalTableScan [customerId#5, customerName#6]
  // +- BroadcastExchange IdentityBroadcastMode
  //    +- LocalTableScan
  val queryExecutionPlan = customersWithOrders.queryExecution.toString
  queryExecutionPlan should include("== Physical Plan ==\nBroadcastNestedLoopJoin BuildRight, LeftAnti")
}

it should "be executed in full outer join" in {
  val customersWithOrders = CustomersDataFrame.join(OrdersDataFrame, Seq.empty, "fullouter")

  val mappedCustomersWithOrders = customersWithOrders.collect().map(row =>
    s"${row.getAs[String]("customerName")};${row.getAs[String]("orderName")}")

  mappedCustomersWithOrders should have size 6
  mappedCustomersWithOrders should contain allOf("Customer#1;Order#1", "Customer#1;Order#2", "Customer#1;Order#3",
    "Customer#2;Order#1", "Customer#2;Order#2", "Customer#2;Order#3")
  // Whole physical plan looks like:
  // == Physical Plan ==
  // BroadcastNestedLoopJoin BuildLeft, FullOuter
  // :- BroadcastExchange IdentityBroadcastMode
  // :  +- LocalTableScan [customerId#5, customerName#6]
  // +- LocalTableScan [orderId#14, orderName#15]
  val queryExecutionPlan = customersWithOrders.queryExecution.toString
  queryExecutionPlan should include("== Physical Plan ==\nBroadcastNestedLoopJoin BuildLeft, FullOuter")
}

it should "not be executed when the size is bigger than the broadcast join threshold" in {
  val lotOfCustomersDataFrame = (1 to 440000).map(id => (id, s"Customer#${id}")).toDF("customerId", "customerName")
  val lotOfOrdersDataFrame = (1 to 440000).map(orderId => (orderId, s"Order#${orderId}")).toDF("orderId", "orderName")

  val customersWithOrders = lotOfCustomersDataFrame.crossJoin(lotOfOrdersDataFrame)

  // Here we don't collect all rows since it brings 193 600 000 000 rows that don't fit in memory
  // We only assert on the execution plan
  // Whole physical plan looks like:
  // == Physical Plan ==
  // CartesianProduct
  // :- LocalTableScan [customerId#23, customerName#24]
  // +- LocalTableScan [orderId#32, orderName#33]
  val queryExecutionPlan = customersWithOrders.queryExecution.toString
  queryExecutionPlan should include("== Physical Plan ==\nCartesianProduct")
}
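The last test relies on the default value of spark.sql.autoBroadcastJoinThreshold (10 MB). The fallback to CartesianProduct can also be made explicit on small datasets by disabling the automatic broadcast. The following lines are only a sketch, not part of the original tests, and the config change would need to be reverted before running the other tests:

// Sketch only: with the broadcast disabled and no hint, the keyless inner join
// should fall back to CartesianProductExec instead of BroadcastNestedLoopJoinExec
SparkLocalSession.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
val cartesianJoin = CustomersDataFrame.crossJoin(OrdersDataFrame)
// The physical plan is expected to show CartesianProduct
cartesianJoin.explain(true)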
Spark SQL isn't called so only because of structured data processing. It also implements a lot of concepts we can find in more classical RDBMS. One of them is nested loop join, where the inner relation is scanned for every row of the outer one. In Spark SQL it's handled by the physical operator called BroadcastNestedLoopJoinExec that broadcasts the appropriate side of the query to all executors and most of the time returns a cartesian product.