Flaky onSuccess of Future.sequence

Published 2020-02-12 17:18

Question:

I wrote this program:

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{ Success, Failure }

object FuturesSequence extends App {
  val f1 = future {
    1
  }

  val f2 = future {
    2
  }

  val lf = List(f1, f2)

  val seq = Future.sequence(lf)

  seq.onSuccess {
    case l => println(l)
  }
}

I was expecting Future.sequence to gather a List[Future] into a Future[List] and then wait for every future (f1 and f2 in my case) to complete before calling onSuccess on the resulting Future[List] (seq in my case).

But after many runs of this code, it prints "List(1, 2)" only once in a while and I can't figure out why it does not work as expected.

Answer 1:

Try this:

import scala.concurrent._
import java.util.concurrent.Executors
import scala.util.{ Success, Failure }

object FuturesSequence extends App {
  implicit val exec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool)
  val f1 = future {
    1
  }

  val f2 = future {
    2
  }

  val lf = List(f1, f2)

  val seq = Future.sequence(lf)

  seq.onSuccess {
    case l => println(l)
  }
}

This will always print List(1, 2). The reason is simple: exec above is an ExecutionContext backed by non-daemon threads, whereas in your example the ExecutionContext was the default one, taken implicitly from ExecutionContext.Implicits.global, which uses daemon threads.

Because those threads are daemons, the JVM does not wait for the seq future to complete before it terminates. If seq happens to complete before the main thread exits, the list is printed; otherwise nothing is printed, which is why the output only appears some of the time.
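
The daemon versus non-daemon difference can be checked directly. The following is a minimal sketch, not part of the original answer: the object name DaemonCheck and the helper runsOnDaemon are made up for illustration, and it uses Future(...) rather than future { ... } so that it also compiles on newer Scala versions. It runs a trivial task on each context and reports whether the worker thread is a daemon.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only.
object DaemonCheck {
  // Runs a trivial task on the given context and reports whether the
  // worker thread that executed it is a daemon thread.
  def runsOnDaemon(ec: ExecutionContext): Boolean =
    Await.result(Future(Thread.currentThread().isDaemon)(ec), 5.seconds)

  def main(args: Array[String]): Unit = {
    val pool = Executors.newCachedThreadPool()
    val custom = ExecutionContext.fromExecutorService(pool)
    try {
      println(s"global uses daemon threads: ${runsOnDaemon(ExecutionContext.global)}")
      println(s"cached pool uses daemon threads: ${runsOnDaemon(custom)}")
    } finally {
      // Non-daemon workers keep the JVM alive while they exist,
      // so shut the pool down explicitly once the work is done.
      pool.shutdown()
    }
  }
}
```

One design consequence worth keeping in mind: a cached thread pool keeps idle threads around for a while, so a program that relies on non-daemon workers and never calls shutdown() may linger after its work is finished.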



Answer 2:

The application is exiting before the future completes.

You need to block until the future has completed. This can be achieved in a variety of ways, including changing the ExecutionContext, instantiating a new thread pool, calling Thread.sleep, or using the methods on scala.concurrent.Await.

The simplest way for your code is to use Await.ready, which blocks until the future completes or the given timeout elapses. In the modified code below, the application waits up to 5 seconds for seq before exiting.

Note also the extra import of scala.concurrent.duration._, which lets us specify the time to wait.

import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
import scala.util.{ Success, Failure }

object FuturesSequence extends App {
  val f1 = future {
    1
  }

  val f2 = future {
    2
  }

  val lf = List(f1, f2)

  val seq = Future.sequence(lf)

  seq.onSuccess {
    case l => println(l)
  }

  // Block the main thread until seq completes, for at most 5 seconds
  Await.ready(seq, 5.seconds)
}
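
One subtlety: Await.ready waits for the awaited future itself, not for callbacks registered on it, and those callbacks run asynchronously on the execution context. In principle the main thread can therefore resume and exit before a callback has printed anything. A possible way around this, sketched below under the assumption that the side effect can be expressed with map, is to await the future returned by map instead. The names FuturesSequenceMapped and render are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical example object, for illustration only.
object FuturesSequenceMapped {
  // Returns a future that completes only after the formatting step has run,
  // so awaiting it also waits for that step.
  def render(futures: List[Future[Int]]): Future[String] =
    Future.sequence(futures).map(l => l.mkString("List(", ", ", ")"))

  def main(args: Array[String]): Unit = {
    // The println is part of the awaited future, not a detached callback.
    val done = render(List(Future(1), Future(2))).map(s => println(s))
    Await.ready(done, 5.seconds)
  }
}
```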

By using Await.result instead, you can skip the onSuccess callback as well, since it returns the resulting list to you.

Example:

val result: List[Int] = Await.result(Future.sequence(lf), 5.seconds)
println(result)
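
For completeness, here is a self-contained sketch of the Await.result variant. It is an adaptation rather than the original poster's code: it uses Future(...) in place of future { ... } and drops onSuccess, since both are deprecated or unavailable in newer Scala versions, and the names FuturesSequenceAwait and gather are invented for this example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical example object, for illustration only.
object FuturesSequenceAwait {
  // Combines the individual futures and blocks until all of them have
  // finished; throws a TimeoutException if that takes longer than 5 seconds.
  def gather(futures: List[Future[Int]]): List[Int] =
    Await.result(Future.sequence(futures), 5.seconds)

  def main(args: Array[String]): Unit = {
    val f1 = Future(1)
    val f2 = Future(2)
    // Printing happens on the main thread, so it does not depend on
    // daemon worker threads still being alive at JVM exit.
    println(gather(List(f1, f2)))
  }
}
```

Blocking like this is reasonable at the very edge of a small program such as this one; inside larger asynchronous code, composing futures with map or flatMap is usually preferred over Await.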