Implicit datetime conversion in Apache Spark SQL

Versions: Apache Spark 2.4.2

Have you ever wondered why, when you write "2019-05-10T20:00", Apache Spark considers it a timestamp field? Defining the field as a TimestampType is one of the reasons, but another question remains: how does Apache Spark convert the string into the timestamp type? I will give you some hints in this blog post.

I will start this post by showing you the place responsible for type conversions and, on this occasion, I will shortly cover the ones concerning timestamps. In the second part, I will focus on different conversion formats and highlight the ones that don't work. Please keep in mind that datetime conversion will change in Spark 3.0, but I will cover that next year :-)

To timestamp conversion

Some data sources support a parameter called timestampFormat. As its name indicates, it defines the timestamp format that will be used to make the conversion between strings and timestamps. Knowing that, we can easily find the place making that conversion, at least for the JSON data source:

// org.apache.spark.sql.catalyst.json.JacksonParser#makeConverter
    case TimestampType =>
      (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
        case VALUE_STRING =>
          val stringValue = parser.getText
          // This one will lose microseconds parts.
          // See https://issues.apache.org/jira/browse/SPARK-10681.
          Long.box {
            Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
              .getOrElse {
                // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
                // compatibility.
                DateTimeUtils.stringToTime(stringValue).getTime * 1000L
              }
          }
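
To make that two-step logic more tangible, here is a minimal sketch of the same idea outside of Spark. The pattern below is the default one used by the JSON source in 2.4; Spark itself relies on a FastDateFormat instance and its internal DateTimeUtils helper, so take the SimpleDateFormat and the JAXB fallback only as approximations:

  import java.text.SimpleDateFormat
  import javax.xml.bind.DatatypeConverter

  import scala.util.Try

  // stand-in for the data source's timestampFormat option
  val configuredFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")

  def toInternalMicros(stringValue: String): Long = {
    // first attempt: the user-defined format (millisecond precision, hence * 1000L)
    Try(configuredFormat.parse(stringValue).getTime * 1000L)
      .getOrElse {
        // fallback roughly comparable to DateTimeUtils.stringToTime: delegate
        // to the JAXB datetime parser for ISO-like strings
        DatatypeConverter.parseDateTime(stringValue).getTimeInMillis * 1000L
      }
  }

  println(toInternalMicros("2019-05-10T20:00:00.000+01:00")) // matches the configured format
  println(toInternalMicros("2019-01-06T18:30:00Z"))          // handled by the fallback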

As the JacksonParser snippet shows, the generated converter first tries to use the user-defined format and, in case of any issue, falls back to the DateTimeUtils conversion class. The same conversion technique is used in the CSV data source. Other data sources can have different logic, though, simply because they don't allow storing the string as a timestamp. You can observe that in the Apache Avro deserializer, where only long values carrying a millisecond or microsecond logical type (or no logical type at all) are converted into timestamps:

      case (LONG, TimestampType) => avroType.getLogicalType match {
        case _: TimestampMillis => (updater, ordinal, value) =>
          updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
        case _: TimestampMicros => (updater, ordinal, value) =>
          updater.setLong(ordinal, value.asInstanceOf[Long])
        case null => (updater, ordinal, value) =>
          // For backward compatibility, if the Avro type is Long and it is not logical type,
          // the value is processed as timestamp type with millisecond precision.
          updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
        case other => throw new IncompatibleSchemaException(
          s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
      }
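
Nothing magical happens in the millis/micros branches: the values are simply normalized to Spark's internal microsecond representation. A tiny arithmetic check, with epoch values picked only for illustration:

  // 2019-01-06T18:30:00Z expressed once as epoch milliseconds (timestamp-millis)
  // and once as epoch microseconds (timestamp-micros)
  val epochMillis = 1546799400000L
  val epochMicros = 1546799400000000L

  val fromMillisLogicalType = epochMillis * 1000L // timestamp-millis: scaled to microseconds
  val fromMicrosLogicalType = epochMicros         // timestamp-micros: stored as-is

  assert(fromMillisLogicalType == fromMicrosLogicalType) // the same instant internally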

Conversion formats

In the following examples I will focus on the case when the input data doesn't match the timestamp format from the data source options, i.e. when Spark goes to the stringToTime function. And here many different things can happen. If the datetime has a somewhat unusual format like "2000-01-01T00:00GMT+01:00", the string is first reformatted and the "GMT" part removed.
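
A quick sketch of that cleanup step (the substring logic below is mine and only illustrates the idea, not Spark's exact code):

  // "2000-01-01T00:00GMT+01:00" becomes "2000-01-01T00:00+01:00" before parsing
  val raw = "2000-01-01T00:00GMT+01:00"
  val indexOfGmt = raw.indexOf("GMT")
  val cleaned = raw.substring(0, indexOfGmt) + raw.substring(indexOfGmt + 3)
  println(cleaned) // 2000-01-01T00:00+01:00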

In the next step, if the input doesn't contain the "T" separator, Spark uses Java's java.sql.Date and java.sql.Timestamp classes. That path supports datetime formats like yyyy-[m]m-[d]d and yyyy-[m]m-[d]d hh:mm:ss[.f...].
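
Both formats map directly to the valueOf factory methods of java.sql.Date and java.sql.Timestamp, which you can verify outside of Spark:

  import java.sql.{Date, Timestamp}

  // yyyy-[m]m-[d]d: the leading zero in the month or day is optional
  println(Date.valueOf("2019-5-12"))                    // 2019-05-12
  // yyyy-[m]m-[d]d hh:mm:ss[.f...]: optional fractional seconds
  println(Timestamp.valueOf("2019-05-10 20:00:00.000")) // 2019-05-10 20:00:00.0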

As the very last option, Spark performs the last-resort action of calling javax.xml.bind.DatatypeConverterImpl#_parseDateTime(CharSequence s). It delegates the call and builds a class that I met there for the first time, XMLGregorianCalendarImpl. It was added in Java 5 to handle XML documents written according to the XML Schema specification and it stays in use until Apache Spark 3.0. Since I'm writing this post before the official release, I will give you the examples with the old Gregorian format, but please keep in mind that they may be out-of-date after the 3.0 release, which replaces that datetime management with the new Java time API. I'll try to write a new post in the "What's new in 3.0..." series about that topic.
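
If you want to play with that class outside of Spark, it can be created directly through the DatatypeFactory (a standalone sketch, unrelated to Spark's internals):

  import javax.xml.datatype.DatatypeFactory

  // builds an XMLGregorianCalendar from the XML Schema lexical representation
  val xmlCalendar = DatatypeFactory.newInstance()
    .newXMLGregorianCalendar("2019-01-06T18:30:00Z")
  println(xmlCalendar.getYear)                              // 2019
  println(xmlCalendar.toGregorianCalendar.getTimeInMillis)  // epoch millis of that instant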

The pattern accepted by XMLGregorianCalendar is defined in the XML Schema specification, under the "3.2.7.1 Lexical representation" section. It states that the accepted date format should be '-'? yyyy '-' mm '-' dd 'T' hh ':' mm ':' ss ('.' s+)? (zzzzzz)?, where ('.' s+) stands for the fractional seconds and (zzzzzz) for the timezone. Let's check all of that in a unit test:

  override def beforeAll(): Unit = {
    val dateTimeLogs =
      """
        |{"key": "format_timestamp", "log_action": "2019-05-10 20:00:00.000" }
        |{"key": "format_date_without_0_in_month", "log_action": "2019-5-12" }
        |{"key": "weird_format_gmt", "log_action": "2019-01-01T00:00:00GMT+01:00"  }
        |{"key": "format_hh_mm_timezone", "log_action": "2019-01-01T00:00:00+00:00"  }
        |{"key": "format_z_timezone", "log_action": "2019-01-06T18:30:00Z"  }
        |{"key": "format_text_timezone", "log_action": "2019-01-06T18:30:00[Europe/Paris]"  }
        |{"key": "format_hh_mm_timezone_fraction_sec", "log_action": "2019-01-06T18:30:00.000+01:00"  }
        |{"key": "format_text_datetime", "log_action": "Wed, 4 Jul 2001 12:08:56 +0100"  }
        |{"key": "format_only_year_month", "log_action": "2019-01Z"  }
        |{"key": "format_hh_mm_timezone_with_text", "log_action": "2018-07-07T15:20:14.372+01:00[Europe/Paris]"  }
        |{"key": "not_existent_day", "log_action": "2019-02-30T18:30:00+01:00"  }
      """.stripMargin
    FileUtils.writeStringToFile(new File(testInputFilePath), dateTimeLogs)
  }

  "JSON dates" should "be read correctly if their formats are good" in {   
    val schema = StructType(Seq(
      StructField("key", StringType), StructField("log_action", TimestampType)
    ))

    val readLogs = sparkSession.read.schema(schema).json(testInputFilePath)
      .filter("key IS NOT NULL")
      .map(row => (row.getAs[String]("key"),
      row.getAs[Timestamp]("log_action").toString))
      .collect()


    sparkSession.read.schema(schema).json(testInputFilePath).show(false)
    readLogs should have size 7
    val logsByKey = readLogs.groupBy(keyLogPair => keyLogPair._1).mapValues(keyLogPairs => {
      assert(keyLogPairs.size == 1)
      keyLogPairs(0)._2
    })
    logsByKey("format_timestamp") shouldEqual "2019-05-10 20:00:00.0"
    logsByKey("format_date_without_0_in_month") shouldEqual "2019-05-12 00:00:00.0"
    logsByKey("weird_format_gmt") shouldEqual "2019-01-01 00:00:00.0"
    logsByKey("format_hh_mm_timezone") shouldEqual "2019-01-01 01:00:00.0"
    logsByKey("format_z_timezone") shouldEqual "2019-01-06 19:30:00.0"
    logsByKey("format_hh_mm_timezone_fraction_sec") shouldEqual "2019-01-06 18:30:00.0"
    logsByKey("format_hh_mm_timezone_with_text") shouldEqual "2018-07-07 16:20:14.372"
  }

As you can see, Spark behaves consistently with the XML Schema rules and with the formats accepted by the valueOf methods of the Date and Timestamp classes. Now, if you have a more specific need, as I had for the textual representation of my date (Wed Oct 20 16:43:53 CEST 2019), you can still define the conversion format for the reader:

  "a textual JSON date" should "be converted thanks to the custom timestampFormat" in {
    val schema = StructType(Seq(
      StructField("key", StringType), StructField("log_action", TimestampType)
    ))

    val readLogs = sparkSession.read.schema(schema)
      .option("timestampFormat", "EEE, d MMM yyyy HH:mm:ss Z").json(testInputFilePath)
      .filter("key IS NOT NULL")
      .map(row => (row.getAs[String]("key"),
        row.getAs[Timestamp]("log_action").toString))
      .collect()

    // Globally we'll get the same results as in the previous test but, thanks to the
    // custom timestampFormat, we're also able to catch the textual datetime.
    // The single difference is the absence of "format_hh_mm_timezone_with_text", which
    // cannot be converted because of the changed timestampFormat parameter
    readLogs should have size 7
    val logsByKey = readLogs.groupBy(keyLogPair => keyLogPair._1).mapValues(keyLogPairs => {
      assert(keyLogPairs.size == 1)
      keyLogPairs(0)._2
    })
    logsByKey("format_text_datetime") shouldEqual "2001-07-04 13:08:56.0"
    logsByKey("format_timestamp") shouldEqual "2019-05-10 20:00:00.0"
    logsByKey("format_date_without_0_in_month") shouldEqual "2019-05-12 00:00:00.0"
    logsByKey("weird_format_gmt") shouldEqual "2019-01-01 00:00:00.0"
    logsByKey("format_hh_mm_timezone") shouldEqual "2019-01-01 01:00:00.0"
    logsByKey("format_z_timezone") shouldEqual "2019-01-06 19:30:00.0"
    logsByKey("format_hh_mm_timezone_fraction_sec") shouldEqual "2019-01-06 18:30:00.0"
  }

Datetime management in Apache Spark is an interesting topic. As you can see, the framework can handle multiple formats within one common JSON dataset, which is quite powerful, especially when data governance is weak and your producers send the time however they want: with the T separator, without it, with trailing fractional seconds, and so forth. With Spark, you can deal with all of them transparently.