In scala, how do you transform a list of futures into a future that returns the first successful future? [duplicate]
Asked Answered
L

2

5

All futures may be successful eventually (and some may fail), but we want the first successful one. And want to represent that result as a future. This future will fail if all the futures in the list fail.

Lobe answered 23/1, 2020 at 20:52 Comment(1)
Duplicate of https://mcmap.net/q/303336/-futures-success-race/1296806 and presumably others.Wittol
W
5

As indicated the documentation, Future.firstCompletedOf is provided.

import scala.concurrent.{ExecutionnContext, Future }

def foo[T](f: => Seq[Future[T]])(implicit ec: ExecutionContext): Future[T] =
  Future.firstCompletedOf(f)
Wiretap answered 23/1, 2020 at 20:57 Comment(2)
The other answer says first completed may be failure.Wittol
Yeah, so that won't work. There should be a firstSuccess.Lobe
T
4

RayRoestenburg returns the first successful one like so

def firstSucceededOf[T](futures: List[Future[T]]): Future[T] = {
    val p = Promise[T]()
    val size = futures.size
    val failureCount = new AtomicInteger(0)

    futures foreach {
      _.onComplete {
        case Success(v) => p.trySuccess(v)
        case Failure(e) =>
          val count = failureCount.incrementAndGet
          if (count == size) p.tryFailure(e)
      }
    }
    p.future
  }

The key is to understand Promise.trySuccess completes the Promise only once. Here is a working example

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util._
import java.util.concurrent.atomic.AtomicInteger
import Thread.sleep

object FirstSucceededOfExample extends App {
  def firstSucceededOf[T](futures: List[Future[T]]): Future[T] = {
    val p = Promise[T]()
    val size = futures.size
    val failureCount = new AtomicInteger(0)

    futures foreach {
      _.onComplete {
        case Success(v) => p.trySuccess(v)
        case Failure(e) =>
          val count = failureCount.incrementAndGet
          if (count == size) p.tryFailure(e)
      }
    }
    p.future
  }

  val futures = List(
    Future {sleep(2000); -11}, 
    Future {sleep(3000); -7}, 
    Future {42}
  )

  firstSucceededOf(futures)
    .andThen(v => println(v))

  sleep(1000)
}

which outputs

Success(42)

If all futures complete with a failure

  val futures = List(
    Future(throw new RuntimeException("boom 2")),
    Future(throw new RuntimeException("boom 3")),
    Future(throw new RuntimeException("boom 1"))
  )

it returns the last completed failure.


Note Future.firstCompletedOf is not sufficient as it returns first completed (as a success or failure) not the first successfully completed:

object FirstSucceededOfExample extends App {
  def foo[T](f: => Seq[Future[T]]): Future[T] =
    Future.firstCompletedOf(f)

  val futures = List(
    Future {sleep(2000); -11},
    Future {sleep(3000); -7},
    Future.failed(new RuntimeException("boom"))
  )

  foo(futures)
    .andThen(v => println(v))

  Thread.sleep(1000)
}

which outputs

Failure(java.lang.RuntimeException: boom)
Tamtama answered 23/1, 2020 at 21:23 Comment(1)
You can Future { sleep(n) ; -11 } w/o parens.Wittol

© 2022 - 2024 — McMap. All rights reserved.