Picture this. You get a list of values in a column and you need to combine each of them with another row. The simplest way for doing that is to use the explode operation and create a dedicated row for the concatenated values. Unlucky you, several rows in the input have nulls instead of the list.
Data Engineering Design Patterns
Looking for a book that defines and solves most common data engineering problems? I wrote
one on that topic! You can read it online
on the O'Reilly platform,
or get a print copy on Amazon.
I also help solve your data engineering problems 👉 contact@waitingforcode.com 📩
I'll start this blog post with a short digression. Back in time when I was working on backend engineering, I was always worried by a NullPointerException (NPE). It was so long ago that the single available optional type was Optional in Guava... I faced the NPE many times but I remember I was always angry when the NPE came out from the toString method (str() in Python). So I used to take care of that by adding null-checks, using optionals, or required annotations for the web services to avoid having to deal with nulls. When I transitioned to the data world, I realized I've started doing the same bulletproofing with null-safe data transformations and NULL values [after some hard lessons learned, though]. This blog post is just one of those bulletproofs you should always keep in mind while designing data transformation workflows.
Explode a row
A popular transformation to deal with arrays is explode. The function takes a column as a parameter and explodes it by putting each value on a new row. Below you can see a quick example:
letters = spark.createDataFrame([{'id': 1, 'letters': ['a','b','c']}], 'id INT, letters ARRAY<STRING>')
letters.withColumn('letter', F.explode('letters')).sort(F.asc('letter')).show(truncate=False)
"""
+---+---------+------+
|id |letters |letter|
+---+---------+------+
|1 |[a, b, c]|a |
|1 |[a, b, c]|b |
|1 |[a, b, c]|c |
+---+---------+------+
"""
Looks simple, doesn't it? It is simple but look what happens when the exploded column has NULLs:
letters_with_nulls = spark.createDataFrame([
{'id': 1, 'letters': ['a','b','c']}, {'id': 2, 'letters': None}, {'id': 3, 'letters': ['d',None]}, {'id': 4, 'letters': []}
], 'id INT, letters ARRAY<STRING>')
letters_with_nulls.withColumn('letter', F.explode('letters')).sort(F.asc('letter')).show(truncate=False)
Magically - technically is by design, but it might look like magic if you are not yet that sensible to undefined values - the rows number 2 and 4 disappeared from the show:
+---+---------+------+ |id |letters |letter| +---+---------+------+ |3 |[d, NULL]|NULL | |1 |[a, b, c]|a | |1 |[a, b, c]|b | |1 |[a, b, c]|c | |3 |[d, NULL]|d | +---+---------+------+
Why so? Because of a hidden logical rule called InferFiltersFromGenerate. The rule applies filters for all generators, so the functions that generate more data. It's an early optimization rule that removes all rows that might have been removed by other operations made later in the code, such as joins:
object InferFiltersFromGenerate extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
_.containsPattern(GENERATE)) {
case generate @ Generate(g, _, false, _, _, _) if canInferFilters(g) =>
// ...
val inferredFilters = ExpressionSet(
Seq(GreaterThan(Size(input), Literal(0)), IsNotNull(input))
) -- generate.child.constraints
if (inferredFilters.nonEmpty) {
generate.copy(child = Filter(inferredFilters.reduce(And), generate.child))
}
private def canInferFilters(g: Generator): Boolean = g match {
case _: ExplodeBase => true
case _: Inline => true
case _ => false
}
The thing is, this filter might not be visible in the execution plan but if you check the logs and look for the generated code, you should see the following entries:
DEBUG Generated predicate '((size(input[1, array<int>, true], false) > 0) AND isnotnull(input[1, array<int>, true]))':
/* 005 */ class SpecificPredicate extends org.apache.spark.sql.catalyst.expressions.BasePredicate {
// ...
/* 018 */ public boolean eval(InternalRow i) {
/// ...
/* 054 */ return !isNull_0 && value_0;
/* 055 */ }
Explode outer
Now one question remains, how to deal with these nulls and keep explode working even for undefined columns? The simple answer is the explode_outer that keeps nulls. Let's see how the output of our previous example adapts to this new function:
letters_with_nulls.withColumn('letter', F.explode_outer('letters')).sort(F.asc('letter')).show(truncate=False)
"""
+---+---------+------+
|id |letters |letter|
+---+---------+------+
|3 |[d, NULL]|NULL |
|4 |[] |NULL |
|2 |NULL |NULL |
|1 |[a, b, c]|a |
|1 |[a, b, c]|b |
|1 |[a, b, c]|c |
|3 |[d, NULL]|d |
+---+---------+------+
"""
But wait a minute, explode_outer is also a generator expression. Why, unlike the explode, it doesn't have the filter? The answer is hidden in the InferFiltersFromGenerate. If you take a look at the apply function, you'll notice it only applies the filter for the generate expressions that are not marked as outer. It is the false flag from the next snippet:
object InferFiltersFromGenerate extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
_.containsPattern(GENERATE)) {
case generate @ Generate(g, _, false, _, _, _) if canInferFilters(g) =>
Bonus: explode and maps
As a bonus and at the same time, the closing words, you should also know that the explode works for maps. And it shares exactly the same gotchas for nulls by the way. Below is a quick example:
letters = spark.createDataFrame([
{'id': 1, 'letters': {'a': 'A','b': 'B','c': 'C'}},
{'id': 2, 'letters': None},
{'id': 3, 'letters': {'d': None, None: 'E'}},
], 'id INT, letters MAP<STRING, STRING>')
(letters.select(
F.col('id'), F.col('letters'),
F.explode('letters').alias('lower', 'upper'))
.show(truncate=False))
"""
+---+------------------------+-----+-----+
|id |letters |lower|upper|
+---+------------------------+-----+-----+
|1 |{a -> A, b -> B, c -> C}|a |A |
|1 |{a -> A, b -> B, c -> C}|b |B |
|1 |{a -> A, b -> B, c -> C}|c |C |
|3 |{d -> NULL} |d |NULL |
+---+------------------------+-----+-----+
"""
As for arrays, the outer function returns an additional row for the null map:
(letters.select(
F.col('id'), F.col('letters'),
F.explode_outer('letters').alias('lower', 'upper'))
.show(truncate=False))
"""
+---+------------------------+-----+-----+
|id |letters |lower|upper|
+---+------------------------+-----+-----+
|1 |{a -> A, b -> B, c -> C}|a |A |
|1 |{a -> A, b -> B, c -> C}|b |B |
|1 |{a -> A, b -> B, c -> C}|c |C |
|2 |NULL |NULL |NULL |
|3 |{d -> NULL} |d |NULL |
+---+------------------------+-----+-----+
"""
Exploding an array is another example where nulls are problematic. For that reason it is important to consider all nullable columns as potential NullPointerExceptions that, for sure won't break the pipeline, but will generate an incorrect outcome.
Consulting
With nearly 16 years of experience, including 8 as data engineer, I offer expert consulting to design and optimize scalable data solutions.
As an O’Reilly author, Data+AI Summit speaker, and blogger, I bring cutting-edge insights to modernize infrastructure, build robust pipelines, and
drive data-driven decision-making. Let's transform your data challenges into opportunities—reach out to elevate your data engineering game today!
👉 contact@waitingforcode.com
đź”— past projects

