Iterators in Apache Spark

Versions: Apache Spark 3.1.1

I had this "aha moment" while I was preparing the blog posts about the shuffle readers. Apache Spark uses iterators a lot! In this blog post you will see the places where I have met them over the last months.

Iterators 101

You certainly know what iterators are about, but let me recall an interesting property. If you wrap an iterator in another one, and that one in yet another, the whole chain still behaves exactly like a single iterator! I'm mentioning this because Apache Spark relies on this fact very often (you will find an example in last week's blog post about iterators in shuffle reading). If you don't see what I mean, the following example should help:

  class ExecutionTimeIterator[A](parentIterator: Iterator[A]) extends Iterator[A] {
    // Timestamps (in nanoseconds) of the first and the last next() calls
    var firstNextCall: Option[Long] = None
    var lastNextCall: Option[Long] = None

    // firstNextCall.get is safe: if we have the last call, we also have the first one
    def executionTime: Option[Long] = lastNextCall.map(lastCall => lastCall - firstNextCall.get)

    override def hasNext: Boolean = parentIterator.hasNext
    override def next(): A = {
      if (firstNextCall.isEmpty) {
        firstNextCall = Some(System.nanoTime())
      }
      val nextElement = parentIterator.next()
      if (!hasNext) {
        // The parent is exhausted: record the end of the iteration
        lastNextCall = Some(System.nanoTime())
      }
      nextElement
    }
  }
  val numbers = new ExecutionTimeIterator(Iterator(1, 2, 3))
  // map() wraps our iterator in yet another iterator; toList consumes the whole chain
  val mappedNumbersFromWrapper = numbers.map(number => number).toList
  assert(mappedNumbersFromWrapper == List(1, 2, 3))
  assert(numbers.executionTime.exists(_ >= 0))

As you can see, in this example we decorated the raw Iterator with extra code computing the total iteration time, and the decorated iterator could still be wrapped again by map(). Apache Spark uses the same principle to perform various tasks. Let's now see some of these iterators!
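To see why a chain of wrappers behaves like a single iterator, here is a minimal sketch with a hypothetical LoggingIterator decorator (not a Spark class) that records every element pulled through it. The chain is: raw iterator, LoggingIterator, map(), LoggingIterator. It assumes nothing beyond the Scala standard library.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical decorator: records every element pulled through the layer called `name`
class LoggingIterator[A](name: String, parent: Iterator[A], log: ArrayBuffer[String])
    extends Iterator[A] {
  override def hasNext: Boolean = parent.hasNext
  override def next(): A = {
    val element = parent.next()
    log += s"$name:$element"
    element
  }
}

val pullLog = ArrayBuffer.empty[String]
val source = new LoggingIterator("source", Iterator(1, 2), pullLog)
val doubled = new LoggingIterator("doubled", source.map(_ * 2), pullLog)

// The consumer only sees `doubled`, but each element flows through the whole chain
// before the next one is read: no layer materializes the data
val result = doubled.toList
```

After toList, result is List(2, 4) and pullLog reads source:1, doubled:2, source:2, doubled:4. The interleaving shows that the records are processed one at a time through all the layers, which is exactly what makes this pattern cheap in terms of memory.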

Interrupted task

The first iterator is called InterruptibleIterator. Its name is pretty meaningful because yes, this iterator can be interrupted! What does it mean in the Apache Spark context? Let's check the implementation first:

class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
  extends Iterator[T] {
  def hasNext: Boolean = {
    // Throws a TaskKilledException if the task was marked as killed
    context.killTaskIfInterrupted()
    delegate.hasNext
  }

  def next(): T = delegate.next()
}

The purpose should be clear now. InterruptibleIterator stops the iteration if the task has been killed: before every hasNext call on the wrapped iterator, killTaskIfInterrupted() throws an exception if the task was marked as killed. It can happen with the speculative execution feature, where 2 instances of the same task may run in parallel. The idea of wrapping the data iterator (the delegate attribute) is to stop generating results as soon as it doesn't make sense anymore (e.g., another instance of the given task already completed).
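The mechanism can be reproduced outside Spark. The following sketch replaces TaskContext with a hypothetical FakeTaskContext holding a single "killed" flag, and throws a plain IllegalStateException where Spark would throw TaskKilledException. It only illustrates that the check happens once per record.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// Hypothetical stand-in for TaskContext: only a "killed" flag that another thread could set
class FakeTaskContext {
  val killed = new AtomicBoolean(false)
  // Spark throws TaskKilledException here; a plain exception is enough for the sketch
  def killTaskIfInterrupted(): Unit =
    if (killed.get()) throw new IllegalStateException("task killed")
}

// Same shape as InterruptibleIterator: the kill check runs in every hasNext call
class SimpleInterruptibleIterator[T](context: FakeTaskContext, delegate: Iterator[T])
    extends Iterator[T] {
  override def hasNext: Boolean = {
    context.killTaskIfInterrupted()
    delegate.hasNext
  }
  override def next(): T = delegate.next()
}

val taskContext = new FakeTaskContext
val records = new SimpleInterruptibleIterator(taskContext, Iterator(1, 2, 3, 4))
val consumed = ArrayBuffer.empty[Int]
val outcome = Try {
  while (records.hasNext) {
    val record = records.next()
    consumed += record
    // Simulates a kill, e.g. because another speculative attempt of the task succeeded
    if (record == 2) taskContext.killed.set(true)
  }
}
```

Here outcome is a Failure and consumed only holds 1 and 2: records 3 and 4 are never produced.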

Completed iteration

Another widely used iterator in the Apache Spark codebase is CompletionIterator. Its goal? Calling a callback once all items have been processed:

private[spark]
abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A] {
  private[this] var completed = false
  private[this] var iter = sub
  def next(): A = iter.next()
  def hasNext: Boolean = {
    val r = iter.hasNext
    if (!r && !completed) {
      completed = true
      // Reassign to release the resources of the exhausted iterator early
      iter = Iterator.empty.asInstanceOf[I]
      // Called only once, after the last element
      completion()
    }
    r
  }

  // The callback implemented by the subclasses
  def completion(): Unit
}

I first met CompletionIterator while discovering the internals of stateful operations. It's used there to update the state store metrics and to remove expired keys. More generally, housekeeping is its main purpose. It's also used in shuffle block fetching to release the fetched blocks from memory, and in spillable data structures to release their memory back to the memory pool.
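A minimal sketch of the same idea, assuming a hypothetical SimpleCompletionIterator that receives the callback as a function instead of requiring a subclass. The completed flag guarantees that the housekeeping code runs exactly once, even if hasNext is called again after the end.

```scala
// Hypothetical, simplified CompletionIterator taking the callback as a function
class SimpleCompletionIterator[A](sub: Iterator[A], onComplete: () => Unit)
    extends Iterator[A] {
  private var completed = false
  private var iter: Iterator[A] = sub
  override def next(): A = iter.next()
  override def hasNext: Boolean = {
    val r = iter.hasNext
    if (!r && !completed) {
      completed = true
      iter = Iterator.empty // drop the reference to the exhausted source
      onComplete()          // housekeeping: release resources, update metrics...
    }
    r
  }
}

var callbackCalls = 0
val rows = new SimpleCompletionIterator(Iterator("a", "b"), () => callbackCalls += 1)
val collected = rows.toList      // the last hasNext call returns false and fires the callback
val stillHasNext = rows.hasNext  // later calls don't fire it again
```

After this code, collected is List("a", "b") and callbackCalls is 1.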

Shuffle fetcher

As I've already mentioned, the shuffle fetcher is also an iterator. It's called ShuffleBlockFetcherIterator and its next method returns a (BlockId, InputStream) pair.

Internally, next takes an already fetched block from a queue of results, waiting if none is available yet, and sends new requests to fetch more blocks from other executors as the previous ones get consumed.
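The producer/consumer shape of this fetcher can be sketched with a blocking queue. SimpleFetcherIterator, the block ids and the fetch function below are all hypothetical; the sketch sends every request upfront, whereas Spark limits the number of requests in flight, and it ignores fetch failures.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue}

// Hypothetical, heavily simplified fetcher: background threads "fetch" blocks and put them
// into a results queue; next() takes them from the queue, waiting if needed.
class SimpleFetcherIterator(blockIds: Seq[String], fetch: String => Array[Byte])
    extends Iterator[(String, Array[Byte])] {
  private val results = new LinkedBlockingQueue[(String, Array[Byte])]()
  private val pool = Executors.newFixedThreadPool(2)
  private var numBlocksProcessed = 0

  // Sends all the fetch requests upfront (no throttling in this sketch)
  blockIds.foreach { id =>
    pool.execute(new Runnable {
      override def run(): Unit = results.put((id, fetch(id)))
    })
  }
  pool.shutdown() // accepts no new work; already submitted fetches still complete

  override def hasNext: Boolean = numBlocksProcessed < blockIds.size

  override def next(): (String, Array[Byte]) = {
    if (!hasNext) throw new NoSuchElementException("no more blocks")
    val result = results.take() // blocks until a fetched block is available
    numBlocksProcessed += 1
    result
  }
}

// Block ids and the fetch function are illustrative only
val fetcher = new SimpleFetcherIterator(
  Seq("shuffle_0_0_0", "shuffle_0_1_0"),
  blockId => blockId.getBytes("UTF-8"))
val fetched = fetcher.toList
```

The blocks come back in the order their fetches complete, not necessarily in the requested order, which is also why the consumer shouldn't assume any ordering between shuffle blocks.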

Groups generation

The shuffle is present in data redistribution scenarios. One of them is a group-by-key operation followed by a mapping function, like groupByKey followed by mapGroups in the Dataset API. Apache Spark uses a dedicated iterator called GroupedIterator to generate the groups passed to the mapping function. It has one requirement, though! The wrapped iterator, represented here as a BufferedIterator[InternalRow] instance, must be sorted by the grouping key.

Why is it required? Because the hasNext method iterates over the wrapped iterator only once, and each time it encounters a new grouping key, it creates a new group. If the rows with the same key weren't adjacent, the same key would produce multiple groups:

class GroupedIterator private(
    input: BufferedIterator[InternalRow],
    groupingExpressions: Seq[Expression],
    inputSchema: Seq[Attribute])
  extends Iterator[(InternalRow, Iterator[InternalRow])] {
// ...
  def hasNext: Boolean = currentIterator != null || fetchNextGroupIterator()

  def next(): (InternalRow, Iterator[InternalRow]) = {
    assert(hasNext) // Ensure we have fetched the next iterator.
    val ret = (keyProjection(currentGroup), currentIterator)
    currentIterator = null
    ret
  }

  private def fetchNextGroupIterator(): Boolean = {
    assert(currentIterator == null)

    if (currentRow == null && input.hasNext) {
      currentRow = input.next()
    }

    if (currentRow == null) {
      // There is no data left
      false
    } else {
      // Skip the remaining rows of the current group; this only works because the input
      // is sorted by the grouping key, so all the rows of a group are adjacent
      while (keyOrdering.compare(currentGroup, currentRow) == 0 && input.hasNext) {
        currentRow = input.next()
      }

      if (keyOrdering.compare(currentGroup, currentRow) == 0) {
        // We are in the last group, there are no more groups
        false
      } else {
        // currentRow is the first row of the next group
        currentGroup = currentRow.copy()
        currentIterator = createGroupValuesIterator()
        true
      }
    }
  }
// ...
}

Without a pre-sorted input, the iterator couldn't build the groups in a single pass, and the operation would require an extra sorting step before returning the groups to the mapping function.
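The single-pass principle can be sketched on plain key/value pairs. groupSorted below is a hypothetical helper in the spirit of GroupedIterator; to stay short, it materializes each group as a List, whereas GroupedIterator returns a lazy iterator per group.

```scala
// Hypothetical one-pass grouping over a key-sorted iterator
def groupSorted[K, V](input: Iterator[(K, V)]): Iterator[(K, List[V])] = {
  val buffered = input.buffered
  new Iterator[(K, List[V])] {
    override def hasNext: Boolean = buffered.hasNext
    override def next(): (K, List[V]) = {
      val key = buffered.head._1
      val values = List.newBuilder[V]
      // Consume rows while the key doesn't change: only correct if equal keys are adjacent
      while (buffered.hasNext && buffered.head._1 == key) {
        values += buffered.next()._2
      }
      (key, values.result())
    }
  }
}

val sortedGroups = groupSorted(Iterator("a" -> 1, "a" -> 2, "b" -> 3)).toList
val unsortedGroups = groupSorted(Iterator("a" -> 1, "b" -> 3, "a" -> 2)).toList
```

With the sorted input, sortedGroups is List((a,List(1, 2)), (b,List(3))). With the unsorted one, unsortedGroups is List((a,List(1)), (b,List(3)), (a,List(2))): the key "a" produces two groups, which is exactly what the sorting requirement prevents.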

Aggregations generators

But mapping is not the only operation available on the groups. Another one is aggregation. The first aggregation iterator is called TungstenAggregationIterator and it works directly on UnsafeRows. It uses 2 aggregation modes. The first one is hash-based: the iterator puts the groups and their aggregation buffers in a hash map. However, when it can't acquire more memory for the map, it sorts the map content and spills it to disk, continues with an emptied hash map, and at the end switches to sort-based aggregation, which merges these intermediary spills.

The second aggregation iterator is ObjectAggregationIterator. The difference from TungstenAggregationIterator is its ability to work on arbitrary JVM objects. Because of that, it has to use a different method to detect when to fall back from hash-based to sort-based aggregation: it checks the number of entries in the hash map instead of their size, which is difficult to estimate for JVM objects.
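The hash-to-sort fallback can be illustrated with a sum per key. sumByKey is a hypothetical helper using a count-based threshold, like ObjectAggregationIterator; unlike Spark, it keeps its key-sorted "spills" in memory and merges them with a simple re-sort instead of a k-way merge of disk files.

```scala
import scala.collection.mutable

// Hypothetical sketch: sums values per key in a hash map; when a new key would make the map
// exceed `maxKeys` entries, the map is "spilled" as a key-sorted run and a new map starts.
// Returns the aggregated rows sorted by key and the number of spills.
def sumByKey(input: Iterator[(String, Int)], maxKeys: Int): (List[(String, Int)], Int) = {
  val sortedRuns = mutable.ArrayBuffer.empty[List[(String, Int)]]
  var hashMap = mutable.HashMap.empty[String, Int]
  input.foreach { case (key, value) =>
    if (!hashMap.contains(key) && hashMap.size >= maxKeys) {
      sortedRuns += hashMap.toList.sortBy(_._1)
      hashMap = mutable.HashMap.empty[String, Int]
    }
    hashMap(key) = hashMap.getOrElse(key, 0) + value
  }
  val spills = sortedRuns.size
  if (spills == 0) {
    // Hash-based mode was enough
    (hashMap.toList.sortBy(_._1), 0)
  } else {
    // Sort-based mode: bring the runs together sorted by key (a re-sort stands in for a
    // k-way merge) and add up the adjacent entries sharing the same key
    sortedRuns += hashMap.toList.sortBy(_._1)
    val merged = sortedRuns.flatten.sortBy(_._1)
    val aggregated = merged.foldLeft(List.empty[(String, Int)]) {
      case ((lastKey, lastSum) :: rest, (key, value)) if lastKey == key =>
        (key, lastSum + value) :: rest
      case (acc, entry) => entry :: acc
    }
    (aggregated.reverse, spills)
  }
}

val (withSpills, spillCount) =
  sumByKey(Iterator("b" -> 1, "a" -> 2, "c" -> 3, "a" -> 4, "b" -> 5), maxKeys = 2)
val (inMemoryOnly, noSpills) =
  sumByKey(Iterator("b" -> 1, "a" -> 2, "c" -> 3, "a" -> 4, "b" -> 5), maxKeys = 10)
```

Both calls return List((a,6), (b,6), (c,3)); the first one needs 2 spills to get there, the second one stays in hash-based mode. Counting entries is trivial, which is why it suits objects whose memory footprint can't be measured cheaply.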

Spilling

Apache Spark also uses iterators to spill data to disk. One of the iterators responsible for that is ExternalSorter's SpillableIterator, defined as follows:

private[this] class SpillableIterator(var upstream: Iterator[((Int, K), C)])
  extends Iterator[((Int, K), C)]

In addition to the classical next/hasNext methods, SpillableIterator defines a spill() method that writes the not yet consumed in-memory data to a spill file, from which the iteration then continues. But SpillableIterator only exposes this method. It's the responsibility of ExternalSorter to trigger the spilling action when it's asked to release memory.
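A minimal sketch of this "swap the upstream" idea, assuming a hypothetical SimpleSpillableIterator over Ints that writes its remaining elements to a temporary text file. The lock reflects the fact that the spill can be requested by another thread than the consumer; Spark's real implementation serializes records and handles partitions, which this sketch doesn't.

```scala
import java.io.BufferedReader
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Hypothetical sketch: spill() writes the not yet consumed elements to a temporary file
// and the iteration then continues from that file
class SimpleSpillableIterator(private var upstream: Iterator[Int]) extends Iterator[Int] {
  private val lock = new Object
  private var spilled = false

  def spill(): Boolean = lock.synchronized {
    if (spilled) {
      false // already reading from disk, nothing left in memory
    } else {
      val file = Files.createTempFile("spill-", ".txt")
      file.toFile.deleteOnExit()
      val writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)
      try upstream.foreach { n => writer.write(n.toString); writer.newLine() }
      finally writer.close()
      val reader: BufferedReader = Files.newBufferedReader(file, StandardCharsets.UTF_8)
      // Reads the file lazily, line by line, and closes it at the end
      upstream = Iterator.continually(reader.readLine()).takeWhile { line =>
        if (line == null) reader.close()
        line != null
      }.map(_.toInt)
      spilled = true
      true
    }
  }

  override def hasNext: Boolean = lock.synchronized(upstream.hasNext)
  override def next(): Int = lock.synchronized(upstream.next())
}

val spillable = new SimpleSpillableIterator(Iterator(1, 2, 3, 4))
val first = spillable.next()         // read from memory
val spilledNow = spillable.spill()   // e.g. requested by the owner under memory pressure
val rest = spillable.toList          // read back from the spill file
val spilledAgain = spillable.spill() // nothing left to spill
```

The consumer doesn't notice the switch: it gets 1 from memory and then 2, 3, 4 from the file through the same next() calls.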

Another spillable iterator, this time used much closer to the physical nodes, is SpillableArrayIterator. It's defined in ExternalAppendOnlyUnsafeRowArray, which SortMergeJoinExec uses, for example, to buffer the rows to join.

Records reader

Finally, there is also an iterator to read records from the input files. I mean here the RecordReaderIterator class that wraps a kind-of-iterator from Hadoop called RecordReader. Officially, RecordReader doesn't implement the Iterator interface but behaves as if it were one. RecordReaderIterator has an extra feature to manage resource release when a task reads multiple files. Normally, the release happens when the task completes, but Apache Spark optimizes it a bit in the hasNext method by closing the reader as soon as it has no more records:

class RecordReaderIterator[T](
    private[this] var rowReader: RecordReader[_, T]) extends Iterator[T] with Closeable {
  private[this] var havePair = false
  private[this] var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !rowReader.nextKeyValue
      if (finished) {
        // Close and release the reader here; close() will also be called when the task
        // completes, but for tasks that read from many files, it helps to release the
        // resources early.
        close()
      }
      havePair = !finished
    }
    !finished
  }
  // ... next() and close() omitted
}
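The adapter pattern behind RecordReaderIterator can be sketched without Hadoop. SimpleRecordReader and ListRecordReader below are hypothetical stand-ins that only mimic the nextKeyValue/getCurrentValue protocol; the adapter reuses the havePair/finished logic shown above, so hasNext stays idempotent and the reader gets closed right after the last record.

```scala
import java.io.Closeable

// Hypothetical reader following the RecordReader protocol: nextKeyValue() advances to the
// next record and returns false at the end; getCurrentValue returns the current record
trait SimpleRecordReader[T] extends Closeable {
  def nextKeyValue(): Boolean
  def getCurrentValue: T
}

// In-memory reader standing in for a file reader
class ListRecordReader[T](values: List[T]) extends SimpleRecordReader[T] {
  private var remaining = values
  private var current: Option[T] = None
  var isClosed = false
  override def nextKeyValue(): Boolean = remaining match {
    case head :: tail =>
      current = Some(head)
      remaining = tail
      true
    case Nil =>
      current = None
      false
  }
  override def getCurrentValue: T = current.get
  override def close(): Unit = isClosed = true
}

// Adapter following RecordReaderIterator's logic: the reader is closed as soon as it's exhausted
class SimpleRecordReaderIterator[T](reader: SimpleRecordReader[T])
    extends Iterator[T] with Closeable {
  private var havePair = false
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !reader.nextKeyValue()
      if (finished) close() // early release, without waiting for the task completion
      havePair = !finished
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    havePair = false // the next hasNext call must advance the reader
    reader.getCurrentValue
  }

  override def close(): Unit = reader.close()
}

val reader = new ListRecordReader(List("line 1", "line 2"))
val lines = new SimpleRecordReaderIterator(reader)
val firstLine = lines.next()
val closedAfterFirstLine = reader.isClosed
val remainingLines = lines.toList
```

The reader is still open after the first record and gets closed by the hasNext call that detects the end of the input, not by an external cleanup step.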

As you saw in this article, Apache Spark relies on iterators in many places: shuffle block reading, stateful operations, aggregations, spilling, and input file reading. And it makes a lot of sense in big data processing because an iterator, unlike a list or a set, doesn't materialize the whole sequence. Instead, it works in a fire-and-forget mode where a returned record can be discarded as soon as it has been consumed.
