Collect action and determinism

Versions: Apache Spark 3.1.1

Even though the RDD is nowadays considered a low-level abstraction and the SQL API is the recommended choice, some of its methods are still used under the hood by the framework. During one of my explorations I wanted to analyze the task responsible for listing the files to process. Initially, my thought was "oh, it uses collect() and the results will be different every time". However, after looking at the code I saw how wrong I was!

First of all, *do not* use collect() unless you have reduced the input dataset to a size that fits in the driver's memory. Otherwise, you will sooner or later hit an OutOfMemoryError on the driver. If you do have to use it, here is what happens.

The call to collect() invokes this function:

  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

As you can see, it submits a new "collect" job whose tasks materialize all the records of their partition (iter.toArray). But this runJob method is just a façade that delegates the physical execution to more specialized runJob versions. One of these versions explains why the results are always ordered by input partition:
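To make the two steps of collect() easier to follow, here is a minimal sketch in plain Scala, without any Spark dependency. The names CollectSketch, runJobLocally and collectLocally are hypothetical and only stand in for the real job submission; partitions are modeled as simple in-memory sequences.

```scala
import scala.reflect.ClassTag

object CollectSketch {
  // Hypothetical stand-in for sc.runJob: applies func to every partition
  // and returns one result per partition, in partition order.
  def runJobLocally[T, U: ClassTag](partitions: Seq[Seq[T]], func: Iterator[T] => U): Array[U] =
    partitions.map(partition => func(partition.iterator)).toArray

  // Mirrors the shape of collect(): materialize each partition as an array,
  // then concatenate the per-partition arrays into a single one.
  def collectLocally[T: ClassTag](partitions: Seq[Seq[T]]): Array[T] = {
    val results = runJobLocally(partitions, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
}
```

Calling CollectSketch.collectLocally(Seq(Seq(1, 2), Seq(3), Seq(4, 5))) returns the elements of the first partition, followed by those of the second and the third.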

class SparkContext(config: SparkConf) extends Logging {
  // ...
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    // The result handler stores every task result at the task's index
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
  }

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    // ...
  }
}

As you can see here, the execution is delegated once again, but this time Apache Spark passes a result handler that is applied to the result generated by func for every task. This handler simply puts the materialized task result into the results array, at the position given by the task index! So, even though the tasks of this collect job may complete out of order, the driver will always return their results ordered by task index, that is, by input partition.
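The effect of the result handler can be reproduced outside Spark. The sketch below is a simplified, single-threaded simulation and not Spark's scheduler: ResultHandlerSketch, runTasks and completionOrder are hypothetical names, and the "tasks" simply return the content of their partition in whatever order they are asked to complete.

```scala
import scala.util.Random

object ResultHandlerSketch {
  // Simulates tasks finishing in an arbitrary order; each result is written
  // to the slot matching its task index, like the handler passed to runJob.
  def runTasks(partitions: Seq[Seq[Int]], completionOrder: Seq[Int]): Array[Seq[Int]] = {
    val results = new Array[Seq[Int]](partitions.size)
    val resultHandler: (Int, Seq[Int]) => Unit = (index, res) => results(index) = res
    completionOrder.foreach(index => resultHandler(index, partitions(index)))
    results
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6))
    // Shuffle the completion order; the collected output does not change
    val shuffledOrder = Random.shuffle(partitions.indices.toList)
    println(runTasks(partitions, shuffledOrder).toSeq.flatten.mkString(", "))
  }
}
```

Whatever completion order is drawn, the array is filled by index, so the flattened output follows the partition order.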

Of course, collect() can still generate non-deterministic results if the mapping function is itself non-deterministic (different input in the partitions, side effects). But if the tasks always generate the same output, then even when they execute in a different order for whatever reason, the driver will always return the results in the same order in the array.
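The side-effect case can be illustrated with the same kind of simulation. In this hypothetical sketch (NonDeterministicSketch and its shared counter are illustrative assumptions, not Spark code), each "task" returns a value that depends on when it ran, so index-based placement keeps the positions stable but cannot make the values stable.

```scala
object NonDeterministicSketch {
  // Each simulated task increments a shared counter and returns its value,
  // so the task output depends on the execution order (a side effect).
  def runTasks(partitionCount: Int, executionOrder: Seq[Int]): Array[Int] = {
    val results = new Array[Int](partitionCount)
    var counter = 0
    executionOrder.foreach { index =>
      counter += 1
      results(index) = counter // placed by index, but the value itself varies
    }
    results
  }
}
```

With the execution order Seq(0, 1, 2) the array holds 1, 2, 3, whereas Seq(2, 0, 1) yields 2, 3, 1: the slots are still ordered by task index, but the content differs because the tasks themselves are not deterministic.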

