What's new in Apache Spark 3.0 - demote broadcast hash join

Versions: Apache Spark 3.0.0

This is the last part of the series about the Adaptive Query Execution in Apache Spark SQL. So far you have learned about the physical plan optimizations, but they are not the only ones, as you will see in this blog post.


As of this writing (Spark 3.0.0), there is only one extra logical plan optimization in the AQE, called demote broadcast hash join. This rule checks whether the join sides have a lot of empty partitions. If so, it adds a no-broadcast-hash-join hint to prevent the broadcast strategy from being applied.

What does "a lot of empty partitions" mean? First, Apache Spark uses the available shuffle statistics to count the non-empty partitions. It then computes their ratio to the total number of partitions, and if the result is lower than the spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin configuration entry, the sort-merge join will be preferred over the broadcast strategy:

      // excerpt from the DemoteBroadcastHashJoin rule
      val mapStats = stage.mapStats.get
      val partitionCnt = mapStats.bytesByPartitionId.length
      val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
      partitionCnt > 0 && nonZeroCnt > 0 &&
        (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
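The predicate can be replayed outside Spark with plain Scala. Below is a minimal, self-contained sketch of the same arithmetic; the partition byte sizes and the 0.2 threshold (the default value of spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin in 3.0.0) are illustrative assumptions, not taken from a real shuffle:

```scala
// Standalone sketch of the demotion predicate; the byte sizes passed in
// main are illustrative assumptions, not real shuffle statistics.
object DemotionRatioSketch {
  def shouldDemote(bytesByPartitionId: Array[Long], ratioThreshold: Double): Boolean = {
    val partitionCnt = bytesByPartitionId.length
    val nonZeroCnt = bytesByPartitionId.count(_ > 0)
    partitionCnt > 0 && nonZeroCnt > 0 &&
      (nonZeroCnt * 1.0 / partitionCnt) < ratioThreshold
  }

  def main(args: Array[String]): Unit = {
    // 1 non-empty partition out of 10 => ratio 0.1 < 0.2 => demote
    println(shouldDemote(Array(42L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), 0.2))
    // 5 non-empty partitions out of 10 => ratio 0.5 >= 0.2 => no demotion
    println(shouldDemote(Array(1L, 2L, 3L, 4L, 5L, 0L, 0L, 0L, 0L, 0L), 0.2))
  }
}
```

Note that both counters must be strictly positive, so a stage with no statistics at all never triggers the demotion.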

According to the PR introducing this change, sort-merge join should perform much better than the broadcast join in this scenario. The hint is put on one or both sides of the join, so that the broadcast join can still happen if one side is eligible:

      var newHint = hint
      if (!hint.leftHint.exists(_.strategy.isDefined) && shouldDemote(left)) {
        newHint = newHint.copy(leftHint =
          Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }
      if (!hint.rightHint.exists(_.strategy.isDefined) && shouldDemote(right)) {
        newHint = newHint.copy(rightHint =
          Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }

How are these hints interpreted by Apache Spark later? During the physical planning, it checks whether the hint is set and, if so, disqualifies the hinted side of the join as a candidate for broadcasting:

        def createJoinWithoutHint() = {
          createBroadcastHashJoin(
            canBroadcast(left) && !hint.leftHint.exists(_.strategy.contains(NO_BROADCAST_HASH)),
            canBroadcast(right) && !hint.rightHint.exists(_.strategy.contains(NO_BROADCAST_HASH)))
            .orElse { // ...
        def createBroadcastHashJoin(buildLeft: Boolean, buildRight: Boolean) = {
          val wantToBuildLeft = canBuildLeft(joinType) && buildLeft
          val wantToBuildRight = canBuildRight(joinType) && buildRight
          getBuildSide(wantToBuildLeft, wantToBuildRight, left, right).map { buildSide =>
            // ... build the broadcast join

    private def getBuildSide(wantToBuildLeft: Boolean, wantToBuildRight: Boolean,
        left: LogicalPlan, right: LogicalPlan): Option[BuildSide] = {
      if (wantToBuildLeft && wantToBuildRight) {
        // returns the smaller side based on its estimated physical size,
        // if we want to build both sides
        Some(getSmallerSide(left, right))
      } else if (wantToBuildLeft) {
        Some(BuildLeft)
      } else if (wantToBuildRight) {
        Some(BuildRight)
      } else {
        None
      }
    }
Let's see now what happens when the sort-merge join is preferred over the broadcast join. To demonstrate it, let's join 2 datasets, one quite big (100 rows) and another much smaller (6 rows), with an aggressive filter on the bigger one that leaves only 1 row. The smaller side falls within the broadcast limit, so the broadcast hash join could be applied. But because of that aggressive filtering, most of the shuffle partitions of the filtered side are empty, and despite the initial broadcast hash join optimization, DemoteBroadcastHashJoin converts the plan back to a sort-merge join. Below you can find the demo video:
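The scenario from the demo can be sketched as follows; the dataset contents, column names, and session configuration below are my assumptions for a local reproduction, not the exact demo code, so treat it as a starting point:

```scala
// Hypothetical reproduction of the demo scenario; column names and
// row counts are assumptions matching the description above.
import org.apache.spark.sql.SparkSession

object DemoteBroadcastHashJoinDemo extends App {
  val spark = SparkSession.builder()
    .appName("demote-broadcast-hash-join-demo")
    .master("local[*]")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
  import spark.implicits._

  // "big" side: 100 rows; "small" side: 6 rows
  val big = (1 to 100).map(nr => (nr, s"user$nr")).toDF("id", "login")
  val small = (1 to 6).map(nr => (nr, s"role$nr")).toDF("id", "role")

  // aggressive filter: only 1 row survives on the big side, so most of
  // its shuffle partitions are empty and the side can be demoted
  val filteredBig = big.filter($"id" === 1)

  filteredBig.join(small, Seq("id")).explain(true)
}
```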

And that's the last part of the Adaptive Query Execution in Apache Spark 3 series. You could see different optimizations, but most of them address shuffle-related issues, in both directions (input & output). You saw a rule to improve skewed joins, but also one that keeps the shuffle-based join in case of a lot of empty partitions. All this to improve the query execution time 💪