Data+AI Summit follow-up: drop duplicates and state management

Versions: Apache Spark 3.0.0

Another stateful operation requiring the state store is drop duplicates. You can use it to deduplicate your streaming data before pushing it to the sink.

In the first part of the blog post, you will see how Apache Spark transforms the logical plan involving streaming deduplication. Later, you will discover when and why it uses the state store in this operation.

Physical planning

Everything starts from StreamingDeduplicationStrategy physical rule that transforms the logical Deduplicate node into a StreamingDeduplicateExec.

Even though the physical planning step is rather straightforward, there are some important points to keep in mind regarding the physical execution node. The first is the child node's data distribution that will move all tuples sharing the same key to the same partition. And that's the first place where the state store appears because, as you already know, the state store is a partition-based component, and the operator uses the number of its partitions in this data distribution definition:

  override def requiredChildDistribution: Seq[Distribution] =
    ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil

dropDuplicates and state store

The data processing flow starts by building the deduplication key extractor. This function uses the columns you specified in the dropDuplicates method that together will uniquely identify every record. If you didn't define any column, Apache Spark will use all columns from the dataset to generate the id:

def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan { ...

What happens later? In the next step the operator creates the first iterator that depending on the presence of the watermark column, will or will not filter out the input records. If the filtering scenario happens, only the records not older than the watermark will be kept:

  lazy val watermarkExpression: Option[Expression] = {
    WatermarkSupport.watermarkExpression(
      child.output.find(_.metadata.contains(EventTimeWatermark.delayKey)),
      eventTimeWatermark)
  }

  lazy val watermarkPredicateForData: Option[BasePredicate] =
    watermarkExpression.map(Predicate.create(_, child.output))

val baseIterator = watermarkPredicateForData match {
        case Some(predicate) => iter.filter(row => !predicate.eval(row))
        case None => iter
      }

On top of this first iterator, Apache Spark will create a second one, this time to handle the deduplication problem. And that's here where the state store interaction will happen. The operator first builds the unique key for every not filtered row and uses it to query the state store. Regarding the outcome, it either ignores the input record (duplicate) or not. If the operator sees the row for the first time, it updates the state store with the unique key and an empty value for space efficiency. At the end, returns true to indicate that the record is not a duplicate and should be transferred to the next operator in the plan:

  private val EMPTY_ROW =
    UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))

      val result = baseIterator.filter { r =>
        val row = r.asInstanceOf[UnsafeRow]
        val key = getKey(row)
        val value = store.get(key)
        if (value == null) {
          store.put(key, StreamingDeduplicateExec.EMPTY_ROW)
          numUpdatedStateRows += 1
          numOutputRows += 1
          true
        } else {
          // Drop duplicated rows
          false
        }
      }

Once all records are classified as duplicates or not, the StreamingDeduplicateExec performs the cleaning action on the state store. It calls the removeKeysOlderThanWatermark(store: StateStore) and removes all keys behind the watermark. But, this cleaning action will be performed only if one of the columns you used in the unique key is the same as the watermark column! If it's the case, the cleaning action will remove all keys older than the watermark:

      CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
        allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs)
        allRemovalsTimeMs += timeTakenMs { removeKeysOlderThanWatermark(store) }
// ...

  lazy val watermarkExpression: Option[Expression] = {
    WatermarkSupport.watermarkExpression(
      child.output.find(_.metadata.contains(EventTimeWatermark.delayKey)),
      eventTimeWatermark)
  }
  lazy val watermarkPredicateForKeys: Option[BasePredicate] = watermarkExpression.flatMap { e =>
    if (keyExpressions.exists(_.metadata.contains(EventTimeWatermark.delayKey))) {
      Some(Predicate.create(e, keyExpressions))
    } else {
      None
    }
  }

  protected def removeKeysOlderThanWatermark(store: StateStore): Unit = {
    if (watermarkPredicateForKeys.nonEmpty) {
      store.getRange(None, None).foreach { rowPair =>
        if (watermarkPredicateForKeys.get.eval(rowPair.key)) {
          store.remove(rowPair.key)
        }
      }
    }
  }

At the end happens the same thing as for any other stateful operation. The deduplication operator updates state store metrics and also commits the current state store version.

The state store in dropDuplicates operator is then a way to persist the records of already seen keys. It brings fault tolerance since, by default, all changes are checkpointed. It also allows a fine-grained control on the keys with the help of watermark expression.

If you liked it, you should read:

The comments are moderated. I publish them when I answer, so don't worry if you don't see yours immediately :)

📚 Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!