Flaky onSuccess of Future.sequence

I wrote this method:

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{ Success, Failure }

object FuturesSequence extends App {
  val f1 = future {
    1
  }

  val f2 = future {
    2
  }

  val lf = List(f1, f2)

  val seq = Future.sequence(lf)

  seq.onSuccess {
    case l => println(l)
  }
}

I was expecting Future.sequence to turn a List[Future] into a Future[List] and then wait for every future (f1 and f2 in my case) to complete before calling onSuccess on the resulting Future[List] (seq in my case).

But after many runs of this code, it prints "List(1, 2)" only once in a while and I can't figure out why it does not work as expected.

Segno answered 21/1, 2015 at 22:29 Comment(4)
The application is exiting before the callback can be executed. - Mehalick
Cf. #16613017 - Eponymy
As others have said, the default execution context runs your futures on daemon threads, which don't stop the JVM from exiting. If this is for a quick test, the easiest thing would be to use Await.result on the seq. - Tedmann
So is changing the execution context the only way to do this? Or is there another approach that gives the right behavior with the current execution context? (Maybe a blocking step at the end of the asynchronous phase, when I'm waiting for all the results to be returned, isn't that bad?) - Segno

Try this:

import scala.concurrent._
import java.util.concurrent.Executors
import scala.util.{ Success, Failure }

object FuturesSequence extends App {
  implicit val exec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool)
  val f1 = future {
    1
  }

  val f2 = future {
    2
  }

  val lf = List(f1, f2)

  val seq = Future.sequence(lf)

  seq.onSuccess {
    case l => println(l)
  }
}

This will always print List(1, 2). The reason is simple: exec above is an ExecutionContext backed by regular (non-daemon) threads, whereas in your example the ExecutionContext was the default one, implicitly taken from ExecutionContext.Implicits.global, which uses daemon threads.

Because those threads are daemon threads, the process doesn't wait for the seq future to complete before terminating. If seq happens to complete before the JVM exits, the result is printed, but that doesn't always happen.
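
One thing to keep in mind with this approach: a non-daemon pool also keeps the JVM alive after the work is done (threads in a cached pool linger until they have been idle for about a minute), so it is usually shut down explicitly once the result has been handled. Here is a minimal sketch of that, assuming a newer Scala version where Future { ... } is used instead of future { ... } and map/onComplete instead of onSuccess. The names NonDaemonPoolSketch and gather are made up for illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object NonDaemonPoolSketch {
  // Hypothetical helper: runs two trivial futures and gathers their results.
  def gather(implicit ec: ExecutionContext): Future[List[Int]] =
    Future.sequence(List(Future(1), Future(2)))

  def main(args: Array[String]): Unit = {
    // Non-daemon threads: the JVM stays up while this pool is alive.
    val pool = Executors.newCachedThreadPool()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    gather
      .map(l => println(l))             // prints List(1, 2)
      .onComplete(_ => pool.shutdown()) // shut down only after printing, so the JVM can exit
  }
}
```

The shutdown is chained after the printing step on purpose: calling pool.shutdown() straight away in main could reject the callback tasks that have not been submitted yet.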

Pileate answered 22/1, 2015 at 10:44 Comment(2)
When writing futures like this, do the futures ever time out? Asking because I'd prefer a future over a thread in case I want to have a return type. - Carnotite
Futures by themselves don't time out, unless you explicitly try canceling them. Even in that case, it's just the interrupt status of the thread that may be set, and nothing more. - Pileate

The application is exiting before the future completes.

You need to wait until the future has completed. This can be achieved in a variety of ways, including changing the ExecutionContext, instantiating a new thread pool, calling Thread.sleep, or using the methods on scala.concurrent.Await.

The simplest way for your code is to use Await.ready, which blocks until the future completes or the given timeout elapses. In the modified code below, the application waits up to 5 seconds for seq before exiting.

Note also the extra import of scala.concurrent.duration._ so we can specify the time to wait.

import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global // the futures below need an implicit ExecutionContext
import scala.util.{ Success, Failure }

object FuturesSequence extends App {
  val f1 = future {
    1
  }

  val f2 = future {
    2
  }

  val lf = List(f1, f2)

  val seq = Future.sequence(lf)

  seq.onSuccess {
    case l => println(l)
  }

  // Block the main thread until seq completes, for at most 5 seconds
  Await.ready(seq, 5.seconds)
}

By using Await.result instead, you can skip the onSuccess callback too, as it returns the resulting list to you directly. That also avoids relying on the callback having run before the main thread exits.

Example:

val result: List[Int] = Await.result(Future.sequence(lf), 5.seconds)
println(result)
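
For readers on newer Scala versions, where future { ... } and onSuccess were deprecated and later removed, the same idea can be sketched as a self-contained program. This assumes the default global execution context is acceptable. The names AwaitSequenceSketch and gather are made up for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitSequenceSketch {
  // Hypothetical helper: gathers two trivial futures into one Future[List[Int]].
  def gather(): Future[List[Int]] =
    Future.sequence(List(Future(1), Future(2)))

  def main(args: Array[String]): Unit = {
    // Blocks the main thread until the combined future completes, so the
    // daemon threads of the global context get to finish before the JVM exits.
    // Throws a TimeoutException if that takes longer than 5 seconds.
    val result: List[Int] = Await.result(gather(), 5.seconds)
    println(result) // prints List(1, 2)
  }
}
```

Blocking like this is fine at the very end of a small program or in a test. Inside a larger asynchronous flow, it is usually better to keep composing futures with map/flatMap and block only once at the edge.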
Sherrill answered 23/1, 2015 at 9:18 Comment(0)
