What's new in Apache Spark 3.2.0 - RocksDB state store

Versions: Apache Spark 3.2.0

It's big news for Apache Spark Structured Streaming users. RocksDB is now available as a Vanilla Spark-backed state store backend!

New ebook 🔥

Learn 84 ways to solve common data engineering problems with cloud services.

👉 I want my Early Access edition

Initialization

To use the RocksDB state store, you have to explicitly define a provider class in the Apache Spark configuration: .config("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"). The StateStore companion object's createAndInit method will later initialize the provider instance and call the RocksDBStateStoreProvider's init method, where the provider :

And that's all for the init step. But much more things happen later when Apache Spark uses the state store in a stateful operation.

CRUD operations

RocksDB state store shares the same read-write API as the HDFS-backed version. In the beginning, an appropriate store version gets loaded from the load(version: Long) method. The version corresponds to the micro-batch number, and the function starts by comparing the version to load with the current version of the application. If they're different, it means that the execution context probably concerns the reprocessing, and there is no local information. Hence, the state store will recover the database from the checkpoint location with the help of RocksDBFileManager. Because I'll cover this recovery aspect in the next section, let's move directly to the read/write actions.

Naturally, Apache Spark relies on RocksDB API to perform them. In the state store definition you will find the RocksDB classes like:

Compared to the default state store, the RocksDB implementation looks similar for the CRUD operations. Except for one difference. To recall, the remove operation of the default state store materializes that fact in the checkpointed delta file. It's not the case of RocksDB, which simply removes the state from the database. The difference is also related to the checkpointed elements presented just below.

Storage

In the following schema you can see the files managed by RocksDB state store:

There are 2 "spaces". The leftmost part stores all files used by the live instance of the RocksDB database. The working directory is where the data files of the instance, called SSTables and logs, live. The checkpoint space is the directory created by Apache Spark while committing the micro-batch version (see below). It stores all files that are going to be checkpointed. Finally, the file manager space is the directory reserved to the RocksDBFileManager activity. The manager uses it mostly in the clean-up stage to store the uncompressed content of the checkpointed files.

Regarding the checkpoint location, you'll find there the RocksDB files (logs, SSTs, archived logs directory), some metadata with the state store schema, and zip files storing other files necessary for the restore process, such as RocksDB options or the mapping between the checkpointed files and their local names.

Fault tolerance

The commit function of the state store API runs after processing all input rows for the shuffle partition to materialize the state store for the current micro-batch version. In RocksDB this operation involves the following steps:

When it comes to the state store recovery in the load method, RocksDB state store delegates this action to the RocksDBFileManager. During the restore, the manager downloads the zipped file and unzips it to the local RocksDB working directory. It later iterates the list of the sstFiles from the metadata and brings them back from the checkpoint location to the RocksDB directory. It's worth noticing that all the copied files are locally stored with the correct names, i.e. the UUID-like name is replaced with the localFileName attribute.

Once this copy operation completes, Apache Spark has all necessary files to restart the previous RocksDB instance. It does that by referencing the working directory in the initialization call: NativeRocksDB.open(dbOptions, workingDir.toString).

RocksDB is an interesting alternative to the default on-heap-based state store implementation, optimized for large state usage. However, understanding it was a bit more challenging since it requires discovering new technology. Thankfully, as an end-user, you may not need to know all these details, but if you do, I hope the blog post can help to take a few shortcuts!