Celery: stop execution of a chain

I have a check_orders task that's executed periodically. It creates a group of tasks so that I can time how long they took to execute and do something when they're all done (this is the purpose of res.join [1] and grouped_subs). The tasks that are grouped are pairs of chained tasks.

What I want is: when the first task doesn't meet a condition (fails), don't execute the second task in the chain. I can't figure this out for the life of me, and I feel this is pretty basic functionality for a job queue manager. When I try the things I have commented out after [2] (raising exceptions, removing callbacks)... we get stuck on the join() in check_orders for some reason (it breaks the group). I've tried setting ignore_result to False for all these tasks as well, but it still doesn't work.

@task(ignore_result=True)
def check_orders():
    # check all the orders and send out appropriate notifications
    grouped_subs = []

    for thingy in things:
       ...

        grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )), 
                        notify.subtask((args_sub_2, ), immutable=True)))

    res = group(grouped_subs).apply_async()

    res.join()         #[1]
    logger.info('Done checking orders at %s' % current_task.request.id)

@task(ignore_result=True)
def is_room_open(args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        # [2]
        # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
        # None of the following things work:
        # is_room_open.update_state(state='FAILURE')
        # raise celery.exceptions.Ignore()
        # raise Exception('spam', 'eggs')
        # current_task.request.callbacks[:] = []

@task(ignore_result=True)
def notify(args_sub_2):
    # something else time consuming, only do this if the first part of the chain 
    # passed a test (the chained tasks before this were 'successful')
    notify_user(args_sub_2)
Maggio answered 4/7, 2013 at 3:27 Comment(2)
For Celery 4.0 you can use this answer - https://mcmap.net/q/536903/-celery-clean-way-of-revoking-the-entire-chain-from-within-a-task (Debtor)
Does this answer your question? Celery: clean way of revoking the entire chain from within a task (Debtor)

In my opinion this is a common use-case that doesn't get enough love in the documentation.

Assuming you want to abort a chain mid-way while still reporting SUCCESS as the status of the completed tasks, and without sending any error log or the like (otherwise you can just raise an exception), then a way to accomplish this is:

@app.task(bind=True)  # Note that we need bind=True for self to work
def task1(self, other_args):
    #do_stuff
    if end_chain:
        self.request.callbacks = None
        return
    #Other stuff to do if end_chain is False

So in your example:

@app.task(ignore_result=True, bind=True)
def is_room_open(self, args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        self.request.callbacks = None

This will work. Note that instead of ignore_result=True and subtask() you can use the shortcut .si(), as stated by @abbasov-alexander.
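
For example, the chain from the question could then be built as follows (a sketch using the question's names; .si() creates an immutable signature, equivalent to subtask(..., immutable=True), while .s() creates a regular one):

chain(is_room_open.s(args_sub_1), notify.si(args_sub_2))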

Edited to work with EAGER mode, as suggested by @PhilipGarnero in the comments.
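
For Celery 4.x, the comments below report that it's self.request.chain that has to be cleared instead of self.request.callbacks. A version-tolerant sketch, assuming those attribute names, could simply clear both:

@app.task(bind=True)
def is_room_open(self, args_sub_1):
    # something time consuming
    if http_req_and_parse(args_sub_1):
        return True
    # Celery 3.x keeps the downstream tasks in request.callbacks, while
    # Celery 4.x keeps the remaining chain in request.chain; clearing
    # both covers either version.
    self.request.callbacks = None
    self.request.chain = None
    return False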

Hypomania answered 14/1, 2014 at 5:23 Comment(4)
If you are running tasks in EAGER mode, the above won't stop the chain. I replaced self.request.callbacks[:] = [] with self.request.callbacks = None and it's now working in both cases. (Inconsumable)
If it works in both cases, let's suggest that then. Thanks for commenting to improve the answer :) (Hypomania)
Apparently it doesn't work anymore for Celery 4.0, but self.request.chain = None does. #23794428 (Clothier)
Both self.request.callbacks = None and self.request.chain = None did not work for me. I worked around the issue by returning a boolean indicating whether the chain should be aborted, and accepting that as an argument in the follow-up tasks of the chain. (Vasques)

It's unbelievable that such a common case isn't covered in any official documentation. I had to cope with the same issue (but using shared_task with the bind option, so we have visibility of the self object), so I wrote a custom decorator that handles the revocation automatically:

from functools import wraps

def revoke_chain_authority(a_shared_task):
    """
    @see: https://gist.github.com/bloudermilk/2173940
    @param a_shared_task: a @shared_task(bind=True) celery function.
    @return: the wrapped task function
    """
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested as e:
            # Drop subsequent tasks in the chain (if not in EAGER mode)
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value

    return inner
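
The RevokeChainRequested exception itself isn't defined in the answer; a minimal definition consistent with the usage above (the decorator reads its return_value attribute) might look like this:

class RevokeChainRequested(Exception):
    """Raised inside a task to request that the rest of the chain be dropped."""

    def __init__(self, return_value):
        super(RevokeChainRequested, self).__init__(return_value)
        # Value the decorator returns in place of the aborted task's result.
        self.return_value = return_value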

You can use it as follows:

@shared_task(bind=True)
@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    #...

    if condition:
        raise RevokeChainRequested(False)

See the full explanation here. Hope it helps!

Sherrylsherurd answered 1/7, 2014 at 20:45 Comment(1)
It seems that now the callbacks variable is a tuple, so that operation raises an error: self.request.callbacks[:] = [] fails with TypeError: 'tuple' object does not support item assignment (Deoxidize)

Firstly, it seems that if an exception is raised inside the function, ignore_result doesn't help you.

Secondly, you use immutable=True. This means that the next function (in our case notify) does not take additional arguments, such as the result of the previous task. You should use notify.subtask((args_sub_2, ), immutable=False), if of course that suits your design.

Third, you can use shortcuts:

notify.si(args_sub_2) instead of notify.subtask((args_sub_2, ), immutable=True)

and

is_room_open.s(args_sub_1) instead of is_room_open.subtask((args_sub_1, ))

Try this code:

@task
def check_orders():
    # check all the orders and send out appropriate notifications
    grouped_subs = []

    for thingy in things:
       ...

        grouped_subs.append(chain(is_room_open.s(args_sub_1), 
                                  notify.s(args_sub_2)))

    res = group(grouped_subs).apply_async()

    res.join()         #[1]
    logger.info('Done checking orders at %s' % current_task.request.id)

@task
def is_room_open(args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        # [2]
        # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
        # None of the following things work:
        # is_room_open.update_state(state='FAILURE')
        # raise celery.exceptions.Ignore()
        # raise Exception('spam', 'eggs')
        # current_task.request.callbacks[:] = []
        return False

@task
def notify(result, args_sub_2):
    if result:
        # something else time consuming, only do this if the first part of the chain 
        # passed a test (the chained tasks before this were 'successful')
        notify_user(args_sub_2)
        return True
    return False

If you want to catch exceptions you must use an error callback, like so:

is_room_open.s(args_sub_1).set(link_error=log_error.s())

import os

from proj.celery import celery

@celery.task
def log_error(task_id):
    result = celery.AsyncResult(task_id)
    result.get(propagate=False)  # make sure result written.
    with open(os.path.join('/var/errors', task_id), 'a') as fh:
        fh.write('--\n\n%s %s %s' % (
            task_id, result.result, result.traceback))
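
To attach the error callback to the chains built in check_orders, the option can be set on the first signature (a sketch reusing the names from the question; Signature.set() attaches execution options such as link_error):

grouped_subs.append(chain(
    is_room_open.s(args_sub_1).set(link_error=log_error.s()),
    notify.s(args_sub_2),
))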
Eductive answered 6/7, 2013 at 14:18 Comment(3)
Thank you for the tips about the shortcuts. Although this would work, it doesn't solve my problem. I want the second task to never execute if the first one fails. This solution still has the overhead of starting the second task every time, independent of the result of the first task. I want to stop execution of the chain. (Maggio)
I understood you. If a task raises an exception, execution of the chain will stop. That's the default behavior. You don't need to look for a special solution for it. (Eductive)
@Alexander, raising the exception is NOT working correctly. "When I try the things I have commented out after [2] (raising exceptions, removing callbacks)... we get stuck on the join() in check_orders for some reason (it breaks the group)." (Maggio)
