Apache Spark 2.4.0 features - watermark configuration

Versions: Apache Spark 2.4.0

The series about Apache Spark 2.4.0 features continues. After last week's discovery of bucket pruning, it's time to switch to the Structured Streaming module and see its major evolution.

This post presents the watermark configuration feature added in the 2.4.0 release. The first part explains what changed. The second one gives the implementation details and shows one of the strategies used to guarantee backward compatibility. The last section contains test cases illustrating the watermark configuration feature.

Min or max watermark?

The 2.3.0 release came with the possibility to join streams. As explained in the post about inner joins between streams in Apache Spark Structured Streaming, doing that is not an easy task because the joined streams can have different latencies. One of the methods to control which data can be joined, even if it arrives late, is the watermark. Prior to the 2.4.0 release, when multiple watermarks were defined, the smallest value was always used. It allowed the query to move with the slowest stream and to not drop too much data.

With the change introduced in Apache Spark 2.4.0, we can configure which watermark is used in such situations and take either the min or the max one. Of course, choosing the max value means that the application moves forward with the fastest stream and hence drops much more data.
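The difference between the two policies can be illustrated with a bit of arithmetic. The sketch below assumes that each stream's watermark is its max observed event time minus its allowed delay. It reuses the 4 and 8 second delays from the tests later in this post, together with a max event time of 10 seconds, which is an assumption consistent with the watermark values asserted there. `StreamState` and `MinMaxWatermarkDemo` are hypothetical names, not part of Spark's API.

```scala
// Hypothetical model of one input stream; this is not a Spark class.
final case class StreamState(name: String, maxEventTimeMs: Long, delayMs: Long) {
  // Assumption: watermark = max observed event time - allowed delay.
  def watermarkMs: Long = maxEventTimeMs - delayMs
}

object MinMaxWatermarkDemo {
  val streams: Seq[StreamState] = Seq(
    StreamState("main", maxEventTimeMs = 10000L, delayMs = 4000L),
    StreamState("joined", maxEventTimeMs = 10000L, delayMs = 8000L)
  )

  // One watermark per stream: 6000 ms for "main", 2000 ms for "joined".
  val perStreamWatermarks: Seq[Long] = streams.map(_.watermarkMs)

  // The min policy follows the slowest stream, the max policy the fastest one.
  val minWatermark: Long = perStreamWatermarks.min
  val maxWatermark: Long = perStreamWatermarks.max

  def main(args: Array[String]): Unit = {
    println(s"min policy -> $minWatermark ms") // min policy -> 2000 ms
    println(s"max policy -> $maxWatermark ms") // max policy -> 6000 ms
  }
}
```

With the min policy the query keeps accepting data down to the 2 second mark, while the max policy moves the threshold up to 6 seconds.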

Watermark configuration implementation

Watermark management is implemented with the spark.sql.streaming.multipleWatermarkPolicy configuration entry. It accepts either 'min' or 'max'. Each value selects the corresponding MultipleWatermarkPolicy implementation: MinWatermark or MaxWatermark. Both expose a method that resolves the watermark to use. This method is nothing more than a call to Seq's min or max method, depending on the policy. The class responsible for calling the configured policy is WatermarkTracker.
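The structure described above can be sketched as follows. This is a simplified illustration and not Spark's source code: the `WatermarkPolicySketch`, `MinPolicySketch`, `MaxPolicySketch` and `fromConfig` names are assumptions made for the example.

```scala
// Simplified sketch of a min/max watermark policy; names are illustrative.
sealed trait WatermarkPolicySketch {
  // Resolves the single watermark to use from the per-operator watermarks.
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
}

case object MinPolicySketch extends WatermarkPolicySketch {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    require(operatorWatermarks.nonEmpty, "at least one watermark is required")
    operatorWatermarks.min
  }
}

case object MaxPolicySketch extends WatermarkPolicySketch {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    require(operatorWatermarks.nonEmpty, "at least one watermark is required")
    operatorWatermarks.max
  }
}

object WatermarkPolicySketch {
  // Maps the configuration value to a policy, rejecting anything else.
  def fromConfig(value: String): WatermarkPolicySketch = value.toLowerCase match {
    case "min" => MinPolicySketch
    case "max" => MaxPolicySketch
    case other =>
      throw new IllegalArgumentException(s"Unsupported policy '$other', expected 'min' or 'max'")
  }
}
```

A caller playing the role of the tracker would resolve the policy once, for instance with `WatermarkPolicySketch.fromConfig("max")`, and then call `chooseGlobalWatermark(Seq(2000L, 6000L))` on it, which returns 6000 in this sketch.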

Backward compatibility is guaranteed by the default value of the multipleWatermarkPolicy entry, which is 'min'. Unless you change this value explicitly, you should be able to run old streaming pipelines on the new Spark release without problems. This holds even for pipelines recovered from checkpoints created prior to Apache Spark 2.4.0. Speaking of checkpoints, one important thing to remember is that the watermark policy can't be modified after the pipeline is restored from a checkpoint. Any change will be ignored.
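The rules above can be modeled with a few lines of plain Scala. This is only a model of the described behavior, not Spark's recovery code: `PolicyResolutionSketch` and `effectivePolicy` are hypothetical names, and the checkpoint is reduced to an optional map of saved configuration entries.

```scala
object PolicyResolutionSketch {
  val PolicyKey = "spark.sql.streaming.multipleWatermarkPolicy"
  val DefaultPolicy = "min"

  // checkpointConf is None for a fresh query and Some(savedEntries)
  // when the query restarts from an existing checkpoint.
  def effectivePolicy(checkpointConf: Option[Map[String, String]],
                      sessionConf: Map[String, String]): String =
    checkpointConf match {
      // Restart: the saved value wins. A checkpoint without the entry
      // (modeling one written before 2.4.0) falls back to 'min'.
      case Some(saved) => saved.getOrElse(PolicyKey, DefaultPolicy)
      // Fresh query: the session value is used, 'min' when it is not set.
      case None => sessionConf.getOrElse(PolicyKey, DefaultPolicy)
    }
}
```

In this model, asking for 'max' in the session has an effect only when the query starts without a checkpoint. When a checkpoint exists, the session value is ignored, which matches the behavior described above.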

Max watermark example

To see this new feature in action, we'll execute 2 test cases. The first one uses the min watermark and hence accepts more late data:

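import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// StateManagementHelper, MainEvent, JoinedEvent and RowProcessor are test helpers not shown in this post

// Holds the first watermark observed after it moves, so that the tests can assert on it
object FirstWatermark {
  var FirstKnownValue = ""
}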
  def launchDataInjection(mainEventsStream: MemoryStream[MainEvent],
                          joinedEventsStream: MemoryStream[JoinedEvent], query: StreamingQuery): Unit = {
    new Thread(new Runnable() {
      override def run(): Unit = {
        val stateManagementHelper = new StateManagementHelper(mainEventsStream, joinedEventsStream)
        var key = 0
        val processingTimeFrom1970 = 10000L // 10 sec
        stateManagementHelper.waitForWatermarkToChange(query, processingTimeFrom1970)
        FirstWatermark.FirstKnownValue = query.lastProgress.eventTime.get("watermark")
        key = 2
        // Keys 2, 3, 4, 5, 6, ... are sent late to see the watermark applied
        var startingTime = stateManagementHelper.getCurrentWatermarkMillisInUtc(query) - 5000L
        while (query.isActive) {
          if (key % 2 == 0) {
            stateManagementHelper.sendPairedKeysWithSleep(s"key${key}", startingTime)
          } else {
            mainEventsStream.addData(MainEvent(s"key${key}", startingTime, new Timestamp(startingTime)))
          }
          startingTime += 1000L
          key += 1
        }
      }
    }).start()
  }

  "min watermark" should "be used when the min policy is configured" in {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("Spark Structured Streaming min").config("spark.sql.streaming.multipleWatermarkPolicy", "min")
      .master("local[3]").getOrCreate() // local[3] runs 3 worker threads, required for this test to run correctly; at least 4 cores should be available
    import sparkSession.implicits._

    val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
    val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)

    val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark")
      .withWatermark("mainEventTimeWatermark", "4 seconds")
    val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark")
      .withWatermark("joinedEventTimeWatermark", "8 seconds")

    val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
      mainEventsDataset("mainEventTimeWatermark") >= joinedEventsDataset("joinedEventTimeWatermark"),
      "leftOuter")

    val query = stream.writeStream.trigger(Trigger.ProcessingTime(3000L)).foreach(RowProcessor).start()

    while (!query.isActive) {}
    launchDataInjection(mainEventsStream, joinedEventsStream, query)
    query.awaitTermination(150000)

    // Do not assert on the watermark directly - it can change depending on the execution environment
    // We simply check the first known watermark value after the change
    FirstWatermark.FirstKnownValue shouldEqual "1970-01-01T00:00:02.000Z"
  }

Using the min watermark results in a less aggressive rejection strategy. Here only the messages older than 1970-01-01T00:00:02.000Z are ignored. This is not the case in the following test, which uses the max strategy:

  "max watermark" should "be used when the max policy is configured" in {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("Spark Structured Streaming max").config("spark.sql.streaming.multipleWatermarkPolicy", "max")
      .master("local[3]").getOrCreate() // local[3] runs 3 worker threads, required for this test to run correctly; at least 4 cores should be available
    import sparkSession.implicits._

    val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
    val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)

    // Max watermark = 00:00:06, produced by the stream with the 4-second delay
    // Min watermark = 00:00:02, produced by the stream with the 8-second delay
    val mainEventsDataset = mainEventsStream.toDS().select($"mainKey", $"mainEventTime", $"mainEventTimeWatermark")
      .withWatermark("mainEventTimeWatermark", "4 seconds")
    val joinedEventsDataset = joinedEventsStream.toDS().select($"joinedKey", $"joinedEventTime", $"joinedEventTimeWatermark")
      .withWatermark("joinedEventTimeWatermark", "8 seconds")

    val stream = mainEventsDataset.join(joinedEventsDataset, mainEventsDataset("mainKey") === joinedEventsDataset("joinedKey") &&
      mainEventsDataset("mainEventTimeWatermark") >= joinedEventsDataset("joinedEventTimeWatermark"),
      "leftOuter")

    val query = stream.writeStream.trigger(Trigger.ProcessingTime(3000L)).foreach(RowProcessor).start()

    while (!query.isActive) {}
    launchDataInjection(mainEventsStream, joinedEventsStream, query)
    query.awaitTermination(150000)

    FirstWatermark.FirstKnownValue shouldEqual "1970-01-01T00:00:06.000Z"
  }

As you can see here, any message older than 1970-01-01T00:00:06.000Z is discarded. This value corresponds to the first watermark computed during the processing, resolved with the max watermark policy.
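The effect of both thresholds can be compared on a handful of event times. The sketch below assumes a simplified rule where an event is kept when its event time is not older than the watermark. The sample event times, `LateEventsSketch`, `format` and `accepted` are made up for the illustration and do not come from the tests above.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object LateEventsSketch {
  // Renders epoch milliseconds in the same shape as the watermark strings asserted in the tests.
  private val formatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC)

  def format(epochMs: Long): String = formatter.format(Instant.ofEpochMilli(epochMs))

  // Simplified assumption: events older than the watermark are dropped.
  def accepted(eventTimesMs: Seq[Long], watermarkMs: Long): Seq[Long] =
    eventTimesMs.filter(_ >= watermarkMs)

  // Hypothetical event times, in milliseconds since the epoch.
  val eventTimesMs: Seq[Long] = Seq(1000L, 3000L, 5000L, 7000L)

  def main(args: Array[String]): Unit = {
    Seq(2000L, 6000L).foreach { watermarkMs =>
      val kept = accepted(eventTimesMs, watermarkMs).map(format).mkString(", ")
      println(s"watermark ${format(watermarkMs)} keeps: $kept")
    }
  }
}
```

With these sample values, the 2 second watermark keeps the events at 3, 5 and 7 seconds, while the 6 second watermark keeps only the event at 7 seconds.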

Watermark configuration adds more flexibility to streaming pipelines based on Apache Spark. Prior to the 2.4.0 release, the engine always chose the minimal watermark among the joined streams. Now it's up to the user to decide which watermark strategy applies.