Watermarks and non-grouped queries - why they don't work

Versions: Apache Spark 2.4.4

Several weeks ago I played with watermarks, just to refresh some concepts. I wrote a query and... the watermark didn't work! Of course, my query was wrong, but this intrigued me enough to write this short article.

TL;DR

The documentation clearly states which kinds of queries are supposed to work with watermarks, so this post doesn't reinvent the wheel. However, I hope you will get some extra insight into the mechanism that applies the watermarks to these queries.

To test the watermark, I wrote a query like this:

  // MemoryStream needs an implicit Encoder in scope, e.g. from `import sparkSession.implicits._`
  val inputStream = new MemoryStream[(Timestamp, String)](1, sparkSession.sqlContext)

  val aggregatedStream = inputStream.toDS().toDF("created", "name")
    .withWatermark("created", "1 minute")
    .agg(Map("name" -> "count"))
    .writeStream.format("console").option("truncate", false)
    .trigger(Trigger.ProcessingTime("20 seconds"))
    .outputMode("update")
    .start()

Then I added some data to the MemoryStream. The watermark was not applied because all records, even the late ones with an event time older than the watermark, were included in my result table. My first reflex was "it has to be a timezone issue!". So I changed the time zone configuration everywhere:

  private val sparkSession: SparkSession = SparkSession.builder().appName("Watermark not grouped query")
    .config("spark.sql.session.timeZone", "UTC")
    .master("local[2]").getOrCreate()
  TimeZone.setDefault(TimeZone.getTimeZone("UTC"))

But it didn't help. I then went back to the documentation and was reminded that the watermark doesn't work for all use cases, i.e. even if you call withWatermark(...), it won't necessarily be physically executed. Among the queries supporting watermarks, you will find the ones with windows or grouping expressions on the watermark column. As you can see, my initial query wasn't even close to any of them. But that intrigued me enough to try to understand how the plan differs when .withWatermark is defined for a query with groups and for a query without them.

Only the query using the watermark column as the grouping column worked, and the records older than the current watermark were ignored:

  val aggregatedStream = inputStream.toDS().toDF("created", "name")
    .withWatermark("created", "1 minute") 
    .groupBy("created")
    .count()
    .writeStream.format("console").option("truncate", false)
    .trigger(Trigger.ProcessingTime("20 seconds"))
    .outputMode("update")
    .start()
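
The other supported shape mentioned above, a window built on the watermark column, can be sketched in the same way. This is only a minimal sketch under a few assumptions: the object name WindowedWatermarkSketch, the helper windowedCounts and the sample timestamp are made up for the illustration, and the 1-minute window length is arbitrary.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

// Hypothetical object name, used only for this illustration.
object WindowedWatermarkSketch {

  // Groups by a 1-minute tumbling window built on the watermark column ("created").
  def windowedCounts(events: DataFrame): DataFrame =
    events
      .withWatermark("created", "1 minute")
      .groupBy(window(col("created"), "1 minute"))
      .count()

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("Watermark with window")
      .config("spark.sql.session.timeZone", "UTC")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._ // provides the Encoder required by MemoryStream

    val inputStream = new MemoryStream[(Timestamp, String)](1, sparkSession.sqlContext)
    val query = windowedCounts(inputStream.toDS().toDF("created", "name"))
      .writeStream.format("console").option("truncate", false)
      .trigger(Trigger.ProcessingTime("20 seconds"))
      .outputMode("update")
      .start()

    // Arbitrary sample record; any timestamps can be used to experiment with late data.
    inputStream.addData((Timestamp.valueOf("2020-01-01 10:00:00"), "a"))
    query.processAllAvailable()
    query.stop()
    sparkSession.stop()
  }
}
```

The grouping key here is the window struct derived from "created", so the key still depends on the watermark column, which is the property the rest of this post relies on.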

Explanation

What's the reason for that behavior? The answer is hidden in the WatermarkSupport class, more exactly here:

  /** Generate an expression that matches data older than the watermark */
  lazy val watermarkExpression: Option[Expression] = {
    WatermarkSupport.watermarkExpression(
      child.output.find(_.metadata.contains(EventTimeWatermark.delayKey)),
      eventTimeWatermark)
  }

This property is later used by the class to build 2 predicates, one controlling the keys and one controlling the data:

  /** Predicate based on keys that matches data older than the watermark */
  lazy val watermarkPredicateForKeys: Option[Predicate] = watermarkExpression.flatMap { e =>
    if (keyExpressions.exists(_.metadata.contains(EventTimeWatermark.delayKey))) {
      Some(newPredicate(e, keyExpressions))
    } else {
      None
    }
  }

  /** Predicate based on the child output that matches data older than the watermark. */
  lazy val watermarkPredicateForData: Option[Predicate] =
    watermarkExpression.map(newPredicate(_, child.output))
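
To make the consequence of these Option-based definitions easier to follow, here is a simplified, Spark-free model of the same decision logic. It is not Spark code: Column, StatefulOperatorModel and the 3 sample operators are hypothetical names invented for the illustration, and the child outputs I give them are assumptions about what each query shape exposes, not copies of real plans.

```scala
// Hypothetical stand-in for an attribute; the flag replaces
// metadata.contains(EventTimeWatermark.delayKey).
final case class Column(name: String, hasWatermarkMetadata: Boolean)

// Hypothetical stand-in for a stateful operator mixing in WatermarkSupport.
final case class StatefulOperatorModel(
    keyColumns: Seq[Column],
    childOutput: Seq[Column],
    eventTimeWatermarkMs: Option[Long]) {

  // Mirrors child.output.find(...); assumption: no watermark value means no expression.
  val watermarkColumn: Option[Column] =
    if (eventTimeWatermarkMs.isDefined) childOutput.find(_.hasWatermarkMetadata) else None

  // Mirrors watermarkPredicateForKeys: also requires a key carrying the watermark metadata.
  val hasPredicateForKeys: Boolean =
    watermarkColumn.isDefined && keyColumns.exists(_.hasWatermarkMetadata)

  // Mirrors watermarkPredicateForData: only requires the watermark expression.
  val hasPredicateForData: Boolean = watermarkColumn.isDefined
}

object WatermarkPredicateModel {
  val created = Column("created", hasWatermarkMetadata = true)
  val name = Column("name", hasWatermarkMetadata = false)
  val count = Column("count", hasWatermarkMetadata = false)

  // Assumed child outputs for the 3 query shapes discussed in this post.
  val notGrouped = StatefulOperatorModel(Seq.empty, Seq(count), Some(0L))
  val groupedByName = StatefulOperatorModel(Seq(name), Seq(name, count), Some(0L))
  val groupedByCreated = StatefulOperatorModel(Seq(created), Seq(created, count), Some(0L))

  def main(args: Array[String]): Unit = {
    Seq("not grouped" -> notGrouped, "grouped by name" -> groupedByName,
      "grouped by created" -> groupedByCreated).foreach { case (label, operator) =>
      println(s"$label: keys=${operator.hasPredicateForKeys}, data=${operator.hasPredicateForData}")
    }
  }
}
```

In this model only the operator grouped by "created" ends up with both predicates; the 2 other ones get none, so nothing can be filtered out.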

The plan for a query without grouping doesn't include any child node exposing the watermark column. It's also the case for a query with aggregates on columns other than the watermark column. Below you can find the physical plans for 2 queries illustrating this case, where state store operators don't have a child with the watermark column in the output:

On the other hand, the query aggregating on the watermark column looks like this:

The reason why it doesn't work for the first 2 examples is that the state store operator's child doesn't output the watermark column, so the engine cannot create the predicates to filter out the expired records. On the flip side, Spark is able to do so for the last query where, as you can see, the child's output contains the column defined in the watermark. Apache Spark can then create the predicates (watermarkPredicateForKeys, watermarkPredicateForData) to filter out the expired records and keys.
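
If you want to inspect such plans on your own, one possible way is to call explain() on the started query once at least one micro-batch has been processed. The snippet below is only a sketch: the object name PlanInspectionSketch and the sample record are made up, and the exact text of the printed plan depends on your Spark version, so I don't reproduce it here.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

// Hypothetical object name, used only for this illustration.
object PlanInspectionSketch {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("Watermark plan inspection")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._ // provides the Encoder required by MemoryStream

    val inputStream = new MemoryStream[(Timestamp, String)](1, sparkSession.sqlContext)
    val query = inputStream.toDS().toDF("created", "name")
      .withWatermark("created", "1 minute")
      .groupBy("created") // replace with .agg(Map("name" -> "count")) to compare both plans
      .count()
      .writeStream.format("console").option("truncate", false)
      .outputMode("update")
      .start()

    // The physical plan is only available after a micro-batch has been executed.
    inputStream.addData((Timestamp.valueOf("2020-01-01 10:00:00"), "a"))
    query.processAllAvailable()

    // Prints the physical plan of the last executed micro-batch to the standard output.
    query.explain()

    query.stop()
    sparkSession.stop()
  }
}
```

Running it once per query shape and comparing the output of the state store operators' children should show whether the watermark column is part of it.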

Having code that doesn't work, especially when you want to demonstrate something quickly, is not a pleasant surprise. However, it helps to recall some basics, like the one about reading the friendly manual ;-) It also helped me to go back to the watermarks in Structured Streaming and understand another of their subtleties!
