Triggers in Apache Spark Structured Streaming

Versions: Spark 2.2.1

For the last few weeks I was focused on the Apache Beam project. After some reading I discovered a lot of concepts shared between Beam and Spark Structured Streaming (or inversely?). One of these similarities is triggers.

This post, written after a break of several months, describes another Apache Spark feature: the triggers. The first part presents triggers in the context of the Apache Spark Structured Streaming project. The second one shows some implementation details. The final section contains several learning tests showing how the triggers work.

Triggers role

Spark triggers have a role similar to the triggers in Apache Beam, i.e. they determine when the processing of the accumulated data starts. The execution of this processing obviously emits new data to the result table. Compared to the previous streaming version in Apache Spark (DStream-based), triggers are a concept similar to the batch interval property.

In the described version (2.2.1) there are 2 different trigger types in Spark. The first type is based on the processing time. It executes the streaming query at a regular interval that depends on the processing time. This interval can be defined in any time unit (ms, s, min, ...). The second type is called once because it executes the query only once. After the execution the query is terminated and, even if new data arrives, the query is not started again.

By default Apache Spark Structured Streaming executes the queries with a processing time-based trigger of 0 ms. It means that Spark launches a new micro-batch as quickly as possible after finishing the previous one. The new execution occurs only if new data exists.
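
Both trigger types are set on the query's writeStream builder. Below is a minimal sketch, assuming an already defined streaming Dataset called stream and using the console sink only for illustration:

import org.apache.spark.sql.streaming.Trigger

// Default: no trigger specified = processing time trigger of 0 ms,
// i.e. a new micro-batch starts as soon as the previous one finishes
val defaultQuery = stream.writeStream.format("console").start()

// Processing time trigger: a micro-batch is started every 5 seconds
val processingTimeQuery = stream.writeStream
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .format("console")
  .start()

// Once trigger: a single micro-batch is executed and the query terminates
val onceQuery = stream.writeStream
  .trigger(Trigger.Once())
  .format("console")
  .start()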

Triggers internals

Internally the triggers are grouped in the org.apache.spark.sql.streaming.Trigger class, where each trigger type is represented by one or more factory methods. In the case of the processing time, we can create the trigger with: ProcessingTime(long intervalMs), ProcessingTime(long interval, TimeUnit timeUnit), ProcessingTime(Duration interval) or ProcessingTime(String interval). All of them call, under the hood, the create or apply method of the org.apache.spark.sql.streaming.ProcessingTime object. The once trigger is represented by Once(), which returns the OneTimeTrigger case object.
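
As a quick illustration, the sketch below shows these factory methods side by side; all 4 ProcessingTime calls are supposed to build an equivalent 10-second trigger:

import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger

// 4 equivalent ways to declare a 10-second processing time trigger
val fromMillis = Trigger.ProcessingTime(10000L)
val fromTimeUnit = Trigger.ProcessingTime(10, TimeUnit.SECONDS)
val fromDuration = Trigger.ProcessingTime(10.seconds)
val fromString = Trigger.ProcessingTime("10 seconds")

// The once trigger has a single factory method
val onceTrigger = Trigger.Once()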

The Trigger instance created this way is later used in the streaming query as a part of an org.apache.spark.sql.execution.streaming.StreamExecution attribute. Within this instance the Trigger is used to build the correct org.apache.spark.sql.execution.streaming.TriggerExecutor implementation: ProcessingTimeExecutor for the processing time-based trigger or OneTimeExecutor for the once trigger.

Later the streaming query is executed by TriggerExecutor's execute(triggerHandler: () => Boolean) method. The implementation of this method depends on the trigger type. For the once trigger, the execute method launches the triggerHandler function only once. In the case of ProcessingTimeExecutor the execute method is a long-running process (a while(true) loop) where the trigger waits for the interval to elapse before executing the query again.
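
To illustrate this difference, here is a simplified sketch of both behaviors. It's only an approximation written for this post and not the real Spark code (which, among others, uses a Clock abstraction and aligns the waiting time to the next interval boundary):

// Simplified approximation of the 2 TriggerExecutor behaviors - not the real Spark source
trait SimplifiedTriggerExecutor {
  def execute(triggerHandler: () => Boolean): Unit
}

// once trigger: the handler is invoked a single time
class SimplifiedOneTimeExecutor extends SimplifiedTriggerExecutor {
  override def execute(triggerHandler: () => Boolean): Unit = triggerHandler()
}

// processing time trigger: long-running loop waiting between 2 handler invocations
class SimplifiedProcessingTimeExecutor(intervalMs: Long) extends SimplifiedTriggerExecutor {
  override def execute(triggerHandler: () => Boolean): Unit = {
    var continue = true
    while (continue) {
      val batchStartTimeMs = System.currentTimeMillis()
      continue = triggerHandler()
      val elapsedMs = System.currentTimeMillis() - batchStartTimeMs
      if (intervalMs > 0 && elapsedMs < intervalMs) {
        // wait for the remaining part of the interval before the next execution
        Thread.sleep(intervalMs - elapsedMs)
      }
    }
  }
}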

Triggers are also related to the statistics computed inside the org.apache.spark.sql.execution.streaming.ProgressReporter#finishTrigger(hasNewData: Boolean) method.
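
These statistics end up in the progress objects exposed by the streaming query. As a short example (assuming a query already started, as in the tests below), the duration of the whole trigger execution can be read with:

// lastProgress returns the most recent StreamingQueryProgress (or null if no
// progress was reported yet), built from the statistics gathered at the end of each trigger
val progress = query.lastProgress
if (progress != null) {
  // durationMs is a java.util.Map[String, java.lang.Long]; among its entries,
  // "triggerExecution" measures the duration of the whole trigger
  println(progress.durationMs.get("triggerExecution"))
}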

Triggers examples

The learning tests below show some of the triggers' specificities:

"once trigger" should "execute the query only once" in {
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  inputStream.addData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  val stream = inputStream.toDS().toDF("number")

  val query = stream.writeStream.outputMode("update").trigger(Trigger.Once())
    .foreach(new NoopForeachWriter[Row]())
    .start()

  // Put here a very big timeout
  // The test will end well before this time, which proves that
  // the query is executed only once, as the trigger defines it
  val queryStartTime = System.currentTimeMillis()
  query.awaitTermination(Long.MaxValue)
  val queryEndTime = System.currentTimeMillis()

  val queryExecutionTime = queryEndTime - queryStartTime
  queryExecutionTime should be < Long.MaxValue
}

"the information about trigger" should "be available in the last progress object" in {
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (true) {
        inputStream.addData(1 to 30)
        Thread.sleep(1000)
      }
    }
  }).start()
  val stream = inputStream.toDS().toDF("number")

  val query = stream.writeStream.outputMode("update").trigger(Trigger.ProcessingTime("5 seconds"))
    .foreach(new NoopForeachWriter[Row]())
    .start()

  query.awaitTermination(10000)

  val lastProgress = query.lastProgress
  val progressJson = lastProgress.json
  // The result should be similar to:
  // {"id":"41fb220b-5fc7-456b-b104-f49d374f25d8","runId":"4aaf947f-1747-43c3-a422-0f6209c26709","name":null,
  // "timestamp":"2018-02-11T11:21:25.000Z","numInputRows":480,"inputRowsPerSecond":122.88786482334869,
  // "processedRowsPerSecond":1467.8899082568807,"durationMs":{"addBatch":127,"getBatch":135,"getOffset":0,
  // "queryPlanning":36,"triggerExecution":326,"walCommit":18},
  // "stateOperators":[],
  // "sources":[{"description":"MemoryStream[value#1]","startOffset":2,"endOffset":6,"numInputRows":480,
  // "inputRowsPerSecond":122.88786482334869,"processedRowsPerSecond":1467.8899082568807}],
  // "sink":{"description":"org.apache.spark.sql.execution.streaming.ForeachSink@5192026d"}}
  progressJson should include ("\"triggerExecution\":")
}

"processing time-based trigger" should "be executed the time defined in the query" in {
  val logAppender = InMemoryLogAppender.createLogAppender(Seq("Starting Trigger Calculation"),
    (loggingEvent: LoggingEvent) => LogMessage(s"${loggingEvent.timeStamp}", ""))
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (true) {
        inputStream.addData(1)
        Thread.sleep(5000)
      }
    }
  }).start()
  val stream = inputStream.toDS().toDF("number")

  val query = stream.writeStream.outputMode("update").trigger(Trigger.ProcessingTime("1 seconds"))
    .foreach(new NoopForeachWriter[Row]())
    .start()

  query.awaitTermination(15000)

  val triggerExecutionTimes = logAppender.getMessagesText().map(_.toLong)
  val triggersIntervals = mutable.HashMap[Long, Int]()
  for (index <- 0 until triggerExecutionTimes.size - 1) {
    val currentTimestamp = triggerExecutionTimes(index)
    val nextTimestamp = triggerExecutionTimes(index+1)

    val triggerDiff = nextTimestamp - currentTimestamp

    val currentCount = triggersIntervals.getOrElse(triggerDiff, 0)
    val newCount = currentCount + 1
    triggersIntervals.put(triggerDiff, newCount)
  }
  // Output example:
  // triggersIntervals = Map(1001 -> 1, 1434 -> 1, 1000 -> 10, 895 -> 1, 999 -> 1)
  // As you can see, sometimes the difference is +/- 1 sec because of the time taken to process the data
  // It proves that the trigger was executed every ~1 second. But as you can note in the
  // next test, a trigger launch doesn't necessarily mean data processing
  triggersIntervals should contain key 1000L
}

"trigger lower than data arrival time" should "not process rows every trigger interval" in {
  val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
  val stream = inputStream.toDS().toDF("number")

  val query = stream.writeStream.outputMode("update").trigger(Trigger.ProcessingTime("1 seconds"))
    .foreach(new ForeachWriter[Row]() {
      override def open(partitionId: Long, version: Long): Boolean = true

      override def process(value: Row): Unit = {
        val currentTime = System.currentTimeMillis()
        Container.processingTimes.append(currentTime)
      }

      override def close(errorOrNull: Throwable): Unit = {}
    })
    .start()
  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      while (true) {
        inputStream.addData(1)
        Thread.sleep(5000)
      }
    }
  }).start()

  query.awaitTermination(30000)

  val processingTimeDiffsInSec = new mutable.ListBuffer[Long]()
  for (index <- 0 until Container.processingTimes.size - 1) {
    val currentTimestamp = Container.processingTimes(index)
    val nextTimestamp = Instant.ofEpochMilli(Container.processingTimes(index+1))

    val processingTimeDiffInSec = nextTimestamp.minusMillis(currentTimestamp).getEpochSecond
    processingTimeDiffsInSec.append(processingTimeDiffInSec)
  }
  // Even though the trigger was defined to 1 second, we can see that the data processing is not launched
  // when there is no new data to process. So logically we shouldn't find any difference
  // of 1 second between subsequent data processing executions
  processingTimeDiffsInSec should not contain 1L
}

object Container {
  var processingTimes = new ListBuffer[Long]()
}

Triggers in Apache Spark Structured Streaming help to control the micro-batch processing speed. As presented in the first section, 2 different types of triggers exist: processing time-based and once (executes the query only 1 time). However, the trigger classes are not the only ones involved in the process. Their logic is executed by TriggerExecutor implementations, called in every micro-batch execution.