Connect new celery periodic task in django

This is not so much a question as a help to anyone who finds that the periodic-task declaration described in the Celery 4.0.1 documentation is hard to integrate with Django: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries

Copy-pasted from the docs into the Celery config file main_app/celery.py:

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

Question:

But what if we use Django and our tasks live in another app? Celery 4.0.1 no longer provides the @periodic_task decorator, so let's see what we can do.

First case:

If you prefer to keep tasks and their schedule close to each other:

main_app/some_app/tasks.py:

from main_app.celery import app as celery_app

@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'))

@celery_app.task
def test(arg):
    print(arg)

We can run beat in debug mode:

celery -A main_app beat -l debug

And we will see that there's no such periodic task.

Second case:

We can try to describe all periodic tasks in the config file like this:

main_app/celery.py:

...
app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    from main_app.some_app.tasks import test
    sender.add_periodic_task(10.0, test.s('hello'))
...

The result is the same. But the two cases fail differently, as you can see with manual debugging via pdb. In the first example the setup_periodic_tasks callback is never fired at all. In the second example we get django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet. (This exception is swallowed and never printed out.)
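Why the first case fails can be illustrated with a stdlib-only toy signal. This is not Celery's actual signal implementation, just a sketch of the timing problem: a receiver connected after the signal has already fired is never invoked.

```python
# Illustrative sketch (NOT Celery's real signal machinery): a receiver
# connected *after* a signal has fired never runs.

class Signal:
    """Minimal signal, loosely mimicking an app-lifecycle signal."""
    def __init__(self):
        self.receivers = []

    def connect(self, fn):
        self.receivers.append(fn)
        return fn  # so it can be used as a decorator

    def send(self):
        for fn in self.receivers:
            fn()

calls = []

on_after_configure = Signal()
on_after_configure.send()        # fires while the app configures itself...

@on_after_configure.connect      # ...but tasks.py is only imported later,
def setup_periodic_tasks():      # so this receiver is never invoked
    calls.append('configured')

print(calls)  # [] -> the schedule silently never registers
```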

Balladry answered 13/12, 2016 at 10:40 Comment(2)
SO absolutely welcomes sharing information in the Question and Answer format. However, what you've got here is not a well written question. Please rewrite this so that it reads as an actual question written by someone facing an actual problem. You already know the solution but write the question from the perspective of someone who does not already know. (In the case at hand here, it seems to me that you could produce a question from the p.o.v. of someone migrating from 3.x to 4.x and finding that what used to work no longer works.) – Kelso
Also everything from the "Question" header down is a solution, and should be in a formal answer. (You can post significantly different solutions as different answers. People can vote on them independently then.) – Kelso

For Django we need to use a different signal: @celery_app.on_after_finalize.connect. It works for both cases:

  • declaration of the task schedule close to the task in app/tasks.py, because this signal fires only after every tasks.py has been imported and all possible receivers are already subscribed (first case);
  • centralized schedule declaration, because the Django apps will already be initialized and ready for imports (second case).

Here are the final declarations:

First case

Declaration of task schedule close to task:

main_app/some_app/tasks.py

from main_app.celery import app as celery_app

@celery_app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'))

@celery_app.task
def test(arg):
    print(arg)

Second case

Centralized schedule declaration in config file main_app/celery.py:

...

app = Celery()

@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    from main_app.some_app.tasks import test
    sender.add_periodic_task(10.0, test.s('hello'))
...
Balladry answered 13/12, 2016 at 10:40 Comment(3)
This is still not working with @shared_task as people claim... github.com/celery/celery/issues/3797 – Lanford
@HemanthSP, check docs.celeryproject.org/en/latest/userguide/… For this example you can use celery -A main_app beat -l debug to run the scheduler, and celery -A main_app worker -l debug to run a worker. – Balladry
My issue was using app = Celery() in other_app/tasks.py. Using from main_app.celery import app as celery_app solved the problem! – Byline

If the intent is to maintain task logic separately in tasks.py, then calling from main_app.some_app.tasks import test inside setup_periodic_tasks did not work for me. What worked is the following:

celery.py

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

@app.task
def test(arg):
    print(arg)
    # Import inside the task body, once Django's app registry is ready;
    # the imported test shadows this wrapper's name within the function.
    from some_app.tasks import test
    test(arg)

tasks.py

@shared_task
def test(arg):
    print('world')

This resulted in the following output:

[2017-10-26 22:52:42,262: INFO/MainProcess] celery@ubuntu-xenial ready.
[2017-10-26 22:52:42,263: INFO/MainProcess] Received task: main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4]  
[2017-10-26 22:52:42,367: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:42,368: WARNING/ForkPoolWorker-2] world
[2017-10-26 22:52:42,369: INFO/ForkPoolWorker-2] Task main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4] succeeded in 0.002823335991706699s: None
[2017-10-26 22:52:51,205: INFO/Beat] Scheduler: Sending due task add every 10 (main_app.celery.test)
[2017-10-26 22:52:51,207: INFO/MainProcess] Received task: main_app.celery.test[ce0f3cfc-54d5-4d74-94eb-7ced2e5a6c4b]  
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] world
Cathrinecathryn answered 26/10, 2017 at 23:5 Comment(1)
This is the only thing that worked for me, thank you. – Donal

If you want to keep the task logic separate, use this setup:

celery.py:

import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings') # your settings.py path

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(5, periodic_task.s('sms'), name='SMS Process')
    sender.add_periodic_task(60, periodic_task.s('email'), name='Email Process')


@app.task
def periodic_task(taskname):
    from myapp.tasks import sms_process, email_process

    if taskname == 'sms':
        sms_process()

    elif taskname == 'email':
        email_process()

A sample task module in a Django app named myapp:

myapp/tasks.py:

def sms_process():
    print('send sms task')

def email_process():
    print('send email task')
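The if/elif routing in periodic_task above can also be written as a dispatch table, which scales better as task names are added. A stdlib-only sketch, with sms_process and email_process inlined as stand-ins for the real imports from myapp.tasks:

```python
# Dispatch-table variant of the if/elif routing above (stdlib only;
# sms_process/email_process stand in for the answer's real task functions).

def sms_process():
    return 'send sms task'

def email_process():
    return 'send email task'

HANDLERS = {'sms': sms_process, 'email': email_process}

def periodic_task(taskname):
    handler = HANDLERS.get(taskname)
    if handler is None:
        raise ValueError(f'unknown task name: {taskname!r}')
    return handler()

print(periodic_task('sms'))    # send sms task
print(periodic_task('email'))  # send email task
```

Unknown task names now fail loudly instead of silently falling through the elif chain.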
Grainger answered 28/11, 2019 at 6:38 Comment(0)

With the new way, you still need to register periodic tasks in settings.py, just as with the old way.

For example, app with Celery() is defined in core/celery.py as shown below:

# "core/celery.py"

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')

app = Celery('core')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

Then there is the display task below:

# "my_app/tasks.py"

from celery import shared_task

@shared_task
def display(arg):
    return arg

Now, for the new way, register the display task periodically in core/settings.py as shown below and it works properly. Note that you need to import both app and the display task:

# "core/settings.py"

from .celery import app
from my_app.tasks import display

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        3.0, 
        display.s('Hello'), 
        name='display-every-3-seconds'
    )
    sender.add_periodic_task(
        7.0, 
        display.s('World'), 
        name='display-every-7-seconds'
    )

In addition, the old way below also works properly:

# "core/settings.py"

CELERY_BEAT_SCHEDULE = {
    "display-every-3-seconds": {
        "task": "my_app.tasks.display",
        "schedule": 3.0,
        "args": ["Hello"],
    },
    "display-every-7-seconds": {
        "task": "my_app.tasks.display",
        "schedule": 7.0,
        "args": ["World"],
    },
}
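Beat schedule entries also accept an options dict that is forwarded to apply_async, so the expires=10 behaviour from the earlier signal-based examples has a dictionary equivalent. A sketch assuming the same my_app.tasks.display task (plain Python dict, so it is safe in a settings file):

```python
# Sketch: a beat schedule entry with apply_async options (here `expires`),
# mirroring sender.add_periodic_task(30.0, test.s('world'), expires=10)
# from the signal-based examples above.
CELERY_BEAT_SCHEDULE = {
    "display-every-30-seconds": {
        "task": "my_app.tasks.display",
        "schedule": 30.0,
        "args": ["World"],
        "options": {"expires": 10},  # discard the run if not started within 10 s
    },
}

entry = CELERY_BEAT_SCHEDULE["display-every-30-seconds"]
print(entry["options"]["expires"])  # 10
```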
Cantara answered 10/5, 2023 at 5:34 Comment(0)

I got it working with the following:

celery.py

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

app = Celery('mysite')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

tasks.py

from celery import current_app
app = current_app._get_current_object()

@app.task
def test(arg):
    print(arg)

@app.on_after_finalize.connect
def app_ready(**kwargs):
    """
    Called once after app has been finalized.
    """
    sender = kwargs.get('sender')

    # periodic tasks
    speed = 5
    sender.add_periodic_task(speed, test.s('foo'), name='update leases every {} seconds'.format(speed))

Running the worker as:

celery -A mysite worker --beat --scheduler django --loglevel=info
Punnet answered 23/11, 2019 at 0:27 Comment(0)

I was struggling as well (no activity in the terminal) and got it working with the setup below:

Django version 3.2.8, Celery version 5.2.0

In a Django project called Proj:

Proj/Proj/celery.py (a file next to settings.py)

celery.py

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Proj.settings')

app = Celery('Proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

Within __init__.py (same folder as settings.py):

__init__.py

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

Within any Django app folder, a file called tasks.py (next to models.py):

tasks.py

from Proj.celery import app

# Schedule
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every second.
    sender.add_periodic_task(1.0, test.s('hello'), name='add every 1')

    # Calls test('world') every 3 seconds
    sender.add_periodic_task(3.0, test.s('world'), expires=10)

# Tasks
@app.task
def test(arg):
    print(arg)

Then run the command below in a terminal (inside your virtual environment, if applicable):

celery -A Proj worker -B

RESULT (confirms it's working):

[2021-11-10 11:22:22,070: WARNING/MainProcess] /.venv/lib/python3.9/site-packages/celery/fixups/django.py:203: UserWarning: Using settings.DEBUG leads to a memory
            leak, never use this setting in production environments!
  warnings.warn('''Using settings.DEBUG leads to a memory

[2021-11-10 11:22:22,173: WARNING/ForkPoolWorker-9] hello
[2021-11-10 11:22:22,173: WARNING/ForkPoolWorker-3] hello
[2021-11-10 11:22:22,173: WARNING/ForkPoolWorker-2] world
Duley answered 10/11, 2021 at 11:49 Comment(1)
This gives me [2022-07-22 14:51:52,843: ERROR/Beat] beat: Connection error: [Errno 61] Connection refused. Trying again in 4.0 seconds... [2022-07-22 14:51:53,061: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused. Trying again in 4.00 seconds... (2/100) – Horizon