Watermark and window-based processing

Versions: Apache Spark 3.0.0

One of the less obvious things about the watermark is how it applies to windows. At first glance, you might think that it filters out the records whose event time is older than the watermark. But that's not how it works for windows.


In this blog post, you will learn how the Apache Spark Structured Streaming watermark works with window-based processing. The first part shows why thinking about the watermark in the context of windows can be misleading. After reading the second part, you should hopefully understand better what happens.

Windows example

Let's start with a demo. In the code, the input DataFrame reads an Apache Kafka topic. The topic has a very simple schema composed of one field used for the watermark (event_time) and one field used in the sum aggregation (count). The processing part computes the sum of the count column in 10-minute windows and continuously writes the updates to the console sink:

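  import org.apache.spark.sql.functions
  import sparkSession.implicits._ // for the $"..." syntax; sparkSession is your SparkSession

  val window = inputKafkaRecords.selectExpr("CAST(value AS STRING)")
    .select(functions.from_json($"value", inputKafkaRecordSchema).as("record"))
    .select("record.*") // flattens the struct into the event_time and count columns
    .withWatermark("event_time", "25 minutes")
    .groupBy(functions.window($"event_time", "10 minutes"))

  val writeQuery = window.sum("count")
    .writeStream.outputMode("update").format("console")
    .option("checkpointLocation", s"/tmp/wfc/test-watermark${System.currentTimeMillis()}")
    .option("truncate", false)
    .start()
  writeQuery.awaitTermination()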

At first glance there is nothing unusual here, but if you start to play with the data, things may look a bit weird unless you know the watermark semantics for window expressions. The video below shows a short demo:

Windows and watermark

To understand what happens and why, let's start by defining window-based processing in Apache Spark Structured Streaming. To do so, there is nothing better than looking at the plan generated for our demo code:

The key for the window-based processing is this part:

As you can see, Apache Spark creates a plan with the 2 columns from the schema and an extra struct-typed column composed of start and end fields. This extra column is later used to perform the windowed operation, which is nothing other than a groupByKey-like aggregation on the window column!
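To make the "groupByKey on the window column" idea concrete, here is a minimal, Spark-free sketch. It assumes tumbling windows without a startTime offset, non-negative timestamps in epoch microseconds, and a hypothetical `Window` case class standing in for Spark's start/end struct (start inclusive, end exclusive). It only illustrates the grouping logic; it is not Spark's implementation.

```scala
object WindowAssignment {
  // Hypothetical stand-in for the struct column Spark adds: start inclusive, end exclusive.
  final case class Window(start: Long, end: Long)

  val Minute: Long = 60L * 1000000L // microseconds in one minute

  // Tumbling window containing the event time (assumes non-negative timestamps
  // and no startTime offset).
  def windowFor(eventTimeMicros: Long, sizeMicros: Long): Window = {
    val start = eventTimeMicros - (eventTimeMicros % sizeMicros)
    Window(start, start + sizeMicros)
  }

  // groupBy(window).sum("count") reduced to its essence:
  // key each (eventTime, count) record by its window, then sum the counts per key.
  def sumPerWindow(records: Seq[(Long, Long)], sizeMicros: Long): Map[Window, Long] =
    records
      .groupBy { case (eventTime, _) => windowFor(eventTime, sizeMicros) }
      .map { case (window, rows) => window -> rows.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val records = Seq((7 * Minute, 3L), (9 * Minute, 2L), (12 * Minute, 5L))
    sumPerWindow(records, 10 * Minute).toSeq.sortBy(_._1.start).foreach(println)
  }
}
```

The records at minutes 7 and 9 share the [0, 10) window and are summed together, while the one at minute 12 gets its own [10, 20) key, exactly like any other grouping key would.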

Later on, Apache Spark shuffles the data and computes the sum of the count column. During this last, global step, it reloads the previous state of each key to generate the updated sum, and it filters out the late rows before writing the remaining ones to the console. The filtering predicate is based on the window structure, the one with start and end fields. More exactly, it takes the end time of every window present in the micro-batch and compares it against the current watermark: a window ending at or before the watermark is considered late. You can see it pretty easily in this snippet corresponding to the executed predicate:

[2020-10-03 11:37:12,596] org.apache.spark.internal.Logging DEBUG Generated predicate '(input[0, struct, false].end <= 1588633260000000)':
/* 001 */ public SpecificPredicate generate(Object[] references) {
/* 002 */   return new SpecificPredicate(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificPredicate extends org.apache.spark.sql.catalyst.expressions.BasePredicate {
/* 006 */   private final Object[] references;
/* 007 */
/* 008 */
/* 009 */   public SpecificPredicate(Object[] references) {
/* 010 */     this.references = references;
/* 011 */
/* 012 */   }
/* 013 */
/* 014 */   public void initialize(int partitionIndex) {
/* 015 */
/* 016 */   }
/* 017 */
/* 018 */   public boolean eval(InternalRow i) {
/* 019 */     boolean isNull_0 = true;
/* 020 */     boolean value_0 = false;

// BK: It's the window column
/* 021 */     InternalRow value_2 = i.getStruct(0, 2);
/* 022 */     boolean isNull_1 = false;
/* 023 */     long value_1 = -1L;
/* 024 */
/* 025 */
/* 026 */     if (value_2.isNullAt(1)) {
/* 027 */       isNull_1 = true;
/* 028 */     } else {

// BK: It's the window end range in microseconds
/* 029 */       value_1 = value_2.getLong(1);
/* 030 */     }
/* 031 */     if (!isNull_1) {
/* 032 */
/* 033 */
/* 034 */       isNull_0 = false; // resultCode could change nullability.
/* 035 */       value_0 = value_1 <= 1588633260000000L;
/* 036 */
/* 037 */     }
/* 038 */     return !isNull_0 && value_0;
/* 039 */   }
/* 040 */
/* 041 */
/* 042 */ }
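The generated `eval` boils down to "the window end is not null and is lower than or equal to the watermark". The 16-digit constant is in microseconds because Spark stores TimestampType values as microseconds since the epoch; as far as I can tell from the Spark 3.0 `WatermarkSupport` source, the watermark is tracked in milliseconds and multiplied by 1000 when this predicate is built. The sketch below is a hand-written equivalent, with `Option[Long]` standing in for a nullable window end (an assumption of this sketch), plus a decoding of the constant into a readable instant.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

object WatermarkPredicate {
  // Constant copied from the generated predicate, in epoch microseconds.
  val WatermarkMicros: Long = 1588633260000000L

  // Hand-written equivalent of the generated eval(): a null window end never matches,
  // otherwise the row matches (i.e. is late) when its window ends at or before the watermark.
  def eval(windowEndMicros: Option[Long], watermarkMicros: Long): Boolean =
    windowEndMicros.exists(end => end <= watermarkMicros)

  // Converts epoch microseconds into a java.time.Instant.
  def toInstant(micros: Long): Instant = Instant.EPOCH.plus(micros, ChronoUnit.MICROS)

  def main(args: Array[String]): Unit = {
    println(toInstant(WatermarkMicros)) // prints 2020-05-04T23:01:00Z
  }
}
```

Note the `<=`: a window ending exactly at the watermark is already treated as late.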

Where is this predicate applied? In StateStoreSaveExec, in the branch of the corresponding output mode (Update in our demo):

          case Some(Update) =>

            new NextIterator[InternalRow] {
              // Filter late date using watermark if specified
              private[this] val baseIterator = watermarkPredicateForData match {
                case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
                case None => iter
              }
              private val updatesStartTimeNs = System.nanoTime

              override protected def getNext(): InternalRow = {
                if (baseIterator.hasNext) {
                  val row = baseIterator.next().asInstanceOf[UnsafeRow]
                  stateManager.put(store, row)
                  numOutputRows += 1
                  numUpdatedStateRows += 1
                  row
                } else {
                  finished = true
                  null
                }
              }
              // ... close() is omitted here; it calls removeKeysOlderThanWatermark(store)
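The Update branch can be summarized as "emit and store only the non-late windows, then evict the expired ones from the state". The following is a simplified, Spark-free sketch of one micro-batch under that reading; `Window`, `runBatch`, and the in-memory `Map` used as the state store are hypothetical names, and `aggregated` is assumed to already contain the sums merged with the previous state. It is not Spark's StateStoreSaveExec.

```scala
object UpdateModeStep {
  // Hypothetical window key: start inclusive, end exclusive (arbitrary time units).
  final case class Window(start: Long, end: Long)

  // Same condition as the generated predicate: true means "late" for this watermark.
  def isLate(window: Window, watermark: Long): Boolean = window.end <= watermark

  // One simplified micro-batch in Update mode:
  //  1. keep only the aggregated rows whose window is not late (they are the output),
  //  2. write them into the state,
  //  3. evict every state entry whose window ends at or before the watermark.
  def runBatch(
      state: Map[Window, Long],
      aggregated: Seq[(Window, Long)],
      watermark: Long): (Seq[(Window, Long)], Map[Window, Long]) = {
    val emitted = aggregated.filter { case (window, _) => !isLate(window, watermark) }
    val updated = state ++ emitted
    val remaining = updated.filter { case (window, _) => !isLate(window, watermark) }
    (emitted, remaining)
  }
}
```

Both steps look only at `window.end`, which is why the event time of an individual record never enters the decision.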

To answer the two questions from the demo video: Apache Spark created a new window even for a record behind the watermark simply because the watermark is compared against the window's end, not against the record's event time! For the same reason, Apache Spark kept the window in the state after the watermark moved: here too, the state expiration (its TTL) is based on the window's end.
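Here is the demo situation in numbers, as a small sketch assuming the demo's 25-minute delay and 10-minute tumbling windows, timestamps expressed in microseconds from an arbitrary origin, and the usual "max event time minus delay" watermark. It ignores the fact that Spark advances the watermark only between micro-batches. All names are hypothetical.

```scala
object DemoScenario {
  val Minute: Long = 60L * 1000000L // microseconds in one minute

  // watermark = max event time seen so far - allowed delay
  def watermark(maxEventTime: Long, delay: Long): Long = maxEventTime - delay

  // End (exclusive) of the tumbling window containing the event time.
  def windowEnd(eventTime: Long, size: Long): Long = eventTime - (eventTime % size) + size

  // A record is accepted when its window ends after the watermark,
  // whatever its own event time is.
  def accepted(eventTime: Long, currentWatermark: Long, size: Long): Boolean =
    windowEnd(eventTime, size) > currentWatermark

  def main(args: Array[String]): Unit = {
    val wm = watermark(maxEventTime = 630 * Minute, delay = 25 * Minute) // minute 605
    // minute 603 is behind the watermark, but its window [600, 610) ends after it
    println(accepted(603 * Minute, wm, 10 * Minute)) // prints true
    // minute 598 belongs to [590, 600), which ends before the watermark
    println(accepted(598 * Minute, wm, 10 * Minute)) // prints false
  }
}
```

The record at minute 603 is older than the watermark (605) yet still updates a window, which is the behavior that looks surprising in the demo.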

Window-based processing is probably one of the most basic stateful transformations in streaming applications. After this article, you should understand better how it works alongside the watermark because yes, it can be misleading, at least at the beginning, when you may think that the watermark applies at the record level.
