Kafka timestamp as the watermark

Versions: Apache Spark 2.4.2

In the first version of my demo application I used Kafka's timestamp field as the watermark. At that moment I was exploring the internals of arbitrary stateful processing, so it wasn't a big deal. But just in case you're wondering why I didn't keep it for the official demo version, I wrote this article.

The post starts by recalling the format of the data stored in Apache Kafka partitions, focusing mainly on the timestamp attribute. The second part explains why, in the end, I chose the event time coming with the data record.

Kafka record

The last time I explored Apache Kafka internals was July 2016, when I wrote the post about Replication in Apache Kafka. Before that, I also analyzed the records stored in the partitions (Kafka messages format). Just to recall, at a high level a Kafka record is composed of 3 attributes: the key, the value (the message's body), and the timestamp.

An interesting thing to notice here, which I didn't mention 3 years ago, is that the timestamp can be set by the producer! The SDK gives that possibility with the org.apache.kafka.clients.producer.ProducerRecord[K, V] class, one of whose constructors takes a timestamp parameter. If the parameter is not defined, the producer sets it to the current timestamp. However, there is no guarantee that this value will be used in the topic.
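
As an illustration, here is a minimal sketch of that constructor; the topic name, key and value are made up for the example:

    import org.apache.kafka.clients.producer.ProducerRecord

    // ProducerRecord(topic, partition, timestamp, key, value) - the 3rd parameter
    // carries the timestamp; here it's the event time instead of the wall clock
    // that the producer would set by default
    val eventTime: java.lang.Long = System.currentTimeMillis() - 5000L
    val record = new ProducerRecord[String, String](
      "visits", null, eventTime, "user1", """{"page": "index.html"}""")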

KIP-32 brought the possibility to configure the topic (message.timestamp.type property) to keep either the producer's timestamp (CreateTime) or the broker's append timestamp (LogAppendTime). So even if you set your timestamp on the producer side, your topic may ignore it.
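
To make that concrete, here is a minimal sketch creating such a topic with the AdminClient, assuming a local broker; the topic name and sizing are arbitrary:

    import java.util.Properties
    import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
    import scala.collection.JavaConverters._

    val adminProps = new Properties()
    adminProps.put("bootstrap.servers", "localhost:9092")
    val admin = AdminClient.create(adminProps)

    // every record appended to this topic gets the broker's append time,
    // whatever timestamp the producer sent
    val topic = new NewTopic("visits", 1, 1.toShort)
      .configs(Map("message.timestamp.type" -> "LogAppendTime").asJava)
    admin.createTopics(List(topic).asJava).all().get()
    admin.close()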

But that's not the last subtlety. Kafka also has a property called log.message.timestamp.difference.max.ms. It's used to check whether the timestamp set on the message is not too distant from the broker's clock. In other words, if the difference between the message's timestamp and the broker's clock is greater than the value specified in this property, the message is rejected with an InvalidTimestampException: Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidTimestampException: The timestamp of the message is out of acceptable range. However, this check doesn't apply when the topic is configured to use the log's append time as the timestamp.
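
To see the rejection in action, here is a minimal sketch, assuming the broker was configured with log.message.timestamp.difference.max.ms=3600000 (1 hour) and the topic keeps CreateTime timestamps; the send should then fail as described above:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    // a timestamp 2 hours in the past, beyond the assumed 1-hour tolerance
    val staleTimestamp: java.lang.Long = System.currentTimeMillis() - 2 * 60 * 60 * 1000L
    val record = new ProducerRecord[String, String](
      "visits", null, staleTimestamp, "user1", """{"page": "index.html"}""")
    // .get() surfaces the asynchronous failure as the ExecutionException
    // wrapping the InvalidTimestampException quoted above
    producer.send(record).get()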

Long story short - you can set a custom timestamp attribute, like the event time of your input data, but the topic can overwrite it with the append time or reject your data if the client's timestamp is too distant from the broker's clock.

Kafka timestamp as Structured Streaming event-time watermark

Kafka's timestamp can then be a candidate for the event-time watermark if it's explicitly set as such by the producer. Moreover, if log.message.timestamp.difference.max.ms is defined, Kafka can act directly as a filter for late events! So globally you will have one watermark filtering out late data at Kafka's level, for instance rejecting events older than 24 hours. On the other side, you will have the Structured Streaming watermark to filter out late events depending on various business rules.

Thus, technically it's possible to use the timestamp, but conceptually it's less evident. For me, an event-time-based watermark comes with the input data, and in Kafka, the input data is stored in the message's body. The timestamp attribute is more related to the topic, so to the underlying storage mechanism. I can bet that if you read some code and see this:

    val query = dataFrame.selectExpr("CAST(value AS STRING)")
      .select(functions.from_json($"value", Visit.Schema).as("data"))
      .select($"data.*")
.withWatermark("event_time", "10 minutes")

you'll immediately understand that the event time comes from the data. On the flip side, if you see this:

   val query = dataFrame.selectExpr("CAST(value AS STRING)", "timestamp")
      .select(functions.from_json($"value", Visit.Schema).as("data"), $"timestamp")
      .select($"data.*", $"timestamp")
.withWatermark("timestamp", "3 minutes")

you can think that it comes from Kafka's topic and very probably represents the ingestion time. You'll need to analyze the topic configuration to see which value is used (the producer's or the broker's). Moreover, when you use the timestamp of a topic you don't control, you won't know how it maps to the timestamps in the message body. So the bad point is readability.

To summarize, even though it's perfectly possible to use an event-time attribute as Kafka's timestamp attribute, for me a better and more readable approach is to use the event-time attribute from the message's body. It adds some overhead to retrieve that property but, on the other hand, keeps the code clear. The timestamp can bring some confusion since its primary correlation is with the ingestion time. And that's the main reason why I took the event_time attribute from the record.