RuntimeError: Never call result.get() within a task Celery
I am using Celery to send a task to a remote server and am trying to get the result back. The task's state is constantly updated via the update_state method on the remote server.

I am sending the task using

app.send_task('task_name')

Getting the result of a Celery task is a blocking call, and I don't want my Django app to wait for the result and time out.

So I tried running another Celery task to fetch the result:

from celery.result import AsyncResult

@app.task(ignore_result=True)
def catpure_res(task_id):
    task_obj = AsyncResult(task_id)
    task_obj.get(on_message=on_msg)

But it results in the error below.

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 367, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 622, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/arpit/project/appname/tasks/results.py", line 42, in catpure_res
    task_obj.get(on_message=on_msg)
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 168, in get
    assert_will_not_block()
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 44, in assert_will_not_block
    raise RuntimeError(E_WOULDBLOCK)
RuntimeError: Never call result.get() within a task!
See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

Is there any workaround for this error? Do I have to run a daemon process to get the results?

Orlene answered 3/8, 2017 at 17:15 Comment(0)

Use allow_join_result. See the snippet below.

from celery.result import AsyncResult, allow_join_result

@app.task(ignore_result=True)
def catpure_res(task_id):
    task_obj = AsyncResult(task_id)
    with allow_join_result():
        task_obj.get(on_message=on_msg)  # on_msg is your on_message callback

Note: as mentioned in the other answers, this can cause performance issues and even deadlocks, but if your task is well written and doesn't cause unexpected errors, then it should work like a charm.

Hett answered 3/8, 2017 at 17:53 Comment(3)
Exactly what I wanted. Thanks.Orlene
Where does "on_msg" come from in "task_obj.get(on_message=on_msg)" ?Teenager
import 'from celery.result import allow_join_result'Audryaudrye

As your title explains, calling get within a task is bad practice and can lead to deadlock. Instead, you can check the task's status and fetch its result whenever it's ready:

result = catpure_res.AsyncResult(task_id, app=app)

if result.ready():
    return result.get()

return result.state

You can wrap the above snippet in a function and poll it every x seconds.
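A minimal sketch of such a wrapper (the function name `poll_result` is hypothetical; it assumes only an object with the AsyncResult interface of ready(), get() and state, so it is shown without any Celery-specific setup):

```python
def poll_result(result):
    """Return the task's result if it has finished, otherwise its current state.

    `result` is expected to behave like celery.result.AsyncResult:
    ready() -> bool, get() -> the task's return value, and a .state string.
    """
    if result.ready():
        return result.get()
    return result.state
```

In a Django view you could call this with AsyncResult(task_id, app=app) and return the outcome as JSON, letting the client poll the endpoint instead of blocking a worker.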

EDIT: regarding your comment:

  • You can check result.state instead, and use the retry mechanism with a countdown until result.state == SUCCESS.

  • You can add celery beat to run a periodic task that checks whether the primary task has ended.

  • Note that such a heavy task (of long duration) is also bad practice. Consider breaking it into smaller tasks and using canvas to combine them.
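The retry-with-countdown idea from the first bullet can be sketched as below. The state-to-action decision is separated into a plain function, and the Celery wiring is shown only as comments because the names app, handle_result and watch_task are assumptions, not part of the question:

```python
def next_action(state):
    """Decide what a watcher task should do for a given watched-task state."""
    if state == 'SUCCESS':
        return 'collect'   # fetch result.result and process it
    if state == 'FAILURE':
        return 'give_up'   # surface the error instead of retrying forever
    return 'retry'         # PENDING / STARTED / custom states: check again later

# Hypothetical Celery wiring (not executed here):
#
# @app.task(bind=True, max_retries=None)
# def watch_task(self, task_id):
#     result = AsyncResult(task_id, app=self.app)
#     action = next_action(result.state)
#     if action == 'collect':
#         return handle_result(result.result)
#     if action == 'give_up':
#         raise RuntimeError("watched task failed")
#     raise self.retry(countdown=30)  # re-enqueue this check in 30 seconds
```

Because the watcher never calls get() before the task is done, it avoids the "Never call result.get() within a task" error entirely.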

Placable answered 3/8, 2017 at 17:35 Comment(3)
Pinging it every second has a computational cost. Also, the task is very heavy and can run for days; it updates its state to record which step is complete and the meta information for it. Your suggestion is good but does not seem to work in my caseOrlene
Running beat and everything seems like a big setup. Also, it is a single task; it cannot be broken apart. You can imagine it as a single function. See the other answerOrlene
I know Celery says it's bad practice, but I wholeheartedly disagree. Tasks shouldn't be different from subtasks. I've got scripts that might be called by themselves or as subtasks and a few subtasks that need to run subtasks, and my design would be much more complicated if I followed the guidelines.Wherry
from celery.result import allow_join_result
from celery.exceptions import TimeoutError

task_obj = app.send_task("do_something", [arg1, arg2, arg3])

def on_msg(*args, **kwargs):
    print(f"on_msg: {args}, {kwargs}")

with allow_join_result():
    try:
        result = task_obj.get(on_message=on_msg, timeout=timeout_s)
    except TimeoutError:
        print("Timeout!")
Lonesome answered 15/3, 2023 at 19:36 Comment(0)