Celery: how to get queue size in a reliable and testable way

I'm losing my mind trying to find a reliable and testable way to get the number of tasks contained in a given Celery queue.

I've already read two related discussions on this topic, but I have not been able to solve my issue using the methods described in those threads.

I'm using Redis as the broker, but I would like a broker-independent and flexible solution, especially for tests.

This is my current situation: I've defined an EnhancedCelery class which inherits from Celery and adds a couple of methods; get_queue_size() is the one I'm trying to implement and test properly.
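
For context, a minimal skeleton of such a class could look like this (a sketch; the body of get_queue_size() is exactly what the rest of the question is about):

from typing import Optional

from celery import Celery


class EnhancedCelery(Celery):
    def get_queue_size(self, queue_name: str) -> Optional[int]:
        raise NotImplementedError  # see the attempts below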

The following is the code in my test case:

from unittest import TestCase  # or your test framework's TestCase

from celery.contrib.testing.worker import start_worker
from kombu import Queue

celery_test_app = EnhancedCelery(__name__)

# this is needed to avoid exception for ping command
# which is automatically triggered by the worker once started
celery_test_app.loader.import_module('celery.contrib.testing.tasks')

# in memory backend
celery_test_app.conf.broker_url = 'memory://'
celery_test_app.conf.result_backend = 'cache+memory://'

# We have to set up queues manually,
# since it seems that auto queue creation doesn't work in tests :(
celery_test_app.conf.task_create_missing_queues = False
celery_test_app.conf.task_default_queue = 'default'
celery_test_app.conf.task_queues = (
    Queue('default', routing_key='task.#'),
    Queue('queue_1', routing_key='q1'),
    Queue('queue_2', routing_key='q2'),
    Queue('queue_3', routing_key='q3'),
)
celery_test_app.conf.task_default_exchange = 'tasks'
celery_test_app.conf.task_default_exchange_type = 'topic'
celery_test_app.conf.task_default_routing_key = 'task.default'
celery_test_app.conf.task_routes = {
    'sample_task': {
        'queue': 'default',
        'routing_key': 'task.default',
    },
    'sample_task_in_queue_1': {
        'queue': 'queue_1',
        'routing_key': 'q1',
    },
    'sample_task_in_queue_2': {
        'queue': 'queue_2',
        'routing_key': 'q2',
    },
    'sample_task_in_queue_3': {
        'queue': 'queue_3',
        'routing_key': 'q3',
    },
}


@celery_test_app.task()
def sample_task():
    return 'sample_task_result'


@celery_test_app.task(queue='queue_1')
def sample_task_in_queue_1():
    return 'sample_task_in_queue_1_result'


@celery_test_app.task(queue='queue_2')
def sample_task_in_queue_2():
    return 'sample_task_in_queue_2_result'


@celery_test_app.task(queue='queue_3')
def sample_task_in_queue_3():
    return 'sample_task_in_queue_3_result'


class EnhancedCeleryTest(TestCase):
    def test_get_queue_size_returns_expected_value(self):
        def add_task(task):
            task.apply_async()

        with start_worker(celery_test_app):
            for _ in range(7):
                add_task(sample_task_in_queue_1)

            for _ in range(4):
                add_task(sample_task_in_queue_2)

            for _ in range(2):
                add_task(sample_task_in_queue_3)

            self.assertEqual(celery_test_app.get_queue_size('queue_1'), 7)
            self.assertEqual(celery_test_app.get_queue_size('queue_2'), 4)
            self.assertEqual(celery_test_app.get_queue_size('queue_3'), 2)

Here are my attempts to implement get_queue_size():

  1. This always returns zero (jobs == 0):

    def get_queue_size(self, queue_name: str) -> Optional[int]:
        with self.connection_or_acquire() as connection:
            channel = connection.default_channel
    
            try:
                # passive=True raises instead of creating the queue if it's missing
                name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
                return jobs
            except (ChannelError, NotFound):  # e.g. from amqp.exceptions
                return None
    
  2. This also always returns zero:

    def get_queue_size(self, queue_name: str) -> Optional[int]:
        inspection = self.control.inspect()
    
        return inspection.active() # zero!
    
        # or:
        return inspection.scheduled() # zero!
    
        # or:
        return inspection.reserved() # zero!
    
  3. This works by returning the expected number for each queue, but only in the test environment, because the channel.queues attribute does not exist when using the Redis broker:

    def get_queue_size(self, queue_name: str) -> Optional[int]:
        with self.connection_or_acquire() as connection:
            channel = connection.default_channel
    
            if hasattr(channel, 'queues'):
                queue = channel.queues.get(queue_name)
    
                if queue is not None:
                    return queue.unfinished_tasks
    
Gunshot answered 29/4, 2020 at 13:16 Comment(0)

None of the solutions you mentioned are entirely correct, in my humble opinion. As you already noted, this is broker-specific, so you would have to wrap handlers for all brokers supported by Celery to provide broker-agnostic queue inspection. In the Redis case you have to connect directly to Redis and LLEN the queue you want to inspect. In the case of RabbitMQ you find this information in a completely different way. Same story with SQS...
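
To make that concrete, here is a minimal sketch of what such per-broker handling could look like. It covers only the Redis and AMQP transports, and it assumes the default (non-priority) configuration, where Celery keeps each queue as a Redis list named after the queue:

from celery import Celery


def get_queue_size(app: Celery, queue_name: str) -> int:
    """Sketch: broker-specific queue length for Redis and RabbitMQ only."""
    with app.connection_or_acquire() as connection:
        transport = connection.transport.driver_type

        if transport == 'redis':
            # Each (non-priority) queue is a Redis list named after the
            # queue, so LLEN returns its length.
            return connection.default_channel.client.llen(queue_name)

        if transport == 'amqp':
            # RabbitMQ reports the message count on a passive declare.
            _, message_count, _ = connection.default_channel.queue_declare(
                queue=queue_name, passive=True)
            return message_count

        raise NotImplementedError(f'No handler for transport: {transport}')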

This has all been discussed in the Retrieve list of tasks in a queue in Celery thread...

Finally, there is a reason why Celery does not provide this functionality out of the box - the information is, I believe, useless. By the time you get the queue size, the queue may already be empty!

If you want to monitor what is going on with your queues, I suggest another approach: write your own real-time monitor. The example in the Celery documentation just captures task-failed events, but you should be able to modify it easily to capture all the events you care about and to gather data about those tasks (queue, time, host it was executed on, etc.); a sketch follows below. Clearly is an example of how this is done in a more serious project.
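
For reference, a minimal sketch of such a monitor, following the event-capturing pattern from the Celery documentation. The per-queue bookkeeping is my own addition and only approximates queue depth; it also requires task_send_sent_event to be enabled on the producers and the workers to be started with events enabled (celery worker -E):

from celery import Celery

app = Celery('tasks')  # assumes the broker is configured elsewhere
app.conf.task_send_sent_event = True  # producers must emit task-sent events

uuid_to_queue = {}  # which queue each known task was sent to
queue_depth = {}    # approximate per-queue depth, maintained from events


def on_task_sent(event):
    # task-sent events carry the name of the target queue
    queue = event['queue']
    uuid_to_queue[event['uuid']] = queue
    queue_depth[queue] = queue_depth.get(queue, 0) + 1


def on_task_finished(event):
    # task-succeeded/task-failed events carry only the task uuid
    queue = uuid_to_queue.pop(event['uuid'], None)
    if queue is not None:
        queue_depth[queue] -= 1


with app.connection() as connection:
    receiver = app.events.Receiver(connection, handlers={
        'task-sent': on_task_sent,
        'task-succeeded': on_task_finished,
        'task-failed': on_task_finished,
    })
    receiver.capture(limit=None, timeout=None, wakeup=True)

Note that this counts tasks that were sent but have not finished yet, which is not exactly the broker queue depth: a task leaves the queue as soon as a worker reserves it, not when it completes.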

Decrypt answered 29/4, 2020 at 13:27 Comment(0)

You can see how it's implemented in Flower (a real-time monitor for Celery) here. They have different Broker class implementations for Redis and RabbitMQ.

Another way - use Celery's task events: count how many tasks were sent and how many succeeded/failed (see the sketch below).
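
A minimal sketch of that counting approach, using Celery's signals; the Redis counter store and the key names are assumptions, and any store shared by producers and workers would do:

from celery.signals import after_task_publish, task_failure, task_success
from redis import Redis

counters = Redis()  # hypothetical shared counter store


@after_task_publish.connect
def count_sent(sender=None, **kwargs):
    counters.incr(f'sent:{sender}')  # sender is the task name here


@task_success.connect
def count_succeeded(sender=None, **kwargs):
    counters.incr(f'succeeded:{sender.name}')  # sender is the task instance


@task_failure.connect
def count_failed(sender=None, **kwargs):
    counters.incr(f'failed:{sender.name}')

The number of pending tasks for a given task name is then roughly sent minus succeeded minus failed.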

Eclectic answered 29/4, 2020 at 13:38 Comment(1)
That is exactly what I had in mind when I mentioned the real-time monitoring. - Decrypt

This is from the Flower codebase; it gets the broker queue length and is as fast as direct broker access.

from celery import Celery
from flower.utils.broker import Broker

app = Celery("tasks")  # assumes your broker settings are loaded into this app

broker = Broker(
    app.connection(connect_timeout=1.0).as_uri(include_password=True),
    broker_options=app.conf.broker_transport_options,
    broker_use_ssl=app.conf.broker_use_ssl,
)


async def queue_length():
    queues = await broker.queues(["celery"])
    return queues[0].get("messages")
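
Since queue_length() is a coroutine, you would presumably drive it from an event loop, for example:

import asyncio

print(asyncio.run(queue_length()))  # number of messages waiting in the 'celery' queue
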
Awesome answered 25/9, 2023 at 22:3 Comment(0)

For Redis, when no priority queues or named queues are used, one may proceed as follows (no Flower dependency):

from celery import app
from django.conf import settings
from redis import Redis

# Connect straight to the broker: with the default (non-priority) setup,
# each Celery queue is a Redis list named after the queue, so LLEN works.
redis_client = Redis.from_url(settings.CELERY_BROKER_URL, socket_connect_timeout=3)

# app.default_app assumes a default Celery app instance already exists
print(redis_client.llen(app.default_app.conf.task_default_queue))
Ketty answered 29/9, 2023 at 8:46 Comment(0)
