What's new in Apache Spark 3.1.1 - new built-in functions

Versions: Apache Spark 3.1.1

Every Apache Spark release brings not only completely new components but also new native functions. The 3.1.1 release is no exception and it also comes with some new built-in functions!

JSON functions

To start, we have 2 new functions operating on JSON documents. The first of them, json_array_length, returns the number of elements in an array. You can use it alongside the JSON transformation functions to check the properties of the document. The second one, json_object_keys, extracts the keys of the document. Once again, when mixed with a transformation function, it lets you analyze the content of nested fields:

    Seq(
      ("""{"number1": 1, "number2": {"number3": 3}, "number4": 4, "number5": [5, 5, 5]}"""),
      ("""{"number1": 1, "number2": {"number3": 3}, "number4": 4, "number5": [5, 5, 5, 5, 5, 5], "number6": 6}""")
    ).toDF("json_string").createTempView("json_fields_table")

    val response = sparkSession.sql(
      """
        |SELECT
        | JSON_OBJECT_KEYS(json_string) AS outer_fields,
        | JSON_OBJECT_KEYS(GET_JSON_OBJECT(json_string, "$.number2")) AS nested_keys,
        | JSON_ARRAY_LENGTH(GET_JSON_OBJECT(json_string, "$.number5")) AS number5_length
        |FROM json_fields_table
        |""".stripMargin).as[(Seq[String], Seq[String], Int)].collect()

    response should have length 2
    response should contain allOf(
      (Seq("number1", "number2", "number4", "number5"), Seq("number3"), 3),
      (Seq("number1", "number2", "number4", "number5", "number6"), Seq("number3"), 6)
    )
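
If you prefer the programmatic API, both functions can also be called from a DataFrame with selectExpr. Below is a minimal sketch of that approach; the jsonDocuments dataset and its column names are purely illustrative:

    // A sketch only: the same JSON functions invoked through selectExpr;
    // the dataset and column names are illustrative
    val jsonDocuments = Seq("""{"id": 1, "tags": ["a", "b", "c"]}""").toDF("json_string")

    jsonDocuments.selectExpr(
      "JSON_OBJECT_KEYS(json_string) AS document_keys", // ["id", "tags"]
      "JSON_ARRAY_LENGTH(GET_JSON_OBJECT(json_string, '$.tags')) AS tags_length" // 3
    ).show(false)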

Both functions were implemented by Rakesh Raushan.

Datetime functions

Much more happened for datetime functions. To start, a new set of functions was added by philipse, TJX2014 and Max Gekk to convert integer values representing seconds, milliseconds or microseconds since the UNIX epoch into timestamps:

    val timestampsFromInts = sparkSession.sql(
      """
        |SELECT
        | TIMESTAMP_SECONDS(1) AS timestamp_seconds_value,
        | TIMESTAMP_MILLIS(1000) AS timestamp_millis_value,
        | TIMESTAMP_MICROS(1000000) AS timestamp_micros_value
        |""".stripMargin).as[(String, String, String)].collect()

    timestampsFromInts should have size 1
    timestampsFromInts(0) shouldEqual ("1970-01-01 01:00:01", "1970-01-01 01:00:01", "1970-01-01 01:00:01")
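
Note that the string rendering of the produced timestamps depends on the spark.sql.session.timeZone configuration; the underlying instant is the same. The same conversion naturally applies to a column rather than literals; a small sketch below, where the events_table name and its epoch_seconds values are purely illustrative:

    // A sketch only: converting a column of epoch seconds into timestamps;
    // the table name and values are illustrative
    Seq((1L), (60L), (3600L)).toDF("epoch_seconds").createTempView("events_table")

    sparkSession.sql(
      """
        |SELECT epoch_seconds, TIMESTAMP_SECONDS(epoch_seconds) AS event_time
        |FROM events_table
        |""".stripMargin).show(false)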

In addition to them, you can do the opposite and convert a timestamp into seconds, milliseconds, or microseconds since the epoch. Gengliang Wang implemented this feature and you can find an example just below:

    Seq(
      (Timestamp.valueOf("1970-01-01 01:00:01.000000000"))
    ).toDF("timestamp_column").createTempView("datetime_functions_table")

    val integersFromTimestamps = sparkSession.sql(
      """
        |SELECT
        | UNIX_SECONDS(timestamp_column) AS unix_seconds_value,
        | UNIX_MILLIS(timestamp_column) AS unix_milliseconds_value,
        | UNIX_MICROS(timestamp_column) AS unix_microseconds_value
        |FROM datetime_functions_table
        |""".stripMargin).as[(Long, Long, Long)].collect()

    integersFromTimestamps should have size 1
    integersFromTimestamps(0) shouldEqual (1L, 1000L, 1000000L)
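
Since both families of functions are exact inverses at matching precisions, you can verify the round trip. A small sketch of that property:

    // A sketch only: converting to a timestamp and back returns the initial value
    val roundTrip = sparkSession.sql(
      """
        |SELECT
        | UNIX_SECONDS(TIMESTAMP_SECONDS(1)) AS seconds_round_trip,
        | UNIX_MILLIS(TIMESTAMP_MILLIS(1500)) AS millis_round_trip,
        | UNIX_MICROS(TIMESTAMP_MICROS(1500000)) AS micros_round_trip
        |""".stripMargin).as[(Long, Long, Long)].collect()

    roundTrip(0) shouldEqual (1L, 1500L, 1500000L)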

Gengliang Wang is also the author of 2 other functions, which support conversion between dates and integers. Remember, 01/01/1970 corresponds to day 0, meaning the generated date is always one day later than the integer parameter might suggest. You can see it in the example below, where "30" generates 31/01/1970:

    val dateConversionResults = sparkSession.sql(
      """
        |SELECT
        | DATE_FROM_UNIX_DATE(0) AS date_from_unix_date_day_0,
        | DATE_FROM_UNIX_DATE(30) AS date_from_unix_date_day_30,
        | UNIX_DATE(DATE_FROM_UNIX_DATE(30)) AS unix_date_value_day_30
        |""".stripMargin).as[(String, String, String)].collect()

    dateConversionResults should have size 1
    dateConversionResults(0) shouldEqual (("1970-01-01", "1970-01-31", "30"))
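
Another way to reason about these functions: UNIX_DATE is simply the number of days elapsed since 01/01/1970, so it should always agree with DATEDIFF computed against that reference date. A quick sketch of this equivalence:

    // A sketch only: UNIX_DATE counts the days elapsed since 1970-01-01,
    // so it matches DATEDIFF against that reference date (18628 here)
    val dayCounts = sparkSession.sql(
      """
        |SELECT
        | UNIX_DATE(DATE'2021-01-01') AS unix_date_value,
        | DATEDIFF(DATE'2021-01-01', DATE'1970-01-01') AS datediff_value
        |""".stripMargin).as[(Int, Int)].collect()

    dayCounts(0)._1 shouldEqual dayCounts(0)._2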

Finally, thanks to the work done by ulysses-you, you can read the current timezone of your application:

  private val sparkSession = SparkSession.builder()
    .appName("Spark 3.1: new functions").master("local[*]")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()

    val currentTimezone = sparkSession.sql(
      """
        |SELECT CURRENT_TIMEZONE()
        |""".stripMargin).as[String].collect()

    currentTimezone should have size 1
    currentTimezone(0) shouldEqual "UTC"
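
Since spark.sql.session.timeZone is a runtime configuration, CURRENT_TIMEZONE() also reflects a change made on an already running session. A small sketch of that behavior:

    // A sketch only: CURRENT_TIMEZONE() follows the runtime spark.sql.session.timeZone value
    sparkSession.conf.set("spark.sql.session.timeZone", "Europe/Paris")
    sparkSession.sql("SELECT CURRENT_TIMEZONE()").as[String].collect()(0) shouldEqual "Europe/Paris"

    // Restore the initial value used by the other tests
    sparkSession.conf.set("spark.sql.session.timeZone", "UTC")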

Metadata functions

In the next category of functions, Kent Yao added a function returning the name of the current catalog backing the read dataset. The second feature from this category comes from Yuming Wang and Takeshi Yamamuro, who added a function computing the number of the bucket an expression falls into in an equi-width histogram:

    val bucketsWithCatalog = sparkSession.sql(
      """
        |SELECT
        | WIDTH_BUCKET(6, 1, 10, 5) AS bucket_for_5_buckets,
        | WIDTH_BUCKET(6, 1, 10, 2) AS bucket_for_2_buckets,
        | CURRENT_CATALOG() AS current_catalog
        |""".stripMargin).as[(BigInt, BigInt, String)].collect()

    bucketsWithCatalog should have size 1
    bucketsWithCatalog(0) shouldEqual((3, 2, "spark_catalog"))
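
A more practical use of WIDTH_BUCKET is building an equi-width histogram over a real column. A sketch below, where the scores_table dataset and its values are purely illustrative:

    // A sketch only: grouping rows by their equi-width bucket over the [0, 100] range
    Seq((5.0), (12.0), (25.0), (44.0), (95.0)).toDF("score").createTempView("scores_table")

    sparkSession.sql(
      """
        |SELECT
        | WIDTH_BUCKET(score, 0, 100, 4) AS bucket,
        | COUNT(*) AS scores_count
        |FROM scores_table
        |GROUP BY WIDTH_BUCKET(score, 0, 100, 4)
        |ORDER BY bucket
        |""".stripMargin).show(false)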

Remaining functions: RegEx, window and errors

To terminate, let's see what happened for the "other" functions. To start, you can get any value from a window thanks to the nth_value function implemented by Jiaan Geng. One thing to notice, though: with the default frame (unbounded preceding to the current row), the window grows row by row, meaning that if the queried offset has not been reached yet, you will see a null. Also, you can skip the nulls in the offset evaluation. You can find both these cases presented in the snippet below:

    val playersDataset = Seq(
      ("player1", Some(1), 1), ("player2", Some(1), 1),
      ("player1", Some(5), 2), ("player2", Some(3), 2),
      ("player1", Some(11), 3), ("player2", None, 3),
      ("player1", Some(19), 4), ("player2", Some(7), 4)
    ).toDF("player_login", "new_points", "timestamp")

    val playerWindow = Window.partitionBy($"player_login").orderBy($"timestamp".asc)
    val thirdPointDoesntSkipNulls = functions.nth_value($"new_points", 3, ignoreNulls = false).over(playerWindow)
    val thirdPointSkipsNulls = functions.nth_value($"new_points", 3, ignoreNulls = true).over(playerWindow)

    val playersPointIn3rdPosition = playersDataset
      .select(functions.concat($"player_login", functions.lit("-"), $"timestamp"),
        thirdPointDoesntSkipNulls.as("point_not_ignore_nulls"),
        thirdPointSkipsNulls.as("point_ignore_nulls"))
      .as[(String, Option[Int], Option[Int])]
      .collect()

    playersPointIn3rdPosition should have size 8
    playersPointIn3rdPosition should contain allOf(
      ("player1-1", None, None), ("player1-2", None, None),
      ("player1-3", Some(11), Some(11)), ("player1-4", Some(11), Some(11)),
      ("player2-1", None, None), ("player2-2", None, None),
      // As you can see, with ignoreNulls=true, the nth position ignores nulls
      // and that's why you can see that the 4th row is returned
      ("player2-3", None, None), ("player2-4", None, Some(7))
    )
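
If you prefer a single, stable value per partition instead of the running behavior shown above, you can make the frame explicit. A sketch with an unbounded frame, reusing the same playersDataset:

    // A sketch only: with an explicit unbounded frame, the 3rd value is resolved
    // once for the whole partition instead of growing row by row
    val wholePartitionWindow = Window.partitionBy($"player_login").orderBy($"timestamp".asc)
      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    playersDataset.select($"player_login", $"timestamp",
      functions.nth_value($"new_points", 3).over(wholePartitionWindow).as("third_point"))
      .show(false)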

The second function, added by Jiaan Geng, is regexp_extract_all. Unlike regexp_extract, it returns all substrings matching the pattern. You can see the difference in the test below:

    Seq(("1,2,3 abcdef 4,5,6"))
      .toDF("sample_text").createTempView("regexp_table")

    val extractedExpressions = sparkSession.sql(
      """
        |SELECT
        | REGEXP_EXTRACT_ALL(sample_text, "(\\d+)") AS extraction,
        | REGEXP_EXTRACT_ALL(sample_text, "(\\d+),(\\d+),(\\d+)", 2) AS extraction_groups,
        | REGEXP_EXTRACT(sample_text, "(\\d+)") AS extraction_simple,
        | REGEXP_EXTRACT(sample_text, "(\\d+),(\\d+),(\\d+)", 2) AS extraction_simple_groups
        |FROM regexp_table
        |""".stripMargin).as[(Seq[String], Seq[String], String, String)].collect()

    extractedExpressions should have size 1
    extractedExpressions(0) shouldEqual (
      Seq("1", "2", "3", "4", "5", "6"),
      Seq("2", "5"),
      "1",
      "2"
      )
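
Because REGEXP_EXTRACT_ALL returns an array, it also composes with the usual collection functions, for instance to count the matches. A short sketch:

    // A sketch only: counting the matches returned by REGEXP_EXTRACT_ALL
    val matchesCount = sparkSession.sql(
      """
        |SELECT SIZE(REGEXP_EXTRACT_ALL(sample_text, "(\\d+)")) AS numbers_count
        |FROM regexp_table
        |""".stripMargin).as[Int].collect()

    matchesCount(0) shouldEqual 6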

To terminate this category, Karen Feng worked on the assertion and error-triggering functions. She added a new function called RAISE_ERROR to fail the processing if some condition is not met. She also extended ASSERT_TRUE with the possibility to define a custom error message. If the condition evaluates to false, you should see an exception like:

Job aborted due to stage failure: Task 2 in stage 3.0 failed 1 times, most recent failure: Lost task 2.0 in stage 3.0 (TID 8) (192.168.0.55 executor driver): java.lang.RuntimeException: 3 is an odd number
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(generated.java:72)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:108)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
// ...

You can play with this feature on your own here:

    // Repartition to show that there is more than 1 task
    Seq((4), (8), (10), (12), (3), (6)).toDF("number1").repartition(6)
      .createTempView("error_management_table")

    val exception = intercept[SparkException] {
      sparkSession.sql(
        """
          |SELECT
          | ASSERT_TRUE(number1 % 2 == 0, CONCAT(number1, " is an odd number")),
          | number1
          |FROM error_management_table
          |""".stripMargin).show(false)
    }
    exception.getMessage should include("java.lang.RuntimeException: 3 is an odd number")

and here:

    // Repartition to show that there are more than 1 task
    Seq((4), (8), (10), (12), (3), (6)).toDF("number1")
      .repartition(6).createTempView("error_management_table")

    val exception = intercept[SparkException] {
      sparkSession.sql(
        """
          |SELECT
          | number1,
          | CASE WHEN number1 % 2 != 0 THEN RAISE_ERROR(CONCAT(number1, " is an odd number"))
          |   ELSE "The number is even"
          | END
          |FROM error_management_table
          |""".stripMargin).show(false)
    }
    exception.getMessage should include("java.lang.RuntimeException: 3 is an odd number")
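
For comparison, ASSERT_TRUE already existed before Apache Spark 3.1.1 in its single-argument form, which falls back to a default message generated from the failed expression. A sketch of that older form; the exact default message text asserted below is an assumption:

    // A sketch only: the pre-3.1 single-argument form of ASSERT_TRUE;
    // the default error message is generated from the failed expression
    val defaultMessageException = intercept[SparkException] {
      sparkSession.sql(
        """
          |SELECT ASSERT_TRUE(number1 % 2 == 0), number1
          |FROM error_management_table
          |""".stripMargin).show(false)
    }
    defaultMessageException.getMessage should include("is not true")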

That's all for the new functions in Apache Spark 3.1.1, and at the same time, all for the "What's new in Apache Spark 3.1.1" series. Next week I will resume my work on other Apache Spark features not necessarily related to this specific release.