What's new in Apache Spark 3.2.0 - SQL changes

Versions: Apache Spark 3.2.0

Apache Spark SQL evolves and with each new release, it gets closer to the ANSI standard. The 3.2.0 release is no different and you'll find many ANSI-related changes in it. But not only those, and hopefully you'll discover all of this in this blog post, which has an unusual form because this time I won't focus on the implementation details.

String functions

Let's start simple with the string functions. The first addition is REGEXP_LIKE, which performs regex-based matching on a column. It returns true if the value matches the pattern.

The second new function is BTRIM, which removes any of the defined characters from both ends of the value. Any is quite an important word here because the operation works on a character basis, even though the characters are passed as a single expression.

And to close this part, Apache Spark SQL's DECODE function gets more consistent with other databases. Its previous implementation only converted a binary value to a string and this still works in the 3.2.0 release. However, the new version also extends it with support for case-match scenarios where a value can be replaced by something else if it matches the condition value. If you need some examples, you'll find them just below:

    // Setup assumed from the asserts below (the original snippet omits the test_table creation)
    import sparkSession.implicits._
    Seq("2020-01-01", "abc", "cxyzab", "xyz", "cfabccccdefc").toDF("text")
      .createTempView("test_table")

    val output = sparkSession.sql(
      """
        |SELECT text, REGEXP_LIKE(text, "\\d+-\\d+-\\d+") AS is_date_pattern,
        |BTRIM(text, "cf") AS btrimmed_text,
        |DECODE(text, "xyz", "zyx") AS new_decode_reversed_xyz,
        |ENCODE(text, "UTF-8") AS encoded_text,
        |DECODE(ENCODE(text, "UTF-8"), "UTF-8") AS old_decode
        |FROM test_table
        |""".stripMargin).as[QueryResult].collect().groupBy(queryResult => queryResult.text)
      .map {
        case (key, results) => (key, results.head)
      }

    assert(output("2020-01-01").is_date_pattern)
    assert(!output("abc").is_date_pattern)
    assert(output("abc").btrimmed_text == "ab")
    assert(output("cxyzab").btrimmed_text == "xyzab")
    assert(output("xyz").btrimmed_text == "xyz")
    assert(output("cfabccccdefc").btrimmed_text == "abccccde")
    assert(output("xyz").new_decode_reversed_xyz == "zyx")
    assert(output("cfabccccdefc").new_decode_reversed_xyz == null)

case class QueryResult(text: String, is_date_pattern: Boolean, btrimmed_text: String,
                       new_decode_reversed_xyz: String, encoded_text: Array[Byte],
                       old_decode: String)

All the functions were added by Jiaan Geng.

Window

Although the WINDOW clause is not new, it got an interesting compatibility improvement. Jiaan Geng added null handling support to the LEAD, LAG, NTH_VALUE, FIRST_VALUE and LAST_VALUE functions. Hence, you can use the IGNORE NULLS mode that skips null rows in the window, or RESPECT NULLS that takes them into account during the function execution.

    Seq(
      ("Team A", "Player 1", Some(3)),
      ("Team A", "Player 2", None),
      ("Team A", "Player 3", Some(6)),
      ("Team B", "Player 1", None),
      ("Team B", "Player 2", Some(12)),
    ).toDF("team", "player", "points").createTempView("test_table")

    val output = sparkSession.sql(
      """
        |SELECT team, player, points,
        |nth_value(points, 1) IGNORE NULLS OVER team_players_window AS nth_value_1_ignored_nulls,
        |nth_value(points, 1) OVER team_players_window AS nth_value_1_not_ignored_nulls,
        |LAG(points) IGNORE NULLS OVER team_players_window AS lag_ignored_nulls,
        |LAG(points) OVER team_players_window AS lag_not_ignored_nulls
        |FROM test_table
        |WINDOW team_players_window AS (PARTITION BY team ORDER BY player)
        |""".stripMargin).as[WindowQueryResult].collect()
    .groupBy(result => s"${result.team}-${result.player}").map {
      case (groupKey, values) => (groupKey, values.head)
    }

    assert(output("Team A-Player 3").lag_ignored_nulls == Some(3))
    assert(output("Team A-Player 3").lag_not_ignored_nulls == None)
    assert(output("Team B-Player 2").nth_value_1_ignored_nulls == Some(12))
    assert(output("Team B-Player 2").nth_value_1_not_ignored_nulls == None)

case class WindowQueryResult(team: String, player: String, points: Option[Int],
                             nth_value_1_ignored_nulls: Option[Int], nth_value_1_not_ignored_nulls: Option[Int],
                             lag_ignored_nulls: Option[Int], lag_not_ignored_nulls: Option[Int])

Grouping analytics

The next change you may notice while working with Apache Spark 3.2.0 is related to grouping sets. In a nutshell, here's what changed:

Misc

And what about the other categories? There is a huge epic dedicated to the support of the INTERVAL type, led by Max Gekk. The initial interval implementation lacked 2 properties: persistence (= an interval cannot be stored in a table) and equality (= intervals cannot be compared). To address them, the community worked on 2 new ANSI-compatible interval types representing year-month and day-time intervals. The new release therefore adds support for these 2 types, along with functions to create them:

    // java.time imports needed by the typed Dataset conversion below
    import java.time.{Duration, Instant, Period}

    val interval = sparkSession.sql(
      """
        |SELECT
        |MAKE_DT_INTERVAL(1, 3) AS datetime_interval_value,
        |MAKE_YM_INTERVAL(2021, 3) AS year_month_interval_value
        |""".stripMargin)
      // Under the hood, they can be converted to Duration and Period
      // Check: https://issues.apache.org/jira/browse/SPARK-34615 and https://issues.apache.org/jira/browse/SPARK-34605
      .as[(Duration, Period)].collect().head
    // MAKE_DT_INTERVAL(1, 3) = 1 day and 3 hours, MAKE_YM_INTERVAL(2021, 3) = 2021 years and 3 months
    assert(interval._1.toMillis == Instant.parse("1970-01-02T03:00:00Z").toEpochMilli)
    assert(interval._2.getMonths == 3)
    assert(interval._2.getYears == 2021)

Besides this interval-related work, in the new version you'll find error-handling functions added by Gengliang Wang, such as TRY_ADD and TRY_CAST. They return null values instead of overflowed results:

    val transformedMaxLong = sparkSession.sql(
      s"""
        |SELECT
        |${Long.MaxValue} + 2 AS long_add_2,
        |TRY_ADD(${Long.MaxValue}, 2) AS long_safe_add_2,
        |CAST(${Long.MaxValue} AS INT) AS long_cast_to_int,
        |TRY_CAST(${Long.MaxValue} AS INT) AS long_safe_cast_to_int,
        |${Long.MaxValue} AS long_max
        |""".stripMargin).as[(Long, Option[Long], Long, Option[Long], Long)].collect().head

    assert(transformedMaxLong == (-9223372036854775807L, None, -1, None, 9223372036854775807L))

Also, Apache Spark 3.2.0 supports the CURRENT_USER() function that returns the user executing the query. By default, the value is retrieved from a thread-local variable but if it's missing, the execution engine will take it from SPARK_USER or from the user currently logged in to the cluster.
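
To see it in action, here is a minimal sketch; the returned value is environment-dependent (on a local run it will usually be the OS user submitting the job), so the printed output is only illustrative:

    // CURRENT_USER() returns a single string column with the identity of the
    // user executing the query (thread-local user, SPARK_USER, or cluster login)
    val queryUser = sparkSession.sql("SELECT CURRENT_USER() AS query_user")
      .collect().head.getAs[String]("query_user")
    // On a local test run this typically prints the OS user of the submitter
    println(s"The query was executed by: ${queryUser}")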

And that's all for today. In the next weeks I'll continue my exploration of Apache Spark 3.2.0 and cover other fascinating features, maybe less user-facing than the functions but just as interesting!