Collect action and determinism

Even though the RDD is nowadays a low-level abstraction and we should favor the SQL API, some of its methods are still used under-the-hood by the framework. During one of my explorations I wanted to analyze the task responsible for listing the files to process. Initially, my thought was "oh, it uses collect() and the results will be different every time". However, after looking at the code I saw how wrong I was!


First things first, *do not* use collect() unless you have reduced the input dataset to a size the driver can hold. Otherwise, you will inevitably hit an OutOfMemoryError. That said, if you do have to use it, here is what happens under-the-hood.
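To make the driver-safety point concrete, here is a minimal plain-Scala sketch of the idea; the nested collections stand in for RDD partitions, so this is an illustration of the principle rather than real Spark code:

```scala
// Plain Scala collections stand in for RDD partitions (illustrative only)
val partitions = Seq(Seq(1, 2, 3), Seq(4, 5, 6))

// Each "task" ships only a small partial aggregate to the driver...
val partials = partitions.map(_.sum)

// ...so the driver combines a handful of numbers instead of materializing
// every row, which is what rdd.map(...).reduce(...) gives you, unlike
// rdd.collect() that brings the whole dataset back
val total = partials.sum
println(total) // prints 21
```

This is why aggregation-style actions stay cheap on the driver no matter how big the dataset is: only the partial results travel back.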

The call to collect() invokes this function:

 def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

As you can see, it submits a new "collect" job that materializes all rows present in each partition (iter.toArray). But this runJob method is just a façade that delegates the physical execution to more specialized runJob versions. One of these versions explains why the results are always ordered by the input partition:

class SparkContext(config: SparkConf) extends Logging {
// ...
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
  }

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = { 
// ...
}

As you can see here, the execution is delegated once again, but this time Apache Spark passes a resultHandler function that applies to the results generated by func. And this function simply puts each materialized task result into the results array, at the slot of its partition index! So even though the tasks of this collect job may complete out of order, the driver will always return them ordered by the task number.
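To see this mechanism in action, here is a short plain-Scala simulation of the resultHandler logic (no Spark required; the arrays stand in for partitions and the shuffle for out-of-order task completion):

```scala
import scala.util.Random

// Four "partitions", each producing its own rows
val partitions = Array(Array(1, 2), Array(3, 4), Array(5, 6), Array(7, 8))

// Like SparkContext.runJob: pre-allocate one result slot per partition
val results = new Array[Array[Int]](partitions.length)

// Complete the "tasks" in a random order, as a real cluster might
Random.shuffle(partitions.indices.toList).foreach { index =>
  results(index) = partitions(index) // the resultHandler: results(index) = res
}

// Array.concat flattens the per-partition results, exactly like collect()
val collected = Array.concat(results: _*)
println(collected.mkString(",")) // always prints 1,2,3,4,5,6,7,8
```

Whatever order the shuffle produces, the indexed assignment puts every result back into its partition's slot, so the concatenated output never changes.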

Of course, collect() can still produce non-deterministic results if the mapping function itself is non-deterministic (different input across partitions, side effects). But if, for whatever reason, the tasks execute in a different order while always generating the same output, then the driver will always return the results in the same order in the array.
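The non-deterministic-function caveat can be illustrated with a plain-Scala sketch; the random offset is a stand-in for timestamps, external state or any other side effect a real mapping might depend on:

```scala
import scala.util.Random

val partition = Array(1, 2, 3)

// A mapping whose output depends on more than its input — a random offset
// here, standing in for timestamps or external state
def unstableMap(x: Int): Int = x + Random.nextInt(1000)

// The ordering guarantee says nothing about the values themselves:
// two runs of the same "task" can legitimately disagree
val run1 = partition.map(unstableMap)
val run2 = partition.map(unstableMap)
println(run1.mkString(",")) // the values typically differ between runs
println(run2.mkString(","))
```

The positions are stable, the values are not; determinism of collect() is only as good as the determinism of the functions feeding it.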
