How to test the tasks of a Celery instance using pytest?

How do I test the tasks of a Celery instance using pytest? I am not asking about tasks created with the @shared_task decorator; there is already a good solution for those. I am asking about tasks created with the app.task decorator, where app is a Celery application instance.

Seiden answered 24/1, 2023 at 4:43 Comment(0)

The code below works for me. celery_worker.reload() re-registers the task.

def test_xxx(celery_app, celery_worker):
    @celery_app.task
    def mul(x, y):
        return x * y

    celery_worker.reload()
    assert mul.delay(4, 4).get(timeout=10) == 16
Jacobian answered 13/3, 2023 at 23:11 Comment(2)
This example works, but it doesn't do any testing of my own code. Byelorussian
Oh, you probably need to look into the Task.bind(...) method. Jacobian

I could not locate any official docs on this. Below is a summary from some of my experiments.

My versions

celery[pytest]==5.3.6
pytest==7.4.3

The basics: https://docs.celeryq.dev/en/stable/userguide/testing.html

The celery_worker fixture can be configured as shown below.

import pytest

@pytest.fixture(scope='session')
def celery_worker_parameters():
    return {"without_heartbeat": True}

Look at https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/contrib/testing/worker.py#L155 for celery_worker_parameters hints.

How does this work? celery_worker declares the celery_worker_parameters fixture in its signature, so pytest resolves it first and its return value is passed to the worker as keyword arguments. See https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/contrib/pytest.py#L199 and https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/contrib/pytest.py#L160

You could try the below approach to configuring the celery_app and see if it works for you:

@pytest.mark.celery(
    broker_url="memory://localhost:8000/"
)
def test_how_to_use_celery_app_and_worker(
    celery_app,
    celery_worker,
):
    ...  # test body goes here

Celery loads the mark and configures itself. See here: https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/contrib/pytest.py#L72

My test env has some constraints. I used a fixture in conftest.py for celery_app configuration.

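import os
from unittest import mock

import pytest

@pytest.fixture
def os_environ_celery_app_cfg():
    # Patch the environment for the duration of the test only.
    with mock.patch.dict(os.environ, {
        "CELERY_BROKER_URL": "memory://localhost:8000/",
    }) as mock_environ:
        yield mock_environ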

See https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/app/utils.py#L103 for hints about these env vars.
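To see what the fixture above relies on, here is a minimal standard-library sketch of mock.patch.dict applied to os.environ. The variable name and value mirror the fixture; the helper read_broker_url is hypothetical and only stands in for whatever code reads the variable.

```python
import os
from unittest import mock


def read_broker_url():
    """Hypothetical helper: stands in for code that reads the env var."""
    return os.environ.get("CELERY_BROKER_URL")


# Remember the value from before the patch (often None in a clean shell).
before = read_broker_url()

with mock.patch.dict(os.environ, {"CELERY_BROKER_URL": "memory://localhost:8000/"}):
    # Inside the context the patched value is visible to the whole process.
    inside = read_broker_url()

# On exit, patch.dict restores os.environ to its previous contents.
after = read_broker_url()
```

Because the fixture yields inside the with block, the patched variable stays in place while the test runs and is restored once the test finishes.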

My unit test:

from celery import Task

def test_how_to_use_celery_app_and_worker(
    os_environ_celery_app_cfg,
    celery_app,
    celery_worker,
):
    @celery_app.task(bind=True)  # ** ensure @celery_app prefix **
    def my_worker(self: Task, *args, **kwargs):
        pass

    celery_worker.reload()  # ** call after task declaration **

    my_worker.apply_async()

It is important that celery_worker.reload() is called after the async task is declared. This is because the Celery Strategy for this task needs to be reconfigured. See https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/worker/worker.py#L273
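The ordering requirement can be pictured with a toy model. This sketch uses plain Python and hypothetical names (ToyWorker, registry, handle); it is not Celery's code, and it assumes only that the worker keeps a per-task lookup table that is built at startup and rebuilt by reload().

```python
class ToyWorker:
    """Toy stand-in for a worker; NOT Celery's implementation."""

    def __init__(self, registry):
        self.registry = registry   # shared task registry (name -> function)
        self.strategies = {}       # worker-side lookup table
        self.reload()              # snapshot taken at startup

    def reload(self):
        # Rebuild the lookup table from whatever is registered right now.
        self.strategies = dict(self.registry)

    def handle(self, name, *args):
        # A name missing from the snapshot cannot be executed.
        if name not in self.strategies:
            raise KeyError(f"unregistered task: {name}")
        return self.strategies[name](*args)


registry = {}
worker = ToyWorker(registry)            # worker starts with no tasks known

registry["mul"] = lambda x, y: x * y    # task declared after startup

try:
    worker.handle("mul", 4, 4)
    failed_before_reload = False
except KeyError:
    failed_before_reload = True

worker.reload()                         # refresh the snapshot
result_after_reload = worker.handle("mul", 4, 4)
```

In this model the task is rejected until reload() is called, which matches the behaviour seen when a task is declared inside the test body after the worker fixture has already started.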

Also, ensure that the task decorator has the @celery_app prefix:

@celery_app.task

The async task in the unit test will run in the worker thread. Any effects / benefits introduced by unit test fixtures will not be available in the worker thread.
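One way this can show up, sketched with the standard library only: state that a fixture stores in thread-local storage is invisible to code running in a different thread. The names state, fake_fixture_setup and fake_task are hypothetical, and this covers only the thread-local case; whether a given fixture is affected depends on how it stores its state.

```python
import threading

# Thread-local storage: each thread sees its own attributes.
state = threading.local()


def fake_fixture_setup():
    """Hypothetical fixture body: stores a value for the current thread only."""
    state.tenant = "test-tenant"


def fake_task(results):
    """Hypothetical task body: runs in another thread, like the test worker."""
    results.append(getattr(state, "tenant", None))


fake_fixture_setup()  # runs in the main (test) thread

seen_in_worker = []
worker_thread = threading.Thread(target=fake_task, args=(seen_in_worker,))
worker_thread.start()
worker_thread.join()

seen_in_main = getattr(state, "tenant", None)
```

Process-wide changes, such as the os.environ patch above, are a different case and remain visible from other threads in the same process.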

If you have a Celery shared_task implemented elsewhere, you can still use the above approach in your unit test with some tweaks. Depending on how your code is structured, you may not have to call reload(). Requesting the celery_worker fixture automatically pulls in celery_app, because celery_worker depends on it; this cannot be avoided. See https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/contrib/pytest.py#L196

Reenareenforce answered 30/1, 2024 at 3:45 Comment(0)

My setup is shown below, starting with the directory layout:

.
├── __init__.py
├── conftest.py
├── main.py
└── test_celery_app.py

main.py

from celery import Celery

mul_celery = Celery(
    "mul_celery",
    broker="amqp://TFRobot:*****@localhost:31672/test",
    backend="redis://127.0.0.1:30379/1",
)

@mul_celery.task
def mul(x, y):
    return x * y

conftest.py

import pytest

pytest_plugins = ("celery.contrib.pytest",)  # <-- Important!

@pytest.fixture(scope="session")
def celery_config():
    return {"broker_url": "amqp://TFRobot:TFRobotServer@localhost:31672/test"}

test_celery_app.py

from tests.package_tests.celery_test.main import mul, mul_celery
from celery.contrib.testing.worker import start_worker

def test_add_task(celery_app, celery_worker):
    @celery_app.task
    def add(x, y):
        return x + y

    celery_worker.reload()
    assert add.delay(4, 4).get(timeout=2) == 8

def test_mul_task_without_fixture():
    with start_worker(  # <-- Important!
        mul_celery, pool="solo", loglevel="info", perform_ping_check=False, shutdown_timeout=30  # <-- Important!
    ) as worker:
        assert mul.delay(4, 4).get(timeout=2) == 16

Notes:

  1. Ping Check Disabled: The celery.contrib.testing.tasks.ping task does not seem to be registered automatically, which is why the ping check is disabled. I do not yet know why.
  2. Extended Shutdown Timeout: The shutdown_timeout is set to 30 seconds (the default is 10 seconds) for smoother shutdowns.
  3. Increased Logging Level: The log level is set to "info" to provide more detailed output in case of errors.
  4. Pytest Plugins Configuration: It is important to set pytest_plugins in conftest.py. If it is not set, pytest-celery is used by default, which is a more complex, Docker-based solution intended for production-level testing. It is less suitable for unit and integration tests because it is slower to execute.
Vanish answered 11/8, 2024 at 5:6 Comment(0)
