Watermark in Structured Streaming

Versions: Apache Spark 2.4.2

I was already taking about watermark on my blog but this time I will focus more on its use in the context of a stateful processing.

Outline:

What is its purpose?

The goal of watermark in Structured Streaming is twofold. First, it defines how old the data ingested into the pipeline can be. If one record is older than the watermark (aggregated queries, like window-based), it will be simply dropped from the processing.

The second purpose concerns stateful processing. Some of stateful streaming operations involve growing state. One of them is duplicates removal. The physical operator responsible for its execution, StreamingDeduplicateExec, has an auto-cleaning feature removing the entries from state store older than the current watermark:

  protected def removeKeysOlderThanWatermark(store: StateStore): Unit = {
    if (watermarkPredicateForKeys.nonEmpty) {
      store.getRange(None, None).foreach { rowPair =>
        if (watermarkPredicateForKeys.get.eval(rowPair.key)) {
          store.remove(rowPair.key)
        }
      }
    }
  }

What is the relationship with mapGroupsWithState?

For mapGroupsWithState, watermark will be used as a marker for expired state. So, it can be thought as an event-time based synonym for processing time. What does it mean? As you know, you can configure your state expiration by processing time which is the time of given micro-batch execution (~ now()). This timeout configuration is therefore related to the real time.

The "real time" base for event time-based timeout comes from watermark. It means that the state expiration will be driven by the watermark changes. But the watermark itself will change only from the input (raw) events used to compute the state.

When the watermark is updated?

The watermark is updated at the end of the query, exactly at the same moment when processed logs are committed to the metadata checkpoint. The new watermark is then generated for the next query execution:

// MicroBatchExecution
  private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
    logDebug(s"Running batch $currentBatchId")
// ...
    withProgressLocked {
      watermarkTracker.updateWatermark(lastExecution.executedPlan)
      commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
      committedOffsets ++= availableOffsets
    }
    logDebug(s"Completed batch ${currentBatchId}")

How to use it?

To use watermark, call Dataset#withWatermark(eventTime: String, delayThreshold: String) method. The watermark for current execution will be computed as MAX(eventTime) - delayThreshold. Later, Apache Spark will collect all max values for event time column and, according to the watermark policy, either choose the min or max value among them. If the chosen value is greater than the current watermark of the query, it will become the new watermark.

The watermark policy can be min or max and is configured in spark.sql.streaming.multipleWatermarkPolicy property.

What are the implementation details?

In the previous part you saw how to define the watermark for the query. But I didn't detailed there what happens under-the-hood. When a watermark column is present in the executed query, Apache Spark will use EventTimeWatermarkExec physical operator to collect all event time values in the currently executed query. And to collect them, it will use EventTimeStatsAccum accumulator. At every new value, the accumulator will take the min, max and average values that way:

case class EventTimeStats(var max: Long, var min: Long, var avg: Double, var count: Long) {
  def add(eventTime: Long): Unit = {
    this.max = math.max(this.max, eventTime)
    this.min = math.min(this.min, eventTime)
    this.count += 1
    this.avg += (eventTime - avg) / count
  }

So that the memory footprint of the accumulation step will be insignificant. By the way, the same class and method is used to report streaming query execution metrics.

What happens if the query has multiple watermarks?

I answered that question partially, it all depends on the spark.sql.streaming.multipleWatermarkPolicy property. More exactly, the magic happens in WatermarkTracker#updateWatermark(executedPlan: SparkPlan) method.

At the beginning, Apache Spark collects all physical nodes representing watermark execution:

    val watermarkOperators = executedPlan.collect {
      case e: EventTimeWatermarkExec => e
    }

Next, for every operator, it updates its watermark value only if the new value is greater than the previous one:

  private val operatorToWatermarkMap = mutable.HashMap[Int, Long]()
  private var globalWatermarkMs: Long = 0

    watermarkOperators.zipWithIndex.foreach {
      case (e, index) if e.eventTimeStats.value.count > 0 =>
        logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
        val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
        val prevWatermarkMs = operatorToWatermarkMap.get(index)
        if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
          operatorToWatermarkMap.put(index, newWatermarkMs)
        }

      // Populate 0 if we haven't seen any data yet for this watermark node.
      case (_, index) =>
        if (!operatorToWatermarkMap.isDefinedAt(index)) {
          operatorToWatermarkMap.put(index, 0)
        }
    }

At the end, depending on the multiple watermark policy (min is the default), the global watermark will be eventually updated:

    val chosenGlobalWatermark = policy.chooseGlobalWatermark(operatorToWatermarkMap.values.toSeq)
    if (chosenGlobalWatermark > globalWatermarkMs) {
      logInfo(s"Updating event-time watermark from $globalWatermarkMs to $chosenGlobalWatermark ms")
      globalWatermarkMs = chosenGlobalWatermark
    } else {
      logDebug(s"Event time watermark didn't move: $chosenGlobalWatermark < $globalWatermarkMs")
    }

Does watermark exist only in micro-batch?

Yes, if you try to use watermark in continuous trigger, you will get this error:

Continuous processing does not support EventTimeWatermark operations.;;
EventTimeWatermark timestamp#0: timestamp, interval 2 minutes
+- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@590adb41, rate, Map(numPartitions -> 2, rowsPerSecond -> 10), [timestamp#0, value#1L]

That failure comes from UnsupportedOperationChecker#checkForContinuous(plan: LogicalPlan, outputMode: OutputMode) where analyzer checks whether the logical plan contains only operations allowed in continuous mode:

    plan.foreachUp { implicit subPlan =>
      subPlan match {
        case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
              _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
              _: TypedFilter) =>
        case node if node.nodeName == "StreamingRelationV2" =>
        case Repartition(1, false, _) =>
        case node: Aggregate =>
          val aboveSinglePartitionCoalesce = node.find {
            case Repartition(1, false, _) => true
            case _ => false
          }.isDefined

          if (!aboveSinglePartitionCoalesce) {
            throwError(s"In continuous processing mode, coalesce(1) must be called before " +
              s"aggregate operation ${node.nodeName}.")
          }
        case node =>
          throwError(s"Continuous processing does not support ${node.nodeName} operations.")
      }

The post answers to some questions about watermark in Structured Streaming. You can learn here about its lifecycle, some of the implementation details and role in the query. I will cover some of the watermark points in another post where I will discuss data reprocessing.


If you liked it, you should read:

📚 Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!