With the growth of available computing power, parallel computing has gained popularity in recent years. Java's concurrent package is one proof of that. Scala can use Java's concurrency features, but it also comes with its own mechanisms. Futures are one of them.
This post describes Future objects in 4 parts. The first one gives some basic information about these objects. The second one goes deeper and presents more advanced features. The third part focuses on timeouts and errors. The last section tries to answer the question of whether we should block on Futures.
Introduction to Futures
As in many other languages, a Scala Future is a placeholder representing a value that may not be available yet and will be computed at some later point. It's immutable (once computed, it can't be overridden) and typed. The type represents the value returned by the Future. A Future is either completed or not, and a completed Future ended either successfully or with a failure. To make a Future work, we must define an ExecutionContext. Most often, import scala.concurrent.ExecutionContext.Implicits.global is enough. We can construct a Future with one of its convenient factory methods:
// or Future.apply(....)(customExecutionContext) because of
// def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext)
Future.apply({ "XYZ" })
Future.successful("ababab")
Future.failed(new RuntimeException("Test error"))
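To make the customExecutionContext variant more concrete, here is a minimal sketch that runs a computation on a dedicated thread pool instead of the global one. The object name, the runOnCustomPool helper and the pool size of 2 are my own illustrative choices, not something a real project has to copy.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CustomContextExample {
  // Hypothetical helper: runs `body` on a dedicated 2-thread pool and returns its result
  def runOnCustomPool[T](body: => T): T = {
    val pool = Executors.newFixedThreadPool(2)
    val customExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // The context is passed explicitly in the second parameter list of apply
      val future = Future(body)(customExecutionContext)
      Await.result(future, 5.seconds)
    } finally {
      // The pool threads are not daemons, so we release them explicitly
      pool.shutdown()
    }
  }
}
```

In real code the pool would usually be created once and shared, rather than created and shut down for every call as in this sketch.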
The first call creates a Future whose result we don't know yet. The second one returns an already successfully completed Future, and the last one an already failed Future. The computation passed to the apply(...) method executes asynchronously, so it doesn't block the calling thread. There are several ways to get the computation result. One of the simplest is the onComplete callback:
private def getFutureString(waitingTime: Long): String = {
  Thread.sleep(waitingTime)
  "xyz"
}

it("should retrieve Future value from callback") {
  val accumulator = new mutable.ListBuffer[String]()
  val futureString = Future(getFutureString(2000L))
  futureString onComplete {
    case Success(text) => accumulator.append(text)
    case Failure(_) => throw new RuntimeException("Retrieval failure")
  }
  Await.result(futureString, 3 seconds)

  accumulator should have size 1
  accumulator(0) shouldEqual "xyz"
}

it("should call callback even when it's defined after waiting method") {
  val future = Future(getFutureString(2500L))
  Await.result(future, 5 seconds)

  var futureResult = ""
  future.onComplete {
    case Success(_) => futureResult = "OK"
    case Failure(_) => futureResult = "ERROR"
  }

  // race condition, Future is executed in other thread
  futureResult shouldEqual ""
  Thread.sleep(2000)
  futureResult shouldEqual "OK"
}

it("should not callback when the Future takes more time to execute than the main thread") {
  val future = Future(getFutureString(2500L))

  var futureResult = ""
  future.onComplete {
    case Success(_) => futureResult = "OK"
    case Failure(_) => futureResult = "ERROR"
  }

  futureResult shouldEqual ""
}
Other, more Scala-like ways to manipulate Future results use combinators. One of them is the map function known from collections:
it("should map future results to another type") {
  def getNumber(numberToReturn: Int): Int = numberToReturn
  val futures = Future.sequence(Seq(
    Future(getNumber(1)), Future(getNumber(2)), Future(getNumber(3))
  ))
  Await.result(futures, 3 seconds)

  val stringifiedNumbers = futures.map(numbers => numbers.map(nr => s"number=${nr}"))

  val mappedNumbers = Await.result(stringifiedNumbers, 1 second)
  mappedNumbers should have size 3
  mappedNumbers should contain allOf("number=1", "number=2", "number=3")
}
As you can see, mapping a Future produces another Future, of the same or a different type. Map rarely goes without flatMap, and that applies to Future too:
it("should use flatMap to 2 dependent Futures") {
  val dataGetterFuture = Future.apply({ "XYZ" })
  def textValidatorFuture(word: String, minLength: Int): Future[Boolean] = Future({
    word.length > minLength
  })

  val validationResult = dataGetterFuture.flatMap(text => textValidatorFuture(text, 2))

  val isValidText = Await.result(validationResult, Duration.Inf)
  isValidText shouldBe true
}
The difference between flatMap and map is slight. The function passed to flatMap must itself return a Future, while the function passed to map returns a plain value that ends up wrapped in the resulting Future. This is what makes flatMap less verbose for dependent Futures. If the result of one Future depends on the result of another, map produces an instance of type Future[Future[T]], while flatMap gives us a Future[T] directly. Obviously, it's easier to work with the flattened version. The same test as above rewritten with map would look like:
it("should use map on 2 dependent Futures") {
  val dataGetterFuture = Future.apply({ "XYZ" })
  def textValidatorFuture(word: String, minLength: Int): Future[Boolean] = Future({
    word.length > minLength
  })

  val validationResult: Future[Future[Boolean]] = dataGetterFuture.map(text => textValidatorFuture(text, 2))

  val isValidTextFuture = Await.result(validationResult, Duration.Inf)
  val isValidText = Await.result(isValidTextFuture, Duration.Inf)
  isValidText shouldBe true
}
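If we end up with a nested Future[Future[T]] anyway, it doesn't have to be awaited twice. Assuming Scala 2.12 or later, Future exposes a flatten method that collapses the two levels. The sketch below reuses the validator idea from the tests; the object and the validate helper are only illustrative names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlattenExample {
  def textValidatorFuture(word: String, minLength: Int): Future[Boolean] =
    Future(word.length > minLength)

  // Hypothetical helper: validates `text` and blocks only once to return the result
  def validate(text: String): Boolean = {
    // map over a function returning a Future gives a nested type...
    val nested: Future[Future[Boolean]] = Future(text).map(word => textValidatorFuture(word, 2))
    // ...that flatten (Scala 2.12+) collapses into a single level
    val flattened: Future[Boolean] = nested.flatten
    Await.result(flattened, 5.seconds)
  }
}
```

Calling map followed by flatten gives the same result as a single flatMap, so in practice flatMap remains the shorter option.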
Advanced Futures
Fortunately, map and flatMap aren't the only combinators available. A Future can also:
- be filtered
it("should fail when the filter predicate is not satisfied") {
  val successfulFuture: Future[String] = Future.successful("ababab")

  val longTexts = successfulFuture.withFilter(text => text.length > 10)

  val noSuchElementException = intercept[NoSuchElementException] {
    Await.result(longTexts, 5 seconds)
  }
  noSuchElementException.getMessage shouldEqual "Future.filter predicate is not satisfied"
}
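The same filtering is what the compiler uses when we write an if guard inside a for-comprehension over a Future. Here is a small sketch of that form; keepLongText and the threshold of 10 characters are illustrative, and the result is wrapped in a Try only to make the failure easy to inspect.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FilterGuardExample {
  // Hypothetical helper: keeps the text only if it is longer than 10 characters
  def keepLongText(text: String): Try[String] = {
    // The `if` guard is translated by the compiler into a withFilter call on the Future
    val filtered: Future[String] = for {
      value <- Future.successful(text) if value.length > 10
    } yield value
    // A rejected value surfaces as a failed Future, captured here as a Failure
    Try(Await.result(filtered, 5.seconds))
  }
}
```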
- aggregate results of different Futures
it("should reduce the futures") {
  val futureText1: Future[String] = Future.successful("1")
  val futureText2: Future[String] = Future.successful("2")
  val futureText3: Future[String] = Future.successful("3")

  val reducedFutures = Future.reduceLeft[String, String](List(futureText1, futureText2, futureText3)) {
    case (previousResult, futureResult) => previousResult + futureResult
  }

  val result = Await.result(reducedFutures, 1 second)
  result.split("") should contain allOf("1", "2", "3")
}
- apply side-effects after completion
it("should chain Futures with andThen combinator and always return the result of the first Future") {
  val sourceFuture = Future(getFutureString(1000L))
  val numbers = mutable.ListBuffer[Int](1, 2, 3)

  // purely for side-effecting purposes
  val resultFuture = sourceFuture andThen {
    case Success(_) => {
      // Here we can apply a side effect
      numbers.clear()
      "def"
    }
  } andThen {
    case Success(_) => "abc"
  }

  val result = Await.result(resultFuture, 2 seconds)
  result shouldEqual "xyz"
  numbers shouldBe empty
}
- compose multiple Futures and be processed as a single Future
it("should chain the list of futures") {
  val future2SecWait = Future(getFutureString(2000L))
  val future3SecWait = Future(getFutureString(3000L))
  val future4SecWait = Future(getFutureString(4000L))

  val futures = Future.sequence(Seq(future2SecWait, future3SecWait, future4SecWait))

  val futuresResult = Await.result(futures, 6 seconds)
  val concatenatedResult = futuresResult.mkString(", ")
  concatenatedResult shouldEqual "xyz, xyz, xyz"
}
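When the Futures are built from a collection of plain values, Future.traverse does the mapping and the sequencing in a single step. A minimal sketch, with doubleAll being an illustrative helper name:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TraverseExample {
  // Hypothetical helper: doubles every number asynchronously
  def doubleAll(numbers: Seq[Int]): Seq[Int] = {
    // traverse turns every element into a Future and gathers the results in input order
    val doubled: Future[Seq[Int]] = Future.traverse(numbers)(number => Future(number * 2))
    Await.result(doubled, 5.seconds)
  }
}
```

It is equivalent to calling Future.sequence(numbers.map(...)) but avoids building the intermediate collection of Futures by hand.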
It's also possible to work with the result of whichever Future completes first:
it("should invoke callback at first completed future") {
  val resultWith3 = Future({
    Thread.sleep(3000L)
    3
  })
  val resultWith4 = Future({
    Thread.sleep(4000L)
    4
  })
  val resultWith5 = Future({
    Thread.sleep(5000L)
    5
  })

  val firstResult = Future.firstCompletedOf(Seq(resultWith3, resultWith4, resultWith5))

  val number = Await.result(firstResult, Duration.Inf)
  number shouldEqual 3
}
In the bullet list I showed that we can compose different Futures. We can also combine their results with a for-comprehension:
it("should compose multiple futures with for comprehension") {
  val future2SecWait = Future(getFutureString(2000L))
  val future3SecWait = Future(getFutureString(3000L))
  val future4SecWait = Future(getFutureString(4000L))

  val concatenatedFuture = for {
    txt1 <- future2SecWait
    txt2 <- future3SecWait
    txt3 <- future4SecWait
  } yield {
    txt1 + txt2 + txt3
  }

  val concatenatedResult = Await.result(concatenatedFuture, 6 seconds)
  concatenatedResult shouldEqual "xyzxyzxyz"
}
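One detail of the test above is easy to miss: the three Futures are created before the for-comprehension, so they are already running when it starts, and the whole thing fits in 6 seconds. A Future starts as soon as it's created, so creating the Futures inside the comprehension makes each one wait for the previous one. The sketch below contrasts both forms; all the names are illustrative, and the concurrent variant assumes the execution context has at least two free threads.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForComprehensionOrdering {
  private def slowValue(value: String, sleepMs: Long): String = {
    Thread.sleep(sleepMs)
    value
  }

  // Returns the combined text and the elapsed time in milliseconds
  private def timed(combined: => Future[String]): (String, Long) = {
    val start = System.nanoTime()
    val result = Await.result(combined, 10.seconds)
    (result, (System.nanoTime() - start) / 1000000L)
  }

  // Futures created inside the comprehension: the second starts only after the first completes
  def createdInside(sleepMs: Long): (String, Long) = timed {
    for {
      txt1 <- Future(slowValue("a", sleepMs))
      txt2 <- Future(slowValue("b", sleepMs))
    } yield txt1 + txt2
  }

  // Futures created before the comprehension: both are already running when it starts
  def createdBefore(sleepMs: Long): (String, Long) = timed {
    val future1 = Future(slowValue("a", sleepMs))
    val future2 = Future(slowValue("b", sleepMs))
    for {
      txt1 <- future1
      txt2 <- future2
    } yield txt1 + txt2
  }
}
```

Both variants return the same text, but createdInside takes at least twice the sleeping time, whereas createdBefore can finish in roughly one sleeping time when enough threads are available.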
Another interesting feature of Futures is recovery. When a Future fails, we can use the recover or recoverWith method to compute an alternative result for it:
it("should recover from failed Future") {
  val failedFuture: Future[String] = Future.failed(new RuntimeException("Test error"))

  val recoveredFailure = failedFuture.recover {
    case re: RuntimeException => re.getMessage
  }

  val result = Await.result(recoveredFailure, 1 second)
  result shouldEqual "Test error"
}

it("should keep the result of successful Future when recover is called") {
  val successfulResult = Future.successful("OK")

  val recoveredFuture = successfulResult.recover {
    case _ => "ERROR"
  }

  val result = Await.result(recoveredFuture, 1 second)
  result shouldEqual "OK"
}

it("should show the difference between recover and recoverWith") {
  val failedFuture: Future[String] = Future.failed(new RuntimeException("Test error"))

  val recoveredFailureFromRecover = failedFuture.recover {
    case re: RuntimeException => s"recover: ${re.getMessage}"
  }
  val recoveredFailureFromRecoverWith = failedFuture.recoverWith {
    case re: RuntimeException => Future.successful(s"recoverWith: ${re.getMessage}")
  }

  val resultForRecover = Await.result(recoveredFailureFromRecover, 1 second)
  val resultForRecoverWith = Await.result(recoveredFailureFromRecoverWith, 1 second)
  resultForRecover shouldEqual "recover: Test error"
  resultForRecoverWith shouldEqual "recoverWith: Test error"
}

it("should not recover from not handled Exception") {
  val failedFuture: Future[String] = Future.failed(new IllegalArgumentException("Test error"))

  val recoveredFailure = failedFuture.recover {
    case re: IllegalStateException => re.getMessage
  }

  val result = intercept[IllegalArgumentException] {
    Await.result(recoveredFailure, 1 second)
  }
  result shouldBe a [IllegalArgumentException]
}
Timeouts and errors
One of the dangers of asynchronous computation is time. In order to do something with the results, we often must block the main thread. But what happens if a Future doesn't complete within this blocking time? In the simplest case, the blocking call throws a TimeoutException:
it("should timeout after too small waiting time") {
  val future = Future(getFutureString(3000L))

  val timeoutException = intercept[TimeoutException] {
    Await.result(future, 1 seconds)
  }

  timeoutException.getMessage shouldEqual "Futures timed out after [1 second]"
}
When we're working with a set of Futures, the TimeoutException is thrown as soon as the waiting time elapses and at least one of the Futures is still not completed:
it("should timeout when one of Futures takes more time than others") {
  val futures = Future.sequence(Seq(
    Future(getFutureString(2000L)), Future(getFutureString(5000L)), Future(getFutureString(4000L))
  ))

  val timeoutException = intercept[TimeoutException] {
    Await.result(futures, 3 seconds)
  }

  timeoutException.getMessage shouldEqual "Futures timed out after [3 seconds]"
}

it("should fail when one of Futures doesn't complete within waiting time") {
  val future2SecWait = Future(getFutureString(2000L))
  val future3SecWait = Future(getFutureString(3000L))
  val future4SecWait = Future(getFutureString(4000L))

  val concatenatedFuture = for {
    txt1 <- future2SecWait
    txt2 <- future3SecWait
    txt3 <- future4SecWait
  } yield {
    txt1 + txt2 + txt3
  }

  val timeoutException = intercept[TimeoutException] {
    Await.result(concatenatedFuture, 2 seconds)
  }
  timeoutException.getMessage shouldEqual "Futures timed out after [2 seconds]"
}
And obviously, when the computation of a Future fails, Await.result rethrows the exception in the waiting thread:
it("should not wait until the failed future terminates") {
  def failWithSleeping(sleepingTime: Long) = {
    Thread.sleep(sleepingTime)
    throw new RuntimeException("Controlled runtime error")
  }
  val failingFuture = Future(failWithSleeping(2000L))

  val runtimeError = intercept[RuntimeException] {
    Await.result(failingFuture, Duration.Inf)
  }

  runtimeError.getMessage shouldEqual "Controlled runtime error"
}
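All the timeouts above come from the blocking Await call. If we'd rather handle a timeout without blocking, one possible approach (my own sketch, not something provided by the Future API itself) is to race the Future against a Promise that a scheduler fails after a delay, and then to recover from that failure. The withTimeout and slowOrFallback helpers are hypothetical names.

```scala
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TimeoutFallbackExample {
  // Hypothetical helper: the returned Future fails with a TimeoutException
  // if `future` isn't completed within `timeout`
  def withTimeout[T](future: Future[T], timeout: FiniteDuration): Future[T] = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val timeoutPromise = Promise[T]()
    scheduler.schedule(new Runnable {
      override def run(): Unit = {
        timeoutPromise.tryFailure(new TimeoutException(s"No result after $timeout"))
      }
    }, timeout.toMillis, TimeUnit.MILLISECONDS)
    val firstCompleted = Future.firstCompletedOf(Seq(future, timeoutPromise.future))
    // Release the scheduler thread as soon as a winner is known
    firstCompleted.onComplete(_ => scheduler.shutdownNow())
    firstCompleted
  }

  // Hypothetical helper: returns "xyz" if the slow computation wins, "fallback" otherwise
  def slowOrFallback(sleepMs: Long, timeoutMs: Long): String = {
    val slowFuture = Future { Thread.sleep(sleepMs); "xyz" }
    val guarded = withTimeout(slowFuture, timeoutMs.millis).recover {
      case _: TimeoutException => "fallback"
    }
    // Blocking here only to expose a plain value to the caller of this sketch
    Await.result(guarded, 10.seconds)
  }
}
```

Two caveats: the slow computation is not cancelled and keeps running in the background, and a real implementation would share a single scheduler instead of creating one per call.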
To block or not to block?
Officially, blocking on Futures is discouraged:
Blocking outside the Future
As mentioned earlier, blocking on a future is strongly discouraged for the sake of performance and for the prevention of deadlocks. Callbacks and combinators on futures are a preferred way to use their results. However, blocking may be necessary for certain situations and is supported by the Futures and Promises API.
Source: https://docs.scala-lang.org/overviews/core/futures.html
As stated, if we need the result of a Future in a computation on the main thread, we have no choice: we must block. We can do that with the Await.result and Await.ready methods:
it("should show a difference between result and ready") {
  val successfulFuture = Future.successful("OK")
  val failedFuture = Future.failed(new RuntimeException("Test RuntimeException"))

  val successFutureReady = Await.ready(successfulFuture, 10 seconds)
  val failedFutureReady = Await.ready(failedFuture, 10 seconds)

  // Here we retrieve a Future object, thus either Success or Failure
  successFutureReady shouldEqual successfulFuture
  failedFutureReady shouldEqual failedFuture

  val successFutureResult = Await.result(successfulFuture, 10 seconds)
  val runtimeException = intercept[RuntimeException] {
    Await.result(failedFuture, 10 seconds)
  }

  // For the case of .result we retrieve Future results
  successFutureResult shouldEqual "OK"
  runtimeException.getMessage shouldEqual "Test RuntimeException"
}
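Since Await.ready hands back the completed Future instead of throwing, its outcome can be inspected through the value method, which returns an Option of Try. A small sketch of that pattern, where describe is only an illustrative helper:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object ReadyValueExample {
  // Hypothetical helper: summarizes the outcome of a Future without rethrowing its failure
  def describe(future: Future[String]): String = {
    // Await.ready only waits for completion; the Future's own failure is not thrown here
    val completed = Await.ready(future, 5.seconds)
    completed.value match {
      case Some(Success(text)) => s"success: $text"
      case Some(Failure(error)) => s"failure: ${error.getMessage}"
      case None => "not completed" // not expected once Await.ready has returned
    }
  }
}
```

Await.ready can still throw a TimeoutException when the Future doesn't complete in time, so this only removes the need to catch the Future's own failure.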
Await methods take an Awaitable object (Future is one of them) and block the calling thread for at most the duration given as the second argument. Blocking is fine in that context. It's considered bad practice only when we can replace it with one of the combinators already discussed. For instance, the following snippet blocks twice where a single combinator would do:
it("should show bad use of Await method") {
  val future1 = Future.successful(1)
  val future2 = Future.successful(2)

  val resultFuture1 = Await.result(future1, 2 seconds)
  val resultFuture2 = Await.result(future2, 2 seconds)

  (resultFuture1 + resultFuture2) shouldEqual 3
}
The same code can be expressed with reduceLeft like this:
it("should show reduceLeft and only 1 blocking method") {
  val future1 = Future.successful(1)
  val future2 = Future.successful(2)

  val reducedFutures = Future.reduceLeft[Int, Int](List(future1, future2)) {
    case (previousResult, futureResult) => previousResult + futureResult
  }

  val resultFuture = Await.result(reducedFutures, 2 seconds)
  resultFuture shouldEqual 3
}
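The same sum can also be written with flatMap, or with zipWith if we assume Scala 2.12 or later. The sketch below shows both, with a single blocking call at the very end; the object and method names are illustrative.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CombineBeforeBlocking {
  // flatMap over the first Future, map over the second one
  def sumWithFlatMap(future1: Future[Int], future2: Future[Int]): Future[Int] =
    future1.flatMap(value1 => future2.map(value2 => value1 + value2))

  // zipWith (Scala 2.12+) combines the two results with the given function
  def sumWithZipWith(future1: Future[Int], future2: Future[Int]): Future[Int] =
    future1.zipWith(future2)(_ + _)

  // Hypothetical helper: computes both sums and blocks only once, at the very end
  def blockingSums(a: Int, b: Int): (Int, Int) = {
    val both = sumWithFlatMap(Future.successful(a), Future.successful(b))
      .zip(sumWithZipWith(Future.successful(a), Future.successful(b)))
    Await.result(both, 2.seconds)
  }
}
```

The for-comprehension form from the previous section compiles down to the flatMap and map variant, so it is a third way to write the same thing.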
Scala Futures are a convenient way to deal with values computed asynchronously. A Future is a placeholder for a value that may be missing at a given moment and will be provided later. It comes with a lot of handy methods to work with its result, many of which we can find in other monads: map, flatMap, reduceLeft. We can also use a for-comprehension to handle the execution of multiple Futures in a single place. As advised in the documentation, we shouldn't block on Futures unless the main thread really needs their result. Instead, we should use the combinators presented above.