Spark SQL operator optimizations - part 2

on waitingforcode.com

Spark SQL operator optimizations - part 2

It's time to continue the exploration of operator optimizations of logic plans in Spark SQL. After the first part describing optimizations from A to L, this post covers remaining letters.

The context is still the same. The main code and the post's format is the same as in the post about Spark SQL operator optimizations - part 1.

  • NullPropagation - some of expressions are replaced by the literals that can be statically evaluated.
      "nullable expression" should "be transformed to a literal" in {
        val logAppender = createLogAppender()
        val constantsFoldedQuery = sparkSession.sql("SELECT 'a' != null")
    
        val queryExecutionPlan = constantsFoldedQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.NullPropagation ===
        // !Project [NOT (a = null) AS (NOT (a = CAST(NULL AS STRING)))#20]   Project [null AS (NOT (a = CAST(NULL AS STRING)))#20]
        // +- OneRowRelation$                                                +- OneRowRelation$
        queryExecutionPlan should include("== Optimized Logical Plan ==\nProject [null AS (NOT (a = CAST(NULL AS STRING)))")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.NullPropagation ===")
      }
      
  • PruneFilters - removes trivial filters. "Trivial filter" means here a filter that always evaluates to the same result: true, false or null. For the first case, the filter is removed. For two other situations, the filter is replaced by empty (= no data inside) org.apache.spark.sql.catalyst.plans.logical.LocalRelation object.
      "trivial filter" should "be removed" in {
        val logAppender = createLogAppender()
        val trivialFiltersQuery = Users.select("id", "login").where("2 > 1")
    
        val queryExecutionPlan = trivialFiltersQuery.queryExecution.toString()
    
        // The logs should contain the proof of using PruneFilters optimization rule:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.PruneFilters ===
        //   Project [_1#2 AS id#5, _2#3 AS login#6]   Project [_1#2 AS id#5, _2#3 AS login#6]
        // !+- Filter true                            +- LocalRelation [_1#2, _2#3]
        // !   +- LocalRelation [_1#2, _2#3]
        // (org.apache.spark.sql.internal.BaseSessionStateBuilder$$anon$2:62)
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [id")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.PruneFilters ===")
      }
      
  • PushDownPredicate - pushes the filter to the data source level. This optimizations was described more deeply in the post about Predicate pushdown in Spark SQL
  • PushPredicateThroughJoin - in the case of JOINs, when a filtering expression can be evaluated against one side of joined parts, the filter is pushed directly to this side. It means that filtering is applied before joining the datasets.
      "filters on one side o join" should "be pushed to this side and executed before the join operation" in {
        val logAppender = createLogAppender()
        val pushedPredicateJoinQuery = Users.join(UserOrders, Users("id") === UserOrders("user_id"), "inner")
          .filter("order_name != 'removed'")
    
        val queryExecutionPlan = pushedPredicateJoinQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
        // !Filter NOT (order_name#26 = removed)                          Join Inner, (id#5 = user_id#25)
        // !+- Join Inner, (id#5 = user_id#25)                            :- Project [_1#2 AS id#5, _2#3 AS login#6]
        // !   :- Project [_1#2 AS id#5, _2#3 AS login#6]                 :  +- LocalRelation [_1#2, _2#3]
        // !   :  +- LocalRelation [_1#2, _2#3]                           +- Filter NOT (order_name#26 = removed)
        // +- Project [_1#22 AS user_id#25, _2#23 AS order_name#26]      +- Project [_1#22 AS user_id#25, _2#23 AS order_name#26]
        // +- LocalRelation [_1#22, _2#23]                               +- LocalRelation [_1#22, _2#23]
        queryExecutionPlan should include("   +- Filter (isnotnull(_2")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===")
      }
      
  • PushProjectionThroughUnion - pushes selected columns (projection) to tables used in UNION. However, it doesn't apply for UNION DISTINCT queries.
      "projection" should "be pushed down to both tables from UNION" in {
        val logAppender = createLogAppender()
        val queryWithPushedProjectionToUnions = Users.union(NewUsers).select("id", "login")
    
        val queryExecutionPlan = queryWithPushedProjectionToUnions.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughUnion ===
        // !Project [id#5, login#6]                             Union
        // !+- Union                                            :- Project [id#5, login#6]
        // !   :- Project [_1#2 AS id#5, _2#3 AS login#6]       :  +- Project [_1#2 AS id#5, _2#3 AS login#6]
        // !   :  +- LocalRelation [_1#2, _2#3]                 :     +- LocalRelation [_1#2, _2#3]
        // !   +- Project [_1#12 AS id#15, _2#13 AS login#16]   +- Project [id#15, login#16]
        // !      +- LocalRelation [_1#12, _2#13]                  +- Project [_1#12 AS id#15, _2#13 AS login#16]
        // !                                                          +- LocalRelation [_1#12, _2#13]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nUnion\n:- LocalRelation [id")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughUnion ===")
      }
      
  • RemoveDispensableExpressions - simply removes optional nodes from the logical plan. For instance, the positive(...) method acts as an identity function since it always returns the unchanged parameter. Spark SQL will remove it from the optimized plan.
      "redundant positive node" should "be removed from the optimized plan" in {
        val logAppender = createLogAppender()
        Users.createTempView("Users_test_positive")
        val queryWithoutDispensableExpressions = Users.sqlContext.sql("SELECT positive(id) FROM Users_test_positive")
    
        val queryExecutionPlan = queryWithoutDispensableExpressions.queryExecution.toString()
        println(s"${queryExecutionPlan }")
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.RemoveDispensableExpressions ===
        // !Project [positive(_1#2) AS (+ id)#41]   Project [_1#2 AS (+ id)#41]
        // +- LocalRelation [_1#2, _2#3]           +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [(+ id)")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.RemoveDispensableExpressions ===")
      }
      
  • RemoveRedundantAliases - removes aliases or operations that don't change the name or the metadata of a column.
      "duplicated mapping not changing the object" should "be considered as redundant alias" in {
        val logAppender = createLogAppender()
        val redundantAliasesQuery = Users.select("login").map(_.toString).map(_.toString)
    
        val queryExecutionPlan = redundantAliasesQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases ===
        // SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#30]   SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#30]
        // +- MapElements , class java.lang.String, [StructField(value,StringType,true)], obj#29: java.lang.String                                               +- MapElements , class java.lang.String, [StructField(value,StringType,true)], obj#29: java.lang.String
        //   !   +- Project [obj#24 AS obj#28]                                                                                                                                    +- Project [obj#24]
        //   +- MapElements , interface org.apache.spark.sql.Row, [StructField(login,StringType,true)], obj#24: java.lang.String                                   +- MapElements , interface org.apache.spark.sql.Row, [StructField(login,StringType,true)], obj#24: java.lang.String
        //     +- DeserializeToObject createexternalrow(login#6.toString, StructField(login,StringType,true)), obj#23: org.apache.spark.sql.Row                                 +- DeserializeToObject createexternalrow(login#6.toString, StructField(login,StringType,true)), obj#23: org.apache.spark.sql.Row
        //     +- Project [_2#3 AS login#6]                                                                                                                                     +- Project [_2#3 AS login#6]
        //     +- LocalRelation [_1#2, _2#3]
        // We can observe that the optimized plan doesn't create an intermediate (redundant) alias: obj#28 in the 3rd
        // operation from the top.                                                                                                                                  +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nSerializeFromObject [staticinvoke")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases ===")
      }
      
  • RemoveRedundantProject - drops select (= projection) statements that don't modify the selected columns (e.g. are defined twice). The check consists on comparison between parent and child outputs:
      case p @ Project(_, child) if p.output == child.output => child
      
    And the learning test:
      "redundant projection" should "be removed from the optimized plan" in {
        val logAppender = createLogAppender()
        val queryWithRemovedRedundantProjection = Users.join(UserOrders, Users("id") === UserOrders("user_id"))
          .select("id", "login", "user_id", "order_name")
          .where("id = 0")
          .select("id", "login", "user_id", "order_name")
    
        val queryExecutionPlan = queryWithRemovedRedundantProjection.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject ===
        // !Project [id#5, login#6, user_id#25, order_name#26]               Filter ((user_id#25 = 0) && (id#5 = 0))
        // !+- Filter ((user_id#25 = 0) && (id#5 = 0))                       +- Join Inner, (id#5 = user_id#25)
        // !   +- Join Inner, (id#5 = user_id#25)                               :- Project [_1#2 AS id#5, _2#3 AS login#6]
        // !      :- Project [_1#2 AS id#5, _2#3 AS login#6]                    :  +- LocalRelation [_1#2, _2#3]
        // !      :  +- LocalRelation [_1#2, _2#3]                              +- Project [_1#22 AS user_id#25, _2#23 AS order_name#26]
        // !      +- Project [_1#22 AS user_id#25, _2#23 AS order_name#26]         +- LocalRelation [_1#22, _2#23]
        // !         +- LocalRelation [_1#22, _2#23]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nJoin Inner,")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject ===")
      }
      
  • ReorderAssociativeOperator - all associative integer operations (addition, multiplication) are reordered and computed only once and used in later operations as a constant.
      "additions" should "be reordered to a constant" in {
        val logAppender = createLogAppender()
        Users.createTempView("Users_ReorderedAssocOps")
        val queryReorderedAssociativeOperations = sparkSession.sql("select id+1+3+5+7+9 FROM Users_ReorderedAssocOps")
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.ReorderAssociativeOperator ===
        // !Project [(((((_1#2 + 1) + 3) + 5) + 7) + 9) AS (((((id + 1) + 3) + 5) + 7) + 9)#42]   Project [(_1#2 + 25) AS (((((id + 1) + 3) + 5) + 7) + 9)#42]
        // +- LocalRelation [_1#2, _2#3]                                                         +- LocalRelation [_1#2, _2#3]
        val queryExecutionPlan = queryReorderedAssociativeOperations.queryExecution.toString()
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [(((((id + 1) + 3) + 5) + 7) + 9)")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.ReorderAssociativeOperator ===")
      }
      
  • ReorderJoin - creates a JOIN directly from unordered JOIN (e.g. SELECT...FROM different tables joined on WHERE clause or JOINs without joined column defined). It pushes the join conditions directly into ON clause.
      "plain select query on 3 tables" should "be changed to 2 joins with ON clauses corresponding to WHERE clauses from the initial query" in {
        val logAppender = createLogAppender()
        Users.createTempView("Users_ReorderJoin1")
        NewUsers.createTempView("Users_ReorderJoin2")
        NewUsers.createTempView("Users_ReorderJoin3")
        val reorderedJoinQuery = sparkSession.sql("SELECT u1.id FROM Users_ReorderJoin1 u1, Users_ReorderJoin2 u2, Users_ReorderJoin3 u3 " +
          "WHERE u1.id = u2.id AND u2.login = u3.login")
    
        val queryExecutionPlan = reorderedJoinQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.ReorderJoin ===
        // Project [id#5]                                            Project [id#5]
        // !+- Filter ((id#5 = id#15) && (login#16 = login#44))       +- Join Inner, (login#16 = login#44)
        // !   +- Join Inner                                             :- Join Inner, (id#5 = id#15)
        // !      :- Join Inner                                          :  :- Project [_1#2 AS id#5, _2#3 AS login#6]
        // !      :  :- Project [_1#2 AS id#5, _2#3 AS login#6]          :  :  +- LocalRelation [_1#2, _2#3]
        // !      :  :  +- LocalRelation [_1#2, _2#3]                    :  +- Project [_1#12 AS id#15, _2#13 AS login#16]
        // !      :  +- Project [_1#12 AS id#15, _2#13 AS login#16]      :     +- LocalRelation [_1#12, _2#13]
        // !      :     +- LocalRelation [_1#12, _2#13]                  +- Project [_1#12 AS id#43, _2#13 AS login#44]
        // !      +- Project [_1#12 AS id#43, _2#13 AS login#44]            +- LocalRelation [_1#12, _2#13]
        // !         +- LocalRelation [_1#12, _2#13]
        // As you can see above, the optimized plan executes filtering directly as "JOIN ON..." condition.
        queryExecutionPlan should include("== Optimized Logical Plan ==\nProject")
        queryExecutionPlan should include("+- Join Inner, (login")
        queryExecutionPlan should include("  :  +- Join Inner, (id")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.ReorderJoin ===")
      }
    
      "joins without defined join column" should "be reordered to inner joins" in {
        val logAppender = createLogAppender()
        val reorderedJoinQuery =
          Users.as("u1").join(NewUsers.as("u2")).join(Users.as("u3")).where("u1.id = u2.id AND u1.login = u3.login")
    
        val queryExecutionPlan = reorderedJoinQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.ReorderJoin ===
        // !Filter ((id#5 = id#15) && (login#6 = login#49))        Join Inner, (login#6 = login#49)
        // !+- Join Inner                                          :- Join Inner, (id#5 = id#15)
        // !   :- Join Inner                                       :  :- Project [_1#2 AS id#5, _2#3 AS login#6]
        // !   :  :- Project [_1#2 AS id#5, _2#3 AS login#6]       :  :  +- LocalRelation [_1#2, _2#3]
        // !   :  :  +- LocalRelation [_1#2, _2#3]                 :  +- Project [_1#12 AS id#15, _2#13 AS login#16]
        // !   :  +- Project [_1#12 AS id#15, _2#13 AS login#16]   :     +- LocalRelation [_1#12, _2#13]
        // !   :     +- LocalRelation [_1#12, _2#13]               +- Project [_1#2 AS id#48, _2#3 AS login#49]
        // !   +- Project [_1#2 AS id#48, _2#3 AS login#49]           +- LocalRelation [_1#2, _2#3]
        // !      +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nJoin Inner, (login#")
        queryExecutionPlan should include(":- Join Inner, (id")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.ReorderJoin ===")
      }
      
  • RewriteCorrelatedScalarSubquery - the secret of this optimization understanding is hidden behind the correlated scalar subquery. Scalar and subquery aren't a mystery but together with correlated can pose a problem. In fact the correlated scalar subquery is a subquery returning 1 element (scalar) that is in some way related to the query from upper level. This correlation can be explained by using 2 columns, one from the upper query and one from the subquery, together in subquery's WHERE clause. Spark's job in this kind of queries consists on rewriting these subqueries to LEFT OUTER JOINs
      "correlated subquery" should "be rewritten in LEFT OUTER JOIN" in {
        val logAppender = createLogAppender()
        Users.createTempView("Users")
        val correlatedSubqueryQuery =
          Users.sqlContext.sql("SELECT login, (SELECT max(id) FROM Users WHERE U.login=login) AS max_id FROM Users U")
    
        val queryExecutionPlan = correlatedSubqueryQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.RewriteCorrelatedScalarSubquery ===
        // !Project [_2#3 AS login#6, scalar-subquery#31 [(_2#3 = login#6#38)] AS max_id#32]   Project [_2#3 AS login#6, max(id)#34 AS max_id#32]
        // !:  +- Aggregate [login#6], [max(id#5) AS max(id)#34, login#6 AS login#6#38]        +- Project [_1#2, _2#3, max(id)#34]
        // !:     +- LocalRelation [id#5, login#6]                                                +- Join LeftOuter, (_2#3 = login#6#38)
        // !+- LocalRelation [_1#2, _2#3]                                                            :- LocalRelation [_1#2, _2#3]
        // !                                                                                         +- Aggregate [login#6], [max(id#5) AS max(id)#34, login#6 AS login#6#38]
        // !                                                                                            +- LocalRelation [id#5, login#6]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nProject ")
        queryExecutionPlan should include("+- Join LeftOuter, ")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.RewriteCorrelatedScalarSubquery ===")
      }
      
  • SimplifyBinaryComparison - rewrites a binary comparison by replacing: '<=>' with true, '=', '<=', and '>=' with 'true' or <' and '>' with 'false' but only when both operands are non-nullable.
      "static binary expression on id equality" should "be converted to appropriate literal" in {
        val logAppender = createLogAppender()
        val simplifiedBinaryExpressionQuery = Users.select(Users("id") === Users("id"))
    
        val queryExecutionPlan = simplifiedBinaryExpressionQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.SimplifyBinaryComparison ===
        // !Project [(_1#2 <=> _1#2) AS (id <=> id)#30]   Project [true AS (id <=> id)#30]
        // +- LocalRelation [_1#2, _2#3]                 +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Analyzed Logical Plan ==\n(id = id): boolean\nProject ")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.SimplifyBinaryComparison ===")
      }
    
      "static binary expression on id" should "be converted to true literal" in {
        val logAppender = createLogAppender()
        val simplifiedBinaryExpressionQuery = Users.select(Users("id") <=> Users("id"))
    
        val queryExecutionPlan = simplifiedBinaryExpressionQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.SimplifyBinaryComparison ===
        // !Project [(_1#2 <=> _1#2) AS (id <=> id)#30]   Project [true AS (id <=> id)#30]
        // +- LocalRelation [_1#2, _2#3]                 +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Analyzed Logical Plan ==\n(id <=> id): boolean\nProject ")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.SimplifyBinaryComparison ===")
      }
      
  • SimplifyCasts - removes redundant casts. They occur when we try to cast a column to the same type, e.g. a String column to String.
      "redundant cast" should "be detected when string column is casted to string type" in {
        val logAppender = createLogAppender()
        val castedUsers = Users.select(Users("login").cast("string"))
    
        val queryExecutionPlan = castedUsers.queryExecution.toString()
    
        // In logs we're expecting to find the following entry:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.SimplifyCasts ===
        // !Project [cast(_2#3 as string) AS login#10]   Project [_2#3 AS login#10]
        // +- LocalRelation [_1#2, _2#3]                +- LocalRelation [_1#2, _2#3]
        // (org.apache.spark.sql.internal.BaseSessionStateBuilder$$anon$2:62)
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [login")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.SimplifyCasts ===")
      }
      
  • SimplifyCaseConversionExpressions - removes the nested case conversion expressions that are always overwritten by the outer expressions. For instance, the expression LOWER_CASE(UPPER_CASE(column1)) can be replaced by LOWER_CASE(column1) because the UPPER_CASE is always overwritten by LOWER_CASE.
       "nested case conversion" should "be simplified to only 1 conversion" in {
        val logAppender = createLogAppender()
        val upperCasedLogins = Users.select(upper(lower(Users("login"))).as("upper_cased_login"))
    
        val queryExecutionPlan = upperCasedLogins.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.SimplifyCaseConversionExpressions ===
        // !Project [upper(lower(_2#3)) AS upper_cased_login#20]   Project [upper(_2#3) AS upper_cased_login#20]
        // +- LocalRelation [_1#2, _2#3]                          +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [upper_cased_login")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.SimplifyCaseConversionExpressions ===")
      }
      
  • SimplifyConditionals - tends to simplify if/case conditional expressions. The simplification can be translated by: removing case/when if the first branch is always true, removing always false branches in case/when expression or transforming always true/false if expressions to literal expressions (e.g. IF 10 < 20 => 10).
      "the always false expression" should "be simplified" in {
        val logAppender = createLogAppender()
        Users.createTempView("Users_SimplifyConditionals")
        val simplifiedConditionalsQuery = sparkSession.sql("SELECT CASE WHEN 10 > 20 THEN \"10_20\" ELSE \"20_10\" END " +
          "FROM Users_SimplifyConditionals")
    
        val queryExecutionPlan = simplifiedConditionalsQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals ===
        // !Project [CASE WHEN false THEN 10_20 ELSE 20_10 END AS CASE WHEN (10 > 20) THEN 10_20 ELSE 20_10 END#41]   Project [20_10 AS CASE WHEN (10 > 20) THEN 10_20 ELSE 20_10 END#41]
        // +- LocalRelation [_1#2, _2#3]                                                                             +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Analyzed Logical Plan ==\nCASE WHEN (10 > 20) THEN 10_20 ELSE 20_10 END: string" +
          "\nProject [CASE WHEN (10 > 20) THEN 10_20 ELSE 20_10 END AS CASE WHEN (10 > 20) THEN 10_20 ELSE 20_10 END")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals ===")
      }
    
      "not changing if-else expression" should "be simplified to a constant" in {
        val logAppender = createLogAppender()
        Users.createTempView("Users_SimplifyConditionals2")
        val simplifiedConditionalsQuery = sparkSession.sql("SELECT IF(10 > 20, \"A\", \"B\") " +
          "FROM Users_SimplifyConditionals2")
    
        val queryExecutionPlan = simplifiedConditionalsQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals ===
        // !Project [if (false) A else B AS (IF((10 > 20), A, B))#41]   Project [B AS (IF((10 > 20), A, B))#41]
        // +- LocalRelation [_1#2, _2#3]                               +- LocalRelation [_1#2, _2#3]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [(IF((10 > 20), A, B))")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals ===")
      }
    
      "variable if-else expression" should "not be simplified" in {
        val logAppender = createLogAppender()
        Users.createTempView("Users_SimplifyConditionals3")
        val simplifiedConditionalsQuery = sparkSession.sql("SELECT IF(id > 20, \"id>20\", \"id<=20\") " +
          "FROM Users_SimplifyConditionals3")
    
        val queryExecutionPlan = simplifiedConditionalsQuery.queryExecution.toString()
    
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [(IF((id > 20), id>20, id<=20))")
        logAppender.getMessagesText().mkString("\n") should not include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.Users_SimplifyConditionals3 ===")
      }
      
  • SimplifyCreateArrayOps - an array can be created through an array(...) function. But sometimes, maybe by mistake, only 1 item is used from the array created so. In this situation Spark tries to optimize the code and creates only the needed item. The same rule applies for array of structures when only 1 field is selected.
      "array optimizer" should "be used when only 1 entry is used from created array" in {
        val logAppender = createLogAppender()
        AllOrdersPerUser.createTempView("All_Orders_test1")
        val arrayStructOptimizedQuery = AllOrdersPerUser.sqlContext.sql(
          "SELECT array(1*orders_amounts[0], 2*orders_amounts[1])[1] AS the_last_weighted_order FROM All_Orders_test1")
    
        val queryExecutionPlan = arrayStructOptimizedQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.SimplifyCreateArrayOps ===
        // !Project [array((1 * _2#33[0]), (2 * _2#33[1]))[1] AS the_last_weighted_order#41]   Project [(2 * _2#33[1]) AS the_last_weighted_order#41]
        // +- LocalRelation [_1#32, _2#33]                                                    +- LocalRelation [_1#32, _2#33]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [the_last_weighted_order")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.SimplifyCreateArrayOps ===")
      }
    
      "array optimizer" should "be used when only the array field is used from created structure" in {
        val logAppender = createLogAppender()
        AllOrdersPerUser.createTempView("All_Orders_test2")
        val arrayStructOptimizedQuery = AllOrdersPerUser.sqlContext.sql(
          "SELECT array(named_struct('user', user_id), named_struct('user', 1000))[0].user AS first_user_id FROM All_Orders_test2")
    
        val queryExecutionPlan = arrayStructOptimizedQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.SimplifyCreateArrayOps ===
        // !Project [array(named_struct(user, _1#32), [1000])[0].user AS first_user_id#41]   Project [named_struct(user, _1#32).user AS first_user_id#41]
        // +- LocalRelation [_1#32, _2#33]                                        +- LocalRelation [_1#32, _2#33]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [first_user_id")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.SimplifyCreateArrayOps ===")
      }
      
  • SimplifyCreateMapOps - same as SimplifyCreateArrayOps but applies to map except it doesn't apply on structures
      "map optimizer" should "be used when only one key is used from created map" in {
        val logAppender = createLogAppender()
        AllOrdersPerUser.createTempView("All_Orders_test4")
        val mapOptimizedStructQuery = AllOrdersPerUser.sqlContext.sql(
          "SELECT map('user', user_id, 'first_order_amount', orders_amounts[0]).first_order_amount AS first_order_amount " +
            "FROM All_Orders_test4")
    
        val queryExecutionPlan = mapOptimizedStructQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.SimplifyCreateMapOps ===
        // !Project [map(user, _1#32, first_order_amount, _2#33[0])[first_order_amount] AS first_order_amount#41]   Project [CASE WHEN (first_order_amount = user) THEN _1#32 WHEN (
        //   first_order_amount = first_order_amount) THEN _2#33[0] END AS first_order_amount#41]
        // +- LocalRelation [_1#32, _2#33]                                                                         +- LocalRelation [_1#32, _2#33]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [first_order_amount")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.SimplifyCreateMapOps ===")
      }
      
  • SimplifyCreateStructOps - similarly to 2 previous SimplifyCreate* optimizations except that this one applies to a structure.
      "structure creation" should "be optimized when only one field is used" in {
        val logAppender = createLogAppender()
        AllOrdersPerUser.createTempView("All_Orders_test3")
        val structOptimizedQuery = AllOrdersPerUser.sqlContext.sql(
          "SELECT named_struct('user', user_id, 'orders', array(1, 2*orders_amounts[1])).orders AS all_orders FROM All_Orders_test3")
    
        val queryExecutionPlan = structOptimizedQuery.queryExecution.toString()
    
        // Expected log message:
        // === Applying Rule org.apache.spark.sql.catalyst.optimizer.SimplifyCreateStructOps ===
        // !Project [named_struct(user, _1#32, orders, array(1, (2 * _2#33[1]))).orders AS all_orders#41]   Project [array(1, (2 * _2#33[1])) AS all_orders#41]
        // +- LocalRelation [_1#32, _2#33]                                                                 +- LocalRelation [_1#32, _2#33]
        queryExecutionPlan should include("== Optimized Logical Plan ==\nLocalRelation [all_orders")
        logAppender.getMessagesText().mkString("\n") should include("=== Applying Rule " +
          "org.apache.spark.sql.catalyst.optimizer.SimplifyCreateStructOps ===")
      }
      

This second part shown that some of operations are simplified or transformed to other ones in order to improve execution performance. Some of them also helps to avoid human mistakes as the array, map and structure creators.

Comments:

There are no comments for this article.

Write a comment