Spark SQL operator optimizations - part 1

on waitingforcode.com

Predicate pushdown is one of the most popular optimizations in Spark SQL, but it is not the only one. The main list of optimization rules is defined in the org.apache.spark.sql.catalyst.optimizer.Optimizer abstract class.

This post lists and describes some of the operator optimizations from org.apache.spark.sql.catalyst.optimizer.Optimizer. Because there are so many of them, this is only the first part; it covers the rules whose names start with A to L. Its format is also unusual: unlike previous posts, it is not divided into sections. Instead, the rules are presented as a list in which each item contains a short explanation and test code proving the behavior.

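All tests in this part and in the second one use the following setup. The createLogAppender() helper called in every test is not shown; judging by how it is used, it collects the log messages so that the tests can assert which rule was applied: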

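// Imports needed by the tests: SparkSession plus the lit and asc column functions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{asc, lit}

private val sparkSession: SparkSession = SparkSession.builder()
  .appName("Logical Plan Optimizer test").master("local")
  .getOrCreate()

import sparkSession.implicits._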
private val Users = Seq(
  (1, "user_1")
).toDF("id", "login")

val NewUsers = Seq(
  (2, "user_2"), (3, "user_3")
).toDF("id", "login")


private val UserOrders = Seq(
  (2, "order_1"), (1, "order_2")
).toDF("user_id", "order_name")

private val AllOrdersPerUser = Seq(
  (1, Array(100, 200, 300)), (2, Array(100, 300, 100)), (3, Array(9, 8, 102))
).toDF("user_id", "orders_amounts")

override def afterAll(): Unit = {
  sparkSession.stop()
}

  • BooleanSimplification - simplifies boolean expressions by merging them, removing the NOT operator, extracting common factors, or rewriting an expression whose outcome can be determined without evaluating both sides.
      "redundant boolean expressions" should "be simplified" in {
        val logAppender = createLogAppender()
        val simplifiedBoolExpressionQuery = Users.select("id", "login").where("(id > 0 OR login == 'test') AND id > 0")
    
        val queryExecutionPlan = simplifiedBoolExpressionQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
        // Project [_1#2 AS id#5, _2#3 AS login#6]                   Project [_1#2 AS id#5, _2#3 AS login#6]
        // !+- Filter (((_1#2 > 0) || (_2#3 = test)) && (_1#2 > 0))   +- Filter (_1#2 > 0)
        // +- LocalRelation [_1#2, _2#3]                             +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("+- Filter (_1")
        queryExecutionPlan should include("> 0)")
        queryExecutionPlan should not include(" == 'test'")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===")
      }
    
      "redundant boolean expressions with negation" should "be simplified" in {
        val logAppender = createLogAppender()
        val simplifiedBoolExpressionQuery = Users.select("id", "login").where("(NOT(id > 0) OR login == 'test') AND id > 0")
    
        val queryExecutionPlan = simplifiedBoolExpressionQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
        //  Project [_1#2 AS id#5, _2#3 AS login#6]                       Project [_1#2 AS id#5, _2#3 AS login#6]
        // !+- Filter ((NOT (_1#2 > 0) || (_2#3 = test)) && (_1#2 > 0))   +- Filter ((_2#3 = test) && (_1#2 > 0))
        // +- LocalRelation [_1#2, _2#3]                                 +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include(" = test)) && (_1")
        queryExecutionPlan should include("> 0))")
        queryExecutionPlan should not include("!_")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===")
      }
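
The two rewrites visible in the logs above can be sketched on a tiny expression tree. This is only a simplified model written for illustration: the Expr, Pred, Not, And and Or types and the simplify function are hypothetical and are not Spark's Catalyst classes, and only the two cases exercised by the tests are covered.

```scala
// Simplified model of a boolean expression tree; NOT Spark's Catalyst classes.
object BooleanSimplificationSketch {
  sealed trait Expr
  final case class Pred(name: String) extends Expr // opaque predicate, e.g. "id > 0"
  final case class Not(child: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr

  // Applies two rewrites bottom-up:
  //   (a OR b) AND a      => a
  //   (NOT a OR b) AND a  => b AND a
  def simplify(expr: Expr): Expr = expr match {
    case And(l, r) =>
      (simplify(l), simplify(r)) match {
        case (Or(a, _), c) if a == c      => c
        case (Or(Not(a), b), c) if a == c => And(b, c)
        case (sl, sr)                     => And(sl, sr)
      }
    case Or(l, r) => Or(simplify(l), simplify(r))
    case Not(c)   => Not(simplify(c))
    case p: Pred  => p
  }
}
```

With id > 0 as a and login = test as b, the first rewrite leaves only a, and the second one leaves b AND a, which matches the filters printed by the optimizer in both tests.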
      
  • CollapseProject - merges 2 adjacent SELECT operations (projections) into a single one.
      "redundant selected columns" should "be removed in optimized plan" in {
        val logAppender = createLogAppender()
        val redundantSelectQuery = Users.select("login").select("login")
    
        val queryExecutionPlan = redundantSelectQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===
        // !Project [login#6]                            Project [_2#3 AS login#6]
        // !+- Project [_1#2 AS id#5, _2#3 AS login#6]   +- LocalRelation [_1#2, _2#3]
        // !   +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [login")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.CollapseProject ===")
      }
      
  • CollapseRepartition - merges 2 repartition clauses into a single one.
      "two repartition calls" should "be transformed to a single one" in {
        val logAppender = createLogAppender()
        val redundantRepartitionOperations = Users.select("login").repartition(10).repartition(20)
    
        val queryExecutionPlan = redundantRepartitionOperations.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseRepartition ===
        // Repartition 20, true                     Repartition 20, true
        // !+- Repartition 10, true                  +- Project [login#6]
        // !   +- Project [login#6]                     +- Project [_2#3 AS login#6]
        // !      +- Project [_2#3 AS login#6]             +- LocalRelation [_1#2, _2#3]
        // !         +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nRepartition 20, true\n+- LocalRelation [login")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.CollapseRepartition ===")
      }
    
      "two coalesce calls" should "be transformed to a single one" in {
        val logAppender = createLogAppender()
        val redundantRepartitionOperations = Users.select("login").coalesce(10).coalesce(20)
    
        val queryExecutionPlan = redundantRepartitionOperations.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseRepartition ===
        //  Repartition 20, false                    Repartition 20, false
        // !+- Repartition 10, false                 +- Project [login#6]
        // !   +- Project [login#6]                     +- Project [_2#3 AS login#6]
        // !      +- Project [_2#3 AS login#6]             +- LocalRelation [_1#2, _2#3]
        // !         +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nRepartition 20, false\n+- LocalRelation [login#")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.CollapseRepartition ===")
      }
      
  • CollapseWindow - similar to CollapseProject, but it merges adjacent window operators that share the same partitioning and ordering.
  • ColumnPruning - removes the columns of the data source that are not used by the query.
     
      "not used columns" should "be removed from the projection" in {
        val logAppender = createLogAppender()
        val notUsedColumnsQuery = Users.select("login")
    
        val queryExecutionPlan = notUsedColumnsQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
        // Project [login#6]                            Project [login#6]
        // !+- Project [_1#2 AS id#5, _2#3 AS login#6]   +- Project [_2#3 AS login#6]
        // +- LocalRelation [_1#2, _2#3]                +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [login#")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===")
      }
      
      "columns" should "be pruned when they are not used in the query" in {
        val logAppender = createLogAppender()
        val queryWithPrunedColumns = Users.select("id", "login").where("id > 0")
    
        val queryExecutionPlan = queryWithPrunedColumns.queryExecution.toString()
    
        // The following message should be written in logs:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
        // !Project [id#5, login#6]                      Project [_1#2 AS id#5, _2#3 AS login#6]
        // !+- Project [_1#2 AS id#5, _2#3 AS login#6]   +- Filter (_1#2 > 0)
        // !   +- Filter (_1#2 > 0)                         +- LocalRelation [_1#2, _2#3]
        // !      +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nProject [_1")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===")
      }
      
  • CombineFilters - combines 2 adjacent filter operations into a single one, joining their conditions with AND.
      "3 filters" should "be combined into a single filter" in {
        val logAppender = createLogAppender()
        val combinedFiltersQuery = Users.select("id", "login")
          .where("id > 0").where("id < 100").where("login != 'blacklisted'")
    
        val queryExecutionPlan = combinedFiltersQuery.queryExecution.toString()
    
        // Expected message in the logs:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineFilters ===
        // Project [_1#2 AS id#5, _2#3 AS login#6]                                     Project [_1#2 AS id#5, _2#3 AS login#6]
        // !+- Filter ((_1#2 < 100) && (isnotnull(_2#3) && NOT (_2#3 = blacklisted)))   +- Filter ((_1#2 > 0) && (((_1#2 < 100) && isnotnull(_2#3)) && NOT (_2#3 = blacklisted)))
        // !   +- Filter (_1#2 > 0)                                                        +- LocalRelation [_1#2, _2#3]
        // !      +- LocalRelation [_1#2, _2#3]
        // (org.apache.spark.sql.internal.BaseSessionStateBuilder$$anon$2:62)
        queryExecutionPlan should include("+- Filter ((((_1")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.CombineFilters ===")
      }
      
  • CombineLimits - merges 2 adjacent limit operators, always keeping the smallest of the merged limit values.
      "3 limits" should "be combined into a single one limit with the least value from 3" in {
        val logAppender = createLogAppender()
        val combinedLimitsQuery = Users.select("id", "login").limit(10).limit(2).limit(15)
    
        val queryExecutionPlan = combinedLimitsQuery.queryExecution.toString()
    
        // Expected message in the logs:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineLimits ===
        // !GlobalLimit 15                                              GlobalLimit least(2, 15)
        // !+- LocalLimit 15                                            +- LocalLimit least(2, 15)
        // !   +- GlobalLimit 2                                            +- GlobalLimit 10
        // !      +- LocalLimit 2                                             +- LocalLimit 10
        // !         +- GlobalLimit 10                                           +- Project [_1#2 AS id#5, _2#3 AS login#6]
        // !            +- LocalLimit 10                                            +- LocalRelation [_1#2, _2#3]
        // !               +- Project [_1#2 AS id#5, _2#3 AS login#6]
        // !                  +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nGlobalLimit 2\n+- LocalLimit 2")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.CombineLimits ===")
      }
      
  • CombineUnions - combines adjacent UNIONs into a single UNION. In practice this rule often seems to have nothing left to do in the optimizer, since it's already applied directly in the Dataset#union(Dataset) method:
      def union(other: Dataset[T]): Dataset[T] = withSetOperator {
        // This breaks caching, but it's usually ok because it addresses a very specific use case:
        // using union to union many files or partitions.
        CombineUnions(Union(logicalPlan, other.logicalPlan))
      }
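
The flattening itself can be illustrated with a minimal sketch. The Plan, Relation and Union types below are hypothetical stand-ins invented for this example, not Spark's logical plan classes; the sketch only shows the idea of turning nested unions into one n-ary union.

```scala
// Simplified model of a logical plan; NOT Spark's Catalyst classes.
object CombineUnionsSketch {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Union(children: Seq[Plan]) extends Plan

  // Flattens directly nested Union nodes into a single n-ary Union.
  def combine(plan: Plan): Plan = plan match {
    case Union(children) =>
      Union(children.map(combine).flatMap {
        case Union(nested) => nested     // lift the children of a nested union
        case other         => Seq(other) // keep any other node as is
      })
    case r: Relation => r
  }
}
```

Under this model, a union of (a union b) with c becomes a single union of a, b and c.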
      
  • ConstantFolding - replaces expressions that can be evaluated statically with their literal values. For example, with this optimization the query SELECT 1+2+3 AS number FROM my_table is transformed to SELECT 6 AS number FROM my_table.
      "concatenation of 3 letters" should "be transformed to a constant" in {
        val logAppender = createLogAppender()
        val constantsFoldedQuery = sparkSession.sql("SELECT CONCAT('a', CONCAT('a', 'a')) AS tripleA")
    
        val queryExecutionPlan = constantsFoldedQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.ConstantFolding ===
        // !Project [concat(a, concat(a, a)) AS tripleA#20]   Project [aaa AS tripleA#20]
        // +- OneRowRelation$                                +- OneRowRelation$
        queryExecutionPlan should include("== Optimized Logical Plan ==\nProject [aaa AS tripleA")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.ConstantFolding ===")
      }
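
The 1+2+3 example from the description can be sketched on a small arithmetic tree. The Expr, Literal, Column and Add types are assumptions made for this illustration and do not correspond to Spark's expression classes; the sketch only folds integer additions.

```scala
// Simplified model of an expression tree; NOT Spark's Catalyst classes.
object ConstantFoldingSketch {
  sealed trait Expr
  final case class Literal(value: Int) extends Expr
  final case class Column(name: String) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Folds bottom-up: an addition of two literals is replaced by one literal;
  // anything involving a column is left in place.
  def fold(expr: Expr): Expr = expr match {
    case Add(l, r) =>
      (fold(l), fold(r)) match {
        case (Literal(a), Literal(b)) => Literal(a + b)
        case (fl, fr)                 => Add(fl, fr)
      }
    case other => other
  }
}
```

Here fold(Add(Add(Literal(1), Literal(2)), Literal(3))) gives Literal(6), while a sub-expression that reads a column is kept and only its constant part is folded.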
      
  • EliminateOuterJoin - replaces a FULL OUTER JOIN by: an INNER JOIN if both sides are filtered, a LEFT OUTER JOIN if only the left side has predicates, or a RIGHT OUTER JOIN if only the right side has them.
    It also replaces a LEFT OUTER JOIN by an INNER JOIN if the right side has predicates, and a RIGHT OUTER JOIN by an INNER JOIN if the left side has them. In every case the predicates must reject the NULL rows that the outer join would add.
     
      "full outer join" should "be replaced by inner join when a filter is applied on both sides of join" in {
        val logAppender = createLogAppender()
        val eliminatedOuterJoinQuery = Users.join(UserOrders, Users("id") === UserOrders("user_id"), "outer")
          .where("login != 'user_1' AND order_name != 'o'")
    
        val queryExecutionPlan = eliminatedOuterJoinQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
        // Filter (NOT (login#6 = user_1) && NOT (order_name#26 = o))    Filter (NOT (login#6 = user_1) && NOT (order_name#26 = o))
        // !+- Join FullOuter, (id#5 = user_id#25)                        +- Join Inner, (id#5 = user_id#25)
        // :- Project [_1#2 AS id#5, _2#3 AS login#6]                    :- Project [_1#2 AS id#5, _2#3 AS login#6]
        // :  +- LocalRelation [_1#2, _2#3]                              :  +- LocalRelation [_1#2, _2#3]
        // +- Project [_1#22 AS user_id#25, _2#23 AS order_name#26]      +- Project [_1#22 AS user_id#25, _2#23 AS order_name#26]
        // +- LocalRelation [_1#22, _2#23]                               +- LocalRelation [_1#22, _2#23]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nJoin Inner,")
        queryExecutionPlan should include(":  +- Filter (isnotnull(_2")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===")
      }
    
      "full outer join" should "be replaced by right outer join when the right side has predicate" in {
        val logAppender = createLogAppender()
        val eliminatedOuterJoinQuery = Users.join(UserOrders, Users("id") === UserOrders("user_id"), "outer")
          .where("order_name != 'o'")
    
        val queryExecutionPlan = eliminatedOuterJoinQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
        // Filter NOT (order_name#26 = o)                                Filter NOT (order_name#26 = o)
        // !+- Join FullOuter, (id#5 = user_id#25)                        +- Join RightOuter, (id#5 = user_id#25)
        // :- Project [_1#2 AS id#5, _2#3 AS login#6]                    :- Project [_1#2 AS id#5, _2#3 AS login#6]
        // :  +- LocalRelation [_1#2, _2#3]                              :  +- LocalRelation [_1#2, _2#3]
        // +- Project [_1#22 AS user_id#25, _2#23 AS order_name#26]      +- Project [_1#22 AS user_id#25, _2#23 AS order_name#26]
        // +- LocalRelation [_1#22, _2#23]                               +- LocalRelation [_1#22, _2#23]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nJoin RightOuter, (id")
        queryExecutionPlan should include("+- Filter (isnotnull(_2")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===")
      }
    
      "right outer join" should "be replaced by inner join when both sides have predicate" in {
        val logAppender = createLogAppender()
        val eliminatedOuterJoinQuery = Users.join(UserOrders, Users("id") === UserOrders("user_id"), "rightouter")
          .where("id > 0 AND order_name != 'o'")
    
        val queryExecutionPlan = eliminatedOuterJoinQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
        // Filter ((id#5 > 0) && NOT (order_name#26 = o))                Filter ((id#5 > 0) && NOT (order_name#26 = o))
        // !+- Join RightOuter, (id#5 = user_id#25)                       +- Join Inner, (id#5 = user_id#25)
        // :- Project [_1#2 AS id#5, _2#3 AS login#6]                    :- Project [_1#2 AS id#5, _2#3 AS login#6]
        // :  +- LocalRelation [_1#2, _2#3]                              :  +- LocalRelation [_1#2, _2#3]
        // +- Project [_1#22 AS user_id#25, _2#23 AS order_name#26]      +- Project [_1#22 AS user_id#25, _2#23 AS order_name#26]
        // +- LocalRelation [_1#22, _2#23]                               +- LocalRelation [_1#22, _2#23]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nJoin Inner, (id")
        queryExecutionPlan should include(":  +- Filter (_1")
        queryExecutionPlan should include("   +- Filter ((isnotnull(_2")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===")
      }
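
The replacement cases can be summarized as a small decision function. This is a sketch under assumptions: the JoinType objects and newJoinType are hypothetical names for this example, and the two flags stand for "the filter above the join has a predicate rejecting NULLs on that side", which the real rule derives from the plan.

```scala
// Simplified decision table; NOT Spark's implementation.
object EliminateOuterJoinSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // leftFiltered / rightFiltered: whether a NULL-rejecting predicate
  // is applied to the columns of the left / right side of the join.
  def newJoinType(current: JoinType, leftFiltered: Boolean, rightFiltered: Boolean): JoinType =
    current match {
      case FullOuter if leftFiltered && rightFiltered => Inner
      case FullOuter if leftFiltered                  => LeftOuter
      case FullOuter if rightFiltered                 => RightOuter
      case LeftOuter if rightFiltered                 => Inner
      case RightOuter if leftFiltered                 => Inner
      case other                                      => other
    }
}
```

The three tests above correspond to the calls (FullOuter, true, true), (FullOuter, false, true) and (RightOuter, true, true).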
      
  • EliminateSerialization - optimizes object manipulation. It's especially useful for removing back-to-back conversions (serialization immediately followed by deserialization) between an object and its serialized (InternalRow) format.
      "duplicated map operation" should "be deserialized only once" in {
        val logAppender = createLogAppender()
        val doubleMappedQuery = Users.select("login").map(_.toString()).map(_.toString())
    
        val queryExecutionPlan = doubleMappedQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateSerialization ===
        // SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#30]            SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#30]
        //+- MapElements , class java.lang.String, [StructField(value,StringType,true)], obj#29: java.lang.String                                                        +- MapElements , class java.lang.String, [StructField(value,StringType,true)], obj#29: java.lang.String
        //  !   +- DeserializeToObject value#25.toString, obj#28: java.lang.String                                                                                                        +- Project [obj#24 AS obj#28]
        //  !      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#25]         +- MapElements , interface org.apache.spark.sql.Row, [StructField(login,StringType,true)], obj#24: java.lang.String
        //    !         +- MapElements , interface org.apache.spark.sql.Row, [StructField(login,StringType,true)], obj#24: java.lang.String                                            +- DeserializeToObject createexternalrow(login#6.toString, StructField(login,StringType,true)), obj#23: org.apache.spark.sql.Row
        //      !            +- DeserializeToObject createexternalrow(login#6.toString, StructField(login,StringType,true)), obj#23: org.apache.spark.sql.Row                                          +- Project [_2#3 AS login#6]
        //      !               +- Project [_2#3 AS login#6]                                                                                                                                              +- LocalRelation [_1#2, _2#3]
        //      !                  +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nSerializeFromObject [staticinvoke")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.EliminateSerialization ===")
      }
      
  • EliminateSorts - removes the sorting expressions that cannot impact the order. For instance, in the SQL query SELECT name, age FROM people ORDER BY 1 ASC, age DESC, the clause "1 ASC" will be removed, provided that 1 is treated as a constant rather than as a column position (which depends on the spark.sql.orderByOrdinal setting). Another example is an empty ORDER BY clause.
      "empty sortable expressions" should "be removed from the order by query" in {
        val logAppender = createLogAppender()
        val queryWithNoSortableExpression = Users.sort()
    
        val queryExecutionPlan = queryWithNoSortableExpression.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateSorts ===
        // !Sort true                                    Project [_1#2 AS id#5, _2#3 AS login#6]
        // !+- Project [_1#2 AS id#5, _2#3 AS login#6]   +- LocalRelation [_1#2, _2#3]
        // !   +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [id")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.EliminateSorts ===")
      }
    
      "no-op columns used in ordering" should "be removed" in {
        val logAppender = createLogAppender()
        val queryWithNoSortableExpression = Users.withColumn("hash_number", lit(19203)).sort(asc("hash_number"))
    
        val queryExecutionPlan = queryWithNoSortableExpression.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateSorts ===
        // !Sort [19203 ASC NULLS FIRST], true                                    Project [_1#2 AS id#5, _2#3 AS login#6, 19203 AS hash_number#20]
        // !+- Project [_1#2 AS id#5, _2#3 AS login#6, 19203 AS hash_number#20]   +- LocalRelation [_1#2, _2#3]
        // !   +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [id")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.EliminateSorts ===")
      }
      
  • FoldablePropagation - replaces references to foldable expressions with their computed values. For instance, in the query SELECT 123 AS number ... ORDER BY number, the ordering part will be rewritten to ORDER BY 123.
      "foldable expression" should "be propagated in order clause" in {
        val logAppender = createLogAppender()
        val foldablePropagatedQuery = sparkSession.sql("SELECT NOW() AS current_time ORDER BY current_time")
    
        val queryExecutionPlan = foldablePropagatedQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===
        // !Sort [current_time#20 ASC NULLS FIRST], true       Sort [1505404900993000 ASC NULLS FIRST], true
        // +- Project [1505404900993000 AS current_time#20]   +- Project [1505404900993000 AS current_time#20]
        // +- OneRowRelation$                                 +- OneRowRelation$
        queryExecutionPlan should include("+- Scan OneRowRelation")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===")
      }
      
  • InferFiltersFromConstraints - adds supplementary filters inferred from the constraints of the existing operators, for example a null check on a column already used in a user-defined filter.
      "filters on property value" should "be enriched with null check filter" in {
        val logAppender = createLogAppender()
        val inferredFiltersQuery = Users.join(UserOrders, Users("id") === UserOrders("user_id"), "inner")
          .filter("login != 'user_1'")
    
        val queryExecutionPlan = inferredFiltersQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
        // Join Inner, (id#5 = user_id#25)                            Join Inner, (id#5 = user_id#25)
        // :- Project [_1#2 AS id#5, _2#3 AS login#6]                 :- Project [_1#2 AS id#5, _2#3 AS login#6]
        // !:  +- Filter NOT (_2#3 = user_1)                           :  +- Filter (isnotnull(_2#3) && NOT (_2#3 = user_1))
        // :     +- LocalRelation [_1#2, _2#3]                        :     +- LocalRelation [_1#2, _2#3]
        // +- Project [_1#22 AS user_id#25, _2#23 AS order_name#26]   +- Project [_1#22 AS user_id#25, _2#23 AS order_name#26]
        // +- LocalRelation [_1#22, _2#23]                            +- LocalRelation [_1#22, _2#23]
        queryExecutionPlan should include(":  +- Filter (isnotnull(_2")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===")
      }
      
  • LikeSimplification - rewrites LIKE expressions that don't need to be evaluated as regular expressions. Instead, they can be evaluated directly with String methods: startsWith, endsWith, contains or equals.
      "filter with LIKE'text%'" should "be simplified with startsWith method" in {
        val logAppender = createLogAppender()
        val likeSimplifiedQuery = Users.select("id", "login").where("login LIKE 'text%'")
    
        val queryExecutionPlan = likeSimplifiedQuery.queryExecution.toString()
    
        // Expected message in the logs:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.LikeSimplification ===
        // Project [_1#2 AS id#5, _2#3 AS login#6]          Project [_1#2 AS id#5, _2#3 AS login#6]
        // !+- Filter (isnotnull(_2#3) && _2#3 LIKE text%)   +- Filter (isnotnull(_2#3) && StartsWith(_2#3, text))
        // +- LocalRelation [_1#2, _2#3]                    +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include(" && StartsWith(_2")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.LikeSimplification ===")
      }
    
      "filter with LIKE'%text%'" should "be simplified with contains method" in {
        val logAppender = createLogAppender()
        val likeSimplifiedQuery = Users.select("id", "login").where("login LIKE '%text%'")
    
        val queryExecutionPlan = likeSimplifiedQuery.queryExecution.toString()
    
        // Expected message in logs:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.LikeSimplification ===
        // Project [_1#2 AS id#5, _2#3 AS login#6]           Project [_1#2 AS id#5, _2#3 AS login#6]
        // !+- Filter (isnotnull(_2#3) && _2#3 LIKE %text%)   +- Filter (isnotnull(_2#3) && Contains(_2#3, text))
        // +- LocalRelation [_1#2, _2#3]                     +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include(") && Contains(_2")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.LikeSimplification ===")
      }
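
The mapping from a LIKE pattern to a String method, including the endsWith and equals cases not covered by the tests above, can be sketched as follows. The simplify function and the regular expressions are assumptions written for this illustration; they model the idea and should not be read as Spark's exact implementation.

```scala
// Simplified model of the pattern classification; NOT Spark's implementation.
object LikeSimplificationSketch {
  // Patterns without '_' and with '%' only at the beginning and/or the end.
  private val StartsWith = "([^_%]+)%".r
  private val EndsWith   = "%([^_%]+)".r
  private val Contains   = "%([^_%]+)%".r
  private val EqualTo    = "([^_%]*)".r

  // Returns a plain String predicate when the pattern is simple enough,
  // or None when a regex-based LIKE evaluation would still be needed.
  def simplify(pattern: String): Option[String => Boolean] = pattern match {
    case StartsWith(prefix) => Some((s: String) => s.startsWith(prefix))
    case EndsWith(suffix)   => Some((s: String) => s.endsWith(suffix))
    case Contains(infix)    => Some((s: String) => s.contains(infix))
    case EqualTo(text)      => Some((s: String) => s == text)
    case _                  => None
  }
}
```

For example, simplify("text%") returns a startsWith check, while a pattern containing the single-character wildcard, such as "te_t%", yields None in this model.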
      
  • LimitPushDown - pushes the LIMIT operation down in the case of UNION ALL and OUTER JOIN. For instance, the query SELECT ... UNION ALL SELECT ... LIMIT 10 will be translated to two select queries, each taking at most 10 rows. The union of these at most 20 rows is later limited to the initial limit of 10 (= 10 entries taken from the set of 20).
      "the same limit" should "be applied to 2 unions" in {
        val logAppender = createLogAppender()
        val unionedAndLimitedUsers = Users.union(NewUsers).limit(1)
    
        val queryExecutionPlan = unionedAndLimitedUsers.queryExecution.toString()
        // Expected message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.LimitPushDown ===
        // GlobalLimit 1                                          GlobalLimit 1
        // +- LocalLimit 1                                        +- LocalLimit 1
        // +- Union                                               +- Union
        //  !      :- Project [_1#2 AS id#5, _2#3 AS login#6]             :- LocalLimit 1
        // !      :  +- LocalRelation [_1#2, _2#3]                       :  +- Project [_1#2 AS id#5, _2#3 AS login#6]
        // !      +- Project [_1#12 AS id#15, _2#13 AS login#16]         :     +- LocalRelation [_1#2, _2#3]
        // !         +- LocalRelation [_1#12, _2#13]                     +- LocalLimit 1
        // !                                                                +- Project [_1#12 AS id#15, _2#13 AS login#16]
        // !                                                                   +- LocalRelation [_1#12, _2#13]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nGlobalLimit 1" +
          "\n+- LocalLimit 1" +
          "\n   +- Union" +
          "\n      :- LocalLimit 1")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.LimitPushDown ===")
      }
    
      "global union limit" should "be used instead of local limit" in {
        val logAppender = createLogAppender()
        val unionedAndLimitedUsers = Users.union(NewUsers.limit(3)).limit(1)
    
        val queryExecutionPlan = unionedAndLimitedUsers.queryExecution.toString()
    
        // Expected log message
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.LimitPushDown ===
        // GlobalLimit 1                                                GlobalLimit 1
        // +- LocalLimit 1                                              +- LocalLimit 1
        // +- Union                                                     +- Union
        //  !      :- Project [_1#2 AS id#5, _2#3 AS login#6]                   :- LocalLimit 1
        // !      :  +- LocalRelation [_1#2, _2#3]                             :  +- Project [_1#2 AS id#5, _2#3 AS login#6]
        // !      +- GlobalLimit 3                                             :     +- LocalRelation [_1#2, _2#3]
        // !         +- LocalLimit 3                                           +- LocalLimit 1
        // !            +- Project [_1#12 AS id#15, _2#13 AS login#16]            +- LocalLimit 3
        // !               +- LocalRelation [_1#12, _2#13]                           +- Project [_1#12 AS id#15, _2#13 AS login#16]
        // !                                                                            +- LocalRelation [_1#12, _2#13]
        queryExecutionPlan should include("== Optimized Logical Plan ==" +
          "\nGlobalLimit 1" +
          "\n+- LocalLimit 1" +
          "\n   +- Union" +
          "\n      :- LocalLimit 1")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.LimitPushDown ===")
      }
    
      "not changed union limits" should "be returned after optimization" in {
        val logAppender = createLogAppender()
        val unionedAndLimitedUsers = Users.union(NewUsers.limit(1)).limit(3)
    
        val queryExecutionPlan = unionedAndLimitedUsers.queryExecution.toString()
    
        // Expected log message
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.LimitPushDown ===
        //   GlobalLimit 3                                                GlobalLimit 3
        // +- LocalLimit 3                                              +- LocalLimit 3
        // +- Union                                                     +- Union
        //   !      :- Project [_1#2 AS id#5, _2#3 AS login#6]                   :- LocalLimit 3
        // !      :  +- LocalRelation [_1#2, _2#3]                             :  +- Project [_1#2 AS id#5, _2#3 AS login#6]
        // !      +- GlobalLimit 1                                             :     +- LocalRelation [_1#2, _2#3]
        // !         +- LocalLimit 1                                           +- GlobalLimit 1
        // !            +- Project [_1#12 AS id#15, _2#13 AS login#16]            +- LocalLimit 1
        // !               +- LocalRelation [_1#12, _2#13]                           +- Project [_1#12 AS id#15, _2#13 AS login#16]
        // !                                                                            +- LocalRelation [_1#12, _2#13]
        queryExecutionPlan should include("== Optimized Logical Plan ==" +
          "\nGlobalLimit 3" +
          "\n+- LocalLimit 3" +
          "\n   +- Union" +
          "\n      :- LocalLimit 3" +
          "\n      :  +- LocalRelation ")
        queryExecutionPlan should include("+- GlobalLimit 1" +
          "\n         +- LocalLimit 1" +
          "\n            +- LocalRelation [id#")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.LimitPushDown ===")
      }
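
Why the pushed-down limit doesn't change the result can be shown with plain Scala collections. This is only a model: UNION ALL is represented by concatenation and LIMIT by take, and both function names are hypothetical. Unlike a distributed dataset, a Seq has a fixed order, so the sketch also returns the same rows, whereas Spark only guarantees the same number of rows.

```scala
// Simplified model with in-memory sequences; NOT Spark's implementation.
object LimitPushDownSketch {
  // LIMIT n applied only after the union of both sides.
  def limitAfterUnion[A](left: Seq[A], right: Seq[A], n: Int): Seq[A] =
    (left ++ right).take(n)

  // LIMIT n applied to each side first (the local limits), then once more
  // on the union (the global limit). At most 2 * n rows reach the union.
  def limitPushedDown[A](left: Seq[A], right: Seq[A], n: Int): Seq[A] =
    (left.take(n) ++ right.take(n)).take(n)
}
```

Both functions return the same rows for any inputs, but the second one never materializes more than 2 * n rows before the final limit.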
      

This post described almost half of the logical plan operator optimizations. We saw that some of them propagate values to subsequent parts of the query, others simply eliminate trivial operations, and the remaining ones rewrite expressions in order to gain better performance. But it's only a subset. The remaining optimizations will be presented in the second post, which will be published next week.
