Flask,蓝图使用Celery任务并出现循环导入问题

12

我有一个使用Blueprints和Celery的应用程序,代码在这里:

config.py

import os
from celery.schedules import crontab
basedir = os.path.abspath(os.path.dirname(__file__))

class Config:
    SECRET_KEY = os.environ.get('SECRET_KEY') or ''
    SQLALCHEMY_COMMIT_ON_TEARDOWN = True
    RECORDS_PER_PAGE = 40
    SQLALCHEMY_DATABASE_URI = ''
    CELERY_BROKER_URL = ''
    CELERY_RESULT_BACKEND = ''
    CELERY_RESULT_DBURI = ''
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {}

    @staticmethod
    def init_app(app):
        pass


class DevelopmentConfig(Config):
    DEBUG = True
    WTF_CSRF_ENABLED = True
    APP_HOME = ''
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://...'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }


class TestConfig(Config):
    DEBUG = True
    WTF_CSRF_ENABLED = False
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'


class ProdConfig(Config):
    DEBUG = False
    WTF_CSRF_ENABLED = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...celery'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://.../celery'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }

config = {
    'development': DevelopmentConfig,
    'default': ProdConfig,
    'production': ProdConfig,
    'testing': TestConfig,
}


class AppConf:
    """
    Class to store current config even out of context
    """
    def __init__(self):
        self.app = None
        self.config = {}

    def init_app(self, app):
        if hasattr(app, 'config'):
            self.app = app
            self.config = app.config.copy()
        else:
            raise TypeError

init.py: 导入 os 模块。

from flask import Flask
from celery import Celery
from config import config, AppConf

def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    config[config_name].init_app(app)
    app_conf.init_app(app)

    # Connect to Staging view
    from staging.views import staging as staging_blueprint
    app.register_blueprint(staging_blueprint)

    return app


def make_celery(app=None):
    app = app or create_app(os.getenv('FLASK_CONFIG') or 'default')
    celery = Celery(__name__, broker=app.config.CELERY_BROKER_URL)
    celery.conf.update(app.conf)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

tasks.py: 从app中导入make_celery和app_conf。

cel = make_celery(app_conf.app)

@cel.task
def send_realm_to_fabricdb(realm, form):
    some actions...

这里的问题是: 蓝图“staging”使用了任务send_realm_to_fabricdb,因此需要添加:from tasks import send_realm_to_fabricdb 当我仅运行应用程序时,一切正常。 但是,当我尝试运行celery:celery -A app.tasks worker -l info --beat时,它会进入tasks.py中的cel = make_celery(app_conf.app),得到app=None,并尝试再次创建应用程序:注册蓝图...因此出现了循环导入。 请问如何打破此循环? 提前致谢。

您好,根据你描述的问题,可能是由于循环引用导致的。建议您将任务作为参数传递给"make_celery"函数而不是在其中导入,如果您需要导入任务,则可以将其移至另一个模块中并从那里导入。这应该可以解决您遇到的循环导入问题。祝您好运!


1
你好,我目前也在处理同样的问题。你能解决这个问题吗? - Jimmy
4个回答

8
我没有代码来尝试,但我认为如果您将Celery实例的创建从tasks.py中移出并放入create_app函数中,则事情会更顺利,因为它将在创建app实例时同时发生。
-A选项中给Celery worker的参数不需要包含任务,Celery只需要celery对象,例如,您可以创建一个单独的启动脚本,比如celery_worker.py,调用create_app来创建appcel,然后将其作为-A celery_worker.cel提供给worker,完全不涉及蓝图。
希望这可以帮助您。

@Miguel 这是不是意味着我要在celery_worker.py和也许在manage.py创建两个Flask实例?我遇到了同样的问题。 - Shulhi Sapli
3
@ShulhiSapli 是的,这是两个不同的进程,每个进程都有自己的应用实例。但是两者都应该以相同的方式创建,因此它们在本质上是相等的(例如,它们具有相同的配置)。Celery 工作进程中应用程序实例的唯一目的是为需要其上下文的代码提供一个上下文。您不能以这种方式从一个进程传递 sessiongrequest 变量到另一个进程。 - Miguel Grinberg

2
我解决这个错误的方法是创建两个Flask实例,一个用于Web应用程序,另一个用于初始Celery实例。就像@Miguel所说,我有:
- celery_app.py 用于Celery实例 - manager.py 用于Flask实例
在这两个文件中,每个模块都有自己的Flask实例。因此,我可以在视图中使用celery.task,并可以单独启动celery worker。

0

感谢 Bob Jordan,你可以在 https://dev59.com/tmQo5IYBdhLWcg3wOtMC#50665633 找到答案。

关键点:
1. make_celery 同时完成两个任务:创建 celery 应用程序并使用 flask 内容运行 celery,因此您可以创建两个函数来完成 make_celery 的工作。
2. 必须在蓝图注册之前初始化 celery 应用程序。


0

我也遇到了同样的问题,最终使用shared_taskdocs)轻松解决了它,只需保留一个app.py文件,而不必多次实例化flask应用程序。

导致循环导入的原始情况:

from src.app import celery  # src.app is ALSO importing the blueprints which are importing this file which causes the circular import.


@celery.task(bind=True)
def celery_test(self):
    sleep(5)
    logger.info("Task processed by Celery.")

目前的代码运行良好,避免了循环导入:

# from src.app import celery <- not needed anymore!


@shared_task(bind=True)
def celery_test(self):
    sleep(5)
    logger.info("Task processed by Celery.")

请注意,我对Celery还比较新,可能会忽略重要的事情,如果有更有经验的人能给出意见,那就太好了。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接