Data can have various quality issues, from missing to badly formatted values. However, there is another issue fewer people talk about: erroneous filtering logic.
Bad filtering logic can lead to too much or too little data, and both are problematic. With too much data, data scientists may train their ML models on irrelevant records. With too little data, the same models will give poor results and the BI dashboards may show a business problem that isn't really there. So getting this part right is as crucial as having correct processing logic for data formats and values.
Apache Spark doesn't come with a native way to monitor filtering execution. True, the execution plan in the Spark UI shows the number of output rows of the filter operator, but it doesn't tell you which filtering rule rejected which rows.
Fortunately, it's extensible enough to implement a filter monitor on our own with accumulators. How? Let's see.
Accumulators for filters
The code is very rudimentary. The job defines one accumulator per filter, increments it every time the filter rejects a row, and reads the counts at the end:
```scala
// Assumes an active `sparkSession` and `input`, a Dataset[LetterToTest]
// exposing `id`, `lower_case`, and `upper_case` fields
val idFilterAccumulator = sparkSession.sparkContext.longAccumulator("idFilterAccumulator")
val lowerUpperCaseFilterAccumulator = sparkSession.sparkContext.longAccumulator("lowerUpperCaseFilterAccumulator")

val filteredInput = input.filter(letter => {
  val result = letter.id > 0
  // Count every row rejected by the id rule
  if (!result) idFilterAccumulator.add(1L)
  result
}).filter(letter => {
  // Only rows that passed the id rule reach this filter
  val result = letter.lower_case != null && letter.upper_case != null
  if (!result) lowerUpperCaseFilterAccumulator.add(1L)
  result
})

// The accumulators are only updated once an action executes the filters
filteredInput.collect()
println(s"idFilterAccumulator=${idFilterAccumulator.value}")
println(s"lowerUpperCaseFilterAccumulator=${lowerUpperCaseFilterAccumulator.value}")
```
It's simple and works for my small example, but it gets cumbersome as the number of filters grows. Fortunately, design patterns can make this code less repetitive. I'll use the decorator pattern, which wraps a function with a common behavior.
Decorator-based filters
The wrapping class looks like this:
```scala
import org.apache.spark.util.LongAccumulator

// Decorator: runs the wrapped filter and counts the rows it rejects
class FilterWithAccumulatedResult(filterMethod: (LetterToTest) => Boolean, resultAccumulator: LongAccumulator) extends Serializable {
  def filter(letter: LetterToTest): Boolean = {
    val result = filterMethod(letter)
    if (!result) resultAccumulator.add(1L)
    result
  }
}
```
And here is how to use it:
```scala
val idFilterAccumulator = sparkSession.sparkContext.longAccumulator("idFilterAccumulator")
val lowerUpperCaseFilterAccumulator = sparkSession.sparkContext.longAccumulator("lowerUpperCaseFilterAccumulator")

// Each filtering rule is wrapped with its own accumulator
val idFilter = new FilterWithAccumulatedResult(
  (letter) => letter.id > 0, idFilterAccumulator
)
val lowerUpperCaseFilter = new FilterWithAccumulatedResult(
  (letter) => letter.lower_case != null && letter.upper_case != null, lowerUpperCaseFilterAccumulator
)

val filteredInput = input.filter(idFilter.filter _).filter(lowerUpperCaseFilter.filter _)
filteredInput.collect()
println(s"idFilterAccumulator=${idFilterAccumulator.value}")
println(s"lowerUpperCaseFilterAccumulator=${lowerUpperCaseFilterAccumulator.value}")
```
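With more rules, declaring one accumulator variable per filter becomes boilerplate too. The following minimal sketch assumes a hypothetical `MonitoredFilters` helper (it is not a Spark API). The helper registers a named accumulator together with each wrapped predicate and returns the rejection counts sorted in descending order, which directly shows which rule removes the most rows.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator
import scala.collection.mutable

// Hypothetical helper (not a Spark API): one named accumulator per filtering rule.
class MonitoredFilters(sparkContext: SparkContext) {
  private val accumulators = mutable.LinkedHashMap.empty[String, LongAccumulator]

  // Registers an accumulator called `name` and returns the predicate wrapped with it.
  def create[T](name: String)(predicate: T => Boolean): T => Boolean = {
    val accumulator = sparkContext.longAccumulator(name)
    accumulators(name) = accumulator
    MonitoredFilters.countRejections(predicate, accumulator)
  }

  // Rejected rows per rule, from the most to the least aggressive filter.
  // The values are only meaningful after an action has executed the filters.
  def report(): Seq[(String, Long)] =
    accumulators.toSeq
      .map { case (name, accumulator) => (name, accumulator.sum) }
      .sortBy { case (_, rejected) => -rejected }
}

object MonitoredFilters {
  // Built in the companion object so the serialized closure captures only the
  // predicate and the accumulator, never the registry holding the SparkContext.
  private def countRejections[T](predicate: T => Boolean,
                                 accumulator: LongAccumulator): T => Boolean =
    (record: T) => {
      val result = predicate(record)
      if (!result) accumulator.add(1L)
      result
    }
}
```

With the article's Dataset, `filters.create[LetterToTest]("idFilter")(_.id > 0)` returns a `LetterToTest => Boolean` that can be passed to `input.filter`, and calling `filters.report()` after `collect()` lists the rules by number of rejected rows.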
The code looks better, but it has drawbacks. Compared with plain filters, it's still heavier. On the other hand, it does extra work, so some extra weight is acceptable. What may be harder to accept is that you can't use SQL filter expressions: as the decorator shows, the filter result must be computed in a programmatic function so that the accumulator can be updated.
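One possible workaround for the SQL limitation, which is a variation and not the approach described in this post, is to let Spark evaluate the SQL condition as a `Column` and pass its result through a UDF that updates the accumulator. The sketch below assumes a hypothetical `SqlFilterWithAccumulator.countingFilter` helper. Like the typed filters, the UDF is opaque to Catalyst, so the condition still can't be pushed down to the data source.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.udf
import org.apache.spark.util.LongAccumulator

object SqlFilterWithAccumulator {
  // Hypothetical helper (not a Spark API): passes the result of an existing boolean
  // Column through a UDF that increments the accumulator for every rejected row.
  def countingFilter(condition: Column, accumulator: LongAccumulator): Column = {
    val countRejections = udf((result: java.lang.Boolean) => {
      // A SQL predicate can evaluate to NULL and filter() drops those rows too,
      // so NULL counts as a rejection. The boxed type lets the UDF receive the NULL.
      val kept = result != null && result.booleanValue()
      if (!kept) accumulator.add(1L)
      kept
    }).asNondeterministic() // assumption: keeps optimizer rules for deterministic predicates away from it
    countRejections(condition)
  }
}
```

It would be called as `input.filter(SqlFilterWithAccumulator.countingFilter(expr("id > 0"), idFilterAccumulator))`, with `expr` imported from `org.apache.spark.sql.functions`.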
Despite these shortcomings, the solution can add an extra monitoring layer to get a better understanding of the input data and the filtering logic.
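One caveat applies to every counter in this post: the accumulators are updated inside transformations, and the Apache Spark documentation only guarantees that each task's update is applied once for accumulators used inside actions. A retried task, or a second action that recomputes the same Dataset, increments the counters again. The following sketch assumes a local SparkSession and a hypothetical `LetterToTest` case class with the fields used earlier. It runs two actions on the same filtered Dataset, first without and then with `cache()`. In a local run without task failures, the first count should be 2 and the second 1. Caching only avoids the recomputation between actions; it doesn't protect against retried tasks.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical record type with the fields used by the article's filters.
case class LetterToTest(id: Int, lower_case: String, upper_case: String)

object AccumulatorRecomputation {
  // Returns the id-filter rejection count observed after two actions,
  // first without and then with caching the filtered Dataset.
  def countRejections(sparkSession: SparkSession): (Long, Long) = {
    import sparkSession.implicits._
    val input = sparkSession.sparkContext.parallelize(Seq(
      LetterToTest(1, "a", "A"),
      LetterToTest(-1, "b", "B"), // the only row rejected by the id filter
      LetterToTest(2, "c", "C")
    )).toDS()

    val idFilterAccumulator = sparkSession.sparkContext.longAccumulator("idFilterAccumulator")
    val filteredInput = input.filter(letter => {
      val result = letter.id > 0
      if (!result) idFilterAccumulator.add(1L)
      result
    })

    // Without caching, each action re-runs the filter and counts the same row again.
    filteredInput.count()
    filteredInput.collect()
    val withoutCache = idFilterAccumulator.sum

    idFilterAccumulator.reset()
    // With caching, the first action materializes the filtered rows
    // and the second one reads them from the cache.
    filteredInput.cache()
    filteredInput.count()
    filteredInput.collect()
    val withCache = idFilterAccumulator.sum
    filteredInput.unpersist()

    (withoutCache, withCache)
  }

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .master("local[2]").appName("accumulator-recomputation").getOrCreate()
    val (withoutCache, withCache) = countRejections(sparkSession)
    println(s"withoutCache=$withoutCache withCache=$withCache")
    sparkSession.stop()
  }
}
```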
Filtering irrelevant records out is a common step in data pipelines. But how to know what is the filter rule filtering out the most of the input data? One approach is to use accumulators in #ApacheSpark. An example in the new blog post? https://t.co/ZEAPs0Hwrs pic.twitter.com/JXeCpTNmEw
— Bartosz Konieczny (@waitingforcode) March 2, 2023
