ExecutionContext - why one stops and another does not?

on waitingforcode.com

I didn't notice it before, but if you use the default ExecutionContext in Scala with monadic operations applied on Futures, the program will not wait for the async code to terminate. That's not the case with a custom ExecutionContext, which does wait. Strange? Let's see why it happens.

In the first part of the post I will show you the problem in a short video. In the second one, I will explain why apparently identical code snippets behave differently depending on the given ExecutionContext.

Problem illustration

The code I'm using in this example is the following:

  import scala.concurrent.Future

  def sleepAndReturnInput(input: Seq[Int]): Seq[Int] = {
    val sleepingTime = input.size * 1000
    println(s"Sleeping during ${sleepingTime} ms")
    Thread.sleep(sleepingTime)
    input
  }

  // the implicit ExecutionContext changes between the two runs (see below)
  val resultAbc = Future(sleepAndReturnInput((0 to 3)))
  val resultDefgh = Future(sleepAndReturnInput((0 to 2)))
  val allFutures = Future.sequence(Seq(resultAbc, resultDefgh))

  allFutures.flatMap(allFuturesResults => {
    Future(allFuturesResults.flatten)
  }).foreach(fetchedNumbers => {
    println(s"Got mapped numbers=${fetchedNumbers}")
  })

For one run I will use import scala.concurrent.ExecutionContext.Implicits.global and for the other implicit val executionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2)). You can see in the video below that the first run terminates before the Futures complete, whereas the second one lets the async code finish. Another difference is that the custom context keeps the JVM running even after the async job terminates:

Daemons

I won't hide that I found the answer to my concern in the "Difference between ExecutionContext.global and main Thread" question linked in the "Read also" section :P The reason comes from the different types of threads created by each ExecutionContext. The default one creates daemon threads whereas the custom one creates user threads. If you use user threads, you have to terminate them explicitly to tell the JVM that the work is done, a little bit like in this snippet:
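The daemon vs user thread difference is easy to see with raw threads, independently of any ExecutionContext. This is a minimal sketch (the thread names and sleep times are arbitrary): the JVM exits once all user threads are done, without waiting for daemon threads.

```scala
// A daemon thread does not keep the JVM alive; a user thread does
val daemon = new Thread(() => Thread.sleep(60000))
daemon.setDaemon(true)  // must be called before start()
daemon.start()

val user = new Thread(() => println("user thread finished"))
user.start()  // threads are non-daemon by default: the JVM waits for this one
user.join()

println(s"daemon.isDaemon=${daemon.isDaemon}, user.isDaemon=${user.isDaemon}")
// When main and `user` are done, the JVM exits even though `daemon` still sleeps
```

Despite `daemon` sleeping for a minute, the program above terminates immediately after the user thread finishes, which is exactly what happens with the global context's daemon threads in my snippet.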

  val threadPool = Executors.newFixedThreadPool(2)
  implicit val executionContext = ExecutionContext.fromExecutor(threadPool)

// our async logic from before

  allFutures.flatMap(allFuturesResults => {
    Future(allFuturesResults.flatten)
  }).foreach(fetchedNumbers => {
    println(s"Got mapped numbers=${fetchedNumbers}")
  })

  threadPool.shutdown()

The problem is that this code terminates with a RejectedExecutionException because the background job didn't finish before the pool was shut down:

java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@271cac9d rejected from java.util.concurrent.ThreadPoolExecutor@479915ac[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 2]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
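One hypothetical way to avoid the rejection with a custom pool is to block until the whole chain completes before shutting the pool down, for instance with Await.result (the values here mirror the earlier snippet, minus the sleeps):

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

val threadPool = Executors.newFixedThreadPool(2)
implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(threadPool)

val allFutures = Future.sequence(Seq(Future((0 to 3)), Future((0 to 2))))

// Block the main thread until the whole chain completes...
val fetchedNumbers = Await.result(allFutures.map(_.flatten), 10.seconds)
println(s"Got mapped numbers=${fetchedNumbers}")
// ...and only then shut the pool down, so no callback gets rejected
threadPool.shutdown()
```

Since nothing is scheduled on the executor after shutdown(), the RejectedExecutionException disappears and the user threads terminate cleanly.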

How do I know that?

Apart from checking the answer on StackOverflow, I also confirmed it by analyzing the code. The global ("default") context is created by this call:

implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)

And internally, the null executor is delegated further, to this class:

private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContextExecutor {
  // Placed here since the creation of the executor needs to read this val
  private[this] val uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler {
    def uncaughtException(thread: Thread, cause: Throwable): Unit = reporter(cause)
  }

  val executor: Executor = es match {
    case null => createExecutorService
    case some => some
  }

This createExecutorService method initializes the underlying pool by calling this:

    val threadFactory = new DefaultThreadFactory(daemonic = true)

      new ForkJoinPool(
        desiredParallelism,
        threadFactory,
        uncaughtExceptionHandler,
        true)

  class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
    def wire[T <: Thread](thread: T): T = {
      thread.setDaemon(daemonic)
      thread.setUncaughtExceptionHandler(uncaughtExceptionHandler)
      thread
    }

    def newThread(runnable: Runnable): Thread = wire(new Thread(runnable))

ForkJoinPool uses the DefaultThreadFactory to create new threads and, as you can see, it passes the daemonic = true flag to them. So how do you solve it when there is an obvious need to wait for the background threads to terminate? In different places I found the recommendation to use a dedicated thread pool for blocking I/O operations and to let other, more CPU-intensive operations run on their own pool. Keep in mind that any blocking operation, like the ones using Await, brings a risk of thread starvation if your blocking calls take a long time and your program's logic is different from a single main thread with asynchronous code execution (e.g. a web service), like it was in my snippet.
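The dedicated-pool recommendation could look like the sketch below. The pool size and the blockingRead helper are made up for illustration; the point is that blocking calls run on their own ExecutionContext while transformations stay on the global one.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical dedicated pool for blocking I/O; size chosen arbitrarily
val blockingIoContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
import scala.concurrent.ExecutionContext.Implicits.global  // CPU-bound work

// Simulated blocking call (think JDBC query or file read), pinned to the I/O pool
def blockingRead(id: Int): Future[Int] = Future {
  Thread.sleep(100)
  id * 2
}(blockingIoContext)

// The map over the results runs on the global (daemon-threaded) context
val result = Await.result(Future.sequence((1 to 3).map(blockingRead)).map(_.sum), 5.seconds)
println(s"sum=${result}")
blockingIoContext.shutdown()
```

This way a slow blocking call can only exhaust the I/O pool, never the threads serving the CPU-bound work.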

Read also about ExecutionContext - why one stops and another does not? here: Difference between ExecutionContext.global and main Thread.
