It's time to start the series covering Apache Spark 3.5.0 features. As the first topic, I'm going to cover Structured Streaming, which got a lot of RocksDB improvements and a few major API changes.
dropDuplicatesWithinWatermark
That's the major API change I mentioned. Structured Streaming has a new method to deal with duplicates. The previous one is a bit tricky because it cleans the state store up only if the watermark column is part of the deduplication expression. The new method, called dropDuplicatesWithinWatermark, was added by Jungtaek Lim. It addresses that issue and should be a great addition for users who can't guarantee the same value for the watermarked column in duplicated rows. For example, with this method an event 1 can be perfectly detected as a duplicate even though the duplicate is created 10 seconds later.
Let's take an example. The snippet below writes 4 records to an in-memory stream, often used for testing purposes, and calls the new method:
import java.sql.Timestamp
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream

case class Event(id: Int, eventTime: Timestamp)

// requires an implicit SQLContext and the encoders in scope:
// import sparkSession.implicits._
// implicit val sqlContext: SQLContext = sparkSession.sqlContext
val memoryStream1 = MemoryStream[Event]
val query = memoryStream1.toDS
  .withWatermark("eventTime", "20 seconds")
  .dropDuplicatesWithinWatermark("id")
val writeQuery = query.writeStream.format("console")
  .option("truncate", false).start()

memoryStream1.addData(Seq(
  Event(1, Timestamp.valueOf("2023-06-10 10:20:40")),
  Event(1, Timestamp.valueOf("2023-06-10 10:20:30")),
  Event(2, Timestamp.valueOf("2023-06-10 10:20:50")),
  Event(3, Timestamp.valueOf("2023-06-10 10:20:45"))
))
After running the first micro-batch, the output contains:
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------------------+
|id |eventTime          |
+---+-------------------+
|1  |2023-06-10 10:20:40|
|3  |2023-06-10 10:20:45|
|2  |2023-06-10 10:20:50|
+---+-------------------+
It detected the event id 1 as a duplicate because the new semantics rely on column-based deduplication, without requiring the watermark column to be part of the deduplication columns used to clean the state store up.
In the next micro-batch I'm adding yet another duplicate for the event id 1:
memoryStream1.addData(Seq(
  Event(1, Timestamp.valueOf("2023-06-10 10:22:40")),
  Event(1, Timestamp.valueOf("2023-06-10 10:20:10")),
  Event(4, Timestamp.valueOf("2023-06-10 10:21:50")),
  Event(5, Timestamp.valueOf("2023-06-10 10:21:45"))
))
This time too, the new method correctly detected the event 1 as a duplicate:
-------------------------------------------
Batch: 2
-------------------------------------------
+---+-------------------+
|id |eventTime          |
+---+-------------------+
|5  |2023-06-10 10:21:45|
|4  |2023-06-10 10:21:50|
+---+-------------------+
However, at that moment the watermark moved on to 2023-06-10T10:22:20.000Z. As you can see, the duplicated rows still contribute to the watermark computation.
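To see where this value comes from, the watermark can be sketched as the maximum observed event time minus the configured delay. Below is a minimal pure-Scala model of that arithmetic (my own simplification, not Spark's internal code, which also tracks watermarks per partition and only advances them between micro-batches):

```scala
import java.sql.Timestamp

// Simplified model: watermark = max(event time seen in the batch) - delay.
def watermark(eventTimes: Seq[Timestamp], delayMs: Long): Timestamp =
  new Timestamp(eventTimes.map(_.getTime).max - delayMs)

val batch2 = Seq(
  Timestamp.valueOf("2023-06-10 10:22:40"), // duplicate of id 1, still counted
  Timestamp.valueOf("2023-06-10 10:20:10"),
  Timestamp.valueOf("2023-06-10 10:21:50"),
  Timestamp.valueOf("2023-06-10 10:21:45")
)
// 10:22:40 minus the 20 seconds delay gives 10:22:20
println(watermark(batch2, 20 * 1000L))
```

The dropped duplicate with the 10:22:40 timestamp is the row that pushes the watermark forward here.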
Let's finish this demo with the last records processed:
memoryStream1.addData(Seq(
  Event(1, Timestamp.valueOf("2023-06-10 10:24:40"))
))
This time, the event id 1 is not detected as a duplicate:
-------------------------------------------
Batch: 4
-------------------------------------------
+---+-------------------+
|id |eventTime          |
+---+-------------------+
|1  |2023-06-10 10:24:40|
+---+-------------------+
And that's by design! The watermark moved on, hence the state store manager cleaned all expired entries up. The micro-batch then got a record that was outside the deduplication window limited by the watermark duration.
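Conceptually, the state behind dropDuplicatesWithinWatermark can be pictured as a map from the deduplication key to an expiration timestamp (event time plus the watermark delay): a key seen for the first time is emitted and stored, a key already present is dropped, and the entry is evicted once the watermark passes its expiration. The sketch below models that in plain Scala 2.13 (the class and method names are mine, for illustration only, not Spark's internals):

```scala
import scala.collection.mutable

// Toy model of the dedup-within-watermark state store:
// key -> expiration time (event time + delay). Not Spark's actual code.
class DedupState(delayMs: Long) {
  private val expirations = mutable.Map.empty[Int, Long]

  // Returns true if the event should be emitted (first occurrence of the key).
  def shouldEmit(id: Int, eventTimeMs: Long): Boolean =
    if (expirations.contains(id)) false
    else { expirations(id) = eventTimeMs + delayMs; true }

  // Called when the watermark advances: evict the expired entries.
  def evict(watermarkMs: Long): Unit =
    expirations.filterInPlace((_, expiresAt) => expiresAt > watermarkMs)
}

val state = new DedupState(20 * 1000L)
val t0 = java.sql.Timestamp.valueOf("2023-06-10 10:20:40").getTime
println(state.shouldEmit(1, t0))          // true: first occurrence, emitted
println(state.shouldEmit(1, t0 - 10000))  // false: duplicate within the window
state.evict(t0 + 30000)                   // watermark passed the expiration
println(state.shouldEmit(1, t0 + 40000))  // true: id 1 accepted again
```

This mirrors the three micro-batches of the demo: the duplicate is dropped while the key is in the state, and the same id goes through again once the watermark evicted it.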
The outcome of this last step would be the same with the previous dropDuplicates method. However, the first micro-batches would generate a different outcome because of the watermark column present in the deduplication columns list (.dropDuplicates("eventTime", "id")):
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------------------+
|id |eventTime          |
+---+-------------------+
|2  |2023-06-10 10:20:50|
|3  |2023-06-10 10:20:45|
|1  |2023-06-10 10:20:30|
|1  |2023-06-10 10:20:40|
+---+-------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+---+-------------------+
|id |eventTime          |
+---+-------------------+
|4  |2023-06-10 10:21:50|
|5  |2023-06-10 10:21:45|
|1  |2023-06-10 10:22:40|
+---+-------------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+---+-------------------+
|id |eventTime          |
+---+-------------------+
|1  |2023-06-10 10:24:40|
+---+-------------------+
RocksDB state store
Besides the user-facing API changes, there are some huge improvements for the RocksDB state store, such as:
- Changelog checkpointing. Chaoqin Li solved an issue detected in the previous release related to the checkpoint performance. Previously, the RocksDB backend was pausing the instances to upload the snapshot to the checkpoint location. Now the logic consists of synchronizing the changelog instead of the snapshot. The snapshot is still uploaded, but only as a background task that doesn't impact the micro-batch commit.
- Memory management enhancements. Anish Shrigondekar improved the memory management by using native RocksDB operations instead of the writeBatch, and by managing a global off-heap memory limit on the node to avoid OOM errors.
- Support for tracking pinned blocks memory usage. The memory enhancements were not the only contribution from Anish to 3.5.0. He added a new metric (rocksdbPinnedBlocksMemoryUsage) to track the memory usage of pinned blocks, e.g. in the cache.
- Support for max_write_buffer_number and write_buffer_size. Anish is also the author of these 2 new settings, which help fine-tune the memory usage of RocksDB.
- Orphan SST and log files cleanup in the checkpoint directory. Additionally, Chaoqin improved the cleaning of orphan SST and log files in the checkpoint directory. Whenever a RocksDB state store version gets uploaded to the checkpoint location and overwrites the previous zip archive, the checkpoint process now removes the orphans automatically.
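As a quick orientation, most of these behaviors are driven by SQL configurations of the RocksDB state store provider. The sketch below shows how they could be enabled; the option names are written as I recall them from the documentation, so double-check them against the Spark 3.5.0 configuration reference before relying on this snippet:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: verify the option names against the official Spark 3.5 docs.
val sparkSession = SparkSession.builder()
  .master("local[*]")
  // use the RocksDB-backed state store
  .config("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  // commit the changelog instead of the snapshot; snapshots become a background task
  .config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
  // cap the global off-heap memory used by all RocksDB instances on a node
  .config("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", "true")
  .config("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", "512")
  // fine-tune the write buffers (write_buffer_size / max_write_buffer_number)
  .config("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "64")
  .config("spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber", "4")
  .getOrCreate()
```

The memory-related values here (512 MB, 64 MB, 4 buffers) are arbitrary examples, not recommendations.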
onQueryIdle
Finally, Jungtaek Lim extended the StreamingQueryListener with a new method. The onQueryIdle callback applies to an idle query that is waiting for new data to process. The example below shows it in action:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryIdleEvent
import org.apache.spark.sql.streaming.{StreamingQueryListener, Trigger}

sparkSession.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {}
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
  override def onQueryIdle(event: QueryIdleEvent): Unit = {
    println("Query is idle:")
    println(event.json)
  }
})

import sparkSession.implicits._
implicit val sparkContext: SQLContext = sparkSession.sqlContext
val memoryStream1 = MemoryStream[Int]
val query = memoryStream1.toDS
val writeQuery = query.writeStream
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .format("console")
  .option("truncate", false).start()

memoryStream1.addData(Seq(1, 2, 3))

writeQuery.awaitTermination()
When you let this code run for a few seconds, you should see the prints showing up:
Query is idle:
{"id":"a6d930ae-cefa-4d9b-b0f6-67359be6c912","runId":"da81f2e0-8fbc-4fa9-8e08-b8b1d3f0370a","timestamp":"2023-10-02T04:34:16.000Z"}
Query is idle:
{"id":"a6d930ae-cefa-4d9b-b0f6-67359be6c912","runId":"da81f2e0-8fbc-4fa9-8e08-b8b1d3f0370a","timestamp":"2023-10-02T04:34:26.001Z"}
Query is idle:
{"id":"a6d930ae-cefa-4d9b-b0f6-67359be6c912","runId":"da81f2e0-8fbc-4fa9-8e08-b8b1d3f0370a","timestamp":"2023-10-02T04:34:36.000Z"}
Query is idle:
{"id":"a6d930ae-cefa-4d9b-b0f6-67359be6c912","runId":"da81f2e0-8fbc-4fa9-8e08-b8b1d3f0370a","timestamp":"2023-10-02T04:34:46.001Z"}
I omitted one feature from this blog post on purpose: the watermark propagation, which will be the topic of the next article!