Processing time trigger, to be or not to be?

That's the question. Without a processing time trigger, micro-batches are triggered more reactively, but that alone doesn't make it the one true best practice. Let's see why.

Bartosz Konieczny

To understand the ins and outs, let's first recall what a processing time trigger in Apache Spark Structured Streaming is. It defines how often a micro-batch should run. For example, a 1-minute trigger starts a new micro-batch every minute and, ideally, each micro-batch processes the last minute of data. Why "ideally"? Because if a micro-batch takes longer than 1 minute, the next one starts late and has more than 1 minute of data to process.
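The scheduling rule can be sketched in plain Scala, without Spark. The nextBatchTime formula below mirrors the one from Spark's ProcessingTimeExecutor. The object TriggerMath and the helpers nextStartMs and dataWindowMs are hypothetical, written only for this illustration. The data window also relies on a simplifying assumption: each micro-batch covers the data that arrived between its own start and the start of the next micro-batch.

```scala
object TriggerMath {
  // Same arithmetic as Spark's ProcessingTimeExecutor.nextBatchTime:
  // round `now` down to an interval boundary, then add one interval.
  def nextBatchTime(now: Long, intervalMs: Long): Long =
    if (intervalMs == 0) now else now / intervalMs * intervalMs + intervalMs

  // Hypothetical helper: start of the micro-batch following one that started at
  // `startMs` and ran for `durationMs`. The executor waits for the planned time,
  // or starts right away when the previous micro-batch overran it.
  def nextStartMs(startMs: Long, durationMs: Long, intervalMs: Long): Long =
    math.max(nextBatchTime(startMs, intervalMs), startMs + durationMs)

  // Hypothetical, simplified data window: time between two consecutive starts.
  def dataWindowMs(startMs: Long, durationMs: Long, intervalMs: Long): Long =
    nextStartMs(startMs, durationMs, intervalMs) - startMs

  def main(args: Array[String]): Unit = {
    val oneMinuteMs = 60000L
    println(dataWindowMs(0L, 40000L, oneMinuteMs)) // prints 60000: the batch fits in the interval
    println(dataWindowMs(0L, 70000L, oneMinuteMs)) // prints 70000: the batch overran, more data next time
  }
}
```

A 40-second micro-batch leaves the next one with exactly 1 minute of data. A 70-second micro-batch pushes the next start to 70 seconds, so that micro-batch gets 70 seconds of data instead.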

What does having a processing time trigger imply?

Pretty nice, isn't it? Indeed, but this trigger is strongly tied to the micro-batch model and, therefore, to data arriving in regular batches. As we all know, streaming and real-time systems rarely provide such a guarantee. So what happens if, for these use cases, the job doesn't have a processing time trigger?

Job without processing time trigger

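What happens if the job doesn't have a processing time trigger? The next micro-batch starts as soon as possible after the previous one completes. When no trigger is set, Spark falls back to a processing time trigger with a 0 interval. For that reason, the same ProcessingTimeExecutor handles both cases: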

case class ProcessingTimeExecutor(
    processingTimeTrigger: ProcessingTimeTrigger,
    clock: Clock = new SystemClock())
  extends TriggerExecutor with Logging {
  // ... (excerpt, other members omitted)

  private val intervalMs = processingTimeTrigger.intervalMs
  require(intervalMs >= 0)

  override def execute(triggerHandler: () => Boolean): Unit = {
    while (true) {
      val triggerTimeMs = clock.getTimeMillis
      val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
      // Runs the micro-batch and blocks until it completes
      val terminated = !triggerHandler()
      if (intervalMs > 0) {
        // Processing time trigger: warn when the micro-batch overran the interval...
        val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
        if (batchElapsedTimeMs > intervalMs) {
          notifyBatchFallingBehind(batchElapsedTimeMs)
        }
        if (terminated) {
          return
        }
        // ...then block until the next planned trigger time
        clock.waitTillTime(nextTriggerTimeMs)
      } else {
        // No trigger (0 interval): the next loop iteration starts right away
        if (terminated) {
          return
        }
      }
    }
  }

  // Start of the next interval; with a 0 interval, simply the current time
  def nextBatchTime(now: Long): Long = {
    if (intervalMs == 0) now else now / intervalMs * intervalMs + intervalMs
  }
}

The else branch is responsible for micro-batch execution without a trigger, that is, with a 0 interval. As you can see, the loop is first blocked by triggerHandler(), which runs the micro-batch. In other words, nothing happens as long as the micro-batch is running. As soon as the micro-batch completes, a trigger-less query goes to the else branch and, unless the query was terminated, immediately starts a new loop iteration. That's not the case with a processing time trigger, where the thread blocks until the next planned schedule in clock.waitTillTime(nextTriggerTimeMs).
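To see both branches side by side, the loop can be replayed with a fake clock. This is a minimal sketch under one strong assumption: every micro-batch duration is fixed up front. In reality, a duration depends on how much data has accumulated. FakeClock, ExecutorSimulation and Run are hypothetical names. FakeClock only imitates the waitTillTime behavior (a target time already in the past returns immediately) and is not Spark's Clock API. The batch ids here are just positions in the input list.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical fake clock: time only moves when the simulation advances it.
final class FakeClock(var nowMs: Long = 0L) {
  def advance(ms: Long): Unit = nowMs += ms
  // Imitates waitTillTime: a target already in the past returns immediately.
  def waitTillTime(targetMs: Long): Unit = if (targetMs > nowMs) nowMs = targetMs
}

object ExecutorSimulation {
  // Start time of every micro-batch plus the ids of the batches that overran the interval.
  final case class Run(startsMs: Seq[Long], fallingBehind: Seq[Int])

  def simulate(intervalMs: Long, durationsMs: Seq[Long]): Run = {
    require(intervalMs >= 0)
    val clock = new FakeClock()
    val starts = ArrayBuffer.empty[Long]
    val behind = ArrayBuffer.empty[Int]
    for ((durationMs, batchId) <- durationsMs.zipWithIndex) {
      val triggerTimeMs = clock.nowMs
      // Same formula as nextBatchTime in the excerpt above
      val nextTriggerTimeMs =
        if (intervalMs == 0) triggerTimeMs
        else triggerTimeMs / intervalMs * intervalMs + intervalMs
      starts += triggerTimeMs
      clock.advance(durationMs) // stands for triggerHandler(), i.e. running the micro-batch
      if (intervalMs > 0) {
        // Where Spark would call notifyBatchFallingBehind
        if (clock.nowMs - triggerTimeMs > intervalMs) behind += batchId
        clock.waitTillTime(nextTriggerTimeMs) // block until the planned schedule
      }
      // intervalMs == 0: nothing to wait for, the next iteration starts right away
    }
    Run(starts.toSeq, behind.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val durationsMs = Seq(30000L, 45000L, 70000L, 40000L)
    println(simulate(60000L, durationsMs).startsMs.mkString(", ")) // 0, 60000, 120000, 190000
    println(simulate(0L, durationsMs).startsMs.mkString(", "))     // 0, 30000, 75000, 145000
  }
}
```

With the 1-minute trigger, micro-batches stay aligned on minute boundaries until the 70-second one falls behind. The next micro-batch then starts as soon as that one completes. Without the trigger, each micro-batch starts right after the previous one.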

Comparison

Let's now see what happens when there is no trigger associated with a Structured Streaming query. The image below shows four different scenarios, all processing data generated every minute:

The best-case scenario is when each micro-batch processes a constant data volume, as in the first two configurations. For simplicity, they also assume a constant micro-batch execution time, which is rarely the case! That's why the next two examples assume an execution time ranging between 30 seconds and 1 minute. The job without a trigger reduces the processing latency because each micro-batch starts as soon as the previous one completes. That's not the case for the job with the trigger: its resources are not fully used, since they stay idle between the end of a micro-batch and the next scheduled trigger.

Of course, it's only an example with some arbitrary numbers to illustrate the impact of the trigger on the job execution. In all likelihood, it won't reflect your production use case, where the data generation flow is less predictable.
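The "resources are not fully used" point from the comparison above can be put into numbers. This is a minimal sketch with hypothetical execution times between 30 seconds and 1 minute. It assumes that no micro-batch overruns the 1-minute interval and that a trigger-less query always has new data to process. It also reuses the same durations for both jobs, although trigger-less micro-batches would start earlier and probably process less data each. TriggerIdleTime and its methods are illustrative names.

```scala
object TriggerIdleTime {
  // Idle time of a query with a processing time trigger, assuming no micro-batch
  // overruns the interval: after each batch, the executor waits (interval - duration).
  def triggeredIdleMs(intervalMs: Long, durationsMs: Seq[Long]): Long =
    durationsMs.map(d => math.max(0L, intervalMs - d)).sum

  // Share of the elapsed time spent running micro-batches.
  def utilization(busyMs: Long, idleMs: Long): Double =
    busyMs.toDouble / (busyMs + idleMs)

  def main(args: Array[String]): Unit = {
    // Hypothetical execution times between 30 seconds and 1 minute
    val durationsMs = Seq(30000L, 45000L, 60000L, 40000L)
    val busyMs = durationsMs.sum
    val idleMs = triggeredIdleMs(60000L, durationsMs)
    println(s"1-minute trigger: busy=$busyMs ms, idle=$idleMs ms, utilization=${utilization(busyMs, idleMs)}")
    // Without a trigger, the next micro-batch starts right after the previous one
    println(s"no trigger: busy=$busyMs ms, idle=0 ms, utilization=${utilization(busyMs, 0L)}")
  }
}
```

Here the triggered job spends 65 of its 240 seconds waiting, which is roughly 73% utilization. Under the same simplified assumptions, the trigger-less job never waits.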

Because of this unpredictability in both the input data and the output requirements, I won't give you a definitive answer such as "use the trigger" or "do not use the trigger". Instead, here are some points to consider when choosing the configuration. I already shared the ones for the trigger, so let's now see when trigger-less execution could be useful. There are three major impacts:

On the other hand, not having the processing time trigger will:

So, to be or not to be: should a query have a trigger? It depends. Always check any recommendation against your own use case.

Data Engineering Design Patterns

Looking for a book that defines and solves the most common data engineering problems? I wrote one on that topic! You can read it online on the O'Reilly platform, or get a print copy on Amazon.

I also help solve your data engineering problems: contact@waitingforcode.com 📩