ExecutionContext - why does one stop and another not?

Versions: Scala 2.12.1

I hadn't noticed it before, but if you use the default ExecutionContext in Scala with monadic operations applied to futures, the program does not wait for the async code to terminate. That's not the case with a custom ExecutionContext, which does wait. Strange? Let's see why it happens.

In the first part of the post I will show you the problem in a short video. In the second I will explain why apparently identical code snippets behave differently depending on the ExecutionContext they are given.

Problem illustration

The code I'm using in this example is the following:

  import scala.concurrent.Future

  // Simulates slow work: sleeps 1 second per element, then returns the input unchanged
  def sleepAndReturnInput(input: Seq[Int]): Seq[Int] = {
    val sleepingTime = input.size * 1000
    println(s"Sleeping for ${sleepingTime} ms")
    Thread.sleep(sleepingTime)
    input
  }

  val resultAbc = Future(sleepAndReturnInput(0 to 3))
  val resultDefgh = Future(sleepAndReturnInput(0 to 2))
  val allFutures = Future.sequence(Seq(resultAbc, resultDefgh))

  // Both callbacks are scheduled on the implicit ExecutionContext once allFutures completes
  allFutures.flatMap(allFuturesResults => {
    Future(allFuturesResults.flatten)
  }).foreach(fetchedLetters => {
    println(s"Got mapped numbers=${fetchedLetters}")
  })

For one run I will use import scala.concurrent.ExecutionContext.Implicits.global and for the other implicit val executionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2)). You can see in the video below that in the first case the program terminates before the Futures complete, whereas in the second the async code is allowed to finish. Another difference is that the custom context keeps the JVM running even after the async job has finished:

Daemons

I won't hide it: I found the answer in the "Difference between ExecutionContext.global and main Thread" question linked in the "Read also" section :P The difference comes from the type of threads each ExecutionContext creates. The default one creates daemon threads, whereas the custom one creates user (non-daemon) threads. The JVM exits as soon as only daemon threads remain, so it does not wait for the work running on the global context. User threads, on the other hand, keep the JVM alive until you terminate them explicitly, a little bit like in this snippet:

  val threadPool = Executors.newFixedThreadPool(2)
  implicit val executionContext = ExecutionContext.fromExecutor(threadPool)

  // Our async logic

  allFutures.flatMap(allFuturesResults => {
    Future(allFuturesResults.flatten)
  }).foreach(fetchedLetters => {
    println(s"Got mapped numbers=${fetchedLetters}")
  })

  threadPool.shutdown()

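The problem is that this code fails with a RejectedExecutionException because the background job had not finished when the pool was shut down. shutdown() lets already submitted tasks finish but rejects new ones, and the flatMap and foreach callbacks are submitted only once the futures they depend on complete: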

java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@271cac9d rejected from java.util.concurrent.ThreadPoolExecutor@479915ac[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 2]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
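
One way to avoid the rejection is to wait for the whole chain to complete before shutting the pool down. The snippet below is only a minimal sketch of that idea, not the definitive fix: the object name ShutdownAfterCompletion, the run method, the shortened sleep and the 10 second timeout are all my assumptions, chosen for illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical wrapper object, introduced only for this illustration
object ShutdownAfterCompletion {
  // Same idea as in the post, with a shorter sleep (100 ms per element)
  def sleepAndReturnInput(input: Seq[Int]): Seq[Int] = {
    Thread.sleep(input.size * 100L)
    input
  }

  def run(): Seq[Int] = {
    val threadPool = Executors.newFixedThreadPool(2)
    implicit val executionContext: ExecutionContext =
      ExecutionContext.fromExecutor(threadPool)
    try {
      val first = Future(sleepAndReturnInput(0 to 3))
      val second = Future(sleepAndReturnInput(0 to 2))
      val flattened = Future.sequence(Seq(first, second)).map(_.flatten)
      // Block the calling thread until the whole chain has completed
      // (the 10 second timeout is an arbitrary assumption)
      Await.result(flattened, 10.seconds)
    } finally {
      // The chain has either completed or timed out at this point;
      // shutting down lets the JVM exit once the pool threads finish
      threadPool.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"Got mapped numbers=${run()}")
}
```

Because Await.result returns only after the last callback has run, shutdown() no longer races with the flatMap and foreach style callbacks, and the user threads stop keeping the JVM alive.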

How do I know that?

Apart from checking the answer on StackOverflow, I also confirmed it by analyzing the code. The global ("default") context is created by this call:

implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)

Internally, the null executor is passed on to this class:

private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContextExecutor {
  // Placed here since the creation of the executor needs to read this val
  private[this] val uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler {
    def uncaughtException(thread: Thread, cause: Throwable): Unit = reporter(cause)
  }

  val executor: Executor = es match {
    case null => createExecutorService
    case some => some
  }

This createExecutorService method creates the underlying pool with this code:

    val threadFactory = new DefaultThreadFactory(daemonic = true)

      new ForkJoinPool(
        desiredParallelism,
        threadFactory,
        uncaughtExceptionHandler,
        true)

  class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
    def wire[T <: Thread](thread: T): T = {
      thread.setDaemon(daemonic)
      thread.setUncaughtExceptionHandler(uncaughtExceptionHandler)
      thread
    }

    def newThread(runnable: Runnable): Thread = wire(new Thread(runnable))

ForkJoinPool uses the DefaultThreadFactory to create new threads and, as you can see, the factory is built with daemonic = true, so every thread it creates is a daemon. So how do you solve this when you obviously need to wait for the background threads to terminate? In several places I found the recommendation to use a dedicated thread pool for blocking I/O operations and to let the other, more CPU-intensive operations work on their own pool. Keep in mind that any blocking operation, such as the ones using Await, brings a risk of thread starvation if the blocked threads wait for a really long time and your program is something other than a single main method executing asynchronous code, as in my snippet (e.g. a web service).
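
To make both points concrete, here is a small sketch that runs a CPU-style task on the global context and a simulated blocking call on a dedicated fixed pool, and reports the daemon flag of the thread that executed each one. The names DaemonFlagCheck, daemonFlags and blockingContext, the pool size and the timeouts are my assumptions for the illustration. Thread.sleep only stands in for real blocking I/O.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper object, introduced only for this illustration
object DaemonFlagCheck {
  // Returns (daemon flag on the global context, daemon flag on the fixed pool)
  def daemonFlags(): (Boolean, Boolean) = {
    // Dedicated pool for blocking work; the size of 2 is an arbitrary assumption
    val blockingPool = Executors.newFixedThreadPool(2)
    val blockingContext = ExecutionContext.fromExecutor(blockingPool)
    try {
      // Runs on the default ForkJoinPool-backed context
      val onGlobal = Future(Thread.currentThread().isDaemon)(ExecutionContext.global)
      // Runs on the dedicated pool
      val onFixedPool = Future {
        Thread.sleep(200) // stands in for a blocking I/O call
        Thread.currentThread().isDaemon
      }(blockingContext)
      (Await.result(onGlobal, 5.seconds), Await.result(onFixedPool, 5.seconds))
    } finally {
      // User threads must be released explicitly, otherwise the JVM stays alive
      blockingPool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val (globalIsDaemon, fixedPoolIsDaemon) = daemonFlags()
    println(s"global daemon=${globalIsDaemon}, fixed pool daemon=${fixedPoolIsDaemon}")
  }
}
```

The contexts are passed explicitly rather than through an implicit so that it stays visible which pool runs which task. In a long-running service you would probably keep the blocking pool alive for the lifetime of the application and shut it down only on exit.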