All futures may be successful eventually (and some may fail), but we want the first successful one. And want to represent that result as a future. This future will fail if all the futures in the list fail.
In scala, how do you transform a list of futures into a future that returns the first successful future? [duplicate]
Duplicate of https://mcmap.net/q/303336/-futures-success-race/1296806 and presumably others. –
Wittol
As indicated the documentation, Future.firstCompletedOf
is provided.
import scala.concurrent.{ExecutionnContext, Future }
def foo[T](f: => Seq[Future[T]])(implicit ec: ExecutionContext): Future[T] =
Future.firstCompletedOf(f)
The other answer says first completed may be failure. –
Wittol
Yeah, so that won't work. There should be a firstSuccess. –
Lobe
RayRoestenburg returns the first successful one like so
def firstSucceededOf[T](futures: List[Future[T]]): Future[T] = {
val p = Promise[T]()
val size = futures.size
val failureCount = new AtomicInteger(0)
futures foreach {
_.onComplete {
case Success(v) => p.trySuccess(v)
case Failure(e) =>
val count = failureCount.incrementAndGet
if (count == size) p.tryFailure(e)
}
}
p.future
}
The key is to understand Promise.trySuccess
completes the Promise
only once. Here is a working example
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util._
import java.util.concurrent.atomic.AtomicInteger
import Thread.sleep
object FirstSucceededOfExample extends App {
def firstSucceededOf[T](futures: List[Future[T]]): Future[T] = {
val p = Promise[T]()
val size = futures.size
val failureCount = new AtomicInteger(0)
futures foreach {
_.onComplete {
case Success(v) => p.trySuccess(v)
case Failure(e) =>
val count = failureCount.incrementAndGet
if (count == size) p.tryFailure(e)
}
}
p.future
}
val futures = List(
Future {sleep(2000); -11},
Future {sleep(3000); -7},
Future {42}
)
firstSucceededOf(futures)
.andThen(v => println(v))
sleep(1000)
}
which outputs
Success(42)
If all futures complete with a failure
val futures = List(
Future(throw new RuntimeException("boom 2")),
Future(throw new RuntimeException("boom 3")),
Future(throw new RuntimeException("boom 1"))
)
it returns the last completed failure.
Note Future.firstCompletedOf
is not sufficient as it returns first completed (as a success or failure) not the first successfully completed:
object FirstSucceededOfExample extends App {
def foo[T](f: => Seq[Future[T]]): Future[T] =
Future.firstCompletedOf(f)
val futures = List(
Future {sleep(2000); -11},
Future {sleep(3000); -7},
Future.failed(new RuntimeException("boom"))
)
foo(futures)
.andThen(v => println(v))
Thread.sleep(1000)
}
which outputs
Failure(java.lang.RuntimeException: boom)
You can
Future { sleep(n) ; -11 }
w/o parens. –
Wittol © 2022 - 2024 — McMap. All rights reserved.