Processing time trigger, to be or not to be?

Versions: Apache Spark 3.5.0

That is the question. Running a query without a processing time trigger makes micro-batch scheduling more reactive, but that doesn't make it the one true best practice. Let's see why.

To understand the ins and outs, let's first recall what a processing time trigger is in Apache Spark Structured Streaming. It defines how often a micro-batch should run. For example, a 1-minute trigger runs a new micro-batch every minute and, ideally, each micro-batch processes the last minute of data. Why "ideally"? Because if a micro-batch takes longer than 1 minute, the next one starts late and has more than 1 minute of data to process.
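To make the "ideally" concrete, here is a minimal Scala sketch that replays the schedule of a 1-minute trigger with a fake clock instead of a real query. The rounding in `nextBatchTime` is copied from Spark 3.5.0's `ProcessingTimeExecutor`; `batchStartTimes` is a hypothetical helper, not a Spark API, and the batch durations are made-up numbers.

```scala
object TriggerSchedule {
  // Rounding borrowed from Spark 3.5.0's ProcessingTimeExecutor.nextBatchTime:
  // the next slot is the next multiple of intervalMs (or `now` when intervalMs is 0)
  def nextBatchTime(now: Long, intervalMs: Long): Long =
    if (intervalMs == 0) now else now / intervalMs * intervalMs + intervalMs

  // Hypothetical helper, not a Spark API: given how long each micro-batch runs,
  // returns when each one starts. A batch starts at the next slot or, if the
  // previous batch overran that slot, as soon as the previous batch completes.
  def batchStartTimes(durationsMs: Seq[Long], intervalMs: Long): Seq[Long] =
    durationsMs.scanLeft(0L) { (startMs, durationMs) =>
      math.max(startMs + durationMs, nextBatchTime(startMs, intervalMs))
    }.init // drop the start time of the batch that would follow the last one

  def main(args: Array[String]): Unit = {
    // 1-minute trigger; the second micro-batch takes 90 seconds
    val starts = batchStartTimes(Seq(40000L, 90000L, 30000L, 45000L), intervalMs = 60000L)
    println(starts.mkString(", "))
  }
}
```

With these numbers the starts are 0, 60 000, 150 000 and 180 000 ms. The 90-second micro-batch prevents the third one from starting at 120 000 ms, so it starts at 150 000 ms with roughly 90 seconds of accumulated data to process. The fourth one is back on the 1-minute grid. Passing `intervalMs = 0` instead makes every micro-batch start right when the previous one ends.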

What does a processing time trigger imply?

Pretty nice, isn't it? It is, but this trigger looks strongly tied to the micro-batch model, and therefore to data arriving at the same regular pace as the micro-batches. As we all know, streaming and real-time systems rarely offer such a guarantee. So what happens if, for such use cases, the job doesn't have a processing time trigger?

Job without processing time trigger

Without a processing time trigger, a micro-batch starts as soon as possible after the previous one completes. When you don't set any trigger, Spark falls back to Trigger.ProcessingTime(0), so the logic lives in the same ProcessingTimeExecutor, running with intervalMs = 0:

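case class ProcessingTimeExecutor(
    processingTimeTrigger: ProcessingTimeTrigger,
    clock: Clock = new SystemClock())
  extends TriggerExecutor with Logging {
  // ...
  private val intervalMs = processingTimeTrigger.intervalMs
  require(intervalMs >= 0)

  override def execute(triggerHandler: () => Boolean): Unit = {
    while (true) {
      val triggerTimeMs = clock.getTimeMillis()
      // Next planned slot, computed before the micro-batch runs
      val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
      // Runs the micro-batch and blocks until it completes;
      // returns false once the query is no longer active
      val terminated = !triggerHandler()
      if (intervalMs > 0) {
        // Processing time trigger: warn when the micro-batch exceeded the interval...
        val batchElapsedTimeMs = clock.getTimeMillis() - triggerTimeMs
        if (batchElapsedTimeMs > intervalMs) {
          notifyBatchFallingBehind(batchElapsedTimeMs)
        }
        if (terminated) {
          return
        }
        // ...then sleep until the next slot (returns at once if the slot is already past)
        clock.waitTillTime(nextTriggerTimeMs)
      } else {
        // No trigger (intervalMs == 0): start the next iteration right away
        if (terminated) {
          return
        }
      }
    }
  }

  // Rounds `now` down to the interval and adds one interval
  def nextBatchTime(now: Long): Long = {
    if (intervalMs == 0) now else now / intervalMs * intervalMs + intervalMs
  }
}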

The else branch is the code path used without a trigger, i.e. when intervalMs is 0. As you can see, the loop first blocks on triggerHandler(), so nothing else happens while the micro-batch is running. As soon as the micro-batch completes, the executor goes to the else branch and immediately starts a new loop iteration. That's not the case with a processing time trigger, where the thread blocks until the next planned slot (clock.waitTillTime(nextTriggerTimeMs)).
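To see how the planned slot is computed, here is the nextBatchTime formula applied to a hypothetical 1-minute trigger (intervalMs = 60 000) whose micro-batch starts at 120 000 ms and runs for 75 seconds. The division is Scala's integer division, which behaves as a floor for non-negative values:

```latex
\begin{aligned}
\mathrm{nextBatchTime}(t) &= \left\lfloor \frac{t}{\mathrm{intervalMs}} \right\rfloor \cdot \mathrm{intervalMs} + \mathrm{intervalMs} \\
\mathrm{nextBatchTime}(120\,000) &= 2 \cdot 60\,000 + 60\,000 = 180\,000 \\
\mathrm{nextBatchTime}(195\,000) &= 3 \cdot 60\,000 + 60\,000 = 240\,000
\end{aligned}
```

The micro-batch completes at 195 000 ms, after its planned 180 000 ms slot, so waitTillTime returns at once and the next micro-batch starts at 195 000 ms. Its own slot is 240 000 ms rather than 255 000 ms (195 000 + 60 000): the schedule snaps back to multiples of the interval instead of drifting. With intervalMs = 0 there is no grid at all; nextBatchTime simply returns now and the loop never waits.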

Comparison

Let's now compare what happens with and without a trigger on a Structured Streaming query. The image below shows four different scenarios, all processing data generated every minute.

The best-case scenario is a constant data volume in every micro-batch, which is what the first two configurations show. For simplicity, they also assume a constant micro-batch execution time, which is rarely true in practice! That's why the next two examples assume an execution time varying between 30 seconds and 1 minute. The job without a trigger reduces the processing latency because it starts the next micro-batch as soon as the previous one completes. The job with the trigger doesn't, and its resources stay idle between the end of a micro-batch and the next trigger slot.
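The latency difference can be reproduced with a small model. The Scala sketch below is not Spark code, and its simplifying assumptions aren't taken from the figure: one chunk of data lands every minute, 10 seconds after each minute boundary; each chunk is processed alone by a micro-batch lasting between 30 seconds and 1 minute; the trigger slots follow the same rounding as ProcessingTimeExecutor.nextBatchTime. Latency here means the time between a chunk's arrival and the end of its micro-batch. `TriggerComparison`, `nextSlot` and `latencies` are hypothetical names.

```scala
object TriggerComparison {
  val IntervalMs = 60000L

  // Next 1-minute slot, with the same rounding as ProcessingTimeExecutor.nextBatchTime
  def nextSlot(nowMs: Long): Long = nowMs / IntervalMs * IntervalMs + IntervalMs

  // Hypothetical model: chunk k lands at k * IntervalMs + arrivalOffsetMs and is processed
  // alone by a micro-batch lasting durationsMs(k). Returns, per chunk, the latency between
  // its arrival and the end of its micro-batch.
  def latencies(durationsMs: Seq[Long], arrivalOffsetMs: Long, withTrigger: Boolean): Seq[Long] = {
    // The model only holds when no micro-batch lasts longer than the interval,
    // so that no chunk ever waits behind a previous micro-batch
    require(durationsMs.forall(d => d > 0 && d <= IntervalMs), "durations must be in (0, 1 min]")
    require(arrivalOffsetMs > 0 && arrivalOffsetMs < IntervalMs, "offset must be inside the minute")
    durationsMs.zipWithIndex.map { case (durationMs, k) =>
      val arrivalMs = k * IntervalMs + arrivalOffsetMs
      // Without a trigger the idle loop starts a micro-batch as soon as data shows up
      // (any polling delay is ignored); with a trigger the data waits for the next slot
      val startMs = if (withTrigger) nextSlot(arrivalMs) else arrivalMs
      startMs + durationMs - arrivalMs
    }
  }

  def main(args: Array[String]): Unit = {
    val durationsMs = Seq(30000L, 45000L, 60000L, 40000L)
    println(s"no trigger:       ${latencies(durationsMs, 10000L, withTrigger = false)}")
    println(s"1-minute trigger: ${latencies(durationsMs, 10000L, withTrigger = true)}")
  }
}
```

With these numbers, the trigger-less job has latencies of 30, 45, 60 and 40 seconds, i.e. exactly the processing time, while the triggered job has 80, 95, 110 and 90 seconds. The extra 50 seconds are IntervalMs minus the 10-second arrival offset, a period where the data is already available but the executor sleeps. Moving the offset closer to the next slot shrinks that gap, so in this model the difference depends entirely on where the data lands relative to the trigger grid.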

Of course, this is only an example with arbitrary numbers, meant to illustrate the trigger's impact on the job execution. Odds are it won't reflect your production use case, where the data generation flow will be much less predictable.

Because both the input data and the output requirements are unpredictable, I won't give you a definitive "use the trigger" or "don't use the trigger" answer here. Instead, here are some points to consider when choosing the configuration. I already shared the ones for the trigger, so let's now see when trigger-less execution could be useful. There are three major impacts:

On the other hand, not having the processing time trigger will:

So, to be or not to be, a query with a trigger? It depends. Always check any recommendation against your own use case.

