Writing custom optimization in Apache Spark SQL - physical plan

Versions: Apache Spark 2.4.0 https://github.com/bartosz25/spark-...ions/PhysicalOptimizationTest.scala

If you follow the Apache Spark SQL category on my blog, you have seen a lot of posts about customizing this framework. After the recently published posts about custom logical rules, it's time to explore another part, the planner strategy.

In the first part I will briefly explain how I got there. In the next one I will focus on the part I will customize in subsequent posts. At the end, I will use a reverse-engineering approach to figure out the main points of physical plans, exactly as I did for logical plans in the post Writing Apache Spark SQL custom logical optimization - API.

Why planner strategy?

The first custom optimization hint I tried to implement was a join preserving the order of its expressions. Initially I thought I could do that with a simple pattern matching case like this one:

…
case join: Join => join

The problem, of course, is that the rule doesn't modify the operation, so the framework will very likely modify it later. And that was the case. I then tried to reorder the JOIN parts back but it was a pain. Instead, I opted for a less painful method with a custom logical plan node, a kind of FrozenJoin that replaced the Join node of Apache Spark.

And it worked, except that I also had to implement a planner strategy in order to avoid errors like this one:

assertion failed: No plan for FrozenJoin Inner, (key1#72 = key3#78)

That was painful too, especially because of the complexity of the already existing join execution. Nonetheless, it made me think about planner strategies in general.
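To make the error above more concrete, here is a minimal sketch that reproduces it. It assumes the Apache Spark 2.4 API, and UnplannedNode is a hypothetical logical node standing in for FrozenJoin, not the node I actually wrote. Since no strategy knows how to translate it, the planner fails with the "No plan for" assertion:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.execution.QueryExecution

// Hypothetical logical node (illustration only); no built-in strategy can plan it
case class UnplannedNode(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

object MissingStrategyExample {
  // Returns the planner's error message, or None if the planning succeeded
  def planningError(): Option[String] = {
    val sparkSession = SparkSession.builder().appName("Missing strategy example")
      .master("local[*]").getOrCreate()
    val analyzedPlan = sparkSession.range(5).queryExecution.analyzed
    try {
      // Accessing sparkPlan forces the logical -> physical translation
      new QueryExecution(sparkSession, UnplannedNode(analyzedPlan)).sparkPlan
      None
    } catch {
      case error: AssertionError => Some(error.getMessage)
    }
  }
}
```

Calling MissingStrategyExample.planningError() should return a message starting with "assertion failed: No plan for UnplannedNode".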

Planner strategy 101

In previous posts you discovered customizations related to the logical plan, i.e. a tree defining the operations to execute. However, it's not the final format of our query. The next step consists of converting this optimized but still static plan into something that Apache Spark can execute for real. This "something" is called the physical plan and it's built upon the optimized logical plan.

The transformation is performed by an org.apache.spark.sql.execution.SparkPlanner instance, inside its plan(plan: LogicalPlan) method. I will detail this part a little bit more in the next section.

To translate the logical plan, SparkPlanner uses different strategies, all of them implementations of the org.apache.spark.sql.execution.SparkStrategy abstract class. The method exposing these strategies, org.apache.spark.sql.execution.SparkPlanner#strategies, is also where you will find your customized planner strategies. Aside from the extra ones, the strategies list covers the physical joins between datasets, all the logic of reading data with predicate pushdown and pruning, and more basic transformations like scanning an in-memory dataset, executing UNIONs or repartitioning.
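The registration of an extra strategy can be sketched as below. It's only an illustration under the Apache Spark 2.4 API: RecordingStrategy is a hypothetical strategy that plans nothing itself. It records the logical nodes it's asked about and returns an empty list, which tells the planner to fall back to the next strategies:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical strategy (illustration only): remembers visited nodes, plans nothing
object RecordingStrategy extends Strategy {
  private val visitedNodes = ArrayBuffer.empty[String]

  def visited: List[String] = visitedNodes.synchronized { visitedNodes.toList }

  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
    visitedNodes.synchronized { visitedNodes += plan.nodeName }
    Nil // an empty result means "I can't plan this node", so other strategies take over
  }
}

object ExtraStrategyExample {
  def visitedDuringPlanning(): List[String] = {
    val sparkSession = SparkSession.builder().appName("Extra strategy example")
      .master("local[*]").getOrCreate()
    // Extra strategies are exposed by the planner together with the built-in ones
    sparkSession.experimental.extraStrategies = Seq(RecordingStrategy)
    // Accessing sparkPlan triggers the physical planning
    sparkSession.range(10).filter("id > 5").queryExecution.sparkPlan
    RecordingStrategy.visited
  }
}
```

I used experimental.extraStrategies here because it's the shortest way to plug a strategy into an existing session. The query is still planned correctly because the built-in strategies handle every node the recording strategy declined.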

The transformed plan is returned as an instance of org.apache.spark.sql.execution.SparkPlan which is later used to execute the computation by calling its execute() method.
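As a small illustration of that last step, the sketch below retrieves the physical plan of a Dataset and executes it by hand. The names of the object and the method are mine, and the example assumes a local session:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.SparkPlan

object PhysicalPlanExecutionExample {
  def evenNumbers(): Seq[Long] = {
    val sparkSession = SparkSession.builder().appName("Physical plan execution example")
      .master("local[*]").getOrCreate()
    val dataset = sparkSession.range(0, 10).filter("id % 2 = 0")

    // executedPlan is the physical plan prepared for the execution
    val physicalPlan: SparkPlan = dataset.queryExecution.executedPlan
    // execute() returns the low-level representation of the result
    val rows: RDD[InternalRow] = physicalPlan.execute()

    // InternalRow instances may be reused, so the value is extracted immediately
    rows.map(row => row.getLong(0)).collect().toSeq.sorted
  }
}
```

In everyday code you will rather call an action like collect() on the Dataset, which ends up going through the same physical plan.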

Planner strategy API

Let's start with the naming conventions since knowing them is very helpful to understand what happens underneath. The parts of the physical plan responsible for executing the code are suffixed with *Exec and they extend SparkPlan. The real execution logic is implemented in one of the methods starting with do*. The main one is doExecute, which returns the result as an RDD of internal rows. Among the others, you will find doProduce and doConsume. Both create Java code that, once compiled, will be used by the framework to process the data.
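To illustrate these conventions, here is a minimal sketch of a physical operator written against the Apache Spark 2.4 API (more recent versions may require overriding extra methods). PassThroughExec is a hypothetical node that does nothing except delegating the execution to its child:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Hypothetical operator (illustration only), named with the *Exec suffix
case class PassThroughExec(child: SparkPlan) extends UnaryExecNode {
  // The operator exposes exactly the same columns as its child
  override def output: Seq[Attribute] = child.output

  // The real execution logic lives in the do* method and works on RDDs
  override protected def doExecute(): RDD[InternalRow] = child.execute()
}

object PassThroughExecExample {
  def countRows(): Long = {
    val sparkSession = SparkSession.builder().appName("PassThroughExec example")
      .master("local[*]").getOrCreate()
    val childPlan = sparkSession.range(0, 10).queryExecution.executedPlan
    // Callers use execute(), which internally invokes doExecute()
    PassThroughExec(childPlan).execute().count()
  }
}
```

Note that the public entry point is execute() and not doExecute(). The former takes care of the preparation steps before delegating to the latter.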

The logic inside the do* functions uses the low-level RDD abstraction that you may already know from the first versions of Apache Spark. The code defined in RDD transformations can have 2 different forms:

It's also important to understand the purpose of another do-like method, doPrepare. This function is called only once, just before the physical execution of the plan, so it's a great place to set up everything needed in doExecute. The guarantee of being called only once also makes it the place where asynchronous computation is started. That's the case, for instance, for the broadcast variables used in joins.
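The "called only once" guarantee can be checked with a small sketch like the one below. It assumes the Apache Spark 2.4 API, and both PrepareCounter and PreparedOnceExec are hypothetical names introduced only for this illustration. The node is executed twice but its doPrepare should run a single time:

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Hypothetical helper counting the doPrepare invocations on the driver
object PrepareCounter {
  val calls = new AtomicInteger(0)
}

// Hypothetical operator (illustration only)
case class PreparedOnceExec(child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output

  // A real operator would start its expensive or asynchronous setup here
  override protected def doPrepare(): Unit = {
    PrepareCounter.calls.incrementAndGet()
  }

  override protected def doExecute(): RDD[InternalRow] = child.execute()
}

object DoPrepareExample {
  def prepareCallsAfterTwoExecutions(): Int = {
    PrepareCounter.calls.set(0)
    val sparkSession = SparkSession.builder().appName("doPrepare example")
      .master("local[*]").getOrCreate()
    val plan = PreparedOnceExec(sparkSession.range(0, 5).queryExecution.executedPlan)
    // The same plan instance is executed twice
    plan.execute().count()
    plan.execute().count()
    PrepareCounter.calls.get()
  }
}
```

The counter is kept outside of the node on purpose. Plan nodes can be copied by the framework, so a state shared through a separate object is simpler to reason about in such a test.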

Planner strategy API - codegen

So far I described only the first class of physical executors. You must know that the code can be executed in a different manner, through the physical plans extending the CodegenSupport trait. As its name indicates, the implementations support code generation. If you look at the API, among its most used methods you will find inputRDDs(), produce() and needCopyResult(). Let's start with the first one, which simply returns the RDDs providing the input rows for the operator. You will find it everywhere the operator needs the values from its child, like in sorting, mapping or filtering. The second method, produce(), is also important because it's responsible for generating the Java code for the operation. Regarding needCopyResult(), it's a flag saying whether the rows of the given operator should be copied before putting them into the buffer. It happens for instance with a sort-merge join or, generally, everywhere one row may be used to produce multiple rows:

  "the result of sort-merge join" should "be copied" in {
    val sparkSession = SparkSession.builder().appName("Sort-merge join test")
      .master("local[*]")
      .config("spark.sql.join.preferSortMergeJoin", "true")
      .config("spark.sql.autoBroadcastJoinThreshold", "1")
      .config("spark.sql.defaultSizeInBytes", "100000")
      .getOrCreate()

    import sparkSession.implicits._

    val customersDataFrame = (1 to 3).map(nr => (nr, s"Customer_${nr}")).toDF("cid", "login")
    val ordersDataFrame = Seq(
      (1, 1, 19.5d), (2, 1, 200d), (3, 2, 500d), (4, 100, 1000d),
      (5, 1, 19.5d), (6, 1, 200d), (7, 2, 500d), (8, 100, 1000d)
    ).toDF("id", "customers_id", "amount")

    val ordersWithCustomers = ordersDataFrame.join(customersDataFrame, $"customers_id" === $"cid")

    val generatedCode = debug.codegenString(ordersWithCustomers.queryExecution.executedPlan)
    print(generatedCode)
    generatedCode should include("append((smj_mutableStateArray_0[0].getRow()).copy());")
  }

For the generated code like this one:

/* 104 */       while (smj_iterator_0.hasNext()) {
/* 105 */         InternalRow smj_rightRow_1 = (InternalRow) smj_iterator_0.next();
/* 106 */
/* 107 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 108 */
/* 109 */         int smj_value_7 = smj_rightRow_1.getInt(0);
/* 110 */         boolean smj_isNull_3 = smj_rightRow_1.isNullAt(1);
/* 111 */         UTF8String smj_value_8 = smj_isNull_3 ?
/* 112 */         null : (smj_rightRow_1.getUTF8String(1));
/* 113 */         smj_mutableStateArray_0[0].reset();
/* 114 */
/* 115 */         smj_mutableStateArray_0[0].zeroOutNullBytes();
/* 116 */
/* 117 */         smj_mutableStateArray_0[0].write(0, smj_value_4);
/* 118 */
/* 119 */         smj_mutableStateArray_0[0].write(1, smj_value_5);
/* 120 */
/* 121 */         smj_mutableStateArray_0[0].write(2, smj_value_6);
/* 122 */
/* 123 */         smj_mutableStateArray_0[0].write(3, smj_value_7);
/* 124 */
/* 125 */         if (smj_isNull_3) {
/* 126 */           smj_mutableStateArray_0[0].setNullAt(4);
/* 127 */         } else {
/* 128 */           smj_mutableStateArray_0[0].write(4, smj_value_8);
/* 129 */         }
/* 130 */         append((smj_mutableStateArray_0[0].getRow()).copy());
/* 131 */
/* 132 */       }

As you can see, the previous fragment iterates over all rows matched in the right dataset (/* 104 */), clears the previously matched row (/* 113 */), creates a joined row (/* 115 */ - /* 129 */) and appends its copy to the final iterator (/* 130 */).
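If you want to check which operators of a physical plan implement CodegenSupport, you can pattern match on the trait, as in the sketch below. The query and the names of the object and the method are only examples I chose for the illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.CodegenSupport

object CodegenSupportExample {
  def operatorsWithCodegen(): Seq[String] = {
    val sparkSession = SparkSession.builder().appName("CodegenSupport example")
      .master("local[*]").getOrCreate()
    // sparkPlan is the physical plan before the whole-stage codegen wrappers are added
    val physicalPlan = sparkSession.range(0, 10).filter("id > 5").queryExecution.sparkPlan

    // Keeps only the operators able to generate Java code
    physicalPlan.collect {
      case operator: CodegenSupport if operator.supportCodegen => operator.nodeName
    }
  }
}
```

For this query the returned list should contain the filter and the range operators, since both of them support code generation.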

The code generation support is also important from another point of view. It's used by WholeStageCodegenExec, which is responsible for merging a subtree of plans implementing CodegenSupport into a single Java function. This feature reduces the overhead of processing and materializing rows between computations. You can see it in action in the following snippet:

  it should "be generated for select, filter and map" in {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("Spark SQL WholeStageCodegen enabled example").master("local[*]")
      .getOrCreate()
    import sparkSession.implicits._
    val inMemoryCustomersDataFrame = Seq(
      (1, "Customer_1", true, "xxx"), (2, "Customer_2", true, "xxx"),
      (3, "Customer_3", true, "xxx"), (4, "Customer_4", false, "xxx"),
      (5, "Customer_5", false, "xxx"), (6, "Customer_6", false, "xxx"),
      (7, "Customer_7", true, "xxx"), (8, "Customer_8", true, "xxx")
    ).toDF("id", "login", "is_active", "useless_field")
    
    val activeUsers = inMemoryCustomersDataFrame
      .select("id", "login", "is_active")
      .filter($"is_active" === true)
      .map(row => {
        s"User ${row.getAs[String]("login")} is active :-)"
      })

    val wholeStageCodeGen = debug.codegenString(activeUsers.queryExecution.executedPlan)
    wholeStageCodeGen should include("Found 1 WholeStageCodegen subtrees.")
    println(wholeStageCodeGen)
  }

  it should "not generate 1 common function with disabled code generation" in {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("Spark SQL WholeStageCodegen disabled example").master("local[*]")
      .config("spark.sql.codegen.wholeStage", false)
      .getOrCreate()
    import sparkSession.implicits._
    val inMemoryCustomersDataFrame = Seq(
      (1, "Customer_1", true, "xxx"), (2, "Customer_2", true, "xxx"),
      (3, "Customer_3", true, "xxx"), (4, "Customer_4", false, "xxx"),
      (5, "Customer_5", false, "xxx"), (6, "Customer_6", false, "xxx"),
      (7, "Customer_7", true, "xxx"), (8, "Customer_8", true, "xxx")
    ).toDF("id", "login", "is_active", "useless_field")

    val activeUsers = inMemoryCustomersDataFrame
      .select("id", "login", "is_active")
      .filter($"is_active" === true)
      .map(row => {
        s"User ${row.getAs[String]("login")} is active :-)"
      })

    val codeGen = debug.codegenString(activeUsers.queryExecution.executedPlan)
    codeGen should include("Found 0 WholeStageCodegen subtrees")
    println(codeGen)
  }
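Instead of asserting on the debug string, the presence of whole-stage code generation can also be verified directly on the plan tree. The following sketch, where the object and method names are mine, counts the WholeStageCodegenExec nodes for the same query with the feature enabled and disabled:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.WholeStageCodegenExec

object WholeStageDetectionExample {
  def wholeStageSubtrees(wholeStageEnabled: Boolean): Int = {
    val sparkSession = SparkSession.builder().appName("WholeStageCodegen detection example")
      .master("local[*]").getOrCreate()
    // The flag is a runtime SQL configuration, so it can be changed on an existing session
    sparkSession.conf.set("spark.sql.codegen.wholeStage", wholeStageEnabled.toString)

    val executedPlan = sparkSession.range(0, 10).filter("id > 5").queryExecution.executedPlan
    // Every WholeStageCodegenExec node wraps one subtree merged into a single function
    executedPlan.collect {
      case stage: WholeStageCodegenExec => stage
    }.size
  }
}
```

With the feature enabled, the method should find at least one merged subtree, whereas with the feature disabled it should not find any.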

It was quite long but I hope you learned something new. After the logical plan, the physical plan is another place where you can introduce some custom behavior. Even though you won't need it very often, it may be a good place to start if you have an improvement proposal, since it lets you experiment safely, without touching the source code of the framework. And to experiment, you can use the points described in the last 2 sections about the planner strategy API.