How to compose two parallel Tasks to cancel one task if another one fails?
I would like to implement my asynchronous processing with scalaz.concurrent.Task. I need a function (Task[A], Task[B]) => Task[(A, B)] to return a new task that works as follows:

  • run Task[A] and Task[B] in parallel and wait for the results;
  • if one of the tasks fails then cancel the second one and wait until it terminates;
  • return the results of both tasks.

How would you implement such a function?

Surplus answered 11/10, 2015 at 13:30 Comment(2)
There are several different things "cancel" could mean here. Do you just want the computation to fail fast? Then something like both on Nondeterminism will work. If you also want to avoid wasting cycles (or you want to undo the effects of the still-running computation), it'll be more complicated. - Occipital
Yes, I just want the computation to fail fast for now. - Surplus

As I mentioned in my comment above, if you don't care about actually stopping the non-failed computation, you can use Nondeterminism. For example:

import scalaz._, scalaz.Scalaz._, scalaz.concurrent._

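// Sequential: b is only started once a has succeeded.
def pairFailSlow[A, B](a: Task[A], b: Task[B]): Task[(A, B)] = a.tuple(b)

// Parallel: both tasks are started, and the result fails as soon as either one fails.
def pairFailFast[A, B](a: Task[A], b: Task[B]): Task[(A, B)] =
  Nondeterminism[Task].both(a, b)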

val divByZero: Task[Int] = Task(1 / 0)
val waitALongTime: Task[String] = Task {
  Thread.sleep(10000)
  println("foo")
  "foo"
}

And then:

pairFailSlow(divByZero, waitALongTime).run // fails immediately
pairFailSlow(waitALongTime, divByZero).run // hangs while sleeping
pairFailFast(divByZero, waitALongTime).run // fails immediately
pairFailFast(waitALongTime, divByZero).run // fails immediately

In every case except the first, the side effect in waitALongTime will still happen. If you wanted to attempt to stop that computation, you'd need to use something like Task's runAsyncInterruptibly.
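As a rough illustration of that last point, here is a minimal sketch (not a tested, definitive implementation) of a pairing function built on runAsyncInterruptibly with a shared AtomicBoolean. It assumes Scalaz 7.1, where the method has this name (in 7.2 it was renamed unsafePerformAsyncInterruptibly). The names PairCancelling and pairCancelling are made up for this example. Note two limitations: the cancel flag is only checked between steps of a task, so a single blocking call such as Thread.sleep inside one Task block is not interrupted, and the sketch does not wait for the cancelled task to terminate.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scalaz.{\/, -\/, \/-}
import scalaz.concurrent.Task

object PairCancelling {
  // Hypothetical helper: runs a and b in parallel; on the first failure it
  // sets a shared flag so the other task stops at its next step boundary.
  def pairCancelling[A, B](a: Task[A], b: Task[B]): Task[(A, B)] =
    Task.async[(A, B)] { callback =>
      val cancel   = new AtomicBoolean(false) // shared cancellation flag
      val finished = new AtomicBoolean(false) // ensures callback fires once
      val left     = new AtomicReference[Option[A]](None)
      val right    = new AtomicReference[Option[B]](None)

      def fail(e: Throwable): Unit = {
        cancel.set(true) // ask the other task to stop
        if (finished.compareAndSet(false, true)) callback(-\/(e))
      }

      // Completes the pair once both results have been stored.
      def tryComplete(): Unit =
        for (x <- left.get; y <- right.get)
          if (finished.compareAndSet(false, true)) callback(\/-((x, y)))

      val onA: (Throwable \/ A) => Unit = {
        case -\/(e) => fail(e)
        case \/-(x) => left.set(Some(x)); tryComplete()
      }
      val onB: (Throwable \/ B) => Unit = {
        case -\/(e) => fail(e)
        case \/-(y) => right.set(Some(y)); tryComplete()
      }

      // Fork both so that neither blocks the calling thread.
      Task.fork(a).runAsyncInterruptibly(onA, cancel)
      Task.fork(b).runAsyncInterruptibly(onB, cancel)
    }
}
```

With the definitions above, PairCancelling.pairCancelling(divByZero, waitALongTime).run should still fail fast, but because waitALongTime is a single step, its println would still happen; cancellation only helps for tasks composed of several steps (for example, chained with flatMap).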

Occipital answered 11/10, 2015 at 18:50 Comment(0)

There is a strange notion among Java developers that you should not cancel parallel tasks. They condemn Thread.stop() and have marked it deprecated. Without Thread.stop() you cannot really cancel a future. All you can do is send some signal to the future, or modify some shared variable and have the code inside the future check it periodically. So every library that provides futures can offer only one way to cancel a future: cooperatively.
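To make the cooperative approach concrete, here is a minimal sketch using only the standard library. The names CooperativeCancel and work are hypothetical, and the loop body is a stand-in for real work; the point is only that the running code must poll the shared flag itself.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object CooperativeCancel {
  // Performs up to `steps` units of work, checking the shared flag before
  // each unit. Returns the number of units actually completed.
  def work(steps: Int, cancelled: AtomicBoolean): Int = {
    var done = 0
    while (done < steps && !cancelled.get) {
      done += 1 // stand-in for one unit of real work
    }
    done
  }
}
```

A caller would hand the same AtomicBoolean to the code that runs work (for example on another thread) and call cancelled.set(true) to request a stop; the work then ends at the next check rather than immediately.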

I'm facing the same problem now and am in the middle of writing my own library for futures that can be cancelled. There are some difficulties, but they can be solved. You simply cannot call Thread.stop() at an arbitrary point. The thread may be updating shared variables. Locks would be released normally, but an update may be stopped halfway, e.g. having updated only half of a double value, and so on. So I'm introducing a kind of lock. If the thread is in the guarded state, it should not be killed with Thread.stop() but sent a specific message instead. The guarded state is assumed to always be short enough to wait for. The rest of the time, in the middle of a computation, the thread can be safely stopped and replaced with a new one.

So the answer is this: you should not desire to cancel futures, otherwise you are a heretic and no one in the Java community will lend you a willing hand. You should define your own execution context that can kill threads, and write your own futures library to run on that context.

Contemporaneous answered 11/10, 2015 at 17:5 Comment(2)
Which language/environment is okay with non-cooperative thread cancellation? A quick search shows that none of C# (#14132108), C++11 (#12208184), or Python (#324472) is. - Quartile
Plain C or C++. Of course it cannot be done in a portable way, as you point out, but every OS has its own thread API calls that a C or C++ application can use. Being non-portable is normal for C/C++ code. - Contemporaneous
