Join hints in Apache Spark SQL

Versions: Apache Spark 3.1.1

With the Adaptive Query Execution module, you can have a feeling that Apache Spark will optimize the job for you. In part, yes, because it'll be able to optimize the job based on the runtime parameters you don't necessarily know. However, you also can master the execution, and ones of these mastery tools are hints.

Definition

Hints aren't an Apache Spark invention. If you have been working with backend technologies, you might have used them with RDBMS to specify a locking method or the active index in the query.

A hints is a way to override the behavior of the query optimizer and to force it to use a specific join strategy or an index. However, since query optimizers are usually very smart components, using hints will not necessarily be the first thing you will do when working with a database. You should use them carefully and when you really know what the hint will bring to the query. Also, whenever you think about using hints, try to understand why the query optimizer chooses a worse execution plan in your eyes. Maybe it hides another problem like outdated statistics?

But anyway, let's come back to Apache Spark SQL and see how to drive the framework behavior with join hints.

Resolution stage

I will start with an interesting fact: join hints are not only the client-facing feature. If you verify the implementation of broadcast join method, you will see that Apache Spark also uses them under-the-hood:

object functions {
  def broadcast[T](df: Dataset[T]): Dataset[T] = {
    Dataset[T](df.sparkSession,
      ResolvedHint(df.logicalPlan, HintInfo(strategy = Some(BROADCAST))))(df.exprEnc)
  }

ResolvedHint and HintInfo are 2 classes involved in the physical execution that I'll cover in the next section. But before it happens, there are some extra steps made by the query analyzer and optimizer. In the beginning, the analyser resolves all hint expressions in the ResolveJoinStrategyHints analysis rule:

object JoinStrategyHint {
  val strategies: Set[JoinStrategyHint] = Set(
    BROADCAST,
    SHUFFLE_MERGE,
    SHUFFLE_HASH,
    SHUFFLE_REPLICATE_NL)
}


object ResolveHints {
  object ResolveJoinStrategyHints extends Rule[LogicalPlan] {
    private val STRATEGY_HINT_NAMES = JoinStrategyHint.strategies.flatMap(_.hintAliases)

