What's new in Apache Spark 3.0 - Proleptic Calendar and date time management

Versions: Apache Spark 3.0.0

When I was writing my blog post about datetime conversion in Apache Spark 2.4, I wanted to check something on Apache Spark's Github. To my surprise, the code had nothing in common with the code I was analyzing locally. And that's how I discovered the first change in Apache Spark 3.0. The first among few others that I will cover in a new series "What's new in Apache Spark 3.0".

To be honest with you, I thought that changing date formatting is a quite simple operation. However, after looking at the history of this evolution in the backlog, I found how I was wrong 😱 And I will try to share this with you in the next sections. In the first, a very short one, I will explain why the change was required. After that, I will present some technical details, like the backward compatibility and rebasing.r

Proleptic Gregorian calendar

Before Apache Spark 3.0, the framework used the hybrid Gregorian+Julian calendar and there were multiple reasons for changing it to the Proleptic Gregorian calendar. First and foremost, this brand new release of Apache Spark tends to go closer to SQL standards and the Gregorian+Julian calendar was not a standardized way to represent temporal information. Gregorian calendar is the one quoted in ISO 8601 specification which is a norm for the exchange of date- and time-related data.

Apart from this, the use of not non-standardized calendar led to some interoperability issues with PySpark and Pandas, which for some of them should be fixed after the release. Also, the previously used Java's API (java.util package) tends to be slower than the new used API (java.time package). That being said, I didn't find any direct benchmarks for Apache Spark, but according to a few other, more general ones quoted in the "Read more" section, this fact is true.

Those are the main reasons behind switching to a more standardized calendar. However, and that was a quite big surprise for me, the switch was quite painful.

Challenges

Why was changing the calendar so hard? After all, the difference between the Proleptic Gregorian and Julian+Gregorian calendars is mostly about dates in October 1582 and before. SPARK-30951 gives some quite interesting use cases. Working on historical data is nothing strange but using very old dates like "1200-01-01" to indicate a null value IMO, really is. A less strange, but still interesting use case for the old dates, is a library that transforms sensitive dates by converting them to the past dates, like "0602-04-04".

Moreover, the format supported by the new API has some important differences with the old format. One of them is that DateTimeFormatter, added to 3.0, defines a year as a u whereas in the old formatter (SimpleDateFormat) this letter means the day of the week.

To address these 2 issues, Apache Spark contributors implemented 2 different solutions that I'm describing in the next sections.

Internal format

The first problem encountered with this change was related to the different characters used in date time patterns (old SimpleDateFormat vs new DateTimeFormatter). To handle it, Apache Spark 3.0 provides its own layer for date time formatting which is backward compatible, ie. you still define the year as a "y" but under-the-hood, the framework does the job to translate it and adapt to the new datetime API. The magic happens in DateTimeFormatterHelper#convertIncompatiblePattern(pattern: String):

// ...
  final val weekBasedLetters = Set('Y', 'W', 'w', 'u', 'e', 'c')
  final val unsupportedLetters = Set('A', 'n', 'N', 'p')

          for (c <- patternPart if weekBasedLetters.contains(c)) {
            throw new IllegalArgumentException(s"All week-based patterns are unsupported since" +
              s" Spark 3.0, detected: $c, Please use the SQL function EXTRACT instead")
          }
          for (c <- patternPart if unsupportedLetters.contains(c) ||
            (isParsing && unsupportedLettersForParsing.contains(c))) {
            throw new IllegalArgumentException(s"Illegal pattern character: $c")
          }
// ...
          // In DateTimeFormatter, 'u' supports negative years. We substitute 'y' to 'u' here for
          // keeping the support in Spark 3.0. If parse failed in Spark 3.0, fall back to 'y'.
          // We only do this substitution when there is no era designator found in the pattern.
          if (!eraDesignatorContained) {
            patternPart.replace("y", "u")
          } else {
            patternPart
          }

