Why UnsafeRow.copy() for state persistence in the state store?

In my latest Spark+AI Summit 2019 follow-up posts I've been implementing a custom state store. The extension is inspired by the default state store implementation, and while analyzing its code, one of the places that intrigued me was the put(key: UnsafeRow, value: UnsafeRow) method. Keep reading if you're curious why.

The put method is responsible for adding a new value to the state store, and its implementation is quite straightforward:

    override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
      verify(state == UPDATING, "Cannot put after already committed or aborted")
      val keyCopy = key.copy()
      val valueCopy = value.copy()
      mapToUpdate.put(keyCopy, valueCopy)
      writeUpdateToDeltaFile(compressedStream, keyCopy, valueCopy)
    }

Do you see something intriguing? Yes, the key and the value stored in the state store's cache map are both copies of the original data! Why? The answer is given in the StateStore interface comment:

  /**
   * Put a new value for a non-null key. Implementations must be aware that the UnsafeRows in
   * the params can be reused, and must make copies of the data as needed for persistence.
   */
  def put(key: UnsafeRow, value: UnsafeRow): Unit
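
Since I'm building a custom state store in this series, my own put has to follow the same rule. Below is a minimal sketch of what honoring that contract could look like; the class and the plain ConcurrentHashMap are purely illustrative (a real StateStore implementation also needs get, remove, commit and so on), not the actual extension I'm building:

    import java.util.concurrent.ConcurrentHashMap
    import org.apache.spark.sql.catalyst.expressions.UnsafeRow

    // Hypothetical map-backed store, only to illustrate the copy-on-put contract.
    class CopyingStateMap {
      private val entries = new ConcurrentHashMap[UnsafeRow, UnsafeRow]()

      def put(key: UnsafeRow, value: UnsafeRow): Unit = {
        // Defensive copies: the caller may repoint these very instances at other
        // data right after this call, so we must not keep references to them.
        entries.put(key.copy(), value.copy())
      }

      def get(key: UnsafeRow): UnsafeRow = entries.get(key)
    }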

You said "reused"? Yes, if you look at the UnsafeRow implementations, you will see that it's never created per-row. Its constructor takes only 1 parameter with the number of fields in the schema. The value for each UnsafeRow instance is assigned later, when pointTo(Object baseObject, long baseOffset, int sizeInBytes) or pointTo(byte[] buf, int sizeInBytes) is called. And very often Spark creates a single UnsafeRow instance and uses it as a wrapper for the real data objects. You can see that use in:

  • HashedRelation - here a single UnsafeRow instance is created and shared between get() and getValue() methods:
      // re-used in get()/getValue()
      var resultRow = new UnsafeRow(numFields)
    
      override def get(key: InternalRow): Iterator[InternalRow] = {
        val unsafeKey = key.asInstanceOf[UnsafeRow]
        val map = binaryMap  // avoid the compiler error
        val loc = new map.Location  // this could be allocated in stack
        binaryMap.safeLookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
          unsafeKey.getSizeInBytes, loc, unsafeKey.hashCode())
        if (loc.isDefined) {
          new Iterator[UnsafeRow] {
            private var _hasNext = true
            override def hasNext: Boolean = _hasNext
            override def next(): UnsafeRow = {
              resultRow.pointTo(loc.getValueBase, loc.getValueOffset, loc.getValueLength)
              _hasNext = loc.nextValue()
              resultRow
            }
          }
        } else {
          null
        }
      }
    
      def getValue(key: InternalRow): InternalRow = {
        val unsafeKey = key.asInstanceOf[UnsafeRow]
        val map = binaryMap  // avoid the compiler error
        val loc = new map.Location  // this could be allocated in stack
        binaryMap.safeLookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
          unsafeKey.getSizeInBytes, loc, unsafeKey.hashCode())
        if (loc.isDefined) {
          resultRow.pointTo(loc.getValueBase, loc.getValueOffset, loc.getValueLength)
          resultRow
        } else {
          null
        }
      }
    
  • UnsafeExternalRowSorter - two pre-allocated instances are reused every time two rows are compared:
        private final UnsafeRow row1;
        private final UnsafeRow row2;
    
        RowComparator(Ordering ordering, int numFields) {
          this.row1 = new UnsafeRow(numFields);
          this.row2 = new UnsafeRow(numFields);
          this.ordering = ordering;
        }
    
        @Override
        public int compare(
            Object baseObj1,
            long baseOff1,
            int baseLen1,
            Object baseObj2,
            long baseOff2,
            int baseLen2) {
          // Note that since ordering doesn't need the total length of the record, we just pass 0
          // into the row.
          row1.pointTo(baseObj1, baseOff1, 0);
          row2.pointTo(baseObj2, baseOff2, 0);
          return ordering.compare(row1, row2);
        }
    
  • UnsafeRowSerializer - here a single UnsafeRow instance is reused when the serialized rows are deserialized:
      override def deserializeStream(in: InputStream): DeserializationStream = {
        new DeserializationStream {
          private[this] val dIn: DataInputStream = new DataInputStream(new BufferedInputStream(in))
          // 1024 is a default buffer size; this buffer will grow to accommodate larger rows
          private[this] var rowBuffer: Array[Byte] = new Array[Byte](1024)
          private[this] var row: UnsafeRow = new UnsafeRow(numFields)
    
              override def next(): (Int, UnsafeRow) = {
                if (rowBuffer.length < rowSize) {
                  rowBuffer = new Array[Byte](rowSize)
                }
                ByteStreams.readFully(dIn, rowBuffer, 0, rowSize)
                row.pointTo(rowBuffer, Platform.BYTE_ARRAY_OFFSET, rowSize)
                // ... (rest of next() elided)
    

Before wrapping up, let's check what happens when we keep a reference to an UnsafeRow instance without copying it:

    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.catalyst.expressions.UnsafeRow

    val stringEncoder = ExpressionEncoder[String]()
    val rowA = stringEncoder.toRow("A").asInstanceOf[UnsafeRow]

    assert(rowA.getString(0) == "A")

    val rowB = stringEncoder.toRow("B").asInstanceOf[UnsafeRow]
    assert(rowB.getString(0) == "B")

    rowA.pointTo(rowB.getBytes(), rowB.getSizeInBytes)
    assert(rowA.getString(0) == "B")
    assert(rowB.getString(0) == "B")

As you can see, the UnsafeRow created at the beginning now exposes the second row's data. In the Apache Spark source code you will find many places where a single UnsafeRow instance is shared, but also places where a method returns a new UnsafeRow instance. Even so, the fact that a row was created locally doesn't guarantee that it won't be mutated elsewhere, hence the defensive copy() calls in the state store.
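
Conversely, a copy() taken before the instance is repointed keeps its own backing byte array, which is exactly what the state store relies on. Here is a small continuation of the snippet above (rowC and rowD are just new illustrative variables):

    val rowC = stringEncoder.toRow("C").asInstanceOf[UnsafeRow]
    val rowCCopy = rowC.copy()

    val rowD = stringEncoder.toRow("D").asInstanceOf[UnsafeRow]
    rowC.pointTo(rowD.getBytes(), rowD.getSizeInBytes)

    assert(rowC.getString(0) == "D")      // the original instance now reads "D"
    assert(rowCCopy.getString(0) == "C")  // the copy kept its own bytes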

Read also: UnsafeRow — Mutable Raw-Memory Unsafe Binary Row Format.
