Filtering rules accumulator

Versions: Apache Spark 3.3.0

Data can have various quality issues, from missing values to badly formatted ones. However, there is another issue fewer people talk about: erroneous filtering logic.

Bad filtering logic can lead to too much or not enough data, and both situations are problematic. If you have too much data, data scientists can end up training their ML models on irrelevant records. If you don't have enough data, the same models will give poor results and the BI dashboards may wrongly point at a business problem. Getting this part right is as crucial as having correct data processing logic for formats and values.

Apache Spark doesn't come with a native way to monitor the filtering execution. True, it shows the number of output rows for each filter node in the execution plan UI, but that's where the built-in insight stops.

Fortunately, it's extensible enough to implement a filter monitor on our own with accumulators. How? Let's see.

Accumulators for filters
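
The snippets below rely on a sparkSession, a LetterToTest case class and an input Dataset. Here is a minimal setup sketch; the exact fields and sample rows are assumptions inferred from the filter conditions used later:

import org.apache.spark.sql.SparkSession

case class LetterToTest(id: Int, lower_case: String, upper_case: String)

val sparkSession = SparkSession.builder()
  .appName("filtering-rules-accumulator").master("local[*]")
  .getOrCreate()
import sparkSession.implicits._

// Sample Dataset with one row rejected by each of the two filters below
val input = Seq(
  LetterToTest(1, "a", "A"),
  LetterToTest(0, "b", "B"),    // id <= 0
  LetterToTest(2, null, "C")    // missing lower_case
).toDS()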

The code is very rudimentary. The job defines one accumulator per filter method and reads the number of rejected rows at the end:

// One accumulator per filter condition; incremented every time a row fails the check
val idFilterAccumulator = sparkSession.sparkContext.longAccumulator("idFilterAccumulator")
val lowerUpperCaseFilterAccumulator = sparkSession.sparkContext.longAccumulator("lowerUpperCaseFilterAccumulator")
val filteredInput = input.filter(letter => {
  val result = letter.id > 0
  // Count the rows rejected by the id filter
  if (!result) idFilterAccumulator.add(1L)
  result
}).filter(letter => {
  val result = letter.lower_case != null && letter.upper_case != null
  // Count the rows rejected by the lower/upper case filter
  if (!result) lowerUpperCaseFilterAccumulator.add(1L)
  result
})

filteredInput.collect()

println(s"idFilterAccumulator=${idFilterAccumulator.value}")
println(s"lowerUpperCaseFilterAccumulator=${lowerUpperCaseFilterAccumulator.value}")

It's simple but it works for my example. It can become cumbersome with a bigger number of filters, though. Fortunately, there are design patterns you can use to make this code less repetitive. I'll use the decorator pattern, which wraps each filter with a common, accumulating behavior.

Decorator-based filters

The wrapped filter looks like this:

import org.apache.spark.util.LongAccumulator

// Decorator: wraps a filter function and counts the rows it rejects
class FilterWithAccumulatedResult(filterMethod: (LetterToTest) => Boolean, resultAccumulator: LongAccumulator) extends Serializable {

  def filter(letter: LetterToTest): Boolean = {
    val result = filterMethod(letter)
    if (!result) resultAccumulator.add(1L)
    result
  }

}

Now to call it:

val idFilterAccumulator = sparkSession.sparkContext.longAccumulator("idFilterAccumulator")
val lowerUpperCaseFilterAccumulator = sparkSession.sparkContext.longAccumulator("lowerUpperCaseFilterAccumulator")
val idFilter = new FilterWithAccumulatedResult(
  (letter) => letter.id > 0, idFilterAccumulator
)
val lowerUpperCaseFilter = new FilterWithAccumulatedResult(
  (letter) => letter.lower_case != null && letter.upper_case != null, lowerUpperCaseFilterAccumulator
)
val filteredInput = input.filter(idFilter.filter _).filter(lowerUpperCaseFilter.filter _)
filteredInput.collect()

println(s"idFilterAccumulator=${idFilterAccumulator.value}")
println(s"lowerUpperCaseFilterAccumulator=${lowerUpperCaseFilterAccumulator.value}")

The code looks better but it's not without drawbacks. Compared with plain filters it's still heavier, although it also does extra work, so this heaviness can be accepted. Something that may be harder to accept is the inability to use SQL-based filter expressions. As you can notice in the decorator, the filter result must be evaluated in a programmatic function to be able to call the accumulator.
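
For comparison, a SQL-expression version of the same conditions would look like the snippet below; it's concise, but there is no place to hook the accumulator call, which is why the decorator relies on a Scala function:

// SQL-based filter expression: shorter, but impossible to instrument with an accumulator
val sqlFilteredInput = input.filter("id > 0 AND lower_case IS NOT NULL AND upper_case IS NOT NULL")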

Despite these shortcomings, the solution can add an extra monitoring layer to get a better understanding of the input data and the filtering logic.

