What's new in Apache Spark 3.0 - new SQL functions

Versions: Apache Spark 3.0.0

After date time management, it's time to see another important feature of Apache Spark 3.0: the new SQL functions.


ALTER DATABASE ... SET LOCATION

This is not really a SQL function but a Hive one. It's a metadata operation that sets a new location for a given database. If it's not a SQL concept, why am I mentioning it here? When I first saw it, I thought it could be a good way to implement an aliasing strategy for Big Data immutability (learn more in Big Data immutability approaches - aliasing), but I was wrong.

This command only changes the location used for new tables and partitions; the existing ones aren't modified at all. If you made the same assumption as me, I hope this explanation will help you keep that in mind.
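
To make that behavior concrete, here is a toy model in plain Scala. It is not Spark's or Hive's catalog code; the `Database` and `Table` classes, the `sales` database and the `/warehouse/...` paths are all hypothetical and only illustrate the "new tables only" rule described above.

```scala
// Toy catalog model, NOT Spark's or Hive's implementation:
// every table stores its own path, resolved once at creation time.
final case class Table(name: String, path: String)

final case class Database(name: String, location: String, tables: Vector[Table] = Vector.empty) {
  // A new table is placed under the database location known at creation time.
  def createTable(tableName: String): Database =
    copy(tables = tables :+ Table(tableName, s"$location/$tableName"))

  // Only the default parent directory changes; existing tables are left untouched.
  def setLocation(newLocation: String): Database = copy(location = newLocation)
}

object AlterLocationSketch {
  val db: Database = Database("sales", "/warehouse/v1")
    .createTable("orders")
    .setLocation("/warehouse/v2")
    .createTable("customers")

  def main(args: Array[String]): Unit =
    db.tables.foreach(table => println(s"${table.name} -> ${table.path}"))
}
```

In this model `orders` stays under `/warehouse/v1` while `customers` lands under `/warehouse/v2`, which is why the command cannot be used to atomically switch an alias to a new dataset version.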

bit_and, bit_or, bit_xor

BIT_AND, BIT_OR and BIT_XOR are 3 new functions to deal with bitwise integer aggregates. You can see them in action in the following snippet:

  "bitwise functions" should "apply to the column values" in {
    Seq(
      (1), (2), (3)
    ).toDF("number").createTempView("bitwise_test")

    val functionsString = sparkSession
      .sql("""SELECT CONCAT(
             | BIT_AND(number), ",", BIT_OR(number), ",", BIT_XOR(number)
             |) AS bitwise_concat
             |FROM bitwise_test""".stripMargin)
      .collect()
      .map(row => row.getAs[String]("bitwise_concat"))

    functionsString should have size 1
    // 1 & 2 & 3 = 0, 1 | 2 | 3 = 3, 1 ^ 2 ^ 3 = 0
    functionsString(0) shouldEqual ("0,3,0")
  }
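
If the asserted string looks surprising, the following plain Scala sketch (no Spark involved) reproduces the aggregation logic by folding the values with the matching bitwise operator. The object and method names are mine, not Spark's.

```scala
object BitwiseAggregatesSketch {
  // Each aggregate folds the column values with the matching bitwise operator.
  def bitAnd(values: Seq[Int]): Int = values.reduce(_ & _)
  def bitOr(values: Seq[Int]): Int = values.reduce(_ | _)
  def bitXor(values: Seq[Int]): Int = values.reduce(_ ^ _)

  def main(args: Array[String]): Unit = {
    val numbers = Seq(1, 2, 3) // binary: 01, 10, 11
    // prints "0,3,0"
    println(Seq(bitAnd(numbers), bitOr(numbers), bitXor(numbers)).mkString(","))
  }
}
```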

date_part

From my previous article you learned that Apache Spark 3.0 brings a lot of changes in datetime management. New temporal functions are part of them, and DATE_PART is one of these functions. As its name indicates, you can use it to retrieve a field from a timestamp or an interval. In the following snippet I'm using it to extract the day:

  "date_part" should "extract day from a timestamp column" in {
    Seq(
      (new Timestamp(0L)), (new Timestamp(TimeUnit.DAYS.toMillis(2L)))
    ).toDF("dt").createTempView("date_part_test")

    val extractedDays = sparkSession
      .sql("SELECT DATE_PART('day', dt) AS extracted_day FROM date_part_test")
      .collect()
      .map(row => row.getAs[Integer]("extracted_day"))

    extractedDays should have size 2
    extractedDays should contain allOf(1, 3)
  }
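
As a point of comparison, here is roughly the same extraction written with java.time in plain Scala. It is only a sketch: it evaluates the day in UTC, which is my assumption, whereas Spark relies on the session time zone.

```scala
import java.time.{Instant, ZoneOffset}
import java.util.concurrent.TimeUnit

object DatePartSketch {
  // Day of month of an epoch-millis instant, evaluated in UTC (an assumption).
  def dayOfMonthUtc(epochMillis: Long): Int =
    Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).getDayOfMonth

  def main(args: Array[String]): Unit = {
    println(dayOfMonthUtc(0L))                         // 1970-01-01 -> 1
    println(dayOfMonthUtc(TimeUnit.DAYS.toMillis(2L))) // 1970-01-03 -> 3
  }
}
```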

Hyperbolic functions

To be honest, I didn't know this name when I saw the functions in the list. After some quick research I learned that hyperbolic functions are like trigonometric functions but defined for a hyperbola instead of a circle. They can be used, for instance, to describe the shape of the curve formed by a high-voltage line suspended between two towers. The following snippet compares the inverse trigonometric functions with their hyperbolic counterparts:

  "hyperbolic functions" should "return different results than not hyperbolic ones" in {
    // 1.3 lies outside [-1, 1], so acos, asin and atanh return NaN
    Seq(1.3).toDF("number").createTempView("hyperbolic_funcs_test")

    val functionsString = sparkSession
      .sql("""SELECT CONCAT(
          | acosh(number), ",", acos(number), ",",
          | asinh(number), ",", asin(number), ",",
          | atanh(number), ",", atan(number)
          |) AS funcs_concat
          |FROM hyperbolic_funcs_test""".stripMargin)
      .collect()
      .map(row => row.getAs[String]("funcs_concat"))

    functionsString should have size 1
    functionsString(0) shouldEqual "0.7564329108569596,NaN,1.078451058954897,NaN,NaN,0.9151007005533605"
  }
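
To see where these numbers come from, the inverse hyperbolic functions can be written with their usual logarithmic forms. The sketch below is plain Scala, not Spark's implementation, and the input 1.3 is an assumption on my side: it is the value consistent with the results asserted in the test.

```scala
object InverseHyperbolicSketch {
  // Logarithmic forms of the inverse hyperbolic functions.
  def asinh(x: Double): Double = math.log(x + math.sqrt(x * x + 1.0))
  def acosh(x: Double): Double = math.log(x + math.sqrt(x * x - 1.0))  // real-valued for x >= 1
  def atanh(x: Double): Double = 0.5 * math.log((1.0 + x) / (1.0 - x)) // real-valued for -1 < x < 1

  def main(args: Array[String]): Unit = {
    val x = 1.3 // assumed input, outside the [-1, 1] domain of acos and asin
    println(s"acosh=${acosh(x)} acos=${math.acos(x)}")
    println(s"asinh=${asinh(x)} asin=${math.asin(x)}")
    println(s"atanh=${atanh(x)} atan=${math.atan(x)}")
  }
}
```

For this input, acos, asin and atanh are undefined and evaluate to NaN, whereas acosh, asinh and atan return regular numbers.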


make_date

As the name indicates, you can create a date-typed field from a year, a month and a day:

  "make_date" should "create a date from 3 separated fields" in {
    Seq(
      (2020, 4, 1),
      (2019, 1, 20),
      (2020, 2, 31) // error on purpose, let's see what happens
    ).toDF("y", "m", "d").createTempView("make_date_test")

    val mappedDates = sparkSession
      .sql("SELECT CAST(make_date(y, m, d) AS STRING) AS date_from_ymd FROM make_date_test")
      .collect()
      .map(row => row.getAs[String]("date_from_ymd"))

    mappedDates should have size 3
    // the invalid February 31 is returned as null
    mappedDates should contain allOf(null, "2020-04-01", "2019-01-20")
  }
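
The "null for an invalid date" behavior can be mimicked in plain Scala with java.time. This is only a sketch of the semantics observed in the test, with None playing the role of SQL NULL; `makeDate` is a hypothetical helper, not a Spark API.

```scala
import java.time.LocalDate
import scala.util.Try

object MakeDateSketch {
  // None plays the role of SQL NULL for fields that do not form a valid date.
  def makeDate(year: Int, month: Int, day: Int): Option[LocalDate] =
    Try(LocalDate.of(year, month, day)).toOption

  def main(args: Array[String]): Unit = {
    println(makeDate(2020, 4, 1))  // Some(2020-04-01)
    println(makeDate(2020, 2, 31)) // None
  }
}
```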


make_timestamp

Apart from the date, you can also create a timestamp-typed field:

  "make_timestamp" should "create a timestamp from 5 separated fields" in {
    Seq(
      (2020, 4, 1, 10, 20),
      (2019, 1, 20, 15, 59),
      (2020, 2, 2, 25, 10) // error on purpose, let's see what happens
    ).toDF("y", "m", "d", "h", "min").createTempView("make_timestamp_test")

    val mappedTs = sparkSession
      .sql("SELECT CAST(make_timestamp(y, m, d, h, min, 0, 'UTC') AS STRING) " +
        "AS ts_from_ymd FROM make_timestamp_test")
      .collect()
      .map(row => row.getAs[String]("ts_from_ymd"))

    mappedTs should have size 3
    // timestamps are built in UTC and rendered in the session time zone (UTC+2 in April, UTC+1 in January here)
    mappedTs should contain allOf(null, "2020-04-01 12:20:00", "2019-01-20 16:59:00")
  }
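
The expected hours differ from the input ones because the timestamp is created in UTC and then displayed in the session time zone. The plain Scala sketch below reproduces that shift with java.time. The Europe/Paris zone is my assumption (any zone at UTC+2 in April and UTC+1 in January gives the same strings), and `makeTimestamp` is a hypothetical helper, not Spark's function.

```scala
import java.time.{LocalDateTime, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter
import scala.util.Try

object MakeTimestampSketch {
  private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Builds the timestamp in UTC, then renders it in the given session time zone.
  // None plays the role of SQL NULL for invalid fields such as hour = 25.
  def makeTimestamp(y: Int, m: Int, d: Int, h: Int, min: Int, sessionZone: ZoneId): Option[String] =
    Try(LocalDateTime.of(y, m, d, h, min, 0))
      .map(_.atZone(ZoneOffset.UTC).withZoneSameInstant(sessionZone).format(formatter))
      .toOption

  def main(args: Array[String]): Unit = {
    val sessionZone = ZoneId.of("Europe/Paris") // assumed session time zone
    println(makeTimestamp(2020, 4, 1, 10, 20, sessionZone))  // Some(2020-04-01 12:20:00)
    println(makeTimestamp(2019, 1, 20, 15, 59, sessionZone)) // Some(2019-01-20 16:59:00)
    println(makeTimestamp(2020, 2, 2, 25, 10, sessionZone))  // None
  }
}
```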

max_by, min_by

These 2 functions are quite interesting since they came to the 3.0 release from Presto. MAX_BY(field1, field2) returns the value of field1 associated with the maximum value of field2 over all input rows. MIN_BY does the same for the minimum value of field2. You can see that in the following test case:

  "min_by and max_by" should "retrieve a column for min and max values" in {
    Seq(
      (2020, 4, 1, "year 2020"),
      (2019, 1, 20, "year 2019"),
      (2018, 2, 2, "year 2018")
    ).toDF("y", "m", "d", "label").createTempView("min_by_max_by_test")

    val minAndMaxLabels = sparkSession
      .sql("SELECT min_by(label, y) AS minLabel, max_by(label, y) AS maxLabel FROM min_by_max_by_test")
      .collect()
      .map(row => (row.getAs[String]("minLabel"), row.getAs[String]("maxLabel")))

    minAndMaxLabels should have size 1
    minAndMaxLabels(0) shouldEqual ("year 2018", "year 2020")
  }
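
If you know the Scala collections API, these functions should look familiar: they behave much like minBy and maxBy followed by a projection. A minimal sketch in plain Scala, with (year, label) pairs standing in for the y and label columns:

```scala
object MinByMaxBySketch {
  // (year, label) pairs standing in for the y and label columns of the test.
  val rows: Seq[(Int, String)] = Seq((2020, "year 2020"), (2019, "year 2019"), (2018, "year 2018"))

  // min_by(label, y): label of the row holding the smallest y.
  val minLabel: String = rows.minBy(_._1)._2
  // max_by(label, y): label of the row holding the largest y.
  val maxLabel: String = rows.maxBy(_._1)._2

  def main(args: Array[String]): Unit =
    println((minLabel, maxLabel)) // (year 2018,year 2020)
}
```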


overlay

The next added function, OVERLAY, replaces a substring with another one:

  "overlay" should "replace the first 3 letters by xxx" in {
    // sample labels: one longer than 3 characters, the others with 3 characters or fewer
    Seq(
      ("abcdef"), ("abc"), ("ab"), ("a")
    ).toDF("label").createTempView("overlay_test")

    val formattedLabels = sparkSession
      .sql("SELECT OVERLAY(label PLACING 'xxx' FROM 1) AS formattedLabel FROM overlay_test")
      .collect()
      .map(row => row.getAs[String]("formattedLabel"))

    formattedLabels should have size 4
    // As you can see, OVERLAY adds new characters for the columns shorter than 3 characters
    formattedLabels should contain allElementsOf  (Seq("xxxdef", "xxx", "xxx", "xxx"))
  }
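
The replacement logic can be approximated in a few lines of plain Scala. This is a simplified model and not Spark's implementation: it assumes a 1-based position greater than or equal to 1 and, when no length is given, uses the length of the replacement. The sample strings are hypothetical.

```scala
object OverlaySketch {
  // Simplified model of OVERLAY(input PLACING replacement FROM pos [FOR len]), with pos >= 1.
  def overlay(input: String, replacement: String, pos: Int, len: Option[Int] = None): String = {
    // Without an explicit length, as many characters as the replacement contains are replaced.
    val replacedLength = len.getOrElse(replacement.length)
    input.take(pos - 1) + replacement + input.drop(pos - 1 + replacedLength)
  }

  def main(args: Array[String]): Unit = {
    println(overlay("abcdef", "xxx", 1)) // xxxdef
    println(overlay("ab", "xxx", 1))     // xxx, the result is longer than the input
  }
}
```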

New aliases

Apart from really new functions, Apache Spark 3.0 also brings a few new aliases for already existing functions. Thanks to them you can use BOOL_AND instead of EVERY, BOOL_OR or SOME instead of ANY, and RANDOM instead of RAND:

  "new aliases" should "do the same thing like existing methods" in {
    Seq(
      (true, false),
      (true, true),
      (true, false)
    ).toDF("bool1", "bool2").createTempView("aliases_test")

    val boolAliases = sparkSession
      .sql("""SELECT
          |BOOL_AND(bool1) AS bool_and_version, EVERY(bool1) AS every_version,
          |BOOL_OR(bool2) AS bool_or_version, ANY(bool2) AS any_version
          |FROM aliases_test""".stripMargin)
      .collect()
      .map(row =>
        (row.getAs[Boolean]("bool_and_version"), row.getAs[Boolean]("every_version"),
          row.getAs[Boolean]("bool_or_version"), row.getAs[Boolean]("any_version")))

    boolAliases should have size 1
    boolAliases(0) shouldEqual (true, true, true, true)
  }
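
For non-empty columns without NULLs, the two boolean aggregates behave like forall and exists on a Scala collection. The sketch below deliberately ignores SQL NULL handling and empty inputs, so take it as an intuition rather than an exact equivalent.

```scala
object BoolAggregatesSketch {
  // BOOL_AND / EVERY: true only if every value is true.
  def boolAnd(values: Seq[Boolean]): Boolean = values.forall(identity)
  // BOOL_OR / ANY / SOME: true as soon as one value is true.
  def boolOr(values: Seq[Boolean]): Boolean = values.exists(identity)

  def main(args: Array[String]): Unit = {
    println(boolAnd(Seq(true, true, true)))  // true, like the bool1 column
    println(boolOr(Seq(false, true, false))) // true, like the bool2 column
  }
}
```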

  "new aliases for random" should "generate different results every time" in {
    Seq(
      (true, false),
      (true, true),
      (true, false)
    ).toDF("bool1", "bool2").createTempView("random_aliases_test")

    val randomAliases = sparkSession
      .sql("""
          |SELECT RANDOM() AS random_v, RAND() AS rand_v
          |FROM random_aliases_test""".stripMargin)
      .collect()
      .map(row =>
        (row.getAs[Double]("random_v"), row.getAs[Double]("rand_v")))

    randomAliases should have size 3
  }

Update February 5, 2021: I missed this change, and thank you very much, German, for sharing it with me! Starting from Spark 3 you can also pass a custom comparator function to array_sort.
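
The comparator follows the classical three-way contract: it returns a negative number if the left element should come first, a positive one if the right element should come first, and 0 if both are equal. Here is a plain Scala sketch of a descending comparator, with what I believe is the matching Spark SQL expression in a comment. Treat the SQL as an illustration; the sample array is made up.

```scala
object ArraySortComparatorSketch {
  // Possible Spark SQL counterpart (illustrative):
  //   SELECT array_sort(array(5, 6, 1),
  //     (left, right) -> CASE WHEN left > right THEN -1 WHEN left < right THEN 1 ELSE 0 END)
  val descending: Ordering[Int] = new Ordering[Int] {
    // Negative: left sorts first; positive: right sorts first; 0: equal.
    override def compare(left: Int, right: Int): Int =
      if (left > right) -1 else if (left < right) 1 else 0
  }

  def sortDescending(values: Seq[Int]): Seq[Int] = values.sorted(descending)

  def main(args: Array[String]): Unit =
    println(sortDescending(Seq(5, 6, 1))) // List(6, 5, 1)
}
```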

Reverted changes

When I was writing this blog post, I was so happy to discover new functions that I almost forgot that some of them were reverted. The first of them, MAKE_INTERVAL, was initially added but finally removed because it was too PostgreSQL-specific. Other examples, JUSTIFY_DAYS, JUSTIFY_HOURS and JUSTIFY_INTERVAL, were removed after being merged to master for the same reason.

Maybe the functions added in this release are less revolutionary than the higher-order functions introduced in 2.4, but this impression can be misleading. After all, they make it possible to cover new use cases, especially if you need to migrate your processing code from a standard SQL-based solution to a more distributed environment on top of Apache Spark SQL.