One reason to consider a custom state store is performance, or rather the unpredictable execution time caused by the default state store implementation sharing JVM heap memory with Apache Spark task execution. To overcome that, you can switch the state store implementation to an off-heap-based one, like RocksDB.
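For example, assuming Apache Spark 3.2 or later, where a RocksDB-based provider ships with Spark itself, the switch is a single configuration entry (with the Open Source implementations mentioned below, only the provider class name would change):

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("rocksdb-state-store-demo")
    # Replace the default HashMap-based state store with the RocksDB-based
    # provider; any class implementing StateStoreProvider can be plugged here.
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state."
            "RocksDBStateStoreProvider")
    .getOrCreate())
```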
RocksDB is not the only embedded database used by modern data systems. Among its alternatives you will find Badger (the data store used by the Dgraph database, written in Go) or FASTER (a persistent recoverable log and key-value store from Microsoft Research). Maybe sometime in the future they will also be available to Apache Spark users. For now, RocksDB seems to be the most interesting alternative to the default state store, thanks to its two Open Source implementations and native support on the Databricks platform. That's why I will focus on it here.
According to the Optimize performance of stateful streaming queries section of the Databricks documentation, RocksDB reduces the JVM memory pressure significantly. According to this study, the default HashMap-based state store can keep up to 2 million keys per executor (per JVM), whereas the RocksDB-based store works efficiently even with 100 million states! Moreover, it doesn't impact the GC process in any way. But please don't get me wrong, it's not a silver bullet and it has some drawbacks, listed in the next section. However, before going there, let's focus on RocksDB and understand how it works.
Below you can find a list of a few key terms characterizing RocksDB:
- MemTable - every record written to RocksDB passes first through the MemTable, and reads check it first too. It's the in-memory structure storing the most recent writes, which can significantly speed up reads of the most recently written entries.
- SSTable - but the MemTable is not the final structure. After all, it lives in memory and its data can be lost in case of failure. That's why, when too much data is buffered, the pending records are flushed to an on-disk structure called an SSTable (Sorted String Table). Data in an SSTable is stored in a key-value fashion, with the keys sorted so that reads can find the corresponding key-value pair with an efficient binary search.
- Write Ahead Log - in addition to the MemTable, input records are written to another place before they're synchronized to SSTable files. The WAL is a file, so it's what RocksDB uses after a failure to restore the pending records lost from the in-memory MemTable. The WAL doesn't keep the records indefinitely, though. Once the maximum WAL file size is reached, the corresponding records are flushed to an SSTable file and a new WAL file is created to accept the new writes.
- LSM tree - the Log-Structured Merge tree is the structure organizing the MemTable and the SSTables; it's used in the compaction process that merges SSTables in the background.
- Bloom filters - to optimize the query time, RocksDB uses Bloom filters, i.e. probabilistic data structures that can tell whether a searched key is definitely absent from, or maybe present in, a given SSTable file.
- transaction support - Structured Streaming's state store is transactional, i.e. it exposes commit/abort methods to confirm or cancel all changes performed in a given state store version. RocksDB is transactional too, with commit and rollback support.
- checkpoints - RocksDB also has an operational feature to take checkpoints, i.e. to snapshot the whole database at a time t and write it into a separate directory. It's the fault-tolerance mechanism that you can also find in Apache Spark and other data systems.
- SSD-optimized - RocksDB was also optimized for SSD-based workloads. According to the first benchmarks made by Facebook, RocksDB performed 10 times better than LevelDB on a random write workload.
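The interplay of the first three terms can be sketched in a few lines of Python. This is a toy model of the mechanism, not of RocksDB's actual implementation: the WAL is a plain list instead of a file, and the flush threshold is a simple record count rather than a byte size.

```python
import bisect

class MiniLSM:
    """Toy sketch of the RocksDB write path: WAL -> MemTable -> SSTable."""

    def __init__(self, memtable_limit=3):
        self.memtable = {}             # most recent writes, checked first on reads
        self.wal = []                  # would be an append-only file in RocksDB
        self.sstables = []             # each entry: (sorted keys, {key: value})
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.wal.append((key, value))  # logged before the write is acknowledged
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def flush(self):
        # pending records leave memory for a sorted, immutable SSTable
        keys = sorted(self.memtable)
        self.sstables.append((keys, dict(self.memtable)))
        self.memtable.clear()
        self.wal.clear()               # flushed records no longer need the WAL

    def get(self, key):
        if key in self.memtable:       # recent entries are the cheapest to read
            return self.memtable[key]
        for keys, data in reversed(self.sstables):  # newest SSTable first
            i = bisect.bisect_left(keys, key)       # binary search on sorted keys
            if i < len(keys) and keys[i] == key:
                return data[key]
        return None
```

After a crash, a real engine would replay the WAL to rebuild the lost MemTable; here the list stands in for that file.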
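The Bloom filter idea can be sketched just as quickly. The value of the structure is that a negative answer is definite, so the store can skip an entire SSTable file without reading it; a positive answer only means "maybe present". This is a simplified sketch using Python's built-in hash, not RocksDB's actual filter code:

```python
class BloomFilter:
    """Probabilistic membership test: 'definitely absent' or 'maybe present'."""

    def __init__(self, size=1024, hashes=3):
        self.size, self.hashes = size, hashes
        self.bits = [False] * size

    def _positions(self, key):
        # derive several bit positions from the key with seeded hashes
        for seed in range(self.hashes):
            yield hash((seed, key)) % self.size

    def add(self, key):
        for p in self._positions(key):
            self.bits[p] = True

    def may_contain(self, key):
        # False => the key is certainly not in the file: skip it entirely
        return all(self.bits[p] for p in self._positions(key))
```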
Pros and cons
As you've already learned, RocksDB helps reduce the memory pressure on the JVM running the executor's tasks. Their execution shouldn't be disturbed by the GC cycles applied to the task and state store objects, so the execution time should be more predictable. An alternative to embedded databases could be remote fast-access key-value data stores. However, bringing the data back to the worker nodes can be costly and ill-adapted to low-latency streaming scenarios.
This on-heap vs. off-heap isolation is then a good way to accelerate the processing. However, it doesn't come without cost. The off-heap memory won't deal with Java objects, so we have to serialize/deserialize them to/from byte arrays, and this operation can also be costly. In the "In support of workload-aware streaming state management" paper quoted below this article, you will find that data processing frameworks can spend up to a fifth of their time on this serialization/deserialization task.
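To make that cost concrete, here is a hypothetical Python sketch of the extra step an off-heap store imposes: every state object must be encoded to bytes on write and decoded back on read, on top of the actual storage work. The state record shown is an invented example:

```python
import pickle

# hypothetical per-key state of a stateful streaming aggregation
state = {"user_id": 42, "events_count": 17, "last_event_ms": 1_620_000_000_000}

# write path to an off-heap store: the object becomes a byte array
state_bytes = pickle.dumps(state)

# read path: the byte array has to be rebuilt into an object before use
restored = pickle.loads(state_bytes)

assert restored == state  # lossless, but this round trip is paid on every access
```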
As you can see, RocksDB, and more generally an embedded off-heap database, can be a great way to reduce the memory pressure of stateful applications. And if you encounter performance issues that may be related to the on-heap state store, you can try to overcome them with Databricks' RocksDB or one of the two Open Source implementations 💪