Collect action and determinism

Versions: Apache Spark 3.1.1

Even though RDD is nowadays a low-level abstraction and we should prefer the SQL API, some of its methods are still used under-the-hood by the framework. During one of my explorations I wanted to analyze the task responsible for listing the files to process. My initial thought was "oh, it uses collect() and the results will be different every time". However, after looking at the code I saw how wrong I was!


First and foremost, *do not* use collect() unless you have reduced the input dataset to a size the driver can hold. Otherwise, you will sooner or later hit an OutOfMemoryError. That said, if you do have to use it, below you can find what happens.
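
If you only need a sample of the data on the driver, take() or toLocalIterator() are usually safer than collect(). Below is a minimal illustrative sketch (the object name, dataset and sizes are made up) contrasting them:

import org.apache.spark.sql.SparkSession

object CollectAlternatives extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("collect() alternatives")
    .getOrCreate()

  // An illustrative RDD; in a real job it could be far too big for the driver's memory
  val numbers = spark.sparkContext.parallelize(1 to 1000000, numSlices = 10)

  // collect() materializes everything on the driver - safe only after reducing the dataset
  val reduced = numbers.filter(_ % 100000 == 0).collect()

  // take(n) fetches only as many partitions as needed to return n elements
  val firstTen = numbers.take(10)

  // toLocalIterator() streams one partition at a time to the driver
  numbers.toLocalIterator.take(5).foreach(println)

  println(reduced.mkString(", "))
  println(firstTen.mkString(", "))

  spark.stop()
}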

The call to collect() invokes this function:

 def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

As you can see, it submits a new "collect" job that materializes all rows present in each partition (iter.toArray). But this runJob method is just a façade that delegates the physical execution to more specialized runJob versions. One of these versions explains why the results will always be ordered by the input partition:

class SparkContext(config: SparkConf) extends Logging {
// ...
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
  }

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
// ...
  }
}

As you can see here, the execution is delegated once again, but this time Apache Spark passes a function that will be applied to the results generated by func. And this function simply puts each materialized task result into the results array, at the index of the corresponding partition! So even though the tasks of this collect job may complete in any order, the driver will always return them ordered by the task (partition) number. Below you will find a short demo showing that:
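
A minimal, self-contained sketch of such a demo could look like this (the object name and the dataset are illustrative, and a local SparkSession is assumed):

import org.apache.spark.sql.SparkSession

object CollectOrderingDemo extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("collect() ordering demo")
    .getOrCreate()

  // 4 partitions: partition 0 holds 1..25, partition 1 holds 26..50, and so on
  val numbers = spark.sparkContext.parallelize(1 to 100, numSlices = 4)

  // Even if the 4 tasks finish in a different order on every run,
  // the driver writes results(index) per partition, so the final array is stable
  (1 to 5).foreach { run =>
    val collected = numbers.collect()
    assert(collected sameElements (1 to 100).toArray, s"run $run returned a different order")
  }
  println("collect() returned the partitions in the same order on every run")

  spark.stop()
}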

Of course, collect() can still produce non-deterministic results if the mapping function itself is non-deterministic (different input in the partitions, side effects). But if for whatever reason the tasks execute in a different order while always generating the same output, then the driver will always return the results in the same order in the array.
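
To make that caveat concrete, here is a small illustrative sketch (the names are hypothetical) where the mapping function is non-deterministic, so two collect() calls can return different values even though the per-partition ordering stays stable:

import scala.util.Random
import org.apache.spark.sql.SparkSession

object NonDeterministicMapping extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("non-deterministic mapping")
    .getOrCreate()

  val numbers = spark.sparkContext.parallelize(1 to 4, numSlices = 2)

  // The map function depends on a random value, so the *values* change between runs,
  // even though each result still lands at the index of its partition
  val randomized = numbers.map(n => n * Random.nextInt(100))

  println(randomized.collect().mkString(", "))
  println(randomized.collect().mkString(", ")) // very likely different values

  spark.stop()
}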