    private def createHintInfo(hintName: String): HintInfo = {
      HintInfo(strategy =
        JoinStrategyHint.strategies.find(
          _.hintAliases.map(
            _.toUpperCase(Locale.ROOT)).contains(hintName.toUpperCase(Locale.ROOT))))
    }
// ...
    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
      case h: UnresolvedHint if STRATEGY_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
// ...

As you can see, the analyzer translates the unresolved hints to the resolved hints by looking for their names in the hintAliases field of every hint strategy. After this step, the query optimizer will deal with a ResolvedHint node having a HintInfo attribute with the hint strategy name. But it's only the output for the analysis stage:

== Analyzed Logical Plan ==
loginShop1: string, id: int, loginShop2: string, id: int
Join Inner, (id#8 = id#19)
:- Project [_1#2 AS loginShop1#7, _2#3 AS id#8]
:  +- LocalRelation [_1#2, _2#3]
+- ResolvedHint (strategy=merge)
   +- Project [_1#13 AS loginShop2#18, _2#14 AS id#19]
      +- LocalRelation [_1#13, _2#14]


Join execution

The query optimizer replaces the analyzed hints in the logical rule called EliminateResolvedHint. As you can see in the plan above, the "hinted" projection is wrapped with a ResolvedHint and the Join node is still there. It's important to know that before analyzing the EliminateResolvedHint rule because the elimination still relies on the join node!

object EliminateResolvedHint extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    val pulledUp = plan transformUp {
      case j: Join if j.hint == JoinHint.NONE =>
        val (newLeft, leftHints) = extractHintsFromPlan(j.left)
        val (newRight, rightHints) = extractHintsFromPlan(j.right)
        val newJoinHint = JoinHint(mergeHints(leftHints), mergeHints(rightHints))
        j.copy(left = newLeft, right = newRight, hint = newJoinHint)
    }

As you can notice, the optimizer extracts all hints from both sides of the join and returns the "hinted" node with all the hint information:

  private[sql] def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
    plan match {
      case h: ResolvedHint =>
        val (plan, hints) = extractHintsFromPlan(h.child)
        (plan, h.hints +: hints)
      case u: UnaryNode =>
        val (plan, hints) = extractHintsFromPlan(u.child)
        (u.withNewChildren(Seq(plan)), hints)
      // TODO revisit this logic:
      // except and intersect are semi/anti-joins which won't return more data then
      // their left argument, so the broadcast hint should be propagated here
      case i: Intersect =>
        val (plan, hints) = extractHintsFromPlan(i.left)
        (i.copy(left = plan), hints)
      case e: Except =>
        val (plan, hints) = extractHintsFromPlan(e.left)
        (e.copy(left = plan), hints)
      case p: LogicalPlan => (p, Seq.empty)
    }
  }

After terminating this recursive call, the val (newLeft, leftHints) and val (newRight, rightHints) represent the plan with all hints for the given side of the query. In the next step, the optimizer merges hints from both sides and creates a new Join node with them.

Below you can find the output of this step for this code usersFromShop1.hint("SHUFFLE_MERGE").join(usersFromShop2.hint("SHUFFLE_MERGE"), usersFromShop1("id") === usersFromShop2("id")) involving 2 hints:

== Optimized Logical Plan ==
Join Inner, (id#8 = id#19), leftHint=(strategy=merge), rightHint=(strategy=merge)
:- LocalRelation [loginShop1#7, id#8]
+- LocalRelation [loginShop2#18, id#19]

Such plan transforms later to this physical execution:

== Physical Plan ==
*(3) SortMergeJoin [id#8], [id#19], Inner
:- *(1) Sort [id#8 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#8, 200), ENSURE_REQUIREMENTS, [id=#12]
:     +- LocalTableScan [loginShop1#7, id#8]
+- *(2) Sort [id#19 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#19, 200), ENSURE_REQUIREMENTS, [id=#13]
      +- LocalTableScan [loginShop2#18, id#19]

Let's see what kind of hints we can use and how they translate from the analyzed to the physical plans.

Join hints

You can use join hints from SQL or the API level. Below you can find API-based examples and the plans generated for each of join hint types.

Broadcast join

    val joinedUsersBroadcastHint = usersFromShop1.join(usersFromShop2.hint("BROADCAST"),
      usersFromShop1("id") === usersFromShop2("id"))
    joinedUsersBroadcastHint.explain(true)
== Analyzed Logical Plan ==
loginShop1: string, id: int, loginShop2: string, id: int
Join Inner, (id#8 = id#19)
:- Project [_1#2 AS loginShop1#7, _2#3 AS id#8]
:  +- LocalRelation [_1#2, _2#3]
+- ResolvedHint (strategy=broadcast)
   +- Project [_1#13 AS loginShop2#18, _2#14 AS id#19]
      +- LocalRelation [_1#13, _2#14]

== Optimized Logical Plan ==
Join Inner, (id#8 = id#19), rightHint=(strategy=broadcast)
:- LocalRelation [loginShop1#7, id#8]
+- LocalRelation [loginShop2#18, id#19]

== Physical Plan ==
*(1) BroadcastHashJoin [id#8], [id#19], Inner, BuildRight, false
:- *(1) LocalTableScan [loginShop1#7, id#8]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, false] as bigint)),false), [id=#11]
   +- LocalTableScan [loginShop2#18, id#19]

Shuffle merge join

    val joinedUsersShuffleMergeJoinHint = usersFromShop1.join(usersFromShop2.hint("SHUFFLE_MERGE"),
      usersFromShop1("id") === usersFromShop2("id"))
    joinedUsersShuffleMergeJoinHint.explain(true)
== Analyzed Logical Plan ==
loginShop1: string, id: int, loginShop2: string, id: int
Join Inner, (id#8 = id#19)
:- Project [_1#2 AS loginShop1#7, _2#3 AS id#8]
:  +- LocalRelation [_1#2, _2#3]
+- ResolvedHint (strategy=merge)
   +- Project [_1#13 AS loginShop2#18, _2#14 AS id#19]
      +- LocalRelation [_1#13, _2#14]

== Optimized Logical Plan ==
Join Inner, (id#8 = id#19), rightHint=(strategy=merge)
:- LocalRelation [loginShop1#7, id#8]
+- LocalRelation [loginShop2#18, id#19]

== Physical Plan ==
*(3) SortMergeJoin [id#8], [id#19], Inner
:- *(1) Sort [id#8 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#8, 200), ENSURE_REQUIREMENTS, [id=#12]
:     +- LocalTableScan [loginShop1#7, id#8]
+- *(2) Sort [id#19 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#19, 200), ENSURE_REQUIREMENTS, [id=#13]
      +- LocalTableScan [loginShop2#18, id#19]

Shuffle hash join

    val joinedUsersShuffleHashJoinHint = usersFromShop1.join(usersFromShop2.hint("SHUFFLE_HASH"),
      usersFromShop1("id") === usersFromShop2("id"))
    joinedUsersShuffleHashJoinHint.explain(true)
== Analyzed Logical Plan ==
loginShop1: string, id: int, loginShop2: string, id: int
Join Inner, (id#8 = id#19)
:- Project [_1#2 AS loginShop1#7, _2#3 AS id#8]
:  +- LocalRelation [_1#2, _2#3]
+- ResolvedHint (strategy=shuffle_hash)
   +- Project [_1#13 AS loginShop2#18, _2#14 AS id#19]
      +- LocalRelation [_1#13, _2#14]

== Optimized Logical Plan ==
Join Inner, (id#8 = id#19), rightHint=(strategy=shuffle_hash)
:- LocalRelation [loginShop1#7, id#8]
+- LocalRelation [loginShop2#18, id#19]

== Physical Plan ==
*(1) ShuffledHashJoin [id#8], [id#19], Inner, BuildRight
:- Exchange hashpartitioning(id#8, 200), ENSURE_REQUIREMENTS, [id=#12]
:  +- LocalTableScan [loginShop1#7, id#8]
+- Exchange hashpartitioning(id#19, 200), ENSURE_REQUIREMENTS, [id=#13]
   +- LocalTableScan [loginShop2#18, id#19]

Nested loop join

    val joinedUsersNestedLoopJoinHint = usersFromShop1.join(usersFromShop2.hint("SHUFFLE_REPLICATE_NL"),
      usersFromShop1("id") === usersFromShop2("id"))
    joinedUsersNestedLoopJoinHint.explain(true)
== Analyzed Logical Plan ==
loginShop1: string, id: int, loginShop2: string, id: int
Join Inner, (id#8 = id#19)
:- Project [_1#2 AS loginShop1#7, _2#3 AS id#8]
:  +- LocalRelation [_1#2, _2#3]
+- ResolvedHint (strategy=shuffle_replicate_nl)
   +- Project [_1#13 AS loginShop2#18, _2#14 AS id#19]
      +- LocalRelation [_1#13, _2#14]

== Optimized Logical Plan ==
Join Inner, (id#8 = id#19), rightHint=(strategy=shuffle_replicate_nl)
:- LocalRelation [loginShop1#7, id#8]
+- LocalRelation [loginShop2#18, id#19]

== Physical Plan ==
CartesianProduct (id#8 = id#19)
:- LocalTableScan [loginShop1#7, id#8]
+- LocalTableScan [loginShop2#18, id#19]

Inconsistent hints

But what happens if we use inconsistent hints, like a broadcast hash join with a shuffle merge join?

    val joinedUsersInconsistentJoinHint = usersFromShop1.hint("BROADCAST").join(usersFromShop2.hint("SHUFFLE_MERGE"),
      usersFromShop1("id") === usersFromShop2("id"))
    joinedUsersInconsistentJoinHint.explain(true)

The execution plan will use the broadcast join strategy:

== Analyzed Logical Plan ==
loginShop1: string, id: int, loginShop2: string, id: int
Join Inner, (id#8 = id#19)
:- ResolvedHint (strategy=broadcast)
:  +- Project [_1#2 AS loginShop1#7, _2#3 AS id#8]
:     +- LocalRelation [_1#2, _2#3]
+- ResolvedHint (strategy=merge)
   +- Project [_1#13 AS loginShop2#18, _2#14 AS id#19]
      +- LocalRelation [_1#13, _2#14]

== Optimized Logical Plan ==
Join Inner, (id#8 = id#19), leftHint=(strategy=broadcast), rightHint=(strategy=merge)
:- LocalRelation [loginShop1#7, id#8]
+- LocalRelation [loginShop2#18, id#19]

== Physical Plan ==
*(1) BroadcastHashJoin [id#8], [id#19], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, false] as bigint)),false), [id=#11]
:  +- LocalTableScan [loginShop1#7, id#8]
+- *(1) LocalTableScan [loginShop2#18, id#19]

How is it possible? Let's analyze the step I've missed on purpose, the logical plan conversion to the physical plan. The hinted query transforms to the physical query in JoinSelection strategy and there is some order. The broadcast strategy is the favorite one, and the sort-merge join is only the second choice:

  object JoinSelection extends Strategy
    with PredicateHelper
    with JoinSelectionHelper {

    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

      case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
// ...
        createBroadcastHashJoin(onlyLookingAtHint = true)
          .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
          .orElse(createShuffleHashJoin(onlyLookingAtHint = true))
          .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
          .getOrElse(createJoinWithoutHint())

As you can deduce from the snippet above, in the hints-based strategy, the strategy tries to use the broadcast join, sort-merge, shuffle and nested loop at least resort. If the query doesn't contain any hints, the strategy will simply select the best algorithm based on the dataset statistics or user preferences like spark.sql.join.preferSortMergeJoin or spark.sql.autoBroadcastJoinThreshold. It also implies that if you use hints, you're sure that they're more appropriate than Apache Spark join strategy resolution algorithm. If you have to do so, it can also be a good idea to take a step back and check whether the reason for your action cannot be implemented as a part of join management optimizations, and reach out the community with the proposal!


If you liked it, you should read:

📚 Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!