Detect whether Celery is Available/Running

I'm using Celery to manage asynchronous tasks. Occasionally, however, the celery process goes down, which causes none of the tasks to get executed. I would like to be able to check the status of celery and make sure everything is working fine, and if I detect any problems, display an error message to the user. From the Celery Worker documentation it looks like I might be able to use ping or inspect for this, but ping feels hacky and it's not clear exactly how inspect is meant to be used (if inspect().registered() is empty?).

Any guidance on this would be appreciated. Basically what I'm looking for is a method like so:

def celery_is_alive():
    from celery.task.control import inspect
    return bool(inspect().registered()) # is this right??

EDIT: It doesn't even look like registered() is available on celery 2.3.3 (even though the 2.1 docs list it). Maybe ping is the right answer.

EDIT: Ping also doesn't appear to do what I thought it would do, so still not sure the answer here.

Resa answered 14/12, 2011 at 15:17 Comment(2)
Did the answer below not work for you? As someone who has a similar problem to solve, I would love some confirmation.Sistrunk
I know this is an old question, but could you elaborate on how ping isn't the answer? It seems like ping is exactly the right answer, a simple 'pong' response saying that the worker is alive.Lacrimator

Here's the code I've been using. celery.task.control.Inspect.stats() returns a dict containing lots of details about the currently available workers, None if there are no workers running, or raises an IOError if it can't connect to the message broker. I'm using RabbitMQ - it's possible that other messaging systems might behave slightly differently. This worked in Celery 2.3.x and 2.4.x; I'm not sure how far back it goes.

def get_celery_worker_status():
    ERROR_KEY = "ERROR"
    try:
        from celery.task.control import inspect
        insp = inspect()
        # stats() returns a dict keyed by worker node name, or None if no workers reply.
        d = insp.stats()
        if not d:
            d = {ERROR_KEY: 'No running Celery workers were found.'}
    except IOError as e:
        from errno import errorcode
        msg = "Error connecting to the backend: " + str(e)
        # errorcode maps errno numbers to symbolic names such as 'ECONNREFUSED'.
        if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED':
            msg += ' Check that the RabbitMQ server is running.'
        d = {ERROR_KEY: msg}
    except ImportError as e:
        d = {ERROR_KEY: str(e)}
    return d
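
A quick usage sketch (purely illustrative, not part of the original answer; the keys other than "ERROR" are worker node names as returned by stats()):

status = get_celery_worker_status()
if "ERROR" in status:
    print("Celery problem: " + status["ERROR"])
else:
    print("Running workers: " + ", ".join(status.keys()))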
Mada answered 15/12, 2011 at 15:28 Comment(5)
I've discovered that the above adds two reply.celery.pidbox queues to rabbitmq every time it's run. This leads to an incremental increase in rabbitmq's memory usage.Sistrunk
just for completeness, you can use: sudo service celerybeatd status, to check if the scheduler is up.Umbilicate
This does not work for me. With Redis as broker, when redis is not available, and no celery worker is running, insp.stats() just blocks.Fettling
This is not working for me. I suppose the API of celery changed (now at version 4.2).Compassion
For versions 4.2 and above, you can take a look at #8507414.Hysteria

From the documentation of celery 4.2:

from your_celery_app import app


def get_celery_worker_status():
    i = app.control.inspect()
    availability = i.ping()
    stats = i.stats()
    registered_tasks = i.registered()
    active_tasks = i.active()
    scheduled_tasks = i.scheduled()
    result = {
        'availability': availability,
        'stats': stats,
        'registered_tasks': registered_tasks,
        'active_tasks': active_tasks,
        'scheduled_tasks': scheduled_tasks
    }
    return result

Of course, you could (and should) improve the code with error handling; one possible sketch follows.
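
For instance (the function name, inspect timeout, and broad except clause below are my assumptions, not from the original answer):

from your_celery_app import app


def get_celery_worker_status_safe():
    # A short timeout keeps a dead broker from blocking the caller for long.
    i = app.control.inspect(timeout=1.0)
    try:
        availability = i.ping()
    except Exception as exc:  # e.g. the broker connection was refused
        return {'availability': None, 'error': str(exc)}
    if not availability:
        return {'availability': None, 'error': 'No running Celery workers were found.'}
    return {'availability': availability}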

Compassion answered 19/12, 2018 at 17:3 Comment(2)
To just check for availability there's also i.ping() which returns None on failure.Lacrimator
Thanks Tim. I added 'availability' to the functionCompassion

To check the same thing from the command line when celery is running as a daemon:

  • Activate the virtualenv and go to the directory where the 'app' is
  • Run: celery -A [app_name] status
  • It will show whether celery is up or not, plus the number of nodes online (see the example below)
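
For example, with a hypothetical project name and path (both are placeholders, not from the original answer):

source venv/bin/activate
cd /path/to/project      # the directory containing the Celery 'app'
celery -A proj_name status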

Source: http://michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/

Autumnautumnal answered 25/5, 2016 at 7:4 Comment(0)

The following worked for me:

import socket
from kombu import Connection

celery_broker_url = "amqp://localhost"

try:
    conn = Connection(celery_broker_url)
    conn.ensure_connection(max_retries=3)
except socket.error:
    raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url))
Lombardy answered 15/3, 2016 at 1:26 Comment(2)
I'm pretty sure this will succeed if rabbitmq is running, regardless of the status of celery. But it is a good check to run when celery fails, to know whether the failure is with rabbitmq or something else.Lacrimator
In testing with Redis as the backend, this returns a valid connection object even if Redis is not running.Procure

One method to test if any worker is responding is to send out a 'ping' broadcast and return with a successful result on the first response.

from .celery import app  # the celery 'app' created in your project

def is_celery_working():
    result = app.control.broadcast('ping', reply=True, limit=1)
    return bool(result)  # True if at least one result

This broadcasts a 'ping' and will wait up to one second for responses. As soon as the first response comes in, it will return a result. If you want a False result faster, you can add a timeout argument to reduce how long it waits before giving up.
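
For example, a shorter wait might look like this (the 0.5-second value is just an illustration):

result = app.control.broadcast('ping', reply=True, limit=1, timeout=0.5)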

Lacrimator answered 25/3, 2019 at 18:8 Comment(0)

I found an elegant solution:

from .celery import app
try:
    app.broker_connection().ensure_connection(max_retries=3)
except Exception as ex:
    raise RuntimeError("Failed to connect to celery broker, {}".format(str(ex)))
Pasley answered 7/8, 2021 at 20:55 Comment(1)
As with the other answer like this one, I think this only tells you the broker is working, not whether any workers are running.Lacrimator

Run celery status to get the status.

When celery is running,

(venv) ubuntu@server1:~/project-dir$ celery status
->  celery@server1: OK

1 node online.

When no celery worker is running, the following is displayed in the terminal.

(venv) ubuntu@server1:~/project-dir$ celery status
Error: No nodes replied within time constraint
Nab answered 17/11, 2022 at 10:21 Comment(0)

You can use the ping method to check whether any worker (or a specific worker) is alive or not:
https://docs.celeryproject.org/en/latest/_modules/celery/app/control.html#Control.ping

celery_app.control.ping()
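
If you need to target a specific worker or wait less time, ping also accepts destination and timeout arguments (the node name below is a placeholder):

# Ping one named worker; replies look like [{'celery@worker1': {'ok': 'pong'}}]
celery_app.control.ping(destination=['celery@worker1'], timeout=0.5)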

Thailand answered 1/10, 2019 at 19:17 Comment(0)

The script below worked for me.

    # Import the celery app from your project
    import logging

    from application_package import app as celery_app

    logger = logging.getLogger(__name__)

    def get_celery_worker_status():
        insp = celery_app.control.inspect()
        # stats() returns a dict keyed by worker node name, or None if none reply.
        nodes = insp.stats()
        if not nodes:
            raise Exception("celery is not running.")
        logger.info("celery workers are: {}".format(nodes))
        return nodes
Socle answered 1/10, 2020 at 14:45 Comment(0)

You can test it in your terminal by running the following command:

celery -A proj_name worker -l INFO

This starts a worker in the foreground with INFO-level logging, so you can watch the output and confirm that celery is running.

Geer answered 28/2, 2022 at 14:24 Comment(0)
