What's new in Apache Spark 3.0 - PostgreSQL feature parity

Versions: Apache Spark 3.0.0

Apart from the date and time management, another big feature of Apache Spark 3.0 is the work on PostgreSQL feature parity, which is the topic of this new article in the series.

The work done on this feature was quite extensive and, to present it clearly, I divided the post into 5 sections. I will cover the major aspects and only mention the remaining ones briefly.

PostgreSQL standard?

PostgreSQL is one of the Open Source databases with the best support for SQL standards, and more exactly for the targeted SQL:2011 version. The SQL:2011 standard requires databases to implement 179 features for full Core conformance, and PostgreSQL in its most recent stable version (12 at the time of writing) conforms to at least 160 of them. Logically then, PostgreSQL was chosen as the reference model for the compliance effort.

Breaking changes?

Does it mean that your pipeline will break if, for instance, you use a new reserved keyword in your table declaration? Not if you keep the spark.sql.ansi.enabled property turned off, which is its default value:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParseException
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class PostgreSQLCompatibilityAnsiEnabledTest extends AnyFlatSpec with Matchers with BeforeAndAfter {

  behavior of "ANSI enabled flag"

  it should "disallow the use of forbidden keywords when enabled" in {
    val sparkSession = ansiSparkSession(true)

    sparkSession.sql("DROP TABLE IF EXISTS test_ansi_enabled")
    val parseError = intercept[ParseException] {
      // current_user and primary are among the new reserved keywords
      // let's use them in the schema to make the table creation fail
      sparkSession.sql(
        """
          |CREATE TABLE test_ansi_enabled (current_user STRING, primary BOOLEAN)
          |""".stripMargin)
    }

    parseError.getMessage() should include("no viable alternative at input 'current_user'(line 2, pos 32)")
  }

  it should "allow the use of forbidden keywords when disabled" in {
    val sparkSession = ansiSparkSession(false)

    sparkSession.sql("DROP TABLE IF EXISTS test_ansi_enabled")
    sparkSession.sql(
      """
        |CREATE TABLE test_ansi_enabled (current_user STRING, primary BOOLEAN)
        |""".stripMargin)
    sparkSession.sql("DROP TABLE IF EXISTS test_ansi_enabled")
  }

  def ansiSparkSession(ansiFlag: Boolean) = SparkSession.builder()
    .appName("Spark 3.0 features - ANSI test").master("local[*]").enableHiveSupport()
    .config("spark.sql.ansi.enabled", ansiFlag)
    .getOrCreate()

}

If you remember my Writing custom optimization in Apache Spark SQL - custom parser article, you certainly know that the grammar used by the parser is defined in text files that are later parsed by the ANTLR library. In the changes made for Apache Spark 3.0, you can notice that 2 new categories of identifiers appeared, called ansiReserved and defaultReserved. The former takes most of the reserved keywords from the ANSI standard whereas the latter keeps the keywords from the previous Apache Spark version. The ANSI configuration controls whether the parser should fail when it encounters one of the new reserved keywords. Please notice that it's not the final version: some of the words from the ANSI standard aren't considered as reserved in PostgreSQL, and a follow-up task (SPARK-26905) addresses that in the 3.0.1 release.
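
By the way, if one of your columns collides with a new reserved keyword and you still want to turn the flag on, backtick-quoting the identifier should keep the parser happy. Here is a minimal sketch, written as an extra test case for the spec above (the test_ansi_enabled_escaped table name is arbitrary):

  it should "accept escaped reserved keywords even when enabled" in {
    val sparkSession = ansiSparkSession(true)

    sparkSession.sql("DROP TABLE IF EXISTS test_ansi_enabled_escaped")
    // Backtick-quoted identifiers are never interpreted as keywords by the parser,
    // so the table creation succeeds despite the ANSI flag being enabled
    sparkSession.sql(
      """
        |CREATE TABLE test_ansi_enabled_escaped (`current_user` STRING, `primary` BOOLEAN)
        |""".stripMargin)
    sparkSession.sql("DROP TABLE IF EXISTS test_ansi_enabled_escaped")
  }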

ANSI standard

The adaptation of Apache Spark SQL to PostgreSQL standards was divided into 2 big parts. The first of them covers the ANSI-compliant features present in PostgreSQL. In this family of changes there are multiple evolutions of the syntax, like the few presented in the test snippet below, but they're not the most important ones in my eyes. You will find there features like LIKE ESCAPE syntax support, nested bracket comments, shorter formats for decimal types, and REAL data type support:

  "new ANSI-compliant syntax changes" should "be included in Spark 3.0" in {
    val testTableName = "data_types_table"
    // First change, the REAL data type is an alias for FLOAT, and NUMERIC for DECIMAL
    // https://github.com/apache/spark/pull/26537/files
    // But only at the query level, there is no new RealType nor NumericType -> FloatType and DecimalType are kept
    syntaxChangeSparkSession.sql(s"DROP TABLE IF EXISTS ${testTableName}")
    syntaxChangeSparkSession.sql(
      s"""
         |CREATE TABLE ${testTableName} (average_salary REAL, mean_salary NUMERIC(5, 2))
         |""".stripMargin)

    // Another interesting feature, the nested bracket comments
    syntaxChangeSparkSession.sql(
      s"""
         |SELECT *
         |/* commented now, we take all cols every time
         |   SELECT nr, letter
         |*/ FROM ${viewName}
         |""".stripMargin).show()

    // A new boolean predicate syntax is also supported
    val matchingNumbers = syntaxChangeSparkSession.sql(
      s"""
        |SELECT nr FROM ${viewName} WHERE bool_flag_view IS TRUE
        |""".stripMargin).as[Long].collect()
    matchingNumbers should have size 2
    matchingNumbers should contain allOf(0, 2)

    // New alias for decimal fields
    val schemaFromCast = syntaxChangeSparkSession.sql(
      """
        |SELECT CAST('2.0' AS DEC) AS casted_column
        |""".stripMargin).schema
    schemaFromCast shouldEqual StructType(Array(StructField("casted_column", DecimalType(10, 0), true)))
  }
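
The snippet doesn't show the LIKE ESCAPE support, so here is a minimal sketch of it, written as a few extra lines that could be added to the test above (the escape character and the literals are arbitrary):

    // LIKE with a custom escape character: the '|' before '%' turns the wildcard
    // into a literal percent sign, so only '10%' matches the pattern
    val escapedLikeMatches = syntaxChangeSparkSession.sql(
      """
        |SELECT '10%' LIKE '10|%' ESCAPE '|' AS exact_match,
        |       '10 dollars' LIKE '10|%' ESCAPE '|' AS wildcard_match
        |""".stripMargin).as[(Boolean, Boolean)].collect()
    escapedLikeMatches should have size 1
    escapedLikeMatches(0) shouldEqual (true, false)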


Much more impactful changes concern the reserved keywords covered before, DELETE statements that I will cover in another post, the WITH clause in subqueries, and the FILTER (WHERE ...) predicate syntax.

WITH clause in subqueries

Since Apache Spark 2.1.0 you can use the WITH clause (SPARK-17590) to name a CTE query. Starting from 3.0.0, you can also use it in subqueries:

  "WITH clause" should "be supported in subquery" in {
    val numbers = syntaxChangeSparkSession.sql(
      """
        |SELECT * FROM (
        | WITH numbers(number) AS (
        |   (SELECT 1) UNION (SELECT 2)
        | )
        | SELECT number FROM numbers
        |)
        |""".stripMargin).as[Int].collect()

    numbers should have size 2
    numbers should contain allOf(1, 2)
  }

FILTER (WHERE ...) predicate

Before discovering the task adding this feature, I wasn't aware of the possibility to put a filter condition on an aggregate expression. For example, in the following query, only one of the 3 rows will be included in the filtered aggregate:

  "FILTER WHERE clause" should "keep only the rows matching the predicate in the aggregate" in {
    val counters = syntaxChangeSparkSession.sql(
      s"""
        |SELECT
        |COUNT(*) AS all_elements,
        |COUNT(*) FILTER (WHERE nr < 1) AS matching_elements
        |FROM ${viewName}
        |""".stripMargin).as[(Long, Long)].collect()

    counters should have size 1
    counters(0) shouldEqual (3, 1)
  }

Implementation-dependent behaviors

Apart from the ANSI-compliant changes, Apache Spark 3.0 also brings some evolutions related to PostgreSQL and other databases but not necessarily to the ANSI standard. Some of the proposed changes, like adding an extra cast syntax (::), adding a new POINT data type or changing the NULL sort order, weren't implemented. An important criterion for the implementation decision was the support of the feature in more than one database, and that's the reason why some of them were abandoned. Another reason, in the case of the NULL sort order, was the fact that the change would break the current behavior, which was considered too impactful for users.
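
To illustrate the NULL sort order point: Spark puts NULLs first by default in an ascending sort, whereas PostgreSQL puts them last, so changing the default would silently reorder existing results. You can already ask for the PostgreSQL-like ordering explicitly, though. A minimal sketch, reusing the syntaxChangeSparkSession from the previous snippets and a hypothetical letters_to_sort view:

  "explicit NULLS LAST" should "give the PostgreSQL-like sort order for an ascending sort" in {
    import syntaxChangeSparkSession.implicits._
    // letters_to_sort is a hypothetical view created only for this example
    Seq("a", null, "b").toDF("letter").createOrReplaceTempView("letters_to_sort")

    // Spark's default for an ascending sort is NULLS FIRST; PostgreSQL's default
    // is NULLS LAST, which you can still get by spelling it out in the ORDER BY clause
    val sortedLetters = syntaxChangeSparkSession.sql(
      "SELECT letter FROM letters_to_sort ORDER BY letter ASC NULLS LAST")
      .as[String].collect()

    sortedLetters shouldEqual Array("a", "b", null)
  }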

Since I already covered the new SQL functions, here I will focus only on some behavior and syntax changes. Among the minor ones, you will find things like casting strings such as 'yes'/'no' into a boolean type, the SUBSTRING(... FROM ... FOR ...) syntax, the enhanced TRIM (position and custom characters support) and LPAD/RPAD (optional pad characters) functions:

  "implementation-dependent changes" should "enable new SQL syntax" in {
    val transformed = syntaxChangeSparkSession.sql(
      """
        |SELECT
        | CAST('yes' AS BOOLEAN) AS boolFlagTrue, CAST('no' AS BOOLEAN) AS boolFlagFalse,
        | SUBSTRING('1234567' FROM 3 FOR 2) AS substringText,
        | LPAD('abc', 5, '.') AS lpadChars,
        | LPAD('abc', 5) AS lpadDefault,
        | TRIM('abc' FROM 'aXXXXcYYYb') AS customTrim,
        | TRIM(TRAILING FROM '   abc    ') AS positionalTrim
        |""".stripMargin
    ).as[ImplementationDependentQueryResult].collect()

    transformed should have size 1
    transformed(0) shouldEqual ImplementationDependentQueryResult(
      boolFlagTrue = true, boolFlagFalse = false, substringText = "34",
      lpadChars = "..abc", lpadDefault = "  abc", customTrim = "XXXXcYYY",
      positionalTrim = "   abc"
    )
  }

case class ImplementationDependentQueryResult(boolFlagTrue: Boolean, boolFlagFalse: Boolean,
                                              substringText: String, lpadChars: String, lpadDefault: String,
                                              customTrim: String, positionalTrim: String)

Not implemented (at all)

As I mentioned before, not all tasks listed in the epics were implemented. Some of them described features that were already supported (e.g. SPARK-28073: ANSI SQL: Character literals), others were too PostgreSQL-centric (e.g. SPARK-30042: Add built-in Array Functions: array_dims) or too impactful for the current workflows using Apache Spark (e.g. SPARK-28333: NULLS FIRST for DESC and NULLS LAST for ASC).

So I won't cover all of them, but one caught my attention. Initially, we were supposed to be able to compute averages and sums for interval data types. The changes were made in SPARK-29688 and SPARK-29663 and merged to master, but after a discussion they were reverted because of their unclear semantics. You can learn more about the reasons in the PR made by Wenchen Fan:


When we do sum/avg, we need a wider type of input to hold the sum value, to reduce the possibility of overflow. For example, we use long to hold the sum of integral inputs, use double to hold the sum of float/double. However, we don't have a wider type of interval. Also the semantic is unclear: what if the days field overflows but the months field doesn't? Currently the avg of 1 month and 2 month is 1 month 15 days, which assumes 1 month has 30 days and we should avoid this assumption.
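
A quick way to see that widening at work for the supported types: the sum of an integer column is computed as a long and its average as a double. A minimal sketch, reusing the syntaxChangeSparkSession from the previous snippets with a hypothetical numbers_to_aggregate view (and assuming the usual org.apache.spark.sql.types imports):

  "SUM and AVG" should "widen the integer input type" in {
    import syntaxChangeSparkSession.implicits._
    // numbers_to_aggregate is a hypothetical view created only for this example
    Seq(1, 2, 3).toDF("nr").createOrReplaceTempView("numbers_to_aggregate")

    val aggregationSchema = syntaxChangeSparkSession.sql(
      "SELECT SUM(nr) AS summed_numbers, AVG(nr) AS averaged_numbers FROM numbers_to_aggregate").schema

    // The sum of integers is held in a long and their average in a double - the wider
    // types mentioned in the quote; there is no such wider type for intervals
    aggregationSchema("summed_numbers").dataType shouldEqual LongType
    aggregationSchema("averaged_numbers").dataType shouldEqual DoubleType
  }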

Despite the reverted and unimplemented features, Apache Spark 3.0 evolved a lot and got even closer to the ANSI standard and to the features supported by other databases 💪