Django Celery get task count

I am currently using django with celery and everything works fine.

However I want to be able to give the users an opportunity to cancel a task if the server is overloaded by checking how many tasks are currently scheduled.

How can I achieve this?

I am using redis as broker.

I just found this: Retrieve list of tasks in a queue in Celery

It is somewhat related to my issue, but I don't need to list the tasks, just count them :)

Anhydrite answered 5/9, 2013 at 8:43 Comment(0)
C
28

If your broker is configured as redis://localhost:6379/1, and your tasks are submitted to the general celery queue, then you can get the length by the following means:

import redis
queue_name = "celery"
client = redis.Redis(host="localhost", port=6379, db=1)
length = client.llen(queue_name)

Or, from a shell script (good for monitors and such):

$ redis-cli -n 1 -h localhost -p 6379 llen celery
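
To tie this back to the question (deciding whether the server is overloaded), the length can be compared against a limit. This is only a minimal sketch: `is_overloaded`, the `threshold` value, and the `FakeRedis` stand-in are hypothetical names introduced here so the example runs without a Redis server. The only Redis call it relies on is `llen`, as in the snippet above.

```python
def is_overloaded(client, queue_name="celery", threshold=100):
    """Return True when the queue holds more than `threshold` messages.

    `client` is anything exposing llen(name), e.g. a redis.Redis instance.
    The threshold of 100 is an arbitrary, assumed value.
    """
    return client.llen(queue_name) > threshold


class FakeRedis:
    """In-memory stand-in (hypothetical) so the sketch runs without Redis."""

    def __init__(self, lists):
        self._lists = lists

    def llen(self, name):
        # Real Redis also reports 0 for a key that does not exist.
        return len(self._lists.get(name, []))


fake = FakeRedis({"celery": ["task"] * 150})
print(is_overloaded(fake))                 # True
print(is_overloaded(fake, threshold=200))  # False
```

In a real project you would pass the client from the snippet above, for example `redis.Redis(host="localhost", port=6379, db=1)`, instead of the stand-in.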
Carincarina answered 2/10, 2013 at 4:9 Comment(1)
Even though this is a correct solution for the redis broker, please mark @stephen Fuhry's comment as the correct solution as it is broker agnostic. - Chicane
C
33

Here is how you can get the number of messages in a queue using celery that is broker-agnostic.

By using connection_or_acquire, you can minimize the number of open connections to your broker by utilizing celery's internal connection pooling.

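from celery import Celery

celery = Celery(app)

with celery.connection_or_acquire() as conn:
    # Passive declare: only inspects the queue, never creates it.
    message_count = conn.default_channel.queue_declare(
        queue='my-queue', passive=True).message_count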

You can also extend Celery to provide this functionality:

from celery import Celery as _Celery


class Celery(_Celery):

    def get_message_count(self, queue):
        '''
        Raises: amqp.exceptions.NotFound: if queue does not exist
        '''
        with self.connection_or_acquire() as conn:
            return conn.default_channel.queue_declare(
                queue=queue, passive=True).message_count


celery = Celery(app)
num_messages = celery.get_message_count('my-queue')
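
If you would rather not subclass, the same passive declare can live in a plain function that checks several queues over one pooled connection and stores the counts in a variable. This is a rough sketch, not part of Celery itself: `get_message_counts` and `FakeApp` are hypothetical names, and `FakeApp` only imitates the two attributes used above (`connection_or_acquire` and `default_channel.queue_declare`) so the example runs without a broker.

```python
from contextlib import contextmanager
from types import SimpleNamespace


def get_message_counts(app, queue_names):
    """Return {queue_name: message_count} using a single pooled connection."""
    with app.connection_or_acquire() as conn:
        channel = conn.default_channel
        return {
            name: channel.queue_declare(queue=name, passive=True).message_count
            for name in queue_names
        }


class FakeApp:
    """Hypothetical stand-in for a configured Celery app."""

    def __init__(self, sizes):
        self._sizes = sizes

    def _queue_declare(self, queue, passive=False):
        # Mimics the declare result, which exposes a message_count field.
        return SimpleNamespace(queue=queue, message_count=self._sizes[queue])

    @contextmanager
    def connection_or_acquire(self):
        channel = SimpleNamespace(queue_declare=self._queue_declare)
        yield SimpleNamespace(default_channel=channel)


counts = get_message_counts(FakeApp({"my-queue": 3, "emails": 0}),
                            ["my-queue", "emails"])
total = sum(counts.values())
print(counts, total)  # {'my-queue': 3, 'emails': 0} 3
```

With a real app you would call `get_message_counts(celery, ['my-queue'])`; a queue that does not exist still raises the not-found error mentioned in the docstring above.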
Celery answered 19/10, 2015 at 1:3 Comment(6)
Please provide some kind of explanation too to support your answer. - Automatism
@Automatism Added some explanation of the approach - hope that helps! - Celery
amqp.exceptions.NotFound: Queue.declare: (404) NOT_FOUND - no queue 'default' in vhost '/' Because my queue is not on '/' host it's on '/apples' host. How do I get to that host? - Supergalaxy
setting passive to 'False' also works and circumvents the 404 NOT FOUND issue. - Yusem
@Yusem that would end up creating the non-existing Exchange, though, which is most likely not desired - Daberath
In the first example of: with celery.connection_or_acquire() as conn: conn.default_channel.queue_declare( queue='my-queue', passive=True).message_count How would you store the message count as a variable? - Jewry
B
7

If you have already configured redis in your app, you can try this:

from celery import Celery

QUEUE_NAME = 'celery'

celery = Celery(app)
client = celery.connection().channel().client

length = client.llen(QUEUE_NAME)
Brasilein answered 11/9, 2015 at 7:36 Comment(2)
For redis, client = app.broker_connection().channel().client - Junitajunius
This will create a new hanging Redis connection every time you run this code. You have to release the opened connection and channel. - Hurter
H
6

Get a redis client instance used by Celery, then check the queue length. Don't forget to release the connection every time you use it (use .acquire):

# Get a configured instance of celery:
from project.celery import app as celery_app

def get_celery_queue_len(queue_name):
    with celery_app.pool.acquire(block=True) as conn:
        return conn.default_channel.client.llen(queue_name)

Always acquire a connection from the pool; don't create it manually. Otherwise, your Redis server will run out of connection slots, which will kill your other clients.

Hurter answered 4/5, 2018 at 21:18 Comment(0)
I
3

I'll expand on @StephenFuhry's answer regarding the not-found error, because a more or less broker-agnostic way of retrieving the queue length is useful even though Celery suggests working with the broker directly. In Celery 4 (with a Redis broker) the error looks like this:

ChannelError: Channel.queue_declare: (404) NOT_FOUND - no queue 'NAME' in vhost '/'

Observations:

  1. ChannelError is a kombu exception (in fact, it's amqp's and kombu "re-exports" it).

  2. On a Redis broker, Celery/Kombu represent queues as Redis lists.

  3. Redis collection-type keys are removed whenever the collection becomes empty.

  4. If we look at what queue_declare does, it has these lines:

    if passive and not self._has_queue(queue, **kwargs):
        raise ChannelError(...)
    
  5. Kombu Redis virtual transport's _has_queue is this:

    def _has_queue(self, queue, **kwargs):
        with self.conn_or_acquire() as client:
            with client.pipeline() as pipe:
                for pri in self.priority_steps:
                    pipe = pipe.exists(self._q_for_pri(queue, pri))
                return any(pipe.execute())
    

The conclusion is that on a Redis broker ChannelError raised from queue_declare is okay (for an existing queue of course), and just means that the queue is empty.

Here's an example of how to output all active Celery queues' lengths (normally should be 0, unless your worker can't cope with the tasks).

from kombu.exceptions import ChannelError

def get_queue_length(name):
    with celery_app.connection_or_acquire() as conn: 
        try:
            ok_nt = conn.default_channel.queue_declare(queue=name, passive=True)
        except ChannelError:
            return 0
        else:
            return ok_nt.message_count
        
for queue_info in celery_app.control.inspect().active_queues().values():
    print(queue_info[0]['name'], get_queue_length(queue_info[0]['name']))
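
The `_has_queue` excerpt above loops over `priority_steps`, which hints that on Redis one logical queue may be spread over several list keys when task priorities are used. A plain `llen` on the queue name then only sees the priority-0 key. The sketch below sums all of them. It rests on assumptions: the separator `"\x06\x16"` and the steps `(0, 3, 6, 9)` are what I understand kombu's Redis transport defaults to be, and both can be overridden through `broker_transport_options`, so check your own configuration. `priority_keys`, `total_queue_length`, and `FakeRedis` are hypothetical names for illustration.

```python
# Assumed kombu Redis transport defaults; both can be changed through
# broker_transport_options ("sep" and "priority_steps").
PRIORITY_SEP = "\x06\x16"
PRIORITY_STEPS = (0, 3, 6, 9)


def priority_keys(queue_name, steps=PRIORITY_STEPS, sep=PRIORITY_SEP):
    """Redis keys that may hold messages for `queue_name`."""
    # Priority 0 uses the bare queue name; other steps get a suffix.
    return [f"{queue_name}{sep}{pri}" if pri else queue_name for pri in steps]


def total_queue_length(client, queue_name):
    """Sum LLEN over every priority key; missing keys count as 0."""
    return sum(client.llen(key) for key in priority_keys(queue_name))


class FakeRedis:
    """In-memory stand-in (hypothetical) so the sketch runs without Redis."""

    def __init__(self, lists):
        self._lists = lists

    def llen(self, name):
        return len(self._lists.get(name, []))


fake = FakeRedis({"celery": ["a", "b"], "celery\x06\x163": ["c"]})
total = total_queue_length(fake, "celery")
print(total)  # 3
```

With a real broker, `client` could be the Redis client obtained from the pooled connection as shown in the earlier answer (`conn.default_channel.client`).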
Impregnate answered 17/8, 2020 at 18:18 Comment(0)
