Flask blueprints use a Celery task and cause a circular import
I have an application with Blueprints and Celery; the code is here:

config.py

import os
from celery.schedules import crontab
basedir = os.path.abspath(os.path.dirname(__file__))

class Config:
    SECRET_KEY = os.environ.get('SECRET_KEY') or ''
    SQLALCHEMY_COMMIT_ON_TEARDOWN = True
    RECORDS_PER_PAGE = 40
    SQLALCHEMY_DATABASE_URI = ''
    CELERY_BROKER_URL = ''
    CELERY_RESULT_BACKEND = ''
    CELERY_RESULT_DBURI = ''
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {}

    @staticmethod
    def init_app(app):
        pass


class DevelopmentConfig(Config):
    DEBUG = True
    WTF_CSRF_ENABLED = True
    APP_HOME = ''
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://...'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }


class TestConfig(Config):
    DEBUG = True
    WTF_CSRF_ENABLED = False
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'


class ProdConfig(Config):
    DEBUG = False
    WTF_CSRF_ENABLED = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...celery'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://.../celery'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }

config = {
    'development': DevelopmentConfig,
    'default': ProdConfig,
    'production': ProdConfig,
    'testing': TestConfig,
}


class AppConf:
    """
    Class to store current config even out of context
    """
    def __init__(self):
        self.app = None
        self.config = {}

    def init_app(self, app):
        if hasattr(app, 'config'):
            self.app = app
            self.config = app.config.copy()
        else:
            raise TypeError

__init__.py:

import os

from flask import Flask
from celery import Celery
from config import config, AppConf
app_conf = AppConf()

def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    config[config_name].init_app(app)
    app_conf.init_app(app)

    # Connect to Staging view
    from staging.views import staging as staging_blueprint
    app.register_blueprint(staging_blueprint)

    return app


def make_celery(app=None):
    app = app or create_app(os.getenv('FLASK_CONFIG') or 'default')
    celery = Celery(__name__, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

tasks.py:

from app import make_celery, app_conf

cel = make_celery(app_conf.app)

@cel.task
def send_realm_to_fabricdb(realm, form):
    pass  # some actions...

And here is the problem: the blueprint "staging" uses the task send_realm_to_fabricdb, so it does from tasks import send_realm_to_fabricdb. When I just run the application, everything works fine. BUT when I try to run Celery with celery -A app.tasks worker -l info --beat, execution reaches cel = make_celery(app_conf.app) in tasks.py with app=None, so it tries to create the application again, registering the blueprint... and I get a circular import. Could you tell me how to break this cycle? Thanks in advance.

Overword answered 16/4, 2015 at 9:43 Comment(1)
Howdie, I'm currently dealing with the same issue. Were you able to get this working? – Frenulum

I don't have the code to try this out, but I think things would work better if you move the creation of the Celery instance out of tasks.py and into the create_app function, so that it happens at the same time the app instance is created.

The argument you give to the Celery worker in the -A option does not need to contain the tasks; Celery just needs the celery object. So, for example, you could create a separate starter script, say celery_worker.py, that calls create_app to create app and cel, and then give it to the worker as -A celery_worker.cel, without involving the blueprint at all.

Hope this helps.

Menispermaceous answered 16/4, 2015 at 14:27 Comment(2)
@Miguel Does this mean I am creating two instances of Flask, one in celery_worker.py and the other in maybe manage.py? I'm having the same issue. – Peck
@ShulhiSapli Yes, these are two different processes, and each has its own application instance. But both should be created in the same way, so they are effectively equivalent (for example, they have the same configuration). The only purpose of the app instance in the Celery worker is to provide a context for code that needs it. You cannot pass session, g, or request variables from one process to the other in this way. – Menispermaceous

What I did to solve this error was to create two Flask instances: one for the web app, and another for initializing the Celery instance.

Like @Miguel said, I have

  • celery_app.py for celery instance
  • manager.py for Flask instance

In these two files, each module has its own Flask instance.

So I can use celery.task in views, and I can start the Celery worker separately.

Doubtful answered 22/8, 2018 at 5:27 Comment(0)

Thanks Bob Jordan; you can find the answer at https://mcmap.net/q/656645/-python-flask-with-celery-out-of-application-context.

Key points:
1. make_celery does two things at once: it creates the Celery app and runs Celery with the Flask context, so you can split the make_celery job into two functions.
2. The Celery app must be initialized before the blueprints are registered.

Discriminative answered 23/10, 2019 at 2:52 Comment(0)

Having the same problem, I ended up solving it very easily using shared_task (docs), keeping a single app.py file and not having to instantiate the flask app multiple times.

The original situation that led to the circular import:

import logging
from time import sleep

from src.app import celery  # src.app ALSO imports the blueprints, which import
                            # this file, causing the circular import

logger = logging.getLogger(__name__)


@celery.task(bind=True)
def celery_test(self):
    sleep(5)
    logger.info("Task processed by Celery.")

The current code that works fine and avoids the circular import:

# from src.app import celery  <- not needed anymore!
import logging
from time import sleep

from celery import shared_task

logger = logging.getLogger(__name__)


@shared_task(bind=True)
def celery_test(self):
    sleep(5)
    logger.info("Task processed by Celery.")

Please mind that I'm pretty new to Celery, so I might be overlooking important stuff; it would be great if someone more experienced could give their opinion.

Truscott answered 21/1, 2022 at 12:20 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.