I'll expand on @StephenFuhry's answer regarding the not-found error, because a more or less broker-agnostic way of retrieving the queue length is useful even though Celery suggests querying the broker directly. In Celery 4 (with a Redis broker) the error looks like this:
ChannelError: Channel.queue_declare: (404) NOT_FOUND - no queue 'NAME' in vhost '/'
Observations:
ChannelError is a kombu exception (in fact, it's amqp's, and kombu "re-exports" it).
On a Redis broker, Celery/Kombu represent queues as Redis lists.
Redis removes a collection-type key (list, set, hash and so on) whenever the collection becomes empty.
If we look at what queue_declare does, it has these lines:
    if passive and not self._has_queue(queue, **kwargs):
        raise ChannelError(...)
Kombu Redis virtual transport's _has_queue is this:
    def _has_queue(self, queue, **kwargs):
        with self.conn_or_acquire() as client:
            with client.pipeline() as pipe:
                for pri in self.priority_steps:
                    pipe = pipe.exists(self._q_for_pri(queue, pri))
                return any(pipe.execute())
The conclusion is that on a Redis broker, a ChannelError raised from queue_declare is okay (for an existing queue, of course) and just means that the queue is empty.
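To make the chain of observations concrete, here is a minimal sketch that reproduces it without a broker. ToyRedis, the local ChannelError, queue_declare_passive and queue_length are all hypothetical stand-ins written for this illustration. They are not kombu or redis-py APIs, and they model only the two behaviours discussed above (empty list keys disappear, and a passive declare fails on a missing key).

```python
class ToyRedis:
    """Hypothetical in-memory stand-in for the Redis list behaviour above."""

    def __init__(self):
        self._lists = {}

    def lpush(self, key, value):
        # Push onto the head of the list, creating the key if needed.
        self._lists.setdefault(key, []).insert(0, value)

    def rpop(self, key):
        items = self._lists.get(key)
        if not items:
            return None
        value = items.pop()
        if not items:
            # Redis deletes a list key once its last element is removed.
            del self._lists[key]
        return value

    def exists(self, key):
        return key in self._lists

    def llen(self, key):
        return len(self._lists.get(key, []))


class ChannelError(Exception):
    """Local placeholder for illustration, not kombu's ChannelError."""


def queue_declare_passive(client, queue):
    # Mirrors the passive check: a missing key is reported as NOT_FOUND.
    if not client.exists(queue):
        raise ChannelError("(404) NOT_FOUND - no queue %r" % queue)
    return client.llen(queue)


def queue_length(client, queue):
    # Treat NOT_FOUND as "the queue is empty", as concluded above.
    try:
        return queue_declare_passive(client, queue)
    except ChannelError:
        return 0


client = ToyRedis()
client.lpush("celery", "task-1")
print(queue_length(client, "celery"))  # 1
client.rpop("celery")                  # the worker consumes the only task
print(client.exists("celery"))         # False
print(queue_length(client, "celery"))  # 0
```

Once the last task is popped, the key no longer exists, so the passive declare fails even though the queue is still in use.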
Here's an example of how to output the lengths of all active Celery queues (normally they should be 0, unless your workers can't keep up with the tasks).
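    from kombu.exceptions import ChannelError

    # celery_app is your application's Celery instance
    def get_queue_length(name):
        with celery_app.connection_or_acquire() as conn:
            try:
                # passive=True only inspects the queue and never creates it
                ok_nt = conn.default_channel.queue_declare(queue=name, passive=True)
            except ChannelError:
                # on Redis, a missing key means the queue is empty
                return 0
            else:
                return ok_nt.message_count

    # active_queues() returns None when no worker replies
    active_queues = celery_app.control.inspect().active_queues() or {}
    names = {q['name'] for queues in active_queues.values() for q in queues}
    for name in sorted(names):
        print(name, get_queue_length(name))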
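For comparison, the broker-specific route on Redis could look roughly like the sketch below. It assumes kombu's default Redis key layout: one list per priority step (0, 3, 6, 9), with the priority-0 list stored under the bare queue name and the others under the name plus the separator "\x06\x16" plus the step. These defaults may differ in your kombu version or configuration, so treat them as assumptions to verify. priority_keys and redis_queue_length are hypothetical helper names, and client is expected to be a redis-py client (only its llen method is used).

```python
# Assumed kombu Redis transport defaults; verify against your kombu version.
PRIORITY_SEP = "\x06\x16"
PRIORITY_STEPS = (0, 3, 6, 9)


def priority_keys(queue, steps=PRIORITY_STEPS, sep=PRIORITY_SEP):
    """Return the Redis list keys assumed to back one Celery queue."""
    # Priority 0 uses the bare queue name; other steps get a suffix.
    return [f"{queue}{sep}{pri}" if pri else queue for pri in steps]


def redis_queue_length(client, queue):
    """Sum LLEN over every priority list; LLEN of a missing key is 0."""
    return sum(client.llen(key) for key in priority_keys(queue))
```

With a real connection, this would be called as redis_queue_length(redis.Redis(...), 'celery'). LLEN returns 0 for a missing key, so no exception handling is needed, but the code is tied to Redis and to kombu's internal key naming.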