Writing Apache Spark SQL custom logical optimization - unsupported optimization hints

Versions: Apache Spark 2.4.0 https://github.com/bartosz25/spark-...forcode/sql/BroadcastHintTest.scala

After two previous posts dedicated to custom optimization in Apache Spark SQL, it's a good moment to start writing code. As Jacek Laskowski suggested on Twitter (link in Read more), I will try to implement one extra optimization hint. But first things first: let's start by defining what hints are.

Since I've never used hints before, not even in an RDBMS, this post is an introduction to what I will try to achieve in the next one. The first section defines hints. The second shows how to use the hints already available in Apache Spark. The last one gives some details about their implementation.

When you know your data better than the optimizer

In databases, a hint is an instruction that can change the decisions made by the optimizer. For example, if you are making a UNION of 2 datasets, you may tell the optimizer to use a merge strategy instead of plain concatenation. Also, if you think that the order of JOINs matters, you can force the optimizer to keep it unchanged.

As you can see, many hints concern the combination of datasets. But that is not their only use case. By analyzing SQL Server, Oracle, Hive and Impala hints, I established the following list of available user optimizations:

Broadcast hint in Apache Spark

As of this writing, Apache Spark SQL implements only 2 kinds of hints: the broadcast hint (other join hints will be added in the 3.0.0 release, see SPARK-27225) and the coalesce/repartition hints added in 2.4.0. The latter play the same role as the repartition and coalesce methods of the Dataset API, so I will focus here on the former.
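To make the coalesce/repartition equivalence concrete, here is a minimal sketch that compares the partition counts produced by the SQL hints with the ones produced by the corresponding Dataset methods. It assumes spark-sql 2.4.0 or later on the classpath; the object name, the view name "numbers" and the partition counts are made up for the illustration.

```scala
import org.apache.spark.sql.SparkSession

object PartitioningHintsSketch {
  // Partition counts for the hinted SQL queries and their Dataset API counterparts.
  def partitionCounts(spark: SparkSession): Map[String, Int] = {
    // Hypothetical input: 100 numbers spread over 8 partitions.
    spark.range(0, 100, 1, 8).createOrReplaceTempView("numbers")
    val numbers = spark.table("numbers")
    Map(
      "sql_coalesce" -> spark.sql("SELECT /*+ COALESCE(2) */ * FROM numbers").rdd.getNumPartitions,
      "api_coalesce" -> numbers.coalesce(2).rdd.getNumPartitions,
      "sql_repartition" -> spark.sql("SELECT /*+ REPARTITION(4) */ * FROM numbers").rdd.getNumPartitions,
      "api_repartition" -> numbers.repartition(4).rdd.getNumPartitions
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("partitioning-hints-sketch").getOrCreate()
    try {
      partitionCounts(spark).foreach { case (name, count) => println(s"$name -> $count") }
    } finally {
      spark.stop()
    }
  }
}
```

Each SQL variant should report the same number of partitions as its API counterpart.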

The goal of the broadcast hint is to broadcast one side of the join even when its estimated size is higher than the threshold defined in spark.sql.autoBroadcastJoinThreshold. Of course, the hint is applied only to the supported join types. The two following tests show cases where it does and does not apply:

  "broadcast hint" should "be respected for left join" in {
    val inMemoryCustomersDataFrame = Seq(
      (1, "Customer_1")
    ).toDF("id", "login")
    val inMemoryOrdersDataFrame = Seq(
      (1, 1, 50.0d, System.currentTimeMillis()), (2, 2, 10d, System.currentTimeMillis()),
      (3, 2, 10d, System.currentTimeMillis()), (4, 2, 10d, System.currentTimeMillis())
    ).toDF("id", "customers_id", "amount", "date")

    val ordersByCustomer = functions.broadcast(inMemoryCustomersDataFrame)
      .join(inMemoryOrdersDataFrame, inMemoryOrdersDataFrame("customers_id") === inMemoryCustomersDataFrame("id"),
        "left")

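    val queryExecution = ordersByCustomer.queryExecution.toString()
    queryExecution should include("ResolvedHint (broadcast)")
    // Note: BuildRight means that the right side (orders) is the one being broadcast here.
    // A LeftOuter join can only build its right side, and the small in-memory orders
    // dataset fits under the auto-broadcast threshold anyway.
    ordersByCustomer.queryExecution.executedPlan.toString() should include("*(1) BroadcastHashJoin [id#5], [customers_id#19], LeftOuter, BuildRight")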
  }

  "broadcast hint" should "not be respected for not supported join type" in {
    val inMemoryCustomersDataFrame = Seq(
      (1, "Customer_1")
    ).toDF("id", "login")
    val inMemoryOrdersDataFrame = Seq(
      (1, 1, 50.0d, System.currentTimeMillis()), (2, 2, 10d, System.currentTimeMillis()),
      (3, 2, 10d, System.currentTimeMillis()), (4, 2, 10d, System.currentTimeMillis())
    ).toDF("id", "customers_id", "amount", "date")

    val ordersByCustomer = functions.broadcast(inMemoryCustomersDataFrame)
      .join(inMemoryOrdersDataFrame, inMemoryOrdersDataFrame("customers_id") === inMemoryCustomersDataFrame("id"),
        "full_outer")

    val queryExecution = ordersByCustomer.queryExecution.toString()
    // The hint is still resolved at logical plan
    queryExecution should include("ResolvedHint (broadcast)")
    // But it's not respected at physical plan
    ordersByCustomer.queryExecution.executedPlan.toString() should not include("BroadcastHashJoin")
    ordersByCustomer.queryExecution.executedPlan.toString() should include("SortMergeJoin [id#5], [customers_id#19], FullOuter")
  }
     

The examples above use the Scala API, but you can also pass hints in plain SQL, as in Hive. Apache Spark SQL accepts any of the BROADCAST, BROADCASTJOIN and MAPJOIN names:

  "broadcast hint" should "be defined with SQL expression" in {
    Seq(
      (1, "Customer_1")
    ).toDF("id", "login").createTempView("customers")
    Seq(
      (1, 1, 50.0d, System.currentTimeMillis()), (2, 2, 10d, System.currentTimeMillis()),
      (3, 2, 10d, System.currentTimeMillis()), (4, 2, 10d, System.currentTimeMillis())
    ).toDF("id", "customers_id", "amount", "date").createTempView("orders")

    val query = testSession.sql("SELECT /*+ BROADCAST(c) */ * FROM customers c JOIN orders o ON o.customers_id = c.id")

    val queryExecution = query.queryExecution.toString()
    queryExecution should include("ResolvedHint (broadcast)")
    query.queryExecution.executedPlan.toString() should include("*(1) BroadcastHashJoin [id#5], [customers_id#19], Inner, BuildLeft")
  }

The dataset wrapped with the broadcast function, or named in the SQL hint, is always the candidate for broadcasting. In my examples, I used the customers dataset since I expect to have fewer customers than orders.
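To check that the hint, and not the size statistics, drives the choice, the size-based broadcast can be switched off by setting spark.sql.autoBroadcastJoinThreshold to -1. The sketch below assumes spark-sql on the classpath; the object name and the sample rows are invented for the illustration. It builds the same inner join twice, once without and once with the hint, and returns both physical plans as text.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastHintVersusThreshold {
  // Returns the physical plans of the same join, first without and then with the hint.
  def plans(spark: SparkSession): (String, String) = {
    import spark.implicits._
    val customers = Seq((1, "Customer_1")).toDF("id", "login")
    val orders = Seq((1, 1, 50.0d), (2, 2, 10.0d)).toDF("order_id", "customers_id", "amount")
    val condition = orders("customers_id") === customers("id")

    val withoutHint = orders.join(customers, condition)
    val withHint = orders.join(broadcast(customers), condition)
    (withoutHint.queryExecution.executedPlan.toString(),
      withHint.queryExecution.executedPlan.toString())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("broadcast-hint-sketch")
      // -1 disables the automatic, size-based broadcast
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()
    try {
      val (withoutHint, withHint) = plans(spark)
      println(withoutHint)
      println(withHint)
    } finally {
      spark.stop()
    }
  }
}
```

With the threshold disabled, only the hinted plan should contain a BroadcastHashJoin node. The hint is put on the right side of an inner join, a combination for which the broadcast is supported.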

Broadcast hint implementation

The broadcast hint implementation is not hidden. A simple search for "BroadcastHint" in IntelliJ leads directly to the org.apache.spark.sql.catalyst.analysis.ResolveHints object, which currently defines the ResolveBroadcastHints and ResolveCoalesceHints rules. The idea behind these rules is straightforward: look for the hint names and their table aliases, and transform them into ResolvedHint nodes:

    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
      case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
        if (h.parameters.isEmpty) {
          // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
          ResolvedHint(h.child, HintInfo(broadcast = true))
        } else {
          // ... (excerpt: the branch handling the table aliases is omitted here)
        }
    }
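The rewrite can be mimicked on a toy plan tree. The sketch below is not Catalyst code: the node classes only borrow Spark's names and are heavily simplified, and the alias handling (an exact match on the relation name) is an assumption made for the illustration.

```scala
import java.util.Locale

object ToyHintResolution {
  // Simplified stand-ins for Catalyst nodes; they only share their names with Spark's classes.
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan
  final case class UnresolvedHint(name: String, parameters: Seq[String], child: Plan) extends Plan
  final case class ResolvedHint(child: Plan, broadcast: Boolean) extends Plan

  val BroadcastHintNames: Set[String] = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")

  // Bottom-up rewrite: children are resolved first, then the hint node itself.
  def resolve(plan: Plan): Plan = plan match {
    case UnresolvedHint(name, parameters, child) =>
      val resolvedChild = resolve(child)
      if (BroadcastHintNames.contains(name.toUpperCase(Locale.ROOT))) {
        // No alias: the whole subtree is marked. Otherwise only the named relations are.
        if (parameters.isEmpty) ResolvedHint(resolvedChild, broadcast = true)
        else markRelations(resolvedChild, parameters.toSet)
      } else {
        // Hints unknown to this toy rule are left untouched.
        UnresolvedHint(name, parameters, resolvedChild)
      }
    case Join(left, right) => Join(resolve(left), resolve(right))
    case ResolvedHint(child, broadcast) => ResolvedHint(resolve(child), broadcast)
    case relation: Relation => relation
  }

  // Wraps every relation whose name is listed in the hint parameters.
  private def markRelations(plan: Plan, names: Set[String]): Plan = plan match {
    case relation @ Relation(name) if names.contains(name) => ResolvedHint(relation, broadcast = true)
    case Join(left, right) => Join(markRelations(left, names), markRelations(right, names))
    case other => other
  }
}
```

For instance, resolving UnresolvedHint("broadcast", Seq("c"), Join(Relation("c"), Relation("o"))) wraps only the "c" relation in a ResolvedHint node and drops the unresolved hint node from the tree.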

The hints are later used by the query planner to broadcast one of the join sides:

    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if canBroadcastByHints(joinType, left, right) =>
        val buildSide = broadcastSideByHints(joinType, left, right)

    private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : Boolean = {
      val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
      val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
      buildLeft || buildRight
    }

    private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : BuildSide = {
      val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
      val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
      broadcastSide(buildLeft, buildRight, left, right)
    }
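The decision can be reproduced outside Spark with a few lines of plain Scala. This is a simplified model, not the planner's code. The join types accepted by canBuildLeft and canBuildRight reflect my reading of the 2.4 sources, restricted to four join types, so treat them as an assumption to verify against the code. Side and sideByHints are hypothetical names.

```scala
object ToyBroadcastSide {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Hypothetical summary of one join side: estimated size and presence of the broadcast hint.
  final case class Side(sizeInBytes: Long, broadcastHint: Boolean)

  // Assumption: the left side can be built (broadcast) only for inner and right outer joins...
  def canBuildLeft(joinType: JoinType): Boolean = joinType match {
    case Inner | RightOuter => true
    case _ => false
  }

  // ...and the right side only for inner and left outer joins (semi and anti joins are left out).
  def canBuildRight(joinType: JoinType): Boolean = joinType match {
    case Inner | LeftOuter => true
    case _ => false
  }

  // None means that the hints alone do not lead to a broadcast join.
  def sideByHints(joinType: JoinType, left: Side, right: Side): Option[BuildSide] = {
    val buildLeft = canBuildLeft(joinType) && left.broadcastHint
    val buildRight = canBuildRight(joinType) && right.broadcastHint
    if (buildLeft && buildRight) {
      // Both sides are eligible: the smaller one wins.
      Some(if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft)
    } else if (buildRight) Some(BuildRight)
    else if (buildLeft) Some(BuildLeft)
    else None
  }
}
```

Under these assumptions, a hint put on the left side of a LeftOuter join yields None, which would explain why the left join test from the previous section reports BuildRight: the orders side is small enough to be broadcast by its size alone. A FullOuter join yields None whatever the hints are.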

As an exercise, in the next post I will try to implement one of the previously listed hints. Initially, I thought about the remaining JOIN hints, but since they're already implemented for the 3.0.0 release, I will pick one from the list defined in the first section.