What's new in Apache Spark 3.2.0 - miscellaneous changes

Versions: Apache Spark 3.2.0 https://github.com/bartosz25/spark-...3/spark-3.2.0-features/misc_changes

My Apache Spark 3.2.0 series comes to its end. Today I'll focus on the miscellaneous changes, that is, all the improvements I couldn't categorize in the previous blog posts.

Exception messages

If you wrote an Apache Spark 3.2.0 pipeline and it failed, you probably saw the first change, the new error messages. Allison Wang and Xinyi Yu proposed a SPIP where they listed the current pain points of the exceptions in Apache Spark, and submitted an improvement strategy. Among the pain points you will mostly find:

To overcome these problems, they proposed to group the error messages into a dedicated QueryExecutionErrors class in each module (SQL, catalyst, ...). Jiaan Geng, Karen Feng, dgd-contributor and Gengliang Wang also helped implement this change.

In addition to these standardization efforts, the error messages were also extended with the SQLSTATE. Karen Feng added this information for some ANSI/ISO standard errors, such as division by zero (SQLSTATE = 22012) or missing columns in the input (SQLSTATE = 42000).

    val sparkSession = SparkSession.builder()
      .appName("SPARK-34920 : errors with extra fields").master("local[*]")
      .config("spark.sql.ansi.enabled", true) // Otherwise a null is returned instead of the exception
      .getOrCreate()

    val exception = intercept[SparkThrowable] {
      sparkSession.sql("SELECT 3/0").show()
    }
    exception.getSqlState() shouldEqual "22012"
    exception.getErrorClass() shouldEqual "DIVIDE_BY_ZERO"
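
To make the idea of centralized error classes more concrete, here is a minimal sketch in plain Scala, without any Apache Spark dependency. It is only an illustration of the concept: ErrorInfo, ErrorClassRegistrySketch and SketchException are hypothetical names, the message templates are made up, and none of this is Spark's actual implementation. Only the 2 SQLSTATE values come from the examples quoted above.

```scala
// Hypothetical, simplified model of an error-class registry.
// ErrorInfo, ErrorClassRegistrySketch and SketchException are illustrative names,
// NOT Apache Spark classes; the message templates are invented for the demo.
final case class ErrorInfo(messageTemplate: String, sqlState: Option[String])

object ErrorClassRegistrySketch {
  // One central place for the messages, keyed by a stable error class name
  private val errorClasses: Map[String, ErrorInfo] = Map(
    "DIVIDE_BY_ZERO" -> ErrorInfo("divide by zero", Some("22012")),
    "MISSING_COLUMN" -> ErrorInfo("cannot resolve '%s' given input columns: [%s]", Some("42000")),
    "INTERNAL_ERROR_SKETCH" -> ErrorInfo("unexpected failure: %s", None)
  )

  // Fails fast when the caller uses an error class missing from the registry
  private def lookup(errorClass: String): ErrorInfo =
    errorClasses.getOrElse(errorClass,
      throw new IllegalArgumentException(s"Unknown error class: $errorClass"))

  // Fills the template placeholders with the parameters of the failure
  def message(errorClass: String, parameters: Seq[String]): String =
    lookup(errorClass).messageTemplate.format(parameters: _*)

  def sqlState(errorClass: String): Option[String] = lookup(errorClass).sqlState
}

// The exception only carries the error class and the parameters;
// the message and the SQLSTATE are derived from the registry
class SketchException(val errorClass: String, parameters: Seq[String] = Seq.empty)
  extends RuntimeException(ErrorClassRegistrySketch.message(errorClass, parameters)) {
  def sqlState: Option[String] = ErrorClassRegistrySketch.sqlState(errorClass)
}
```

With such a design, the calling code can branch on errorClass or sqlState instead of parsing the message text, which is probably the main practical benefit of the standardization.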

User-Defined Type

The second "misc" feature is not new in the code base, but it is new if we consider the public API. The User-Defined Types have been there for a while, since Apache Spark 1! I even blogged about them when I was still learning Scala and hence was using Java for the code snippets (User Defined Type). And it was Apache Spark 1.6.

However, the API was considered at that time as something to improve, and the community decided to make it private in Hide UserDefinedType in Spark 2.0. At the same time, the JIRA ticket aiming to make it public (SPARK-7768) was still open, and some tricky workarounds existed, such as creating an org.apache.spark.sql package and putting the UDTs there. Unfortunately, these workarounds worked only for Java 8.

To overcome these issues, and because of the lack of further improvement discussions about the UDT API, Sean Owen put it back in the public API! However, it's still annotated as a @DeveloperApi, so maybe it won't be the first thing you will use in Apache Spark.

But if you do and are looking for some demo code, it's just below:

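package com.waitingforcode

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType, UDTRegistration, UserDefinedType}
import org.apache.spark.unsafe.types.UTF8String
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class UserDefinedTypeTest extends AnyFlatSpec with Matchers  {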

  // You can run the code with Spark 3.1.0 to see the registering problem
  it should "register the UDT in Apache Spark 3.2.0" in {
    val sparkSession = SparkSession.builder()
      .appName("SPARK-7768 : promote UDT to the public API").master("local[*]")
      .getOrCreate()

    UDTRegistration.register("com.waitingforcode.Account",
      "com.waitingforcode.AccountUserDefinedType")
    import sparkSession.implicits._

    val schema = Seq(Account("abc", 1), Account("def", 2)).toDF("account").schema

    schema.simpleString shouldEqual "struct<account:accountuserdefinedtype_typename_override>"
  }

}

case class Account(nr: String, priority: Int)

class AccountUserDefinedType extends UserDefinedType[Account] {
  override def sqlType: DataType = StructType(Seq(
    StructField("nr", DataTypes.StringType, nullable = false),
    StructField("priority", DataTypes.IntegerType, nullable = false)
  ))

  override def serialize(accountClass: Account): InternalRow = {
    InternalRow(UTF8String.fromString(accountClass.nr), accountClass.priority)
  }

  override def deserialize(datum: Any): Account = datum match {
    case row: InternalRow => Account(row.getString(0), row.getInt(1))
  }

  override def userClass: Class[Account] = classOf[Account]

  override def typeName: String = "accountuserdefinedtype_typename_override"
}

Monitoring

Let's end this blog post with a section dedicated to monitoring and metrics. This time, for ease of presentation, the changes are given as a list:

That's all for my exploration of Apache Spark 3.2.0. Apache Spark 3.3.0 is on the way but I hope to have some extra time to cover other Spark-related topics from my backlog!