Scala: List[Future] to Future[List] disregarding failed futures

I'm looking for a way to convert an arbitrary-length list of Futures to a Future of List. I'm using Play Framework, so ultimately what I really want is a Future[Result], but to make things simpler, let's just say Future[List[Int]]. The normal way to do this would be to use Future.sequence(...), but there's a twist: the list I'm given usually has around 10-20 futures in it, and it's not uncommon for one of those futures to fail (they are making external web service requests).

Instead of having to retry all of them in the event that one of them fails, I'd like to be able to get at the ones that succeeded and return those.

For example, doing the following doesn't work:

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success
import scala.util.Failure

val listOfFutures = Future.successful(1) :: Future.failed(new Exception("Failure")) :: 
                    Future.successful(3) :: Nil

val futureOfList = Future.sequence(listOfFutures)

futureOfList onComplete {
  case Success(x) => println("Success!!! " + x)
  case Failure(ex) => println("Failed !!! " + ex)
}

scala> Failed !!! java.lang.Exception: Failure

Instead of getting only the exception, I'd like to be able to pull the 1 and 3 out of there. I tried using Future.fold, but that apparently just calls Future.sequence behind the scenes.

Vida answered 1/1, 2014 at 23:1 Comment(1)
If you are using Twitter's Future, it has a method called collectToTry; with that, you just need to filter the successful ones: twitter.github.io/util/docs/com/twitter/util/… - Madonnamadora
S
159

The trick is to first make sure that none of the futures has failed. .recover is your friend here: you can combine it with map to convert all the Future[T] results to Future[Try[T]] instances, all of which are certain to be successful futures.

Note: you can use Option or Either as well here, but Try is the cleanest way if you specifically want to trap exceptions.

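import scala.util.{Failure, Success, Try}

// Lift each outcome into a Try so that the resulting future never fails
def futureToFutureTry[T](f: Future[T]): Future[Try[T]] =
  f.map(Success(_)).recover { case x => Failure(x) }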

val listOfFutures = ...
val listOfFutureTrys = listOfFutures.map(futureToFutureTry(_))

Then use Future.sequence as before, to give you a Future[List[Try[T]]]:

val futureListOfTrys = Future.sequence(listOfFutureTrys)

Then filter:

val futureListOfSuccesses = futureListOfTrys.map(_.filter(_.isSuccess))

You can even pull out the specific failures, if you need them:

val futureListOfFailures = futureListOfTrys.map(_.filter(_.isFailure))
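
For anyone who wants to paste the steps above into a file and run them, here is a minimal end-to-end sketch. The object name RecoverSketch and the helper successesOnly are made up for illustration and are not part of the answer; the sketch also uses collect rather than filter so that the result is a plain List[T] instead of a List[Try[T]].

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object RecoverSketch {
  // Lift a Future[T] into a Future[Try[T]] that always completes successfully.
  def futureToFutureTry[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
    f.map(Success(_)).recover { case e => Failure(e) }

  // Hypothetical helper: keep only the values of the futures that succeeded.
  def successesOnly[T](fs: List[Future[T]])(implicit ec: ExecutionContext): Future[List[T]] =
    Future.sequence(fs.map(f => futureToFutureTry(f)))
      .map(_.collect { case Success(v) => v })

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val listOfFutures: List[Future[Int]] =
      List(Future.successful(1), Future.failed(new Exception("Failure")), Future.successful(3))
    // Blocking with Await is only for the demo; real code would keep composing the future.
    println(Await.result(successesOnly(listOfFutures), 5.seconds)) // prints List(1, 3)
  }
}
```

Future.sequence preserves the order of the input list, so the surviving values come back in their original relative order.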
Stone answered 1/1, 2014 at 23:35 Comment(10)
Thanks! .recover was indeed the missing piece for me. - Vida
You could use _.collect { case Success(x) => x } instead of _.filter(_.isSuccess) to get rid of Try in the type of futureListOfSuccesses. - Beeswax
In Scala 2.10, .recover(x => Failure(x)) is not valid; use .recover({case e => Failure(e)}) instead. - Kurr
I think you're missing the future wrapper: def futureToFutureOfTry[A](f: Future[A]): Future[Try[A]] = { val p = Promise[Try[A]](); f.map { a => p.success(scala.util.Success(a)) }.recover { case x: Throwable => p.success(Failure(x)) }; p.future } - Figural
Not so. I'm mapping a Future to another Future; an intervening Promise isn't needed and would be wasteful. - Stone
Great answer. I found that my version of Scala gave me an error on the line f.map(Success(_)).recover(x => Failure(x)). I corrected the error and wrote a few tests so I could better understand the solution. I have included the code in an additional answer, below. - Footgear
Note that if you're using Finagle futures (com.twitter.util.Future), then it's Future.collect(listOfFutures), which returns a future of a sequence. - Millett
Thanks Kevin. I need this exact thing. Not only are you good at slides, you are OK at Scala. Ha ha. - Cottle
This is gold. I wonder if there is an easy way to use Seq.partition to separate successes from failures. - Baillieu
There is no need to wrap the Futures into anything. Just use .transformWith to handle Success and Failure directly. You may still want to wrap the results into an Option and then collect the Somes, but it is still a whole lot more readable than this listOfFutureTrys. - Abracadabra
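
The .transformWith suggestion in the last comment might look roughly like the sketch below. It assumes Scala 2.12 or later (where Future.transformWith is available); the object and method names are hypothetical and only there to make the snippet self-contained.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object TransformWithSketch {
  // Map every outcome to an Option (Some on success, None on failure),
  // then flatten the Nones away once all futures have completed.
  def successesOnly[T](fs: List[Future[T]])(implicit ec: ExecutionContext): Future[List[T]] =
    Future.sequence(fs.map { f =>
      f.transformWith[Option[T]] {
        case Success(v) => Future.successful(Some(v))
        case Failure(_) => Future.successful(None)
      }
    }).map(_.flatten)

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val fs: List[Future[Int]] =
      List(Future.successful(1), Future.failed(new Exception("Failure")), Future.successful(3))
    println(Await.result(successesOnly(fs), 5.seconds)) // prints List(1, 3)
  }
}
```

Note that this variant throws the exceptions away; if the failures are needed later, keeping the Try values as in the answer is the safer choice.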
S
19

Scala 2.12 improved Future.transform, which lends itself to an answer with less code.

val futures = Seq(Future{1},Future{throw new Exception})

// instead of `map` and `recover`, use `transform`
val seq = Future.sequence(futures.map(_.transform(Success(_)))) 

val successes = seq.map(_.collect{case Success(x)=>x})
successes
//res1: Future[Seq[Int]] = Future(Success(List(1)))

val failures = seq.map(_.collect{case Failure(x)=>x})
failures
//res2: Future[Seq[Throwable]] = Future(Success(List(java.lang.Exception)))
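
If both lists are needed at once, the two collect passes could be replaced by a single split. The following is only a sketch and assumes Scala 2.13 or later, where collections have partitionMap (Try.toEither has been available since 2.12); PartitionSketch and partitioned are made-up names.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Success

object PartitionSketch {
  // Returns (failures, successes) in one pass over the completed results.
  def partitioned[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Future[(Seq[Throwable], Seq[T])] =
    Future.sequence(fs.map(_.transform(Success(_))))
      .map(_.partitionMap(_.toEither)) // Failure becomes Left, Success becomes Right

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val futures = Seq(Future(1), Future[Int](throw new Exception("boom")), Future(3))
    val (failures, successes) = Await.result(partitioned(futures), 5.seconds)
    // Expect one failure with message "boom" and the successes 1 and 3.
    println(failures.map(_.getMessage))
    println(successes)
  }
}
```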
Sketch answered 7/12, 2017 at 3:37 Comment(0)
F
11

I tried Kevin's answer and ran into a glitch on my version of Scala (2.11.5). I corrected that and wrote a few additional tests, in case anyone is interested. Here is my version:

implicit class FutureCompanionOps(val f: Future.type) extends AnyVal {

    /** Given a list of futures `fs`, returns the future holding the list of Try's of the futures from `fs`.
      * The returned future is completed only once all of the futures in `fs` have been completed.
      */
    def allAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      val listOfFutureTrys: List[Future[Try[T]]] = fItems.map(futureToFutureTry)
      Future.sequence(listOfFutureTrys)
    }

    def futureToFutureTry[T](f: Future[T]): Future[Try[T]] = {
      f.map(Success(_)) .recover({case x => Failure(x)})
    }

    def allFailedAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      allAsTrys(fItems).map(_.filter(_.isFailure))
    }

    def allSucceededAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      allAsTrys(fItems).map(_.filter(_.isSuccess))
    }
}


// Tests (these look like ScalaTest FunSuite tests; they also need
// scala.concurrent.Await and scala.concurrent.duration._ in scope)

  // allAsTrys tests
  //
  test("futureToFutureTry returns Success if no exception") {
    val future = Future.futureToFutureTry(Future{"mouse"})
    Await.ready(future, 1.second)   // wait for completion instead of sleeping and hoping
    val futureValue = future.value
    assert(futureValue == Some(Success(Success("mouse"))))
  }
  test("futureToFutureTry returns Failure if exception thrown") {
    val future = Future.futureToFutureTry(Future{throw new IllegalStateException("bad news")})
    Await.ready(future, 1.second)   // wait for completion instead of sleeping and hoping
    val futureValue = future.value

    assertResult(true) {
      futureValue match {
        case Some(Success(Failure(error: IllegalStateException))) => true
        case _ => false   // avoid a MatchError on any other outcome
      }
    }
  }
  test("Future.allAsTrys returns Nil given Nil list as input") {
    val future = Future.allAsTrys(Nil)
    // A generous timeout keeps the test from failing on a slow machine
    assert(Await.result(future, 1.second).isEmpty)
  }
  test("Future.allAsTrys returns successful item even if preceded by failing item") {
    val future1 = Future{throw new IllegalStateException("bad news")}
    val future2 = Future{"dog"}

    val futureListOfTrys = Future.allAsTrys(List(future1, future2))
    val listOfTrys = Await.result(futureListOfTrys, 1.second)
    println("successItem:" + listOfTrys)

    assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys(1) == Success("dog"))
  }
  test("Future.allAsTrys returns successful item even if followed by failing item") {
    val future1 = Future{"dog"}
    val future2 = Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys = Future.allAsTrys(List(future1, future2))
    val listOfTrys = Await.result(futureListOfTrys, 1.second)
    println("successItem:" + listOfTrys)

    assert(listOfTrys(1).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys(0) == Success("dog"))
  }
  test("Future.allFailedAsTrys returns the failed item and only that item") {
    val future1 = Future{"dog"}
    val future2 = Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys = Future.allFailedAsTrys(List(future1, future2))
    val listOfTrys = Await.result(futureListOfTrys, 1.second)
    assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys.size == 1)
  }
  test("Future.allSucceededAsTrys returns the succeeded item and only that item") {
    val future1 = Future{"dog"}
    val future2 = Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys = Future.allSucceededAsTrys(List(future1, future2))
    val listOfTrys = Await.result(futureListOfTrys, 1.second)
    assert(listOfTrys(0) == Success("dog"))
    assert(listOfTrys.size == 1)
  }
Footgear answered 16/8, 2015 at 1:34 Comment(0)
K
7

I just came across this question and have another solution to offer:

def allSuccessful[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])
                                                (implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], 
                                                 executor: ExecutionContext): Future[M[A]] = {
    in.foldLeft(Future.successful(cbf(in))) {
      (fr, fa) ⇒ (for (r ← fr; a ← fa) yield r += a) fallbackTo fr
    } map (_.result())
}

The idea here is that within the fold you wait for the next element in the list to complete (using the for-comprehension syntax), and if that element fails you just fall back to what you already have.
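
The same fold-and-fall-back idea can be sketched without CanBuildFrom if a plain List is enough. This is a simplified variant, not the code above: it is specialised to List, builds an immutable list instead of using a builder, and the object name FallbackSketch is made up.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FallbackSketch {
  def allSuccessful[A](in: List[Future[A]])(implicit ec: ExecutionContext): Future[List[A]] =
    in.foldLeft(Future.successful(List.empty[A])) { (acc, fa) =>
      // Prepend the next value if `fa` succeeds; otherwise keep what we already have.
      (for (r <- acc; a <- fa) yield a :: r).fallbackTo(acc)
    }.map(_.reverse) // values were prepended, so restore the input order

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val fs: List[Future[Int]] =
      List(Future.successful(1), Future.failed(new Exception("Failure")), Future.successful(3))
    println(Await.result(allSuccessful(fs), 5.seconds)) // prints List(1, 3)
  }
}
```

The accumulator itself never fails, because every step either succeeds or falls back to the previous accumulator, which started out as a successful future.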

Khufu answered 10/8, 2016 at 21:35 Comment(1)
I dislike the name, but I like the way it's done, straight from the sequence impl. - Pros
R
1

You can easily wrap each future's result in an Option and then flatten the list:

def futureToFutureOption[T](f: Future[T]): Future[Option[T]] =
    f.map(Some(_)).recover {
      case e => None
    }
val listOfFutureOptions = listOfFutures.map(futureToFutureOption(_))

val futureListOfOptions = Future.sequence(listOfFutureOptions)

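// flatten the List[Option[T]] inside the future; Future itself cannot be flattened this way
val futureListOfSuccesses = futureListOfOptions.map(_.flatten)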
Rivers answered 5/5, 2018 at 5:6 Comment(1)
Just in case someone else encounters an error with Some in the first function, it can be rewritten like so to prevent the compiler error: def futureToFutureOption[T](f: Future[T]): Future[Option[T]] = f.map(Option(_)).recover { case e => None } - Cuspidation
P
0

You can also collect successful and unsuccessful results in different lists:

def safeSequence[A](futures: List[Future[A]]): Future[(List[Throwable], List[A])] = {
  futures.foldLeft(Future.successful((List.empty[Throwable], List.empty[A]))) { (flist, future) =>
    flist.flatMap { case (elist, alist) =>
      future
        .map { success => (elist, alist :+ success) }
        .recover { case error: Throwable => (elist :+ error, alist) }
    }
  }
}
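
A short usage sketch for the function above, assuming it is wrapped in an object together with an implicit ExecutionContext parameter (that parameter, the object name SafeSequenceDemo and the sample futures are all additions for illustration):

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SafeSequenceDemo {
  // Same fold as above, with the ExecutionContext passed in explicitly.
  def safeSequence[A](futures: List[Future[A]])(implicit ec: ExecutionContext): Future[(List[Throwable], List[A])] =
    futures.foldLeft(Future.successful((List.empty[Throwable], List.empty[A]))) { (flist, future) =>
      flist.flatMap { case (elist, alist) =>
        future
          .map { success => (elist, alist :+ success) }
          .recover { case error: Throwable => (elist :+ error, alist) }
      }
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val futures: List[Future[Int]] =
      List(Future.successful(1), Future.failed(new Exception("Failure")), Future.successful(3))
    val (errors, values) = Await.result(safeSequence(futures), 5.seconds)
    println(errors.map(_.getMessage)) // prints List(Failure)
    println(values)                   // prints List(1, 3)
  }
}
```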
Pothook answered 11/3, 2020 at 12:57 Comment(0)