From the snippet you can also notice that not all characters are supported in the pattern. Among the forbidden ones you will find final val unsupportedLetters = Set('A', 'c', 'e', 'n', 'N', 'p'). If you check the documentation for DateTimeFormatter and SimpleDateFormat, you will find that there is not really a match between them so no possibility to adapt the old API to the new one. And if you use one of them in your datetime pattern, you will get this error:

  "not allowed time format characters" should "make the processing fail" in {
    val exception = intercept[IllegalArgumentException] {
      sparkSession.read.option("timestampFormat", "Y A").json(datasetPath).show()
    }

    exception.getMessage should startWith("All week-based patterns are unsupported since Spark 3.0, detected: Y, Please use the SQL function EXTRACT instead")
  }

With this brand new formatter you should also be aware of one new breaking change. It won't fall back to any other formatter as it was the case in Spark 2. In the post I quoted in the introduction, Implicit datetime conversion in Apache Spark SQL, you can learn that in the previous versions the framework worked on the user format and if the conversion failed, it fell back into DateTimeUtils conversion class. In Apache Spark 3.0, a single formatter being an instance of DateTimeFormatter is created. If the formatting fails, the framework uses an instance of legacy formatters to check whether the operation would succeed in the previous release and if it's the case, depending on the spark.sql.legacy.timeParserPolicy, it will either return the formatted date or fail.

The new formatter class is Iso8601TimestampFormatter class. The fallback mechanism looks like in this snippet:

  override def parse(s: String): Long = {
    val specialDate = convertSpecialTimestamp(s.trim, zoneId)
    specialDate.getOrElse {
      try {
        val parsed = formatter.parse(s)
// ...
        Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond)
      } catch checkParsedDiff(s, legacyFormatter.parse)
    }
  }
  protected def checkDiffResult[T](
      s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T] = {
    case e: DateTimeParseException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION =>
      val res = try {
        Some(legacyParseFunc(s))
      } catch {
        case _: Throwable => None
      }
      if (res.nonEmpty) {
        throw new SparkUpgradeException("3.0", s"Fail to parse '$s' in the new parser. You can " +
          s"set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior " +
          s"before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.", e)
      } else {
        throw e
      }
  }

Please notice that I'm writing here about timestamp fields but the same rule applies on the date fields. Let's see now how this fallback logic behaves, depending on the timeParserPolicy configuration.

Backward compatibility

To implement the compatibility with previous versions of Apache Spark, the version 3 of the framework has a new property called spark.sql.legacy.timeParserPolicy which defines the parser policy used to deal with dates. It can be one of:

Rebase

So far for me, rebase was the world reserved to Git. However, it's also used in Spark 3.0 to describe the behavior for the files written in Parquet, Avro, or ORC formats. These formats store the temporal values as the representation of days or seconds from the Unix epoch. The problem with that is the files written with Spark 2.4, so Gregorian+Julian calendar may return different results when reading with Spark 3.0, especially for old dates. Below you can find an example of a Parquet file written with Spark 2.4 but read with Spark 3.0:

// Writer's part, Spark 2.4.5
object ParquetTimestampDataWriter extends App {
  private val outputPath = "/home/bartosz/workspace/spark-playground/spark-3.0-features/src/test/resources/parquet-spark-2.4.0"

  private val sparkSession: SparkSession = SparkSession.builder()
    .appName("Spark SQL Parquet Writer")
    .master("local[*]")
      .config("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
    .getOrCreate()
  import sparkSession.implicits._

  val dataset = Seq(
    (Timestamp.valueOf("1000-01-01 10:00:00"))
  ).toDF("datetime_col")

  dataset.write.mode(SaveMode.Overwrite).parquet(outputPath)
}

// Reader's part, Spark 3.0.0, default 
  "file created by Spark 2.4 read with CORRECTED mode" should "return invalid date" in {
    // FYI, in `corrected` we read the dates as it, so without rebasing
    val testSparkSession = createSparkSession(LegacyBehaviorPolicy.CORRECTED)
    import testSparkSession.implicits._

    val dateFromSpark240 = testSparkSession.read.parquet(s"${inputPath}").as[String].collect()

    dateFromSpark240 should have size 1
    // 1000-01-06 09:09:21 => but the initial date was 1000-01-01 10:00:00
    dateFromSpark240(0) shouldEqual "1000-01-06 09:09:21"
  }

To handle that, Apache Spark 3.0 comes with the "rebase" configurations like the following 2 for Parquet format:

I hope you see better now why changing the time formatting wasn't an obvious change in the codebase. And you should take all of this into account when upgrading to Apache Spark 3.0.