Scala future sequence and timeout handling

There are some good hints on how to combine futures with timeouts. However, I'm curious how to do this with Future sequence sequenceOfFutures.

My first approach looks like this:

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits._

object FutureSequenceScala extends App {
  println("Creating futureList")

  val timeout = 2 seconds
  val futures = List(1000, 1500, 1200, 800, 2000) map { ms =>
    val f = future {
      Thread sleep ms
      ms toString
    }
    Future firstCompletedOf Seq(f, fallback(timeout))
  }

  println("Creating waitinglist")
  val waitingList = Future sequence futures
  println("Created")

  val results = Await result (waitingList, timeout * futures.size)
  println(results)

  def fallback(timeout: Duration) = future {
    Thread sleep (timeout toMillis)
    "-1"
  }
}

Is there a better way to handle timeouts in a sequence of futures or is this a valid solution?

Belch answered 16/7, 2013 at 9:27 Comment(0)

There are a few things in your code here that you might want to reconsider. For starters, I'm not a huge fan of submitting tasks into the ExecutionContext that have the sole purpose of simulating a timeout and that also use Thread.sleep. The sleep call is blocking, and you probably want to avoid having a task in the execution context that blocks purely for the sake of waiting a fixed amount of time. I'm going to steal from my answer here and suggest that for pure timeout handling, you should use something like I outlined in that answer. The HashedWheelTimer is a highly efficient timer implementation that is much better suited to timeout handling than a task that just sleeps.

Now, if you go that route, the next change I would suggest concerns handling the individual timeout-related failures for each future. If you want an individual failure to completely fail the aggregate Future returned from the sequence call, then do nothing extra. If you don't want that to happen and want a timeout to return some default value instead, then you can use recover on the Future like this:

withTimeout(someFuture).recover{
  case ex:TimeoutException => someDefaultValue
}
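For reference, withTimeout is not defined in this thread; it comes from the linked answer, which builds it on a HashedWheelTimer. A minimal, dependency-free sketch of the same idea, assuming a plain JDK ScheduledExecutorService as the timer (TimeoutSupport, RecoverDemo and the after parameter are names made up for illustration), might look like this:

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, Promise, blocking}
import scala.concurrent.duration._

object TimeoutSupport {
  // Assumption: one daemon scheduler thread stands in for the HashedWheelTimer.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "timeout-scheduler")
      t.setDaemon(true)
      t
    }
  })

  // Completes with f's result, or fails with a TimeoutException once `after` has elapsed.
  def withTimeout[T](f: Future[T], after: FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    val task = scheduler.schedule(new Runnable {
      def run(): Unit = p.tryFailure(new TimeoutException(s"timed out after $after"))
    }, after.toMillis, TimeUnit.MILLISECONDS)
    f.onComplete { result =>
      task.cancel(false)    // the timer entry is no longer needed
      p.tryComplete(result) // no effect if the timeout already fired
    }
    p.future
  }
}

object RecoverDemo {
  import scala.concurrent.ExecutionContext.Implicits.global
  import TimeoutSupport.withTimeout

  // One future finishes in time; the other times out and is replaced by "-1".
  def run(): List[String] = {
    val futures = List(50, 1000).map { ms =>
      val f = Future { blocking(Thread.sleep(ms.toLong)); ms.toString }
      withTimeout(f, 300.millis).recover { case _: TimeoutException => "-1" }
    }
    Await.result(Future.sequence(futures), 2.seconds)
  }
}
```

Here the scheduler thread only fires the timeout; it never sleeps on behalf of a task, which is the point made above about keeping fixed waits out of the execution context. Note that the timed-out future itself keeps running in the background; only its result is ignored.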

Once you've done that, you can take advantage of the non-blocking callbacks and do something like this:

waitingList onComplete{
  case Success(results) => //handle success
  case Failure(ex) => //handle fail
}

Each future has a timeout and thus will not run indefinitely. There is no need IMO to block there and provide an additional layer of timeout handling via the atMost param to Await.result. But I guess this assumes you are okay with the non-blocking approach. If you really need to block there, then you should not be waiting timeout * futures.size. These futures are running in parallel, so the timeout there only needs to be as long as the individual timeouts for the futures themselves (or just slightly longer to account for any delays in CPU scheduling or timing). It certainly should not be the timeout multiplied by the total number of futures.
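To make the point about parallel execution concrete, here is a small sketch that times a sequence of sleeping futures on the global execution context. ParallelWaitDemo and its run method are hypothetical names, and the sleeps are wrapped in blocking so the global pool can add threads when it has fewer cores than tasks:

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelWaitDemo {
  // Starts all sleeps at once and returns (results, elapsed milliseconds).
  def run(sleeps: List[Int], atMost: FiniteDuration): (List[String], Long) = {
    val start = System.nanoTime()
    val futures = sleeps.map { ms =>
      Future {
        blocking(Thread.sleep(ms.toLong))
        ms.toString
      }
    }
    // A single Await covers every future, because they are already running.
    val results = Await.result(Future.sequence(futures), atMost)
    (results, (System.nanoTime() - start) / 1000000)
  }
}
```

The elapsed time should track the longest sleep rather than the sum of all sleeps, so an atMost slightly above the per-future timeout is enough.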

Tyratyrannical answered 16/7, 2013 at 12:29 Comment(6)
As a curiosity, how is HashedWheelTimer more efficient than TimerTask or newScheduledThreadPoolExecutor? Both do the same job. - Rid
@Jatin, I suppose you can check out this link for more info: #15348100. But at its heart, adding more tasks should not consume more resources. It's supposed to be a more constant-time timer (in terms of system resources consumed) than something like a Timer and TimerTask. For a high-throughput system where you will be scheduling lots and lots of short-lived, timeout-based tasks, it's a better solution because of the constant resource usage claims. - Tyratyrannical
But how does STPE with a core size of 1 consume more resources compared to HashedWheelTimer? I am sorry, but I don't get it. STPE has a higher insertion time, O(log(n)), due to its internal heap, but a lower tick time. Can you please explain? - Rid
@Jatin, when you are potentially inserting tasks at a very high rate, insertion time matters very much. A hashed wheel timer may not be completely accurate, depending on what you select for a tick interval, but if that is not as important to you as, say, getting tons of tasks in and out of the timer, then it's regarded as the better choice. If it means anything, Akka uses this same exact approach (including a HWT) for their Actor ask (?) timeout handling. That's what I based my code on for the withTimeout function. - Tyratyrannical
You can also check out this post for more info: #17276893 - Tyratyrannical
Thanks. I am aware of akka and the link but it still hasn't sunk in well. I think insertion time is the reason people prefer HWT for - Rid

Here's a version that shows how bad your blocking fallback is.

Notice that the executor is single threaded and you're creating many fallbacks.

@cmbaxter is right, your master timeout shouldn't be timeout * futures.size, it should be bigger!

@cmbaxter is also right that you want to think non-blocking. Once you do that and want to impose timeouts, you will pick a timer component for that; see his linked answer (also linked from your linked answer).

That said, I still like my answer from your link, insofar as sitting in a loop waiting for the next thing that should time out is really simple.

It just takes a list of futures and their timeouts and a fallback value.

Maybe there is a use case for that, such as a simple app that just blocks for some results (like your test) and must not exit before results are in.

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

import java.util.concurrent.Executors
import java.lang.System.{ nanoTime => now }

object Test extends App { 
  //implicit val xc = ExecutionContext.global
  implicit val xc = ExecutionContext fromExecutorService (Executors.newSingleThreadExecutor)

  def timed[A](body: =>A): A = {
    val start = now 
    val res = body
    val end = now
    Console println (Duration fromNanos end-start).toMillis + " " + res
    res
  }
  println("Creating futureList")

  val timeout = 1500 millis
  val futures = List(1000, 1500, 1200, 800, 2000) map { ms =>
    val f = future {
      timed {
        blocking(Thread sleep ms)
        ms toString
      }
    } 
    Future firstCompletedOf Seq(f, fallback(timeout))
  }   

  println("Creating waitinglist")
  val waitingList = Future sequence futures
  println("Created")

  timed {
    val results = Await result (waitingList, 2 * timeout * futures.size)
    println(results)
  }
  xc.shutdown

  def fallback(timeout: Duration) = future {
    timed {
      blocking(Thread sleep (timeout toMillis))
      "-1"
    }
  }   
}   

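What happened (each line shows elapsed milliseconds and the result; the single thread ran all ten tasks one after another, so no fallback ever beat its own future and the whole run took about 14 seconds):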

Creating futureList
Creating waitinglist
Created
1001 1000
1500 -1
1500 1500
1500 -1
1200 1200
1500 -1
800 800
1500 -1
2000 2000
1500 -1
List(1000, 1500, 1200, 800, 2000)
14007 ()
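The loop idea mentioned above (a list of futures with their timeouts, plus a fallback value) can be sketched roughly as follows. This is not the code from the linked answer, only a minimal illustration; DeadlineLoop and awaitAll are hypothetical names, and each timeout is assumed to be measured from the moment awaitAll is called:

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object DeadlineLoop {
  // Blocks on each future until its own deadline and substitutes `fallback` on timeout.
  def awaitAll[T](entries: List[(Future[T], FiniteDuration)], fallback: T): List[T] = {
    val start = Deadline.now
    entries.map { case (f, timeout) =>
      // Time left until this entry's deadline, never negative.
      val remaining = (start + timeout).timeLeft max Duration.Zero
      Try(Await.result(f, remaining)) match {
        case Success(v)                   => v
        case Failure(_: TimeoutException) => fallback
        case Failure(e)                   => throw e // other failures are passed on
      }
    }
  }
}
```

Because every deadline is absolute, the total blocking time is bounded by the largest timeout in the list, and no extra fallback tasks compete for threads in the executor.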
Broomfield answered 16/7, 2013 at 16:7 Comment(0)

Monix Task has timeout support:

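  import monix.execution.Scheduler.Implicits.global
  import monix.eval._
  import scala.concurrent.Await
  import scala.concurrent.duration._

  val timeout = 2.seconds

  println("Creating futureList")
  val tasks = List(1000, 1500, 1200, 800, 2000).map { ms =>
    Task {
      Thread.sleep(ms)
      ms.toString
    }.timeoutTo(timeout, Task.now("-1"))
  }

  println("Creating waitinglist")
  // Task.gather runs the tasks in parallel and keeps their order
  // (it is named parSequence in later Monix 3.x releases);
  // Task.sequence would literally run them one after another.
  val waitingList = Task.gather(tasks)

  println("Created")
  // A Task is lazy: convert it to a Future to run it (runToFuture in Monix 3.x, runAsync in 2.x).
  // The tasks run in parallel, so waiting slightly longer than one timeout is enough.
  val results = Await.result(waitingList.runToFuture, timeout + 1.second)
  println(results)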
Steep answered 16/5, 2018 at 9:9 Comment(0)
