isEmpty() trap in Spark

Versions: Spark 2.1.0

In general, Spark's actions mirror logic found in equivalent methods of many programming languages. As an example we can consider isEmpty(), which in Spark checks whether at least one element exists, much like isEmpty() on a Java List. However, it can lead to trouble, especially when more than one action is invoked.

This post focuses on Spark's isEmpty() action. The first part explains what happens when it's called, illustrated with an appropriate test case. The second part, still through a test case, shows what can go wrong when isEmpty() is combined with another action. The last part gives a tip that helps to prevent the problem described in the second section.

isEmpty() internals

When the RDD's isEmpty() method is called, it checks whether the RDD has any partitions and, if so, whether they contain any entries. This is visible in the method's implementation, which in Spark 2.1.0 looks like:

def isEmpty(): Boolean = withScope {
  partitions.length == 0 || take(1).length == 0
}
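
Both sides of this condition can be observed directly. The snippet below is a minimal sketch (assuming the same local sparkContext as in the tests of this post) showing which branch decides the result in each situation:

// No partitions at all - the left side of the condition is enough, no job is launched
sparkContext.emptyRDD[Int].isEmpty() // true
// Partitions exist but contain no data - take(1) returns an empty array
sparkContext.parallelize(Seq.empty[Int]).isEmpty() // true
// At least 1 element exists - take(1) finds it and isEmpty() returns false
sparkContext.parallelize(Seq(1)).isEmpty() // false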

The left side of the comparison is straightforward since it only checks a property of the partitions array. The right side does more work: it invokes the take(Int) method which, as the name indicates, returns the first n elements, scanning only as many partitions as needed (here a single element is requested). The test case below shows how isEmpty() works:

"isEmpty() method" should "invoke the transformation on 1 element" in {
  val numbersRdd = sparkContext.parallelize(Seq(1, 2, 3, 4, 5))
  val numbersAccumulator = sparkContext.collectionAccumulator[Int]("numbers accumulator")

  val numbersMultipliedBy2 = numbersRdd.map(number => {
    println(s"Mapping number ${number}")
    numbersAccumulator.add(number)
    number * 2
  })


  if (numbersMultipliedBy2.isEmpty()) {
    println("Numbers are empty")
  }

  numbersAccumulator.value.size shouldEqual(1)
  numbersAccumulator.value should contain only(1)
}

Problematic isEmpty()

By its nature, isEmpty() should not introduce many problems during processing. After all, it operates only on the first encountered element, and chaining two actions, the second conditioned on the emptiness of the RDD, should not be difficult. The example below shows the problematic case:

"isEmpty() method invoked with other action" should "make accumulated data false" in {
  val numbersRdd = sparkContext.parallelize(Seq(1, 2, 3, 4, 5))
  val numbersAccumulator = sparkContext.collectionAccumulator[Int]("numbers accumulator")

  val numbersMultipliedBy2 = numbersRdd.map(number => {
    println(s"Mapping number ${number}")
    numbersAccumulator.add(number)
    number * 2
  })

  // It's an anti-pattern, written here only to show
  // the possible errors of an incorrect implementation
  if (!numbersMultipliedBy2.isEmpty()) {
    numbersMultipliedBy2.saveAsTextFile(outputDir)
  }

  Directory(outputDir).exists should be(true)
  numbersAccumulator.value.size shouldEqual(6)
  // the "1" is duplicated because it was read twice:
  // by the isEmpty() and saveAsTextFile() actions
  numbersAccumulator.value should contain theSameElementsAs(Seq(1, 1, 2, 3, 4, 5))
}

As you can see, the file-saving action is conditioned (for whatever reason) on the emptiness of the mapped RDD. As a result, some of the RDD's elements were computed twice ('1' is accumulated twice). The other problem is that two jobs are created, both running all computations from scratch. If, instead of parallelizing a small collection, we were reading millions of lines from different files, the processing time would very probably increase a lot. Fortunately, there is a solution.
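
The creation of 2 jobs can also be verified programmatically. The snippet below is only a sketch, not one of the post's test cases: it counts started jobs with a SparkListener (the sparkContext and outputDir values are assumed to be the same as in the tests above):

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

val startedJobs = new AtomicInteger(0)
// Listener events are delivered asynchronously, so a real test may need
// to wait a moment before asserting on the counter
sparkContext.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    startedJobs.incrementAndGet()
  }
})

val doubledNumbers = sparkContext.parallelize(Seq(1, 2, 3, 4, 5)).map(_ * 2)
if (!doubledNumbers.isEmpty()) {           // job #1: the take(1) hidden behind isEmpty()
  doubledNumbers.saveAsTextFile(outputDir) // job #2: recomputes the map() from scratch
}
// startedJobs.get() is expected to be 2 - one job per action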

Problematic isEmpty() solution

One solution for this kind of situation, where two or more actions apply to the same or almost the same data, is the use of a cache. The simplest way of caching consists of calling the cache() method, which stores the computed RDD partitions in the executors' memory. It's shown in the code below:

"isEmpty() with other action" should "not execute accumulator twice" in {
  val numbersRdd = sparkContext.parallelize[Int](Seq(1, 2, 3, 4, 5))
  val numbersAccumulator = sparkContext.collectionAccumulator[Int]("numbers accumulator")

  val numbersMultipliedBy2 = numbersRdd.map(number => {
    println(s"Mapping number ${number}")
    numbersAccumulator.add(number)
    number * 2
  })

  numbersMultipliedBy2.cache()
  if (!numbersMultipliedBy2.isEmpty()) {
    numbersMultipliedBy2.saveAsTextFile(outputDir)
  }

  Directory(outputDir).exists should be(true)
  numbersAccumulator.value.size shouldEqual(5)
  numbersAccumulator.value should contain theSameElementsAs(Seq(1, 2, 3, 4, 5))
}
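
cache() keeps the computed partitions in memory only. If the dataset risks not fitting there, persist() with an explicit StorageLevel can be used instead of cache(); the sketch below (reusing the numbersMultipliedBy2 RDD from the test above) also releases the cached blocks once both actions are done:

import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK spills partitions to disk when the executors' memory is full
numbersMultipliedBy2.persist(StorageLevel.MEMORY_AND_DISK)
if (!numbersMultipliedBy2.isEmpty()) {
  numbersMultipliedBy2.saveAsTextFile(outputDir)
}
// free the cached blocks once they are no longer needed
numbersMultipliedBy2.unpersist()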

As shown in the test above, cache() prevents the transformations of the same RDD from being executed as many times as there are declared actions. Caching activity can be detected in the logs through events like the following:

DEBUG Level for block broadcast_0 is StorageLevel(disk, memory, deserialized, 1 replicas) (org.apache.spark.storage.BlockManager:58)
DEBUG Getting local block rdd_1_0 (org.apache.spark.storage.BlockManager:58)
// ...
Block rdd_1_0 stored as values in memory (estimated size 40.0 B, free 337.5 MB) (org.apache.spark.storage.memory.MemoryStore:54)
Added rdd_1_0 in memory on 192.168.0.12:45228 (size: 40.0 B, free: 337.5 MB) (org.apache.spark.storage.BlockManagerInfo:54)
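
The requested storage level can also be checked programmatically instead of scanning DEBUG logs. A minimal sketch, assuming the numbersMultipliedBy2 RDD from the previous test:

import org.apache.spark.storage.StorageLevel

// cache() is a shorthand for persist(StorageLevel.MEMORY_ONLY), so after
// the cache() call in the test above the RDD reports that storage level
assert(numbersMultipliedBy2.getStorageLevel == StorageLevel.MEMORY_ONLY)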

At first glance, the isEmpty() action doesn't seem invasive. After all, it stops at the first found element, which can encourage using it alongside other actions to condition some behavior. The important thing to keep in mind is that isEmpty() triggers real computations. And if these computations are more complex than the ones used in the above tests (e.g. by-key operations) or if the dataset is much bigger, it can noticeably slow down the processing. One solution is the use of a cache, which avoids recomputation when two or more actions are invoked on a given RDD.