Run multiple futures in parallel, return default value on timeout

I have to run multiple futures in parallel and the program shouldn't crash or hang.

Currently I wait on the futures one by one and use a fallback value if there is a TimeoutException.

import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

val future1 = // start future1
val future2 = // start future2
val future3 = // start future3

// <- at this point all 3 futures are running

// waits for at most timeout1 seconds
val res1 = toFallback(future1, timeout1, Map[String, Int]())
// ... timeout2 seconds
val res2 = toFallback(future2, timeout2, List[Int]())
// ... timeout3 seconds
val res3 = toFallback(future3, timeout3, Map[String, BigInt]())

def toFallback[T](f: Future[T], to: Int, default: T): Try[T] = {
  Try(Await.result(f, to.seconds))
    .recover { case _: TimeoutException => default }
}

As far as I can see, the maximum wait time of this snippet is timeout1 + timeout2 + timeout3.

My question is: how can I wait on all of those futures at once, so that I can reduce the wait time to max(timeout1, timeout2, timeout3)?

EDIT: In the end I used a modification of @Jatin's and @senia's answers:

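import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

private def composeWaitingFuture[T](fut: Future[T],
                                    timeout: Int, default: T)
                                   (implicit ec: ExecutionContext): Future[T] =
  // Await blocks a pool thread; `blocking` lets the global pool compensate
  Future { blocking { Await.result(fut, timeout.seconds) } } recover {
    case _: Exception => default
  }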

and later it's used as follows:

// starts the waiting futures immediately; each waits for at most timeoutX seconds
val res1 = composeWaitingFuture(future1, timeout1, Map[String, Int]())
val res2 = composeWaitingFuture(future2, timeout2, List[Int]())
val res3 = composeWaitingFuture(future3, timeout3, Map[String, BigInt]()) 

// completes within at most max(timeout1, timeout2, timeout3)
val combinedFuture =
  for {
    r1 <- res1
    r2 <- res2
    r3 <- res3
  } yield (r1, r2, r3)

and later I use combinedFuture as I see fit.

Flashgun answered 4/7, 2013 at 9:37 Comment(3)
What I do not understand is: how is it timeout1 + timeout2 + timeout3? It is rather timeout1 for future1, timeout2 for future2 and so on. The question is still unclear to me. – Uneventful
He wants to run the 3 tasks in parallel, such that the timeout is the maximum of the three tasks' timeouts. – Brigidbrigida
I think this answer I gave a while back is similar to what you want, and it also leverages non-blocking callbacks: #16304971 – Sayette
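import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

def toFallback[T](f: Future[T], to: Int, default: T)
                 (implicit ec: ExecutionContext): Future[T] =
  Future {
    try {
      Await.result(f, to.seconds)
    } catch {
      case _: TimeoutException => default
    }
  }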

Wrapping the wait in a future makes the block asynchronous, so each request waits for at most its own timeout. If that ties up too many threads, you could instead have a single thread that checks the other futures, for example using Akka's system scheduler. @senia has answered below on this one.
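For illustration, the "single scheduler thread" idea can also be sketched without Akka, using one JDK `ScheduledExecutorService` thread to race each future against its timeout. This is a swapped-in technique rather than Akka's scheduler, and the names `TimeoutFallback` and `withDefault` are hypothetical.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
import scala.util.Success

// Hypothetical helper: one shared scheduler thread completes each result
// with its default when the timeout elapses; no thread blocks on Await.
object TimeoutFallback {
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "timeout-fallback")
        t.setDaemon(true) // do not keep the JVM alive
        t
      }
    })

  def withDefault[T](f: Future[T], timeout: FiniteDuration, default: T): Future[T] = {
    val p = Promise[T]()
    // Whichever side finishes first wins; the later tryComplete is a no-op.
    scheduler.schedule(new Runnable {
      def run(): Unit = p.tryComplete(Success(default))
    }, timeout.toMillis, TimeUnit.MILLISECONDS)
    p.tryCompleteWith(f)
    p.future
  }
}
```

Since every wrapped future completes by its own timeout, combining them in a for-comprehension takes at most the largest timeout.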

Uneventful answered 4/7, 2013 at 10:17 Comment(1)
Await.result blocks a thread, so you should not use the default ExecutionContext here. You could create a special ExecutionContext for calls of toFallback, or even start a new thread instead of using the future method, like in this answer. – Initiate
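A sketch of the dedicated-ExecutionContext suggestion: the blocking waits get their own thread pool so they cannot starve the default one. The pool size of 8 and the names `BlockingWaits` and `awaitOrDefault` are assumptions for illustration.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingWaits {
  // Hypothetical dedicated pool: each pending wait occupies one thread here.
  // Daemon threads so the pool does not keep the JVM alive.
  private val waitPool: ExecutionContext = ExecutionContext.fromExecutorService(
    Executors.newFixedThreadPool(8, new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "await-pool")
        t.setDaemon(true)
        t
      }
    }))

  // Blocks a waitPool thread (not a default-pool thread) for at most `timeout`.
  def awaitOrDefault[T](f: Future[T], timeout: FiniteDuration, default: T): Future[T] =
    Future {
      try Await.result(f, timeout)
      catch { case _: TimeoutException => default }
    }(waitPool)
}
```

One caveat of a fixed-size pool: if more than 8 waits are pending, the extra ones queue, and their timeout only starts counting once they get a thread.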

You could create a future that returns the results of all 3 futures using flatMap or a for-comprehension:

val combinedFuture =
  for {
    r1 <- future1
    r2 <- future2
    r3 <- future3
  } yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(combinedFuture, Seq(timeout1, timeout2, timeout3).max)
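When the futures all share one result type, the same idea scales to any number of them with `Future.sequence`. This sketch assumes a hypothetical list of `Int`-valued futures; as with the for-comprehension, the overall `Await` fails if any future is still running when the longest timeout expires.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CombineAll {
  // Turns Seq[Future[Int]] into Future[Seq[Int]]; the futures are already
  // running, so this waits for them together rather than one by one.
  def awaitAll(futures: Seq[Future[Int]], timeouts: Seq[FiniteDuration])
              (implicit ec: ExecutionContext): Seq[Int] =
    Await.result(Future.sequence(futures), timeouts.max)
}
```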

If you are using Akka, you could complete your future with a default value after the timeout:

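import akka.actor.ActorSystem
import akka.pattern.after
import akka.util.Timeout
import scala.concurrent.Future

implicit class FutureHelper[T](val f: Future[T]) extends AnyVal {
  // completes with `default` if `f` has not completed within `t`
  def orDefault(t: Timeout, default: => T)(implicit system: ActorSystem): Future[T] = {
    import system.dispatcher // ExecutionContext for `after` and `firstCompletedOf`
    val delayed = after(t.duration, system.scheduler)(Future.successful(default))
    Future firstCompletedOf Seq(f, delayed)
  }
}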

val combinedFuture =
  for {
    r1 <- future1.orDefault(timeout1, Map())
    r2 <- future2.orDefault(timeout2, List())
    r3 <- future3.orDefault(timeout3, Map())
  } yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(combinedFuture, allowance + Seq(timeout1, timeout2, timeout3).max)
Initiate answered 4/7, 2013 at 10:19 Comment(2)
This has one flaw: if future1 takes a long time but the other futures complete, you get no output at all, and the results of future2 and future3 are wasted. – Uneventful
@Jatin: You could complete your future with a default value after the timeout in Akka. See the update. – Initiate

I would avoid using Await.result, since it ties up a thread just for blocking. One option to implement a timeout for futures would be this:

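import java.util.{Timer, TimerTask}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success

// one shared timer thread; daemon so it doesn't keep the JVM alive
val timer = new Timer(true)

// note: `timeout` is in milliseconds here (java.util.Timer's unit)
def toFallback[T](f: Future[T], timeout: Int, default: T): Future[T] = {
  val p = Promise[T]()
  f.onComplete(result => p.tryComplete(result))
  timer.schedule(new TimerTask {
    def run(): Unit = p.tryComplete(Success(default))
  }, timeout)
  p.future
}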

This creates a promise that will be completed either by the future or by the default result after the specified timeout, whichever comes first.

To run the queries concurrently, you would do this:

val future1 = // start future1
val future2 = // start future2
val future3 = // start future3

val res1 = toFallback(future1, timeout1, Map[String, Int]())
val res2 = toFallback(future2, timeout2, List[Int]())
val res3 = toFallback(future3, timeout3, Map[String, BigInt]())

val resultF = for {
  r1 <- res1
  r2 <- res2
  r3 <- res3
} yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(resultF, Duration.Inf)
println(s"$r1, $r2, $r3")

// or, without blocking (onSuccess is deprecated; foreach is equivalent):
resultF.foreach {
  case (r1, r2, r3) => println(s"$r1, $r2, $r3")
}
Absentee answered 4/7, 2013 at 10:19 Comment(1)
What is Timer? – Lyman

Here's a longer (non-Akka) answer that addresses what might be the use case, namely: if one of the values "times out", you want to use the default value for that result and also do something with it (such as cancel the long-running calculation or I/O or whatever).

Needless to say, the other story is to minimize blocking.

The basic idea is to sit in a loop awaiting the firstCompletedOf the items that haven't completed yet. The timeout on the Await.ready is the minimum remaining timeout.

This code uses deadlines instead of durations, but using a duration as "time remaining" is easy.
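To make the deadline/duration relationship concrete: a `Deadline` is a point in time, and `timeLeft` turns it back into a remaining duration. A small sketch using only `scala.concurrent.duration`; the object name and the 3-second value are arbitrary.

```scala
import scala.concurrent.duration._

object DeadlineDemo {
  // A deadline fixed at initialization; its remaining time shrinks as the clock advances.
  val deadline: Deadline = 3.seconds.fromNow

  // Remaining time as a duration (negative once the deadline has passed).
  def remaining: FiniteDuration = deadline.timeLeft

  // The equivalent bookkeeping with a plain duration:
  // subtract the elapsed time from the original timeout.
  def remainingFrom(timeout: FiniteDuration, elapsed: FiniteDuration): FiniteDuration =
    timeout - elapsed
}
```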

import scala.language.postfixOps
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits._
import scala.reflect._
import scala.util._
import java.lang.System.{ nanoTime => now }

import Test.time

class Test {

  type WorkUnit[A] = (Promise[A], Future[A], Deadline, A)
  type WorkQ[A] = Seq[WorkUnit[A]]

  def await[A: ClassTag](work: Seq[(Future[A], Deadline, A)]): Seq[A] = {
    // check for timeout; if using Duration instead of Deadline, decrement here
    def ticktock(w: WorkUnit[A]): WorkUnit[A] = w match {
      case (p, f, t, v) if !p.isCompleted && t.isOverdue => p trySuccess v ; w
      case _ => w
    }
    def await0(work: WorkQ[A]): WorkQ[A] = {
      val live = work filterNot (_._1.isCompleted)
      val t0 = (live map (_._3)).min
      Console println s"Next deadline in ${t0.timeLeft.toMillis}"
      val f0 = Future firstCompletedOf (live map (_._2))
      Try(Await ready (f0, t0.timeLeft))
      val next = work map (w => ticktock(w))
      if (next exists (!_._1.isCompleted)) {
        await0(next)
      } else {
        next
      }
    }
    val wq = work map (_ match {
      case (f, t, v) =>
        val p = Promise[A]
        p.future onComplete (x => Console println s"Value available: $x: $time")
        // onSuccess/onFailure are deprecated; one onComplete covers both cases
        f onComplete {
          case Success(a) => p trySuccess a
          case Failure(_) => p trySuccess v
        }
        (p, f, t, v)
    })
    await0(wq) map (_ match {
      case (p, f, t, v) => p.future.value.get.get
    })
  }
}

object Test {
  val start = now
  def time = s"The time is ${ Duration fromNanos (now - start) toMillis }"

  def main(args: Array[String]): Unit = {
    // #2 times out
    def calc(i: Int) = {
      val t = if (args.nonEmpty && i == 2) 6 else i
      Thread sleep t * 1000L
      Console println s"Calculate $i: $time"
      i
    }
    // futures to be completed by a timeout deadline
    // or else use default and let other work happen
    val work = List(
      (Future(calc(1)), 3 seconds fromNow, 10),
      (Future(calc(2)), 5 seconds fromNow, 20),
      (Future(calc(3)), 7 seconds fromNow, 30)
    )
    Console println new Test().await(work)
  }
}

Sample run:

apm@mara:~/tmp$ skalac nextcompleted.scala ; skala nextcompleted.Test 
Next deadline in 2992
Calculate 1: The time is 1009
Value available: Success(1): The time is 1012
Next deadline in 4005
Calculate 2: The time is 2019
Value available: Success(2): The time is 2020
Next deadline in 4999
Calculate 3: The time is 3020
Value available: Success(3): The time is 3020
List(1, 2, 3)
apm@mara:~/tmp$ skala nextcompleted.Test arg
Next deadline in 2992
Calculate 1: The time is 1009
Value available: Success(1): The time is 1012
Next deadline in 4005
Calculate 3: The time is 3020
Value available: Success(3): The time is 3020
Next deadline in 1998
Value available: Success(20): The time is 5020
List(1, 20, 3)
Marcomarconi answered 5/7, 2013 at 18:12 Comment(0)

Why not get the Future itself to perform the exception capture and return the default? Then you can simply Await on each future in turn, and you don't have to worry about exception handling outside the future.
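A minimal sketch of that suggestion, assuming the fallback is wanted for any non-fatal failure: the future recovers itself, so awaiting it never throws for ordinary errors. This only moves the exception handling; each `Await.result` still needs a timeout long enough for its future. The names `SelfRecovering`, `withFallback` and `awaitInTurn` are hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object SelfRecovering {
  // The future itself swaps any non-fatal failure for `default`.
  def withFallback[T](work: => T, default: T)(implicit ec: ExecutionContext): Future[T] =
    Future(work).recover { case NonFatal(_) => default }

  // Waiting on each future in turn; no try/catch needed around the waits.
  // A future still running after `timeout` would still raise TimeoutException.
  def awaitInTurn[T](futures: Seq[Future[T]], timeout: FiniteDuration): Seq[T] =
    futures.map(f => Await.result(f, timeout))
}
```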

Brigidbrigida answered 4/7, 2013 at 10:42 Comment(0)

This is perhaps a bit hacky, but you can simply measure elapsed time and modify timeouts accordingly. Assuming timeout1 <= timeout2 <= timeout3:

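import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

def now: Long = System.currentTimeMillis()
val start: Long = now

// milliseconds left until `timeoutSecs` seconds after `start` (never negative)
def remains(timeoutSecs: Long): Long =
  math.max(0L, timeoutSecs * 1000 + start - now)

def toFallback[T](f: Future[T], to: Int, default: T): Try[T] =
  Try(Await.result(f, remains(to).millis))
    .recover { case _: TimeoutException => default }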

This way each timeout is measured from the moment start = now was evaluated, so the overall running time is at most timeout3. If the timeouts aren't ordered, it still works, but some tasks can be left running longer than their designated timeout.
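The same bookkeeping can be expressed with the standard library's `Deadline`, which avoids converting between seconds and milliseconds by hand. A sketch under the answer's assumptions; the object name `SharedClock` is hypothetical, and since `start` is fixed when the object is first used, it should be touched right after the futures are started.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object SharedClock {
  // Every per-future timeout is measured from this single instant.
  val start: Deadline = Deadline.now

  // Waits only for what is left of `timeout` since `start`; a future that
  // has already completed is returned even if its time has run out.
  def toFallback[T](f: Future[T], timeout: FiniteDuration, default: T): Try[T] = {
    val left = (start + timeout).timeLeft
    Try(Await.result(f, left max Duration.Zero))
      .recover { case _: TimeoutException => default }
  }
}
```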

Ancheta answered 4/7, 2013 at 11:47 Comment(0)

Use Monix Task; it is Future on steroids.

import monix.execution.Scheduler.Implicits.global
import monix.eval._
import scala.concurrent.duration._

val task1 = Task{Thread.sleep(1);"task1"}.timeoutTo(timeout1,Task.now("timeout1"))
val task2 = Task{Thread.sleep(2);"task2"}.timeoutTo(timeout2,Task.now("timeout2"))
Task.zipList(Seq(task1,task2)).runSyncUnsafe(Duration.Inf)
Elam answered 16/5, 2018 at 8:20 Comment(0)
