How to cancel Future in Scala?

4

58

Java's Future has a cancel method, which can interrupt the thread that runs the Future's task. For example, if I wrap an interruptible blocking call in a Java Future, I can interrupt it later.

Scala's Future provides no cancel method. Suppose I wrap an interruptible blocking call in a Scala Future. How can I interrupt it?

Customer answered 15/4, 2013 at 7:29 Comment(0)
34

This is not yet a part of the Futures API, but may be added as an extension in the future.

As a workaround, you could use Future.firstCompletedOf to wrap two futures: the future you want to cancel and a future that comes from a custom Promise. You can then cancel the resulting future by failing the promise:

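import scala.concurrent.{ExecutionContext, Future, Promise}

def cancellable[T](f: Future[T])(customCode: => Unit)(implicit ec: ExecutionContext): (() => Unit, Future[T]) = {
  val p = Promise[T]()
  val first = Future.firstCompletedOf(Seq(p.future, f))
  val cancellation: () => Unit = { () =>
    // Run the custom cleanup if the wrapper future ends up failed.
    // (onFailure was deprecated in Scala 2.12; failed.foreach is the replacement.)
    first.failed.foreach(_ => customCode)
    // tryFailure, so that calling the cancellation twice does not throw.
    p.tryFailure(new Exception("Future cancelled"))
  }
  (cancellation, first)
}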

Now you can call this on any future to obtain a "cancellable wrapper". Example use-case:

val f = callReturningAFuture()
val (cancel, f1) = cancellable(f) {
  cancelTheCallReturningAFuture()
}

// somewhere else in code (needs scala.concurrent.Await and scala.concurrent.duration.Duration)
if (condition) cancel() else println(Await.result(f1, Duration.Inf))

EDIT:

For a detailed discussion of cancellation, see Chapter 4 of the book Learning Concurrent Programming in Scala.

Lineament answered 15/4, 2013 at 7:50 Comment(11)
Yes, Promises are the producing end of futures in Scala, so that's where you control the outcome. Some links: scala-lang.org/api/current/index.html#scala.concurrent.Promise, docs.scala-lang.org/overviews/core/futures.html#promises - Trencherman
No, you have to add this bit of custom cancellation/interruption logic in the customCode above. - Lineament
Thanks. Why doesn't the standard Scala library include this cancel? - Customer
Unfortunately this is not 100% reliable. Indeed, between the time you call cancel and the time when customCode actually stops the body of the future (say, for example, that customCode sets a boolean flag that is checked by the future's body to know whether to abort), anything might happen. In particular, the future's body might start to execute. The end result: while the future returned by cancellable says it was cancelled, the future's body actually executed. That's a real problem as soon as the future's body performs any side effect. This makes using your code subtly dangerous. - Elevated
What you can do instead then is to add a callback to the cancellable future to interrupt the thread - the idea is to interrupt the thread only after the future is cancelled. I'll update the answer. - Lineament
This won't do it either. The problem is still the same: between the moment that you decided that you want to cancel the future (that is, in your example, condition evaluated to true) and the moment you actually interrupt the thread, it might be too late and the original future's body might already have executed. - Elevated
True - you cannot have the cancellation of the future be atomically done with thread interruption - the API and the implementation do not allow this. This is a workaround that can help in some situations, though. - Lineament
By the way, if the call is interruptible, doesn't the library you're using to make that call already return some kind of future which you can cancel? (Specifically, I'm thinking of the few things I've heard about Scala-IO) - Newhall
Providing Future.cancel is not a good idea due to the reasons discussed; another problem is that a Future is a shared read-only handle and as such it should not provide methods which interfere with other readers. What you can do is to pass a Future into the code which you want to run and have that code check the Future periodically; then you have a principled way of interrupting the computation by completing the corresponding Promise. - Coahuila
Future.cancel would be really helpful. I recently ran into a problem when I created futures connecting to different DBs to collect stats. Some of the DBs had a problem that hung the connection, which made the future just run forever. Since the stat collection ran periodically, guess what: my application ended up eating all the CPUs available to it. I realized the problem and tried to find a way to time out the future until the database could be fixed, but so far I have found no good answer :( - Lashley
But it will not cancel the original future. - Uball
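
The cooperative approach suggested in the comments above (hand the running code a Future, let it check that Future periodically, and cancel by completing the corresponding Promise) might look roughly like the sketch below. CooperativeCancel, cancellableLoop, the step function and the demo object are hypothetical names made up for illustration, and the sketch assumes the work can be split into steps between which a check is possible.

```scala
import java.util.concurrent.CancellationException
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object CooperativeCancel {
  // Hypothetical helper: runs `step` up to `maxSteps` times and checks the
  // cancellation signal before every step. Returns the number of steps run.
  def cancellableLoop(maxSteps: Int, cancelSignal: Future[Unit])(step: Int => Unit)
                     (implicit ec: ExecutionContext): Future[Int] = Future {
    var i = 0
    while (i < maxSteps) {
      // isCompleted is a non-blocking read of the shared, read-only handle.
      if (cancelSignal.isCompleted)
        throw new CancellationException(s"cancelled before step $i")
      step(i)
      i += 1
    }
    i
  }
}

object CooperativeCancelDemo {
  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val cancelPromise = Promise[Unit]()
    val work = CooperativeCancel.cancellableLoop(1000, cancelPromise.future)(_ => Thread.sleep(10))
    Thread.sleep(50)
    cancelPromise.trySuccess(()) // request cancellation
    // Await.ready does not rethrow, so the outcome can be inspected as a Try.
    println(Await.ready(work, 5.seconds).value)
  }
}
```

Because the body itself decides when to stop, a side effect is never reported as cancelled after it has run; the price is that a step that is already blocked is not interrupted.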
11

I haven't tested this, but it expands on the answer of Pablo Francisco Pérez Hidalgo. Instead of blocking a thread while waiting for the Java Future, we complete an intermediate Promise.

import java.util.concurrent.{Callable, FutureTask}
import scala.concurrent.{ExecutionContext, Promise}
import scala.util.Try

class Cancellable[T](executionContext: ExecutionContext, todo: => T) {
  private val promise = Promise[T]()

  def future = promise.future

  private val jf: FutureTask[T] = new FutureTask[T](
    new Callable[T] {
      override def call(): T = todo
    }
  ) {
    override def done(): Unit = promise.complete(Try(get()))
  }

  def cancel(): Unit = jf.cancel(true)

  executionContext.execute(jf)
}

object Cancellable {
  def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] =
    new Cancellable[T](executionContext, todo)
}
Carley answered 11/10, 2016 at 20:45 Comment(4)
Great optimisation! More Java-style code, but the reduction in thread use is worth it. I love Stack Overflow synergies - Rodl
@PabloFranciscoPérezHidalgo Do you mind if I publish an adaptation of this as a library? You can find my adaptation here - Vilayet
@Vilayet Consider it open source. So, feel free! It would be nice if the authorship of the idea was collected in the repo :D (You could include my twitter handle "pfcoperez" or stack overflow profile link - as well as nightingale's - ) - Rodl
@Vilayet By the way, the initial version is already in a public repo github.com/Stratio/common-utils/blob/master/src/main/scala/com/… , you can commit the updated one there too ;-) - Rodl
8

By cancelling I guess you would like to violently interrupt the future.

Found this segment of code: https://gist.github.com/viktorklang/5409467

Did a few tests and seems to work fine!

Enjoy :)
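
Since the answer above only links to the code, here is a rough sketch of the same general idea: return the future together with a function that interrupts the thread running its body. It is not a copy of the linked gist. The names Interruptible, interruptibleFuture and InterruptibleDemo are hypothetical, and the sketch assumes the body blocks in an interruptible call such as Thread.sleep.

```scala
import java.util.concurrent.CancellationException
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object Interruptible {
  // Returns the future and a canceller. The canceller returns true only if
  // this call is the one that completed the future.
  def interruptibleFuture[T](body: => T)(implicit ec: ExecutionContext): (Future[T], () => Boolean) = {
    val promise = Promise[T]()
    val lock = new Object
    var runner: Thread = null // thread currently running `body`; guarded by lock
    var cancelled = false     // guarded by lock

    promise.completeWith(Future {
      lock.synchronized {
        // Never start the body if cancellation was requested first.
        if (cancelled) throw new CancellationException("cancelled before start")
        runner = Thread.currentThread()
      }
      try body
      finally {
        lock.synchronized { runner = null }
        // Clear a pending interrupt so it cannot leak into the pool thread's next task.
        Thread.interrupted()
      }
    })

    val cancel: () => Boolean = () => {
      lock.synchronized {
        cancelled = true
        if (runner != null) runner.interrupt()
      }
      promise.tryFailure(new CancellationException("cancelled"))
    }
    (promise.future, cancel)
  }
}

object InterruptibleDemo {
  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val (f, cancel) = Interruptible.interruptibleFuture { Thread.sleep(60000); "done" }
    Thread.sleep(100)
    println(cancel())
    println(Await.ready(f, 5.seconds).value)
  }
}
```

The interrupt only has an effect if the body is blocked in an interruptible call or checks the thread's interrupt flag itself, and it covers only this one body, not computations chained later with map or flatMap.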

Deannedeans answered 23/10, 2013 at 10:40 Comment(6)
Do you have a code segment that uses this function? I'm having some trouble understanding how to use it. - Gene
Just call the function. It returns two values, the future and the canceller (a function which you can call to cancel the running future). There is nothing more to it really. You don't have to understand it in order to use it. Just copy and paste it. Hope this helps a little bit - Deannedeans
The answer here might help a little with understanding how to use this: https://mcmap.net/q/752713/-cancellation-with-future-and-promise-in-scala - Use
Futures are really useful when you combine them with flatMap, map, filter, etc. The cancellation approach above does not work when you combine futures. For example: val f = Future { /* blocking interruptible computation */ }.map(res => { /* one more blocking interruptible computation */ }). How can I cancel future f? - Catarina
In general I agree that cancelling is possible as shown, but NOT RECOMMENDED - Deannedeans
This is the only way that actually works! All other examples mentioned here don't work (the Future is not being terminated). Yes, interrupting threads is not a good idea, but that's another question. See proof here: pastebin.com/5HTGkJGt - Canzone
4

I think it is possible to reduce the complexity of the implementations provided by making use of the Java 7 Future interface and its implementations.

Cancellable builds a Java FutureTask, which is what its cancel method cancels. A separate Scala Future waits for that task to complete and serves as the observable, read-only interface:

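 import java.util.concurrent.{Callable, FutureTask}
 import scala.concurrent.{ExecutionContext, Future}

 class Cancellable[T](executionContext: ExecutionContext, todo: => T) {

   private val jf: FutureTask[T] = new FutureTask[T](
     new Callable[T] {
       override def call(): T = todo
     }
   )

   executionContext.execute(jf)

   // Named implicit so that Future.apply below can resolve its ExecutionContext.
   private implicit val ec: ExecutionContext = executionContext

   // Occupies a second thread that waits for the Java future's result.
   val future: Future[T] = Future {
     jf.get
   }

   def cancel(): Unit = jf.cancel(true)

 }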

 object Cancellable {
   def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] =
     new Cancellable[T](executionContext, todo)
 }
Mattos answered 1/3, 2016 at 13:18 Comment(8)
I haven't tested this yet, but it looks great. But maybe the jf.get call should be wrapped in a scala.concurrent.blocking block for good measure. - Carley
@Carley Thanks! I use it in production github.com/Stratio/Common-utils/blob/master/src/main/scala/com/… at github.com/Stratio/Crossdata/blob/…. jf.get is being called within a block passed to a Future constructor, so IMHO there is no danger from potential blocking, is there? - Rodl
@Carley I've checked this question #19681889 and it seems that it (blocking) might apply. I'll check it out, thanks for your suggestion :) - Rodl
Yeah, as the link you found says too, I think it's exactly the case to use the blocking hint so that the underlying thread pool temporarily increases the number of workers. You're welcome! - Carley
@Carley That's great! I wasn't concerned about the number of executors because, given the very specific characteristics of the code where I use it, I had to use a custom ExecutionContext which provides a whole new thread for each execution: class ProlificExecutor extends Executor { override def execute(command: Runnable): Unit = new Thread(command) start } - Rodl
Oh, yeah, that would obviate this particular concern, indeed. Well, the usefulness of the blocking hint is limited as it is, being applicable only to the default global ExecutionContext, so... - Carley
But I wonder if we can have the best of both worlds and avoid always stealing a thread from the pool just so that we can wait on the Java Future even if nobody called get()/Await.result() on the outer Scala Future. If instead we use a Promise and complete it from FutureTask.done()... - Carley
Check out my answer for an implementation of this idea. I've intentionally kept it so that it remains a drop-in replacement for your class, but now it should use half as many threads. Hope it works and will be of use! - Carley
