Processing time trigger, to be or not to be?

Versions: Apache Spark 3.5.0

That's the question. Omitting the processing time trigger makes micro-batch triggering more reactive, but it cannot be considered the one true best practice. Let's see why.


To understand the ins and outs, let's first recall what a processing time trigger in Apache Spark Structured Streaming is. Simply put, it defines how often a micro-batch should run. For example, a 1-minute trigger will start a new micro-batch every minute and, ideally, each run will process the last minute of data. Why "ideally"? Because if a micro-batch takes longer than 1 minute, the next one will have more than a minute of data to process.
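
To make this concrete, below is a minimal sketch of a query configured with a 1-minute processing time trigger. The rate source and the console sink are assumptions on my side, used only for illustration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("processing-time-trigger-demo")
  .master("local[*]")
  .getOrCreate()

// The rate source continuously generates rows; it's handy for local experiments
val input = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

// A new micro-batch is planned every minute; if the previous one runs longer,
// the next one starts as soon as it completes
val query = input.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

query.awaitTermination()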

What does having a processing time-based trigger imply:

Pretty nice, isn't it? Indeed, but this trigger seems strongly tied to the micro-batch model, i.e. to data arriving in regular batches. As we all know, such a guarantee is hard to get in streaming and real-time systems. So what happens if, for such use cases, the job doesn't have a processing time trigger?

Job without processing time trigger

What happens if the job doesn't have a processing time trigger? The next micro-batch starts as soon as possible after the previous one completes. The details are in the ProcessingTimeExecutor:

case class ProcessingTimeExecutor(processingTimeTrigger: ProcessingTimeTrigger, clock: Clock = new SystemClock()) extends TriggerExecutor with Logging {
  // ...
  private val intervalMs = processingTimeTrigger.intervalMs
  require(intervalMs >= 0)

  override def execute(triggerHandler: () => Boolean): Unit = {
    while (true) {
      val triggerTimeMs = clock.getTimeMillis
      val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
      val terminated = !triggerHandler()
      if (intervalMs > 0) {
        val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
        if (batchElapsedTimeMs > intervalMs) {
          notifyBatchFallingBehind(batchElapsedTimeMs)
        }
        if (terminated) {
          return
        }
        clock.waitTillTime(nextTriggerTimeMs)
      } else {
        if (terminated) {
          return
        }
      }
    }
  }

  // ...

  def nextBatchTime(now: Long): Long = {
    if (intervalMs == 0) now else now / intervalMs * intervalMs + intervalMs
  }
}
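
To see how this scheduling arithmetic behaves, here is a standalone copy of the formula with a few sample values. This is not Spark's class, just the same computation extracted for a quick check:

// Same arithmetic as ProcessingTimeExecutor.nextBatchTime, with the interval passed explicitly
def nextBatchTime(now: Long, intervalMs: Long): Long =
  if (intervalMs == 0) now else now / intervalMs * intervalMs + intervalMs

val oneMinuteMs = 60000L
println(nextBatchTime(95000L, oneMinuteMs))  // 120000: 95s floors to the 60s boundary, the next one is 120s
println(nextBatchTime(120000L, oneMinuteMs)) // 180000: a batch starting exactly on a boundary targets the next one
println(nextBatchTime(95000L, 0L))           // 95000: no interval, so the next batch can start right away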

The else branch shows the code responsible for micro-batch execution without the trigger. As you can see, the loop first blocks on triggerHandler(). In other words, nothing happens as long as the micro-batch is running. As soon as the micro-batch completes, the execution falls into the else branch and immediately starts a new loop iteration. That's not the case for the processing time trigger, where the thread additionally blocks until the next planned schedule (clock.waitTillTime(nextTriggerTimeMs)).
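
For comparison, here is a minimal sketch of a trigger-less query: there is no trigger(...) call at all, so each micro-batch is chained right after the previous one (internally this goes through the same ProcessingTimeExecutor path with an interval of 0). As before, the rate source and console sink are only illustrative assumptions:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("no-trigger-demo")
  .master("local[*]")
  .getOrCreate()

val query = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()
  .writeStream
  .format("console")
  // no trigger(...) here: micro-batches run back-to-back, as fast as possible
  .start()

query.awaitTermination()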

Comparison

Let's now see what happens when there is no trigger associated with a Structured Streaming query. In the image below you can see four different scenarios, all processing data generated every minute:

The best-case scenario is when the data volume processed in each micro-batch is constant. That's the case for the first two configurations. For simplicity they assume a constant micro-batch execution time, but that's rarely true in practice! For that reason, take a look at the next two examples, which assume an execution time ranging between 30 seconds and 1 minute. For the job without a trigger you can see that it reduces the processing latency, which is not the case for the triggered job, where the resources are not fully used.
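
To put some numbers on those last two scenarios, here is a toy simulation under the same assumptions (micro-batch durations drawn between 30 seconds and 1 minute). The values are made up and only mimic the scheduling logic shown above, not a real benchmark:

import scala.util.Random

val random = new Random(42)
// 5 consecutive micro-batches taking between 30 and 60 seconds each
val batchDurationsSec = Seq.fill(5)(30 + random.nextInt(31))

// Start time (in seconds) of each micro-batch; intervalSec = 0 models the trigger-less job
def startTimesSec(intervalSec: Int): Seq[Int] =
  batchDurationsSec.scanLeft(0) { (start, duration) =>
    val end = start + duration
    if (intervalSec == 0) end // no trigger: the next batch starts right after the previous one
    else math.max(end, start / intervalSec * intervalSec + intervalSec) // trigger: wait for the next boundary, unless already late
  }.dropRight(1)

println(s"batch durations (s)      : $batchDurationsSec")
println(s"starts with 1 min trigger: ${startTimesSec(60)}")
println(s"starts without trigger   : ${startTimesSec(0)}")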

Of course, this is only an example with made-up numbers to better illustrate the impact of the trigger on the job execution. In 99% of cases it won't reflect your production use case, where the data generation flow will be more unpredictable.

Because of that unpredictability, both on the input data and on the output requirements, I won't give you a definitive answer here and say "use the trigger" or "do not use the trigger". Instead, here are some points you might consider while deciding on the configuration. I already shared the ones for the trigger, so let's now see when this trigger-less execution could be useful. There are three major impacts:

On the other hand, not having the processing time trigger will:

So, to be or not to be, a query with a trigger? It depends. You should always confront any recommendation with your own use case.

