Data+AI Summit follow-up post: Why RocksDB rocks?

One reason to consider a custom state store is performance, or rather the unpredictable execution times caused by the default state store implementation sharing JVM heap memory with Apache Spark task execution. To overcome that, you can switch the state store implementation to an off-heap-based one, like RocksDB.
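For reference, the switch boils down to a single Spark SQL property. The provider class shown below is the built-in one that ships with Apache Spark 3.2+; on earlier versions you would point the same property at a third-party implementation or rely on the Databricks runtime:

```
# spark-defaults.conf (or spark.conf.set in the session)
spark.sql.streaming.stateStore.providerClass  org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
```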

RocksDB is not the only embedded database used by modern data systems. Among its alternatives, you will find Badger (the data store behind the Dgraph database, written in Go) or FASTER (a persistent, recoverable log and key-value store from Microsoft Research). Maybe sometime in the future, they will also be available to Apache Spark users. For now, RocksDB seems to be the most interesting alternative to the default state store, thanks to its 2 Open Source implementations and native support on the Databricks platform. That's the reason why I will focus on it here.

RocksDB 101

According to the "Optimize performance of stateful streaming queries" section of the Databricks documentation, RocksDB significantly reduces JVM memory pressure. According to this study, the default HashMap-based state store can keep up to 2 million keys per executor (per JVM), whereas the RocksDB-based store works efficiently even with 100 million states! Moreover, since the state lives off-heap, it doesn't add to the GC workload. But please don't get me wrong, it's not a silver bullet, and it has some drawbacks listed in the next section. Before going there, though, let's focus on RocksDB and understand how it works.

Below you can find a list of a few key terms characterizing RocksDB:

- memtable - the in-memory write buffer; every write lands here first
- SSTable (Sorted String Table) - an immutable, sorted file on disk, created when a memtable is flushed
- WAL (write-ahead log) - a sequential log of writes used to recover the memtable content after a crash
- compaction - a background process merging SSTables and dropping overwritten or deleted entries
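To make the write path concrete, here is a toy, greatly simplified sketch of the LSM-tree idea behind RocksDB: writes accumulate in an in-memory memtable, and when it fills up, it is flushed to an immutable sorted file in the spirit of an SSTable. This is an illustration only, not the real engine (no WAL, no compaction):

```python
import json, os, tempfile

class ToyLSM:
    """Toy LSM-tree sketch: in-memory memtable + flushed sorted files."""

    def __init__(self, memtable_limit=3):
        self.memtable = {}   # recent writes, kept in memory
        self.sstables = []   # immutable, sorted files on disk
        self.limit = memtable_limit
        self.dir = tempfile.mkdtemp()

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.limit:
            self._flush()

    def _flush(self):
        # Dump the memtable as a sorted, immutable "SSTable" file
        path = os.path.join(self.dir, f"sst_{len(self.sstables)}.json")
        with open(path, "w") as f:
            json.dump(sorted(self.memtable.items()), f)
        self.sstables.append(path)
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:              # newest data wins
            return self.memtable[key]
        for path in reversed(self.sstables):  # then newest SSTable first
            with open(path) as f:
                for k, v in json.load(f):
                    if k == key:
                        return v
        return None

db = ToyLSM()
db.put("a", 1); db.put("b", 2); db.put("c", 3)  # third put triggers a flush
db.put("a", 10)                                 # newer value shadows the flushed one
print(db.get("a"), db.get("b"))                 # 10 2
```

The real engine layers many refinements on top of this (bloom filters, leveled compaction, block cache), but the memtable/SSTable split is the core idea that lets it handle far more state than fits comfortably on a JVM heap.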

Pros and cons

As you could already learn, RocksDB helps reduce the memory pressure on the JVM executing the executor's tasks. Their execution shouldn't be disturbed by GC pauses triggered by the task and state store objects, so the execution time should be more predictable. An alternative to embedded databases could be remote fast-access key-value data stores. However, bringing the data back to the worker nodes can be costly and poorly suited to low-latency streaming scenarios.

This on-heap vs. off-heap isolation is then a good way to accelerate the processing. However, it doesn't come for free. Off-heap memory can't hold Java objects directly, so we have to serialize/deserialize them to/from byte arrays, and this operation can also be costly. In the "In support of workload-aware streaming state management" paper quoted below this article, you will find that data processing frameworks can spend up to 1/5 of their time on this serialization/deserialization work.
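A minimal illustration of where that cost comes from: since the store only handles raw bytes, every state update pays a serialize step on write and a deserialize step on read. Python's pickle stands in here purely for illustration; Spark uses its own encoders:

```python
import pickle

# A hypothetical state object kept by a streaming aggregation
state = {"user_id": 42, "clicks": [1, 5, 9]}

# Write path: the off-heap/embedded store only accepts bytes
blob = pickle.dumps(state)      # Python object -> bytes

# Read path: every lookup pays the reverse conversion
restored = pickle.loads(blob)   # bytes -> Python object

print(len(blob), "bytes stored for the state object")
```

With an on-heap HashMap, both steps simply disappear, which is exactly the trade-off: less GC pressure in exchange for CPU spent on (de)serialization.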

As you can see, RocksDB, and more generally any embedded off-heap database, can be a great way to reduce the memory pressure of stateful applications. And if you encounter performance issues that may be related to the on-heap state store, you can try to overcome them with Databricks' RocksDB or one of the 2 Open Source implementations ([1], [2]) 💪