Find out whether celery task exists

Is it possible to find out whether a task with a certain task id exists? When I try to get the status, I always get PENDING.

>>> AsyncResult('...').status
'PENDING'

I want to know whether a given task id is a real celery task id and not a random string. I want different results depending on whether there is a valid task for a certain id.

There may have been a valid task in the past with the same id but the results may have been deleted from the backend.

Superheterodyne answered 22/3, 2012 at 14:21 Comment(0)

Celery does not write a state when the task is sent; this is partly an optimization (see the documentation).

If you really need it, it's simple to add:

from celery import current_app
# `after_task_publish` is available in celery 3.1+
# for older versions use the deprecated `task_sent` signal
from celery.signals import after_task_publish

# when using celery versions older than 4.0, use body instead of headers

@after_task_publish.connect
def update_sent_state(sender=None, headers=None, **kwargs):
    # the task may not exist if sent using `send_task` which
    # sends tasks by name, so fall back to the default result backend
    # if that is the case.
    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend
 
    backend.store_result(headers['id'], None, "SENT")

Then you can test for the PENDING state to detect that a task has not (seemingly) been sent:

>>> result.state != "PENDING"
Midinette answered 10/4, 2012 at 13:4 Comment(8)
It's worth mentioning that purging the queue does not remove the task meta (at least when using Redis as a backend). Therefore this method cannot be used reliably to determine if the task still exists.Debbradebby
Do I just have to add this code snippet to my existing tasks? I have them in a "tasks.py" module. Also, does "result.state" only work with "AsyncResult('...').status" and not when I reference the state of the sent task? @sleepycal: Would you then recommend using RabbitMQ rather than Redis?Elwin
Currently, the above solution works with a bug. To fix it change body to headers. This may be of some help docs.celeryproject.org/en/latest/internals/…Undaunted
Two notes: 1) if the task is not picked up by a worker, update_sent_state will block the task. You can try it locally by not starting any Celery worker. 2) You can put this snippet directly where you configure your Celery app; it will affect all tasks.Anfractuous
@Anfractuous what do you mean by "block the task"? If I start no celery worker, I exit update_sent_state normally. Then when I start the worker, the task is done normally. I don't see what's different from when not using this snippet.Poppy
@Midinette What are the shortcomings of this? The answer says not doing it is an optimization, but it looks like the page the link points to has changed (it pointed to "latest" a while ago...), because I don't think the content is directly related.Poppy
There can be a race condition when also using task_track_started. From my tests it looks like, if the task queue is empty, the task is started (and its status set to STARTED) right away, before this callback is executed, or at least before store_result actually sets the status to 'SENT'. Then the status is set to 'SENT' and the 'STARTED' information is lost.Poppy
@Jérôme, you're a lifesaver. We copy-pasted this and didn't understand why some of our tasks stayed stuck on 'SENT' forever. In practice they were being updated to 'SUCCESS' and then to 'SENT'.Minim

AsyncResult.state returns PENDING in case of unknown task ids.

PENDING

Task is waiting for execution or unknown. Any task id that is not known is implied to be in the pending state.

http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending

You can provide custom task ids if you need to distinguish unknown ids from existing ones:

>>> from tasks import add
>>> from celery.utils import uuid
>>> r = add.apply_async(args=[1, 2], task_id="celery-task-id-"+uuid())
>>> id = r.task_id
>>> id
'celery-task-id-b774c3f9-5280-4ebe-a770-14a6977090cd'
>>> if not "blubb".startswith("celery-task-id-"): print("Unknown task id")
... 
Unknown task id
>>> if not id.startswith("celery-task-id-"): print("Unknown task id")
... 
Paestum answered 9/4, 2012 at 13:56 Comment(3)
The problem is that I only have an id. Every id was once a valid id but some are not any more because the results were deleted from the backend. So I'll always have an id that starts with celery-task-id- but a task could still be invalid.Superheterodyne
In that case you should track id history externally. Celery backends don't guarantee to keep all results forever. For example, the amqp backend can be queried only once.Paestum
@0x00mh: the problem is that having a task id, how can I tell if task is really PENDING or has been deleted from the backend (perhaps because I set celery to forget about it after some time)?Cedrickceevah
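A minimal sketch of tracking id history externally, as suggested in the comments above, assuming SQLite is an acceptable store. The table name `issued_tasks`, the `TaskIdRegistry` class and the `classify()` helper are hypothetical, not part of Celery; `backend_state` would come from `AsyncResult(task_id).state`.

```python
import sqlite3
import time
from typing import Optional


class TaskIdRegistry:
    """Records every task id we hand out, independently of the result backend."""

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS issued_tasks ("
            " task_id TEXT PRIMARY KEY,"
            " issued_at REAL NOT NULL)"
        )

    def record(self, task_id: str) -> None:
        # Call this right after apply_async()/send_task() returns an id.
        with self.conn:  # commits the insert
            self.conn.execute(
                "INSERT OR IGNORE INTO issued_tasks (task_id, issued_at) VALUES (?, ?)",
                (task_id, time.time()),
            )

    def issued_at(self, task_id: str) -> Optional[float]:
        row = self.conn.execute(
            "SELECT issued_at FROM issued_tasks WHERE task_id = ?", (task_id,)
        ).fetchone()
        return row[0] if row else None


def classify(registry: TaskIdRegistry, task_id: str, backend_state: str) -> str:
    """Combine the registry with the state Celery reports for the id."""
    if registry.issued_at(task_id) is None:
        return "unknown id"  # never issued by us, e.g. a random string
    if backend_state == "PENDING":
        return "issued, no result"  # still queued, or result expired/deleted
    return backend_state
```

Note that this separates "never issued" from "issued", but on its own it still cannot tell a queued task from one whose result was deleted; the `issued_at` timestamp at least lets you apply your own expiry rule.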

Right now I'm using the following scheme:

  1. Get the task id.
  2. Set the memcache key 'task_%s' % task.id to the message 'Started'.
  3. Pass the task id to the client.
  4. Now the client can monitor the task status (which the task writes to memcache).
  5. When the task is ready, it sets the memcache key to 'Ready'.
  6. When the client sees the task is ready, it starts a special task that deletes the key from memcache and does any necessary cleanup.
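The scheme above can be sketched as follows, assuming a memcache-like client exposing `get`, `set` and `delete`. `FakeCache` is a hypothetical in-memory stand-in so the sketch runs without a memcache server, and the function names are illustrative; a real client may return bytes rather than str.

```python
from typing import Dict, Optional


class FakeCache:
    """In-memory stand-in for a memcache client (get/set/delete)."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    def set(self, key: str, value: str) -> None:
        self._data[key] = value

    def delete(self, key: str) -> None:
        self._data.pop(key, None)


def status_key(task_id: str) -> str:
    return "task_%s" % task_id


def on_submit(cache, task_id: str) -> None:
    # Step 2: mark the id as started right after the task is sent.
    cache.set(status_key(task_id), "Started")


def on_task_ready(cache, task_id: str) -> None:
    # Step 5: called from inside the task when its work is done.
    cache.set(status_key(task_id), "Ready")


def poll(cache, task_id: str) -> Optional[str]:
    # Step 4: None means the id was never registered (or already cleaned up).
    return cache.get(status_key(task_id))


def cleanup(cache, task_id: str) -> None:
    # Step 6: in the original scheme this runs as a separate Celery task.
    cache.delete(status_key(task_id))
```

Because the key exists only between submission and cleanup, a `None` from `poll()` is what distinguishes an unknown id from a PENDING one.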
Whiplash answered 10/4, 2012 at 12:40 Comment(1)
That was the way I wanted to do it but it seemed not to be the clean way of doing it.Superheterodyne

You need to call .get() on the AsyncResult object you create to actually fetch the result from the backend.

See the Celery FAQ.


To clarify my answer further.

Any string is technically a valid ID; there is no way to validate a task ID by itself. The only way to find out if a task exists is to ask the backend whether it knows about it, and to do that you must use .get().

This introduces the problem that .get() blocks when the backend doesn't have any information about the task ID you supplied. This is by design: it allows you to start a task and then wait for its completion.

In the case of the original question I'm going to assume that the OP wants to get the state of a previously completed task. To do that you can pass a very small timeout and catch timeout errors:

from celery.exceptions import TimeoutError
from celery.result import AsyncResult

res = AsyncResult('blubb')
try:
    # fetch the result from the backend;
    # your backend must be fast enough to return
    # results within 100ms (0.1 seconds).
    # propagate=False stops get() from re-raising a failed task's error
    res.get(timeout=0.1, propagate=False)
    found = True
except TimeoutError:
    found = False

if found:
    print("Result exists; state=%s" % (res.state,))
else:
    print("Result does not exist")

It should go without saying that this only works if your backend is storing results. If it's not, there's no way to know whether a task ID is valid, because nothing is keeping a record of them.


Even more clarification.

What you want to do cannot be accomplished using the AMQP backend because it does not store results, it forwards them.

My suggestion would be to switch to a database backend so that the results are in a database that you can query outside of the existing celery modules. If no tasks exist in the result database you can assume the ID is invalid.
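A sketch of querying the result table directly, assuming Celery's SQLAlchemy database result backend with its default table name `celery_taskmeta` (configurable via the `database_table_names` setting) and its `task_id`/`status` columns. `sqlite3` is used only for illustration; other DB-API drivers use a different placeholder style than `?`.

```python
import sqlite3
from typing import Optional


def stored_state(conn: sqlite3.Connection, task_id: str,
                 table: str = "celery_taskmeta") -> Optional[str]:
    """Return the stored status for task_id, or None if there is no row."""
    # The table name is interpolated, so it must come from trusted config,
    # never from user input; task_id is passed as a bound parameter.
    row = conn.execute(
        "SELECT status FROM %s WHERE task_id = ?" % table, (task_id,)
    ).fetchone()
    return row[0] if row else None
```

Keep in mind that a row appears only once some state has been written for the task (for example a final result, STARTED with `task_track_started`, or a custom state written from a signal), so a task still waiting in the queue also returns `None`.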

Dermatoplasty answered 5/4, 2012 at 20:23 Comment(7)
.get() will block until the system receives a result. In the case of a non-existent ID, this will just lock up the application. You can pass a timeout argument, but you are still unable to determine whether the task id is wrong.Ulphiah
Right, you need to pass a timeout value and catch the timeout error. That's the only way to determine if a task id is "valid" according to your backend. Any id is technically "valid", but only IDs your backend knows about will actually return any data.Dermatoplasty
My tasks normally last about 30 seconds. So that's no option, right?Superheterodyne
You want to get info about the task before it has finished, but from another process than the one that created the task. Basically so you can check if something is running? Is that correct?Dermatoplasty
This is a useful answer given its clarification that .get() will sometimes never return, without a timeout parameter. The other answers about storing task state outside Celery are more correct since brokers don't store data forever. However, switching to a database as broker is not such a good idea (such backends were intended for testing only and don't support some Celery features).Eponym
The term 'backend' in much of this answer is better replaced by 'broker' or 'message broker' - confusingly, Celery uses the term broker to mean the message transport (e.g. RabbitMQ), which is separate from the task results backend used to store results of tasks. Databases are fine as task results backends, but a bad choice as the broker (experimental in Celery, and missing some features).Eponym
Using .get() is clearly not the answer: it will block (or timeout) if the task does exist but is taking a long time to run. What is asked (and what I'm looking for as well), is a way to know whether the task is 'somewhere in the pipe', and not lost. Because we are currently facing this issue of tasks being lost by Celery but it still returns 'PENDING' (after 2 months). This is very annoying.Canaan

So I have this idea:

import project.celery_tasks as tasks

def task_exist(task_id):
  # tasks is my imported task module from celery
  # it is located under /project/project, where the settings.py file is located
  i = tasks.app.control.inspect()

  # active() and reserved() map each worker name to a list of task dicts;
  # scheduled() wraps each task dict under a 'request' key.
  # Each call returns None when no worker replies.
  for worker_tasks in (i.active() or {}).values():
    if any(t['id'] == task_id for t in worker_tasks):
      return True
  for worker_tasks in (i.reserved() or {}).values():
    if any(t['id'] == task_id for t in worker_tasks):
      return True
  for worker_tasks in (i.scheduled() or {}).values():
    if any(t['request']['id'] == task_id for t in worker_tasks):
      return True

  # if checking the status returns pending, yet we found it in any queues... it means it exists...
  # if it returns pending, yet we didn't find it on any of the queues... it doesn't exist
  return False

According to https://docs.celeryproject.org/en/stable/userguide/monitoring.html the available inspect commands include active, scheduled, reserved, revoked, registered, stats and query_task, so pick and choose as you please.

And there might be a better way to go about checking the queues for their tasks, but this should work for me, for now.
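The `query_task` inspection listed above can replace the separate active/reserved calls with a single broadcast. A minimal sketch, assuming the reply has the shape `{worker_name: {task_id: [state, task_info]}}` (or `None` when no worker answers); the helper names are hypothetical. As far as I can tell, workers only report tasks they currently hold as active or reserved, so ETA/countdown tasks listed by `scheduled()` may not show up here.

```python
from typing import Dict, Optional, Tuple

# Assumed shape of app.control.inspect().query_task(task_id)
Reply = Optional[Dict[str, Dict[str, list]]]


def find_in_query_reply(reply: Reply, task_id: str) -> Optional[Tuple[str, str]]:
    """Return (worker_name, state) for the first worker holding task_id."""
    for worker, worker_tasks in (reply or {}).items():
        entry = worker_tasks.get(task_id)
        if entry:
            return worker, entry[0]  # e.g. 'active' or 'reserved'
    return None


def task_known_to_workers(app, task_id: str) -> Optional[Tuple[str, str]]:
    # app is your Celery application; one broadcast instead of several.
    return find_in_query_reply(app.control.inspect().query_task(task_id), task_id)
```

Keeping the parsing in a pure function makes it easy to test without running workers.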

Spreadeagle answered 20/1, 2022 at 8:53 Comment(0)

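Maybe using Redis directly is a good solution, if Redis is your broker.

import json

import redis

# config is your own settings module holding the Redis connection details
pool = redis.ConnectionPool(host=config.REDIS_HOST,
                            port=config.REDIS_PORT,
                            db=config.REDIS_DB,
                            password=config.REDIS_PASSWORD)
redis_client = redis.Redis(connection_pool=pool)

def check_task_exist(task_id):
    # Messages waiting in the default queue are stored as JSON in a Redis
    # list named 'celery'; messages already fetched by a worker are moved
    # out of it. Protocol 2 (Celery 4.0+) puts the task id in the headers.
    for one in redis_client.lrange('celery', 0, -1):
        task_info = json.loads(one.decode())
        if task_info['headers']['id'] == task_id:
            return True
    return False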
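The loop above reads `task_info['headers']['id']`, which only works for Celery's task message protocol 2. A sketch of a parser that also accepts protocol 1 messages (where the id lives in the body), assuming kombu's Redis envelope layout with `body`, `headers` and `properties.body_encoding` keys; `task_id_from_message` is a hypothetical helper, not a Celery API.

```python
import base64
import json
from typing import Optional


def task_id_from_message(raw: bytes) -> Optional[str]:
    """Return the Celery task id carried by one raw Redis queue entry."""
    envelope = json.loads(raw)
    headers = envelope.get("headers") or {}
    if "id" in headers:
        # Task message protocol 2 (the default since Celery 4.0)
        return headers["id"]
    # Protocol 1: the id is inside the body, which is usually base64-encoded
    try:
        body = envelope.get("body", "")
        if envelope.get("properties", {}).get("body_encoding") == "base64":
            body = base64.b64decode(body)
        return json.loads(body).get("id")
    except (ValueError, AttributeError):
        # not JSON, or not a dict: probably not a Celery task message
        return None
```

Inside `check_task_exist`, the comparison would then become `task_id_from_message(one) == task_id`.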
Kairouan answered 19/5, 2023 at 4:48 Comment(0)

I found a way to check and it's working for me:

def check_task_exists(task_id):
    # app is your Celery application instance
    inspector = app.control.inspect()

    # Check active tasks: {worker: [task_info, ...]}
    active_tasks = inspector.active()
    if active_tasks:
        for worker, tasks in active_tasks.items():
            for task in tasks:
                if task['id'] == task_id:
                    return True

    # Check scheduled (ETA/countdown) tasks: each entry
    # wraps the task info under a 'request' key
    scheduled_tasks = inspector.scheduled()
    if scheduled_tasks:
        for worker, tasks in scheduled_tasks.items():
            for task in tasks:
                if task['request']['id'] == task_id:
                    return True

    # Check reserved (prefetched, not yet started) tasks
    reserved_tasks = inspector.reserved()
    if reserved_tasks:
        for worker, tasks in reserved_tasks.items():
            for task in tasks:
                if task['id'] == task_id:
                    return True

    # Task not found
    return False
Greco answered 13/6, 2023 at 10:59 Comment(0)

Try

AsyncResult('blubb').state

that may work.

It should return something different.

Mannino answered 22/3, 2012 at 14:23 Comment(2)
I want to get different results depending on whether the task id is or has been a real task id. The problem is that I'll always get PENDING even if I use a fake id like blubb.Superheterodyne
.status is a deprecated alias of attribute stateUlphiah

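Please correct me if I'm wrong.

# built_in_status_check() and registry_exists() are placeholders, not Celery
# functions: "read the task state" and "look the id up in some registry"
if built_in_status_check(task_id) == 'PENDING':
    if registry_exists(task_id):
        print('Pending')
    else:
        print('Task does not exist')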
Rounded answered 7/4, 2012 at 2:24 Comment(3)
What are built_in_status_check and registry_exists? How would you implement this?Superheterodyne
Well, I came to know that there are 6 task states (PENDING, STARTED, SUCCESS, FAILURE, RETRY and REVOKED). So I thought we could have code to check whether the task is in 'PENDING' or not, and if it is in the 'PENDING' state, then we could check that particular task against registry entries for existence.Rounded
No, I know that the state is pending but I don't know the reason why it is pending. I am looking for a smart registry_exists.Superheterodyne
