Watermarks and non-grouped queries - why they don't work

Several weeks ago I played with watermarks, just to recall some concepts. I wrote a query and... the watermark didn't work! Of course, my query was wrong, but this intrigued me enough to write this short article.

Bartosz Konieczny

TL;DR

The documentation clearly states which kinds of queries are supposed to work with watermarks, so this post doesn't reinvent the wheel. However, I hope you will get some extra insight into the mechanism that applies the watermarks to these queries.

To test the watermark, I wrote a query like this:

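import java.sql.Timestamp

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import sparkSession.implicits._ // provides the Encoder required by MemoryStream

val inputStream = new MemoryStream[(Timestamp, String)](1, sparkSession.sqlContext)

val aggregatedStream = inputStream.toDS().toDF("created", "name")
  .withWatermark("created", "1 minute")
  // global aggregation: there is no grouping key at all
  .agg(Map("name" -> "count"))
  .writeStream.format("console").option("truncate", false)
  .trigger(Trigger.ProcessingTime("20 seconds"))
  .outputMode("update")
  .start()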

Then I added data to the MemoryStream. The watermark was not applied: all records, even the late ones with an event time older than the current watermark, were included in my result table. My first reflex was "it has to be a timezone issue!", so I changed the time zone configuration everywhere:

import java.util.TimeZone

import org.apache.spark.sql.SparkSession

private val sparkSession: SparkSession = SparkSession.builder().appName("Watermark not grouped query")
  .config("spark.sql.session.timeZone", "UTC")
  .master("local[2]").getOrCreate()
TimeZone.setDefault(TimeZone.getTimeZone("UTC"))

But it didn't help. I then went back to the documentation, which reminded me that the watermark doesn't work for all use cases, i.e. even if you call withWatermark(...), it won't necessarily be physically executed. Among the queries supporting watermarks, you will find the ones with windows or grouping expressions on the watermark column. As you can see, my initial query wasn't even close to any of them. But that intrigued me enough to try to understand how the plan differs when .withWatermark is defined for a query with grouping and for a query without it.

Only the query using the watermark column as the grouping column worked; the records with an event time older than the current watermark were ignored:

  val aggregatedStream = inputStream.toDS().toDF("created", "name")
    .withWatermark("created", "1 minute") 
    .groupBy("created")
    .count()
    .writeStream.format("console").option("truncate", false)
    .trigger(Trigger.ProcessingTime("20 seconds"))
    .outputMode("update")
    .start()
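
The documentation also lists window-based grouping among the supported cases. As a minimal sketch of that variant, assuming the same "created" and "name" columns as above, the query could group on a time window built from the watermark column. The object name, the windowedCounts helper, the 1-minute window length and the sample timestamp are illustrative assumptions, and the MemoryStream package may differ between Spark versions:

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

object WindowedWatermarkSketch {
  // Groups on a time window derived from the watermark column "created",
  // so the grouping key is built from the watermarked attribute.
  def windowedCounts(input: DataFrame): DataFrame =
    input
      .withWatermark("created", "1 minute")
      // the 1-minute window length is an arbitrary choice for this sketch
      .groupBy(window(col("created"), "1 minute"))
      .count()

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("Watermark windowed query").master("local[2]")
      .config("spark.sql.session.timeZone", "UTC")
      .getOrCreate()
    import sparkSession.implicits._ // Encoder for MemoryStream

    val inputStream = new MemoryStream[(Timestamp, String)](1, sparkSession.sqlContext)
    val query = windowedCounts(inputStream.toDS().toDF("created", "name"))
      .writeStream.format("console").option("truncate", false)
      .trigger(Trigger.ProcessingTime("20 seconds"))
      .outputMode("update")
      .start()

    // illustrative record; any timestamps can be used to observe late data handling
    inputStream.addData((Timestamp.valueOf("2024-01-01 10:00:00"), "a"))
    query.processAllAvailable()
    query.stop()
    sparkSession.stop()
  }
}
```

Keeping the transformation in a separate function makes it easy to apply it to a batch DataFrame as well, for instance to check the output schema without starting a stream.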

Explanation

What's the reason for that behavior? The answer is hidden in the WatermarkSupport class, more exactly here:

  /** Generate an expression that matches data older than the watermark */
  lazy val watermarkExpression: Option[Expression] = {
    WatermarkSupport.watermarkExpression(
      child.output.find(_.metadata.contains(EventTimeWatermark.delayKey)),
      eventTimeWatermark)
  }

This property is later used by the class to build 2 predicates, one for the keys and one for the data:

  /** Predicate based on keys that matches data older than the watermark */
  lazy val watermarkPredicateForKeys: Option[Predicate] = watermarkExpression.flatMap { e =>
    if (keyExpressions.exists(_.metadata.contains(EventTimeWatermark.delayKey))) {
      Some(newPredicate(e, keyExpressions))
    } else {
      None
    }
  }

  /** Predicate based on the child output that matches data older than the watermark. */
  lazy val watermarkPredicateForData: Option[Predicate] =
    watermarkExpression.map(newPredicate(_, child.output))
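
To make the Option chaining in these snippets easier to follow, here is a simplified model in plain Scala. It is not Spark code: Attr, LatePredicate and the three functions are hypothetical stand-ins, hasDelayKey plays the role of metadata.contains(EventTimeWatermark.delayKey), and the "event time at or below the watermark" comparison is an assumption of this model.

```scala
object WatermarkPredicateModel {
  // Hypothetical stand-in for a Catalyst attribute; hasDelayKey mimics the
  // presence of the watermark delay key in the attribute's metadata.
  final case class Attr(name: String, hasDelayKey: Boolean)

  // Answers: is this event time (in ms) at or below the watermark?
  type LatePredicate = Long => Boolean

  // No watermarked attribute in the child output, or no watermark value yet,
  // means no expression at all.
  def watermarkExpression(childOutput: Seq[Attr], watermarkMs: Option[Long]): Option[LatePredicate] =
    for {
      _ <- childOutput.find(_.hasDelayKey)
      watermark <- watermarkMs
    } yield (eventTimeMs: Long) => eventTimeMs <= watermark

  // The key predicate additionally requires a watermarked attribute among the keys.
  def predicateForKeys(
      keys: Seq[Attr],
      childOutput: Seq[Attr],
      watermarkMs: Option[Long]): Option[LatePredicate] =
    watermarkExpression(childOutput, watermarkMs).flatMap { expression =>
      if (keys.exists(_.hasDelayKey)) Some(expression) else None
    }

  // The data predicate only depends on the expression being defined.
  def predicateForData(childOutput: Seq[Attr], watermarkMs: Option[Long]): Option[LatePredicate] =
    watermarkExpression(childOutput, watermarkMs)
}
```

In this model, a child output without any watermarked attribute yields None for both predicates, whereas a watermarked grouping key yields a predicate that flags event times at or below the watermark.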

The plan for a query without grouping doesn't include any child node exposing the watermark column. That's also the case for a query aggregating on columns other than the watermark column. Below you can find the physical plans for 2 queries illustrating this case, where the state store operators don't have a child with the watermark column in its output:

On the other hand, the plan for the query grouping on the watermark column looks like this:

The reason why it doesn't work for the first 2 examples is that the state store's child doesn't output the watermark column, so the engine cannot create the predicates to filter out the expired records. On the flip side, Spark is able to do so for the last query where, as you can see, the child's output contains the column defined in the watermark. Apache Spark can then create the predicates (watermarkPredicateForKeys, watermarkPredicateForData) to filter out the expired records and keys.
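
If you want to compare the plans on your side, a sketch along these lines may help. It relies on StreamingQuery.explain() and on the eventTime map exposed by lastProgress. The object name, the inspect helper and the sample timestamp are assumptions made for the illustration, and the exact plan output depends on your Spark version:

```scala
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

object WatermarkPlanInspection {
  // Runs the grouped query on one record, prints its physical plan and
  // returns the watermark reported in the last progress, if any.
  def inspect(): Option[String] = {
    val sparkSession = SparkSession.builder()
      .appName("Watermark plan inspection").master("local[2]")
      .config("spark.sql.session.timeZone", "UTC")
      .getOrCreate()
    import sparkSession.implicits._ // Encoder for MemoryStream

    val inputStream = new MemoryStream[(Timestamp, String)](1, sparkSession.sqlContext)
    val query = inputStream.toDS().toDF("created", "name")
      .withWatermark("created", "1 minute")
      // swap this groupBy for .agg(Map("name" -> "count")) to compare both plans
      .groupBy("created")
      .count()
      .writeStream.format("console").option("truncate", false)
      .outputMode("update")
      .start()

    try {
      // illustrative record, only there to trigger a micro-batch
      inputStream.addData((Timestamp.valueOf("2024-01-01 10:00:00"), "a"))
      query.processAllAvailable()

      // prints the physical plan of the last executed micro-batch
      query.explain()

      // eventTime is a java.util.Map; the "watermark" entry may be absent
      Option(query.lastProgress).flatMap(p => Option(p.eventTime.get("watermark")))
    } finally {
      query.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"Reported watermark: ${inspect()}")
}
```

In the printed plan, the thing to look for is whether the child of the state store operators still outputs the "created" column.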

Having code that doesn't work, especially if you want to demonstrate something quickly, is not a pleasant surprise. However, it helps to recall some basics, like the one about reading the friendly manual ;-) It also helped me to go back to the watermarks in Structured Streaming and understand another of their subtleties!
