Data+AI follow-up: Introduction to MapDB

Versions: MapDB 3.0.8

Since there are already 2 Open Source implementations of a RocksDB state store, I decided to use another backend to illustrate how to customize the state store in Structured Streaming. Initially, I wanted to try Badger, which is the store behind the DGraph database, but I didn't find any Java-facing interface, and dealing with the Java Native Interface or any other wrapper was not an option. Fortunately, I ended up finding MapDB, an embedded database written in Kotlin and hence exposing a Java-facing interface.

In this blog post you will learn the basics of MapDB that I used in my demo implementation of a custom state store. I won't repeat the documentation; instead, I will focus only on the elements you can find in my custom MapDB state store.

DBMaker

The entry point to MapDB is a builder class called DBMaker. It's mainly composed of static factory methods, each of them responsible for one configuration part of the underlying database. For example, the following snippet creates a hash set backed by a temporary file:

  it should "create a database backed by a temporary file" in {
    val database = DBMaker.tempFileDB().make()
    val lettersSet = database.hashSet("letters", Serializer.STRING).create()

    lettersSet.add("a")
    lettersSet.add("b")
    lettersSet.add("c")
    lettersSet.add("a")

    lettersSet.size() shouldEqual 3
    lettersSet.iterator().asScala.toSeq should contain allOf("a", "b", "c")
  }

In the state store implementation you can find 2 factory methods, fileMmapEnableIfSupported() and fileDB(). The former enables memory-mapped files if the JVM is 64-bit. What are memory-mapped files exactly? With them, a region of user-space memory is mapped to file system pages, i.e. all operations performed on that memory are reflected in the file. Usually, this approach is much faster than accessing the file through the file system's API.
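To make the idea more concrete, here is a minimal, MapDB-free sketch using plain java.nio (the class name and payload are made up for illustration): the bytes are written only to an in-memory buffer, yet they end up in the file.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Plain java.nio (not MapDB) illustration of memory-mapped files: writes made to the
// in-memory buffer are reflected in the file without any explicit write() call.
public class MmapDemo {
    static String writeAndReadBack() throws IOException {
        Path file = Files.createTempFile("mmap-demo", ".bin");
        String payload = "hello mmap";
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer buffer =
                channel.map(FileChannel.MapMode.READ_WRITE, 0, payload.length());
            buffer.put(payload.getBytes(StandardCharsets.US_ASCII));
            buffer.force(); // flush the mapped pages to the storage device
        }
        // Read through the regular file API to prove the mapped writes hit the file
        return new String(Files.readAllBytes(file), StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(writeAndReadBack());
    }
}
```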

File-backed database (fileDB())

The second factory method, fileDB(), creates the embedded database backed by a local file. It's a very convenient way to materialize the changes made to the database's collections and to restore them later:

  it should "create file-backed map and restore it after closing" in {
    val testDir = "/tmp/mapdb-test"
    FileUtils.deleteDirectory(new File(testDir))
    new File(testDir).mkdirs()
    val databaseFile = s"${testDir}/file1.db"
    val fileBasedDatabase = DBMaker.fileDB(databaseFile).make()
    val lettersSet = fileBasedDatabase.hashSet("letters", Serializer.STRING).create()

    Seq("a", "b", "c", "a").foreach(letter => lettersSet.add(letter))
    fileBasedDatabase.close()
    val restoredDatabase = DBMaker.fileDB(databaseFile).make()
    val restoredLettersSet = restoredDatabase.hashSet("letters", Serializer.STRING).open()
    Seq("d", "e", "f", "e").foreach(letter => restoredLettersSet.add(letter))

    restoredLettersSet.size() shouldEqual 6
    restoredLettersSet.iterator().asScala.toSeq should contain allOf("a", "b", "c", "d", "e", "f")
    val fileNames = FileUtils.listFiles(new File(testDir), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.map(file => {
      file.getName
    }).toSeq
    fileNames should have size 1
    fileNames(0) shouldEqual "file1.db"
  }

You can certainly already deduce why I'm mentioning it. Yes, it'll be the strategy used to create deltas and snapshots and send them to the checkpoint location, but that will be the topic of one of the next blog posts. Let's stay here for a while. As you saw, DBMaker only accepts the name of the file. There is no parameter defining the maximal size of this file or a splitting strategy. So what happens if your application puts dozens of millions of entries into the file? Every file is created with an initial size that is later increased if needed:

    public RandomAccessFileVol(File file, boolean readOnly, long fileLockWait, long initSize) {
        this.file = file;
        // ...
    }

    @Override
    public synchronized void ensureAvailable(long offset) {
        try {
            if (raf.length() < offset)
                raf.setLength(offset);
        } catch (IOException e) {
            throw new DBException.VolumeIOError(e);
        }
    }
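The growth logic of ensureAvailable() can be reproduced with a few lines of standard Java to see the behavior in action (GrowingFileDemo is a made-up name, not MapDB code; only the setLength() idea comes from the excerpt above):

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// A minimal re-creation of MapDB's ensureAvailable() idea: the backing file only
// grows (via setLength) when a write needs an offset beyond the current end.
public class GrowingFileDemo {
    static long ensureAvailable(RandomAccessFile raf, long offset) throws IOException {
        if (raf.length() < offset) {
            raf.setLength(offset); // extends the file; the new region is zero-filled
        }
        return raf.length();
    }

    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("growing-volume", ".db");
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            System.out.println(ensureAvailable(raf, 1024));    // grows to 1024
            System.out.println(ensureAvailable(raf, 512));     // already big enough, stays at 1024
            System.out.println(ensureAvailable(raf, 1 << 20)); // grows to 1 MB
        }
    }
}
```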

HashMap - createOrOpen

MapDB is an embedded database, so like any other database, it's a container of "collections". In the RDBMS world these "collections" would be schemas and tables; here they are Java collections like maps and sets. You certainly noticed 2 properties of these collections in the previous snippets. The first of them is the name, which can be considered as an RDBMS table name. The second one is the serializer that transforms the JVM object into the format expected by MapDB. Below you can find a simple demo of 2 different maps created and restored from the file-backed database:

  it should "create and restore 2 HashMaps" in {
    val testDir = "/tmp/mapdb-test-2"
    FileUtils.deleteDirectory(new File(testDir))
    new File(testDir).mkdirs()
    (0 to 2).foreach(runNumber => {
      println(s"Executing ${runNumber}")
      val databaseFile = s"${testDir}/file2.db"
      val fileBasedDatabase = DBMaker.fileDB(databaseFile).make()
      val stringsMap = fileBasedDatabase.hashMap("strings", Serializer.STRING, Serializer.STRING).createOrOpen()
      val intsMap = fileBasedDatabase.hashMap("ints", Serializer.INTEGER, Serializer.INTEGER).createOrOpen()
      if (runNumber == 0) {
        println(" Adding new values")
        stringsMap.put("a", "A")
        intsMap.put(1, 10)
        intsMap.put(2, 20)
      }

      stringsMap.size() shouldEqual 1
      stringsMap.get("a") shouldEqual "A"
      intsMap.size() shouldEqual 2
      intsMap.get(1) shouldEqual 10
      intsMap.get(2) shouldEqual 20

      stringsMap.close()
      intsMap.close()
      fileBasedDatabase.close()
    })
  }

If you have a more complex object to serialize, you can implement your own serializer and deserializer. However, the native API already provides some basic ones that you can use. In the state store implementation, instead of implementing a serialization logic for the UnsafeRow instances, I used the BYTE_ARRAY serializer. Even though I had to restore the UnsafeRow directly from this array of bytes, it was still a simpler approach than implementing a custom serializer (though maybe not the most efficient one?).
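To illustrate the "store raw bytes, rebuild the object yourself" idea without MapDB, here is a hedged sketch in plain Java; toBytes/fromBytes and the id/payload fields are invented stand-ins, not the actual UnsafeRow handling:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical example: encode a record as a raw byte array (what a BYTE_ARRAY-style
// serializer would store) and rebuild the object from those bytes on the way out.
public class ByteArrayRoundTrip {
    static byte[] toBytes(long id, String payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id);    // fixed-width field first
            out.writeUTF(payload); // then the length-prefixed string
        }
        return bytes.toByteArray();
    }

    static String fromBytes(byte[] raw) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            long id = in.readLong();
            String payload = in.readUTF();
            return id + ":" + payload; // rebuild a readable representation
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(fromBytes(toBytes(42L, "state")));
    }
}
```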

Transactions

And finally, the icing on the cake - the transactions! If you followed my Data+AI Summit talk, you noticed that the state store is transactional, i.e. you can commit or abort all the changes made in every version. To deal with that, nothing is better than a backend store that is natively transactional, and MapDB is one of them. To enable transactions, you have to call the transactionEnable() factory method and later use the commit() or rollback() functions:

  it should "create a transactional database" in {
    val testDir = "/tmp/mapdb-test-3"
    FileUtils.deleteDirectory(new File(testDir))
    new File(testDir).mkdirs()
    val transactionalDatabase = DBMaker.fileDB(s"${testDir}/transactional-database-1.db").transactionEnable().make()
    val lettersSet = transactionalDatabase.hashSet("letters", Serializer.STRING).create()

    // Let's add first some letters and commit the transaction
    lettersSet.add("a")
    lettersSet.add("b")
    transactionalDatabase.commit()

    lettersSet.iterator().asScala.toSeq should contain allOf("a", "b")

    // And now, add new letters but rollback the transaction
    lettersSet.add("c")
    // No commit, so no visibility for the pending changes
    lettersSet.iterator().asScala.toSeq should contain allOf("a", "b")
    // Rollback now
    transactionalDatabase.rollback()
    lettersSet.iterator().asScala.toSeq should contain allOf("a", "b")
  }

As you can notice, the transaction handling is simplified. There is no need to begin a transaction explicitly, because a new one starts implicitly after every commit or rollback. On the file system, this transactional behavior translates into the existence of a WAL (Write-Ahead Log) file:

bartosz@bartosz:/tmp/mapdb-test-3$ ll
-rw-rw-r--  1 bartosz bartosz 2097152 Oct 18 06:28 transactional-database-1.db
-rw-rw-r--  1 bartosz bartosz      45 Oct 18 06:28 transactional-database-1.db.wal.0

Under the hood, the class responsible for transaction management is org.mapdb.StoreWAL, and if you analyze the implementation of the commit() method, you will notice that the transactional behavior doesn't come without an extra performance cost:

    // in commit():
    //move modified records from indexPages
    for (records in cacheRecords) {
        records.forEachKeyValue { offset, walId ->
            val bytes = wal.walGetRecord(walId, 0)
            realVolume.putData(offset, bytes, 0, bytes.size)
        }
        records.clear()
        records.compact()
    }

    override fun <R> put(record: R?, serializer: Serializer<R>): Long {
        // ...
        val segment = recidToSegment(recid)
        Utils.lockWrite(locks[segment]) {
            // ...
            val walId = wal.walPutRecord(recid, di.buf, 0, di.pos)
            cacheRecords[segment].put(volOffset, walId)
            // ...
With transactions, the changes are first written to the WAL and later synchronized to the data file. You will then pay some extra cost but in exchange get protection against corruption. It means that if your application crashes in the middle of a write, your database file won't be corrupted and you will be able to restore it. Check the following test case for comparison:

  it should "not restore not transactional store and restore the transactional one" in {
    val testDir = "/tmp/mapdb-test-4"
    FileUtils.deleteDirectory(new File(testDir))
    new File(testDir).mkdirs()

    val transactionalDatabase = DBMaker.fileDB(s"${testDir}/transactional-database-1.db").transactionEnable().make()
    val lettersSet = transactionalDatabase.hashSet("letters", Serializer.STRING).create()
    lettersSet.add("z")
    transactionalDatabase.commit()
    Try {
      Seq("a", "b", "c").foreach(letter => {
        lettersSet.add(letter)
        if (letter == "b") {
          throw new RuntimeException("An unexpected error")
        }
      })
    }

    val nonTransactionalDatabase = DBMaker.fileDB(s"${testDir}/nontransactional-database-1.db").make()
    val nonTransactionalLettersSet = nonTransactionalDatabase.hashSet("letters", Serializer.STRING).create()
    nonTransactionalLettersSet.add("z")
    Try {
      Seq("a", "b", "c").foreach(letter => {
        nonTransactionalLettersSet.add(letter)
        if (letter == "b") {
          throw new RuntimeException("An unexpected error")
        }
      })
    }

    // Do not call .close() on these DBs! If your app crashes, you won't have a chance to do so
    // Run `RestoreTransactionalAndNonTransactionalDatabases` later to simulate the application
    // restart using the same files
  }

object RestoreTransactionalAndNonTransactionalDatabases extends App {
  // Run me after 'it should "not restore not transactional store and restore the transactional one"'
  val testDir = "/tmp/mapdb-test-4"
  // Restore the transactional db
  val restoredTransactionalDatabase = DBMaker.fileDB(s"${testDir}/transactional-database-1.db").transactionEnable().make()
  val restoredLettersSet = restoredTransactionalDatabase.hashSet("letters", Serializer.STRING).open()
  val restoredLettersTransactional = restoredLettersSet.asScala.toSeq

  assert(restoredLettersTransactional.length == 1 && restoredLettersTransactional(0) == "z",
    "Unexpected items in the restored set")

  // Restore the non transactional, it will fail
  val failedRestore = Try {
    val nonTransactionalDatabaseRestored = DBMaker.fileDB(s"${testDir}/nontransactional-database-1.db").make()
  }.failed
  assert(failedRestore.get.getClass.toString == "class org.mapdb.DBException$DataCorruption",
    "DataCorruption was expected")
}

That's all you need to know to check my custom state store implementation on top of MapDB. It exposes some interesting features like memory-mapped files (they exist in Java too, but I had never encountered them before!) and transactions (transactions for a HashMap? Interesting!). But believe me, these are not the only features of MapDB, and to discover the others, I recommend reading the official documentation 📖