I had this "aha moment" while I was preparing the blog posts about the shuffle readers. Apache Spark uses iterators a lot! In this blog post you will see the places where I had met them the last months.
Iterators 101
You certainly know what iterators are about, but let me recall one interesting thing: if you wrap multiple iterators, they still behave exactly like a single iterator! I'm mentioning it here because Apache Spark relies on that fact very often (you will find an example in last week's blog post about iterators in shuffle reading). If you don't see what I'm talking about, the following example should help:
class ExecutionTimeIterator[A](parentIterator: Iterator[A]) extends Iterator[A] {
  var firstNextCall: Option[Long] = None
  var lastNextCall: Option[Long] = None

  // firstNextCall.get - if we have the last, we also have the first
  def executionTime = lastNextCall.map(lastCall => lastCall - firstNextCall.get)

  override def hasNext: Boolean = parentIterator.hasNext

  override def next(): A = {
    if (firstNextCall.isEmpty) {
      firstNextCall = Some(System.currentTimeMillis())
    }
    val nextElement = parentIterator.next()
    if (!hasNext) {
      lastNextCall = Some(System.currentTimeMillis())
    }
    nextElement
  }
}

val numbers = new ExecutionTimeIterator(Iterator(1, 2, 3))
val mappedNumbersFromWrapper = numbers.map(number => number).toSeq
assert(mappedNumbersFromWrapper == Seq(1, 2, 3))
assert(numbers.executionTime.get > 0)
As you can see, in this example we decorated the raw Iterator with extra code computing the total iteration time. Apache Spark uses the same principle to perform various tasks. Let's now see some of these iterators!
Interrupted task
The first iterator is called InterruptibleIterator. Its name is pretty meaningful because yes, this iterator can be interrupted! What does it mean in the Apache Spark context? Let's check the implementation first:
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
  extends Iterator[T] {

  def hasNext: Boolean = {
    context.killTaskIfInterrupted()
    delegate.hasNext
  }

  def next(): T = delegate.next()
}
The purpose should be clear now. The InterruptibleIterator stops the iteration if the task has been killed. When does that happen in the Apache Spark context? For example with the speculative execution feature, where 2 instances of the same task may run in parallel. The idea behind wrapping the data iterator (the delegate attribute) is to stop generating results as soon as it doesn't make sense anymore, i.e. when another instance of the given task has already completed.
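To make the idea more tangible outside Spark, here is a minimal sketch of the same pattern where an AtomicBoolean plays the role of TaskContext.killTaskIfInterrupted(); the SimpleInterruptibleIterator name and the exception choice are mine, not Spark's:

import java.util.concurrent.atomic.AtomicBoolean

// Simplified stand-in for TaskContext.killTaskIfInterrupted(): the flag is flipped from the
// outside (e.g. after a speculative duplicate finished) and the wrapper refuses to produce
// more records.
class SimpleInterruptibleIterator[T](killed: AtomicBoolean, delegate: Iterator[T])
  extends Iterator[T] {

  override def hasNext: Boolean = {
    if (killed.get()) throw new InterruptedException("Task was killed")
    delegate.hasNext
  }

  override def next(): T = delegate.next()
}

val killFlag = new AtomicBoolean(false)
val records = new SimpleInterruptibleIterator(killFlag, Iterator(1, 2, 3, 4))
println(records.next()) // 1 - the task is still alive
killFlag.set(true)      // another instance of the task already completed
// any further records.hasNext call now throws InterruptedException instead of reading data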
Completed iteration
Another iterator widely used in the Apache Spark codebase is CompletionIterator. The goal? Call a callback once all items have been processed:
private[spark] abstract class CompletionIterator[ +A, +I <: Iterator[A]](sub: I)
  extends Iterator[A] {

  private[this] var completed = false
  private[this] var iter = sub

  def next(): A = iter.next()

  def hasNext: Boolean = {
    val r = iter.hasNext
    if (!r && !completed) {
      completed = true
      iter = Iterator.empty.asInstanceOf[I]
      completion()
    }
    r
  }

  def completion(): Unit
}
I first met the CompletionIterator while discovering the internals of stateful operations. It's used there to update state store metrics and to remove expired keys. Generally, housekeeping is its main purpose. It's also used in shuffle blocks fetching to release the retrieved blocks from memory, and in spillable data structures to give their memory back to the memory pool.
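If you want to play with the completion callback idea without the Spark dependencies, a minimal sketch could look like the one below; the names are mine, and the abstract completion() method shown above is replaced by a function parameter for brevity:

// Minimal illustration of the completion callback pattern: the housekeeping code runs
// exactly once, the first time hasNext detects the end of the wrapped iterator.
class SimpleCompletionIterator[A](sub: Iterator[A], completion: () => Unit)
  extends Iterator[A] {

  private var completed = false

  override def next(): A = sub.next()

  override def hasNext: Boolean = {
    val hasMore = sub.hasNext
    if (!hasMore && !completed) {
      completed = true
      completion() // e.g. update metrics, release buffers, close a file
    }
    hasMore
  }
}

var cleanedUp = false
val rows = new SimpleCompletionIterator(Iterator("a", "b"), () => cleanedUp = true)
rows.foreach(println)
assert(cleanedUp)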
Shuffle fetcher
I've already mentioned it, but the shuffle fetcher is also an iterator. It's called ShuffleBlockFetcherIterator and returns a pair of (BlockId, InputStream) from its next method.
Internally, the next implementation either returns the already fetched blocks directly to the caller, or loads one or more new blocks from the executors.
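As an illustration of that serve-or-fetch loop (and only an illustration; the real ShuffleBlockFetcherIterator also deals with the network, memory limits and error handling), a stripped-down sketch with made-up names could look like this:

import scala.collection.mutable

// Stripped-down version of the "serve what's already fetched, otherwise fetch more" loop.
// fetchNextBatch stands in for the calls retrieving shuffle blocks from the executors.
class BatchedFetchIterator[T](fetchNextBatch: () => Seq[T], batchesToFetch: Int)
  extends Iterator[T] {

  private val fetchedResults = mutable.Queue.empty[T]
  private var remainingBatches = batchesToFetch

  override def hasNext: Boolean = fetchedResults.nonEmpty || remainingBatches > 0

  override def next(): T = {
    if (fetchedResults.isEmpty) { // nothing buffered locally, ask for the next batch
      fetchedResults ++= fetchNextBatch()
      remainingBatches -= 1
    }
    fetchedResults.dequeue()
  }
}

val blockBatches = Seq(Seq("block-1", "block-2"), Seq("block-3"))
var nextBatch = 0
val fetcher = new BatchedFetchIterator[String](
  () => { val batch = blockBatches(nextBatch); nextBatch += 1; batch },
  blockBatches.size)
assert(fetcher.toList == List("block-1", "block-2", "block-3"))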
Groups generation
The shuffle is present in data redistribution scenarios. One of them is the group by key operation followed by a mapping function. Apache Spark uses a dedicated iterator called GroupedIterator to generate the groups passed to the mapping function. It has one requirement, though! The wrapped iterator, represented here as a BufferedIterator[InternalRow] instance, must be pre-sorted.
Why is it required? Because the group detection (fetchNextGroupIterator, called from hasNext) simply iterates the wrapped iterator and creates a new group each time it encounters a new grouping key:
class GroupedIterator private(
    input: BufferedIterator[InternalRow],
    groupingExpressions: Seq[Expression],
    inputSchema: Seq[Attribute])
  extends Iterator[(InternalRow, Iterator[InternalRow])] {
  // ...

  def hasNext: Boolean = currentIterator != null || fetchNextGroupIterator

  def next(): (InternalRow, Iterator[InternalRow]) = {
    assert(hasNext) // Ensure we have fetched the next iterator.
    val ret = (keyProjection(currentGroup), currentIterator)
    currentIterator = null
    ret
  }

  private def fetchNextGroupIterator(): Boolean = {
    assert(currentIterator == null)

    if (currentRow == null && input.hasNext) {
      currentRow = input.next()
    }

    if (currentRow == null) {
      false
    } else {
      while (keyOrdering.compare(currentGroup, currentRow) == 0 && input.hasNext) {
        currentRow = input.next()
      }

      if (keyOrdering.compare(currentGroup, currentRow) == 0) {
        false
      } else {
        currentGroup = currentRow.copy()
        currentIterator = createGroupValuesIterator()
        true
      }
    }
  }
  // ...
}
Without a pre-sorted input iterator, the operation would require an extra sorting step before returning the groups to the mapping function.
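To see why the sorted input makes grouping a one-pass operation, here is a simplified sketch in plain Scala; groupSorted is a made-up helper and, unlike GroupedIterator, it materializes each group's values for brevity:

import scala.collection.mutable

// One-pass grouping over a pre-sorted iterator: a new group starts each time the key changes.
// If the input were not sorted by key, a key seen earlier could reappear later and would
// produce a second, separate group.
def groupSorted[K, V](sortedInput: Iterator[(K, V)]): Iterator[(K, Seq[V])] = {
  val buffered = sortedInput.buffered
  new Iterator[(K, Seq[V])] {
    override def hasNext: Boolean = buffered.hasNext
    override def next(): (K, Seq[V]) = {
      val key = buffered.head._1
      val values = mutable.ArrayBuffer.empty[V]
      while (buffered.hasNext && buffered.head._1 == key) {
        values += buffered.next()._2
      }
      (key, values.toSeq)
    }
  }
}

val sortedRows = Iterator(("a", 1), ("a", 2), ("b", 3))
assert(groupSorted(sortedRows).toList == List(("a", Seq(1, 2)), ("b", Seq(3))))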
Aggregations generators
But mapping is not the only operation available on the groups. Another one is aggregation. The first aggregation iterator is called TungstenAggregationIterator and it works directly on UnsafeRows. It uses 2 aggregation modes. The first of them is hash-based, where the iterator puts the groups and their aggregation buffers in a hash map. However, when it fails to acquire more memory, it falls back to a sort-based version with possibly multiple spills of the hash map and a final merge step of these intermediary spills.
The second aggregation iterator is ObjectAggregationIterator. The difference with the TungstenAggregationIterator is its ability to work on arbitrary JVM objects. Because of that, it has to use a different method to detect the moment to fall back from the hash-based to the sort-based aggregation (the number of elements instead of their size, which is difficult to estimate for JVM objects).
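A toy model of this hash-first, sort-based-fallback strategy may help to picture it; the function below is purely illustrative, with maxHashMapSize standing in for Spark's failed memory acquisition:

import scala.collection.mutable

// Toy picture of the two aggregation modes: aggregate in a hash map as long as it stays
// small enough, otherwise switch to the sort-based path that buffers (in Spark: spills)
// the remaining input, sorts it by key and aggregates consecutive rows.
def aggregateWithFallback(rows: Iterator[(String, Int)], maxHashMapSize: Int): Map[String, Int] = {
  val hashAggregates = mutable.Map.empty[String, Int]
  val sortBasedInput = mutable.ArrayBuffer.empty[(String, Int)]
  var fallenBack = false
  rows.foreach { case (key, value) =>
    if (!fallenBack && (hashAggregates.contains(key) || hashAggregates.size < maxHashMapSize)) {
      hashAggregates(key) = hashAggregates.getOrElse(key, 0) + value
    } else {
      fallenBack = true
      sortBasedInput += ((key, value))
    }
  }
  // sort-based part: sort by key, then aggregate the consecutive rows of each group
  val sortBasedAggregates = sortBasedInput.sortBy(_._1)
    .foldLeft(mutable.LinkedHashMap.empty[String, Int]) { case (acc, (key, value)) =>
      acc += key -> (acc.getOrElse(key, 0) + value)
    }
  // final merge of the hash-based and the sort-based partial aggregates
  (hashAggregates.keySet ++ sortBasedAggregates.keySet).map { key =>
    key -> (hashAggregates.getOrElse(key, 0) + sortBasedAggregates.getOrElse(key, 0))
  }.toMap
}

assert(aggregateWithFallback(Iterator(("a", 1), ("b", 2), ("a", 3), ("c", 4)), maxHashMapSize = 2) ==
  Map("a" -> 4, "b" -> 2, "c" -> 4))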
Spilling
Apache Spark also uses iterators to spill data to disk. The iterator responsible for that is SpillableIterator, defined as follows:
SpillableIterator(var upstream: Iterator[((Int, K), C)])
In addition to the classical next/hasNext methods, SpillableIterator defines a spill() method that saves the in-memory data to spill files. However, the SpillableIterator only exposes this method; it's the responsibility of the ExternalSorter to trigger the spilling action.
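The spilling contract is easier to grasp with a toy example; the class below is a made-up, string-based illustration where spill() dumps the not-yet-consumed records to a temporary file and the iteration silently continues from disk:

import java.io.{File, PrintWriter}
import scala.io.Source

// Toy version of the spilling idea for an iterator of strings: spill() writes whatever has
// not been consumed yet to a temporary file and the iteration transparently continues from it.
class SpillableStringIterator(private var upstream: Iterator[String]) extends Iterator[String] {

  def spill(): Unit = {
    val spillFile = File.createTempFile("spill-", ".txt")
    spillFile.deleteOnExit()
    val writer = new PrintWriter(spillFile)
    try upstream.foreach(record => writer.println(record)) finally writer.close()
    // from now on, the records are read back from disk instead of memory
    upstream = Source.fromFile(spillFile).getLines()
  }

  override def hasNext: Boolean = upstream.hasNext
  override def next(): String = upstream.next()
}

val records = new SpillableStringIterator(Iterator("r1", "r2", "r3"))
println(records.next()) // r1, served from memory
records.spill()         // in Spark, the ExternalSorter decides when to call this
println(records.toList) // List(r2, r3), served from the spill file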
Another type of spillable iterator, but this time used much closer to the physical plan, is SpillableArrayIterator. It's present in ExternalAppendOnlyUnsafeRowArray, used to buffer joined rows in SortMergeJoinExec.
Records reader
Finally, there is also an iterator to read the lines of the input files. I mean here the RecordReaderIterator class, which wraps a kind-of-iterator from Hadoop called RecordReader. Officially, RecordReader doesn't implement the Iterator interface, but it behaves as if it were one. The RecordReaderIterator has an extra feature to manage resource release when a task reads multiple files. Normally, the release happens when the task completes, but Apache Spark optimizes it a bit in the hasNext method:
class RecordReaderIterator[T](
    private[this] var rowReader: RecordReader[_, T]) extends Iterator[T] with Closeable {

  private[this] var havePair = false
  private[this] var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !rowReader.nextKeyValue
      if (finished) {
        // Close and release the reader here; close() will also be called when the task
        // completes, but for tasks that read from many files, it helps to release the
        // resources early.
        close()
      }
      havePair = !finished
    }
    !finished
  }
  // ...
}
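The early-release trick is not Hadoop-specific; here is an analogous, made-up adapter around a plain BufferedReader that closes the underlying resource as soon as the last line has been consumed:

import java.io.{BufferedReader, Closeable, StringReader}

// Expose a BufferedReader as an Iterator[String] and close it eagerly as soon as the last
// line has been read, not only when the caller remembers to call close().
class LineIterator(private var reader: BufferedReader) extends Iterator[String] with Closeable {
  private var nextLine: String = _
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && nextLine == null) {
      nextLine = reader.readLine()
      if (nextLine == null) {
        finished = true
        close() // release the underlying resource as early as possible
      }
    }
    !finished
  }

  override def next(): String = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    val line = nextLine
    nextLine = null
    line
  }

  override def close(): Unit = if (reader != null) { reader.close(); reader = null }
}

val lines = new LineIterator(new BufferedReader(new StringReader("line1\nline2")))
assert(lines.toList == List("line1", "line2"))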
As you can see in this article, Apache Spark relies on iterators in many places: shuffle blocks reading, stateful operations, aggregations... And it makes a lot of sense in big data processing because an iterator, unlike a list or a set, doesn't materialize the whole sequence. Instead, it works in a fire-and-forget mode where a record, once returned and read, can disappear.