What's new in Apache Spark 3.0 - reuse adaptive subquery

Versions: Apache Spark 3.0.0

Apart from the big and complex changes in Adaptive Query Execution, like skew handling or partition coalescing, there are also some smaller ones. Their lower complexity doesn't make them less important, especially when one of them lets Apache Spark reuse subqueries.

This post has 2 short parts. The first one presents subquery reuse in the AQE context. The second one shows what the optimization does when it encounters an already resolved subquery plan.

Use case

Let's imagine a query like the one below. I agree, it's complete nonsense, but it illustrates the adaptive subquery reuse well:

  val subqueryRepeatedMax =
    """
      |SELECT (SELECT MAX(id5) FROM input5) AS max1, (SELECT MAX(id5) FROM input5) AS max2 FROM
      |input4 JOIN input6 ON input6.id6 = input4.id4
      |""".stripMargin
  sparkSession.sql(subqueryRepeatedMax).collect()

As you can see, the MAX aggregation appears twice, so computing it once and reusing the result would be enough. Let's first see what the physical plan looks like without the subquery reuse optimization:

*(3) Project [Subquery subquery#18, [id=#69] AS max1#19, Subquery subquery#20, [id=#80] AS max2#21]
:  :- Subquery subquery#18, [id=#69]
:  :  +- AdaptiveSparkPlan isFinalPlan=false
:  :     +- HashAggregate(keys=[], functions=[max(id5#10)], output=[max(id5)#23])
:  :        +- Exchange SinglePartition, true, [id=#67]
:  :           +- HashAggregate(keys=[], functions=[partial_max(id5#10)], output=[max#29])
:  :              +- LocalTableScan [id5#10]
:  +- Subquery subquery#20, [id=#80]
:     +- AdaptiveSparkPlan isFinalPlan=false
:        +- HashAggregate(keys=[], functions=[max(id5#10)], output=[max(id5)#25])
:           +- Exchange SinglePartition, true, [id=#78]
:              +- HashAggregate(keys=[], functions=[partial_max(id5#10)], output=[max#31])
:                 +- LocalTableScan [id5#10]
+- *(3) SortMergeJoin [id4#4], [id6#16], Inner
   :- *(1) Sort [id4#4 ASC NULLS FIRST], false, 0
   :  +- ShuffleQueryStage 0
   :     +- Exchange hashpartitioning(id4#4, 200), true, [id=#93]
   :        +- LocalTableScan [id4#4]
   +- *(2) Sort [id6#16 ASC NULLS FIRST], false, 0
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(id6#16, 200), true, [id=#99]
            +- LocalTableScan [id6#16]

As you can notice, this quite long plan has 2 Subquery nodes, each one performing the same operations under the hood. When you enable subquery reuse in AQE, the same plan is optimized to:

*(3) Project [Subquery subquery#18, [id=#69] AS max1#19, ReusedSubquery Subquery subquery#18, [id=#69] AS max2#21]
:  :- Subquery subquery#18, [id=#69]
:  :  +- AdaptiveSparkPlan isFinalPlan=false
:  :     +- HashAggregate(keys=[], functions=[max(id5#10)], output=[max(id5)#23])
:  :        +- Exchange SinglePartition, true, [id=#67]
:  :           +- HashAggregate(keys=[], functions=[partial_max(id5#10)], output=[max#29])
:  :              +- LocalTableScan [id5#10]
:  +- ReusedSubquery Subquery subquery#18, [id=#69]
+- *(3) SortMergeJoin [id4#4], [id6#16], Inner
   :- *(1) Sort [id4#4 ASC NULLS FIRST], false, 0
   :  +- ShuffleQueryStage 0
   :     +- Exchange hashpartitioning(id4#4, 200), true, [id=#93]
   :        +- LocalTableScan [id4#4]
   +- *(2) Sort [id6#16 ASC NULLS FIRST], false, 0
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(id6#16, 200), true, [id=#99]
            +- LocalTableScan [id6#16]

As you can see, the ReusedSubquery Subquery subquery#18, [id=#69] node replaced the second Subquery node and thanks to that, Apache Spark won't compute the MAX twice. But how? Let's see that in the next section.

Physical execution

The adaptive subquery reuse rule is controlled by the spark.sql.execution.reuseSubquery property. When it's enabled, the ReuseAdaptiveSubquery rule performs the optimization.
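
To make the configuration concrete, here is a minimal sketch of a session where both AQE and the subquery reuse are turned on. The application name and the local master are arbitrary choices of mine, and the runtime toggle at the end is only a suggestion for comparing the 2 plans from the previous section:

```scala
import org.apache.spark.sql.SparkSession

// Local session used only for illustration; app name and master are arbitrary.
val sparkSession = SparkSession.builder()
  .appName("Adaptive subquery reuse")
  .master("local[*]")
  // AQE is disabled by default in Apache Spark 3.0.0, so it has to be enabled explicitly.
  .config("spark.sql.adaptive.enabled", "true")
  // The subquery reuse flag mentioned above; it is enabled by default.
  .config("spark.sql.execution.reuseSubquery", "true")
  .getOrCreate()

// Switching the flag off at runtime should make the rule a no-op,
// which is handy to compare the plans with and without the reuse.
sparkSession.conf.set("spark.sql.execution.reuseSubquery", "false")
```

The snippet is written as a script, so it can be pasted into a Scala worksheet or wrapped in an object with a main method.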

The optimization rule looks for all ExecSubqueryExpression nodes in the plan. For each of them, it canonicalizes the child part of the plan. In other words, it produces a normalized form of the plan. In the context of an Apache Spark plan, this normalization applies to expression nodes like aliases or attribute references. The goal of the canonicalization step is to make the plans comparable, so it removes any arbitrary differences, like the automatically generated expression ids of Alias nodes.
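
The idea of canonicalization can be sketched with a toy expression tree. This is not Spark's code: Expr, Attr, Max, Alias and canonicalize are hypothetical names I made up for the illustration, and the renumbering strategy (ids replaced by their order of appearance, alias names erased) is a simplification:

```scala
// Toy model of canonicalization; none of these types are Spark classes.
object CanonicalizationSketch {
  sealed trait Expr
  // Attribute reference with an automatically generated expression id.
  case class Attr(name: String, exprId: Long) extends Expr
  case class Max(child: Expr) extends Expr
  // Alias with its own automatically generated expression id.
  case class Alias(child: Expr, name: String, exprId: Long) extends Expr

  // Replaces every generated id by its order of first appearance and
  // erases alias names, so that only the structure of the tree remains.
  def canonicalize(expr: Expr): Expr = {
    val ordinals = scala.collection.mutable.LinkedHashMap.empty[Long, Long]
    def ordinalOf(id: Long): Long =
      ordinals.getOrElseUpdate(id, ordinals.size.toLong)
    def loop(e: Expr): Expr = e match {
      case Attr(name, id) => Attr(name, ordinalOf(id))
      case Max(child) => Max(loop(child))
      case Alias(child, _, id) =>
        val normalizedChild = loop(child)
        Alias(normalizedChild, "", ordinalOf(id))
    }
    loop(expr)
  }

  // Same shape as the 2 subqueries of the example: only the alias id differs.
  val first: Expr = Alias(Max(Attr("id5", 10L)), "max(id5)", 23L)
  val second: Expr = Alias(Max(Attr("id5", 10L)), "max(id5)", 25L)

  def main(args: Array[String]): Unit = {
    println(first == second)
    println(canonicalize(first) == canonicalize(second))
  }
}
```

Before the normalization the 2 trees are different because of the alias ids. After it, they compare as equal, which is exactly the property a lookup key needs.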

The ReuseAdaptiveSubquery instance is created with an in-memory map reuseMap: TrieMap[SparkPlan, BaseSubqueryExec] which contains all already seen subqueries. The canonicalized plan is then used as a key in this map. If a value already exists for it, the execution node is replaced with a ReusedSubqueryExec node. Otherwise, it's left as is, since it will require a physical execution:

    plan.transformAllExpressions {
      case sub: ExecSubqueryExpression =>
        val newPlan = reuseMap.getOrElseUpdate(sub.plan.canonicalized, sub.plan)
        if (newPlan.ne(sub.plan)) {
          sub.withNewPlan(ReusedSubqueryExec(newPlan))
        } else {
          sub
        }
    }
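
The getOrElseUpdate plus reference comparison pattern can be tried outside of Apache Spark. In the sketch below, ToyPlan and its canonicalForm field are hypothetical stand-ins for a physical subquery plan and its canonicalized version, and the returned labels only mimic the node names from the plans shown earlier:

```scala
import scala.collection.concurrent.TrieMap

// Toy model of the reuse map; ToyPlan is NOT a Spark class.
object ReuseMapSketch {
  // Plain class on purpose: equality is reference-based, like `ne` expects.
  final class ToyPlan(val label: String, val canonicalForm: String)

  def reuse(plans: Seq[ToyPlan]): Seq[String] = {
    val reuseMap = TrieMap.empty[String, ToyPlan]
    plans.map { plan =>
      // Returns the first plan registered for this canonical form,
      // registering the current one if the key was not present yet.
      val firstSeen = reuseMap.getOrElseUpdate(plan.canonicalForm, plan)
      if (firstSeen ne plan) {
        // Another instance was already registered: reference it.
        s"ReusedSubquery(${firstSeen.label})"
      } else {
        // First occurrence: it stays in the plan and will be executed.
        plan.label
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val plans = Seq(
      new ToyPlan("subquery#18", "max(id5)"),
      new ToyPlan("subquery#20", "max(id5)"),
      new ToyPlan("subquery#22", "min(id5)")
    )
    reuse(plans).foreach(println)
  }
}
```

The reference comparison is the important detail here. The first occurrence gets its own instance back from the map, so it is left untouched, whereas every later occurrence receives a different instance and is rewritten.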

After the optimization, the plan contains a new node, ReusedSubqueryExec. When it's invoked during the physical execution, it delegates the computation to the subquery retrieved from the cache, so to the subquery that has already been computed:

case class ReusedSubqueryExec(child: BaseSubqueryExec)
  extends BaseSubqueryExec with LeafExecNode {

  override def name: String = child.name
  override def output: Seq[Attribute] = child.output
  override def doCanonicalize(): SparkPlan = child.canonicalized
  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
  override def outputPartitioning: Partitioning = child.outputPartitioning

  protected override def doPrepare(): Unit = child.prepare()

  protected override def doExecute(): RDD[InternalRow] = child.execute()

  override def executeCollect(): Array[InternalRow] = child.executeCollect()
}
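
The delegation can be illustrated with a toy model. All the names below are hypothetical, and the sketch assumes that the original subquery node memoizes its result (modeled here with a lazy val), which is what makes the delegation cheap:

```scala
// Toy model of the delegation; these are NOT Spark classes.
object DelegationSketch {
  trait ToySubquery {
    def executeCollect(): Seq[Long]
  }

  // Stand-in for the original subquery node. The result is computed
  // at most once; `runs` counts how many times the computation ran.
  final class ToySubqueryExec(compute: () => Seq[Long]) extends ToySubquery {
    var runs: Int = 0
    private lazy val result: Seq[Long] = {
      runs += 1
      compute()
    }
    override def executeCollect(): Seq[Long] = result
  }

  // Stand-in for the reused node: it has no logic of its own and
  // forwards every call to the wrapped subquery.
  final class ToyReusedSubqueryExec(child: ToySubquery) extends ToySubquery {
    override def executeCollect(): Seq[Long] = child.executeCollect()
  }

  def main(args: Array[String]): Unit = {
    val original = new ToySubqueryExec(() => Seq(5L))
    val reused = new ToyReusedSubqueryExec(original)
    println(original.executeCollect())
    println(reused.executeCollect())
    println(s"computation runs: ${original.runs}")
  }
}
```

Both nodes return the same rows, but the computation behind them runs only once.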

Adaptive subquery reuse is thus another smart optimization introduced with the Adaptive Query Execution feature. But it's not the last one! In the last blog post of the series, I will cover what else changed.