https://github.com/bartosz25/spark-...forcode/sql/BroadcastHintTest.scala
After 2 previous posts dedicated to custom optimization in Apache Spark SQL, it's a good moment to start to write the code. As Jacek Laskowski suggested on Twitter (link in Read more), I will try to implement one extra optimization hint. But first things first and let's start with hints definition.
Data Engineering Design Patterns

Looking for a book that defines and solves most common data engineering problems? I wrote
one on that topic! You can read it online
on the O'Reilly platform,
or get a print copy on Amazon.
I also help solve your data engineering problems 👉 contact@waitingforcode.com 📩
Since I've never used hints before, even for RDBMS, this post will be an introduction for what I will try to achieve in the next post. It starts by hints definition in the first section. The second part shows the implementation of already implemented hints in Apache Spark. The next one gives some implementation details of them.
When you know your data better than optimizer
In databases, a hint is something that can change the decisions made by the optimizer. For example, if you are making a UNION of 2 datasets, you may tell the optimizer to use merge join instead of plan concatenation. Also, if you are thinking that the order of JOINS does matter, you can force the optimizer to not change it.
As you can see, a great number of hints concern datasets combination. But they're not the single use cases. By analyzing SQL Server, Oracle, Hive and Impala hints, I established the following list of available user optimizations:
- join-related hints - this category of hints tends to optimize JOIN operations. It's here where you will find broadcast hint described in the next 2 sections. In addition to it, the category includes merge, nested loops and hash hints which should be implemented in the next major version of Apache Spark.
- fast N - the goal of this hint is to return the first N results as fast as possible.
- grouping with ordering or hashing - with this hint you can tell the optimizer to use ordered-based grouping instead of hash-based one. The difference is that the optimizer will sort all grouped items and compute the aggregations sequentially.
- union - when you're making a union, you can force the optimizer to use hashing or merge join instead of default concatenation operation.
- force order - if you're sure that reordering the JOIN clauses by optimizer won't help your query, you can disable this feature and keep the order defined in the initial query.
- reduced parallelism - with this hint you can define the maximal level of parallelism for your query.
- index hint - if your data source supports indexes, you can indicate the indices to use. It eliminates the step where query planner figures out what indices to use in the query.
Broadcast hint in Apache Spark
As of this writing Apache Spark SQL implements only 2 hints: broadcast hint (other join hints will be added in 3.0.0 release, see SPARK-27225) and coalesce/repartition added in 2.4.0. The role of the latter ones is the same as for repartition and coalesce methods in SDK, so I will focus here on the former one.
The goal of broadcast hint is to broadcast one side of the join, even for the statistics lower than the configuration entry defined in spark.sql.autoBroadcastJoinThreshold. Of course, the hint will be applied only on the supported types of joins. Two following tests show the cases where it does and does not apply:
"broadcast hint" should "be respected for left join" in { val inMemoryCustomersDataFrame = Seq( (1, "Customer_1") ).toDF("id", "login") val inMemoryOrdersDataFrame = Seq( (1, 1, 50.0d, System.currentTimeMillis()), (2, 2, 10d, System.currentTimeMillis()), (3, 2, 10d, System.currentTimeMillis()), (4, 2, 10d, System.currentTimeMillis()) ).toDF("id", "customers_id", "amount", "date") val ordersByCustomer = functions.broadcast(inMemoryCustomersDataFrame) .join(inMemoryOrdersDataFrame, inMemoryOrdersDataFrame("customers_id") === inMemoryCustomersDataFrame("id"), "left") val queryExecution = ordersByCustomer.queryExecution.toString() queryExecution should include("ResolvedHint (broadcast)") ordersByCustomer.queryExecution.executedPlan.toString() should include("*(1) BroadcastHashJoin [id#5], [customers_id#19], LeftOuter, BuildRight") } "broadcast hint" should "not be respected for not supported join type" in { val inMemoryCustomersDataFrame = Seq( (1, "Customer_1") ).toDF("id", "login") val inMemoryOrdersDataFrame = Seq( (1, 1, 50.0d, System.currentTimeMillis()), (2, 2, 10d, System.currentTimeMillis()), (3, 2, 10d, System.currentTimeMillis()), (4, 2, 10d, System.currentTimeMillis()) ).toDF("id", "customers_id", "amount", "date") val ordersByCustomer = functions.broadcast(inMemoryCustomersDataFrame) .join(inMemoryOrdersDataFrame, inMemoryOrdersDataFrame("customers_id") === inMemoryCustomersDataFrame("id"), "full_outer") val queryExecution = ordersByCustomer.queryExecution.toString() // The hint is still resolved at logical plan queryExecution should include("ResolvedHint (broadcast)") // But it's not respected at physical plan ordersByCustomer.queryExecution.executedPlan.toString() should not include("*(1) BroadcastHashJoin [id#5], [customers_id#19], LeftOuter, BuildRight") ordersByCustomer.queryExecution.executedPlan.toString() should include("SortMergeJoin [id#5], [customers_id#19], FullOuter") }
The examples above used Scala's API but you can also give the hints in plain SQL, as it was the case in Hive. Apache Spark SQL supports one of BROADCAST, BROADCASTJOIN and MAPJOIN expressions:
"broadcast hint" should "be defined with SQL expression" in { Seq( (1, "Customer_1") ).toDF("id", "login").createTempView("customers") Seq( (1, 1, 50.0d, System.currentTimeMillis()), (2, 2, 10d, System.currentTimeMillis()), (3, 2, 10d, System.currentTimeMillis()), (4, 2, 10d, System.currentTimeMillis()) ).toDF("id", "customers_id", "amount", "date").createTempView("orders") val query = testSession.sql("SELECT /*+ BROADCAST(c) */ * FROM customers c JOIN orders o ON o.customers_id = c.id") val queryExecution = query.queryExecution.toString() queryExecution should include("ResolvedHint (broadcast)") query.queryExecution.executedPlan.toString() should include("*(1) BroadcastHashJoin [id#5], [customers_id#19], Inner, BuildLeft") }
Always the dataset wrapped with broadcast function will be the candidate for broadcasting. In my examples, I used customers dataset since I suppose to have fewer consumers than orders.
Broadcast hint implementation
Broadcast hints implementation is not hidden. A simple search for "BroadcastHint" in IntelliJ leads directly to org.apache.spark.sql.catalyst.analysis.ResolveHints object defining broadcast hint currently implemented as ResolveBroadcastHints and ResolveCoalesceHints rules. The idea behind these optimization rules is straightforward - look for broadcast aliases and transform them into ResolvedHint nodes:
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => if (h.parameters.isEmpty) { // If there is no table alias specified, turn the entire subtree into a BroadcastHint. ResolvedHint(h.child, HintInfo(broadcast = true))
The hints are later used by query planner to broadcast one of join sides:
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if canBroadcastByHints(joinType, left, right) => val buildSide = broadcastSideByHints(joinType, left, right) private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) : Boolean = { val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast buildLeft || buildRight } private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) : BuildSide = { val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast broadcastSide(buildLeft, buildRight, left, right) }
As an exercise, in the next post I will try to implement one of the previously listed hints. Initially, I thought about remaining JOIN but since they're already implemented, I will try to pick up one of the lists defined in the first section.
Consulting

With nearly 16 years of experience, including 8 as data engineer, I offer expert consulting to design and optimize scalable data solutions.
As an O’Reilly author, Data+AI Summit speaker, and blogger, I bring cutting-edge insights to modernize infrastructure, build robust pipelines, and
drive data-driven decision-making. Let's transform your data challenges into opportunities—reach out to elevate your data engineering game today!
👉 contact@waitingforcode.com
đź”— past projects