This is the last part of the series about Adaptive Query Execution in Apache Spark SQL. So far you have learned about the physical plan optimizations, but they're not the only ones, as you will see in this blog post.
As of this writing (Spark 3.0.0), there is only one extra logical plan optimization in AQE, called demote broadcast hash join. This rule checks whether the join sides have a lot of empty partitions. If that's the case, the rule adds a no-broadcast-hash-join hint to prevent the broadcast strategy from being applied.
What does "a lot of empty partitions" mean? First, Apache Spark uses the available statistics to find all non-empty partitions. Later, it checks their ratio to the total number of partitions and, if the result is lower than the spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin configuration entry, the sort-merge join will be preferred over the broadcast strategy:
```scala
val mapStats = stage.mapStats.get
val partitionCnt = mapStats.bytesByPartitionId.length
val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
partitionCnt > 0 && nonZeroCnt > 0 &&
  (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
```
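To make the ratio check more concrete, here is a minimal, standalone sketch that mimics the predicate above (it's not the Spark source). The shuffle statistics are hypothetical and the 0.2 value is, to my knowledge, the default of spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin in Spark 3.0.0:

```scala
// Hypothetical shuffle statistics: 200 shuffle partitions, only 2 of them hold data.
val bytesByPartitionId: Array[Long] = Array.fill(200)(0L).updated(0, 1024L).updated(1, 2048L)
// Assumed default of spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.
val nonEmptyPartitionRatioForBroadcastJoin = 0.2

val partitionCnt = bytesByPartitionId.length
val nonZeroCnt = bytesByPartitionId.count(_ > 0)
val shouldDemote = partitionCnt > 0 && nonZeroCnt > 0 &&
  (nonZeroCnt * 1.0 / partitionCnt) < nonEmptyPartitionRatioForBroadcastJoin
println(shouldDemote) // true: 2 / 200 = 0.01, well below the 0.2 threshold
```

In other words, if only 1% of a join side's shuffle partitions hold data, that side gets demoted even though its total size could easily fit under the broadcast threshold.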
According to the PR introducing this change, the sort-merge join should perform much better than the broadcast join in this scenario. The hint is applied per join side, so the broadcast join can still happen if the other side remains eligible:
```scala
var newHint = hint
if (!hint.leftHint.exists(_.strategy.isDefined) && shouldDemote(left)) {
  newHint = newHint.copy(leftHint =
    Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
}
if (!hint.rightHint.exists(_.strategy.isDefined) && shouldDemote(right)) {
  newHint = newHint.copy(rightHint =
    Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
}
```
How are these hints interpreted by Apache Spark later? During physical planning, Spark checks whether the hint is set and, if so, disqualifies the "hinted" side of the join as a candidate for broadcasting:
```scala
def createJoinWithoutHint() = {
  createBroadcastHashJoin(
    canBroadcast(left) && !hint.leftHint.exists(_.strategy.contains(NO_BROADCAST_HASH)),
    canBroadcast(right) && !hint.rightHint.exists(_.strategy.contains(NO_BROADCAST_HASH)))
    .orElse {
  // ...

def createBroadcastHashJoin(buildLeft: Boolean, buildRight: Boolean) = {
  val wantToBuildLeft = canBuildLeft(joinType) && buildLeft
  val wantToBuildRight = canBuildRight(joinType) && buildRight
  getBuildSide(wantToBuildLeft, wantToBuildRight, left, right).map { buildSide =>
    // ... build the broadcast join

private def getBuildSide(
    wantToBuildLeft: Boolean,
    wantToBuildRight: Boolean,
    left: LogicalPlan,
    right: LogicalPlan): Option[BuildSide] = {
  if (wantToBuildLeft && wantToBuildRight) {
    // returns the smaller side base on its estimated physical size, if we want to build the
    // both sides.
    Some(getSmallerSide(left, right))
  } else if (wantToBuildLeft) {
    Some(BuildLeft)
  } else if (wantToBuildRight) {
    Some(BuildRight)
  } else {
    None
  }
}
```
Let's see now what happens when the sort-merge join is preferred over the broadcast join. To demonstrate that, let's join 2 datasets, one quite big (100 rows) and another much smaller (6 rows), with an aggressive filter on the bigger one that leaves only 1 row. That filtered side falls within the broadcast limit, so the broadcast hash join could be applied. But because the aggressive filter leaves most of the shuffle partitions empty, DemoteBroadcastHashJoin converts the plan back to the sort-merge join, as you can see in the demo video below.
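If you want to experiment on your own before watching it, here is a minimal, hypothetical sketch of a similar scenario. The dataset names and values are made up, only the row counts (100 vs 6) follow the description above, and depending on your spark.sql.autoBroadcastJoinThreshold you may need to tune the sizes or the threshold so that the initial plan is a sort-merge join:

```scala
import org.apache.spark.sql.SparkSession

object DemoteBroadcastHashJoinDemo extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("DemoteBroadcastHashJoin demo")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
  import spark.implicits._

  // The "big" side: 100 rows, almost all of them removed by the aggressive filter,
  // so most of its shuffle partitions stay empty at runtime.
  val bigDataset = (1 to 100).map(nr => (nr, s"value$nr")).toDF("id", "value")
  // The small side: 6 rows.
  val smallDataset = (1 to 6).map(nr => (nr, s"label$nr")).toDF("id", "label")

  val joined = bigDataset.filter($"id" === 1) // only 1 row survives the filter
    .join(smallDataset, Seq("id"))

  // With AQE enabled, DemoteBroadcastHashJoin should prevent the runtime conversion
  // to a broadcast hash join and keep the sort-merge join in the final plan.
  joined.explain(true)
  joined.show()
}
```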
And that's the last part of the Adaptive Query Execution in Apache Spark 3 series. You saw different optimizations, but most of them address shuffle-related issues, in both directions (input & output). You saw a rule to improve skewed joins, but also one that keeps the shuffle when a join side has a lot of empty partitions. All this to improve query execution time 💪