Why UnsafeRow.copy() for state persistence in the state store?

In my last Spark+AI Summit 2019 follow-up posts I'm implementing a custom state store. The extension is inspired by the default state store. At the moment of code analysis, one of the places that intrigued me was the put(key: UnsafeRow, value: UnsafeRow) method. Keep reading if you're curious why.

4-day workshop · In-person or online

What would it take for you to trust your Databricks pipelines in production?

A 3-day bug hunt on a 3-person team costs up to €7,200 in lost engineering time. This workshop teaches you to prevent that — unit tests, data tests, and integration tests for PySpark and Databricks Lakeflow, including Spark Declarative Pipelines.

Unit, data & integration tests
Medallion architecture & Lakeflow SDP
Max 10 participants · production-ready templates
See the full curriculum → €7,000 flat fee · cohort of up to 10
Bartosz Konieczny
Bartosz
Konieczny

The put method is responsible for adding new value to the state store and its implementation is quite straightforward:

    override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
      verify(state == UPDATING, "Cannot put after already committed or aborted")
      val keyCopy = key.copy()
      val valueCopy = value.copy()
      mapToUpdate.put(keyCopy, valueCopy)
      writeUpdateToDeltaFile(compressedStream, keyCopy, valueCopy)
    }

Do you see something intriguing? Yes, the key and value stored in the state store cache map are both the copies of the original data! Why so? The answer is given in the StateStore interface comment:

  /**
   * Put a new value for a non-null key. Implementations must be aware that the UnsafeRows in
   * the params can be reused, and must make copies of the data as needed for persistence.
   */
  def put(key: UnsafeRow, value: UnsafeRow): Unit

You said "reused"? Yes, if you look at the UnsafeRow implementations, you will see that it's never created per-row. Its constructor takes only 1 parameter with the number of fields in the schema. The value for each UnsafeRow instance is assigned later, when pointTo(Object baseObject, long baseOffset, int sizeInBytes) or pointTo(byte[] buf, int sizeInBytes) is called. And very often Spark creates a single UnsafeRow instance and uses it as a wrapper for the real data objects. You can see that use in:

Before terminating, let's check what happened if we kept a not copied instance of an UnsafeRow in a list:

    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    val stringEncoder = ExpressionEncoder[String]
    val rowA = stringEncoder.toRow("A").asInstanceOf[UnsafeRow]

    assert(rowA.getString(0) == "A")

    val rowB = stringEncoder.toRow("B").asInstanceOf[UnsafeRow]
    assert(rowB.getString(0) == "B")

    rowA.pointTo(rowB.getBytes(), rowB.getSizeInBytes)
    assert(rowA.getString(0) == "B")
    assert(rowB.getString(0) == "B")

As you can see, the UnsafeRow created at the beginning references the second row. In Apache Spark source code you will find a lot of places where the UnsafeRow is shared but also the places where a single method returns a new UnsafeRow instance. But despite the fact of being created locally, it doesn't mean that it won't be mutated elsewhere.

Data Engineering Design Patterns

Looking for a book that defines and solves most common data engineering problems? I wrote one on that topic! You can read it online on the O'Reilly platform, or get a print copy on Amazon.

I also help solve your data engineering problems contact@waitingforcode.com đź“©