Combining DataFrames - beyond UNIONs

As a seasoned data engineer, you certainly know that UNIONs are a great way to combine many DataFrames into one data processing abstraction. But do you know that Apache Spark has other methods that operate on multiple DataFrames and go far beyond simply concatenating two datasets?

Bartosz Konieczny

Except

The first operation that you may or may not have heard about when it comes to combining different DataFrames is except, or exceptAll if you want to keep the duplicates. Its purpose is to extract the rows from one DataFrame that are missing in another DataFrame. Here is a quick example:

val df1 = ((1 to 10).map(nr => (nr, s"Number ${nr}")) ++ (1 to 10).map(nr => (nr, s"Number ${nr}"))).toDF("nr", "label")
val df2 = (3 to 13).map(nr => (nr, s"Number ${nr}")).toDF("nr", "label")
val missingRowsInDf1 = df1.except(df2)
missingRowsInDf1.show()
/*
+---+--------+
|nr |label   |
+---+--------+
|1  |Number 1|
|2  |Number 2|
+---+--------+
*/

The df1 contains two distinct rows that are missing in the df2 (each of them twice), and the except returns each of them once. The exceptAll, on the other hand, preserves the duplicates: each occurrence in df2 cancels only one occurrence in df1. Consequently, you will see the two missing rows twice, but you will also see once each row that is duplicated in df1 and present only once in df2:

+---+---------+
| nr|    label|
+---+---------+
|  1| Number 1| # This is missing in df2
|  1| Number 1| # This is missing in df2 and comes from duplicates in df1
|  2| Number 2| # This is missing in df2
|  2| Number 2| # This is missing in df2 and comes from duplicates in df1
|  3| Number 3| # This is present only once in df2 but df1 duplicates it
|  4| Number 4| # Same as above for this and all subsequent rows
|  5| Number 5|
|  6| Number 6|
|  7| Number 7|
|  8| Number 8|
|  9| Number 9|
| 10|Number 10|
+---+---------+

Fine, you know the "what" but still don't know the "how". When it comes to the except, the one removing duplicates, the execution plan is a left-anti join between both DataFrames on all the columns, followed by a shuffle and an aggregation that remove the duplicates:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[nr#9, label#10], functions=[], output=[nr#9, label#10])
   +- Exchange hashpartitioning(nr#9, label#10, 1), ENSURE_REQUIREMENTS, [plan_id=23]
      +- HashAggregate(keys=[nr#9, label#10], functions=[], output=[nr#9, label#10])
         +- BroadcastHashJoin [coalesce(nr#9, 0), isnull(nr#9), coalesce(label#10, ), isnull(label#10)], [coalesce(nr#20, 0), isnull(nr#20), coalesce(label#21, ), isnull(label#21)], LeftAnti, BuildRight, false
            :- LocalTableScan [nr#9, label#10]
            +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, int, false], 0), isnull(input[0, int, false]), coalesce(input[1, string, true], ), isnull(input[1, string, true])),false), [plan_id=19]
               +- LocalTableScan [nr#20, label#21]

But it's not that simple for the exceptAll because the query engine first applies a union on both DataFrames, and later sums a per-row counter to compute how many times each missing row should be replicated with the replicaterows generator:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [nr#9, label#10]
   +- Generate replicaterows(sum#32L, nr#9, label#10), [nr#9, label#10], false, [nr#9, label#10]
      +- Filter (isnotnull(sum#32L) AND (sum#32L > 0))
         +- HashAggregate(keys=[nr#9, label#10], functions=[sum(vcol#29L)], output=[nr#9, label#10, sum#32L])
            +- Exchange hashpartitioning(nr#9, label#10, 1), ENSURE_REQUIREMENTS, [plan_id=137]
               +- HashAggregate(keys=[nr#9, label#10], functions=[partial_sum(vcol#29L)], output=[nr#9, label#10, sum#34L])
                  +- Union
                     :- LocalTableScan [vcol#29L, nr#9, label#10]
                     +- LocalTableScan [vcol#30L, nr#20, label#21]
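
The steps of this plan can be mimicked with plain Scala collections, which may help to see why the sum drives the replication. This is only a sketch and not Spark's implementation: the +1 and -1 tagging values are not visible in the plan above and are an assumption based on how Spark describes this rewrite, and the ExceptAllSketch object and its exceptAll function are hypothetical names.

```scala
// Plain Scala collections stand in for DataFrames; no Spark is required.
object ExceptAllSketch {
  type Row = (Int, String)

  def exceptAll(left: Seq[Row], right: Seq[Row]): Seq[Row] = {
    // Union step: tag left rows with +1 and right rows with -1 (assumed vcol values).
    val tagged: Seq[(Row, Long)] =
      left.map(row => (row, 1L)) ++ right.map(row => (row, -1L))

    // HashAggregate step: sum the tags for each distinct row.
    val sums: Map[Row, Long] =
      tagged.groupBy(_._1).map { case (row, group) => row -> group.map(_._2).sum }

    // Filter and replicaterows steps: keep positive sums, emit each row "sum" times.
    sums.toSeq
      .filter { case (_, sum) => sum > 0 }
      .flatMap { case (row, sum) => Seq.fill(sum.toInt)(row) }
      .sortBy(_._1) // sorted only to make the printed output stable
  }

  def main(args: Array[String]): Unit = {
    val df1 = ((1 to 10) ++ (1 to 10)).map(nr => (nr, s"Number $nr"))
    val df2 = (3 to 13).map(nr => (nr, s"Number $nr"))
    exceptAll(df1, df2).foreach(println)
  }
}
```

A row present twice on the left and once on the right sums to 1 and is emitted once, while a row present only on the right sums to a negative value and is filtered out.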

Subtract function

If you are a PySpark user, you also have access to the subtract function, which turns out to be an alias for the except function:

def subtract(self, other: "DataFrame") -> "DataFrame":
  return DataFrame(getattr(self._jdf, "except")(other._jdf), self.sparkSession)

Intersect

The opposite of the except is the intersect, or the intersectAll. The resulting DataFrame contains the rows that are present in both DataFrames, respectively without or with duplicates. Let's take a quick look:

val df1 = ((1 to 10).map(nr => (nr, s"Number ${nr}")) ++ (1 to 10).map(nr => (nr, s"Number ${nr}"))).toDF("nr", "label")
// Here the df2 also contains duplicates (rows 3 to 5)
val df2 = ((3 to 13).map(nr => (nr, s"Number ${nr}")) ++ (3 to 5).map(nr => (nr, s"Number ${nr}"))).toDF("nr", "label")

val commonRowsNoDuplicates = df1.intersect(df2)
commonRowsNoDuplicates.show(truncate = false)
/*
+---+---------+
|nr |label    |
+---+---------+
|3  |Number 3 |
|4  |Number 4 |
|5  |Number 5 |
|6  |Number 6 |
|7  |Number 7 |
|8  |Number 8 |
|9  |Number 9 |
|10 |Number 10|
+---+---------+
*/

As you can see, common rows are returned only once. That's not the case for the intersectAll, which preserves duplicates:

+---+---------+
| nr|    label|
+---+---------+
|  3| Number 3| # common in both, the first occurrence
|  3| Number 3| # common in both, the second occurrence
|  4| Number 4| # same as 3
|  4| Number 4| # same as 3
|  5| Number 5| # same as 3
|  5| Number 5| # same as 3
|  6| Number 6| # despite being duplicated in df1, there is only one row in common (the second occurrence is missing in df2)
|  7| Number 7| # same as 6
|  8| Number 8| # same as 6
|  9| Number 9| # same as 6
| 10|Number 10| # same as 6
+---+---------+

What about the execution plans then? The intersect runs a left-semi join to keep rows present on both sides:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[nr#9, label#10], functions=[], output=[nr#9, label#10])
   +- Exchange hashpartitioning(nr#9, label#10, 1), ENSURE_REQUIREMENTS, [plan_id=23]
      +- HashAggregate(keys=[nr#9, label#10], functions=[], output=[nr#9, label#10])
         +- BroadcastHashJoin [coalesce(nr#9, 0), isnull(nr#9), coalesce(label#10, ), isnull(label#10)], [coalesce(nr#20, 0), isnull(nr#20), coalesce(label#21, ), isnull(label#21)], LeftSemi, BuildRight, false
            :- LocalTableScan [nr#9, label#10]
            +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, int, false], 0), isnull(input[0, int, false]), coalesce(input[1, string, true], ), isnull(input[1, string, true])),false), [plan_id=19]
               +- LocalTableScan [nr#20, label#21]
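
Both join-based plans, the LeftAnti one shown for the except and the LeftSemi one shown for the intersect, can be approximated by hand with the DataFrame API. The sketch below assumes a local SparkSession and two DataFrames with the same number of columns; JoinBasedSetOperations and distinctSetOperation are hypothetical names, and the null-safe equality operator is my reading of the coalesce and isnull join keys visible in the plans.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object JoinBasedSetOperations {
  // Hypothetical helper: joinType is "left_anti" (except-like) or "left_semi" (intersect-like).
  def distinctSetOperation(left: DataFrame, right: DataFrame, joinType: String): DataFrame = {
    // Columns are paired by position and compared with null-safe equality (<=>),
    // so that two NULLs are considered equal.
    val joinCondition = left.columns.zip(right.columns)
      .map { case (leftName, rightName) => left(leftName) <=> right(rightName) }
      .reduce(_ && _)
    // distinct() plays the role of the HashAggregate steps that drop duplicates.
    left.join(right, joinCondition, joinType).distinct()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("join-based-set-operations").getOrCreate()
    import spark.implicits._

    val df1 = ((1 to 10) ++ (1 to 10)).map(nr => (nr, s"Number $nr")).toDF("nr", "label")
    val df2 = (3 to 13).map(nr => (nr, s"Number $nr")).toDF("nr", "label")

    distinctSetOperation(df1, df2, "left_anti").orderBy("nr").show(truncate = false)
    distinctSetOperation(df1, df2, "left_semi").orderBy("nr").show(truncate = false)

    spark.stop()
  }
}
```

In practice the built-in except and intersect remain the shorter option; the manual join is mostly useful to understand what the plan does.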

The intersectAll, on the other hand, doesn't use a join but a union with the replicaterows generator, like the exceptAll:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [nr#9, label#10]
   +- Generate replicaterows(min_count#37L, nr#9, label#10), [nr#9, label#10], false, [nr#9, label#10]
      +- Project [nr#9, label#10, if ((vcol1_count#34L > vcol2_count#36L)) vcol2_count#36L else vcol1_count#34L AS min_count#37L]
         +- Filter ((vcol1_count#34L >= 1) AND (vcol2_count#36L >= 1))
            +- HashAggregate(keys=[nr#9, label#10], functions=[count(vcol1#29), count(vcol2#32)], output=[vcol1_count#34L, vcol2_count#36L, nr#9, label#10])
               +- Exchange hashpartitioning(nr#9, label#10, 1), ENSURE_REQUIREMENTS, [plan_id=141]
                  +- HashAggregate(keys=[nr#9, label#10], functions=[partial_count(vcol1#29), partial_count(vcol2#32)], output=[nr#9, label#10, count#40L, count#41L])
                     +- Union
                        :- LocalTableScan [vcol1#29, vcol2#32, nr#9, label#10]
                        +- LocalTableScan [vcol1#30, vcol2#31, nr#20, label#21]

Consequently, each row having at least one match on both sides is returned min_count times, where min_count is the smaller of its two occurrence counts.
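
The same counting logic can be reproduced with plain Scala collections. It's a sketch rather than Spark's code: a pair of 1/0 values stands in for the non-null and null vcol1 and vcol2 markers of the plan, so summing them plays the role of the two count aggregations. IntersectAllSketch and intersectAll are hypothetical names.

```scala
// Plain Scala collections stand in for DataFrames; no Spark is required.
object IntersectAllSketch {
  type Row = (Int, String)

  def intersectAll(left: Seq[Row], right: Seq[Row]): Seq[Row] = {
    // Union step: (1, 0) marks a left row, (0, 1) marks a right row.
    val tagged: Seq[(Row, Long, Long)] =
      left.map(row => (row, 1L, 0L)) ++ right.map(row => (row, 0L, 1L))

    // HashAggregate step: count the occurrences per side for each distinct row.
    val counts = tagged.groupBy(_._1).map { case (row, group) =>
      (row, group.map(_._2).sum, group.map(_._3).sum)
    }

    // Filter, Project and replicaterows steps: keep rows seen on both sides
    // and emit each of them min(leftCount, rightCount) times.
    counts.toSeq
      .filter { case (_, leftCount, rightCount) => leftCount >= 1 && rightCount >= 1 }
      .flatMap { case (row, leftCount, rightCount) =>
        Seq.fill(math.min(leftCount, rightCount).toInt)(row)
      }
      .sortBy(_._1) // sorted only to make the printed output stable
  }

  def main(args: Array[String]): Unit = {
    val df1 = ((1 to 10) ++ (1 to 10)).map(nr => (nr, s"Number $nr"))
    val df2 = ((3 to 13) ++ (3 to 5)).map(nr => (nr, s"Number $nr"))
    intersectAll(df1, df2).foreach(println)
  }
}
```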

Positions danger

Both families of operations, except and intersect, are position-based: columns are matched by their order and not by their names. If your schemas are not aligned, in the best-case scenario the query will fail because of the encoding issues. In the worst-case scenario, the operation will succeed but return invalid results. Below is the latter scenario:

val df1 = ((1 to 10).map(nr => (s"$nr", s"Number ${nr}")) ++ (1 to 10).map(nr => (s"$nr", s"Number ${nr}"))).toDF("nr", "label")
val df2 = (3 to 13).map(nr => (s"Number ${nr}", s"$nr")).toDF("label", "nr")

val missingRowsInDf1 = df1.exceptAll(df2)

As you can see, both DataFrames have the same columns, but declared in a different order. Consequently, running the exceptAll returns all rows from the first DataFrame:

+---+---------+
|nr |label    |
+---+---------+
|1  |Number 1 |
|1  |Number 1 |
|2  |Number 2 |
|2  |Number 2 |
|3  |Number 3 |
|3  |Number 3 |
|4  |Number 4 |
|4  |Number 4 |
|5  |Number 5 |
|5  |Number 5 |
|6  |Number 6 |
|6  |Number 6 |
|7  |Number 7 |
|7  |Number 7 |
|8  |Number 8 |
|8  |Number 8 |
|9  |Number 9 |
|9  |Number 9 |
|10 |Number 10|
|10 |Number 10|
+---+---------+

The intersectAll returns an equally unexpected result here: an empty DataFrame. But despite this warning, both operations are pretty handy shortcuts to use when you want to discover either the different or the common rows of two DataFrames.
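
One way to guard against misaligned positions is to reorder the columns of the second DataFrame by name before calling the set operation. The sketch below assumes a local SparkSession and two DataFrames exposing the same column names; alignColumns is a hypothetical helper, not a Spark API.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object AlignedSetOperations {
  // Hypothetical helper: selects the columns of `other` in the order used by `reference`.
  // It fails at analysis time if `other` lacks one of the reference column names.
  def alignColumns(reference: DataFrame, other: DataFrame): DataFrame =
    other.select(reference.columns.map(col): _*)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("aligned-set-operations").getOrCreate()
    import spark.implicits._

    val df1 = ((1 to 10) ++ (1 to 10)).map(nr => (s"$nr", s"Number $nr")).toDF("nr", "label")
    // Same columns as df1, but declared in the opposite order.
    val df2 = (3 to 13).map(nr => (s"Number $nr", s"$nr")).toDF("label", "nr")

    val df2Aligned = alignColumns(df1, df2)
    // With aligned positions, both operations compare nr with nr and label with label.
    df1.exceptAll(df2Aligned).show(truncate = false)
    df1.intersectAll(df2Aligned).show(truncate = false)

    spark.stop()
  }
}
```

The helper only fixes the ordering. It doesn't protect against columns that share a name but carry different meanings or types.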
