Work-stealing in Scala

Versions: Scala 2.12.1

While reading about the Await implementation in Scala, I found a method called blocking. At the time I read some articles to understand it, but I didn't have a chance to play with it. Now I have, and I will share my findings with you.

I will start this post with an example showing the effect of blocking. The second section covers the Java thread pool standing behind it, and it's not an accident that it comes right after the example. In the third section I'll show more detailed information about the blocking method and the related points, and the last one lists some gotchas.

Work-stealing example

Let's begin with a simple question: how many operations do you think will run in parallel in the following code?

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val threadIdsNotBlocking = new java.util.concurrent.CopyOnWriteArrayList[Long]()
(1 to 100) foreach { _ =>
  Future {
    threadIdsNotBlocking.add(Thread.currentThread().getId)
    Thread.sleep(3000)
  }
}
Thread.sleep(1000)
println(threadIdsNotBlocking.size)

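If you answered "as many as there are CPUs", you were right. The parallelism of the default execution context depends on the number of cores, and that's not new. It means that even though you create 100 futures, the pool won't run them all simultaneously. Fortunately, you can improve that with the technique this post calls work-stealing. Strictly speaking, work-stealing in the fork/join pool lets idle workers take tasks queued for other workers, and the blocking method adds managed blocking on top of it: when a worker announces that it is about to block, the pool can bring in a spare thread so that the number of tasks really running doesn't drop.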

The same code as above, written with work-stealing enabled, could look like this:

import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
  "blocking" should "give a hint to the ExecutionContext to create threads when already created are blocked" in {
    val threadIdsFromBlocking = new java.util.concurrent.CopyOnWriteArrayList[Long]()
    (1 to 100) foreach { _ =>
      Future {
        threadIdsFromBlocking.add(Thread.currentThread().getId)
        blocking {
          Thread.sleep(3000)
        }
      }
    }
    Thread.sleep(1000)
    println(threadIdsFromBlocking.size)
    (threadIdsFromBlocking.size > Runtime.getRuntime().availableProcessors()) shouldBe true
  }

As you can see, the size of the list is greater than the number of CPUs. How is that possible? After all, the code is almost the same. The only difference is the blocking method. In the next sections, I'll show you what it changes.

Fork/join pool

For the code from our example, Scala executes most of the operations on top of classes from Java's java.util.concurrent package. It's also the case for scala.concurrent.ExecutionContext.Implicits.global, imported and used in both test cases. This context object is backed by Java's fork/join thread pool. The distinguishing feature of this pool is its work-stealing support. That could suggest there should be no difference between the 2 executed tests. But that's not the case, and here is why (official Scala documentation for Futures and Promises):

The number of concurrently blocking computations can exceed the parallelism level only if each blocking call is wrapped inside a blocking call (more on that below). Otherwise, there is a risk that the thread pool in the global execution context is starved, and no computation can proceed.

However, remember that this applies to the fork/join pool behind the global execution context. If you're working with a different thread pool, such as one with a fixed number of threads, the blocking method won't have any effect:

  import java.util.concurrent.Executors
  import scala.concurrent.{ExecutionContext, Future, blocking}

  "blocking" should "not have an adjustment effect for fixed thread pool" in {
    val poolThreads = 2
    val executorService = Executors.newFixedThreadPool(poolThreads)
    val executionContext = ExecutionContext.fromExecutorService(executorService)
    val threadIdsFromBlocking = new java.util.concurrent.CopyOnWriteArrayList[Long]()
    (1 to 100) foreach { _ =>
      Future {
        threadIdsFromBlocking.add(Thread.currentThread().getId)
        blocking {
          Thread.sleep(3000)
        }
      }(executionContext)
    }
    Thread.sleep(1000)

    threadIdsFromBlocking.size shouldEqual poolThreads
  }

Work-stealing details

From our previous examples we can deduce the 2 main use cases of the blocking method:

  1. Runtime behavior adjustment - as you saw, with the blocking method we could start many more asynchronous computations than without it. The examples were quite straightforward, but you can imagine a situation where half of the operations are fast and the other half are slow. In such a case, even if the executor starts with the slow ones, it will quickly switch to the fast ones and reduce the overall computation time. Without work-stealing, the fast tasks would have to wait for the slow ones, so the execution time would be close to the slow task execution time + the fast task execution time.
  2. Thread starvation prevention - starvation happens when some of the concurrent operations aren't executed because no resources are available, for example when one or more concurrent tasks don't release their threads back to the pool.
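
The first use case can be tried out with a small sketch. It assumes the global execution context; MixedWorkloadSketch, fastTasksLatencyMillis and both parameters are hypothetical names introduced only for this illustration. The slow tasks are submitted first, and the method measures how long the fast tasks wait before all of them get a thread:

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical helper, named for illustration only.
object MixedWorkloadSketch {
  /** Returns the time (in ms) after which all the fast tasks have run. */
  def fastTasksLatencyMillis(useBlocking: Boolean, slowMillis: Long = 500L): Long = {
    val cores = Runtime.getRuntime.availableProcessors()
    val fastTasks = 10
    val fastDone = new CountDownLatch(fastTasks)
    val start = System.nanoTime()
    // Slow tasks are submitted first, twice as many as there are cores.
    (1 to cores * 2).foreach { _ =>
      Future {
        if (useBlocking) blocking { Thread.sleep(slowMillis) }
        else Thread.sleep(slowMillis)
      }
    }
    // Fast tasks only signal that they got a thread.
    (1 to fastTasks).foreach { _ => Future { fastDone.countDown() } }
    fastDone.await()
    (System.nanoTime() - start) / 1000000L
  }
}
```

Comparing fastTasksLatencyMillis(useBlocking = true) with fastTasksLatencyMillis(useBlocking = false) should show the gap on most machines, but the exact numbers depend on the core count, the JVM version and the threads left in the pool by earlier runs, so treat them as indicative only.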

Under the hood, the work-stealing relies on the ForkJoinPool created automatically for the default ExecutionContext. The pool creates its worker threads with the newThread(fjp: ForkJoinPool) method of ExecutionContextImpl.DefaultThreadFactory. The workers are registered in the pool and pick up the asynchronous code submitted to it.

The workers are not plain Thread objects because they mix in the BlockContext trait, which declares the blockOn method. When asynchronous code is wrapped with the scala.concurrent.blocking method, it calls:

def blocking[T](body: =>T): T = BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)
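
To make this delegation visible, a custom BlockContext can be installed for the current thread with BlockContext.withBlockContext. The sketch below is only an illustration: CountingBlockContext and demo are hypothetical names, and the context does nothing more than count the calls and run the body inline.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{BlockContext, CanAwait, blocking}

object CountingBlockContextSketch {
  // Hypothetical context: counts the blocking sections and runs them inline.
  final class CountingBlockContext extends BlockContext {
    val calls = new AtomicInteger(0)
    override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
      calls.incrementAndGet()
      thunk
    }
  }

  /** Returns the result of the blocking section and the number of blockOn calls. */
  def demo(): (Int, Int) = {
    val context = new CountingBlockContext
    // Inside this block, BlockContext.current resolves to our context.
    val result = BlockContext.withBlockContext(context) {
      blocking { 1 + 1 }
    }
    (result, context.calls.get())
  }
}
```

Every blocking section executed inside withBlockContext goes through the blockOn method of the installed context, so what happens to the blocking code depends entirely on the context that is current at that moment.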

BlockContext.current is here the worker thread created by the ForkJoinPool's thread factory. And that's where most of the magic happens. Inside the blockOn method, Scala creates an instance of ForkJoinPool.ManagedBlocker and passes it to the ForkJoinPool.managedBlock method. This method checks whether blocking is still needed by looking at the results of ManagedBlocker's isReleasable() and block() methods. As soon as one of them returns true, the blocking is over and managedBlock returns. Until then, the ForkJoinPool may activate or create a spare thread, if resources allow, so that other tasks can run while the current thread is blocked.
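
The interaction with managedBlock can be sketched as follows. This is a simplified illustration under my own assumptions, not the library source: ManagedBlockSketch and runManaged are hypothetical names, and the real blockOn implementation contains more bookkeeping.

```scala
import java.util.concurrent.ForkJoinPool

object ManagedBlockSketch {
  // Hypothetical helper: runs a possibly blocking thunk through managedBlock.
  def runManaged[T](thunk: => T): T = {
    var result: Option[T] = None
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
      @volatile private var done = false
      // Executes the blocking code; returning true tells the pool
      // that no further blocking is necessary.
      override def block(): Boolean = {
        try { result = Some(thunk) } finally { done = true }
        true
      }
      // Lets the pool skip block() when the work has already finished.
      override def isReleasable(): Boolean = done
    })
    result.get
  }
}
```

When runManaged is called from a thread that doesn't belong to a ForkJoinPool, managedBlock simply runs the blocker in place. The compensation with a spare thread can only happen when the caller is a fork/join worker.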

That means that scala.concurrent.blocking simply delegates the execution of the wrapped code to the underlying BlockContext. If the context supports work-stealing, it's able to run the tasks in such a way. Otherwise, as we saw with the fixed thread pool, nothing changes even though we use the blocking wrapper.
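
A quick way to see the difference between both pools is to check whether their worker threads are BlockContext instances. The sketch below assumes the default global execution context of Scala 2.12; WorkerTypeSketch and its methods are hypothetical names used only for this illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, BlockContext, ExecutionContext, Future}
import scala.concurrent.duration._

object WorkerTypeSketch {
  // Runs a future on the given context and reports whether the worker
  // thread executing it mixes in BlockContext.
  private def workerIsBlockContext(ec: ExecutionContext): Boolean =
    Await.result(Future(Thread.currentThread().isInstanceOf[BlockContext])(ec), 5.seconds)

  /** Returns the check results for (global context, fixed thread pool). */
  def check(): (Boolean, Boolean) = {
    val fixedPool = Executors.newFixedThreadPool(1)
    try {
      val onGlobal = workerIsBlockContext(ExecutionContext.global)
      val onFixed = workerIsBlockContext(ExecutionContext.fromExecutorService(fixedPool))
      (onGlobal, onFixed)
    } finally fixedPool.shutdown()
  }
}
```

The threads of the fixed pool are plain Java threads, so blocking has no pool-aware context to delegate to when it runs on them.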

Gotchas

However, blocking doesn't come without costs. Colin Breck showed the potential problems in his post Calling Blocking Code: There Is No Free Lunch. In the post, you can see that one of the last examples ends with an OOM. Even though I didn't manage to reproduce the failure, the point about memory is important to keep in mind.

Another important point is the purpose of work-stealing. If you wrap really blocking code, like a database call, a synchronous API call and so forth, it makes sense to use it. On the other hand, if you don't have any blocking call, the extra overhead may be costly. I made a simple JMH micro-benchmark to see the impact of blocking vs. not blocking on asynchronous code:

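import java.util.concurrent.{CountDownLatch, TimeUnit}

import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Mode, OutputTimeUnit}

import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global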

@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.All))
class BlockingMicroBenchmark {
  @Benchmark
  def verify_blocking: Unit = {
    val latch = new CountDownLatch(100)
    (1 to 100) foreach { _ =>
      Future {
        blocking {
        }
        latch.countDown()
      }
    }
    latch.await()
  }

  @Benchmark
  def verify_not_blocking: Unit = {
    val latch = new CountDownLatch(100)
    (1 to 100) foreach { _ =>
      Future {
        latch.countDown()
      }
    }
    latch.await()
  }

}

After executing the code with jmh:run -i 20 -wi 10 -f1 -t1 -rf text, we can see that the results are much better for the code without blocking. It has a much higher throughput, a lower average execution time and a better worst-case execution time. Below you can find the summary of that test:

Benchmark                    Mode     Cnt    Score   Error   Units
verify_blocking              thrpt     20    0.098 ± 0.008   ops/ms
verify_not_blocking          thrpt     20   20.286 ± 1.263   ops/ms
verify_blocking              avgt      20   12.241 ± 1.094   ms/op
verify_not_blocking          avgt      20    0.046 ± 0.001   ms/op
verify_blocking              sample  1602   12.591 ± 0.627   ms/op
verify_blocking:p0.00        sample          0.111           ms/op
verify_blocking:p0.50        sample         12.132           ms/op
verify_blocking:p0.90        sample         23.442           ms/op
verify_blocking:p0.95        sample         26.378           ms/op
verify_blocking:p0.99        sample         32.406           ms/op
verify_blocking:p0.999       sample         54.001           ms/op
verify_blocking:p0.9999      sample         54.198           ms/op
verify_blocking:p1.00        sample         54.198           ms/op
verify_not_blocking          sample  2599    0.060 ± 0.002   ms/op
verify_not_blocking:p0.00    sample          0.015           ms/op
verify_not_blocking:p0.50    sample          0.040           ms/op
verify_not_blocking:p0.90    sample          0.081           ms/op
verify_not_blocking:p0.95    sample          0.112           ms/op
verify_not_blocking:p0.99    sample          0.314           ms/op
verify_not_blocking:p0.999   sample          2.179           ms/op
verify_not_blocking:p0.9999  sample          9.847           ms/op
verify_not_blocking:p1.00    sample         47.055           ms/op
verify_blocking              ss        20   16.377 ± 9.39    ms/op
verify_not_blocking          ss        20    0.295 ± 0.136   ms/op

Work-stealing is an interesting way to improve the responsiveness of asynchronous code. It lets us make better use of resources when one or more asynchronous operations are blocking. In Scala we can use the blocking() method to give the execution pool a hint about the risk of thread starvation. The pool can then either do nothing or let another thread run other tasks while the blocked operation waits. This should reduce the execution time, but not always. The tests from the last section show that blocking() should be used only when the given operation really blocks. Otherwise, it can have the opposite effect and slow down the code execution.