Stream safely in Scala

on waitingforcode.com

Scala's Stream offers something we are not used to seeing in other languages: lazy computation of values combined with memoization. However, this is sometimes misleading, and some people think about Streams as iterators, i.e. a data structure that computes values and then forgets them. Such thinking can lead to memory problems, especially with infinite streams.

In this post I'll try to show some important aspects of Scala Streams. The first part focuses on the basics of this data structure. The second section shows how to use it safely, i.e. without a big risk of OOM errors.

Creating a Stream

Stream is a functional data structure that at first glance looks very similar to an iterator. After all, all it does is compute elements one by one, on demand. But the resemblance is only superficial. Streams aren't iterators because they don't forget the generated values. Instead, they keep them; we say that they memoize the data. Thanks to that, when a given Stream is traversed again, its values don't need to be recomputed.

The concepts of reuse and memoization are often misunderstood by Scala learners who think that a stream is only about generating data on demand. This misunderstanding often leads to unexpected OOM problems because a stream can be infinite.
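To make memoization visible without relying on timing, here is a minimal sketch. MemoizationDemo and its methods are hypothetical names, not a library API. It counts how many times a mapping function runs when the same Stream is traversed twice, and how many elements an Iterator yields on two successive passes. It assumes a Scala version where Stream is available (2.12, or 2.13+ where it is deprecated in favor of LazyList).

```scala
// Hypothetical demo object: observes memoization through a side-effecting counter
object MemoizationDemo {

  // Builds a Stream whose map function increments a counter, traverses it twice
  // and returns the total number of map evaluations.
  def streamEvaluations(): Int = {
    var evaluations = 0
    val squares = Stream(1, 2, 3).map { nr =>
      evaluations += 1 // side effect to observe each computation
      nr * nr
    }
    squares.foreach(_ => ()) // first traversal: computes the remaining elements
    squares.foreach(_ => ()) // second traversal: reads the memoized values
    evaluations
  }

  // Counts the elements produced by two successive passes over the same Iterator.
  def iteratorPasses(): (Int, Int) = {
    val squares = Iterator(1, 2, 3).map(nr => nr * nr)
    val firstPass = squares.size  // consumes every element
    val secondPass = squares.size // the iterator is already exhausted
    (firstPass, secondPass)
  }

  def main(args: Array[String]): Unit = {
    println(s"Stream map evaluations: ${streamEvaluations()}")
    println(s"Iterator pass sizes: ${iteratorPasses()}")
  }
}
```

With the Stream, the mapping function runs only 3 times in total despite two traversals; the Iterator yields 3 elements on the first pass and none on the second.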

If you're wondering about the difference between a stream and an iterator, you can look at the following test case:

  // These tests live in a ScalaTest FunSpec mixing in Matchers (StreamSpec in the stack trace below)
  import scala.collection.mutable

  describe("Stream basic behavior") {
    it("should generate even numbers from a Stream") {
      val evenNumbersCreationTimes = new mutable.ListBuffer[Long]()
      // Please note you can also create an infinite even-numbers stream with `Stream.from(0, 2)`
      val evenNumbersStream = (0 until 6).toStream.filter(nr => {
        val isEven = nr % 2 == 0
        if (isEven) evenNumbersCreationTimes.append(System.currentTimeMillis())
        isEven
      })

      for (_ <- evenNumbersStream) {
        // consume the stream with a 2-second interval
        Thread.sleep(2000L)
      }

      for (index <- 1 until evenNumbersCreationTimes.size) {
        val previousTime = evenNumbersCreationTimes(index - 1)
        val currentTime = evenNumbersCreationTimes(index)

        assert(currentTime - previousTime >= 2000L, "The difference between Stream generation times should be at least 2 seconds")
      }
      // Here we assert that the Stream is memoized, i.e. it's not recomputed:
      // iterating again doesn't call the filter predicate, so no new times are recorded
      for (_ <- evenNumbersStream) {}
      evenNumbersCreationTimes should have size 3
    }

    it("should show the difference from an Iterator") {
      val evenNumbersCreationTimes = new mutable.ListBuffer[Long]()
      // Below you can find a sample iterator. It creates one item at a time, so we can
      // easily control the process. It's a little bit more complicated with Streams
      val limit = 6
      val evenNumbersIterator = new Iterator[Int] {

        var currentNumber = 0

        override def hasNext: Boolean = currentNumber < limit

        override def next(): Int = {
          currentNumber += 2
          currentNumber
        }
      }

      val evenNumbersMaterialized = evenNumbersIterator.map(nr => {
        Thread.sleep(2000L)
        // record when each element was produced so that the interval can be checked below
        evenNumbersCreationTimes.append(System.currentTimeMillis())
        nr
      }).mkString(", ")
      evenNumbersMaterialized shouldEqual "2, 4, 6"
      for (index <- 1 until evenNumbersCreationTimes.size) {
        val previousTime = evenNumbersCreationTimes(index - 1)
        val currentTime = evenNumbersCreationTimes(index)

        assert(currentTime - previousTime >= 2000L, "The difference between Iterator generation times should be at least 2 seconds")
      }
      // The iterator is exhausted: a second pass produces nothing
      val evenNumbersMaterializedNextIteration = evenNumbersIterator.mkString(", ")
      evenNumbersMaterializedNextIteration shouldBe empty
    }
  }

As you can see, both Iterator and Stream generate one item at a time, which is confirmed by the assertion that the generation times are at least 2 seconds apart. On the other hand, the stream's data is memoized and available for more than one iteration, which is not the case for an iterator. This memoization is dangerous with infinite streams: as long as the head is referenced, every generated element stays in memory, so a stream that runs forever ends with an OOM. To see that problem, you can execute the following code with the -Xmx100m JVM argument:

    it("should fail with OOM") {
      val numbersStream = Stream.from(1)
      for (nr <- numbersStream) {
      }
    }

The code fails after about 1 minute with the following exception:

An exception or error caused a run to abort: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.collection.immutable.Stream$.from(Stream.scala:1151)
    at scala.collection.immutable.Stream$.$anonfun$from$1(Stream.scala:1219)
    at scala.collection.immutable.Stream$$$Lambda$143/1997963191.apply(Unknown Source)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
    at scala.collection.immutable.Stream.foreach(Stream.scala:531)
    at StreamSpec.$anonfun$new$10(StreamSpec.scala:65)
    at StreamSpec$$Lambda$81/1239548589.apply$mcV$sp(Unknown Source)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
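When a single pass is enough, the infinite source doesn't need to be a Stream at all. The sketch below uses the standard Iterator.from to walk an infinite sequence of integers; InfiniteIteratorDemo and sumOfFirst are illustrative names. Since an iterator doesn't memoize, each consumed number becomes eligible for garbage collection right away.

```scala
// Hypothetical demo object: single-pass consumption of an infinite source
object InfiniteIteratorDemo {

  // Sums the first n positive integers taken from an infinite Iterator.
  // Nothing is memoized: each number can be garbage collected once folded.
  def sumOfFirst(n: Int): Long =
    Iterator.from(1)       // infinite source of 1, 2, 3, ...
      .take(n)             // lazy, bounded view of the first n numbers
      .foldLeft(0L)(_ + _) // single pass over the numbers

  def main(args: Array[String]): Unit =
    println(sumOfFirst(1000000))
}
```

The trade-off is the one shown in the tests above: the iterator can only be traversed once.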

Fortunately, if we respect some specific rules, we can avoid OOM issues. Let's go to the next section to discover them.

Avoiding OOM errors

A Stream in Scala is composed of a head and a function that creates the remaining values (the tail). You can see it in the implementation of Stream.from(start, step), which you call, for instance, as Stream.from(0, 2) to generate a stream of even numbers:

  def from(start: Int, step: Int): Stream[Int] =
    cons(start, from(start+step, step))

As you can see, the Stream is built with the cons operator. It's also the case for expressions like (0 to 6).toStream; here is the toStream implementation of Iterator:

  def toStream: Stream[A] =
    if (self.hasNext) Stream.cons(self.next(), self.toStream)
    else Stream.empty[A]
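Since the tail given to cons (or #::) is passed by name, it's only evaluated when somebody asks for it, and it's cached afterwards. The following sketch illustrates both properties with a hypothetical recordingFrom function, modeled on Stream.from(start, step), that records every number it materializes in a buffer.

```scala
import scala.collection.mutable

// Hypothetical demo object: shows when the tail of a cons cell is evaluated
object ConsLazinessDemo {

  // Hypothetical variant of Stream.from(start, step) that records every number
  // it materializes. The right-hand side of #:: is passed by name, so the
  // recursive call only runs when the tail is first accessed.
  def recordingFrom(start: Int, step: Int, seen: mutable.ListBuffer[Int]): Stream[Int] = {
    seen += start
    start #:: recordingFrom(start + step, step, seen)
  }

  def main(args: Array[String]): Unit = {
    val seen = mutable.ListBuffer[Int]()
    val evens = recordingFrom(0, 2, seen)
    println(seen.toList)     // only the head has been computed
    println(evens.tail.head) // forces the second cell
    println(evens.tail.head) // reuses the memoized tail
    println(seen.toList)     // 2 was recorded only once
  }
}
```

Right after the call, only 0 is recorded; reading evens.tail.head twice adds 2 to the buffer only once, because the tail cell is memoized.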

Hence, the stream builds a structure similar to an immutable linked list: once memoized, values cannot be modified. In addition, the evaluated part of this list is kept in memory as long as there is a reference to its head. That means that one way to avoid memory problems consists in getting rid of the head as soon as possible instead of keeping the materialized stream around. To achieve that, we can create the stream with a recursive function (its recursive call is deferred by #::) and expose it through a def instead of a val, so that no named reference holds its head. This is very important: if the stream is assigned to a val, there will always be something keeping a reference to the Stream's head and the solution will simply not work. Let's rewrite our previous failing example with this method:

    it("should not fail when a recursive function and a def-based Stream are used") {
      def numbersStream: Stream[Int] = {
        def createAndConsumeStreamHead(nr: Int): Stream[Int] = nr #:: createAndConsumeStreamHead(nr + 1)
        createAndConsumeStreamHead(0)
      }
      for (nr <- numbersStream) {
      }
    }

I let it run on my local environment and it didn't fail after 5 minutes. As you can see, the solution relies on a recursive function that returns the current number as the head and defers the creation of the rest of the stream to the next call. However, this solution is not very readable. It's much less readable than an equivalent iterator-based version: both do the same, but the iterator's next and hasNext functions speak for themselves more than the Stream's recursion.
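For comparison, here is what an iterator-based equivalent of numbersStream could look like. It's a sketch with a hypothetical NumbersIteratorDemo object; in practice, the standard library's Iterator.from(0) does the same job.

```scala
// Hypothetical demo object: iterator counterpart of the def-based numbersStream
object NumbersIteratorDemo {

  // Yields 0, 1, 2, ... and only keeps the next number to return,
  // never the consumed ones.
  def numbersIterator: Iterator[Int] = new Iterator[Int] {
    private var nextNumber = 0

    // The source is infinite, so there is always a next element
    override def hasNext: Boolean = true

    override def next(): Int = {
      val current = nextNumber
      nextNumber += 1
      current
    }
  }

  def main(args: Array[String]): Unit = {
    // Bounded here for demonstration; an unbounded foreach would also run in
    // constant memory because consumed numbers are not retained
    println(numbersIterator.take(5).mkString(", "))
  }
}
```

Each call to numbersIterator returns a fresh iterator starting at 0, which mirrors the def-based stream, while the explicit hasNext and next make the one-pass contract visible in the code.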

As shown in this post, streams are often misunderstood and confused with iterators. But they are different because their lazily computed values are memoized. That means that without careful construction, like the one combining a recursive function with a def, a stream may lead to OOM errors.
