What's new in Apache Spark 3.5.0 - Structured Streaming

Versions: Apache Spark 3.5.0

It's time to start the series covering Apache Spark 3.5.0 features. The first topic is Structured Streaming, which got a lot of RocksDB improvements and some major API changes.

dropDuplicatesWithinWatermark

That's the major API change I mentioned. Structured Streaming has a new method to deal with duplicates. The previous one, dropDuplicates, is a bit tricky because it cleans the state store only if the watermark column is a part of the deduplication expression. The new method, dropDuplicatesWithinWatermark, was added by Jungtaek Lim. It addresses that issue and should be a great addition for users who can't guarantee the same value for the watermarked column in the duplicated rows. For example, with the new method an event 1 can be detected as a duplicate even though its duplicate was created 10 seconds later.

Let's take an example. The snippet below adds 4 records to an in-memory stream, often used for test purposes, and calls the new method:

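import java.sql.Timestamp
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream

// Event type assumed from the console output (id and eventTime columns)
case class Event(id: Int, eventTime: Timestamp)

// sparkSession is an already created SparkSession
import sparkSession.implicits._
implicit val sqlContext: SQLContext = sparkSession.sqlContext
val memoryStream1 = MemoryStream[Event]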

val query = memoryStream1.toDS
  .withWatermark("eventTime", "20 seconds")
  .dropDuplicatesWithinWatermark("id")

val writeQuery = query.writeStream.format("console")
  .option("truncate", false).start()

memoryStream1.addData(
  Seq(
    Event(1, Timestamp.valueOf("2023-06-10 10:20:40")),
    Event(1, Timestamp.valueOf("2023-06-10 10:20:30")),
    Event(2, Timestamp.valueOf("2023-06-10 10:20:50")),
    Event(3, Timestamp.valueOf("2023-06-10 10:20:45")),
  )
)

After running the first micro-batch, the output contains:

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------------------+
|id |eventTime          |
+---+-------------------+
|1  |2023-06-10 10:20:40|
|3  |2023-06-10 10:20:45|
|2  |2023-06-10 10:20:50|
+---+-------------------+

The method treated the second event with id 1 as a duplicate because the new semantics rely on column-based deduplication only: the watermark column doesn't need to be a part of the deduplication columns for the state store to be cleaned up.

In the next micro-batch I'm adding yet another duplicate event id:

memoryStream1.addData(
  Seq(
    Event(1, Timestamp.valueOf("2023-06-10 10:22:40")),
    Event(1, Timestamp.valueOf("2023-06-10 10:20:10")),
    Event(4, Timestamp.valueOf("2023-06-10 10:21:50")),
    Event(5, Timestamp.valueOf("2023-06-10 10:21:45")),
  )
)

This time too, the new function correctly detected the event 1 as a duplicate:

-------------------------------------------
Batch: 2
-------------------------------------------
+---+-------------------+
|id |eventTime          |
+---+-------------------+
|5  |2023-06-10 10:21:45|
|4  |2023-06-10 10:21:50|
+---+-------------------+

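However, at that moment the watermark moved on to 2023-06-10T10:22:20.000Z, which is the 10:22:40 event time of the duplicated event 1 minus the 20 seconds of allowed delay. As you can see, the duplicates do contribute to the watermark advancement.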
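The watermark value can be re-derived by hand. The sketch below is plain Scala without any Spark dependency. It assumes the usual rule that the watermark equals the greatest event time seen so far minus the allowed delay; WatermarkSketch and watermarkAfter are hypothetical names used only for this illustration.

```scala
import java.time.{Duration, LocalDateTime}

object WatermarkSketch {
  // Simplified rule (an assumption of this sketch):
  // watermark = greatest event time seen - allowed delay.
  def watermarkAfter(eventTimes: Seq[LocalDateTime], delay: Duration): LocalDateTime =
    eventTimes.reduce((a, b) => if (a.isAfter(b)) a else b).minus(delay)

  // Event times of the second micro-batch, duplicates and late rows included.
  val secondBatch: Seq[LocalDateTime] = Seq(
    "2023-06-10T10:22:40", "2023-06-10T10:20:10",
    "2023-06-10T10:21:50", "2023-06-10T10:21:45"
  ).map(text => LocalDateTime.parse(text))

  def main(args: Array[String]): Unit = {
    // prints 2023-06-10T10:22:20
    println(watermarkAfter(secondBatch, Duration.ofSeconds(20)))
  }
}
```

The greatest event time comes from a row that was dropped as a duplicate, which is why the watermark still advances to 10:22:20.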

Let's finish this demo with the last records processed:

memoryStream1.addData(
  Seq(
    Event(1, Timestamp.valueOf("2023-06-10 10:24:40")),
  )
)

This time, the event id 1 is not detected as duplicate:

-------------------------------------------
Batch: 4
-------------------------------------------
+---+-------------------+
|id |eventTime          |
+---+-------------------+
|1  |2023-06-10 10:24:40|
+---+-------------------+

And that's by design! The watermark moved on, so the state store manager removed all expired entries. The micro-batch then got a record that fell outside the deduplication window bounded by the watermark duration.
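To make the expiration logic of the three micro-batches easier to follow, here is a simplified, Spark-free model of the behavior. It is only a sketch under assumptions: each id is kept in the state until its first event time plus the 20 seconds delay, expired ids are evicted at the beginning of the next batch (Spark may do it at a different moment), and rows older than the watermark are ignored. All the names (DedupWithinWatermarkSketch, State, processBatch, replay) are hypothetical.

```scala
import java.time.{Duration, LocalDateTime}

object DedupWithinWatermarkSketch {
  case class Event(id: Int, eventTime: LocalDateTime)
  // Hypothetical state: id -> expiration time, plus the current watermark.
  case class State(expirations: Map[Int, LocalDateTime], watermark: LocalDateTime)

  val delay: Duration = Duration.ofSeconds(20)
  val initial: State = State(Map.empty, LocalDateTime.MIN)

  // Processes one micro-batch; returns the emitted events and the next state.
  def processBatch(state: State, batch: Seq[Event]): (Seq[Event], State) = {
    // 1. Evict the ids whose expiration time is not after the watermark.
    val alive = state.expirations.filter { case (_, expiresAt) =>
      expiresAt.isAfter(state.watermark)
    }
    // 2. Emit the events that are neither late nor already present in the state.
    val (emitted, updated) = batch.foldLeft((Vector.empty[Event], alive)) {
      case ((out, seen), event) =>
        val late = event.eventTime.isBefore(state.watermark)
        if (late || seen.contains(event.id)) (out, seen)
        else (out :+ event, seen + (event.id -> event.eventTime.plus(delay)))
    }
    // 3. Advance the watermark from every input row, duplicates included.
    val nextWatermark = batch.map(_.eventTime.minus(delay))
      .foldLeft(state.watermark)((a, b) => if (b.isAfter(a)) b else a)
    (emitted, State(updated, nextWatermark))
  }

  // Replays the batches in order and returns the emitted ids per batch.
  def replay(batches: Seq[Seq[Event]]): Seq[Seq[Int]] = {
    val (outputs, _) = batches.foldLeft((Vector.empty[Seq[Int]], initial)) {
      case ((acc, state), batch) =>
        val (emitted, next) = processBatch(state, batch)
        (acc :+ emitted.map(_.id), next)
    }
    outputs
  }

  private def at(time: String): LocalDateTime = LocalDateTime.parse(s"2023-06-10T$time")

  // The three data batches of the demo.
  val batches: Seq[Seq[Event]] = Seq(
    Seq(Event(1, at("10:20:40")), Event(1, at("10:20:30")),
      Event(2, at("10:20:50")), Event(3, at("10:20:45"))),
    Seq(Event(1, at("10:22:40")), Event(1, at("10:20:10")),
      Event(4, at("10:21:50")), Event(5, at("10:21:45"))),
    Seq(Event(1, at("10:24:40")))
  )

  def main(args: Array[String]): Unit =
    replay(batches).foreach(ids => println(ids.mkString(", ")))
}
```

Running the model emits the ids 1, 2, 3 for the first batch, 4, 5 for the second and 1 for the third, which matches the console output up to the row order. The id 1 comes back in the last batch because its state entry expired at 10:21:00, well before the 10:22:20 watermark.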

The outcome of the last micro-batch would be the same with the previous dropDuplicates method. But the first micro-batches would generate a different outcome because the watermark column must then be present in the deduplication columns list (.dropDuplicates("eventTime", "id")), so the rows sharing the id but not the event time are not considered as duplicates:

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------------------+
|id |eventTime          |
+---+-------------------+
|2  |2023-06-10 10:20:50|
|3  |2023-06-10 10:20:45|
|1  |2023-06-10 10:20:30|
|1  |2023-06-10 10:20:40|
+---+-------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+---+-------------------+
|id |eventTime          |
+---+-------------------+
|4  |2023-06-10 10:21:50|
|5  |2023-06-10 10:21:45|
|1  |2023-06-10 10:22:40|
+---+-------------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+---+-------------------+
|id |eventTime          |
+---+-------------------+
|1  |2023-06-10 10:24:40|
+---+-------------------+
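The difference between both methods comes down to the deduplication key. The short sketch below, again in plain Scala and with hypothetical names (DedupKeySketch, distinctKeys), counts the distinct keys of the first micro-batch for each variant. It only illustrates the key choice and doesn't reproduce Spark's state management.

```scala
import java.time.LocalDateTime

object DedupKeySketch {
  case class Event(id: Int, eventTime: LocalDateTime)

  // Records of the first micro-batch of the demo.
  val firstBatch: Seq[Event] = Seq(
    Event(1, LocalDateTime.parse("2023-06-10T10:20:40")),
    Event(1, LocalDateTime.parse("2023-06-10T10:20:30")),
    Event(2, LocalDateTime.parse("2023-06-10T10:20:50")),
    Event(3, LocalDateTime.parse("2023-06-10T10:20:45"))
  )

  // Counts the distinct deduplication keys produced by a key extractor.
  def distinctKeys[K](events: Seq[Event])(key: Event => K): Int =
    events.map(key).distinct.size

  def main(args: Array[String]): Unit = {
    // Key = id only, as in dropDuplicatesWithinWatermark("id"); prints 3
    println(distinctKeys(firstBatch)(_.id))
    // Key = (id, eventTime), as in dropDuplicates("eventTime", "id"); prints 4
    println(distinctKeys(firstBatch)(event => (event.id, event.eventTime)))
  }
}
```

Three keys against four is exactly the difference between the two Batch 0 outputs: with the event time in the key, both rows of the id 1 are kept.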

RocksDB state store

Besides these 2 user-facing features, there are some huge improvements for the RocksDB state store, such as:

onQueryIdle

Finally, Jungtaek Lim extended the StreamingQueryListener with a new method. The onQueryIdle callback is invoked when a query is idle, that is, waiting for new data to process. The example below registers a listener that prints each idle event:

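import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{StreamingQueryListener, Trigger}

// sparkSession is an already created SparkSession
sparkSession.streams.addListener(new StreamingQueryListener() {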
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {}

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}

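  // New in Apache Spark 3.5.0; called when the query has no new data to process
  override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit = {
    println("Query is idle:")
    println(event.json)
  }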
})

import sparkSession.implicits._
implicit val sparkContext: SQLContext = sparkSession.sqlContext
val memoryStream1 = MemoryStream[Int]

val query = memoryStream1.toDS

val writeQuery = query.writeStream
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .format("console")
  .option("truncate", false).start()

memoryStream1.addData(Seq(1, 2, 3))
writeQuery.awaitTermination()

When you let this code run for a few seconds, you should see the prints showing up:

Query is idle:
{"id":"a6d930ae-cefa-4d9b-b0f6-67359be6c912","runId":"da81f2e0-8fbc-4fa9-8e08-b8b1d3f0370a","timestamp":"2023-10-02T04:34:16.000Z"}
Query is idle:
{"id":"a6d930ae-cefa-4d9b-b0f6-67359be6c912","runId":"da81f2e0-8fbc-4fa9-8e08-b8b1d3f0370a","timestamp":"2023-10-02T04:34:26.001Z"}
Query is idle:
{"id":"a6d930ae-cefa-4d9b-b0f6-67359be6c912","runId":"da81f2e0-8fbc-4fa9-8e08-b8b1d3f0370a","timestamp":"2023-10-02T04:34:36.000Z"}
Query is idle:
{"id":"a6d930ae-cefa-4d9b-b0f6-67359be6c912","runId":"da81f2e0-8fbc-4fa9-8e08-b8b1d3f0370a","timestamp":"2023-10-02T04:34:46.001Z"}
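The timestamps of the printed events give a hint about the cadence of the callback. The sketch below, in plain Scala with hypothetical names (IdleGapSketch, gapsMillis), only computes the gaps between the four consecutive timestamps copied from the output. It doesn't establish which setting drives this cadence.

```scala
import java.time.{Duration, Instant}

object IdleGapSketch {
  // Timestamps copied from the idle events printed by the listener.
  val idleTimestamps: Seq[Instant] = Seq(
    "2023-10-02T04:34:16.000Z", "2023-10-02T04:34:26.001Z",
    "2023-10-02T04:34:36.000Z", "2023-10-02T04:34:46.001Z"
  ).map(text => Instant.parse(text))

  // Gaps, in milliseconds, between each pair of consecutive timestamps.
  def gapsMillis(timestamps: Seq[Instant]): Seq[Long] =
    timestamps.zip(timestamps.drop(1)).map { case (previous, next) =>
      Duration.between(previous, next).toMillis
    }

  def main(args: Array[String]): Unit = {
    // prints 10001, 9999, 10001
    println(gapsMillis(idleTimestamps).mkString(", "))
  }
}
```

The events are roughly 10 seconds apart, which lines up with the 10 seconds processing time trigger configured in the example.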

In this blog post I omitted one feature on purpose: the watermark propagation, which will be the topic of the next article!