Cancellation with Future and Promise in Scala
Asked Answered
C

4

12

This is a followup to my previous question.

Suppose I have a task, which executes an interruptible blocking call. I would like to run it as a Future and cancel it with failure method of Promise.

I would like the cancel to work as follows:

  • If one cancels the task before it finished I would like the task to finish "immediately", interrupting the blocking call if it has already started and I would like the Future to invoke onFailure.

  • If one cancels the task after the task finished I would like to get a status saying that the cancel failed since the task already finished.

Does it make sense? Is it possible to implement in Scala? Are there any examples of such implementations?

Crepe answered 15/4, 2013 at 17:15 Comment(0)
P
13

scala.concurrent.Future is read-only, so one reader cannot mess things up for the other readers.

It seems like you should be able to implement what you want as follows:

def cancellableFuture[T](fun: Future[T] => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
  val p = Promise[T]()
  val f = p.future
  p tryCompleteWith Future(fun(f))
  (f, () => p.tryFailure(new CancellationException))
}

val (f, cancel) = cancellableFuture( future => {
  while(!future.isCompleted) continueCalculation // isCompleted acts as our interrupted-flag

  result  // when we're done, return some result
})

val wasCancelled = cancel() // cancels the Future (sets its result to be a CancellationException conditionally)
Pica answered 16/4, 2013 at 0:43 Comment(11)
Thanks. Suppose I execute some interruptible blocking call instead of calculation. How can I modify the code above to interrupt the thread?Crepe
You'd have to add a synchronized var that sets the current thread under a lock when the computation starts, and then takes the lock at the end and clears the var. And the cancel would take the lock and call interrupt on the set Thread, if any, or bail out if null.Pica
should it be while(!future.isCompleted && moreWork) continueCalculation?Ultrastructure
Cool - I figured but i was coding it up as an exercise and wanted to be sure :) The future that is passed into fun as a flag, is there anything special about it being a future other than it is read only and atomic? For example, could you use an AtomicBoolean instead (except it is read/write)?Ultrastructure
sourcedelica: An AtomicBoolean is mutable so that might not be the best choice, the reason for the Future is that it's already allocated and doesn't interfere in any way.Pica
Hi @ViktorKlang, is it possible the future in the output on cancellable is not the interesting one? I mean did you mean val f = Future(fun(p.future)) ; p tryCompleteWith fGambell
@FranciscoLópez-Sancho I'm not sure I understand what you mean—could you clarify a bit?Pica
Sorry @ViktorKlang, of course. I kind of feel that the future one wants back from the cancellabeFuture is not the future from the promise val f = p.future as this future is only used to trigger the cancellation. From inside cancellableFuture. on while(!future.isCompleted) continueCalculation. This future is essential to stop the calculation but the calculation itself is wrapped in other future p tryCompleteWith Future(fun(f)). And this Future(fun(f)) is the one that we may want to get out of the method cancellableFuture maybe to add a callback to or something else?Gambell
@FranciscoLópez-Sancho In the code, the Future returned will contain the result of the computation, or a CancellationException.Pica
awesome! :) Thank you for letting me know @ViktorKlangGambell
@FranciscoLópez-Sancho you're most welcome—I'm happy that so many developers use Future & Promise. :)Pica
U
12

Here is the interruptable version of Victor's code per his comments (Victor, please correct me if I misinterpreted).

object CancellableFuture extends App {

  def interruptableFuture[T](fun: () => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    val aref = new AtomicReference[Thread](null)
    p tryCompleteWith Future {
      val thread = Thread.currentThread
      aref.synchronized { aref.set(thread) }
      try fun() finally {
        val wasInterrupted = (aref.synchronized { aref getAndSet null }) ne thread
        //Deal with interrupted flag of this thread in desired
      }
    }

    (f, () => {
      aref.synchronized { Option(aref getAndSet null) foreach { _.interrupt() } }
      p.tryFailure(new CancellationException)
    })
  }

  val (f, cancel) = interruptableFuture[Int] { () =>
    val latch = new CountDownLatch(1)

    latch.await(5, TimeUnit.SECONDS)    // Blocks for 5 sec, is interruptable
    println("latch timed out")

    42  // Completed
  }

  f.onFailure { case ex => println(ex.getClass) }
  f.onSuccess { case i => println(i) }

  Thread.sleep(6000)   // Set to less than 5000 to cancel

  val wasCancelled = cancel()

  println("wasCancelled: " + wasCancelled)
}

With Thread.sleep(6000) the output is:

latch timed out
42
wasCancelled: false

With Thread.sleep(1000) the output is:

wasCancelled: true
class java.util.concurrent.CancellationException
Ultrastructure answered 17/4, 2013 at 2:49 Comment(4)
Thanks. That's definitely cleaner.Ultrastructure
Updated per Victor's gist.Ultrastructure
Why isn't the answer by @ViktorKlang updated to reflect his own comments and the gist?! It appears two answers compete with each other while there's a single author :(Sauveur
They are two different variations, they don't compete with each other. Viktor's uses a cancel() method to cancel the operation and mine uses Thread.interrupt.Ultrastructure
A
6

Twitter's futures implement cancellation. Have a look here:

https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala

Line 563 shows the abstract method responsible for this. Scala's futures currently do not support cancellation.

Aduwa answered 16/4, 2013 at 1:20 Comment(2)
Does it still? You linked to master, so the code has shifted since then.Toothbrush
Oh dear... It seems that raise might have replaced this functionality: twitter.github.io/util/docs/com/twitter/util/…Aduwa
K
2

You can use Monix library instead of Future

https://monix.io

Kiln answered 26/5, 2018 at 1:19 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.