Scala waiting for sequence of futures
Asked Answered
V

6

38

I was hoping code like follows would wait for both futures, but it does not.

object Fiddle {
  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2) // in the general case, this would be a dynamically sized list

  val seq = Future.sequence(lf) 

  seq.onComplete {
    _ => lf.foreach(f => println(f.isCompleted))
  }
}

val a = FuturesSequence

I assumed seq.onComplete would wait for them all to complete before completing itself, but not so; it results in:

true
false

.sequence was a bit hard to follow in the source of scala.concurrent.Future, I wonder how I would implement a parallel that waits for all original futures of a (dynamically sized) sequence, or what might be the problem here.

Edit: A related question: https://worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future :)

Vaporimeter answered 30/3, 2015 at 10:59 Comment(0)
F
54

One common approach to waiting for all results (failed or not) is to "lift" failures into a new representation inside the future, so that all futures complete with some result (although they may complete with a result that represents failure). One natural way to get that is lifting to a Try.

Twitter's implementation of futures provides a liftToTry method that makes this trivial, but you can do something similar with the standard library's implementation:

import scala.util.{ Failure, Success, Try }

val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
  _.map(Success(_)).recover { case t => Failure(t) }
)

Now Future.sequence(lifted) will be completed when every future is completed, and will represent successes and failures using Try.

And so, a generic solution for waiting on all original futures of a sequence of futures may look as follows, assuming an execution context is of course implicitly available.

  import scala.util.{ Failure, Success, Try }

  private def lift[T](futures: Seq[Future[T]]) = 
    futures.map(_.map { Success(_) }.recover { case t => Failure(t) })

  def waitAll[T](futures: Seq[Future[T]]) =
    Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used

  waitAll(SeqOfFutures).map { 
    // do whatever with the completed futures
  }
Fungosity answered 30/3, 2015 at 11:23 Comment(3)
Many thanks, I will probably use and adapt this function! Would you find it accurate saying, that using Try essentially maps from an execution result outcome of returned value | flow control exception to a Success | Failure return value?Vaporimeter
By the way, I could quite very equally implement the same transformation, using promises rather than Try. I already fall down to promises where I need to extend the Future trait (pimping it with implicit classes).Vaporimeter
You almost certainly don't want promises here. Try is essentially a synchronous Future, where instead of three states (unsatisfied, success, failure) you only have two, which fits this problem exactly.Fungosity
R
27

A Future produced by Future.sequence completes when either:

  • all the futures have completed successfully, or
  • one of the futures has failed

The second point is what's happening in your case, and it makes sense to complete as soon as one of the wrapped Future has failed, because the wrapping Future can only hold a single Throwable in the failure case. There's no point in waiting for the other futures because the result will be the same failure.

Rotarian answered 30/3, 2015 at 11:13 Comment(6)
Thanks, right, I should have gathered that behaviour from the unary nature of .sequence's type. But the point is I would like to wait for all of them to complete; using a .sequence was just a (wrong) means. How shall I accomplish that most straightforwardly?Vaporimeter
This doesn't really answer the second part of the question. It's perfectly reasonable to want to gather all the results, failed or not, and it's a shame the standard library's implementation of futures doesn't have something like Twitter's liftToTry to facilitate this.Fungosity
@TravisBrown No need to specialcase that method in stdlib Futures: f.transform(Success(_))Immortality
@ViktorKlang …which unfortunately uses a transform that didn't exist before 2.12. :)Fungosity
@TravisBrown Yep. 😊 But now it exists, so use that. 👍😊Immortality
@ViktorKlang …unless you're one of the many thousands of Scala developers who will be stuck using or at least supporting 2.11 for years to come.Fungosity
M
3

This is an example that supports the previous answer. There is an easy way to do this using just the standard Scala APIs.

In the example, I am creating 3 futures. These will complete at 5, 7, and 9 seconds respectively. The call to Await.result will block until all futures have resolved. Once all 3 futures have completed, a will be set to List(5,7,9) and execution will continue.

Additionally, if an exception is thrown in any of the futures, Await.result will immediately unblock and throw the exception. Uncomment the Exception(...) line to see this in action.

  try {
    val a = Await.result(Future.sequence(Seq(
      Future({
        blocking {
          Thread.sleep(5000)
        }
        System.err.println("A")
        5
      }),
      Future({
        blocking {
          Thread.sleep(7000)
        }
        System.err.println("B")
        7
        //throw new Exception("Ha!")
      }),
      Future({
        blocking {
          Thread.sleep(9000)
        }
        System.err.println("C")
        9
      }))),
      Duration("100 sec"))

    System.err.println(a)
  } catch {
    case e: Exception ⇒
      e.printStackTrace()
  }
Microbalance answered 31/7, 2016 at 18:5 Comment(0)
E
2

Even though it is quite old question But this is how I got it running in recent time.

    object Fiddle {
      val f1 = Future {
        throw new Throwable("baaa") // emulating a future that bumped into an exception
      }
    
      val f2 = Future {
        Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
        2
      }
    
      val lf = List(f1, f2) // in the general case, this would be a dynamically sized list
    
      val seq = Future.sequence(lf) 
      import scala.concurrent.duration._
      Await.result(seq, Duration.Inf) 
    }

This won't get completed and will wait till all the future gets completed. You can change the waiting time as per your use case. I have kept it to infinite and that was required in my case.

Effluent answered 16/4, 2020 at 13:23 Comment(0)
M
1

We can enrich Seq[Future[T]] with its own onComplete method through an implicit class:

  def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
    f map { Success(_) } recover { case e => Failure(e) }

  def lift[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Seq[Future[Try[T]]] =
    fs map { lift(_) }

  implicit class RichSeqFuture[+T](val fs: Seq[Future[T]]) extends AnyVal {
    def onComplete[U](f: Seq[Try[T]] => U)(implicit ec: ExecutionContext) = {
      Future.sequence(lift(fs)) onComplete {
        case Success(s) => f(s)
        case Failure(e) => throw e // will never happen, because of the Try lifting
      }
    }
  }

Then, in your particular MWE, you can do:

  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2)

  lf onComplete { _ map {
    case Success(v) => ???
    case Failure(e) => ???
  }}

This solution has the advantage of allowing you to call an onComplete on a sequence of futures as you would on a single future.

Micrometeorite answered 22/10, 2017 at 2:53 Comment(2)
Hopefully we won't need this trickery in the new collections libraryVaporimeter
@matanster, is there anything in particular in the development of the new collections library that makes you hope that the new collections library might allow us to achieve the same behaviour with less trickery?Micrometeorite
T
1

Create the Future with a Try to avoid extra hoops.

implicit val ec = ExecutionContext.global

val f1 = Future {
  Try {
    throw new Throwable("kaboom")
  }
}

val f2 = Future {
  Try {
    Thread.sleep(1000L)
    2
  }
}

Await.result(
  Future.sequence(Seq(f1, f2)), Duration("2 sec")
) foreach {
  case Success(res) => println(s"Success. $res")
  case Failure(e)   => println(s"Failure. ${e.getMessage}")
}
Truncate answered 5/11, 2019 at 21:13 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.