What's new in Apache Spark 3.1 - join evolutions

Versions: Apache Spark 3.1.1

I have been waiting to write this blog post since the Data+AI Summit 2020, where Cheng Su presented the ongoing effort to improve shuffle and stream-to-stream joins in Apache Spark 3.1. In this blog post, I will start by sharing what changed for the joins in the new release of the framework!

Shuffle hash join

The first major evolution is about the shuffle hash join strategy. No worries if you consciously use the sort-merge strategy because it's still the default one in Apache Spark. But if you want to try an alternative version and go with the shuffle hash join, you can. Why do so? This shuffle-based strategy uses less CPU and I/O since it doesn't require the sort step. However, it has a bigger risk of producing OOM errors because of the lack of a fallback mechanism when the memory becomes full. The fallback mechanism is still under discussion in SPARK-32634. The idea of this implementation is to use a mechanism similar to the one used for aggregations; i.e., monitor the size of the relation built for the join and fall back to the sort-merge join whenever it reaches the max allowed threshold.
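As a side note, besides the join hint that you will see just below, you can also push Apache Spark towards this strategy by disabling the preference for the sort-merge join. A minimal sketch, where the SparkSession and its name are only an assumption made for the illustration:

  import org.apache.spark.sql.SparkSession

  // Assumption: a local session created only for this illustration. With the flag set
  // to false, the planner can pick the shuffle hash join instead of the sort-merge join,
  // provided the size criteria described later in this post are satisfied
  val sparkSession = SparkSession.builder()
    .appName("Shuffle hash join instead of sort-merge join")
    .master("local[*]")
    .config("spark.sql.join.preferSortMergeJoin", "false")
    .getOrCreate()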

But let's leave aside the in-progress work and focus on the implemented features. The first of them is the elimination of the redundant shuffle step when the join is followed by a group-by expression using the same keys. To see that, let's compare the plans generated for this kind of operation (sketched below) by Apache Spark 3.1.1 and Apache Spark 3.0.1:
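To picture the kind of query involved, here is a simplified version with made-up DataFrames and column names (sparkSession comes from the previous snippet); in 3.1.1, no extra Exchange should show up between the join and the HashAggregate when the aggregation uses the build side's join key:

  import sparkSession.implicits._

  val ordersA = (0 to 10).map(nr => (s"order#${nr}", nr)).toDF("orderA", "user_id")
  val ordersB = (0 to 10).map(nr => (s"order#${nr}", nr)).toDF("orderB", "user_id")

  // Join with the shuffle hash hint and aggregate on the same key
  val joinedAndGrouped = ordersA
    .join(ordersB.hint("shuffle_hash"), ordersA("user_id") === ordersB("user_id"))
    .groupBy(ordersB("user_id"))
    .count()

  // The 3.1.1 plan should reuse the shuffle introduced for the join, whereas in 3.0.1
  // you should see an extra Exchange before the final HashAggregate
  joinedAndGrouped.explain(true)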

Another major announcement regarding shuffle-based joins is the support for the full outer join type, so far never executed with this strategy. How does it work? You can use the shuffle_hash hint...

  import sparkSession.implicits._ // needed for toDF

  val usersFromShop1 = (0 to 10).map(nr => (s"User#${nr}", nr)).toDF("loginShop1", "id")
  val usersFromShop2 = (4 to 10).map(nr => (s"User#${nr}", nr)).toDF("loginShop2", "id")

  val groupedUsers = usersFromShop1.join(usersFromShop2.hint("shuffle_hash"),
    usersFromShop1("id") === usersFromShop2("id"), "full_outer")

...or let Apache Spark decide. The engine uses 2 criteria to make the decision about using the shuffle hash join. The first of them is the possibility to build the hash map used in the join. In other terms, it checks whether the processed data is smaller than spark.sql.autoBroadcastJoinThreshold multiplied by the number of shuffle partitions (= the shuffle is more attractive than the broadcast). The second criterion is based on the comparison of both join sides. If one side is at least 3 times smaller than the other, Apache Spark considers it eligible to build the hash map. You will find these 2 criteria in the getShuffleHashJoinBuildSide function:

  private def canBuildLocalHashMapBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
    plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
  }

  private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
    a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
  }

  def getShuffleHashJoinBuildSide(
      left: LogicalPlan,
      right: LogicalPlan,
      joinType: JoinType,
      hint: JoinHint,
      hintOnly: Boolean,
      conf: SQLConf): Option[BuildSide] = {
    val buildLeft = if (hintOnly) {
      hintToShuffleHashJoinLeft(hint)
    } else {
      canBuildLocalHashMapBySize(left, conf) && muchSmaller(left, right)
    }
    val buildRight = if (hintOnly) {
      hintToShuffleHashJoinRight(hint)
    } else {
      canBuildLocalHashMapBySize(right, conf) && muchSmaller(right, left)
    }
    getBuildSide(
      canBuildShuffledHashJoinLeft(joinType) && buildLeft,
      canBuildShuffledHashJoinRight(joinType) && buildRight,
      left,
      right
    )
  }
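To give you an idea of the numbers behind the first criterion, with the defaults of spark.sql.autoBroadcastJoinThreshold (10MB) and spark.sql.shuffle.partitions (200), a join side can weigh almost 2GB and still be considered as a candidate to build the hash map. A quick back-of-the-envelope computation:

  // canBuildLocalHashMapBySize with the default configuration values
  val autoBroadcastJoinThreshold = 10485760L // 10MB, default spark.sql.autoBroadcastJoinThreshold
  val numShufflePartitions = 200L            // default spark.sql.shuffle.partitions

  val maxBuildableSizeInBytes = autoBroadcastJoinThreshold * numShufflePartitions
  println(s"${maxBuildableSizeInBytes} bytes, so ~${maxBuildableSizeInBytes / (1024 * 1024 * 1024.0)} GB")
  // 2097152000 bytes, so ~1.953125 GB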

It's worth mentioning that depending on which side is smaller, you will see either ShuffledHashJoin ... BuildLeft or ShuffledHashJoin ... BuildRight after explaining the plan. What does it mean? To perform the full outer join, Apache Spark uses the concepts of build side and stream side. During the join, it first iterates the stream side and returns either the fully or the partially (only the stream side) matched rows. This iteration marks all matches as "already returned" in an underlying data structure (a BitSet or an OpenHashSet, depending on whether the join keys are unique). In the second iteration, this time on the build side - so the smaller one - it only returns the build side rows that didn't match. Put another way, the build side is the smaller side of the join, so it uses less memory and is therefore the natural place to store the deduplication structure used to detect which matched pairs were already emitted. Below, you can find the demo of the full outer join:
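To picture these 2 iterations, here is a very simplified sketch written with plain Scala collections. It's only an illustration of the idea, not the real ShuffledHashJoinExec code: the rows are simple (key, value) pairs and a mutable Set replaces the BitSet/OpenHashSet used by Apache Spark:

  import scala.collection.mutable

  // Simplified sketch of the full outer shuffled hash join logic, on one partition
  def fullOuterJoinSketch(
      streamSide: Seq[(Int, String)],
      buildSide: Seq[(Int, String)]): Seq[(Option[String], Option[String])] = {
    // The build side (the smaller one) is materialized as a hash map
    val buildMap = buildSide.groupBy(_._1)
    val matchedBuildKeys = mutable.Set[Int]()

    // 1st pass: iterate the stream side; emit matched pairs or stream-side-only rows,
    // and remember which build keys already matched
    val streamPass = streamSide.flatMap { case (key, streamValue) =>
      buildMap.get(key) match {
        case Some(buildRows) =>
          matchedBuildKeys += key
          buildRows.map { case (_, buildValue) => (Some(streamValue), Some(buildValue)) }
        case None => Seq((Some(streamValue), None))
      }
    }

    // 2nd pass: iterate the build side and emit only the rows that never matched
    val buildPass = buildSide.collect {
      case (key, buildValue) if !matchedBuildKeys.contains(key) => (None, Some(buildValue))
    }

    streamPass ++ buildPass
  }

  // Prints: List((Some(a1),None), (Some(a2),Some(b2)), (None,Some(b3)))
  println(fullOuterJoinSketch(Seq((1, "a1"), (2, "a2")), Seq((2, "b2"), (3, "b3"))))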

But shuffle joins are not the only join evolution in Apache Spark 3.1.1. Another one concerns... buckets!

Coalesce bucketed tables

Yes, buckets, even if at first it sounds strange. Buckets are the building block of bucket-based joins, where Apache Spark can directly load the joined data to the final partition without performing the intermediary shuffle. Before Apache Spark 3.1.1, it only worked when both sides of the join had the same number of buckets. When this number was different, the optimization didn't apply and the join was, obviously, a shuffle-based one. Now, under some conditions, Apache Spark can optimize it into a shuffle-free join:
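As for every new feature, it's driven by configuration. The snippet below uses the property names from SQLConf and, as far as I can tell, their 3.1.1 defaults; the feature has to be enabled explicitly and the difference between the 2 bucket counts cannot exceed the configured ratio:

  // Feature flag of the bucket coalescing in joins; disabled by default
  sparkSession.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")
  // Maximal allowed ratio between the bucket counts of the 2 join sides (4 by default);
  // above it, the coalescing doesn't apply
  sparkSession.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio", "4")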

As you can deduce from the configuration properties, this new feature coalesces the number of buckets on the bigger side so that it matches the number of buckets on the smaller side and, hence, the join can be performed locally, without a shuffle. It works for the sort-merge and shuffle hash join strategies. However, to prevent the OOM that a too aggressive coalescing could cause on the build side (the one holding the hash map and the deduplication structure presented in the previous section), in the case of the shuffle hash join the coalescing only applies to the stream side:

  private def isCoalesceSHJStreamSide(join: ShuffledHashJoinExec, numLeftBuckets: Int,
      numRightBucket: Int, numCoalescedBuckets: Int): Boolean = {
    if (numCoalescedBuckets == numLeftBuckets) {
      // The right side is the one being coalesced, so it must be the stream side,
      // i.e. the build side cannot be the right one
      join.buildSide != BuildRight
    } else {
      // Otherwise the left side is coalesced, so the build side must not be the left one
      join.buildSide != BuildLeft
    }
  }

  // ...
  join match {
    case j: ShuffledHashJoinExec
      // Only coalesce the buckets for shuffled hash join stream side,
      // to avoid OOM for build side.
      if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
      updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)

The coalesce optimization is implemented in the CoalesceBucketsInJoin physical plan rule, which verifies whether all the mentioned conditions are met. If that's the case, it copies the FileSourceScanExec node of the bigger side and sets its optionalNumCoalescedBuckets property to the target number of buckets. Thanks to that, at the execution of this physical node, Apache Spark coalesces the buckets (createBucketedReadRDD in the snippet below):

  private def createBucketedReadRDD(
      bucketSpec: BucketSpec,
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Array[PartitionDirectory],
      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
// ...
    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
      Seq.tabulate(numCoalescedBuckets) { bucketId =>
        val partitionedFiles = coalescedBuckets.get(bucketId).map {
          _.values.flatten.toArray
        }.getOrElse(Array.empty)
        FilePartition(bucketId, partitionedFiles)
      }
    }.getOrElse {
      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
        FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
      }
    }

    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
  }
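The key part is the groupBy(_._1 % numCoalescedBuckets) line: the buckets of the bigger side are merged by their id modulo the target number of buckets. A quick illustration of that mapping with made-up numbers, 8 buckets coalesced into 4:

  // 8 buckets coalesced into 4: buckets 0 and 4 end up in the coalesced bucket 0,
  // 1 and 5 in the coalesced bucket 1, and so on
  val bucketIds = 0 until 8
  bucketIds.groupBy(_ % 4).toSeq.sortBy(_._1).foreach { case (coalescedBucket, originalBuckets) =>
    println(s"coalesced bucket ${coalescedBucket} <= original buckets ${originalBuckets.mkString(", ")}")
  }
  // coalesced bucket 0 <= original buckets 0, 4
  // coalesced bucket 1 <= original buckets 1, 5
  // coalesced bucket 2 <= original buckets 2, 6
  // coalesced bucket 3 <= original buckets 3, 7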

Does it work? Let's see and compare the same code snippet executed on Apache Spark 3.0.1 and 3.1.1:
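If you want to run the comparison on your side, a minimal setup could look like the one below. The table names, column names and the bucket counts (8 vs 4) are made up for the illustration:

  import sparkSession.implicits._

  // 2 tables bucketed on the join key but with a different number of buckets
  (0 to 1000).map(nr => (nr, s"order#${nr}")).toDF("user_id", "order")
    .write.bucketBy(8, "user_id").mode("overwrite").saveAsTable("orders_bucketed")
  (0 to 100).map(nr => (nr, s"User#${nr}")).toDF("id", "login")
    .write.bucketBy(4, "id").mode("overwrite").saveAsTable("users_bucketed")

  // Enable the coalescing and disable the broadcast join, so that these small test
  // datasets go through the bucket-based sort-merge join
  sparkSession.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")
  sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

  val orders = sparkSession.table("orders_bucketed")
  val users = sparkSession.table("users_bucketed")
  // In 3.1.1 the plan should not contain any Exchange node: the 8 buckets of
  // orders_bucketed get coalesced into 4 to match users_bucketed
  orders.join(users, orders("user_id") === users("id")).explain(true)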

In this article, I presented only 2 join evolutions in Apache Spark 3.1. But they're not alone! Another important change concerns stream-to-stream joins, and you will read about it next week.