Writing Apache Spark SQL custom logical optimization - unsupported optimization hints

on waitingforcode.com

After 2 previous posts dedicated to custom optimizations in Apache Spark SQL, it's a good moment to start writing some code. As Jacek Laskowski suggested on Twitter (link in the Read also section), I will try to implement one extra optimization hint. But first things first: let's start with the definition of hints.

Since I've never used hints before, even in an RDBMS, this post is an introduction to what I will try to achieve in the next one. The first section defines hints. The second one shows how to use the hints already implemented in Apache Spark. The last one gives some details about their implementation.

When you know your data better than the optimizer

In databases, a hint is something that can change the decisions made by the optimizer. For example, if you are making a UNION of 2 datasets, you may tell the optimizer to use a merge strategy instead of plain concatenation. Also, if you think that the order of the JOINs matters, you can forbid the optimizer to change it.

As you can see, a great number of hints concern combining datasets. But that's not their only use case. By analyzing SQL Server, Oracle, Hive and Impala hints, I established the following list of available user optimizations:

  • join-related hints - this category of hints aims to optimize JOIN operations. This is where you will find the broadcast hint described in the next 2 sections. In addition to it, the category includes merge, nested loops and hash hints, which should be implemented in the next major version of Apache Spark.
  • fast N - the goal of this hint is to return the first N results as fast as possible.
  • grouping with ordering or hashing - with this hint you can tell the optimizer to use order-based grouping instead of the hash-based one. The difference is that the optimizer will sort all grouped items and compute the aggregations sequentially.
  • union - when you're making a union, you can force the optimizer to use hashing or merging instead of the default concatenation operation.
  • force order - if you're sure that reordering the JOIN clauses by the optimizer won't help your query, you can disable this feature and keep the order defined in the initial query.
  • reduced parallelism - with this hint you can define the maximum level of parallelism for your query.
  • index hint - if your data source supports indexes, you can indicate which indexes to use. It eliminates the step where the query planner figures out which indexes to use in the query.
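
To make the difference between the two grouping strategies more concrete, here is a minimal sketch in plain Scala. It is not taken from any database engine: the GroupingStrategies object and its two methods are hypothetical names used only for illustration, and the aggregation is a simple sum per key.

```scala
object GroupingStrategies {
  // Hash-based grouping: one hash map entry per key, the input order does not matter.
  def hashAggregate(rows: Seq[(String, Int)]): Map[String, Int] = {
    val sums = scala.collection.mutable.HashMap.empty[String, Int]
    rows.foreach { case (key, value) =>
      sums.update(key, sums.getOrElse(key, 0) + value)
    }
    sums.toMap
  }

  // Order-based grouping: sort the rows by key first, then walk them sequentially
  // and close the current group every time the key changes.
  def sortAggregate(rows: Seq[(String, Int)]): Seq[(String, Int)] = {
    val sorted = rows.sortBy(_._1)
    val result = scala.collection.mutable.ArrayBuffer.empty[(String, Int)]
    var current: Option[(String, Int)] = None
    sorted.foreach { case (key, value) =>
      current match {
        case Some((currentKey, sum)) if currentKey == key =>
          current = Some((key, sum + value))
        case Some(finished) =>
          result += finished
          current = Some((key, value))
        case None =>
          current = Some((key, value))
      }
    }
    // Do not forget the last open group
    current.foreach(group => result += group)
    result.toList
  }
}
```

Both methods compute the same sums. The sorted variant keeps only one group in memory at a time, at the price of the initial sort, which is the kind of trade-off such a hint lets you decide on.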

Broadcast hint in Apache Spark

As of this writing, Apache Spark SQL implements only 2 kinds of hints: the broadcast hint (other join hints will be added in the 3.0.0 release, see SPARK-27225) and the coalesce/repartition hints added in 2.4.0. The role of the latter is the same as for the repartition and coalesce methods of the Dataset API, so I will focus here on the former.

The goal of the broadcast hint is to broadcast one side of the join even when its estimated size exceeds the threshold defined in spark.sql.autoBroadcastJoinThreshold. Of course, the hint is applied only to the supported types of joins. The two following tests show the cases where it does and does not apply:

  "broadcast hint" should "be respected for left join" in {
    val inMemoryCustomersDataFrame = Seq(
      (1, "Customer_1")
    ).toDF("id", "login")
    val inMemoryOrdersDataFrame = Seq(
      (1, 1, 50.0d, System.currentTimeMillis()), (2, 2, 10d, System.currentTimeMillis()),
      (3, 2, 10d, System.currentTimeMillis()), (4, 2, 10d, System.currentTimeMillis())
    ).toDF("id", "customers_id", "amount", "date")

    val ordersByCustomer = functions.broadcast(inMemoryCustomersDataFrame)
      .join(inMemoryOrdersDataFrame, inMemoryOrdersDataFrame("customers_id") === inMemoryCustomersDataFrame("id"),
        "left")

    val queryExecution = ordersByCustomer.queryExecution.toString()
    queryExecution should include("ResolvedHint (broadcast)")
    ordersByCustomer.queryExecution.executedPlan.toString() should include("*(1) BroadcastHashJoin [id#5], [customers_id#19], LeftOuter, BuildRight")
  }

  "broadcast hint" should "not be respected for not supported join type" in {
    val inMemoryCustomersDataFrame = Seq(
      (1, "Customer_1")
    ).toDF("id", "login")
    val inMemoryOrdersDataFrame = Seq(
      (1, 1, 50.0d, System.currentTimeMillis()), (2, 2, 10d, System.currentTimeMillis()),
      (3, 2, 10d, System.currentTimeMillis()), (4, 2, 10d, System.currentTimeMillis())
    ).toDF("id", "customers_id", "amount", "date")

    val ordersByCustomer = functions.broadcast(inMemoryCustomersDataFrame)
      .join(inMemoryOrdersDataFrame, inMemoryOrdersDataFrame("customers_id") === inMemoryCustomersDataFrame("id"),
        "full_outer")

    val queryExecution = ordersByCustomer.queryExecution.toString()
    // The hint is still resolved at logical plan
    queryExecution should include("ResolvedHint (broadcast)")
    // But it's not respected at physical plan
    ordersByCustomer.queryExecution.executedPlan.toString() should not include("*(1) BroadcastHashJoin [id#5], [customers_id#19], LeftOuter, BuildRight")
    ordersByCustomer.queryExecution.executedPlan.toString() should include("SortMergeJoin [id#5], [customers_id#19], FullOuter")
  }
     

The examples above used the Scala API but you can also give the hints in plain SQL, as was the case in Hive. Apache Spark SQL accepts any of the BROADCAST, BROADCASTJOIN and MAPJOIN hint names:

  "broadcast hint" should "be defined with SQL expression" in {
    Seq(
      (1, "Customer_1")
    ).toDF("id", "login").createTempView("customers")
    Seq(
      (1, 1, 50.0d, System.currentTimeMillis()), (2, 2, 10d, System.currentTimeMillis()),
      (3, 2, 10d, System.currentTimeMillis()), (4, 2, 10d, System.currentTimeMillis())
    ).toDF("id", "customers_id", "amount", "date").createTempView("orders")

    val query = testSession.sql("SELECT /*+ BROADCAST(c) */ * FROM customers c JOIN orders o ON o.customers_id = c.id")

    val queryExecution = query.queryExecution.toString()
    queryExecution should include("ResolvedHint (broadcast)")
    query.queryExecution.executedPlan.toString() should include("*(1) BroadcastHashJoin [id#5], [customers_id#19], Inner, BuildLeft")
  }

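The dataset wrapped with the broadcast function is always the candidate for broadcasting. In my examples, I used the customers dataset since I expect to have fewer customers than orders. Being a candidate does not guarantee the broadcast though: the join type must also allow building that side. In the left join test, for instance, the physical plan reports BuildRight, so the side actually broadcast there is orders and not the hinted customers.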

Broadcast hint implementation

The implementation of the broadcast hint is not hidden. A simple search for "BroadcastHint" in IntelliJ leads directly to the org.apache.spark.sql.catalyst.analysis.ResolveHints object, which defines the currently implemented hints as the ResolveBroadcastHints and ResolveCoalesceHints rules. The idea behind these analyzer rules is straightforward - look for the broadcast aliases and transform them into ResolvedHint nodes:

    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
      case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
        if (h.parameters.isEmpty) {
          // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
          ResolvedHint(h.child, HintInfo(broadcast = true))
        }
        // ... (the rest of the rule is omitted from this excerpt)
    }
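
The idea of the rule can be reproduced on a toy plan tree. The sketch below is only an illustration under my own assumptions: the Plan, Relation, Join, UnresolvedHint and ResolvedHint classes are simplified stand-ins declared here, not the Catalyst ones, and the way unknown hints are dropped is a choice made for the example.

```scala
object HintResolutionSketch {
  // Toy logical plan nodes; the names mirror Catalyst's but these are NOT Spark classes.
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Join(left: Plan, right: Plan) extends Plan
  case class UnresolvedHint(name: String, parameters: Seq[String], child: Plan) extends Plan
  case class ResolvedHint(child: Plan, broadcast: Boolean) extends Plan

  val BroadcastHintNames: Set[String] = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")

  // Wraps every relation whose name is listed in `targets` with a ResolvedHint node.
  private def markRelations(plan: Plan, targets: Set[String]): Plan = plan match {
    case r @ Relation(name) if targets.contains(name) => ResolvedHint(r, broadcast = true)
    case Join(left, right) => Join(markRelations(left, targets), markRelations(right, targets))
    case other => other
  }

  // Rewrites the tree: broadcast hints become ResolvedHint nodes.
  def resolve(plan: Plan): Plan = plan match {
    case UnresolvedHint(name, parameters, child)
        if BroadcastHintNames.contains(name.toUpperCase(java.util.Locale.ROOT)) =>
      val resolvedChild = resolve(child)
      if (parameters.isEmpty) {
        // No alias given: the whole subtree becomes the broadcast candidate.
        ResolvedHint(resolvedChild, broadcast = true)
      } else {
        // Aliases given: only the matching relations are marked.
        markRelations(resolvedChild, parameters.toSet)
      }
    // Assumption of this sketch: hints with unknown names are simply dropped.
    case UnresolvedHint(_, _, child) => resolve(child)
    case Join(left, right) => Join(resolve(left), resolve(right))
    case other => other
  }
}
```

For example, resolving UnresolvedHint("broadcast", Seq("customers"), Join(Relation("customers"), Relation("orders"))) returns a Join whose left side is wrapped in a ResolvedHint while the orders relation stays untouched.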

The hints are later used by the query planner to broadcast one of the join sides:

    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if canBroadcastByHints(joinType, left, right) =>
        val buildSide = broadcastSideByHints(joinType, left, right)

    private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : Boolean = {
      val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
      val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
      buildLeft || buildRight
    }

    private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : BuildSide = {
      val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
      val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
      broadcastSide(buildLeft, buildRight, left, right)
    }
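
The interplay between the join type and the hinted side can be sketched without Spark. The code below is a simplified model and not the planner's code: the JoinType and BuildSide hierarchies are redeclared here, only four join types are covered, the join types accepted by canBuildLeft and canBuildRight are my assumption about the planner's behavior, and the preference for the right side when both sides qualify is an arbitrary choice of this sketch.

```scala
object BroadcastSideSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Assumption: the left side can be built (broadcast) only for inner and right outer joins.
  def canBuildLeft(joinType: JoinType): Boolean = joinType match {
    case Inner | RightOuter => true
    case _ => false
  }

  // Assumption: the right side can be built (broadcast) only for inner and left outer joins.
  def canBuildRight(joinType: JoinType): Boolean = joinType match {
    case Inner | LeftOuter => true
    case _ => false
  }

  // Returns the side to broadcast, or None when the hints cannot be honored.
  def buildSideByHints(joinType: JoinType, leftHinted: Boolean,
      rightHinted: Boolean): Option[BuildSide] = {
    val buildLeft = canBuildLeft(joinType) && leftHinted
    val buildRight = canBuildRight(joinType) && rightHinted
    // Arbitrary tie-breaker of this sketch: prefer the right side when both qualify.
    if (buildRight) Some(BuildRight)
    else if (buildLeft) Some(BuildLeft)
    else None
  }
}
```

With this model, an inner join with a hinted left side resolves to BuildLeft, as in the SQL test, whereas a full outer join never resolves to any side, whatever the hints are.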

As an exercise, in the next post I will try to implement one of the previously listed hints. Initially, I thought about the remaining JOIN hints but since they're already covered by SPARK-27225, I will try to pick another one from the list defined in the first section.

Read also about Writing Apache Spark SQL custom logical optimization - unsupported optimization hints here: Jacek Laskowski Twitter comment, Controlling Execution Plans with Hints, Oracle Hints, Broadcast Hint for SQL Queries, LanguageManual JoinOptimization.
