Celery: clean way of revoking the entire chain from within a task

My question is probably pretty basic, but I still can't find a solution in the official docs. I have defined a Celery chain inside my Django application, performing a set of tasks that depend on each other:

chain(  tasks.apply_fetching_decision.s(x, y),
        tasks.retrieve_public_info.s(z, x, y),
        tasks.public_adapter.s())()

Obviously the second and the third tasks need the output of the parent, that's why I used a chain.

Now the question: I need to programmatically revoke the 2nd and the 3rd tasks if a test condition in the 1st task fails. How can I do it in a clean way? I know I can revoke the tasks of a chain from within the method where I have defined the chain (see this question and this doc), but inside the first task I have no visibility of the subsequent tasks or of the chain itself.

Temporary solution

My current solution is to skip the computation inside the subsequent tasks based on result of the previous task:

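@shared_task
def retrieve_public_info(result, z, x, y):
    # result is the return value of apply_fetching_decision
    if not result:
        return []
    ...

@shared_task
def public_adapter(result):
    # an empty result means there is nothing to adapt
    for r in result:
        ...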

But this "workaround" has some flaws:

  • Adds unnecessary logic to each task (based on predecessor's result), compromising reuse
  • Still executes the subsequent tasks, with all the resulting overhead

I haven't played too much with passing references of the chain to tasks for fear of messing things up. I admit I also haven't tried the exception-throwing approach, because I think that choosing not to proceed through the chain can be a functional (thus non-exceptional) scenario...

Thanks for helping!

Parole answered 21/5, 2014 at 21:15 Comment(3)
possible duplicate of Celery stop execution of a chain - Computation
Rather than passing only the value of the result, you can pass a tuple of the result and a success message. In higher terms, this can be implemented as an Error Monad or a Maybe Monad: haskell.org/haskellwiki/All_About_Monads#The_Error_monad en.wikipedia.org/wiki/… - Chantey
This doesn't prevent the N subsequent chain tasks from being executed. What I am trying to avoid is exactly the overhead of tasks being called and executed in vain. - Parole
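
The tuple-based suggestion in the comments above can be sketched in plain Python as follows. This is only an illustration under assumptions: the (ok, value) convention, the skip_on_failure helper and the x > 0 condition are all hypothetical and not taken from the question. As the reply notes, every step is still invoked; only its body is skipped.

```python
from functools import wraps


def skip_on_failure(step):
    """Wrap a step so that a failed (ok, value) pair passes through untouched."""
    @wraps(step)
    def wrapper(pair):
        ok, value = pair
        if not ok:
            return pair  # predecessor failed: skip the real work
        return step(value)
    return wrapper


calls = []  # records which step bodies actually ran


def apply_fetching_decision(x):
    calls.append("decision")
    return (x > 0, x)  # hypothetical test condition


@skip_on_failure
def retrieve_public_info(x):
    calls.append("retrieve")
    return (True, [x, x + 1])


@skip_on_failure
def public_adapter(items):
    calls.append("adapter")
    return (True, [i * 2 for i in items])


def run(x):
    """Feed each result into the next step, the way a chain would."""
    return public_adapter(retrieve_public_info(apply_fetching_decision(x)))
```

The per-task check moves into one reusable wrapper, which addresses the reuse concern but not the overhead of dispatching the remaining tasks.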

I think I found the answer to this issue: this seems to be the right way to proceed. I wonder why such a common scenario is not documented anywhere, though.

For completeness, here is the basic code snippet:

@app.task(bind=True)  # Note that we need bind=True for self to work
def task1(self, other_args):
    # do_stuff
    if end_chain:
        self.request.callbacks[:] = []
    ...

Update

I implemented a more elegant way to cope with the issue and I want to share it with you. I am using a decorator called revoke_chain_authority, which revokes the rest of the chain automatically, so the code described above does not have to be repeated in every task.

from functools import wraps

class RevokeChainRequested(Exception):
    def __init__(self, return_value):
        Exception.__init__(self, "")

        # Now for your custom code...
        self.return_value = return_value


def revoke_chain_authority(a_shared_task):
    """
    @see: https://gist.github.com/bloudermilk/2173940
    @param a_shared_task: a @shared_task(bind=True) celery function.
    @return:
    """
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested as e:
            # Drop subsequent tasks in chain (if not EAGER mode)
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value

    return inner

This decorator can be used on a shared task as follows:

@shared_task(bind=True)
@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    #...

    if condition:
        raise RevokeChainRequested(False)
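
To check the decorator's behaviour without a broker or a worker, one option is to call the wrapped function directly with a stand-in for the bound task. The sketch below assumes Python 3. The fake_task helper, the placeholder strings in the callbacks list and the None-coordinate condition are hypothetical test scaffolding, not part of Celery or of the original code.

```python
from functools import wraps
from types import SimpleNamespace


class RevokeChainRequested(Exception):
    def __init__(self, return_value):
        super().__init__("")
        self.return_value = return_value


def revoke_chain_authority(a_shared_task):
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested as e:
            # Drop subsequent tasks in the chain, in place
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value
    return inner


@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    # Hypothetical condition, for illustration only
    if latitude is None or longitude is None:
        raise RevokeChainRequested(False)
    return (latitude, longitude)


def fake_task(callbacks):
    """Stand-in for a bound task: it only exposes request.callbacks."""
    return SimpleNamespace(request=SimpleNamespace(callbacks=callbacks))


# Placeholder strings stand in for the real callback signatures
pending = ["retrieve_public_info", "public_adapter"]
result = apply_fetching_decision(fake_task(pending), None, 12.5)
```

Here result is the value passed to RevokeChainRequested and pending ends up empty, because the slice assignment clears the same list object the request points to.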

Please note the use of @wraps. It is needed to preserve the identity of the original function (its __name__ and __module__), which Celery uses to build the default task name. Without it, every decorated task would be registered under the wrapper's name, and Celery would mix up the calls (e.g. it would always call the first registered function instead of the right one).
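
The mix-up described above can be reproduced in plain Python, without Celery. This minimal sketch only shows what happens to __name__ with and without functools.wraps. The decorators and the task_a, task_b and task_c functions are hypothetical names for illustration.

```python
from functools import wraps


def without_wraps(func):
    def inner(*args, **kwargs):
        return func(*args, **kwargs)
    return inner


def with_wraps(func):
    @wraps(func)  # copies __name__, __module__, __doc__, ... onto inner
    def inner(*args, **kwargs):
        return func(*args, **kwargs)
    return inner


@without_wraps
def task_a():
    return "a"


@without_wraps
def task_b():
    return "b"


@with_wraps
def task_c():
    return "c"


# Both undecorated-name functions now report the wrapper's name
names_without = (task_a.__name__, task_b.__name__)
name_with = task_c.__name__
```

Any registry keyed on the function name would see task_a and task_b as the same entry, while task_c keeps its own name.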

Parole answered 22/5, 2014 at 1:22 Comment(0)

As of Celery 4.0, what I found to be working is to remove the remaining tasks from the current task instance's request using the statement:

self.request.chain = None

Let's say you have a chain of tasks a.s() | b.s() | c.s(). You can only access the self variable inside a task if you bind the task by passing bind=True as an argument to the task decorator.

@app.task(name='main.a', bind=True)
def a(self):
  if something_happened:
    self.request.chain = None

If something_happened is truthy, b and c wouldn't be executed.
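
The idea can be illustrated with a toy model in plain Python. This is not Celery code and does not claim to mirror its internals: it only assumes, as described above, that the steps still to run are reachable through self.request.chain and that clearing that attribute stops the chain. The run_chain helper and the value < 0 condition are hypothetical.

```python
from types import SimpleNamespace


def run_chain(tasks, first_arg):
    """Toy runner, NOT Celery: each task sees the remaining steps on request.chain."""
    executed = []
    remaining = list(tasks)
    arg = first_arg
    while remaining:
        task = remaining.pop(0)
        # The task gets a request whose `chain` holds the steps still to run
        request = SimpleNamespace(chain=remaining)
        arg = task(SimpleNamespace(request=request), arg)
        executed.append(task.__name__)
        # Honour whatever the task left on request.chain (None means stop)
        remaining = list(request.chain or [])
    return executed, arg


def a(self, value):
    if value < 0:  # hypothetical stand-in for `something_happened`
        self.request.chain = None
    return value


def b(self, value):
    return value + 1


def c(self, value):
    return value * 10
```

In this model, run_chain([a, b, c], 1) executes all three steps, while run_chain([a, b, c], -1) stops after a and returns its result.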

Hunch answered 13/11, 2016 at 23:31 Comment(6)
And how should it be used? For example: I have an ID returned by chain_task.apply_async(); what can I do to revoke this task? - Hellenistic
@Hellenistic Just edited the answer. I don't know how to revoke a task by its id. This solution is useful if you need to stop the chain conditionally inside some task. - Hunch
Oh, I got it, thanks. Btw, I found an approach that is not elegant but worked for me: https://mcmap.net/q/540732/-get-progress-from-async-python-celery-chain-by-chain-id - Hellenistic
I wonder how “official” this solution is; feels somewhat hacky to me. While it seems to work I can’t find documentation that this is a supported way of aborting a chain of tasks... - Krahling
If I have this chain in a group and I set self.request.chain = None, I only want that to stop this chain, but not affect the rest of the parallel group. Will that statement correctly only affect the current chain? - Thordis
@Thordis I also have a chain inside another chain, and in my case it stops both chains. - Favouritism