Resolving circular imports in celery and django

5

26

I have a Django app that uses Celery to offload some tasks. Mainly, it defers the computation of some fields in a database table.

So, I have a tasks.py:

from models import MyModel
from celery import shared_task

@shared_task
def my_task(id):
    qs = MyModel.objects.filter(some_field=id)
    for record in qs:
        my_value = ...  # do some computations
        record.my_field = my_value
        record.save()

And in models.py

from django.db import models
from tasks import my_task

class MyModel(models.Model):
    field1 = models.IntegerField()
    #more fields
    my_field = models.FloatField(null=True)

    @staticmethod
    def load_from_file(file):
        #parse file, set fields from file
        my_task.delay(id)

Now obviously, this won't work because of a circular import (models imports tasks and tasks imports models).
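To see the failure in isolation, here is a minimal sketch that uses only the standard library. The module names demo_models and demo_tasks are hypothetical stand-ins for models.py and tasks.py; no Django or Celery is involved, so it only illustrates the import mechanics.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for models.py: imports the task at module level.
MODELS_SRC = '''
from demo_tasks import my_task

class MyModel:
    pass
'''

# Hypothetical stand-in for tasks.py: imports the model at module level.
TASKS_SRC = '''
from demo_models import MyModel

def my_task(record_id):
    return MyModel, record_id
'''

def try_import():
    """Import demo_models from a temp dir; return the ImportError, or None."""
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "demo_models.py").write_text(MODELS_SRC)
        Path(tmp, "demo_tasks.py").write_text(TASKS_SRC)
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            importlib.import_module("demo_models")
        except ImportError as exc:
            return exc
        finally:
            sys.path.remove(tmp)
    return None

error = try_import()
print(type(error).__name__, error)
```

When demo_tasks runs its import, demo_models is already in sys.modules but has not yet reached its class definition, so the name MyModel cannot be found there.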

I've resolved this for the moment by calling my_task.delay() from views.py, but it seems to make sense to keep the model logic within the model class. Is there a better way of doing this?

Riancho answered 15/10, 2014 at 9:28 Comment(1)
Create a custom ModelManager and put it in a separate file. - Tailpipe
N
1

Use signals.

tasks.py

from models import MyModel, my_signal
from celery import shared_task
from django.dispatch import receiver

@shared_task
def my_task(id):
    qs = MyModel.objects.filter(some_field=id)
    for record in qs:
        my_value = ...  # do some computations
        record.my_field = my_value
        record.save()

@receiver(my_signal)
def my_receiver(sender, **kwargs):
    my_task.delay(kwargs['id'])

models.py

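from django.db import models
from django.dispatch import Signal

# Do not import tasks here: tasks.py already imports this module,
# so importing it back would recreate the cycle.
# Signal() is called without providing_args, which was removed in Django 4.0.
my_signal = Signal()

class MyModel(models.Model):
    field1 = models.IntegerField()
    #more fields
    my_field = models.FloatField(null=True)

    @staticmethod
    def load_from_file(file):
        #parse file, set fields from file
        my_signal.send(sender=MyModel, id=...)  # replace ... with the parsed id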
Nostoc answered 16/10, 2014 at 18:31 Comment(2)
Using signals is IMHO a bad way to solve a circular import issue. In the end, your code is hard to understand. Signals are meant for cases where you actually need to broadcast something. - Jagged
I agree that signals make for confusing code. The answer from Piotr Ćwiek using send_task() should be the accepted answer IMO. - Departure
M
23

The solution posted by joshua is very good, but when I first tried it, I found that my @receiver decorators had no effect. That was because the tasks module wasn't imported anywhere, which was expected as I used task auto-discovery.

There is, however, another way to decouple tasks.py from models.py. Namely, tasks can be sent by name, so they don't have to be imported in the process that sends them:

from django.db import models
#from tasks import my_task
import celery

class MyModel(models.Model):
    field1 = models.IntegerField()
    #more fields
    my_field = models.FloatField(null=True)

    @staticmethod
    def load_from_file(file):
        #parse file, set fields from file
        #my_task.delay(id)
        celery.current_app.send_task('myapp.tasks.my_task', (id,))

send_task() is a method on Celery app objects.

With this solution, it is important that your tasks have correct, predictable names, because the sender refers to them only by name.
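As a rough illustration of why sending by name removes the import dependency, here is a toy model in plain Python. It is not Celery's implementation: TASK_REGISTRY, QUEUE, register, send_task and run_worker are all hypothetical names, with a dict standing in for the worker's task registry and a list standing in for the broker.

```python
# Toy illustration only: not Celery code.
TASK_REGISTRY = {}   # worker side: task name -> function
QUEUE = []           # stand-in for the message broker

def register(name):
    """Register a function under an explicit task name."""
    def decorator(func):
        TASK_REGISTRY[name] = func
        return func
    return decorator

def send_task(name, args=()):
    """Sender side: needs only the task's name, never the function object."""
    QUEUE.append((name, tuple(args)))

def run_worker():
    """Worker side: resolve each name to a function at execution time."""
    results = []
    while QUEUE:
        name, args = QUEUE.pop(0)
        results.append(TASK_REGISTRY[name](*args))
    return results

# The sender can enqueue before the task function is even defined.
send_task("myapp.tasks.my_task", (7,))

@register("myapp.tasks.my_task")
def my_task(record_id):
    return record_id * 2

results = run_worker()
print(results)
```

In this toy, a misspelled name would only fail inside run_worker() with a KeyError, long after the sender has moved on, which is why predictable names matter. In Celery itself, passing an explicit name= to the task decorator is one way to pin the name.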

Manxman answered 22/5, 2015 at 13:1 Comment(0)
S
20

In your models, instead of importing my_task at the top of the file, you can import it just before you use it. This resolves the circular import, because the import only runs when the method is called, by which time both modules have finished loading.

from django.db import models

class MyModel(models.Model):
      field1 = models.IntegerField()
      #more fields
      my_field = models.FloatField(null=True)

      @staticmethod
      def load_from_file(file):
          #parse file, set fields from file
          from tasks import my_task   # import here instead of top
          my_task.delay(id)

Alternatively, you can do the same thing in your tasks.py: import your models inside the task function, just before you use them, instead of at the top of the file.
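The deferred import can be checked without Django or Celery. The sketch below writes two hypothetical modules, lazy_models and lazy_tasks, to a temporary directory; they mirror the structure above, with the import moved inside the method.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for models.py: the task import is deferred.
MODELS_SRC = '''
class MyModel:
    @staticmethod
    def load_from_file(record_id):
        from lazy_tasks import my_task   # runs on the first call, not at import time
        return my_task(record_id)
'''

# Hypothetical stand-in for tasks.py: a top-level import is now safe.
TASKS_SRC = '''
from lazy_models import MyModel

def my_task(record_id):
    return (MyModel.__name__, record_id)
'''

with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "lazy_models.py").write_text(MODELS_SRC)
    Path(tmp, "lazy_tasks.py").write_text(TASKS_SRC)
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        lazy_models = importlib.import_module("lazy_models")
        # lazy_models is fully loaded by now, so lazy_tasks can import it.
        result = lazy_models.MyModel.load_from_file(42)
    finally:
        sys.path.remove(tmp)

print(result)
```

Only one side of the cycle needs the deferred import; the other module can keep its import at the top.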

Alternative:

You can use the send_task() method to call your task by name:

from celery import current_app
from django.db import models

class MyModel(models.Model):
      field1 = models.IntegerField()
      #more fields
      my_field = models.FloatField(null=True)

      @staticmethod
      def load_from_file(file):
          #parse file, set fields from file
          current_app.send_task('myapp.tasks.my_task', (id,))
Steeple answered 15/10, 2014 at 12:37 Comment(3)
I disagree that this is a code smell; IMO it's a necessity. - Codfish
I think calling send_task is way better than using signals in that case. - Jagged
I prefer @umbrae's answer: it uses a similar app registry mechanism on the Django side, so you can keep triggering tasks as you normally do with delay(). - Wanda
C
15

Just to toss one more not-great solution into this list, what I've ended up doing is relying on django's now-built-in app registry.

So in tasks.py, rather than importing from models, you use apps.get_model() to gain access to the model.

I do this with a helper method with a healthy bit of documentation just to express why this is painful:

from django.apps import apps

def _model(model_name):
    """Generically retrieve a model object.

    This is a hack around Django/Celery's inherent circular import
    issues with tasks.py/models.py. In order to keep clean abstractions, we use
    this to avoid importing from models, introducing a circular import.

    No solutions for this are good so far (unnecessary signals, inline imports,
    serializing the whole object, tasks forced to be in model, this), so we
    use this because at least the annoyance is constrained to tasks.
    """
    return apps.get_model('my_app', model_name)

And then:

@shared_task
def some_task(post_id):
    post = _model('Post').objects.get(pk=post_id)

You could certainly just use apps.get_model() directly though.

Cyclostyle answered 20/4, 2017 at 21:42 Comment(2)
I like this solution. If I'm not wrong, Django's AppConfig feature was added precisely for cases like this, in which you can't (for one reason or another) load some Django models yet. - Kasey
I think this is the best way to do this! Thanks so much! - Advertise
L
0

Not sure if this is anyone else's problem, but I took a few hours, and I found a solution...mainly, the key from the documentation:

Using the @shared_task decorator

The tasks you write will probably live in reusable apps, and reusable apps cannot depend on the project itself, so you also cannot import your app instance directly.

Basically what I was doing was this...

####
# project/coolapp/tasks.py -- DON'T DO THIS
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery("coolapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()


@app.task(bind=True)
def some_task(self, some_id):
  from coolapp.models import CoolPerson

####
# project/coolapp/__init__.py -- DON'T DO THIS
from __future__ import absolute_import, unicode_literals
from .tasks import app as celery_app
__all__ = ("celery_app",)

Therefore, I was getting weird errors about missing app labels (a clear indication of a circular import).

The solution...

Refactor project/coolapp/tasks.py -> project/project/tasks.py and project/coolapp/__init__.py -> project/project/__init__.py.

IMPORTANT: This does not (and should not) be added to INSTALLED_APPS. Otherwise, you'll get the circular import.

So then, to start the worker:

celery -A project.project worker -l INFO

Also, a little debugging tip...

When you want to find out if your tasks are properly discovered, put this in project/project/app.py:

app.autodiscover_tasks()
assert "project.app.tasks.some_task" in app.tasks

Otherwise, you'll have to start up the worker only to realize your tasks aren't included in the app, then you'll have to wait for shutdown.

Lymphoblast answered 30/1, 2022 at 15:34 Comment(0)
