How to test the tasks of a Celery instance using pytest? I am not talking about testing the Celery tasks created using the @shared_task
decorator with pytest. For that, there is already a good solution. I am talking about testing tasks created with the app.task
decorator where app
is a Celery app, a Celery
instance.
Below code works for me. celery_worker.reload()
re-registries the task.
def test_xxx(celery_app, celery_worker):
@celery_app.task
def mul(x, y):
return x * y
celery_worker.reload()
assert mul.delay(4, 4).get(timeout=10) == 16
Task.bind(...)
method. –
Jacobian I could not locate any official docs on this. Below is a summary from some of my experiments.
My versions
celery[pytest]==5.3.6
pytest==7.4.3
The basics: https://docs.celeryq.dev/en/stable/userguide/testing.html
The celery_worker
could be configured like below.
@pytest.fixture(scope='session')
def celery_worker_parameters():
return {"without_heartbeat": True}
Look at https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/contrib/testing/worker.py#L155 for celery_worker_parameters
hints.
How does this work? celery_worker
includes celery_worker_parameters
fixture in its signature. See https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/contrib/pytest.py#L199 and https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/contrib/pytest.py#L160
You could try the below approach to configuring the celery_app
and see if it works for you:
@pytest.mark.celery(
broker_url="memory://localhost:8000/"
)
def test_how_to_use_celery_app_and_worker(
celery_app,
celery_worker,
)
Celery loads the mark and configures itself. See here: https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/contrib/pytest.py#L72
My test env has some constraints. I used a fixture in conftest.py
for celery_app
configuration.
@pytest.fixture
def os_environ_celery_app_cfg():
with mock.patch.dict(os.environ, {
"CELERY_BROKER_URL": "memory://localhost:8000/",
}) as mock_environ:
yield mock_environ
See https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/app/utils.py#L103 for hints about these env vars.
My unit test:
def test_how_to_use_celery_app_and_worker(
os_environ_celery_app_cfg,
celery_app,
celery_worker,
)
@celery_app.task(bind=True) # ** ensure @celery_app prefix **
def my_worker(self: Task, *args, **kwargs):
pass
celery_worker.reload() # ** call after task declaration **
my_worker.apply_async()
It is important that celery_worker.reload()
is called after the async task is declared. This is because the Celery Strategy for this task needs to be reconfigured. See https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/worker/worker.py#L273
Also, ensure that the task decorator has the @celery_app
prefix:
@celery_app.task
The async task in the unit test will run in the worker thread. Any effects / benefits introduced by unit test fixtures will not be available in the worker thread.
If you have a Celery shared_task implemented elsewhere, you could still use above approach in your unit test with some tweaks. You may not have to call reload()
depending on how your code is structured. If celery_worker
is invoked, it will automatically invoke celery_app
. You cannot avoid it. See https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/contrib/pytest.py#L196
As shown below:
.
├── __init__.py
├── conftest.py
├── main.py
└── test_celery_app.py
main.py
from celery import Celery
mul_celery = Celery(
"mul_celery",
broker="amqp://TFRobot:*****@localhost:31672/test",
backend="redis://127.0.0.1:30379/1",
)
@mul_celery.task
def mul(x, y):
return x * y
conftest.py
import pytest
pytest_plugins = ("celery.contrib.pytest",) # <-- Important!
@pytest.fixture(scope="session")
def celery_config():
return {"broker_url": "amqp://TFRobot:TFRobotServer@localhost:31672/test"}
test_celery_app.py
from tests.package_tests.celery_test.main import mul, mul_celery
from celery.contrib.testing.worker import start_worker
def test_add_task(celery_app, celery_worker):
@celery_app.task
def add(x, y):
return x + y
celery_worker.reload()
assert add.delay(4, 4).get(timeout=2) == 8
def test_mul_task_without_fixture():
with start_worker( # <-- Important!
mul_celery, pool="solo", loglevel="info", perform_ping_check=False, shutdown_timeout=30 # <-- Important!
) as worker:
assert mul.delay(4, 4).get(timeout=2) == 16
Notes:
- Ping Check Disabled: The
celery.contrib.testing.tasks.ping
task doesn't seem to register automatically, which is why the ping check is disabled. But I don't konw why now. - Extended Shutdown Timeout: The
shutdown_timeout
is set to 30 seconds (default is 10 seconds) for smoother shutdowns. - Increased Logging Level: The log level is set to
"info"
to provide more detailed output in case of errors. - Pytest Plugins Configuration: It's important to configure pytest_plugins in conftest.py. If not configured, pytest-celery will be used by default, which is a more complex Docker-based testing solution intended for production-level testing. This setup is less suitable for unit and integration tests, as it's more complex and slower to execute.
© 2022 - 2024 — McMap. All rights reserved.