How to deduplicate entries in Spark SQL?


Sometimes a processed dataset contains duplicated entries. For instance, an IoT device can mistakenly send the same metrics more than once, or our ingestion step can format the messages incorrectly. Spark SQL natively deals with such duplicates through the dropDuplicates method and its overloads taking the column names to compare, such as dropDuplicates(colNames: Seq[String]).

Using dropDuplicates is straightforward. We can either list the deduplication columns explicitly or pass no columns at all. In the latter case, Spark compares entire rows, so all columns together form the deduplication key. The following code snippet shows both uses:

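import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class DropDuplicatesTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll {

  private val sparkSession: SparkSession = SparkSession.builder()
    .appName("Spark SQL dropDuplicates tip").master("local[*]").getOrCreate()

  override protected def afterAll(): Unit = {
    sparkSession.stop()
  }

  "duplicated primitive types" should "be removed" in {
    import sparkSession.implicits._
    val letters = Seq("A", "A", "B", "C", "D", "A", "D", "O", "J").toDF("letter")

    // No columns given: the whole row is used as the deduplication key
    val distinctLetters = letters.dropDuplicates().map(row => row.getAs[String]("letter")).collectAsList()

    distinctLetters should contain allOf("A", "B", "C", "D", "J", "O")
    distinctLetters should have length 6
  }

  "duplicated composite types" should "be removed" in {
    import sparkSession.implicits._
    val letters = Seq(("A", 1, true), ("A", 2, false), ("B", 3, true), ("A", 4, true))
      .toDF("letter", "number", "was_read")

    // Only (letter, was_read) is compared, so ("A", 1, true) and ("A", 4, true) are duplicates.
    // Spark does not guarantee which of them is kept; this assertion expects the first one.
    val distinctLetters = letters.dropDuplicates("letter", "was_read")
      .map(row => (row.getAs[String]("letter"), row.getAs[Int]("number"), row.getAs[Boolean]("was_read")))
      .collectAsList()

    distinctLetters should contain allOf(("A", 1, true), ("A", 2, false), ("B", 3, true))
    distinctLetters should have length 3
  }
}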
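Since dropDuplicates() without arguments compares whole rows, it returns the same rows as Dataset.distinct(), which the Spark Scaladoc describes as an alias for dropDuplicates. The sketch below checks this on the letters of the first test. It assumes a local SparkSession, and the object DropDuplicatesVsDistinct with its compare method are hypothetical names used only for illustration. Both results are sorted before being compared because neither operation guarantees the order of its output.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper object, used only to compare both deduplication calls
object DropDuplicatesVsDistinct {

  // Returns the letters kept by dropDuplicates() and by distinct(), both sorted
  // because neither operation guarantees the order of its output rows
  def compare(spark: SparkSession): (Seq[String], Seq[String]) = {
    import spark.implicits._
    val letters = Seq("A", "A", "B", "C", "D", "A", "D", "O", "J").toDF("letter")

    // No column names given: every column of the row is part of the deduplication key
    val viaDropDuplicates = letters.dropDuplicates().as[String].collect().toSeq.sorted
    val viaDistinct = letters.distinct().as[String].collect().toSeq.sorted
    (viaDropDuplicates, viaDistinct)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dropDuplicates vs distinct").master("local[*]").getOrCreate()
    try {
      val (viaDropDuplicates, viaDistinct) = compare(spark)
      println(viaDropDuplicates.mkString(",")) // A,B,C,D,J,O
      println(viaDropDuplicates == viaDistinct) // true
    } finally {
      spark.stop()
    }
  }
}
```

When only a subset of columns matters, as in the second test, distinct() is not an option and dropDuplicates with explicit column names remains the tool to use.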

However, dropDuplicates triggers a shuffle. If we print the execution plans of the first test's deduplication with explain(true), we'll get something similar to:

== Parsed Logical Plan ==
Deduplicate [letter#3], false
+- Project [value#1 AS letter#3]
   +- LocalRelation [value#1]

== Analyzed Logical Plan ==
letter: string
Deduplicate [letter#3], false
+- Project [value#1 AS letter#3]
   +- LocalRelation [value#1]

== Optimized Logical Plan ==
Aggregate [letter#3], [letter#3]
+- LocalRelation [letter#3]

== Physical Plan ==
*HashAggregate(keys=[letter#3], functions=[], output=[letter#3])
+- Exchange hashpartitioning(letter#3, 200)
   +- *HashAggregate(keys=[letter#3], functions=[], output=[letter#3])
      +- LocalTableScan [letter#3]

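The plans above show that the optimizer rewrites Deduplicate into an Aggregate on the letter column, and that the physical execution involves a shuffle: a first HashAggregate removes duplicates within each partition, an Exchange then redistributes the rows with hash partitioning on the letter column into 200 partitions (the default value of spark.sql.shuffle.partitions), and a final HashAggregate merges the partial results.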
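To check for this shuffle in your own jobs, explain(true) prints the four plans, and the physical plan is also available as text through queryExecution.executedPlan. The sketch below does both under several assumptions: the object DeduplicationShuffleCheck and its methods are hypothetical, the input is built with spark.range over 4 partitions (instead of the letters) so that rows sharing a key are spread across partitions, and adaptive query execution is switched off through spark.sql.adaptive.enabled so that the plan is the static one. Matching on the "Exchange hashpartitioning" text is a rough heuristic, not a stable API.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object showing where the shuffle of dropDuplicates appears
object DeduplicationShuffleCheck {

  // 100 rows in 4 partitions but only 10 distinct buckets: rows sharing a bucket sit
  // in different partitions, so Spark has to shuffle them together to deduplicate
  def deduplicatedBuckets(spark: SparkSession): DataFrame =
    spark.range(0, 100, 1, 4).selectExpr("id % 10 AS bucket").dropDuplicates()

  // Heuristic: looks for a hash-partitioned Exchange node in the physical plan text
  def shuffles(df: DataFrame): Boolean =
    df.queryExecution.executedPlan.toString.contains("Exchange hashpartitioning")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dropDuplicates shuffle check").master("local[*]")
      // Assumption: AQE disabled so that executedPlan is the static physical plan
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()
    try {
      val deduplicated = deduplicatedBuckets(spark)
      deduplicated.explain(true) // parsed, analyzed, optimized and physical plans
      println(shuffles(deduplicated)) // true
      println(deduplicated.count()) // 10
    } finally {
      spark.stop()
    }
  }
}
```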